websauna.system.model.retry module

Transaction retry point support for command line applications and daemons.

exception websauna.system.model.retry.CannotRetryAnymore

Bases: Exception

We have reached the limit of transaction retry counts in @retryable.

exception websauna.system.model.retry.NotRetryable

Bases: Exception

Transaction retry mechanism not configured.

exception websauna.system.model.retry.TooDeepInTransactions

Bases: Exception

A @retryable function interfered with the transaction management.

exception websauna.system.model.retry.TransactionAlreadyInProcess

Bases: Exception

ensure_transactionless() detected a dangling transaction.

websauna.system.model.retry.ensure_transactionless(msg=None, transaction_manager=<transaction._manager.ThreadTransactionManager object>)

Make sure the current thread doesn’t already have a database transaction in progress.

Parameters

transaction_manager (Union[TransactionManager, ThreadTransactionManager]) – TransactionManager to check. Defaults to thread local transaction manager (ThreadTransactionManager).
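As a rough illustration of the guard described above, the following sketch raises an error when a transaction is already open. It is not Websauna's implementation: FakeTransactionManager, its current attribute and ensure_transactionless_sketch are hypothetical stand-ins, chosen so the example runs without Websauna installed.

```python
class TransactionAlreadyInProcess(Exception):
    """Stand-in mirroring the exception name documented above."""


class FakeTransactionManager:
    """Hypothetical manager that only tracks whether a transaction is open."""

    def __init__(self):
        self.current = None

    def begin(self):
        self.current = object()

    def abort(self):
        self.current = None


def ensure_transactionless_sketch(msg=None, *, transaction_manager):
    """Raise if the given manager already has an open transaction (assumed behaviour)."""
    if transaction_manager.current is not None:
        raise TransactionAlreadyInProcess(msg or "A transaction is already in progress")


tm = FakeTransactionManager()

# Nothing is open yet, so the check passes silently
ensure_transactionless_sketch(transaction_manager=tm)

# Once a transaction has been started, the same check raises
tm.begin()
try:
    ensure_transactionless_sketch("worker started with an open transaction", transaction_manager=tm)
    detected = False
except TransactionAlreadyInProcess as exc:
    detected = True
    message = str(exc)
```

A check of this kind is typically placed at the start of a worker loop or command line entry point, before any code that opens its own transaction.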

websauna.system.model.retry.is_retryable(txn, error)

Check whether the given error was caused by a database conflict, in which case the transaction can be retried.

Such errors should not be swallowed by catch-all exception handlers.

Parameters
  • txn

  • error

Returns
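The advice about catch-all handlers can be sketched as follows. This is only an illustration under stated assumptions: ConflictError, is_retryable_sketch and run_job are hypothetical names, and the stand-in check looks only at the error type and ignores txn, since how the real is_retryable() decides is not documented here.

```python
import logging

logger = logging.getLogger(__name__)


class ConflictError(Exception):
    """Hypothetical stand-in for a database conflict error."""


def is_retryable_sketch(txn, error):
    """Stand-in check: treats only ConflictError as retryable and ignores txn."""
    return isinstance(error, ConflictError)


def run_job(txn, job):
    """Run job, logging ordinary failures but letting conflicts propagate."""
    try:
        return job()
    except Exception as e:
        if is_retryable_sketch(txn, e):
            # Re-raise so that the retry machinery can see the conflict
            raise
        logger.exception("Job failed")
        return None


def broken_job():
    raise ValueError("bad input")


def conflicting_job():
    raise ConflictError("simulated conflict")


# An ordinary failure is logged and swallowed
swallowed = run_job(None, broken_job)

# A conflict passes through the catch-all handler untouched
try:
    run_job(None, conflicting_job)
    propagated = False
except ConflictError:
    propagated = True
```

The point of the pattern is that a broad except clause inside a retried function would otherwise hide the conflict from the code that is supposed to rerun the transaction.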

websauna.system.model.retry.retryable(tm=None, get_tm=None)

Function decorator for SQL Serialized transaction conflict resolution through retries.

You must pass either the tm or the get_tm argument.

  • A new transaction is started when entering the decorated function.

  • If a transaction is already in progress when entering the decorated function, an error is raised.

  • The transaction is committed when exiting the decorated function.

  • If the commit fails due to a SQL serialization conflict, the decorated function is rerun at most tm.retry_attempt_count times. Usually this is configured in TODO.
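The steps above can be sketched as a simplified, self-contained retry loop. This is not Websauna's implementation: FakeTransactionManager, ConflictError and retryable_sketch are hypothetical stand-ins that simulate conflicts in memory, and the exception classes only mirror the names documented in this module.

```python
import functools


class ConflictError(Exception):
    """Hypothetical stand-in for a SQL serialization conflict."""


class CannotRetryAnymore(Exception):
    """Stand-in mirroring the exception name documented above."""


class TransactionAlreadyInProcess(Exception):
    """Stand-in mirroring the exception name documented above."""


class FakeTransactionManager:
    """Hypothetical in-memory manager whose first commits fail with a conflict."""

    def __init__(self, retry_attempt_count, conflicts_before_success=0):
        self.retry_attempt_count = retry_attempt_count
        self.conflicts_left = conflicts_before_success
        self.in_progress = False
        self.commits = 0

    def begin(self):
        self.in_progress = True

    def commit(self):
        self.in_progress = False
        if self.conflicts_left > 0:
            self.conflicts_left -= 1
            raise ConflictError("simulated serialization conflict")
        self.commits += 1

    def abort(self):
        self.in_progress = False


def retryable_sketch(tm):
    """Simplified retry decorator following the four steps listed above."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Refuse to run inside a transaction that is already open
            if tm.in_progress:
                raise TransactionAlreadyInProcess("transaction already open")
            for _ in range(tm.retry_attempt_count):
                tm.begin()  # start a new transaction on entry
                try:
                    result = func(*args, **kwargs)
                    tm.commit()  # commit on exit
                    return result
                except ConflictError:
                    tm.abort()  # conflict: discard and rerun the function
                except Exception:
                    tm.abort()  # any other error is not retried
                    raise
            raise CannotRetryAnymore(
                "gave up after {} attempts".format(tm.retry_attempt_count))
        return wrapper
    return decorator


tm = FakeTransactionManager(retry_attempt_count=3, conflicts_before_success=2)
calls = []


@retryable_sketch(tm)
def transfer():
    calls.append(1)
    return "done"


result = transfer()  # runs three times: two simulated conflicts, then success
```

Because the decorated function may run several times, its body should be safe to repeat: side effects outside the database (sending email, calling external services) would be repeated on every attempt.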

Example:

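from uuid import UUID

from sqlalchemy.orm import Session
from web3 import Web3

from websauna.system.model.retry import retryable

# CryptoOperation is an application-specific model; import it from your own models module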

def deposit_eth(web3: Web3, dbsession: Session, opid: UUID):

    @retryable(tm=dbsession.transaction_manager)
    def perform_tx():
        op = dbsession.query(CryptoOperation).get(opid)
        op.mark_performed()
        op.mark_broadcasted()
        # Transaction confirmation count updater will make sure we have enough blocks,
        # and then will call mark_completed()

    perform_tx()

Example using a class-based transaction manager resolver:

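from typing import List, Tuple
from uuid import UUID

from pyramid.registry import Registry
from sqlalchemy.orm import Session
from web3 import Web3

from websauna.system.model.retry import retryable

# CryptoOperation, CryptoOperationState and CryptoOperationType are application-specific;
# import them from your own models module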

class OperationQueueManager:

    def __init__(self, web3: Web3, dbsession: Session, asset_network_id, registry: Registry):
        assert isinstance(registry, Registry)
        assert isinstance(asset_network_id, UUID)
        self.web3 = web3
        self.dbsession = dbsession
        self.asset_network_id = asset_network_id
        self.registry = registry
        self.tm = self.dbsession.transaction_manager

    def _get_tm(*args, **kargs):
        self = args[0]
        return self.tm

    @retryable(get_tm=_get_tm)
    def get_waiting_operation_ids(self) -> List[Tuple[UUID, CryptoOperationType]]:
        wait_list = self.dbsession.query(
            CryptoOperation,
            CryptoOperation.id,
            CryptoOperation.state,
            CryptoOperation.operation_type,
        ).filter_by(network_id=self.asset_network_id, state=CryptoOperationState.waiting)

        # Flatten
        wait_list = [(o.id, o.operation_type) for o in wait_list]
        return wait_list

    def run_waiting_operations(self):
        # Performed inside TX retry boundary
        ops = self.get_waiting_operation_ids()

The transaction manager needs a retry_attempt_count attribute, which is set by the Websauna framework.

Parameters
  • tm (Optional[TransactionManager]) – Transaction manager used to control the TX execution

  • get_tm (Optional[Callable]) – Factory function that is called with args and kwargs to get the transaction manager