"""
HTTP AsyncIO support

This module provides asyncio wrappers around the awscrt.http module.
All network operations in `awscrt.aio.http` are asynchronous and use Python's asyncio framework.
"""

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.

import _awscrt
import awscrt.exceptions
from awscrt.http import (
    HttpClientConnectionBase, HttpRequest, HttpClientStreamBase, HttpProxyOptions,
    Http2Setting, HttpVersion
)
from awscrt.io import (
    ClientBootstrap, SocketOptions, TlsConnectionOptions, InputStream
)
import asyncio
from collections import deque
from io import BytesIO
from concurrent.futures import Future
from typing import List, Tuple, Optional, Callable, AsyncIterator
import threading


class AIOHttpClientConnectionUnified(HttpClientConnectionBase):
    """
    Asyncio client connection that can carry either HTTP/1 or HTTP/2 traffic.

    Obtain an instance by awaiting `AIOHttpClientConnectionUnified.new()`.
    """

    @classmethod
    async def new(
            cls,
            host_name: str,
            port: int,
            bootstrap: Optional[ClientBootstrap] = None,
            socket_options: Optional[SocketOptions] = None,
            tls_connection_options: Optional[TlsConnectionOptions] = None,
            proxy_options: Optional[HttpProxyOptions] = None,
            manual_window_management: bool = False,
            initial_window_size: Optional[int] = None) -> "AIOHttpClientConnectionUnified":
        """
        Open a new unified HTTP client connection and wait for it to be ready.

        Args:
            host_name (str): Host to connect to.

            port (int): Port to connect to.

            bootstrap (Optional[ClientBootstrap]): Client bootstrap used to initiate the
                socket connection. The default singleton is used when None.

            socket_options (Optional[SocketOptions]): Socket options. Defaults are used
                when None.

            tls_connection_options (Optional[TlsConnectionOptions]): TLS connection
                options. When None, the connection is attempted over plain-text.

            proxy_options (Optional[HttpProxyOptions]): Proxy options. No proxy is used
                when None.

            manual_window_management (bool): When True, each stream's flow-control window
                is managed manually: it shrinks as body data is received (headers,
                padding, and other metadata do not count) and, once it reaches 0, no
                further data arrives until `update_window()` is called. When False, the
                connection keeps windows open so no back-pressure is applied and data
                arrives as fast as possible. `initial_window_size` sets each stream's
                starting window. For HTTP/2 this governs stream windows only; the
                connection window is governed by `conn_manual_window_management`.
                Default is False.

            initial_window_size (Optional[int]): Starting size of each stream's
                flow-control window. Required if `manual_window_management` is True,
                ignored otherwise. For HTTP/2 this becomes the `INITIAL_WINDOW_SIZE`
                setting and can be overridden by `initial_settings`. Must be <= 2^31-1
                or the connection fails. A value of 0 with `manual_window_management`
                True makes streams start with a zero window.

        Returns:
            AIOHttpClientConnectionUnified: A new unified HTTP client connection.
        """
        # Flow-control settings travel as keyword arguments; the six connection
        # basics stay positional, exactly as `_generic_new` receives them today.
        window_options = dict(
            manual_window_management=manual_window_management,
            initial_window_size=initial_window_size,
        )
        connect_future = cls._generic_new(
            host_name,
            port,
            bootstrap,
            socket_options,
            tls_connection_options,
            proxy_options,
            asyncio_connection=True,
            **window_options)
        # `_generic_new` hands back a future; bridge it into asyncio and wait.
        connection = await asyncio.wrap_future(connect_future)
        return connection

    async def close(self) -> None:
        """Begin closing the connection and wait until shutdown has finished.

        Shutdown is asynchronous. Calling this on a connection that is already
        closing has no additional effect.

        Returns:
            None: Once shutdown is complete.
        """
        binding = self._binding
        _awscrt.http_connection_close(binding)
        # The coroutine only returns after `shutdown_future` has resolved.
        shutdown_done = asyncio.wrap_future(self.shutdown_future)
        await shutdown_done

    def request(self,
                request: 'HttpRequest',
                request_body_generator: AsyncIterator[bytes] = None,
                loop: Optional[asyncio.AbstractEventLoop] = None) -> 'AIOHttpClientStreamUnified':
        """Build an `AIOHttpClientStreamUnified` for one request/response exchange.

        Args:
            request (HttpRequest): Definition of the outgoing request.
            request_body_generator (AsyncIterator[bytes], optional): Async iterator that
                yields the request body in chunks. When given, the body is meant to be
                sent incrementally as chunks become available.
            loop (Optional[asyncio.AbstractEventLoop]): Event loop for async operations.
                The current event loop is used when None.

        Returns:
            AIOHttpClientStreamUnified: Stream for the HTTP request/response exchange.
        """
        stream = AIOHttpClientStreamUnified(
            self,
            request,
            request_body_generator=request_body_generator,
            loop=loop)
        return stream


