Examples¶
Generic showcase examples¶
# flake8: noqa
import riprova
@riprova.retry
def task():
    """Retry the operation with a constant backoff whenever it fails"""
@riprova.retry(backoff=riprova.ExponentialBackOff(factor=.5))
def task():
    """Retry the operation with an exponential backoff whenever it fails"""
@riprova.retry(timeout=10)
def task():
    """Raise riprova.RetryTimeoutError if the retry loop exceeds 10 seconds"""
def on_retry(err, next_try):
    """Report a failed attempt and the delay before the next one.

    Arguments:
        err: error that made the last attempt fail.
        next_try: delay before the next attempt; printed with an ``ms``
            suffix.
    """
    print('Operation error: {}'.format(err))
    print('Next try in: {}ms'.format(next_try))
@riprova.retry(on_retry=on_retry)
def task():
    """Subscribe a callback function that is notified on every retry"""
def evaluator(response):
    """Decide whether the operation result should trigger a retry.

    Arguments:
        response: result object exposing a numeric ``status`` attribute.

    Returns:
        bool: ``False`` when the response is valid, meaning no retry.

    Raises:
        RuntimeError: if ``response.status`` is 400 or greater, which
            forces the operation to be retried.
    """
    # Force retry operation if not a valid response
    if response.status >= 400:
        raise RuntimeError('invalid response status')
    # Otherwise return False, meaning no retry
    return False
@riprova.retry(evaluator=evaluator)
def task():
    """Use a custom evaluator function to determine whether the
    operation failed or not"""
@riprova.retry
async def task():
    """Asynchronous coroutines can be retried as well :)"""
Usage as decorator¶
# -*- coding: utf-8 -*-
import riprova
# Store number of function calls for error simulation
calls = 0


# Register retriable operation using the default retry settings
@riprova.retry
def mul2(x):
    """Return ``x * 2``, failing on the first three calls.

    The first three invocations raise ``RuntimeError`` to simulate a
    transient error; later invocations return the doubled value.
    """
    global calls
    if calls < 3:
        calls += 1
        raise RuntimeError('simulated call error')
    return x * 2
# Run task: mul2 raises on its first three calls (see its `calls < 3` guard),
# so the result is printed only once a call finally succeeds
result = mul2(2)
print('Result: {}'.format(result))
Usage as context manager¶
# -*- coding: utf-8 -*-
import riprova
# Store number of function calls for error simulation
calls = 0


# Plain (undecorated) operation; it is executed through a retrier below
def mul2(x):
    """Return ``x * 2``, failing on the first four calls.

    While the module-level ``calls`` counter is below 4 the counter is
    incremented and ``RuntimeError`` is raised to simulate a transient
    error; afterwards the doubled value is returned.
    """
    global calls
    if calls < 4:
        calls += 1
        raise RuntimeError('simulated call error')
    return x * 2
# Run task via context manager
with riprova.Retrier() as retry:
    result = retry.run(mul2, 2)
    print('Result 1: {}'.format(result))

# Or alternatively create a shared retrier and reuse it across multiple
# context managers.
retrier = riprova.Retrier()

with retrier as retry:
    # Reset the counter so that mul2 fails again before succeeding
    calls = 0
    result = retry.run(mul2, 4)
    print('Result 2: {}'.format(result))

with retrier as retry:
    calls = 0
    result = retry.run(mul2, 8)
    print('Result 3: {}'.format(result))
Timeout retry cycle limit¶
# -*- coding: utf-8 -*-
import riprova
# Store number of function calls for error simulation
calls = 0


# Register retriable operation with a limit on the total retry time
@riprova.retry(timeout=0.3)
def mul2(x):
    """Return ``x * 2``, failing on the first four calls.

    The first four invocations raise ``RuntimeError`` to simulate a
    transient error; later invocations return the doubled value.
    """
    global calls
    if calls < 4:
        calls += 1
        raise RuntimeError('simulated call error')
    return x * 2
# Run task and report the error raised when the retry timeout is exceeded
try:
    mul2(2)
except riprova.RetryTimeoutError as err:
    print('Timeout error: {}'.format(err))
