Python aioredis 模块,create_redis_pool() 实例源码
我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用aioredis.create_redis_pool()。
def set_redis_pool(self, redis_pool: Optional[Redis]):
    """Validate a caller-supplied Redis pool and store it on ``self._redis_pool``.

    A raw ``ConnectionsPool`` is accepted and wrapped in a ``Redis`` object
    before validation. A falsy ``redis_pool`` (e.g. ``None``) is ignored.

    NOTE(review): the indentation of this snippet was lost in extraction; the
    final assignment is assumed to belong to the ``if redis_pool`` branch, so
    a falsy argument leaves any previously stored pool untouched -- confirm
    against the upstream source.

    :param redis_pool: a ``Redis`` object backed by a connection pool, a raw
        ``ConnectionsPool``, or a falsy value to do nothing
    :raises InvalidRedisPool: if the value is neither a ``Redis`` object nor a
        ``ConnectionsPool``, or if the ``Redis`` object is not backed by a
        ``ConnectionsPool``
    """
    if not redis_pool:
        return

    if isinstance(redis_pool, ConnectionsPool):
        # If they've passed a raw pool then wrap it up in a Redis object.
        # aioredis.create_redis_pool() normally does this for us.
        redis_pool = Redis(redis_pool)

    if not isinstance(redis_pool, Redis):
        raise InvalidRedisPool(
            'Invalid Redis connection provided: {}. If unsure, use aioredis.create_redis_pool() to '
            'create your redis connection.'.format(redis_pool)
        )

    # The Redis wrapper must sit on top of a pool rather than a single
    # connection; the error message below explains why.
    if not isinstance(redis_pool._pool_or_conn, ConnectionsPool):
        raise InvalidRedisPool(
            'The provided redis connection is backed by a single connection, rather than a '
            'pool of connections. This will lead to lightbus deadlocks and is unsupported. '
            'If unsure, use aioredis.create_redis_pool() to create your redis connection.'
        )

    self._redis_pool = redis_pool
def redis(loop):
    """
    Generator that opens a redis pool on localhost:6379, flushes all data,
    yields the pool, and closes it once the generator is resumed.

    The flush happens only before the yield: redis is deliberately not flushed
    afterwards, both for performance and so the data can be inspected when
    debugging.

    NOTE(review): written in the shape of a pytest yield fixture; the fixture
    decorator is not visible in this snippet.

    :param loop: event loop used to create the pool and to drive the setup
        and teardown coroutines
    """
    async def _connect_and_flush():
        pool = await create_redis_pool(('localhost', 6379), loop=loop)
        await pool.flushall()
        return pool

    async def _shutdown(pool):
        # close() only starts the shutdown; wait_closed() waits for it to finish.
        pool.close()
        await pool.wait_closed()

    connection = loop.run_until_complete(_connect_and_flush())
    yield connection
    loop.run_until_complete(_shutdown(connection))
async def create_pool_lenient(settings: RedisSettings, loop: asyncio.AbstractEventLoop, *,
                              _retry: int=0) -> Redis:
    """
    Create a new redis pool, retrying up to conn_retries times if the connection fails.

    Must be a coroutine function: the body awaits the pool creation, the retry
    delay and its own recursive call.

    :param settings: RedisSettings instance
    :param loop: event loop
    :param _retry: retry attempt, this is set when the method calls itself recursively
    :return: the connected redis pool
    :raises ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError:
        the last connection error, re-raised once ``settings.conn_retries``
        attempts have been used up
    """
    addr = settings.host, settings.port
    try:
        pool = await aioredis.create_redis_pool(
            addr, loop=loop, db=settings.database, password=settings.password,
            timeout=settings.conn_timeout
        )
    except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e:
        if _retry < settings.conn_retries:
            logger.warning('redis connection error %s %s, %d retries remaining...',
                           e.__class__.__name__, e, settings.conn_retries - _retry)
            await asyncio.sleep(settings.conn_retry_delay)
        else:
            # Out of retries: propagate the original error unchanged.
            raise
    else:
        if _retry > 0:
            # Only worth logging success if an earlier attempt failed.
            logger.info('redis connection successful')
        return pool

    # recursively attempt to create the pool outside the except block to avoid
    # "During handling of the above exception..." madness
    return await create_pool_lenient(settings, loop, _retry=_retry + 1)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。