o
    wl~iA                     @  s  d Z ddlmZ ddlZddlmZmZmZ ddlmZ ddl	Z	ddl	m
Z
mZmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZ dZ dZ!G dd dZ"G dd dej#Z#G dd dej$Z$dS )a  MongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
    )annotationsN)datetime	timedeltatimezone)Empty)MongoClienterrors
uri_parser)
CursorType)VersionMismatch)_detect_environment)bytes_to_str)dumpsloads)cached_property)maybe_sanitize_url   )virtualto_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc                   @  sF   e Zd ZdZdd Zdd Zdd Zdd	d
Zdd Zdd Z	e	Z
dS )BroadcastCursorzCursor for broadcast queues.c                 C  s   || _ d| _| jdd d S )Nr   F)rewind)_cursor_offsetpurge)selfcursor r   ^/var/www/newdalilibackend/backend/venv/lib/python3.10/site-packages/kombu/transport/mongodb.py__init__D   s   zBroadcastCursor.__init__c                 C  s   | j ji | j S N)r   
collectioncount_documentsr   r   r   r   r   get_sizeI   s   zBroadcastCursor.get_sizec                 C  s   | j   d S r    )r   closer#   r   r   r   r%   L   s   zBroadcastCursor.closeTc                 C  s2   |r| j   | j ji | _| j | j| _ d S r    )r   r   r!   r"   r   skip)r   r   r   r   r   r   O   s   
zBroadcastCursor.purgec                 C  s   | S r    r   r#   r   r   r   __iter__W   s   zBroadcastCursor.__iter__c              
   C  sd   	 zt | j}W n tjjy' } zdt|v r"|   W Y d }~q  d }~ww 	 |  jd7  _|S )NTznot valid at serverr   )nextr   pymongor   OperationFailurestrr   r   )r   msgexcr   r   r   __next__Z   s   
zBroadcastCursor.__next__N)T)__name__
__module____qualname____doc__r   r$   r%   r   r'   r.   r(   r   r   r   r   r   A   s    
r   c                      s\  e Zd ZdZdZi ZdZdZdZdZ	dZ
dZdZdZd	Zd
ZdZdZejjd Z fddZdd Zdd Z fddZdd Zdd Zdd Zdd Zdd Z fd d!ZdGd#d$Zd%d& Z d'd( Z!dGd)d*Z"d+d, Z#d-d. Z$d/d0 Z%e&d1d2 Z'e&d3d4 Z(e&d5d6 Z)e&d7d8 Z*e&d9d: Z+d;d< Z,d=d> Z-d?d@ Z.dAdB Z/dCdD Z0dEdF Z1  Z2S )HChannelzMongoDB Channel.TFNi z	127.0.0.1ii  kombu_defaultmessageszmessages.routingzmessages.broadcastzmessages.queues)connect_timeoutsslttlcapped_queue_sizedefault_hostnamedefault_portdefault_databasemessages_collectionrouting_collectionbroadcast_collectionqueues_collectioncalc_queue_sizec                   s"   t  j|i | i | _| j d S r    )superr   _broadcast_cursorsclient)r   vargskwargs	__class__r   r   r      s   
zChannel.__init__c              	   K  s8   | j r| jjd|id||| |ddidd d S d S )N_id$set	x-expires)rI   options	expire_atTupsert)r8   queues
update_one_get_queue_expire)r   queuerF   r   r   r   
_new_queue   s   	
zChannel._new_queuec                 C  s~   || j v rz	t| |}W n ty   d }Y nw | jjd|idtjfgd}| jr0| 	| |d u r7t
 tt|d S )NrS   priority)sortpayload)_fanout_queuesr(   _get_broadcast_cursorStopIterationr5   find_one_and_deleter)   	ASCENDINGr8   _update_queues_expirer   r   r   )r   rS   r,   r   r   r   _get   s   


zChannel._getc                   s:   | j s	t |S || jv r| | S | jd|iS NrS   )rA   rB   _sizerX   rY   r$   r5   r"   r   rS   rG   r   r   r`      s
   