Retry failed HTTP requests¶
# -*- coding: utf-8 -*-
import pook
import requests
from riprova import retry
# Define HTTP mocks: three GET replies with status 503, then a single reply
# with status 200 and a JSON body
# NOTE(review): assumes pook serves mocks in registration order - confirm
pook.get('server.com').times(3).reply(503)
pook.get('server.com').times(1).reply(200).json({'hello': 'world'})
# Retry evaluator function used to determine if the operation failed or not
def evaluator(response):
    """Evaluate the HTTP response returned by ``fetch``.

    Arguments:
        response: ``requests`` response object.

    Returns:
        Exception: error instance when the status code is not 200.
        bool: ``False`` when the request succeeded.
    """
    # requests exposes the HTTP status as `status_code`; it has no `status`
    if response.status_code != 200:
        return Exception('failed request')
    return False
# On retry event subscriber
def on_retry(err, next_try):
    """Report a failed attempt and the delay before the next one.

    Arguments:
        err: error that made the last attempt fail.
        next_try: delay before the next attempt; printed with an ``ms``
            suffix.
    """
    print('Operation error {}'.format(err))
    print('Next try in {}ms'.format(next_try))
# Register retriable operation
@retry(evaluator=evaluator, on_retry=on_retry)
def fetch(url):
    """Perform an HTTP GET request against ``url`` and return the response.

    The decorator is given ``evaluator`` to judge each response and
    ``on_retry`` as the retry callback.
    """
    return requests.get(url)
# Run request. pook only intercepts HTTP traffic while it is activated, so
# enable the mock engine for the call and always disable it afterwards;
# otherwise the request would be sent to the real network.
pook.on()
try:
    fetch('http://server.com')
finally:
    pook.off()
Retry failed HTTP requests using asyncio + aiohttp¶
# -*- coding: utf-8 -*-
# flake8: noqa
"""
Note: only Python 3.5+ compatible.
"""
import pook
import paco
import aiohttp
from riprova import retry
# Define HTTP mocks to simulate failed scenarios: four GET replies with
# status 503, then a single reply with status 200 and a JSON body
# NOTE(review): assumes pook serves mocks in registration order - confirm
pook.get('server.com').times(4).reply(503)
pook.get('server.com').times(1).reply(200).json({'hello': 'world'})
# Retry evaluator function used to determine if the operation failed or not
async def evaluator(status):
    """Evaluate the HTTP status code returned by ``fetch``.

    Arguments:
        status: HTTP response status code.

    Returns:
        Exception: error instance when the status is not 200.
        bool: ``False`` when the request succeeded.
    """
    if status != 200:
        return Exception('failed request with status {}'.format(status))
    return False
# On retry event subscriber
async def on_retry(err, next_try):
    """Report a failed attempt and the delay before the next one.

    Arguments:
        err: error that made the last attempt fail.
        next_try: delay before the next attempt; printed with an ``ms``
            suffix.
    """
    print('Operation error: {}'.format(err))
    print('Next try in {}ms'.format(next_try))
