Python aioredis 模块,Redis() 实例源码
我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用aioredis.Redis()。
def get_redis(address=None, loop=None, recreate=False) -> aioredis.Redis:
global _redis
address = address or settings.CONfig['redis']
kwargs = utils.parse_redis_url(address)
kwargs['address'] = kwargs.pop('host'), kwargs.pop('port')
if not _redis or recreate:
_redis = await aioredis.create_reconnecting_redis(loop=loop, **kwargs)
return _redis
def set_redis_pool(self, redis_pool: Optional[Redis]):
if redis_pool:
if isinstance(redis_pool, (ConnectionsPool,)):
# If they've passed a raw pool then wrap it up in a Redis object.
# aioredis.create_redis_pool() normally does this for us.
redis_pool = Redis(redis_pool)
if not isinstance(redis_pool, (Redis,)):
raise InvalidRedisPool(
'Invalid Redis connection provided: {}. If unsure,use aioredis.create_redis_pool() to '
'create your redis connection.'.format(redis_pool)
)
if not isinstance(redis_pool._pool_or_conn,)):
raise InvalidRedisPool(
'The provided redis connection is backed by a single connection,rather than a '
'pool of connections. This will lead to lightbus deadlocks and is unsupported. '
'If unsure,use aioredis.create_redis_pool() to create your redis connection.'
)
self._redis_pool = redis_pool
def call_rpc(self, rpc_message: Rpcmessage):
stream = '{}:stream'.format(rpc_message.api_name)
logger.debug(
LBullets(
L("Enqueuing message {} in Redis stream {}", Bold(rpc_message), Bold(stream)),
items=rpc_message.to_dict()
)
)
pool = await self.get_redis_pool()
with await pool as redis:
start_time = time.time()
# Todo: MAXLEN
await redis.xadd(stream=stream, fields=rpc_message.to_dict())
logger.info(L(
"Enqueued message {} in Redis in {} stream {}",
Bold(rpc_message), human_time(time.time() - start_time), Bold(stream)
))
def send_result(self, rpc_message: Rpcmessage, result_message: ResultMessage, return_path: str):
logger.debug(L(
"Sending result {} into Redis using return path {}",
Bold(result_message), Bold(return_path)
))
redis_key = self._parse_return_path(return_path)
pool = await self.get_redis_pool()
with await pool as redis:
start_time = time.time()
p = redis.pipeline()
p.lpush(redis_key, redis_encode(result_message.result))
# Todo: Make result expiry configurable
p.expire(redis_key, timeout=60)
await p.execute()
logger.debug(L(
"? Sent result {} into Redis in {} using return path {}", Bold(return_path)
))
def receive_result(self, return_path: str) -> ResultMessage:
logger.info(L("? Awaiting Redis result for RPC message: {}", Bold(rpc_message)))
redis_key = self._parse_return_path(return_path)
pool = await self.get_redis_pool()
with await pool as redis:
start_time = time.time()
# Todo: Make timeout configurable
_, result = await redis.blpop(redis_key, timeout=5)
result = redis_decode(result)
logger.info(L(
"? Received Redis result in {} for RPC message {}: {}",
human_time(time.time() - start_time), rpc_message, Bold(result)
))
return result
def conn(func):
@functools.wraps(func)
async def wrapper(self, *args, _conn=None, **kwargs):
if _conn is None:
pool = await self._get_pool()
conn_context = await pool
with conn_context as _conn:
if not AIOREdis_BEFORE_ONE:
_conn = aioredis.Redis(_conn)
return await func(self, _conn=_conn, **kwargs)
return await func(self, **kwargs)
return wrapper
def __init__(self, queue=None):
self.queue = queue or list()
redis = mock.Magicmock(aioredis.Redis)
redis.rpush.side_effect = utils.make_coro(result=lambda key, item: self.queue.append(item))
redis.rpop.side_effect = utils.make_coro(result=lambda key: self.queue.pop())
redis.lpush.side_effect = utils.make_coro(result=lambda key, item: self.queue.insert(0, item))
redis.llen.side_effect = utils.make_coro(result=lambda key: len(self.queue))
super(MockQueue, self).__init__(redis, None)
def __init__(self, *,
redis: Redis,
max_concurrent_tasks: int=50,
shutdown_delay: float=6,
timeout_seconds: int=60,
burst_mode: bool=True,
raise_task_exception: bool=False,
semaphore_timeout: float=60) -> None:
"""
:param redis: redis pool to get connection from to pop items from list,also used to optionally
re-enqueue pending jobs on termination
:param max_concurrent_tasks: maximum number of jobs which can be execute at the same time by the event loop
:param shutdown_delay: number of seconds to wait for tasks to finish
:param timeout_seconds: maximum duration of a job,after that the job will be cancelled by the event loop
:param burst_mode: break the iter loop as soon as no more jobs are available by adding an sentinel quit queue
:param raise_task_exception: whether or not to raise an exception which occurs in a processed task
"""
self.redis = redis
self.loop = redis._pool_or_conn._loop
self.max_concurrent_tasks = max_concurrent_tasks
self.task_semaphore = asyncio.Semaphore(value=max_concurrent_tasks, loop=self.loop)
self.shutdown_delay = max(shutdown_delay, 0.1)
self.timeout_seconds = timeout_seconds
self.burst_mode = burst_mode
self.raise_task_exception = raise_task_exception
self.pending_tasks: Set[asyncio.futures.Future] = set()
self.task_exception: Exception = None
self.semaphore_timeout = semaphore_timeout
self.jobs_complete, self.jobs_Failed, self.jobs_timed_out = 0, 0, 0
self.running = False
self._finish_lock = asyncio.Lock(loop=self.loop)
def create_pool_lenient(settings: RedisSettings, loop: asyncio.AbstractEventLoop,
_retry: int=0) -> Redis:
"""
Create a new redis pool,retrying up to conn_retries times if the connection fails.
:param settings: RedisSettings instance
:param loop: event loop
:param _retry: retry attempt,this is set when the method calls itself recursively
"""
addr = settings.host, settings.port
try:
pool = await aioredis.create_redis_pool(
addr, loop=loop, db=settings.database, password=settings.password,
timeout=settings.conn_timeout
)
except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e:
if _retry < settings.conn_retries:
logger.warning('redis connection error %s %s,%d retries remaining...',
e.__class__.__name__, e, settings.conn_retries - _retry)
await asyncio.sleep(settings.conn_retry_delay)
else:
raise
else:
if _retry > 0:
logger.info('redis connection successful')
return pool
# recursively attempt to create the pool outside the except block to avoid
# "During handling of the above exception..." madness
return await create_pool_lenient(settings, loop, _retry=_retry + 1)
def __init__(self,
loop: asyncio.AbstractEventLoop=None,
redis_settings: RedisSettings=None,
existing_redis: Redis=None) -> None:
"""
:param loop: asyncio loop to use for the redis pool
:param redis_settings: connection settings to use for the pool
:param existing_redis: existing pool,if set no new pool is created,instead this one is used
"""
# the "or getattr(...) or" seems odd but it allows the mixin to work with subclasses which initialise
# loop or redis_settings before calling super().__init__ and don't pass those parameters through in kwargs.
self.loop = loop or getattr(self, 'loop', None) or asyncio.get_event_loop()
self.redis_settings = redis_settings or getattr(self, 'redis_settings', None) or RedisSettings()
self.redis = existing_redis
self._create_pool_lock = asyncio.Lock(loop=self.loop)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。