
    5iU.                        d Z ddlmZ ddlmZ ddlmZ ddlmZm	Z	m
Z
 ddlmZmZmZmZ ddlmZ ddlZ G d	 d
e      Ze G d d             Ze G d d             Zeegdf   Z	 eegdf   Z	 e G d d             Ze G d d             Ze G d d             Ze G d d             Ze G d d             Z G d de      Z G d de      Zy)z
MQTT Request Response module
    )Sequence)IntEnum)	dataclass)CallableUnionOptional)NativeResourcemqtt5mqtt
exceptions)FutureNc                        e Zd ZdZdZ	 dZ	 dZy)SubscriptionStatusEventTypezO
    The type of change to the state of a streaming operation subscription
    r         N)__name__
__module____qualname____doc__SUBSCRIPTION_ESTABLISHEDSUBSCRIPTION_LOSTSUBSCRIPTION_HALTED     ~/home/marpiech/ifpan-abm-pgxpred/analysis/marpiech-gwas-test/venv/lib/python3.12/site-packages/awscrt/mqtt_request_response.pyr   r      s.      !  r   r   c                   .    e Zd ZU dZdZeed<   dZded<   y)SubscriptionStatusEventaR  
    An event that describes a change in subscription status for a streaming operation.

    Args:
        type (SubscriptionStatusEventType):  The type of status change represented by the event
        error (Optional[Exception]):  Describes an underlying reason for the event.  Only set for SubscriptionLost and SubscriptionHalted.
    NtypezOptional[Exception]error)r   r   r   r   r   r   __annotations__r   r   r   r   r   r   '   s     )-D
%,#'E 'r   r   c                   *    e Zd ZU dZeed<   dZded<   y)IncomingPublishEventz
    An event that describes an incoming message on a streaming operation.

    Args:
        topic (str):  MQTT Topic that the response was received on.
        payload (Optional[bytes]):  The payload of the incoming message.
    topicNOptional[bytes]payloadr   r   r   r   strr    r%   r   r   r   r"   r"   4        J!%G%r   r"   c                   >    e Zd ZU dZeed<   dZded<   dZded<   d Zy)	StreamingOperationOptionsa  
    Configuration options for an MQTT-based streaming operation.

    Args:
        subscription_topic_filter (str):  Topic filter that the streaming operation should listen on
        subscription_status_listener (SubscriptionStatusListener): function object to invoke when the operation's subscription status changes
        incoming_publish_listener (IncomingPublishListener): function object to invoke when a publish packet arrives that matches the subscription topic filter
    subscription_topic_filterNz$Optional[SubscriptionStatusListener]subscription_status_listenerz!Optional[IncomingPublishListener]incoming_publish_listenerc                     t        | j                  t              sJ t        | j                        s| j                  J t        | j
                        s| j
                  J yyzE
        Stringently type-checks an instance's field values.
        N)
isinstancer+   r'   callabler,   r-   selfs    r   validatez"StreamingOperationOptions.validateZ   s^     $88#>>>99:d>_>_>ggg6674;Y;Y;aaa;a7r   )	r   r   r   r   r'   r    r,   r-   r4   r   r   r   r*   r*   L   s-      #"KO "HOEIBIbr   r*   c                   *    e Zd ZU dZeed<   dZded<   y)Responsez
    Encapsulates a response to an AWS IoT Core MQTT-based service request

    Args:
        topic (str):  MQTT Topic that the response was received on.
        payload (Optional[bytes]):  The payload of the response.
    r#   Nr$   r%   r&   r   r   r   r6   r6   c   r(   r   r6   c                   0    e Zd ZU dZeed<   dZded<   d Zy)ResponsePatha   
    A response path is a pair of values - MQTT topic and a JSON path - that describe how a response to
    an MQTT-based request may arrive.  For a given request type, there may be multiple response paths and each
    one is associated with a separate JSON schema for the response body.

    Args:
        topic (str):  MQTT topic that a response may arrive on.
        correlation_token_json_path (Optional[str]):  JSON path for finding correlation tokens within payloads that arrive on this path's topic.
    r#   NOptional[str]correlation_token_json_pathc                     t        | j                  t              sJ t        | j                  t              s| j                  J yyr/   )r0   r#   r'   r:   r2   s    r   r4   zResponsePath.validate~   s?     $**c***$::C@DDdDdDlllDl@r   )r   r   r   r   r'   r    r:   r4   r   r   r   r8   r8   p   s     J377mr   r8   c                   N    e Zd ZU dZded<   ded<   eed<   eed<   dZd	ed
