o
    wl~i.                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZ dd
lmZ ddlmZ dZG dd de	Zdd Zdd ZdS )zAmazon SQS Connection.    )annotationsN)
Serializer)	transform)AsyncAWSQueryConnection)
AWSRequest   )boto3)AsyncMessage)
AsyncQueue)AsyncSQSConnectionc                      s  e Zd ZdZ				d6 fdd	Zdd Zdd	 Zd7d
dZ	d7ddZd8ddZ	dd Z
d9ddZd:ddZ			d;ddZd:ddZd:ddZ	d:dd Z	d7d!d"Zd:d#d$Z	d:d%d&Zd:d'd(Zd<d*d+Zd:d,d-ZeZd.d/ Zd:d0d1Z	d:d2d3Zd:d4d5Z  ZS )=r   zAsync SQS Connection.r   Nc                   s^   t d u rtdt j|f||d| |r|ndg| _t|tr(|g| _d S |p+g | _d S )Nzboto3 is not installed)region_namedebugApproximateReceiveCount)r   ImportErrorsuper__init__message_system_attribute_names
isinstancestrmessage_attribute_names)selfsqs_connectionr   regionr   r   kwargs	__class__ l/var/www/newdalilibackend/backend/venv/lib/python3.10/site-packages/kombu/asynchronous/aws/sqs/connection.pyr      s   	zAsyncSQSConnection.__init__c                 C  s^   |  }|r
||d< d|i}i }| dkrd|i}| dkr$d|d< td	|||d|S )
NActiondatagetparamspostz0application/x-www-form-urlencoded; charset=utf-8Content-Type)methodurlheadersr   )copylowerr   )r   	operationr!   	queue_urlr$   param_payloadr&   r   r   r   _create_query_request/   s   z(AsyncSQSConnection._create_query_requestc                 C  s   |  }||d< | jjj}||}| jjj}i }|jd }d| }	|	|d< d|jd |j	}
|
|d< t
| |d}|jd	tj}td||d
|S )NQueueUrljsonVersionzapplication/x-amz-json-r#   z{}.{}targetPrefixzX-Amz-Target)r   r&   r$   )r$   r%   r   )r'   r   metaservice_modeloperation_model	_endpointhostmetadataformatnamejsondumpsencodehttpr    r   DEFAULT_METHODr   )r   r)   r!   r*   r1   r2   r%   r&   json_versioncontent_typetargetr+   r$   r   r   r   _create_json_request@   s0   




z'AsyncSQSConnection._create_json_requestc                 C  s   | j j}| j jj}|j}	i |pi ||	i }
|	dkr%| ||
||}n|	dkr1| ||
|}ntd|	 d|j	
 dkrBdnd}|j|||d | }| j||d	S )
a  Override make_request to support different protocols.

        botocore has changed the default protocol of communicating
        with SQS backend from 'query' to 'json', so we need a special
        implementation of make_request for SQS. More information on this can
        be found in: https://github.com/celery/kombu/pull/1807.

        protocol_params: Optional[dict[str, dict]] of per-protocol additional parameters.
            Supported for the SQS query to json protocol transition.
        queryr8   zUnsupported protocol: .r    zpresign-urlstandard)signing_typecallback)r   _request_signerr0   r1   protocolr    r,   r@   	Exceptionr$   r(   signprepare_mexe)r   operation_namer!   r*   verbrF   protocol_paramssignerr1   rH   
all_paramsrequestrD   prepared_requestr   r   r   make_requestb   s&   
zAsyncSQSConnection.make_requestc                 C  s*   d|i}|rt |d|d< | jd||dS )N	QueueNamedDefaultVisibilityTimeoutCreateQueuerE   )r6   
get_object)r   
queue_namevisibility_timeoutrF   r!   r   r   r   create_queue   s   zAsyncSQSConnection.create_queueFc                 C  s   | j dd |j|dS )NDeleteQueuerE   
get_statusid)r   queueforce_deletionrF   r   r   r   delete_queue   s   zAsyncSQSConnection.delete_queuec                 C  s   | j j|d}|d S )N)rU   r-   )r   get_queue_url)r   ra   resr   r   r   rd      s   z AsyncSQSConnection.get_queue_urlAllc                 C     | j dd|i|j|dS )NGetQueueAttributesAttributeNamerE   )rY   r`   )r   ra   	attributerF   r   r   r   get_queue_attributes   s   z'AsyncSQSConnection.get_queue_attributesc              	   C  s*   | j di |j|d||ii||dddS )NSetQueueAttribute
Attributes)zAttribute.NamezAttribute.Valuer8   rA   )rF   rO   r^   )r   ra   rj   valuerF   r   r   r   set_queue_attribute   s   
z&AsyncSQSConnection.set_queue_attributer   c              	   C  s   d|i}i i d}	|d ur|n| j }
| jr| jnd }|r ||d< |
r:|	d dt|
i |	d tdt|
i |rT|	d dt|i |	d tdt|i |d ur\||d	< | jd
|dtfg||||	dS )NMaxNumberOfMessages)rA   r8   VisibilityTimeoutr8   MessageSystemAttributeNamesrA   MessageSystemAttributeNameMessageAttributeNamesWaitTimeSecondsReceiveMessageMessage)rF   parentrO   )r   r   updatelist_query_object_encodeget_listr	   )r   ra   r*   number_messagesr[   
attributeswait_time_secondsrF   r!   proto_paramsattrsmsg_attr_namesr   r   r   receive_message   s&   
z"AsyncSQSConnection.receive_messagec                 C  s   |  |||S N)delete_message_from_handler   ra   receipt_handlerF   r   r   r   delete_message   s   z!AsyncSQSConnection.delete_messagec                 C  sB   ddd |D it ddd |D id}| jdi |jd||d	S )
NEntriesc                 S     g | ]	}|j |jd qS )IdReceiptHandler`   r   .0mr   r   r   
<listcomp>   s    z;AsyncSQSConnection.delete_message_batch.<locals>.<listcomp>DeleteMessageBatchRequestEntryc                 S  r   r   r   r   r   r   r   r      s    rn   DeleteMessageBatchPOSTrN   rF   rO   r|   rY   r`   )r   ra   messagesrF   p_paramsr   r   r   delete_message_batch   s   z'AsyncSQSConnection.delete_message_batchc                 C  s   | j dd|i||dS )NDeleteMessager   rE   )r_   r   r   r   r   r      s   z-AsyncSQSConnection.delete_message_from_handlec                 C  s.   d|i}|rt ||d< | jd||jd|dS )NMessageBodyDelaySecondsSendMessager   rN   rF   )intrY   r`   )r   ra   message_contentdelay_secondsrF   r!   r   r   r   send_message   s   zAsyncSQSConnection.send_messagec              
   C  sn   i }t |D ]%\}}d|d  }|| d|d | d|d | d|d i q| jd||jd	|d
