Python email.parser 模块,BytesParser() 实例源码
我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用email.parser.BytesParser()。
def split_email(self, raw_email):
    """Parse raw email bytes and separate attachment parts from the rest.

    Args:
        raw_email: The complete message as bytes.

    Returns:
        A 3-tuple ``(to_keep, attachments, parsed_email)``:

        * ``to_keep`` -- for a multipart message, the sub-parts that carry
          no filename; for a non-multipart message, a one-element list
          holding the message payload.
        * ``attachments`` -- one ``File`` per sub-part that has a filename,
          built from the decoded payload and the filename.
        * ``parsed_email`` -- the parsed message object itself.
    """
    message = BytesParser().parsebytes(raw_email)
    kept_parts = []
    extracted_files = []

    # A non-multipart message has no sub-parts to inspect: keep its payload.
    if not message.is_multipart():
        kept_parts.append(message.get_payload())
        return kept_parts, extracted_files, message

    for part in message.get_payload():
        raw_name = part.get_filename()
        if not raw_name:
            kept_parts.append(part)
            continue
        # Only the first fragment returned by decode_header is used; it is
        # decoded with its charset when one is reported, else used as-is.
        first_chunk, charset = decode_header(raw_name)[0]
        name = first_chunk.decode(charset) if charset else first_chunk
        extracted_files.append(File(part.get_payload(decode=True), name))

    return kept_parts, extracted_files, message
def message_from_bytes(s, *args, **kws):
    """Build a message object model from a bytes string.

    Any extra positional and keyword arguments are handed unchanged to the
    ``BytesParser`` constructor; ``s`` is then parsed with ``parsebytes``.
    """
    from email.parser import BytesParser

    parser = BytesParser(*args, **kws)
    message = parser.parsebytes(s)
    return message
def message_from_binary_file(fp, *args, **kws):
    """Read a binary file and parse its contents into a Message object model.

    Args:
        fp: A binary file-like object positioned at the start of the message.
        *args: Extra positional arguments forwarded to the ``BytesParser``
            constructor.
        **kws: Extra keyword arguments forwarded to the ``BytesParser``
            constructor.

    Returns:
        The message object produced by ``BytesParser.parse``.
    """
    from email.parser import BytesParser

    # ``*args`` has to be declared in the signature: the call below forwards
    # it, and without the declaration every call failed with NameError.
    return BytesParser(*args, **kws).parse(fp)
def get_content(self, raw):
    """Decode a URL-safe base64 email and return its headers plus plain body.

    Args:
        raw: The raw email, encoded as URL-safe base64.

    Returns:
        A dict built from the parsed message's headers (repeated header
        names collapse to a single entry), with an additional ``'body'``
        key holding the payload of the ``text/plain`` body part, or
        ``None`` when the message has no such part.
    """
    data = base64.urlsafe_b64decode(raw)
    # Named ``message`` rather than ``email`` to avoid shadowing the package.
    message = EmailParser(policy=policy.default).parsebytes(data)
    plain = message.get_body(preferencelist=('plain',))
    # Compare against None explicitly: a message object's truthiness is its
    # header count (it defines __len__), so a text part without any headers
    # would otherwise be mistaken for "no plain body".
    # NOTE(review): get_payload() is called without decode=True, so a body
    # with a base64/quoted-printable transfer encoding is returned encoded.
    body = plain.get_payload() if plain is not None else None
    email_dict = dict(message)
    email_dict['body'] = body
    return email_dict
def _format_denied_recipients(self, original_mail: bytes, recipients: Sequence[str]) -> bytes:
    """Build a bounce message listing the recipients that were denied.

    The headers of ``original_mail`` are reused: Subject is prefixed with
    ``[mailforwarder error] Re:``, To is set to the original From, and From
    is set to the bounce sender. The body is replaced by
    ``denied_recipients_template`` filled in with the recipient list.

    Args:
        original_mail: The rejected message as raw bytes.
        recipients: The recipient addresses to list in the bounce body.

    Returns:
        The bounce message serialized with ``policy.SMTP``.
    """
    parser = BytesParser()
    # Todo: fix type annotation when typeshed has better stubs
    msg = None  # type: Message
    # The second positional argument is ``headersonly``: only the headers of
    # the original mail are parsed, since the body is replaced below.
    msg = parser.parsebytes(original_mail, True)  # type: ignore
    original_subject = msg["Subject"]
    original_sender = msg["From"]
    # Assigning msg[name] appends a header instead of replacing it, so the
    # existing ones are removed first; otherwise the bounce would carry the
    # original Subject/To/From ahead of the new values. Deleting a header
    # that is not present is a no-op.
    for header in ("Subject", "To", "From"):
        del msg[header]
    msg["Subject"] = "[mailforwarder error] Re: %s" % original_subject
    msg["To"] = original_sender
    msg["From"] = "mailforwarder bounce <>"
    # Each recipient is prepended, so the list ends up in reverse order,
    # one " * address" line per recipient.
    rcptlist = ""
    for rcpt in recipients:
        rcptlist = " * %s\n%s" % (rcpt, rcptlist)
    txt = denied_recipients_template.format(rcptlist=rcptlist)
    msg.set_payload(txt, charset='utf-8')
    return msg.as_bytes(policy=policy.SMTP)
def add_received_header(self, peer: Tuple[str, int], msg: bytes, channel: PatchedSMTPChannel) -> bytes:
    """Prepend a ``Received`` header to a message and re-serialize it.

    Args:
        peer: ``(host, port)`` of the remote side; both are written into
            the header.
        msg: The message as raw bytes.
        channel: The SMTP channel; its ``seen_greeting`` value is written
            into the header's ``from`` clause.

    Returns:
        The message, with the new header prepended, serialized with
        ``policy.SMTP``.
    """
    parser = BytesParser(_class=SaneMessage)
    # Todo: remove type annotation when issue is fixed
    new_msg = None  # type: SaneMessage
    new_msg = parser.parsebytes(msg)  # type: ignore
    # ``%S`` (seconds) replaces the former ``%s``, which is not a standard
    # strftime directive: glibc renders it as seconds since the epoch and
    # other platforms may reject it.
    # NOTE(review): ``timezone.Now()`` is kept as found; confirm that the
    # ``timezone`` object in scope really exposes ``Now`` and not ``now``.
    timestamp = timezone.Now().strftime("%a,%d %b %Y %H:%M:%S %z (%Z)")
    received = "from %s (%s:%s)\r\n\tby %s (%s [%s:%s]) with SMTP;\r\n\t%s" % (
        channel.seen_greeting,
        peer[0],
        peer[1],
        self.server_name,
        self.daemon_name,
        self._localaddr[0],
        self._localaddr[1],
        timestamp,
    )
    new_msg.prepend_header("Received", received)
    return new_msg.as_bytes(policy=policy.SMTP)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。