# Register retriable operation with custom evaluator
@retry(evaluator=evaluator, on_retry=on_retry)
async def fetch(url):
    """Perform an HTTP GET request against ``url`` and return its status.

    Both the client session and the response are used as asynchronous
    context managers; only the response status code is returned.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status
# Run request. pook only intercepts HTTP traffic while it is activated, so
# enable the mock engine for the call and always disable it afterwards;
# otherwise the request would be sent to the real network.
pook.on()
try:
    status = paco.run(fetch('http://server.com'))
finally:
    pook.off()
print('Response status:', status)
Whitelisting custom errors¶
# -*- coding: utf-8 -*-
import riprova
# Custom error object
class MyCustomError(Exception):
    """Custom error type declared for this example."""
# Errors in this whitelist are considered legit and should not be retried
whitelist = riprova.ErrorWhitelist([
    ReferenceError,
    ImportError,
    IOError,
    SyntaxError,
    IndexError,
])
def error_evaluator(error):
    """Tell whether a raised error should be retried.

    Arguments:
        error: error raised by the retried operation.

    Returns:
        The result of ``whitelist.isretry(error)``.
    """
    return whitelist.isretry(error)
# To define a global whitelist policy that is used across all retry
# instances, overwrite the `whitelist` attribute of the Retrier class:
riprova.Retrier.whitelist = whitelist
# Store number of function calls for error simulation
calls = 0


# Register retriable operation with a custom error evaluator
# You should pass the evaluator per retry instance.
@riprova.retry(error_evaluator=error_evaluator)
def mul2(x):
    """Return ``x * 2`` after simulating two kinds of failure.

    The first three calls raise ``RuntimeError`` and the fourth call raises
    ``ReferenceError``; any later call returns the doubled value.
    """
    global calls
    if calls < 3:
        calls += 1
        raise RuntimeError('simulated call error')
    if calls == 3:
        calls += 1
        raise ReferenceError('legit error')
    return x * 2
# Run task and report the whitelisted error raised by mul2
try:
    mul2(2)
except ReferenceError as err:
    print('Whitelisted error: {}'.format(err))
    print('Retry attempts: {}'.format(calls))
Blacklisting custom errors¶
# -*- coding: utf-8 -*-
import riprova
# Custom error object
class MyCustomError(Exception):
    """Custom error type declared for this example."""
# Only the errors in this blacklist should be retried
blacklist = riprova.ErrorBlacklist([
    RuntimeError,
    MyCustomError,
])
def error_evaluator(error):
    """Tell whether a raised error should be retried.

    Arguments:
        error: error raised by the retried operation.

    Returns:
        The result of ``blacklist.isretry(error)``.
    """
    return blacklist.isretry(error)
# To define a global blacklist policy that is used across all retry
# instances, overwrite the `blacklist` attribute of the Retrier class.
# NOTE: a blacklist overrides any whitelist. They are mutually exclusive.
riprova.Retrier.blacklist = blacklist
# Store number of function calls for error simulation
calls = 0


# Register retriable operation with a custom error evaluator
# You should pass the evaluator per retry instance.
@riprova.retry(error_evaluator=error_evaluator)
def mul2(x):
    """Return ``x * 2`` after simulating two kinds of failure.

    The first three calls raise ``RuntimeError`` and the fourth call raises
    a plain ``Exception``; any later call returns the doubled value.
    """
    global calls
    if calls < 3:
        calls += 1
        raise RuntimeError('simulated call error')
    if calls == 3:
        calls += 1
        raise Exception('non blacklisted error')
    return x * 2
# Run task and report the error that ends the retry cycle. The error that
# mul2 raises on its fourth call is a plain Exception, which is not part of
# the blacklist, so the message labels it as non-blacklisted.
try:
    mul2(2)
except Exception as err:
    print('Non-blacklisted error: {}'.format(err))
    print('Retry attempts: {}'.format(calls))
Constant backoff strategy¶
# -*- coding: utf-8 -*-
import riprova
# Store number of function calls for error simulation
calls = 0

# Max number of retries attempts
retries = 5


# Register retriable operation with a constant backoff strategy
@riprova.retry(backoff=riprova.ConstantBackoff(interval=.5, retries=retries))
def mul2(x):
    """Return ``x * 2``, failing on the first four calls.

    The first four invocations raise ``RuntimeError`` to simulate a
    transient error; later invocations return the doubled value.
    """
    global calls
    if calls < 4:
        calls += 1
        raise RuntimeError('simulated call error')
    return x * 2
# Run task: mul2 raises on its first four calls (see its `calls < 4` guard),
# so the result is printed only once a call finally succeeds
result = mul2(2)
print('Result: {}'.format(result))
Fibonacci backoff strategy¶
# -*- coding: utf-8 -*-
import riprova
# Store number of function calls for error simulation
calls = 0

# Max number of retries attempts
retries = 5


# Register retriable operation with a Fibonacci backoff strategy
@riprova.retry(backoff=riprova.FibonacciBackoff(retries=retries))
def mul2(x):
    """Return ``x * 2``, failing on the first four calls.

    The first four invocations raise ``RuntimeError`` to simulate a
    transient error; later invocations return the doubled value.
    """
    global calls
    if calls < 4:
        calls += 1
        raise RuntimeError('simulated call error')
    return x * 2
# Run task: mul2 raises on its first four calls (see its `calls < 4` guard),
# so the result is printed only once a call finally succeeds
result = mul2(2)
print('Result: {}'.format(result))
Exponential backoff strategy¶
# -*- coding: utf-8 -*-
import riprova
# Store number of function calls for error simulation
calls = 0

# Max number of retries attempts
retries = 5


# Register retriable operation with an exponential backoff strategy
# NOTE(review): confirm that ExponentialBackOff accepts a `retries` keyword
# like the constant and Fibonacci strategies do.
@riprova.retry(backoff=riprova.ExponentialBackOff(factor=1, retries=retries))
def mul2(x):
    """Return ``x * 2``, failing on the first four calls.

    The first four invocations raise ``RuntimeError`` to simulate a
    transient error; later invocations return the doubled value.
    """
    global calls
    if calls < 4:
        calls += 1
        raise RuntimeError('simulated call error')
    return x * 2
# Run task: mul2 raises on its first four calls (see its `calls < 4` guard),
# so the result is printed only once a call finally succeeds
result = mul2(2)
print('Result: {}'.format(result))