S )NzSendMessageBatchRequestEntry.r   z.Idr   z.MessageBodyz.DelaySeconds   SendMessageBatchr   r   )	enumeraterz   rY   r`   )r   ra   r   rF   r!   imsgprefixr   r   r   send_message_batch   s   z%AsyncSQSConnection.send_message_batchc                 C  s   | j d||d|j|dS )NChangeMessageVisibility)r   rr   rE   r^   )r   ra   r   r[   rF   r   r   r   change_message_visibility   s   z,AsyncSQSConnection.change_message_visibilityc                 C  s<   dd |D }d|it d|id}| jdi |jd||dS )	Nc                 S  s(   g | ]}|d  j |d  j|d dqS )r   r   )r   r   rr   r   )r   tr   r   r   r     s    zFAsyncSQSConnection.change_message_visibility_batch.<locals>.<listcomp>r   (ChangeMessageVisibilityBatchRequestEntryrn   ChangeMessageVisibilityBatchr   r   r   )r   ra   r   rF   entriesr   r   r   r   change_message_visibility_batch  s   
z2AsyncSQSConnection.change_message_visibility_batch c                 C  s(   i }|r||d< | j d|dtfg|dS )NQueueNamePrefix
ListQueuesr-   rE   )r}   r
   )r   r   rF   r!   r   r   r   get_all_queues  s   z!AsyncSQSConnection.get_all_queuesc                 C  s   |  |t| j||S r   )r   r   _on_queue_ready)r   rZ   rF   r   r   r   	get_queue  s   zAsyncSQSConnection.get_queuec                   s   t  fdd|D d S )Nc                 3  s     | ]}|j  r|V  qd S r   )r%   endswith)r   qr7   r   r   	<genexpr>%  s    z5AsyncSQSConnection._on_queue_ready.<locals>.<genexpr>)next)r   r7   queuesr   r   r   r   #  s   z"AsyncSQSConnection._on_queue_readyc                 C  s   | j dd|jidtfg|dS )NListDeadLetterSourceQueuesr-   rE   )r}   r%   r
   )r   ra   rF   r   r   r   get_dead_letter_source_queues(  s
   
z0AsyncSQSConnection.get_dead_letter_source_queuesc                 C  s   | j d|||d|j|dS )NAddPermission)LabelAWSAccountId
ActionNamerE   r^   )r   ra   labelaws_account_idaction_namerF   r   r   r   add_permission/  s   z!AsyncSQSConnection.add_permissionc                 C  rg   )NRemovePermissionr   rE   r^   )r   ra   r   rF   r   r   r   remove_permission9  s   z$AsyncSQSConnection.remove_permission)r   NNN)NN)FN)rf   Nr   )r   NNNN)r   N)__name__
__module____qualname____doc__r   r,   r@   rT   r\   rc   rd   rk   rp   r   r   r   r   r   r   r   r   r   r   lookupr   r   r   r   __classcell__r   r   r   r   r      sL    
"#













	

	


r   c                 C  s"   i }t |d|  dd | D S )Nr   c                 S  s   i | ]\}}||qS r   r   )r   kvr   r   r   
<dictcomp>B  s    z(_query_object_encode.<locals>.<dictcomp>)_query_object_encode_partitems)r   r!   r   r   r   r|   ?  s   r|   c                 C  s   |r| dn|}t |ttfr't|D ]\}}t| | |d  | qd S t |trA| D ]\}}t| | | | q0d S t|| |< d S )NrB   r   )r   r{   tupler   r   dictr   r   )r!   r   partdottedr   itemkeyro   r   r   r   r   E  s   
r   )r   
__future__r   r8   botocore.serializer   viner   !kombu.asynchronous.aws.connectionr   kombu.asynchronous.aws.extr   extr   messager	   ra   r
   __all__r   r|   r   r   r   r   r   <module>   s       -