Python aioredis 模块,create_redis() 实例源码
我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用aioredis.create_redis()。
def get_app():
redis = await aioredis.create_redis(('localhost', 6379,), db=1)
app = web.Application()
app['redis'] = redis
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemloader('templates/'),
context_processors=(static_processor,))
app.router.add_route('GET', '/', handlers.index)
app.router.add_route('GET', '/login', handlers.login_task)
app.router.add_route('POST', handlers.login)
app.router.add_static('/static', 'static')
async def close_redis(app):
app['redis'].close()
app.on_shutdown.append(close_redis)
return app
def run_bot():
if BotConfig.check():
loop = asyncio.get_event_loop()
logger.info('creating redis connection')
redis_conn = loop.run_until_complete(aioredis.create_redis(('localhost', 6379), encoding="utf-8"))
logger.info('adding signal handlers')
add_signal_handlers(loop, redis_conn)
logger.info('creating tasks: bot and background coros')
create_tasks(loop, redis_conn)
try:
logger.info('starting event loop ')
loop.run_forever()
finally:
loop.close()
def test_send_message(self):
async def func():
SUB = await aioredis.create_redis((os.environ.get(
"REdis_HOST",
"127.0.0.1"
), 6379))
res = await SUB.psubscribe("enibar-*")
subscriber = res[0]
api.redis.send_message('enibar-test', 'test')
await subscriber.wait_message()
reply = await subscriber.get_json()
self.assertEqual(reply, (b'enibar-test', 'test'))
task = asyncio.ensure_future(func())
self.loop.run_until_complete(task)
def store_interface(config, interface, key=None, exist='SET_IF_NOT_EXIST'):
redis = yield from aioredis.create_redis((config.redis_host, config.redis_port))
pipe = redis.pipeline()
interface_id = interface['network_interface_id']
# Only store picked interface data if using default key (not fixed key from instance)
if not key:
key = KEY_ENI + interface_id
pipe.set(key=KEY_ENI + interface_id, value=pickle.dumps(interface), expire=int(config.redis_ttl))
# Store intermediate key lookups so that we can find Metadata given only an IP address
if 'association' in interface:
pipe.set(key=KEY_IP + interface['association']['public_ip'], value=key, expire=int(config.redis_ttl), exist=exist)
for address in interface.get('private_ip_addresses', []):
pipe.set(key=KEY_IP + address['private_ip_address'], exist=exist)
yield from pipe.execute()
redis.close()
yield from redis.wait_closed()
def __init__(self, io_loop: asyncio.AbstractEventLoop = None):
super().__init__()
self.io_loop = io_loop or asyncio.get_event_loop()
self.sub_client = self.io_loop.run_until_complete(
aioredis.create_redis((config.get('REdis', 'host', fallback='localhost'),
config.getint('REdis', 'port', fallback=6379)),
db=config.getint('REdis', 'db', fallback=1)))
self.redis_client = redis.StrictRedis(
host=config.get('REdis',
db=config.getint('REdis', fallback=1), decode_responses=True)
self.initialized = False
self.sub_tasks = list()
self.sub_channels = list()
self.channel_router = dict()
self.crontab_router = defaultdict(dict)
self.datetime = None
self.time = None
self.loop_time = None
def setup(self):
self.redis = await aioredis.create_redis((config.REdis_HOST, config.REdis_PORT, encoding='utf8')
self.__processed_status = await self.get_processed_ids()
self.client = aiohttp.ClientSession()
extension_manager = extension.ExtensionManager(namespace='ofensivaria.bot.commands',
invoke_on_load=True,
invoke_args=(self, self.redis, self.client))
commands = extension_manager.map(self.__extension_manager_callback)
self.commands = [obj for name, obj in sorted(commands)]
prepare_tasks = [c.prepare() for c in self.commands]
await asyncio.gather(*prepare_tasks)
self.__setup = True
def connect(self, endpoint: str, auth: Optional[str] = None) -> None: # type: ignore
endpoint = validate_endpoint(endpoint)
auth = validate_auth(auth)
self._auth = auth
# print('*** redis connecting to ',endpoint,flush=True)
if self._connected:
raise ConnectionError('Already connected.')
if not endpoint.startswith('redis://'):
raise ValueError('Expected endpoint to begin with "redis://".'
'Got: {!r}'.format(endpoint))
host, port = endpoint.replace('redis://', '').split(':') # todo: handle exception
self._subscriber = await aioredis.create_redis((host, port))
self._publisher = await aioredis.create_redis((host, port))
if auth:
await self._subscriber.auth(auth)
await self._publisher.auth(auth)
else:
print('*** WARNING: Redis connection has no password.')
self._connected = True
def redis_conn(loop):
"""
yield fixture which creates a redis connection,and flushes redis before the test.
Note: redis is not flushed after the test both for performance and to allow later debugging.
"""
async def _get_conn():
conn = await create_redis(('localhost', loop=loop)
await conn.flushall()
return conn
conn = loop.run_until_complete(_get_conn())
conn.loop = loop
yield conn
conn.close()
try:
loop.run_until_complete(conn.wait_closed())
except RuntimeError:
pass
def prepare(self):
"""
Called by the backend to prepare SocketShark (i.e. initialize Redis
connection and the receiver class)
"""
redis_receiver = Receiver(loop=asyncio.get_event_loop())
redis_settings = self.config['REdis']
try:
self.redis = await aioredis.create_redis((
redis_settings['host'], redis_settings['port']))
except (OSError, aioredis.RedisError):
self.log.exception('Could not connect to redis')
raise
# Some features (e.g. pinging) don't work on old Redis versions.
info = await self.redis.info('server')
version_info = info['server']['redis_version'].split('.')
major, minor = int(version_info[0]), int(version_info[1])
if not (major > 3 or major == 3 and minor >= 2):
msg = 'Redis version must be at least 3.2'
self.log.exception(msg, version_info=version_info)
raise Exception(msg)
self._redis_connection_handler_task = asyncio.ensure_future(
self._redis_connection_handler())
self.service_receiver = ServiceReceiver(self, redis_receiver)
def __init__(self,
dsn: str = "redis://127.0.0.1:6379/0",
prefix: str = "aiotasks",
loop=None):
super().__init__(loop=loop, prefix=prefix)
_, password, host, port, db = parse_dsn(dsn,
default_port=6379,
default_db=0)
db = int(db)
# if not port:
# port = 6379
#
# port = int(port)
# try:
# db = int(db)
#
# if not db:
# db = 0
# except ValueError:
# db = 0
self._redis_pub = self._loop_subscribers.run_until_complete(
aioredis.create_redis(address=(host, port),
db=db,
password=password,
loop=self._loop_subscribers))
self._redis_sub = self._loop_subscribers.run_until_complete(
aioredis.create_redis(address=(host,
loop=self._loop_subscribers))
def __init__(self,
concurrency: int = 5, prefix=prefix, concurrency=concurrency)
_,
default_db=0)
db = int(db)
# if not port:
# port = 6379
#
# port = int(port)
# try:
# db = int(db)
# if not db:
# db = 0
# except ValueError:
# db = 0
self._redis_consumer = self._loop_delay. \
run_until_complete(aioredis.create_redis(address=(host,
db=db,
password=password,
loop=self._loop_delay))
self._redis_poller = self._loop_delay. \
run_until_complete(aioredis.create_redis(address=(host,
loop=self._loop_delay))
def get_async_redis(self, loop=None):
"""Creates an asynchronous Redis connection.
Parameters
----------
loop = Optional[asyncio.AbstractEventLoop]
The loop used for the asynchronous Redis connection.
"""
if self.loop is not None and loop is None:
loop = self.loop
return await aioredis.create_redis(
'redis://{}:{}'.format(cache_config['HOST'], cache_config['PORT']),
db=cache_config['DB'], password=cache_config['PASSWORD'], loop=loop)
def get_async_redis():
""" initialize an asyncronous redis connection
"""
global ASYNCREdis
if ASYNCREdis is None or ASYNCREdis.closed: # pragma: no branch
address = REdis_PUBSUB["address"]
db = REdis_PUBSUB["db"]
password = REdis_PUBSUB["password"]
ASYNCREdis = yield from aioredis.create_redis(address, db=db, password=password)
return ASYNCREdis
def redis(variables,
loop=loop
)
return loop.run_until_complete(coro)
def install_redis_handle(app):
global SUB
while True:
SUB = await aioredis.create_redis((settings.REdis_HOST, password=settings.REdis_PASSWORD)
res = await SUB.psubscribe("enibar-*")
subscriber = res[0]
while await subscriber.wait_message():
reply = await subscriber.get_json()
await app.redis_handle(reply[0].decode(), reply[1])
await asyncio.sleep(1)
def store_instance(config, instance):
redis = yield from aioredis.create_redis((config.redis_host, config.redis_port))
pipe = redis.pipeline()
instance_id = instance['instance_id']
# Store pickled instance data keyed off instance ID
pipe.set(key=KEY_I + instance_id, value=pickle.dumps(instance), expire=int(config.redis_ttl))
# Store intermediate key lookups so that we can find an instance given only its IP address
for interface in instance.get('network_interfaces', []):
yield from store_interface(config, KEY_I + instance_id, None)
yield from pipe.execute()
redis.close()
yield from redis.wait_closed()
def query(self, query_type: str, **kwargs):
sub_client = None
channel_name1, channel_name2 = None, None
try:
sub_client = await aioredis.create_redis(
(config.get('REdis',
config.getint('REdis',
db=config.getint('REdis', fallback=1))
request_id = self.next_id()
kwargs['RequestID'] = request_id
channel_name1 = self.__Trade_response_format.format('OnRspQry' + query_type, request_id)
channel_name2 = self.__Trade_response_format.format('OnRspError', request_id)
ch1, ch2 = await sub_client.psubscribe(channel_name1, channel_name2)
cb = self.io_loop.create_future()
tasks = [
asyncio.ensure_future(self.query_reader(ch1, cb), loop=self.io_loop),
asyncio.ensure_future(self.query_reader(ch2,
]
self.redis_client.publish(self.__request_format.format('ReqQry' + query_type), json.dumps(kwargs))
rst = await asyncio.wait_for(cb, HANDLER_TIME_OUT, loop=self.io_loop)
await sub_client.punsubscribe(channel_name1, channel_name2)
sub_client.close()
await asyncio.wait(tasks, loop=self.io_loop)
return rst
except Exception as e:
logger.error('%s Failed: %s', query_type, repr(e), exc_info=True)
if sub_client and sub_client.in_pubsub and channel_name1:
await sub_client.unsubscribe(channel_name1, channel_name2)
sub_client.close()
return None
def SubscribeMarketData(self, inst_ids: list):
sub_client = None
channel_name1, fallback=1))
channel_name1 = self.__market_response_format.format('OnRspSubMarketData', 0)
channel_name2 = self.__market_response_format.format('OnRspError', 0)
ch1,
]
self.redis_client.publish(self.__request_format.format('SubscribeMarketData'), json.dumps(inst_ids))
rst = await asyncio.wait_for(cb, loop=self.io_loop)
return rst
except Exception as e:
logger.error('SubscribeMarketData Failed: %s', channel_name2)
sub_client.close()
return None
def UnSubscribeMarketData(self, fallback=1))
channel_name1 = self.__market_response_format.format('OnRspUnSubMarketData',
]
self.redis_client.publish(self.__request_format.format('UnSubscribeMarketData'), channel_name2)
sub_client.close()
return None
def test_order_filter_invalid(self):
"""
Test invalid message order.
"""
shark = SocketShark(TEST_CONfig)
await shark.prepare()
client = MockClient(shark)
session = client.session
subscription = 'simple.topic'
await session.on_client_event({
'event': 'subscribe',
'subscription': subscription,
})
assert client.log.pop() == {
'event': 'subscribe',
'status': 'ok',
}
# Test message from server to client
redis_settings = TEST_CONfig['REdis']
redis = await aioredis.create_redis((
redis_settings['host'], redis_settings['port']))
redis_topic = redis_settings['channel_prefix'] + subscription
await redis.publish_json(redis_topic, {
'subscription': subscription,
'_order': 'invalid',
'data': {'foo': 'invalid'},
})
await redis.publish_json(redis_topic,
'data': {'foo': 'bar'},
})
redis.close()
# Wait for Redis to propagate the messages
await asyncio.sleep(0.1)
has_messages = await shark.run_service_receiver(once=True)
assert has_messages
assert client.log == [{
'event': 'message',
}]
await shark.shutdown()
def ReqOrderInsert(self, **kwargs):
"""
InstrumentID ??
VolumetotalOriginal ??
LimitPrice ??
StopPrice ???
Direction ??
CombOffsetFlag ?,?,??
ContingentCondition ????
TimeCondition ????
"""
sub_client = None
channel_name1, fallback=1))
request_id = self.next_id()
order_ref = self.next_order_ref()
kwargs['nRequestId'] = request_id
kwargs['OrderRef'] = order_ref
channel_name1 = self.__Trade_response_format.format('OnRtnorder', order_ref)
channel_name2 = self.__Trade_response_format.format('OnRspError', request_id)
channel_name3 = self.__Trade_response_format.format('OnRspOrderInsert', ch2, ch3 = await sub_client.psubscribe(channel_name1, channel_name2, channel_name3)
cb = self.io_loop.create_future()
tasks = [
asyncio.ensure_future(self.query_reader(ch1,
asyncio.ensure_future(self.query_reader(ch3,
]
self.redis_client.publish(self.__request_format.format('ReqOrderInsert'), channel_name3)
sub_client.close()
await asyncio.wait(tasks, loop=self.io_loop)
logger.info('ReqOrderInsert,rst: %s', rst)
return rst
except Exception as e:
logger.error('ReqOrderInsert Failed: %s', channel_name2)
sub_client.close()
return None
def init(loop):
# Middlewares
middlewares = [
session_middleware(RedisStorage(await aioredis.create_pool((config.redis_ip, 6379)), cookie_name='w')),
error_middleware,
maintain_middleware,
]
# init server
app = web.Application(loop=loop,
middlewares=middlewares)
redis = await aioredis.create_redis((config.redis_ip, config.redis['port']), loop=loop)
app.redis = RedisFilter(redis)
# Register admin account
if await app.redis.get('User') is None:
await app.redis.set('SecretKey', os.urandom(16), many=False)
config.admin['password'] = await encrypt(app.redis, config.admin['password'])
config.admin['permission'] = 0x0f
await app.redis.set('User', config.admin, many=False)
# Init Profile
if await app.redis.get('Profile') is None:
await app.redis.set('Profile', {
'name': config.RSS['author'],
'link_desc': '',
'text': ''
}, many=False)
# Security
setup_security(app,
SessionIdentityPolicy(),
RedisAuthorizationPolicy(redis))
await compass(app.router)
CONST.CATEGORY = await app.redis.get('Categories') or []
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemloader(config.template_addr))
_handler = app.make_handler(
access_log=logger,
access_log_format=formatters
)
_srv = await loop.create_server(_handler, config.server['host'], config.server['port'])
print('Server started at http://%s:%s...' % (config.server['host'], config.server['port']))
return _srv, _handler, app
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。