<   d Zy)RequestOptionsa  
    Configuration options for an MQTT-based request-response operation.

    Args:
        subscription_topic_filters (Sequence[str]):  Set of topic filters that should be subscribed to in order to cover all possible response paths.  Sometimes using wildcards can cut down on the subscriptions needed; other times that isn't valid.
        response_paths (Sequence[ResponsePath]):  Set of all possible response paths associated with this request type.
        publish_topic (str): Topic to publish the request to once response subscriptions have been established.
        payload (bytes): Payload to publish to 'publishTopic' in order to initiate the request
        correlation_token (Optional[str]): Correlation token embedded in the request that must be found in a response message.  This can be null to support certain services which don't use correlation tokens.  In that case, the client only allows one token-less request at a time.
    zSequence[str]subscription_topic_filterszSequence[ResponsePath]response_pathspublish_topicr%   Nr9   correlation_tokenc                    t        | j                  t              sJ | j                  D ]  }t        |t              rJ  t        | j                  t              sJ | j                  D ]  }|j                           t        | j                  t              sJ t        | j                  t              sJ t        | j                  t              s| j                  J yyr/   )
r0   r>   r   r'   r?   r4   r@   r%   bytesrA   )r3   topic_filterresponse_paths      r   r4   zRequestOptions.validate   s     $998DDD ;; 	1LlC000	1 $--x888!00 	%M""$	% $,,c222$,,...$00#6$:P:P:XXX:X6r   )	r   r   r   r   r    r'   rC   rA   r4   r   r   r   r=   r=      s1    	 !0/,,N)--Yr   r=   c                   :    e Zd ZU dZeed<   eed<   dZded<   d Zy)	ClientOptionsa
  
    MQTT-based request-response client configuration options

    Args:
        max_request_response_subscriptions (int):  Maximum number of subscriptions that the client will concurrently use for request-response operations
        max_streaming_subscriptions (int):  Maximum number of subscriptions that the client will concurrently use for streaming operations
        operation_timeout_in_seconds (Optional[int]):  Duration, in seconds, that a request-response operation will wait for completion before giving up
    "max_request_response_subscriptionsmax_streaming_subscriptions<   zOptional[int]operation_timeout_in_secondsc                     t        | j                  t              sJ t        | j                  t              sJ t        | j                  t              sJ yr/   )r0   rH   intrI   rK   r2   s    r   r4   zClientOptions.validate   sF     $AA3GGG$::C@@@$;;SAAAr   N)r   r   r   r   rM   r    rK   r4   r   r   r   rG   rG      s&     ),+!$$46 /6Br   rG   c                   v     e Zd ZdZdeej                  ej                  f   de	f fdZ
defdZdefdZ xZS )ClientaL  
    MQTT-based request-response client tuned for AWS MQTT services.

    Supports streaming operations (listen to a stream of modeled events from an MQTT topic) and request-response
    operations (performs the subscribes, publish, and incoming publish correlation and error checking needed to
    perform simple request-response operations over MQTT).

    Args:
        protocol_client (Union[mqtt5.Client, mqtt.Connection]): MQTT client to use as transport
        client_options (ClientOptions): The ClientOptions dataclass to used to configure the new request response Client.

    protocol_clientclient_optionsc                 t   t        |t        j                        st        |t        j                        sJ t        |t
              sJ |j                          t        | !          t        |t        j                        rt        j                  ||      | _        y t        j                  ||      | _        y N)r0   r
   rO   r   
