Source code for riprova.retry

# -*- coding: utf-8 -*-
import functools
from .constants import PY_34
from .retrier import Retrier

if PY_34:  # pragma: no cover
    import asyncio
    from .async_retrier import AsyncRetrier
else:
    asyncio, AsyncRetrier, partial = None, None, None


def iscallable(x):
    """
    Returns `True` if the given value is a callable object.
    """
    return any([
        hasattr(x, '__call__'),
        asyncio and asyncio.iscoroutinefunction(x)
    ])


[docs]def retry(timeout=0, backoff=None, evaluator=None, error_evaluator=None, on_retry=None, **kw): """ Decorator function that wraps function, method or coroutine function that would be retried on failure capabilities. Retry policy can be configured via `backoff` param. You can also use a custom evaluator function used to determine when the returned task value is valid or not, retrying the operation accordingly. You can subscribe to every retry attempt via `on_retry` param, which accepts a function or a coroutine function. This function is overloaded: you can pass a function or coroutine function as first argument or an `int` indicating the `timeout` param. This function as decorator. Arguments: timeout (int): optional maximum timeout in seconds. Use `0` for no limit. Defaults to `0`. backoff (riprova.Backoff): optional backoff strategy to use. Defaults to `riprova.ConstantBackoff`. evaluator (function|coroutinefunction): optional retry result evaluator function used to determine if an operator failed or not. Useful when domain-specific evaluation, such as valid HTTP responses. error_evaluator (function|coroutinefunction): optional error evaluator function usef to determine if a reased exception is legit or not, and therefore should be handled as a failure or simply forward the raised exception and stop the retry cycle. This is useful in order to ignore custom error exceptions. on_retry (function|coroutinefunction): optional on retry event subscriber that will be executed before every retry attempt. Useful for reporting and tracing. sleep_fn (function|coroutinefunction): optional sleep function to be used before retry attempts. Defaults to `time.sleep()` or `asyncio.sleep()`. *kwargs (mixed): keyword variadic arguments to pass to `Retrier` or `AsyncRetrier` class constructors. Raises: TypeError: if function is not a function or coroutine function. Returns: function or coroutinefunction: decorated function or coroutine function with retry mechanism. Usage:: @riprova.retry def task(x, y): return x * y task(4, 4) # => 16 @riprova.retry(backoff=riprova.FinonacciBackoff(retries=10)) def task(x, y): return x * y task(4, 4) # => 16 @riprova.retry(timeout=10) async def task(x, y): return x * y await task(4, 4) # => 16 def on_retry(err, next_try): print('Error exception: {}'.format(err)) print('Next try in {}ms'.format(next_try)) @riprova.retry(on_retry=on_retry) async def task(x, y): return x * y await task(4, 4) # => 16 """ def decorator(fn, decorated=True): if not iscallable(fn): raise TypeError('first argument must a coroutine function, a ' 'function or a method.') # Resolve the required retrier instance RetrierClass = (AsyncRetrier if asyncio and asyncio.iscoroutinefunction(fn) else Retrier) # Normalize potentially overloaded timeout param _timeout = timeout if decorated else 0 @functools.wraps(fn) def wrapper(*args, **_kw): # Otherwise return recursive currier function retrier = RetrierClass(backoff=backoff, timeout=_timeout, evaluator=evaluator, error_evaluator=error_evaluator, on_retry=on_retry, **kw) # Run original function via retry safe runner return retrier.run(fn, *args, **_kw) # Return retry wrapper function return wrapper # Return retry delegator or decorator wrapper return decorator(timeout, False) if iscallable(timeout) else decorator