zChannel._sizec                 K  sr   t ||| j|ddd}| jr1| |d|d< | |}|d ur1|d d u s-||d k r1||d< | j| d S )NT)reverse)rW   rS   rU   zx-message-ttlrM   )r   _get_message_priorityr8   rR   _get_message_expirer5   
insert_one)r   rS   messagerF   data
msg_expirer   r   r   _put   s   
zChannel._putc                 K  s   | j t||d d S )N)rW   rS   )	broadcastre   r   )r   exchangerf   routing_keyrF   r   r   r   _put_fanout   s   zChannel._put_fanoutc                 C  s:   |  |}|| jv r| |  |S | jd|i |S r_   )r`   rX   rY   r   r5   delete_many)r   rS   sizer   r   r   _purge   s   

zChannel._purgec                 C  s:   t | jj| d }| jd|i}|t dd |D B S )Ntablerk   c                 s  s&    | ]}|d  |d |d fV  qdS )rl   patternrS   Nr   ).0rr   r   r   	<genexpr>   s
    
z$Channel.get_table.<locals>.<genexpr>)	frozensetstate	exchangesroutingfind)r   rk   localRoutesbrokerRoutesr   r   r   	get_table   s   

zChannel.get_tablec                 C  sp   |  |jdkr| |||| || j|< ||||d}| }| jr+| |d|d< | jj|d|idd d S )Nfanout)rk   rS   rl   rr   rK   rM   rJ   TrN   )	typeoftype_create_broadcast_cursorrX   copyr8   rR   ry   rQ   )r   rk   rl   rr   rS   lookuprg   r   r   r   _queue_bind   s   
zChannel._queue_bindc                   s   | j d|i | jr| jd|i t j|fi | || jv rAz| j	|}W n
 t
y4   Y d S w |  | j	| d S d S )NrS   rI   )ry   rn   r8   rP   
delete_onerB   queue_deleterX   rC   popKeyErrorr%   )r   rS   rF   r   rG   r   r   r     s   
zChannel.queue_delete
mongodb://c                 C  s  | j j}|j}|drd}d| }||s|| }|t|d  s(|| j7 }|jrMd|vrM|d\}}|j}|jrC|d|j 7 }|d | d | }|j	rS|j	n| j
}tj||dd}|d	 pd|j}	|	d
v rl| j}	d| j| jryt| jd nd d}
|
|d  i }|
 D ]K\}}t|trt|dkr|d n|}|||< | }||vs|| |kr|||< q|| |krqtd| d| d||d|d| d q|}
| |
}
d|
v r|
d ||	|
fS )Nzsrv://zmongodb+srv://zmongodb+@z://:F)validatedatabase)/NTi  )auto_start_requestr7   connectTimeoutMSrL   r   r   z,MongoDB transport: Option conflict for key 'z' and 'z' with different values: z vs z. Using value for 'z'.tlsr7   )
connectionrD   hostname
startswithlenr:   useridsplitpasswordportr;   r	   	parse_urivirtual_hostr<   r7   r6   intupdateitems
isinstancelistlowerwarningswarnget_prepare_client_optionsr   )r   schemerD   r   headtailcredentialsr   parseddbnamerL   
normalizedkvvallkr   r   r   
_parse_uri  sd   


"



zChannel._parse_uric                 C  sB   t jdkr|dd  t|dtrt jj}||d  |d< |S )N   r   readpreference)r)   version_tupler   r   r   r   read_preferences_MONGOS_MODES)r   rL   modesr   r   r   r   _  s   
zChannel._prepare_client_optionsc                 K  s   t |fi |S r    r   )r   	argumentsrF   r   r   r   prepare_queue_argumentsg  s   zChannel.prepare_queue_argumentsc                 C  s   | j |d\}}}||d< t }|dkrddlm} |  n|dkr,ddlm} |  tdi |}|| }	| d }
|
	d	d }
t
tt|
	d
}|dk rYtt|
| jrg|dk rgtt|
|	S )N)r   hostgeventr   )monkeyeventlet)monkey_patchversion-.)r   r   )   r   r   )r   r   r   r   	patch_allr   r   r   server_infor   tuplemapr   r   E_SERVER_VERSIONformatr8   E_NO_TTL_INDEXES)r   r   r   r   confenvr   r   	mongoconnr   version_strr   r   r   r   _openj  s&   
zChannel._openc                 C  s*   | j | v r	dS |j| j | jdd dS )z0Create capped collection for broadcast messages.NT)ro   capped)r?   list_collection_namescreate_collectionr9   r   r   r   r   r   _create_broadcast  s   
zChannel._create_broadcastc                 C  s   || j  }|jg ddd || j dg || j }|ddg | jrC|jdgdd |jdgdd || j jdgdd d	S d	S )
zEnsure indexes on collections.)rS   r   )rU   r   )rI   r   T)
backgroundr   )rk   r   )rM   r   r   )expireAfterSecondsN)r=   create_indexr?   r>   r8   r@   )r   r   r5   ry   r   r   r   _ensure_indexes  s   



zChannel._ensure_indexesc                 C  s    |   }| | | | |S )zActually creates connection.)r   r   r   r   r   r   r   _create_client  s   

zChannel._create_clientc                 C  s   |   S r    )r   r#   r   r   r   rD     s   zChannel.clientc                 C     | j | j S r    )rD   r=   r#   r   r   r   r5        zChannel.messagesc                 C  r   r    )rD   r>   r#   r   r   r   ry     r   zChannel.routingc                 C  r   r    )rD   r?   r#   r   r   r   rj     r   zChannel.broadcastc                 C  r   r    )rD   r@   r#   r   r   r   rP     r   zChannel.queuesc              	   C  s6   z| j | W S  ty   | | j| d d | Y S w r    )rC   r   r   rX   ra   r   r   r   rY     s   zChannel._get_broadcast_cursorc                 C  sR   t jdkrd|itjd}nd|idd}| jjdi |}t| }| j|< |S )Nr   rS   )filtercursor_typeT)querytailabler   )r)   r   r
   TAILABLErj   rz   r   rC   )r   rk   rl   rr   rS   r   r   retr   r   r   r     s   
z Channel._create_broadcast_cursorc                 C  s4   | di  d}|d ur|  tt|d S d S )N
properties
expirationmilliseconds)r   get_nowr   r   )r   rf   valuer   r   r   rd     s   zChannel._get_message_expirec              	   C  sl   t |tr| jd|i}|sdS |d }n|}z|d | }W n ttfy,   Y dS w |  t|d S )zGet expiration header named `argument` of queue definition.

        Note:
        ----
            `queue` must be either queue name or options itself.
        rI   NrL   r   r   )r   r+   rP   find_oner   	TypeErrorr   r   )r   rS   argumentdocrg   r   r   r   r   rR     s   

zChannel._get_queue_expirec                 C  sL   |  |d}|s
dS | jd|idd|ii | jd|idd|ii dS )z,Update expiration field on queues documents.rK   NrS   rJ   rM   rI   )rR   ry   update_manyrP   )r   rS   rM   r   r   r   r]     s   zChannel._update_queues_expirec                 C  s   t tjS )zReturn current time in UTC.)r   nowr   utcr#   r   r   r   r     r   zChannel.get_now)r   )3r/   r0   r1   r2   supports_fanoutrX   r7   r8   r6   r9   rA   r:   r;   r<   r=   r>   r?   r@   r   r3   from_transport_optionsr   rT   r^   r`   ri   rm   rp   r}   r   r   r   r   r   r   r   r   r   r   rD   r5   ry   rj   rP   rY   r   rd   rR   r]   r   __classcell__r   r   rG   r   r3   p   sd    


