Python aiohttp.web 模块,StreamResponse() 实例源码
我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用aiohttp.web.StreamResponse()。
def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera."""
from haffmpeg import CameraMjpegAsync
stream = CameraMjpegAsync(get_binary(), loop=self.hass.loop)
yield from stream.open_camera(
self._input, extra_cmd=self._extra_arguments)
response = web.StreamResponse()
response.content_type = 'multipart/x-mixed-replace;boundary=ffserver'
yield from response.prepare(request)
try:
while True:
data = yield from stream.read(102400)
if not data:
break
response.write(data)
finally:
self.hass.async_add_job(stream.close())
yield from response.write_eof()
def __call__(self):
if hasattr(self.context, 'file_path'):
filepath = str(self.context.file_path.absolute())
filename = self.context.file_path.name
with open(filepath, 'rb') as f:
resp = StreamResponse(headers={
'CONTENT-disPOSITION': 'attachment; filename="%s"' % filename
})
resp.content_type = mimetypes.guess_type(filename)
data = f.read()
resp.content_length = len(data)
await resp.prepare(self.request)
resp.write(data)
return resp
# Field File
def download(self, disposition=None):
if disposition is None:
disposition = self.request.GET.get('disposition', 'attachment')
file = self.field.get(self.field.context or self.context)
if not isinstance(file, self.file_class) or not file.valid:
return HTTPNotFound(text='No file found')
cors_renderer = app_settings['cors_renderer'](self.request)
headers = await cors_renderer.get_headers()
headers.update({
'CONTENT-disPOSITION': f'{disposition}; filename="%s"' % file.filename
})
download_resp = StreamResponse(headers=headers)
download_resp.content_type = file.guess_content_type()
if file.size:
download_resp.content_length = file.size
await download_resp.prepare(self.request)
resp = await file.download(self.context, download_resp)
return resp
def serve_file(self, fi):
filepath = str(fi.file_path.absolute())
filename = fi.file_path.name
with open(filepath, 'rb') as f:
resp = StreamResponse()
resp.content_type, _ = mimetypes.guess_type(filename)
disposition = 'filename="{}"'.format(filename)
if 'text' not in resp.content_type:
disposition = 'attachment; ' + disposition
resp.headers['CONTENT-disPOSITION'] = disposition
data = f.read()
resp.content_length = len(data)
await resp.prepare(self.request)
resp.write(data)
return resp
def test_stream_response_multiple_chunks(loop, test_client):
@asyncio.coroutine
def handler(request):
resp = web.StreamResponse()
resp.enable_chunked_encoding()
yield from resp.prepare(request)
resp.write(b'x')
resp.write(b'y')
resp.write(b'z')
return resp
app = web.Application()
app.router.add_get('/', handler)
client = yield from test_client(app)
resp = yield from client.get('/')
assert 200 == resp.status
data = yield from resp.read()
assert b'xyz' == data
def file_handler(request: web.Request):
path = '/file' + request.match_info.get('path')
data = await parse(request, global_handlers.keys())
data['path'] = path
handler = global_handlers[request.method]
try:
result = await handler(*(data, request, None)[:len(signature(handler).parameters)])
except InvalidRequest as err:
return web.Response(text=json.dumps({
'status': 1,
'data': str(err)
}, ensure_ascii=False),status=err.status_code, content_type='application/json')
if isinstance(result, web.StreamResponse):
return result
return web.Response(text=json.dumps({
'status': 0,
**({'data': result} if result is not None else {})
}, content_type='application/json')
def ajax_handler(request: web.Request):
action = request.match_info.get('action')
data = await parse(request, global_handlers.keys())
if action not in global_handlers[request.method]:
raise web.HTTPBadRequest()
handler = global_handlers[request.method][action]
try:
result = await handler(*(data, status=err.status_code, content_type='application/json')
def response_factory(app, handler):
async def response(request):
logging.info('Response handler...')
r = await handler(request)
if isinstance(r, web.StreamResponse):
return r
if isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
# ???jinja2????????????
r['__user__'] = request.__user__
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, int) and 100 <= r < 600:
return web.Response(status=r)
if isinstance(r, tuple) and len(r) == 2:
status, message = r
if isinstance(status, int) and 100 <= status < 600:
return web.Response(status=status, text=str(message))
# default
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
def response_factory(app,handler):
async def response(request):
logging.info('Response handler...')
r = await handler(request)
if isinstance(r,web.StreamResponse):
return r
if isinstance(r,bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
if isinstance(r,str):
if r.startswith('redirect'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r,dict):
template = r.get('__template__')
if template is None:
resp = web.Response(body=json.dumps(r,ensure_ascii=False,default=lambda o:o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r,int) and r>=100 and r<600:
return web.Response(r)
if isinstance(r,tuple) and len(r) == 2:
t,m=r
if isinstance(t,int) and t>=100 and t<600:
return web.Response(t,str(m))
# default
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
#?????
def make_streaming_response(req, code, content, headers=None):
if not headers:
headers = []
response = web.StreamResponse(status=code, headers=headers)
await response.prepare(req)
for chunk in content:
response.write(chunk)
await response.drain()
return response
def response_factory(app, handler):
@asyncio.coroutine
def response(request):
logging.info('Response handler...')
r = yield from handler(request)
if isinstance(r, bytes):
res = web.Response(body = r)
res.content_type = 'application/octet-stream'
return res
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
res = web.Response(body = r.encode('utf-8'))
res.content_type = 'text/html; charset=utf-8'
return res
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
res = web.Response(body = json.dumps(r, ensure_ascii = False, default = lambda o: o.__dict__).encode('utf-8'))
res.content_type = 'application/json;charset=utf-8'
return res
else:
r['__user__'] = request.__user__
res = web.Response(body = app['__templating__'].get_template(template).render(**r).encode('utf-8'))
res.content_type = 'text/html;charset=utf-8'
return res
if isinstance(r, int) and r >= 100 and r < 600:
return web.Response(r)
if isinstance(r, tuple) and len(r) == 2:
t, m = r
if isinstance(t, int) and t >= 100 and t < 600:
return web.Response(t, str(m))
#default:
res = web.Request(body = str(r).encode('utf-8'))
res.content_type = 'text/plain;charset=utf-8'
return res
return response
def download_file(self, request):
file_id = request.match_info['file_id']
record = await db.tracks.find_one({ "file_id": file_id })
if not record:
return web.HTTPNotFound()
file = await self.bot.get_file(file_id)
file_path = file["file_path"]
range = request.headers.get("range")
copy_headers = ["content-length", "content-range", "etag", "last-modified"]
async with self.bot.download_file(file_path, range) as r:
# Prepare headers
resp = web.StreamResponse(status=r.status)
resp.content_type = record["mime_type"]
for h in copy_headers:
val = r.headers.get(h)
if val:
resp.headers[h] = val
await resp.prepare(request)
# Send content
while True:
chunk = await r.content.read(chunk_size)
if not chunk:
break
resp.write(chunk)
return resp
def binary(self, data, content_type='application/octet-stream', file_name=None):
self.response = web.StreamResponse()
self.response.content_length = len(data)
self.response.content_type = content_type
if file_name:
for char in '/<>:\"\'\\|?* ':
file_name = file_name.replace(char, '')
self.response.headers.add('Content-disposition',
'attachment; filename="{}"'.format(file_name))
await self.response.prepare(self.request)
self.response.write(data)
def tick(request):
if 'text/event-stream' not in request.headers.getall('ACCEPT', []):
raise HTTPNotAcceptable(reason="'text/event-stream' not found in Accept headers.")
resp = StreamResponse(
status=200,
reason='OK',
headers={
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
log.debug('opening new event stream on request: %s', request)
await resp.prepare(request)
request.app.connections.add(resp)
resp.should_stop = False
try:
while not resp.should_stop:
ts = time.monotonic()
payload = json.dumps({'data': ts})
resp.write(build_message(payload, id=ts, event='tick'))
await resp.drain()
await asyncio.sleep(1)
finally:
request.app.connections.remove(resp)
return resp
def restricted_api(handler):
@user_to_request
@wraps(handler)
async def decorator(*args):
request = _get_request(args)
if not request[cfg.REQUEST_USER_KEY]:
return json_response({'error': 'Access denied'}, status=403)
response = await handler(*args)
if not isinstance(response, StreamResponse):
response = json_response(response, dumps=json.dumps)
return response
return decorator
def response_factory(app, web.StreamResponse):
return r
elif isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
elif isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
elif isinstance(r, dict):
template = r.get('__template__')
if template is None:
resp = web.Response(
body=json.dumps(r, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
# ???jinja2????????????
r['__user__'] = request.__user__
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
elif isinstance(r, int) and 100 <= r < 600:
return web.Response(status=r)
elif isinstance(r, text=str(message))
else:
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
def get_file(request: web.Request):
filename = request.match_info.get('name').strip()
filepath = os.path.join(config.args.storage, filename)
_, ext = os.path.splitext(filepath)
etag = hashlib.sha1(filename.encode('utf-8')).hexdigest()
if not os.path.exists(filepath):
raise web.HTTPNotFound()
if 'if-none-match' in request.headers:
raise web.HTTPNotModified(headers={
'ETag': etag
})
stat = os.stat(filepath)
if request.method == 'HEAD':
resp = web.Response()
else:
resp = web.StreamResponse()
resp.headers['Content-Type'] = mimetypes.types_map.get(ext, 'application/octet-stream')
resp.headers['ETag'] = etag
resp.headers['Cache-Control'] = 'max-age=31536000'
resp.headers['X-Content-SHA1'] = get_hash_from_name(filename)
resp.content_length = stat.st_size
resp.last_modified = stat.st_mtime
if request.method == 'HEAD':
return resp
yield from resp.prepare(request)
with open(filepath, 'rb') as f:
for chunk in chunks(f):
resp.write(chunk)
yield from resp.drain()
yield from resp.write_eof()
resp.force_close()
return resp
def response_factory(app, handler):
async def response_middleware(request):
r = await handler(request)
logging.info('Response handling...')
if isinstance(r, str):
if r.startswith('redirect'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
resp = web.Response(body=app['__template_env__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, str(m))
# default:
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response_middleware
def hello(request):
resp = web.StreamResponse()
name = request.match_info.get('name', 'Anonymous')
answer = ('Hello,' + name).encode('utf8')
resp.content_length = len(answer)
resp.content_type = 'text/plain'
await resp.prepare(request)
await asyncio.sleep(100, loop=loop)
resp.write(answer)
await resp.write_eof()
return resp
def response_factory(app, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octect-stream'
return resp
if isinstance(r, default=lambda o:o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
r['__user__'] = request.__user__
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, int) and t >= 100 and t < 600:
return web.ReferenceError(t, str(m))
# default
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
def binary(self, type='application/octet-stream', *, filename: str=None):
self.response = web.StreamResponse()
self.response.content_length = len(data)
self.response.content_type = type
if filename:
self.response.headers['Content-disposition'] = 'attachment; filename="{0}"'.format(filename)
await self.response.prepare(self.request)
self.response.write(data)
def response_factory(self, app, handler):
'''
???
'''
async def response(request):
logging.info('Response handler...')
r = await handler(request)
if isinstance(r, web.StreamResponse):
return r
if isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
resp = web.Response(body=json.dumps(
r, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
resp = web.Response(body=app['__templating__'].get_template(
template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
def view_image(self, request):
resp = web.StreamResponse()
resp.content_type = "image/png"
await resp.prepare(request)
resp.write(base64.b64decode(self.IMAGE))
await resp.drain()
return resp
def test_response_prepared_with_clone(loop, test_client):
@asyncio.coroutine
def handler(request):
cloned = request.clone()
resp = web.StreamResponse()
yield from resp.prepare(cloned)
return resp
app = web.Application()
app.router.add_get('/', handler)
client = yield from test_client(app)
resp = yield from client.get('/')
assert 200 == resp.status
def handle_async_mjpeg_stream(self, request):
"""Return a MJPEG stream image response directly from the camera."""
streaming_url = SYNO_API_URL.format(
self._synology_url, WEBAPI_PATH, self._streaming_path)
streaming_payload = {
'api': STREAMING_API,
'method': 'Stream',
'version': '1',
'cameraId': self._camera_id,
'format': 'mjpeg'
}
stream = None
response = None
try:
with async_timeout.timeout(TIMEOUT, loop=self.hass.loop):
stream = yield from self._websession.get(
streaming_url,
params=streaming_payload
)
response = web.StreamResponse()
response.content_type = stream.headers.get(CONTENT_TYPE_HEADER)
yield from response.prepare(request)
while True:
data = yield from stream.content.read(102400)
if not data:
break
response.write(data)
except (asyncio.TimeoutError, aiohttp.errors.ClientError):
_LOGGER.exception("Error on %s", streaming_url)
raise HTTPGatewayTimeout()
finally:
if stream is not None:
self.hass.async_add_job(stream.release())
if response is not None:
yield from response.write_eof()
def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera."""
# aiohttp don't support DigestAuth -> Fallback
if self._authentication == HTTP_DIGEST_AUTHENTICATION:
yield from super().handle_async_mjpeg_stream(request)
return
# connect to stream
websession = async_get_clientsession(self.hass)
stream = None
response = None
try:
with async_timeout.timeout(10, loop=self.hass.loop):
stream = yield from websession.get(self._mjpeg_url,
auth=self._auth)
response = web.StreamResponse()
response.content_type = stream.headers.get(CONTENT_TYPE_HEADER)
yield from response.prepare(request)
while True:
data = yield from stream.content.read(102400)
if not data:
break
response.write(data)
except asyncio.TimeoutError:
raise HTTPGatewayTimeout()
finally:
if stream is not None:
self.hass.async_add_job(stream.release())
if response is not None:
yield from response.write_eof()
def response_factory(app, handler):
async def response(request):
logging.info('Response handler...')
r = await handler(request)
# ???????StreamResponse?????
# #treamResponse?aiohttp??response???,?????????????
# StreamResponse??????????
if isinstance(r, web.StreamResponse):
return r
# ???????????????????body?????????????
if isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
# ??????????
if isinstance(r, str):
# ??????????????????????????
if r.startswith('redirect:'):
return web.HTTPFound(r[9:]) # ??r??????"redirect:"??
# ???utf8?????????????html?
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
# ???????????????jinja2????????jinja2.env
if isinstance(r, dict):
template = r.get('__template__')
# ????????????????json?????????????json
if template is None:
resp = web.Response(body=json.dumps(r, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
r["__user__"] = request.__user__ # ??__user__,?????????????????
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
# ?????????????100?600??
# ???r??????404?500?
if isinstance(r, int) and r >= 100 and r < 600:
return web.Response(r)
# ??????????2???
# ????????????100?600??
# ?t?http????m????????????????
if isinstance(r, str(m))
# ????????????????????????
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
#??6?if?????????????????????????????????????????????????????????????????
#????if?????????????????????????????
return response
# ??????????????????????????????
def response_factory(app, handler):
"""
????????,???????????web.Response?????,?????aiohttp???
:param app: WEB????
:param handler: ??????
:return: ???????
"""
async def response(request):
r = await handler(request)
# ??????????web.Response??,?????
if isinstance(r, web.StreamResponse):
return r
# ????????????,???????
if isinstance(r, dict):
template_file_name = r.get('__template__')
# ??????????__template__??????????????????json??
# ?????????__template__?????HTML??
if template_file_name is None:
json_data = json.dumps(r, default=lambda o: o.__dict__)
resp = web.Response(body=json_data.encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
types = await BlogType.find_all(order_by='level asc')
r['blog_types'] = types
# ??????????
r['__user__'] = request.__user__
# ??????
r['domain_name'] = configs.domain_name
r['website_name'] = configs.website_name
r['ICP_NO'] = configs.ICP_NO
r['github'] = configs.github
templating_env = app['__templating__']
template = templating_env.get_template(template_file_name)
resp = web.Response(body=template.render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
return response
def response_factory(app, handler):
@asyncio.coroutine
def response(request):
logging.info('Response handler....')
r = yield from handler(request)
if isinstance(r, bytes):
response = web.Response(body=r)
response.content_type = 'application/octet-stream'
return response
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
response = web.Response(body=r.encode('utf-8'))
response.content_type = 'text/html;charset=utf-8'
return response
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
response = web.Response(
body=json.dumps(
r, default=lambda d: d.__dict__
).encode('utf-8'))
response.content_type = 'application/json;charset=utf-8'
return response
else:
response = web.Response(
body=app['__templating__']
.get_template(template)
.render(**r)
.encode('utf-8')
)
response.content_type = 'text/html;charset=utf-8'
return response
if isinstance(r, str(m))
response = web.Response(body=str(r).encode('utf-8'))
response.content_type = 'text/plain;charset=utf-8'
return response
return response
def response_factory(app, handler):
'''
middleware(???)??url????????????????web.Response
'''
async def response(request):
logging.info('Response handler...')
r = await handler(request)
if isinstance(r, bytes):
resp = web.Response(body = r)
resp.content_type = 'application/octet-stream'
return resp
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body = r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
# __dict__: ????????????
# default????????
resp = web.Response(body = json.dumps(r, default = lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
r['__user__'] = request.__user__
resp = web.Response(body = app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, int) and r >= 100 and r < 600:
# status = r
return web.Response(r)
if isinstance(r, str(m))
# default:
resp = web.Response(body = str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
def response_factory(app, str(m))
# ????????????????????????
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
#??6?if?????????????????????????????????????????????????????????????????
#????if?????????????????????????????
return response
# ??????????????????????????????
def request_handler_factory(view, handler):
"""Factory to wrap our handler classes."""
assert asyncio.iscoroutinefunction(handler) or is_callback(handler), \
"Handler should be a coroutine or a callback."
@asyncio.coroutine
def handle(request):
"""Handle incoming request."""
if not request.app['hass'].is_running:
return web.Response(status=503)
remote_addr = get_real_ip(request)
authenticated = request.get(KEY_AUTHENTICATED, False)
if view.requires_auth and not authenticated:
yield from process_wrong_login(request)
_LOGGER.warning('Login attempt or request with an invalid '
'password from %s', remote_addr)
persistent_notification.async_create(
request.app['hass'],
'Invalid password used from {}'.format(remote_addr),
'Login attempt Failed', NOTIFICATION_ID_LOGIN)
raise HTTPUnauthorized()
_LOGGER.info('Serving %s to %s (auth: %s)',
request.path, remote_addr, authenticated)
result = handler(request, **request.match_info)
if asyncio.iscoroutine(result):
result = yield from result
if isinstance(result, web.StreamResponse):
# The method handler returned a ready-made Response,how nice of it
return result
status_code = 200
if isinstance(result, tuple):
result, status_code = result
if isinstance(result, str):
result = result.encode('utf-8')
elif result is None:
result = b''
elif not isinstance(result, bytes):
assert False, ('Result should be None,string,bytes or Response. '
'Got: {}').format(result)
return web.Response(body=result, status=status_code)
return handle
def get(self, request):
"""Provide a streaming interface for the event bus."""
# pylint: disable=no-self-use
hass = request.app['hass']
stop_obj = object()
to_write = asyncio.Queue(loop=hass.loop)
restrict = request.GET.get('restrict')
if restrict:
restrict = restrict.split(',') + [EVENT_HOMEASSISTANT_STOP]
@asyncio.coroutine
def forward_events(event):
"""Forward events to the open request."""
if event.event_type == EVENT_TIME_CHANGED:
return
if restrict and event.event_type not in restrict:
return
_LOGGER.debug('STREAM %s FORWARDING %s', id(stop_obj), event)
if event.event_type == EVENT_HOMEASSISTANT_STOP:
data = stop_obj
else:
data = json.dumps(event, cls=rem.JSONEncoder)
yield from to_write.put(data)
response = web.StreamResponse()
response.content_type = 'text/event-stream'
yield from response.prepare(request)
unsub_stream = hass.bus.async_listen(MATCH_ALL, forward_events)
try:
_LOGGER.debug('STREAM %s ATTACHED', id(stop_obj))
# Fire off one message so browsers fire open event right away
yield from to_write.put(STREAM_PING_PAYLOAD)
while True:
try:
with async_timeout.timeout(STREAM_PING_INTERVAL,
loop=hass.loop):
payload = yield from to_write.get()
if payload is stop_obj:
break
msg = "data: {}\n\n".format(payload)
_LOGGER.debug('STREAM %s WRITING %s',
msg.strip())
response.write(msg.encode("UTF-8"))
yield from response.drain()
except asyncio.TimeoutError:
yield from to_write.put(STREAM_PING_PAYLOAD)
finally:
_LOGGER.debug('STREAM %s RESPONSE CLOSED', id(stop_obj))
unsub_stream()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。