class AIOHttpClientConnection(AIOHttpClientConnectionUnified):
    """
    Asyncio client connection restricted to HTTP/1.1.

    Obtain an instance by awaiting `AIOHttpClientConnection.new()`.
    """

    @classmethod
    async def new(
            cls,
            host_name: str,
            port: int,
            bootstrap: Optional[ClientBootstrap] = None,
            socket_options: Optional[SocketOptions] = None,
            tls_connection_options: Optional[TlsConnectionOptions] = None,
            proxy_options: Optional[HttpProxyOptions] = None,
            manual_window_management: bool = False,
            initial_window_size: Optional[int] = None,
            read_buffer_capacity: Optional[int] = None) -> "AIOHttpClientConnection":
        """
        Open a new HTTP/1.1 client connection and wait for it to be ready.

        Args:
            host_name (str): Host to connect to.

            port (int): Port to connect to.

            bootstrap (Optional[ClientBootstrap]): Client bootstrap used to initiate the
                socket connection. The default singleton is used when None.

            socket_options (Optional[SocketOptions]): Socket options. Defaults are used
                when None.

            tls_connection_options (Optional[TlsConnectionOptions]): TLS connection
                options. When None, the connection is attempted over plain-text.

            proxy_options (Optional[HttpProxyOptions]): Proxy options. No proxy is used
                when None.

            manual_window_management (bool): Enable manual flow-control window
                management when True. Default is False.

            initial_window_size (Optional[int]): Initial flow-control window size.
                Required if `manual_window_management` is True, ignored otherwise.

            read_buffer_capacity (Optional[int]): Capacity, in bytes, of the HTTP/1.1
                connection's read buffer. The buffer grows when the incoming stream's
                flow-control window reaches zero. Ignored if `manual_window_management`
                is False. Too small a capacity may hinder throughput; too large a
                capacity may waste memory without improving throughput. A default is
                used when None or zero.

        Returns:
            AIOHttpClientConnection: A new HTTP client connection.
        """
        # Everything beyond the six positional basics is passed by keyword;
        # `expected_version` pins the connection to HTTP/1.1.
        http1_options = dict(
            expected_version=HttpVersion.Http1_1,
            asyncio_connection=True,
            manual_window_management=manual_window_management,
            initial_window_size=initial_window_size,
            read_buffer_capacity=read_buffer_capacity,
        )
        connect_future = cls._generic_new(
            host_name,
            port,
            bootstrap,
            socket_options,
            tls_connection_options,
            proxy_options,
            **http1_options)
        connection = await asyncio.wrap_future(connect_future)
        return connection

    def request(self,
                request: 'HttpRequest',
                request_body_generator: AsyncIterator[bytes] = None,
                loop: Optional[asyncio.AbstractEventLoop] = None) -> 'AIOHttpClientStream':
        """Build an `AIOHttpClientStream` for one request/response exchange.

        Args:
            request (HttpRequest): Definition of the outgoing request.
            request_body_generator (AsyncIterator[bytes], optional): Not supported for
                HTTP/1.1 connections yet; supply the body through the request's
                body_stream instead. The value is accepted but not forwarded to the
                stream.
            loop (Optional[asyncio.AbstractEventLoop]): Event loop for async operations.
                The current event loop is used when None.

        Returns:
            AIOHttpClientStream: Stream for the HTTP request/response exchange.
        """
        # Only the loop is handed on; `request_body_generator` is intentionally unused.
        stream = AIOHttpClientStream(self, request, loop=loop)
        return stream


