Python requests_toolbelt 模块,MultipartEncoder() 实例源码
我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 requests_toolbelt.MultipartEncoder()。
def send_image_local(recipient_id, image_path):
    """Send a local image file to a Messenger recipient as a multipart upload.

    The request goes to the Graph API ``v2.6/me/messages`` endpoint and is
    authenticated with the ``PAGE_ACCESS_TOKEN`` environment variable.

    Args:
        recipient_id: Id placed in the ``recipient`` part.
        image_path: Path of the image; it is always sent as ``image/png``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        KeyError: If ``PAGE_ACCESS_TOKEN`` is not set.
    """
    params = {
        'access_token': os.environ["PAGE_ACCESS_TOKEN"]
    }
    # Keep the file open only while the request streams it, then close it.
    with open(image_path, 'rb') as image_file:
        # NOTE(review): str() of a dict is a Python repr, not JSON -- confirm
        # the endpoint accepts it, otherwise switch to json.dumps.
        data = MultipartEncoder({
            'recipient': str({
                'id': recipient_id
            }),
            'message': str({
                'attachment': {
                    'type': 'image',
                    'payload': {
                    }
                }
            }),
            'filedata': (os.path.basename(image_path), image_file,
                         'image/png')
        })
        headers = {
            'Content-Type': data.content_type
        }
        r = requests.post("https://graph.facebook.com/v2.6/me/messages",
                          params=params, headers=headers, data=data).json()
    # Hand the decoded response back instead of silently discarding it.
    return r