ConnectionrG   r4   super__init___awscrt'mqtt_request_response_client_new_from_5_binding)mqtt_request_response_client_new_from_311)r3   rP   rQ   	__class__s      r   rV   zClient.__init__   s     /5<<8JX\XgXg<hhh.-888!ou||4#KKO]klDM#MMo_mnDMr   optionsc           	          |j                          t               fd}t        j                  | j                  |j
                  |j                  |j                  |j                  |j                  |       S )a  
        Initiate an MQTT-based request-response async workflow

        Args:
            options (RequestOptions): Configuration options for the request to perform

        Returns:
            concurrent.futures.Future: A Future whose result will contain the topic and payload of a response
            to the request. The future will contain an exception if the request fails.
        c                     | dk7  r%j                  t        j                  |              y t        ||      }j	                  |       y )Nr   )r#   r%   )set_exceptionr   	from_coder6   
set_result)
error_coder#   r%   responsefutures       r   on_request_completez0Client.make_request.<locals>.on_request_complete   s=    Q$$Z%9%9*%EF#%A!!(+r   )
r4   r   rW   )mqtt_request_response_client_make_requestrY   r>   r?   r@   r%   rA   )r3   r\   re   rd   s      @r   make_requestzClient.make_request   si     		, 	99$--:A:\:\:A:P:P:A:O:O:A//:A:S:S:M	O r   c                     j                          fd}fd}t        j                  | j                  j                  ||      }t        |      S )a  
        Creates a new streaming operation

        Args:
            options (StreamingOperationOptions): Configuration options for the streaming operation

        Returns:
            StreamingOperation: a new streaming operation.  Opening the operation triggers the client to maintain
            an MQTT subscription for relevant events.  Matching publishes and subscription status changes are
            communicated by invoking configuration-controlled callbacks.
        c                     j                   <t        |       }|dk7  rt        j                  |      |_        j                  |       y y )Nr   )r,   r   r   r`   r   )
event_typerb   eventr\   s      r   on_subscription_status_eventz:Client.create_stream.<locals>.on_subscription_status_event
  sF    33?/
;?","6"6z"BEK44U;	 @r   c                 Z    j                   t        | |      }j                  |       y y rS   )r-   r"   )r#   r%   rk   r\   s      r   on_incoming_publish_eventz7Client.create_stream.<locals>.on_incoming_publish_event  s.    00<,UG<11%8 =r   )r4   rW   *mqtt_request_response_client_create_streamrY   r+   StreamingOperation)r3   r\   rl   rn   stream_bindings    `   r   create_streamzClient.create_stream   sP     		<	9
 !KKMM7<<>Z\uw ".11r   )r   r   r   r   r   r
   rO   r   rT   rG   rV   r=   rg   r*   rr   __classcell__r[   s   @r   rO   rO      sK    oellDOO.K(L o!.oN @2%> 2r   rO   c                   (     e Zd ZdZ fdZd Z xZS )rp   zT
    An operation that represents a stream of events broadcast to an MQTT topic
    c                 0    t         |           || _        y rS   )rU   rV   rY   )r3   bindingr[   s     r   rV   zStreamingOperation.__init__!  s    r   c                 B    t        j                  | j                         y)z
        Triggers the streaming operation to maintain an MQTT subscription for relevant events.  Until a stream is
        opened, no events can be received.
        N)rW   mqtt_streaming_operation_openrY   r2   s    r   openzStreamingOperation.open&  s    
 	--dmm<r   )r   r   r   r   rV   rz   rs   rt   s   @r   rp   rp     s     
=r   rp   )r   collections.abcr   enumr   dataclassesr   typingr   r   r   awscrtr	   r
   r   r   concurrent.futuresr   rW   r   r   r"   SubscriptionStatusListenerIncomingPublishListenerr*   r6   r8   r=   rG   rO   rp   r   r   r   <module>r      s@   %  ! , , : : % ' , 	( 	( 	( 	& 	& 	& &'>&?&EF  #$8#94#?@ 
 b b b, 	& 	& 	& m m m* Y Y YD B B B,Y2^ Y2x= =r   