class AIOHttp2ClientConnection(AIOHttpClientConnectionUnified):
    """
    Asyncio client connection restricted to HTTP/2.

    Obtain an instance by awaiting `AIOHttp2ClientConnection.new()`.
    """

    @classmethod
    async def new(
            cls,
            host_name: str,
            port: int,
            bootstrap: Optional[ClientBootstrap] = None,
            socket_options: Optional[SocketOptions] = None,
            tls_connection_options: Optional[TlsConnectionOptions] = None,
            proxy_options: Optional[HttpProxyOptions] = None,
            initial_settings: Optional[List[Http2Setting]] = None,
            on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None,
            manual_window_management: bool = False,
            initial_window_size: Optional[int] = None,
            conn_manual_window_management: bool = False,
            conn_window_size_threshold: Optional[int] = None,
            stream_window_size_threshold: Optional[int] = None) -> "AIOHttp2ClientConnection":
        """
        Open a new HTTP/2 client connection and wait for it to be ready.

        Note: to set up the connection, the server must support HTTP/2 and
        TlsConnectionOptions.

        `host_name`, `port`, `bootstrap`, `socket_options`, `tls_connection_options`
        and `proxy_options` are passed straight through to the shared connection
        setup. The remaining arguments are described below.

        HTTP/2 specific args:
            initial_settings (List[Http2Setting]): The initial settings to change for
                the connection.

            on_remote_settings_changed: Optional callback invoked once the remote peer
                changes its settings and those settings are acknowledged by the local
                connection. It takes the following argument and returns nothing:

                    *   `settings` (List[Http2Setting]): List of settings that were changed.

            manual_window_management (bool): Enable manual flow-control window
                management when True. Default is False.

            initial_window_size (Optional[int]): Initial flow-control window size.
                Required if `manual_window_management` is True, ignored otherwise.

            conn_manual_window_management (bool): Enable manual connection-level flow
                control for the whole HTTP/2 connection when True. The connection's
                window then shrinks as body data is received across all streams. The
                initial connection window is 65,535 bytes. Once the window reaches 0,
                all streams stop receiving data until `update_window()` is called to
                increment the connection's window.
                Note: padding in DATA frames counts against the window, but window
                updates for padding are sent automatically even in manual mode.
                Default is False.

            conn_window_size_threshold (Optional[int]): Threshold for sending
                connection-level WINDOW_UPDATE frames. Ignored if
                `conn_manual_window_management` is False. While the connection's window
                is above the threshold, WINDOW_UPDATE frames are batched; when it drops
                below, the update is sent. Default is 32,767 (half of the initial
                65,535 window).

            stream_window_size_threshold (Optional[int]): Threshold for sending
                stream-level WINDOW_UPDATE frames. Ignored if `manual_window_management`
                is False. While a stream's window is above the threshold, WINDOW_UPDATE
                frames are batched; when it drops below, the update is sent. Default is
                half of `initial_window_size`.

        Returns:
            AIOHttp2ClientConnection: A new HTTP/2 client connection.
        """
        # Everything beyond the six positional basics is passed by keyword;
        # `expected_version` pins the connection to HTTP/2.
        http2_options = dict(
            expected_version=HttpVersion.Http2,
            initial_settings=initial_settings,
            on_remote_settings_changed=on_remote_settings_changed,
            asyncio_connection=True,
            manual_window_management=manual_window_management,
            initial_window_size=initial_window_size,
            conn_manual_window_management=conn_manual_window_management,
            conn_window_size_threshold=conn_window_size_threshold,
            stream_window_size_threshold=stream_window_size_threshold,
        )
        connect_future = cls._generic_new(
            host_name,
            port,
            bootstrap,
            socket_options,
            tls_connection_options,
            proxy_options,
            **http2_options)
        connection = await asyncio.wrap_future(connect_future)
        return connection

    def request(self,
                request: 'HttpRequest',
                request_body_generator: AsyncIterator[bytes] = None,
                loop: Optional[asyncio.AbstractEventLoop] = None) -> 'AIOHttp2ClientStream':
        """Build an `AIOHttp2ClientStream` for one request/response exchange.

        Args:
            request (HttpRequest): Definition of the outgoing request.
            request_body_generator (AsyncIterator[bytes], optional): Async iterator that
                yields the request body in chunks. When given, the body is sent
                incrementally as chunks become available from the iterator.
            loop (Optional[asyncio.AbstractEventLoop]): Event loop for async operations.
                The current event loop is used when None.

        Returns:
            AIOHttp2ClientStream: Stream for the HTTP/2 request/response exchange.
        """
        stream = AIOHttp2ClientStream(
            self,
            request,
            request_body_generator=request_body_generator,
            loop=loop)
        return stream

    def update_window(self, increment_size: int) -> None:
        """
        Grow the connection's flow-control window.

        Args:
            increment_size (int): Number of bytes to add to the window.
        """
        binding = self._binding
        _awscrt.http2_connection_update_window(binding, increment_size)