E
	




r3   c                   @  sz   e Zd ZdZeZdZdZejZej	j
ejf Z
ej	jejejf ZdZdZej	jjeg ddZdd	 ZddddZdS )	TransportzMongoDB Transport.Tr   mongodbr)   )directtopicr~   )exchange_typec                 C  s   t jS r    )r)   r   r#   r   r   r   driver_version  s   zTransport.driver_versionF**urir+   returnc                 C  sB   |sdS |r|S d|vrt |S |dd\}}dt ||gS )Nr   ,r   )r   r   join)r   r   include_passwordmaskuri1	remainderr   r   r   as_uri   s   zTransport.as_uriN)Fr   )r   r+   r   r+   )r/   r0   r1   r2   r3   can_parse_urlpolling_intervalr;   r   r   connection_errorsr   ConnectionFailurechannel_errorsr*   driver_typedriver_name
implementsextendrv   r   r  r   r   r   r   r     s(    
r   )%r2   
__future__r   r   r   r   r   rS   r   r)   r   r   r	   pymongo.cursorr
   kombu.exceptionsr   kombu.utils.compatr   kombu.utils.encodingr   kombu.utils.jsonr   r   kombu.utils.objectsr   kombu.utils.urlr    r   baser   r   r   r   r3   r   r   r   r   r   <module>   s0    /   