o
    f                     @  s   d Z ddlmZ ddlZddlmZmZ ddlmZ ddl	m
Z
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZmZ dZde e!eZ"e# Z$G dd dej%Z%G dd dej&Z&dS )aG  SQLAlchemy Transport module for kombu.

Kombu transport using SQL Database as the message store.

Features
========
* Type: Virtual
* Supports Direct: yes
* Supports Topic: yes
* Supports Fanout: no
* Supports Priority: no
* Supports TTL: no

Connection String
=================

.. code-block::

    sqla+SQL_ALCHEMY_CONNECTION_STRING
    sqlalchemy+SQL_ALCHEMY_CONNECTION_STRING

For details about ``SQL_ALCHEMY_CONNECTION_STRING`` see SQLAlchemy Engine Configuration documentation.

Examples
--------
.. code-block::

    # PostgreSQL with default driver
    sqla+postgresql://scott:tiger@localhost/mydatabase

    # PostgreSQL with psycopg2 driver
    sqla+postgresql+psycopg2://scott:tiger@localhost/mydatabase

    # PostgreSQL with pg8000 driver
    sqla+postgresql+pg8000://scott:tiger@localhost/mydatabase

    # MySQL with default driver
    sqla+mysql://scott:tiger@localhost/foo

    # MySQL with mysqlclient driver (a maintained fork of MySQL-Python)
    sqla+mysql+mysqldb://scott:tiger@localhost/foo

    # MySQL with PyMySQL driver
    sqla+mysql+pymysql://scott:tiger@localhost/foo

Transport Options
=================

* ``queue_tablename``: Name of table storing queues.
* ``message_tablename``: Name of table storing messages.

Moreover parameters of :func:`sqlalchemy.create_engine()` function can be passed as transport options.
    )annotationsN)dumpsloads)Empty)create_enginetext)OperationalError)sessionmaker)virtual)cached_property)bytes_to_str   )Message)	ModelBase)Queue)class_registrymetadata)r      r   .c                      s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	e
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zedd Zedd  Z  ZS )!ChannelzThe channel class.Nc                   s&   |  |jj t j|fi | d S N)_configure_entity_tablenamesclienttransport_optionssuper__init__)self
connectionkwargs	__class__ V/var/www/html/venv/lib/python3.10/site-packages/kombu/transport/sqlalchemy/__init__.pyr   \   s   zChannel.__init__c                 C  s2   | dd| _| dd| _| jo| j d S  d S )Nqueue_tablenamekombu_queuemessage_tablenamekombu_message)getr#   r%   	queue_clsmessage_cls)r   optsr!   r!   r"   r   `   s   z$Channel._configure_entity_tablenamesc                 C  s<   | j j}|j }|dd  |dd  t|jfi |S )Nr#   r%   )r   r   r   copypopr   hostname)r   conninfor   r!   r!   r"   _engine_from_configk   s
   
zChannel._engine_from_configc                 C  s   | j j}|j| jvrEt1 |j| jv r | j|j W  d    S |  }t|d}t| ||f| j|j< W d    n1 s@w   Y  | j|j S )N)bind)	r   r   r-   _engines_MUTEXr/   r	   r   
create_all)r   r.   engineSessionr!   r!   r"   _openr   s   


zChannel._openc                 C  s$   | j d u r|  \}}| | _ | j S r   )_sessionr6   )r   _r5   r!   r!   r"   session   s   
zChannel.sessionc              	   C  s   | j | j| jj|k }|sitM | j | j| jj|k }|r0|W  d    S | |}| j | z| j   W n t	yP   | j 
  Y n	w W d    |S W d    |S 1 sdw   Y  |S r   )r9   queryr(   filternamefirstr2   addcommitr   rollbackr   queueobjr!   r!   r"   _get_or_create   s4   


zChannel._get_or_createc                 K  s   |  | d S r   )rD   )r   rB   r   r!   r!   r"   
_new_queue      zChannel._new_queuec                 K  sV   |  |}| t||}| j| z| j  W d S  ty*   | j  Y d S w r   )rD   r)   r   r9   r>   r?   r   r@   )r   rB   payloadr   rC   messager!   r!   r"   _put   s   
zChannel._putc                 C  s   |  |}| jjjdkr| jtd z<| j| j 	| jj
|jk	| jjdk| jj| jjd }|rNd|_tt|jW | j  S t | j  w )NsqlitezBEGIN IMMEDIATE TRANSACTIONFr   )rD   r9   r0   r<   executer   r:   r)   with_for_updater;   queue_ididvisibleorder_bysent_atlimitr=   r   r   rG   r?   r   )r   rB   rC   msgr!   r!   r"   _get   s&   


zChannel._getc                 C  s(   |  |}| j| j| jj|jkS r   )rD   r9   r:   r)   r;   rM   rN   rA   r!   r!   r"   
_query_all   s   
zChannel._query_allc                 C  sB   |  |jdd}z| j  W |S  ty    | j  Y |S w )NF)synchronize_session)rU   deleter9   r?   r   r@   )r   rB   countr!   r!   r"   _purge   s   zChannel._purgec                 C  s   |  | S r   )rU   rX   )r   rB   r!   r!   r"   _size   rF   zChannel._sizec                 C  sf   |t vr/t! |t v rt | W  d    S tt||tf|W  d    S 1 s*w   Y  t | S r   )r   r2   typestrr   )r   r<   basensr!   r!   r"   _declarative_cls   s    zChannel._declarative_clsc                 C     |  dtd| jiS )Nr   __tablename__)r_   	QueueBaser#   r   r!   r!   r"   r(      
   zChannel.queue_clsc                 C  r`   )Nr   ra   )r_   MessageBaser%   rc   r!   r!   r"   r)      rd   zChannel.message_cls)__name__
__module____qualname____doc__r7   r1   r   r   r/   r6   propertyr9   rD   rE   rI   rT   rU   rY   rZ   r_   r   r(   r)   __classcell__r!   r!   r   r"   r   V   s,    
	
r   c                   @  s2   e Zd ZdZeZdZdZdZdZe	fZ
dd ZdS )		TransportzThe transport class.Tr   sql
sqlalchemyc                 C  s   dd l }|jS )Nr   )rn   __version__)r   rn   r!   r!   r"   driver_version   s   zTransport.driver_versionN)rf   rg   rh   ri   r   can_parse_urldefault_portdriver_typedriver_namer   connection_errorsrp   r!   r!   r!   r"   rl      s    rl   )'ri   
__future__r   	threadingjsonr   r   rB   r   rn   r   r   sqlalchemy.excr   sqlalchemy.ormr	   kombu.transportr
   kombu.utilsr   kombu.utils.encodingr   modelsr   re   r   r   rb   r   r   VERSIONjoinmapr\   ro   RLockr2   r   rl   r!   r!   r!   r"   <module>   s*    5	 