class AIOHttpClientStreamUnified(HttpClientStreamBase):
    """Asyncio stream for one request/response exchange on an HTTP/1 or HTTP/2 connection.

    The `_on_*` callbacks resolve thread-safe `concurrent.futures.Future`
    objects. The public coroutines bridge those futures into asyncio with
    `asyncio.wrap_future()` on the stream's event loop.

    Response body chunks that arrive before anyone asks for them are queued in
    `_received_chunks`; requests that arrive before a chunk does are queued as
    futures in `_chunk_futures`. `_deque_lock` guards both queues.
    """
    # Every attribute assigned in __init__ is listed here, so the class does not
    # rely on an instance __dict__ being provided by a base class.
    __slots__ = (
        '_response_status_future',
        '_response_headers_future',
        '_chunk_futures',
        '_received_chunks',
        '_completion_future',
        '_remote_completion_future',
        '_stream_completed',
        '_status_code',
        '_loop',
        '_deque_lock',
        '_request_body_generator',
        '_writer')

    def __init__(self,
                 connection: 'AIOHttpClientConnectionUnified',
                 request: HttpRequest,
                 request_body_generator: Optional[AsyncIterator[bytes]] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Create the stream, start the body writer task if needed, and activate it.

        Args:
            connection (AIOHttpClientConnectionUnified): Connection to send the request on.
            request (HttpRequest): The HTTP request to send.
            request_body_generator (Optional[AsyncIterator[bytes]]): Async iterator
                yielding request body chunks. When given, a task running
                `_set_request_body_generator()` is scheduled on the loop.
            loop (Optional[asyncio.AbstractEventLoop]): Event loop for async operations.
                `asyncio.get_event_loop()` is used when None.

        Raises:
            TypeError: If `loop` is given but is not an `asyncio.AbstractEventLoop`.
        """

        # Initialize the parent class. Manual writes are only requested when a
        # body generator is supplied on an HTTP/2 connection.
        http2_manual_write = request_body_generator is not None and connection.version is HttpVersion.Http2
        super()._init_common(connection, request, http2_manual_write=http2_manual_write)

        # Attach the event loop for async operations
        if loop is None:
            # Use the current event loop if none is provided
            loop = asyncio.get_event_loop()
        elif not isinstance(loop, asyncio.AbstractEventLoop):
            raise TypeError("loop must be an instance of asyncio.AbstractEventLoop")
        self._loop = loop

        # Lock to protect check-then-act sequences on deques for thread safety in free-threaded Python
        self._deque_lock = threading.Lock()
        self._chunk_futures = deque()
        self._received_chunks = deque()
        # Initialised here; not read anywhere else in this class.
        self._stream_completed = False

        # Create futures for async operations
        self._completion_future = Future()
        self._remote_completion_future = Future()
        self._response_status_future = Future()
        self._response_headers_future = Future()
        self._status_code = None

        self._request_body_generator = request_body_generator
        if self._request_body_generator is not None:
            self._writer = self._loop.create_task(self._set_request_body_generator(self._request_body_generator))

        # Activate the stream immediately
        _awscrt.http_client_stream_activate(self)

    def _on_response(self, status_code: int, name_value_pairs: List[Tuple[str, str]]) -> None:
        """Record the response status code and headers - called from C thread."""
        self._status_code = status_code
        # These are concurrent.futures.Future objects, so they are resolved
        # directly on the calling thread. Awaiting coroutines receive the result
        # on the event loop through asyncio.wrap_future().
        self._response_status_future.set_result(status_code)
        self._response_headers_future.set_result(name_value_pairs)

    def _on_body(self, chunk: bytes) -> None:
        """Process body chunk - called from C thread."""
        with self._deque_lock:
            if self._chunk_futures:
                future = self._chunk_futures.popleft()
            else:
                # Nobody is waiting yet; keep the chunk for a later request.
                self._received_chunks.append(chunk)
                return

        # Set result outside lock (Future is thread-safe)
        future.set_result(chunk)

    def _resolve_pending_chunk_futures(self) -> None:
        """Helper to resolve all pending chunk futures with empty bytes.

        This indicates end of stream to any waiting get_next_response_chunk() calls.
        Must be called when either the stream completes or remote peer sends END_STREAM.
        """
        # Resolve all pending chunk futures with lock protection
        with self._deque_lock:
            pending_futures = list(self._chunk_futures)
            self._chunk_futures.clear()

        # Set results outside lock (Future is thread-safe)
        for future in pending_futures:
            future.set_result(b"")

    def _on_complete(self, error_code: int) -> None:
        """Set the completion status of the stream.

        On success the completion future resolves to the recorded status code.
        On failure it carries the exception for `error_code`, and any response
        future still pending is failed with the same error.
        """
        if error_code == 0:
            self._completion_future.set_result(self._status_code)
        else:
            self._completion_future.set_exception(awscrt.exceptions.from_code(error_code))
            # If the stream failed before a response arrived, _on_response()
            # never ran. Fail the response futures too, otherwise
            # get_response_status_code() / get_response_headers() would wait forever.
            for response_future in (self._response_status_future, self._response_headers_future):
                if not response_future.done():
                    response_future.set_exception(awscrt.exceptions.from_code(error_code))

        self._resolve_pending_chunk_futures()

    def _on_h2_remote_end_stream(self) -> None:
        """Called when the remote peer has finished sending (HTTP/2 only)."""
        self._remote_completion_future.set_result(None)
        self._resolve_pending_chunk_futures()

    async def _set_request_body_generator(self, body_iterator: AsyncIterator[bytes]):
        """Hook for sending the request body from `body_iterator`.

        This base implementation does nothing; `AIOHttp2ClientStream` overrides it.
        """
        ...

    async def get_response_status_code(self) -> int:
        """Get the response status code asynchronously.

        Returns:
            int: The response status code.

        Raises:
            Exception: The stream's error, if it failed before a response arrived.
        """
        return await asyncio.wrap_future(self._response_status_future, loop=self._loop)

    async def get_response_headers(self) -> List[Tuple[str, str]]:
        """Get the response headers asynchronously.

        Returns:
            List[Tuple[str, str]]: The response headers as a list of (name, value) tuples.

        Raises:
            Exception: The stream's error, if it failed before a response arrived.
        """
        return await asyncio.wrap_future(self._response_headers_future, loop=self._loop)

    async def get_next_response_chunk(self) -> bytes:
        """Get the next chunk from the response body.

        Returns:
            bytes: The next chunk of data from the response body.
                Returns empty bytes when the stream is completed and no more chunks are left.
        """
        with self._deque_lock:
            if self._received_chunks:
                # Buffered data is always drained first, even after completion.
                return self._received_chunks.popleft()
            elif self._completion_future.done() or self._remote_completion_future.done():
                return b""
            else:
                # Register interest while still holding the lock so _on_body()
                # or _resolve_pending_chunk_futures() cannot miss this future.
                future = Future()
                self._chunk_futures.append(future)

        # Await outside lock
        return await asyncio.wrap_future(future, loop=self._loop)

    async def wait_for_completion(self) -> int:
        """Wait asynchronously for the stream to complete.

        Returns:
            int: The response status code.

        Raises:
            Exception: The stream's error, if the exchange did not complete successfully.
        """
        return await asyncio.wrap_future(self._completion_future, loop=self._loop)


class AIOHttpClientStream(AIOHttpClientStreamUnified):
    """Asyncio HTTP stream that sends a request and receives a response.

    Instances are created by `AIOHttpClientConnection.request()`.

    Attributes:
        connection (AIOHttpClientConnection): This stream's connection.

        completion_future (asyncio.Future): Future that will contain
            the response status code (int) when the request/response exchange
            completes, or an exception describing why the exchange failed.
            NOTE(review): this attribute is not defined in this module - confirm
            the base class provides it. `wait_for_completion()` is the coroutine
            this module offers for the same purpose.

    Notes:
        Every async method on a stream (`await stream.get_next_response_chunk()`,
        etc.) must be awaited on the thread that owns the event loop used to
        create the stream.
    """

    def __init__(self, connection: AIOHttpClientConnection, request: HttpRequest,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Set up an HTTP client stream.

        Args:
            connection (AIOHttpClientConnection): Connection the request is sent on.
            request (HttpRequest): The HTTP request to send.
            loop (Optional[asyncio.AbstractEventLoop]): Event loop for async operations.
                The current event loop is used when None.
        """
        # This stream type never takes a request body generator, so that slot
        # of the parent initializer is filled with None.
        super().__init__(connection, request, None, loop)


class AIOHttp2ClientStream(AIOHttpClientStreamUnified):
    """Asyncio HTTP/2 stream that sends a request and receives a response.

    Instances are created by `AIOHttp2ClientConnection.request()`.
    """

    def __init__(self,
                 connection: AIOHttpClientConnection,
                 request: HttpRequest,
                 request_body_generator: AsyncIterator[bytes] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Set up an HTTP/2 client stream.

        Args:
            connection: Connection the request is sent on.
            request (HttpRequest): The HTTP request to send.
            request_body_generator (AsyncIterator[bytes], optional): Async iterator
                yielding request body chunks.
            loop (Optional[asyncio.AbstractEventLoop]): Event loop for async operations.
        """
        super().__init__(connection, request, request_body_generator, loop)

    async def _write_data(self, body, end_stream):
        """Write one piece of body data to the stream and wait for the write to finish.

        Args:
            body: Data to write, wrapped with `InputStream.wrap()`; may be None.
            end_stream: Flag passed to the native write call alongside the data.

        Raises:
            Exception: Built from the error code when the write completes with a
                non-zero code.
        """
        write_done = Future()
        wrapped_body = InputStream.wrap(body, allow_none=True)

        def on_write_complete(error_code: int) -> None:
            # A cancelled future must not be resolved, so there is nothing to report.
            if write_done.cancelled():
                return
            if not error_code:
                write_done.set_result(None)
            else:
                write_done.set_exception(awscrt.exceptions.from_code(error_code))

        _awscrt.http2_client_stream_write_data(self, wrapped_body, end_stream, on_write_complete)
        await asyncio.wrap_future(write_done, loop=self._loop)

    async def _set_request_body_generator(self, body_iterator: AsyncIterator[bytes]):
        """Send every chunk from `body_iterator`, then send the final end-of-stream write.

        The closing write runs in a `finally` clause, so it is attempted even
        when iterating or writing raises.
        """
        try:
            async for body_chunk in body_iterator:
                await self._write_data(BytesIO(body_chunk), False)
        finally:
            await self._write_data(None, True)