def uploadPhoto(self, photo, caption = None, upload_id = None):
    """Upload a photo and, on HTTP 200, configure and expose it.

    Args:
        photo: Path of the image file to upload.
        caption: Caption forwarded to ``self.configure``.
        upload_id: Upload identifier; defaults to the current time in
            milliseconds rendered as a string.

    Returns:
        Always ``False``.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))
    # The file part is (filename, file object, content type, extra headers);
    # the file must be opened in binary mode and is closed once the body
    # has been rendered to bytes.
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id'         : upload_id,
            '_uuid'             : self.uuid,
            '_csrftoken'        : self.token,
            'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file,
                                   'application/octet-stream',
                                   {'Content-transfer-encoding':'binary'})
        }
        m = MultipartEncoder(data, boundary=self.uuid)
        body = m.to_string()
    self.s.headers.update ({'X-IG-Capabilities' : '3Q4=',
                            'X-IG-Connection-Type' : 'WIFI',
                            'Cookie2' : '$Version=1',
                            'Accept-Language' : 'en-US',
                            'Accept-Encoding' : 'gzip,deflate',
                            'Content-type': m.content_type,
                            'Connection' : 'close',
                            'User-Agent' : self.USER_AGENT})
    response = self.s.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, caption):
            self.expose()
    return False
def uploadPhoto(self, caption):
    """Call ``self.expose()`` and report ``False``.

    NOTE(review): ``caption`` is accepted but never read here; this looks
    like the tail of a longer method -- confirm against the upstream source.
    """
    result = False
    self.expose()
    return result
# NOTE(review): this fragment appears to have been truncated during text
# extraction and is not valid Python as shown: the ``def`` line has no
# trailing colon, ``self.uuid`` is used as a default value where ``self`` is
# not in scope, and the ``post(...)`` call ends in ``caption):``. Recover the
# complete method from its upstream project before relying on it.
def uploadPhoto(self, boundary=self.uuid)
self.session.headers.update ({'X-IG-Capabilities' : '3Q4=',
'User-Agent' : self.USER_AGENT})
response = self.session.post(self.API_URL + "upload/photo/", caption):
self.expose()
return False
def sendSparkPOST(url, data):
    """POST ``data`` to ``url`` as multipart/form-data with a bearer token.

    Args:
        url: Target URL.
        data: Fields handed to ``MultipartEncoder``.

    Returns:
        The response object returned by ``requests.post``.
    """
    encoder = MultipartEncoder(fields=data)
    # ``bearer`` is read from the enclosing module scope.
    auth_headers = {
        "Content-Type": encoder.content_type,
        "Authorization": "Bearer "+bearer,
    }
    return requests.post(url, data=encoder, headers=auth_headers)
def test_basic_multiple(self):
    """Three plain fields are each delivered to their own registered target."""
    expected = (('first', b'foo'), ('second', b'bar'), ('third', b'baz'))
    targets = {name: ValueTarget() for name, _ in expected}

    encoder = MultipartEncoder(fields={
        'first': 'foo',
        'second': 'bar',
        'third': 'baz'
    })
    content_headers = {'Content-Type': encoder.content_type}
    parser = StreamingFormDataParser(headers=content_headers)
    for name, _ in expected:
        parser.register(name, targets[name])

    parser.data_received(encoder.to_string())

    for name, value in expected:
        self.assertEqual(targets[name].value, value)
def test_chunked_single(self):
    """A single field is parsed correctly when the body arrives in two chunks."""
    expected_value = 'hello world'

    encoder = MultipartEncoder(fields={'value': expected_value})
    body = encoder.to_string()

    target = ValueTarget()
    content_headers = {'Content-Type': encoder.content_type}
    parser = StreamingFormDataParser(headers=content_headers)
    parser.register('value', target)

    # Split the body in the middle of the field value.
    split_at = body.index(b'world')
    for chunk in (body[:split_at], body[split_at:]):
        parser.data_received(chunk)

    self.assertEqual(target.value, expected_value.encode('utf-8'))
def main():
    """Encode a sample form (two text fields plus a file) and print the body.

    The file name and the ``decode`` switch come from ``parse_args()``; with
    ``decode`` set, the body is printed as UTF-8 text instead of raw bytes.
    """
    args = parse_args()
    content_type = mimetypes.guess_type(args.filename)[0]

    with open(args.filename, 'rb') as file_:
        encoder = MultipartEncoder(fields={
            'name': 'hello world',
            'lines': 'first line\r\n\r\nsecond line',
            'file': (args.filename, file_, content_type)
        })
        # Render while the file is still open.
        body = encoder.to_string()

    output = body.decode('utf-8') if args.decode else body
    print(output)
def http_send_attachmail(to, cc, sub, content, filelist=None, mail_format="html", mail_from=mail_from):
    """Send a mail with attachments through the HTTP mail API.

    Args:
        to: Recipient string, sent as the ``tos`` field.
        cc: Carbon-copy string.
        sub: Subject.
        content: Mail body.
        filelist: Paths of files to attach; ``None`` means no attachments.
        mail_format: Value of the ``format`` field.
        mail_from: Sender address.

    Returns:
        The decoded JSON reply of the mail API.
    """
    filelist = [] if filelist is None else filelist
    fields = {"tos":to, "cc":cc, "subject":sub, "content":content, "from":mail_from, "format":mail_format, "attachNum":str(len(filelist))}
    handles = []
    try:
        # Attachments are sent as attach1, attach2, ... in list order.
        for i, attach in enumerate(filelist, 1):
            handle = open(attach, "rb")
            handles.append(handle)
            fields['attach' + str(i)] = (attach, handle)
        m = MultipartEncoder(fields)
        headers = {"content-type":m.content_type}
        # The encoder must be passed as the body; otherwise only headers go out.
        r = requests.post(mail_api, data=m, headers=headers)
    finally:
        # Close every attachment, including when the request fails.
        for handle in handles:
            handle.close()
    ret = r.json()
    status = str(ret['status']) + "-" + ret['msg']
    sendlog(status, to + "|cc:" +cc, sub)
    return ret
def upload_data(gpu_ip, job_hash, data_path):
    """Upload a job's data file to the GPU host while showing a progress bar.

    After a completed request the local file is removed and the response is
    passed to ``status_check``.

    Args:
        gpu_ip: Host of the GPU server (port comes from ``settings.GPU_PORT``).
        job_hash: Value sent as the ``hash`` field.
        data_path: Path of the file to upload; removed after the upload.
    """
    url = 'http://%s:%s/runJobDecorator' % (gpu_ip, settings.GPU_PORT)
    file_size = path.getsize(data_path)
    pbar = tqdm(total=file_size, unit='B', unit_scale=True)

    def callback(monitor):
        # tqdm wants increments, the monitor reports a running total.
        progress = monitor.bytes_read - callback.last_bytes_read
        pbar.update(progress)
        callback.last_bytes_read = monitor.bytes_read
    callback.last_bytes_read = 0

    try:
        with open(data_path, 'rb') as f:
            data = {
                'file': ('uploads.pkl', f, 'application/octet-stream'),
                'hash': job_hash
            }
            encoder = MultipartEncoder(
                fields=data
            )
            monitor = MultipartEncoderMonitor(encoder, callback)
            r = requests.post(url, data=monitor, headers={
                'Content-Type': monitor.content_type})
    finally:
        # Close the bar even when the upload fails or the user interrupts it.
        pbar.close()
    remove(data_path)
    status_check(r)
def do_artifacts_artifact_add(opts):
    """Upload an artifact file together with its name and description.

    On HTTP 201 the created URL and artifact id are printed; any other status
    is handed to ``errorprinter``.

    Args:
        opts: Parsed options providing ``service``, ``name``, ``description``
            and ``infile``.
    """
    logging.debug('add artifact %r', opts)
    url = artifacts_url(opts.service)
    image = {
        'name': opts.name,
        'description': opts.description,
    }
    # Build contents of multipart/form-data; image meta must come first, hence
    # we use an OrderedDict to preserve the order.
    files = OrderedDict()
    for k, v in image.items():
        files[k] = (None, io.StringIO(v))
    # Followed by firmware data, preceded by its size.
    files['size'] = str(os.stat(opts.infile).st_size)
    # The file part is (filename, file object, content type, extra headers);
    # the handle stays open only for the duration of the request.
    with open(opts.infile, 'rb') as infile:
        files['artifact'] = (opts.infile, infile, "application/octet-stream", {})
        encoder = MultipartEncoder(files)
        if sys.stderr.isatty():
            # Progress display is optional: skip it when the helpers are missing.
            try:
                from requests_toolbelt import MultipartEncoderMonitor
                from clint.textui.progress import Bar as ProgressBar
                pb = ProgressBar(expected_size=encoder.len, filled_char='=', every=1024*1024)
                monitor = MultipartEncoderMonitor(encoder,
                                                  lambda mon: pb.show(mon.bytes_read))
                encoder = monitor
            except ImportError:
                pass
        with api_from_opts(opts) as api:
            rsp = api.post(url, data=encoder,
                           headers={'Content-Type': encoder.content_type})
    if rsp.status_code == 201:
        # created
        location = rsp.headers.get('Location', '')
        print("created with URL: {}".format(location))
        print('artifact ID: ', location.rsplit('/')[-1])
    else:
        errorprinter(rsp)
def send_attachment(self, recipient_id, attachment_type, attachment_path,
                    notification_type=NotificationType.regular):
    """Send an attachment to the specified recipient using local path.
    Input:
        recipient_id: recipient id to send to
        attachment_type: type of attachment (image, video, audio, file)
        attachment_path: Path of attachment
    Output:
        Response from API as <dict>
    """
    import json  # local import keeps this block self-contained

    # The handle is closed as soon as the request has been sent.
    with open(attachment_path, 'rb') as attachment_file:
        payload = {
            # ``{ {...} }`` would build a set holding a dict (TypeError:
            # unhashable type); the parts are sent as JSON strings instead.
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            'notification_type': notification_type,
            'message': json.dumps(
                {
                    'attachment': {
                        'type': attachment_type,
                        'payload': {}
                    }
                }
            ),
            'filedata': (os.path.basename(attachment_path), attachment_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.graph_url, data=multipart_data,
                             params=self.auth_args, headers=multipart_header).json()
def post_file(session, domain='', file='', login=LOGIN):
    """Post a file to the cloud's upload server.

    Args:
        session: Session object used to issue the POST.
        domain: Base URL of the upload server.
        file: File name with path.
        login: Account login, URL-quoted into the ``x-email`` parameter.

    Returns:
        ``(hash, size)`` parsed from the reply on success, otherwise
        ``(None, None)``.
    """
    assert domain is not None, 'no domain'
    assert file is not None, 'no file'
    filetype = guess_type(file)[0]
    if not filetype:
        filetype = DEFAULT_FILETYPE
        if LOGGER:
            LOGGER.warning('File {} type is unknown,using default: {}'.format(file, DEFAULT_FILETYPE))
    filename = os.path.basename(file)
    quoted_login = quote_plus(login)
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
    url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) + '&x-email=' + quoted_login + '&fileapi' + timestamp)
    # The file part is (filename, file object, content type); the handle is
    # closed once the request has finished.
    with open(file, 'rb') as upload:
        m = MultipartEncoder(fields={'file': (quote_plus(filename), upload, filetype)})
        try:
            # The encoder is the request body; without it nothing is uploaded.
            r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify = VERIFY_SSL)
        except Exception as e:
            if LOGGER:
                LOGGER.error('Post file HTTP request error: {}'.format(e))
            return (None, None)
    if r.status_code == requests.codes.ok:
        if r.content:
            # The reply is sliced as: 40 characters of hash, one separator,
            # the size, then two trailing bytes.
            file_hash = r.content[:40].decode()
            size = int(r.content[41:-2])
            return (file_hash, size)
        elif LOGGER:
            LOGGER.error('File {} post error,no hash and size received'.format(file))
    elif LOGGER:
        LOGGER.error('File {} post error,http code: {},msg: {}'.format(file, r.status_code, r.text))
    return (None, None)
def __repr__(self):
    """Return a debug representation that embeds ``repr(self.fields)``."""
    template = '<MultipartEncoder: {0!r}>'
    return template.format(self.fields)
def __init__(self, encoder, callback=None):
    """Wrap ``encoder`` and prepare read-progress bookkeeping.

    Args:
        encoder: The :class:`MultipartEncoder` instance being monitored.
        callback: Optional function to call after a read; ``IDENTITY`` is
            used when none (or a falsy value) is given.
    """
    #: Number of bytes already read from the monitored encoder
    self.bytes_read = 0
    #: Instance of the :class:`MultipartEncoder` being monitored
    self.encoder = encoder
    #: Function to call after a read
    self.callback = callback if callback else IDENTITY
    #: Cached total length (avoids the same problem as in bug #80)
    self.len = encoder.len
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                callback=None):
    """Build a ``MultipartEncoder`` from ``fields`` and wrap it in ``cls``.

    NOTE(review): the first parameter is ``cls``, so this is presumably a
    ``@classmethod`` whose decorator is not visible here -- confirm.

    Args:
        fields: Fields handed to ``MultipartEncoder``.
        boundary: Boundary handed to ``MultipartEncoder``.
        encoding: Encoding handed to ``MultipartEncoder``.
        callback: Passed through to ``cls`` as its second argument.

    Returns:
        The object produced by ``cls(encoder, callback)``.
    """
    return cls(MultipartEncoder(fields, boundary, encoding), callback)
def __repr__(self):
    """Return a debug representation that embeds ``repr(self.fields)``."""
    template = '<MultipartEncoder: {0!r}>'
    return template.format(self.fields)
def __init__(self, encoder, callback=None):
    """Wrap ``encoder`` and prepare read-progress bookkeeping.

    The body reads ``encoder``, so it has to be a parameter; without it every
    call fails with ``NameError``.

    Args:
        encoder: The :class:`MultipartEncoder` instance being monitored.
        callback: Optional function to call after a read; ``IDENTITY`` is
            used when none (or a falsy value) is given.
    """
    #: Instance of the :class:`MultipartEncoder` being monitored
    self.encoder = encoder
    #: Optionally function to call after a read
    self.callback = callback or IDENTITY
    #: Number of bytes already read from the :class:`MultipartEncoder`
    #: instance
    self.bytes_read = 0
    #: Avoid the same problem in bug #80
    self.len = self.encoder.len
def send_image(self, recipient_id, image_path):
    '''Send an image to the specified recipient.
    Image must be PNG or JPEG or GIF (more might be supported).
    https://developers.facebook.com/docs/messenger-platform/send-api-reference/image-attachment
    Input:
        recipient_id: recipient id to send to
        image_path: path to image to be sent
    Output:
        Response from API as <dict>
    '''
    # NOTE(review): ``recipient_id`` is restored as a parameter because the
    # body and the docstring both use it -- confirm the argument order
    # against the upstream project.
    with open(image_path, 'rb') as image_file:
        payload = {
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            'message': json.dumps(
                {
                    'attachment': {
                        'type': 'image',
                        'payload': {}
                    }
                }
            ),
            # File part is (filename, file object).
            'filedata': (image_path, image_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        # The encoder is the request body; without it nothing is uploaded.
        return requests.post(self.base_url, data=multipart_data,
                             headers=multipart_header).json()
# NOTE(review): this method is truncated -- the payload literal is never
# closed and its last line fuses the ``filedata`` entry with the tail of a
# ``requests.post(...)`` call. ``recipient_id`` is also used without being a
# parameter. Not valid Python as shown; restore it from the upstream project.
def send_audio(self, audio_path):
'''Send audio to the specified recipient.
Audio must be MP3 or WAV
https://developers.facebook.com/docs/messenger-platform/send-api-reference/audio-attachment
Input:
recipient_id: recipient id to send to
audio_path: path to audio to be sent
Output:
Response from API as <dict>
'''
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'audio',
'filedata': (audio_path, headers=multipart_header).json()
# NOTE(review): this method is truncated -- the payload literal is never
# closed and its last line fuses the ``filedata`` entry with the tail of a
# ``requests.post(...)`` call. ``recipient_id`` is also used without being a
# parameter. Not valid Python as shown; restore it from the upstream project.
def send_file(self, file_path):
'''Send file to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/file-attachment
Input:
recipient_id: recipient id to send to
file_path: path to file to be sent
Output:
Response from API as <dict>
'''
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'file',
'filedata': (file_path, headers=multipart_header).json()
def create_upload(self, filepath):
    """Return a ``MultipartEncoder`` carrying ``filepath`` as its ``file`` part.

    The file is opened in binary mode and sent with ``self.file_type`` as its
    content type. The handle is intentionally left open: the encoder reads
    from it later, when the request body is produced.

    Args:
        filepath: Path of the file to upload.
    """
    return MultipartEncoder({
        # File part is (filename, file object, content type).
        'file': (os.path.basename(filepath), open(filepath, 'rb'), self.file_type)
    })
def _upload(title, author, text,
            author_url='', tph_uuid=None, page_id=None, user_agent=default_user_agent, convert_html=True,
            clean_html=True):
    """Post a page to ``save_url`` and return the decoded reply.

    Args:
        title: Page title (required).
        author: Author name.
        text: Page body (required).
        author_url: Author URL.
        tph_uuid: Session cookie value; only sent together with ``page_id``.
        page_id: Id of the page to edit; ``'0'`` is sent when falsy.
        user_agent: Value of the ``User-Agent`` header.
        convert_html: Convert ``text`` with
            ``convert_html_to_telegraph_format`` before sending.
        clean_html: Passed through to the converter.

    Returns:
        The reply dict, extended with ``tph_uuid`` and ``url``.

    Raises:
        TitlerequiredError: If ``title`` is empty.
        TextrequiredError: If ``text`` is empty.
        TelegraphError: If the reply carries no ``path``.
    """
    if not title:
        raise TitlerequiredError('Title is required')
    if not text:
        raise TextrequiredError('Text is required')
    content = convert_html_to_telegraph_format(text, clean_html) if convert_html else text
    cookies = dict(tph_uuid=tph_uuid) if tph_uuid and page_id else None
    fields = {
        # File part is (filename, data, content type); the converted content
        # is the data that has to be uploaded.
        'Data': ('content.html', content, 'plain/text'),
        'title': title,
        'author': author,
        'author_url': author_url,
        'page_id': page_id or '0'
    }
    m = MultipartEncoder(fields, boundary='TelegraPhBoundary21')
    headers = {
        'Content-Type': m.content_type,
        'Accept': 'application/json,text/javascript,*/*; q=0.01',
        'User-Agent': user_agent
    }
    with requests.Session() as r:
        r.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))
        # The multipart Content-Type (with its boundary) must accompany the body.
        response = r.post(save_url, timeout=4, headers=headers, cookies=cookies, data=m.to_string())
    result = json.loads(response.text)
    if 'path' in result:
        result['tph_uuid'] = response.cookies.get('tph_uuid') or tph_uuid
        result['url'] = base_url + '/' + result['path']
        return result
    raise TelegraphError(result.get('error', ''))
def handle(self, *args, **options):
    """Upload the database file at ``DB_PATH`` to the central server.

    Args:
        *args: Unused.
        **options: Must contain ``project``, sent as the ``project`` field.
    """
    self.stdout.write("Uploading database to central server...\n")
    # Keep the database file open only for the duration of the upload.
    with open(DB_PATH, "rb") as db_file:
        encoder = MultipartEncoder({
            "project": options['project'],
            "file": ("db.sqlite3", db_file, "application/octet-stream")
        })
        monitor = MultipartEncoderMonitor(encoder, create_callback(encoder))
        # The monitor is the request body; without it nothing is uploaded.
        r = requests.post(CENTRAL_SERVER_DB_UPLOAD_URL, data=monitor,
                          headers={"Content-Type": monitor.content_type})
    print("\nUpload finished! (Returned status {0} {1})".format(r.status_code, r.reason))
def api_upload(service, encData, encMeta, keys):
    '''
    Uploads data to Send.
    Caution! Data is uploaded as given, this function will not encrypt it for you

    Args:
        service: Base URL of the service; ``api/upload`` is appended.
        encData: Already-encrypted payload sent as the ``file`` part.
        encMeta: Already-encrypted metadata, sent in ``X-File-Metadata``.
        keys: Object providing ``authKey`` and ``secretKey``.

    Returns:
        Tuple ``(secretUrl, fileId, fileNonce, owner_token)``.
    '''
    service += 'api/upload'
    # File part is (filename, data, content type).
    files = requests_toolbelt.MultipartEncoder(fields={'file': ('blob', encData, 'application/octet-stream') })
    pbar = progbar(files.len)
    # The callback receives the monitor; advance the bar by the new bytes only.
    monitor = requests_toolbelt.MultipartEncoderMonitor(files, lambda mon: pbar.update(mon.bytes_read - pbar.n))
    headers = {
        'X-File-Metadata' : unpadded_urlsafe_b64encode(encMeta),
        'Authorization' : 'send-v1 ' + unpadded_urlsafe_b64encode(keys.authKey),
        'Content-type' : monitor.content_type
    }
    try:
        # Body and headers must be sent; otherwise the POST is empty.
        r = requests.post(service, data=monitor, headers=headers, stream=True)
        r.raise_for_status()
    finally:
        pbar.close()
    body_json = r.json()
    secretUrl = body_json['url'] + '#' + unpadded_urlsafe_b64encode(keys.secretKey)
    fileId = body_json['id']
    fileNonce = unpadded_urlsafe_b64decode(r.headers['WWW-Authenticate'].replace('send-v1 ', ''))
    # Fall back to the ``delete`` key when the reply has no ``owner`` key.
    try:
        owner_token = body_json['owner']
    except KeyError:
        owner_token = body_json['delete']
    return secretUrl, fileId, fileNonce, owner_token
def __repr__(self):
    """Return a debug representation that embeds ``repr(self.fields)``."""
    template = '<MultipartEncoder: {0!r}>'
    return template.format(self.fields)
def __init__(self, encoder, callback=None):
    """Wrap ``encoder`` and prepare read-progress bookkeeping.

    The body reads ``encoder``, so it has to be a parameter; without it every
    call fails with ``NameError``.

    Args:
        encoder: The :class:`MultipartEncoder` instance being monitored.
        callback: Optional function to call after a read; ``IDENTITY`` is
            used when none (or a falsy value) is given.
    """
    #: Instance of the :class:`MultipartEncoder` being monitored
    self.encoder = encoder
    #: Optionally function to call after a read
    self.callback = callback or IDENTITY
    #: Number of bytes already read from the :class:`MultipartEncoder`
    #: instance
    self.bytes_read = 0
    #: Avoid the same problem in bug #80
    self.len = self.encoder.len
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                callback=None):
    """Build a ``MultipartEncoder`` from ``fields`` and wrap it in ``cls``.

    The body needs ``fields`` (it was read without being a parameter) and has
    to return the wrapped object for the constructor to be usable.

    NOTE(review): the first parameter is ``cls``, so this is presumably a
    ``@classmethod`` whose decorator is not visible here -- confirm.

    Args:
        fields: Fields handed to ``MultipartEncoder``.
        boundary: Boundary handed to ``MultipartEncoder``.
        encoding: Encoding handed to ``MultipartEncoder``.
        callback: Passed through to ``cls`` as its second argument.

    Returns:
        The object produced by ``cls(encoder, callback)``.
    """
    encoder = MultipartEncoder(fields, boundary, encoding)
    return cls(encoder, callback)
def generate_payload(event):
    """
    Build the multipart body for a request whose only part is "Metadata".

    :param event: dict payload, JSON-serialized and sent as the "Metadata" part
    :return: MultipartEncoder
    """
    metadata_bytes = json.dumps(event).encode()
    # (filename, file object, content type) -- no filename for this part.
    metadata_part = (None, io.BytesIO(metadata_bytes), 'application/json')
    return MultipartEncoder({"Metadata": metadata_part})
def sendSparkPOST(url, data):
    """POST ``data`` to ``url`` as multipart/form-data with a bearer token.

    The signature and the request call were cut off mid-line; they are
    restored so the function parses and actually sends the request.

    Args:
        url: Target URL.
        data: Fields handed to ``MultipartEncoder``.

    Returns:
        The response object returned by ``requests.post``.
    """
    m = MultipartEncoder(fields=data)
    # ``bearer`` is read from the enclosing module scope.
    request = requests.post(url, data=m,
                            headers={"Content-Type": m.content_type,
                                     "Authorization": "Bearer "+bearer})
    return request
def post_update(context, update):
    """
    Updates a Cisco Spark room

    :param context: button state and configuration
    :type context: ``dict``

    :param update: content of the update to be posted there
    :type update: ``str`` or ``dict``

    :raises Exception: if the API answers with a status other than 200

    If the update is a simple string, it is sent as such to Cisco Spark.
    Else if it a dictionary, then it is encoded as MIME Multipart.
    """
    logging.info("Posting update to Cisco Spark room")
    url = 'https://api.ciscospark.com/v1/messages'
    headers = {'Authorization': 'Bearer '+context['spark']['CISCO_SPARK_BTTN_BOT']}
    if isinstance(update, dict):
        update['roomId'] = context['spark']['id']
        payload = MultipartEncoder(fields=update)
        headers['Content-Type'] = payload.content_type
    else:
        payload = {'roomId': context['spark']['id'], 'text': update }
    # The headers carry the bearer token and, for multipart bodies, the
    # boundary; they have to be sent with the request.
    response = requests.post(url=url, headers=headers, data=payload)
    if response.status_code != 200:
        logging.info(response.json())
        raise Exception("Received error code {}".format(response.status_code))
    logging.info('- done,check the room with Cisco Spark client software')
#
# handle Twilio API
#
def load_file(path):
    """Encode the file at ``path`` as a one-part multipart/form-data body.

    The part is named after the file and sent as ``text/plain``.

    Args:
        path: Path of the file to encode.

    Returns:
        Tuple ``(content_type, body_bytes)``.
    """
    _, filename = os.path.split(path)
    with open(path, 'rb') as file_:
        fields = {
            # File part is (filename, file object, content type).
            filename: (filename, file_, 'text/plain')
        }
        encoder = MultipartEncoder(fields=fields)
        # Render the body while the file is still open.
        return (encoder.content_type, encoder.to_string())
def test_smoke(self):
    """Parsing a one-field body with no registered targets must not raise."""
    encoder = MultipartEncoder(fields={'name': 'hello'})
    content_headers = {'Content-Type': encoder.content_type}
    body = encoder.to_string()

    parser = StreamingFormDataParser(headers=content_headers)
    parser.data_received(body)
def test_basic_single(self):
    """A single plain field is delivered to its registered target."""
    encoder = MultipartEncoder(fields={'value': 'hello world'})
    content_headers = {'Content-Type': encoder.content_type}

    target = ValueTarget()
    parser = StreamingFormDataParser(headers=content_headers)
    parser.register('value', target)

    parser.data_received(encoder.to_string())

    self.assertEqual(target.value, b'hello world')
def test_break_chunk_at_boundary(self):
    """Both fields survive a chunk split placed inside a boundary line."""
    expected_first_value = 'hello' * 500
    expected_second_value = 'hello' * 500
    first = ValueTarget()
    second = ValueTarget()
    encoder = MultipartEncoder(fields={
        'first': expected_first_value,
        'second': expected_second_value
    })
    body = encoder.to_string()
    boundary = encoder.boundary.encode('utf-8')
    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})
    # Each field needs its own target; otherwise ``first`` never receives
    # data and the assertions below cannot hold.
    parser.register('first', first)
    parser.register('second', second)
    index = body[50:].index(boundary) + 5
    parser.data_received(body[:index])
    parser.data_received(body[index:])
    self.assertEqual(first.value, expected_first_value.encode('utf-8'))
    self.assertEqual(second.value, expected_second_value.encode('utf-8'))
def test_mixed_content_varying_chunk_size(self):
    """Text and file fields parse correctly for every two-chunk split point."""
    with open(data_file_path('file.txt'), 'rb') as file_:
        expected_value = file_.read()
    with open(data_file_path('file.txt'), 'rb') as file_:
        fields = {
            'name': 'hello world',
            'age': '10',
            # File part is (filename, file object, content type).
            'cv.txt': ('file.txt', file_, 'text/plain')
        }
        encoder = MultipartEncoder(fields=fields)
        # Render the body while the file is still open.
        body = encoder.to_string()
        content_type = encoder.content_type
    for index in range(len(body)):
        name = ValueTarget()
        age = ValueTarget()
        cv = ValueTarget()
        parser = StreamingFormDataParser(
            headers={'Content-Type': content_type})
        parser.register('name', name)
        parser.register('age', age)
        parser.register('cv.txt', cv)
        parser.data_received(body[:index])
        parser.data_received(body[index:])
        self.assertEqual(name.value, b'hello world')
        self.assertEqual(age.value, b'10')
        self.assertEqual(cv.value, expected_value)
def test_parameter_contains_crlf(self):
    """A CRLF in the middle of a field value is preserved."""
    encoder = MultipartEncoder(fields={'value': 'hello\r\nworld'})
    content_headers = {'Content-Type': encoder.content_type}

    target = ValueTarget()
    parser = StreamingFormDataParser(headers=content_headers)
    parser.register('value', target)

    parser.data_received(encoder.to_string())

    self.assertEqual(target.value, b'hello\r\nworld')
def test_parameter_ends_with_crlf(self):
    """A CRLF at the end of a field value is preserved."""
    target = ValueTarget()
    encoder = MultipartEncoder(fields={'value': 'hello\r\n'})
    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})
    # Register the target, feed the body, then compare; a bytes literal is
    # not a target and no data was being parsed.
    parser.register('value', target)
    parser.data_received(encoder.to_string())
    self.assertEqual(target.value, b'hello\r\n')
def test_parameter_starts_with_crlf(self):
    """A CRLF at the start of a field value is preserved."""
    target = ValueTarget()
    encoder = MultipartEncoder(fields={'value': '\r\nworld'})
    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})
    # Register the target, feed the body, then compare; a bytes literal is
    # not a target and no data was being parsed.
    parser.register('value', target)
    parser.data_received(encoder.to_string())
    self.assertEqual(target.value, b'\r\nworld')
def test_register_after_data_received(self):
    """Registering a target after data has been fed raises ParseFailedException."""
    encoder = MultipartEncoder(fields={'name': 'hello'})
    content_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=content_headers)
    parser.data_received(encoder.to_string())

    late_target = ValueTarget()
    with self.assertRaises(ParseFailedException):
        parser.register('name', late_target)
def main():
    """Encode the file named by ``parse_args()`` and run it through the parser.

    The file is sent as the ``file`` part with ``args.content_type`` and
    parsed into a throw-away ``ValueTarget``.
    """
    args = parse_args()

    with open(args.filename, 'rb') as fd:
        fields = {
            'file': ('file', fd, args.content_type)
        }
        encoder = MultipartEncoder(fields=fields)
        content_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=content_headers)
        parser.register('file', ValueTarget())

        # Render and parse while the file is still open.
        parser.data_received(encoder.to_string())
def _import_file(self, filename):
    """Upload a configuration file to the device's import API.

    Args:
        filename: Path of the configuration file to import.

    Returns:
        The base name of the uploaded file, or ``False`` when the device
        reports ``status="error"``.

    Raises:
        requests.HTTPError: If the HTTP request itself fails.
    """
    if not self.api_key:
        key = self.device.keygen()
    else:
        key = self.api_key
    params = {
        'type': 'import',
        'category': 'configuration',
        'key': key
    }
    path = os.path.basename(filename)
    # NOTE(review): certificate verification is disabled below
    # (``verify=False``) -- confirm this is intended for the target devices.
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    url = 'https://{0}/api/'.format(self.hostname)
    # File part is (filename, file object, content type); the handle is
    # closed once the request has been sent.
    with open(filename, 'rb') as config_file:
        mef = requests_toolbelt.MultipartEncoder(
            fields={
                'file': (path, config_file, 'application/octet-stream')
            }
        )
        request = requests.post(
            url,
            verify=False,
            params=params,
            headers={'Content-Type': mef.content_type},
            data=mef
        )
    # if something goes wrong just raise an exception
    request.raise_for_status()
    response = xml.etree.ElementTree.fromstring(request.content)
    if response.attrib['status'] == 'error':
        return False
    else:
        return path
def __repr__(self):
    """Return a debug representation that embeds ``repr(self.fields)``."""
    template = '<MultipartEncoder: {0!r}>'
    return template.format(self.fields)
def __init__(self, encoder, callback=None):
    """Wrap ``encoder`` and prepare read-progress bookkeeping.

    The body reads ``encoder``, so it has to be a parameter; without it every
    call fails with ``NameError``.

    Args:
        encoder: The :class:`MultipartEncoder` instance being monitored.
        callback: Optional function to call after a read; ``IDENTITY`` is
            used when none (or a falsy value) is given.
    """
    #: Instance of the :class:`MultipartEncoder` being monitored
    self.encoder = encoder
    #: Optionally function to call after a read
    self.callback = callback or IDENTITY
    #: Number of bytes already read from the :class:`MultipartEncoder`
    #: instance
    self.bytes_read = 0
    #: Avoid the same problem in bug #80
    self.len = self.encoder.len
# NOTE(review): this fragment appears to have been truncated during text
# extraction and is not valid Python as shown: the ``data`` literal fuses the
# ``upload_id`` entry with the tail of another entry, and the
# ``MultipartEncoder(...)`` call ends in ``caption):``. Recover the complete
# method from its upstream project before relying on it.
def uploadPhoto(self, upload_id = None, is_sidecar = None):
# Default the upload id to the current time in milliseconds, as a string.
if upload_id is None:
upload_id = str(int(time.time() * 1000))
data = {
'upload_id' : upload_id, {'Content-transfer-encoding':'binary'})
}
if is_sidecar:
data['is_sidecar']='1'
m = MultipartEncoder(data, caption):
self.expose()
return False
def __repr__(self):
    """Return a debug representation that embeds ``repr(self.fields)``."""
    template = '<MultipartEncoder: {0!r}>'
    return template.format(self.fields)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。