# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Email Framework
"""
import importlib
import os
import smtplib
import logging
import warnings
from email.charset import Charset
from email.message import Message
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
import six
from mako.template import Template
from mako.lookup import TemplateLookup
from mako.exceptions import TopLevelLookupException
from rattail import exceptions
from rattail.core import UNSPECIFIED
from rattail.files import resource_path
from rattail.util import load_entry_points
from rattail.time import localtime, make_utc
# NOTE: this bit of magic was stolen from Django
# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
# some spam filters.
utf8_charset = Charset('utf-8')
utf8_charset.body_encoding = None # Python defaults to BASE64
log = logging.getLogger(__name__)
[docs]
def send_email(config, key, data={}, attachments=[],
fallback_key=None, default_subject=None,
enabled=UNSPECIFIED, **kwargs):
"""
Send an email message of the given type, per config, with the given data
and/or attachments.
"""
app = config.get_app()
handler = app.get_email_handler()
email = handler.get_email(key, fallback_key=fallback_key,
default_subject=default_subject)
if enabled is UNSPECIFIED:
enabled = handler.email_is_enabled(email)
if enabled:
kwargs['attachments'] = attachments
handler.send_message(email, data, **kwargs)
else:
log.debug("skipping email of type '%s' per config", key)
[docs]
class EmailHandler(object):
"""
Base class and default implementation for email handlers.
"""
def __init__(self, config):
self.config = config
self.app = config.get_app()
self.enum = self.config.get_enum()
# only assign model attribute if db libs are installed
try:
import sqlalchemy
except ImportError:
pass
else:
self.model = self.config.get_model()
[docs]
def use_entry_points(self):
"""
Returns a boolean indicating if the app should get its list of
available emails from the setuptools entry points, instead of
the (legacy) ``[rattail.mail] emails`` config setting.
"""
return self.config.getbool('rattail.mail', 'emails.use_entry_points',
default=True)
[docs]
def should_record_attempts(self):
"""
Returns a boolean indicating if the app should make a record
of all attempts to send email.
"""
return self.config.getbool('rattail.mail', 'record_attempts',
default=False)
[docs]
def should_send_email_on_failure(self):
"""
Returns a boolean indicating if the app should send an email
alert whenever sending of an email message fails.
"""
return self.config.getbool('rattail.mail', 'send_email_on_failure',
default=False)
[docs]
def get_all_emails(self, **kwargs):
"""
Retrieve the complete set of all Email profiles exposed by the
app and installed packages.
"""
# TODO: maybe deprecate this at some point?
if not self.use_entry_points():
# nb. this is the legacy behavior
return dict([(email.key or email.__name__, email)
for email in self.iter_emails(warn=False)])
# this is the future..
emails = {}
# first get *all* email modules, per entry points
all_modules = load_entry_points('rattail.emails').values()
# but maybe the app config says only to use certain modules
app_modules = self.config.getlist('rattail.mail', 'emails.modules')
if not app_modules:
app_modules = self.config.getlist('rattail.mail', 'emails',
ignore_ambiguous=True)
if app_modules:
warnings.warn(f"URGENT: instead of 'rattail.mail.emails', "
f"you should set 'rattail.mail.emails.modules'",
DeprecationWarning, stacklevel=2)
# figure out which modules we should actually use
if app_modules:
modules = [importlib.import_module(m)
for m in app_modules]
else:
modules = all_modules
# and collect all email profiles from those modules
for module in modules:
mod_emails = self.get_emails_from_module(module)
emails.update(dict([(email.key or email.__name__, email)
for email in mod_emails]))
return emails
[docs]
def get_emails_from_module(self, module, **kwargs):
"""
Return all Email profiles from the given Python module.
"""
emails = []
for name in dir(module):
obj = getattr(module, name)
if (isinstance(obj, type) and issubclass(obj, Email)
and not obj.abstract and obj is not Email):
emails.append(obj)
return emails
[docs]
def get_available_emails(self, **kwargs):
"""
Retrieve the set of "available" Email profiles, i.e. those
which should be exposed to the app users.
"""
emails = self.get_all_emails()
for key in list(emails):
if not self.email_is_available(key):
emails.pop(key)
return emails
[docs]
def iter_emails(self, warn=True):
"""
Iterate over all available email types.
"""
if warn:
warnings.warn("`iter_emails()` method is deprecated! please "
"use `get_available_emails()` instead",
DeprecationWarning, stacklevel=2)
for module in self.config.getlist('rattail.mail', 'emails', default=['rattail.emails']):
module = importlib.import_module(module)
for email in self.get_emails_from_module(module):
yield email
[docs]
def get_email(self, key, fallback_key=None, **kwargs):
"""
Return an email instance of the given type.
"""
factory = Email
emails = self.get_all_emails()
if key in emails:
factory = emails[key]
return factory(self.config, key, fallback_key, **kwargs)
[docs]
def email_is_available(self, email):
"""
Returns boolean indicating if the Email profile for the given
key is "available" and should be exposed to users.
"""
return not self.email_is_hidden(email)
[docs]
def email_is_hidden(self, email):
"""
Returns boolean indicating if the given Email profile is
"hidden" and should not be exposed to users.
:param email: Either an :class:`Email` instance, or the
:attr:`~Email.key` value for one.
"""
key = getattr(email, 'key', email)
hidden = self.config.getbool('rattail.mail',
'{}.hidden'.format(key),
default=False)
return hidden
[docs]
def email_is_enabled(self, email):
"""
Returns boolean indicating if the given Email profile is
"enabled" - that is, emails of its type should be sent as
opposed to being suppressed.
:param email: An :class:`Email` instance.
"""
return email.get_enabled()
def get_enabled(self, email):
warnings.warn("`get_enabled()` method is deprecated! please "
"use `email_is_enabled()` instead",
DeprecationWarning, stacklevel=2)
return self.email_is_enabled(email)
def send_message(self, email, data, **kwargs):
msg = self.make_message(email, data, **kwargs)
if self.should_record_attempts():
attempt = self.record_attempt(email, msg)
try:
self.deliver_message(email, msg)
except Exception as e:
self.record_failure(email, msg, attempt, e)
raise
else:
self.record_success(email, msg, attempt)
else: # don't record attempts
self.deliver_message(email, msg)
def record_attempt(self, email, msg):
model = self.model
session = self.app.make_session()
attempt = model.EmailAttempt()
attempt.key = email.key
attempt.sender = msg['From']
attempt.to = msg.get_all('To')
attempt.cc = msg.get_all('Cc')
attempt.bcc = msg.get_all('Bcc')
attempt.subject = msg['Subject']
attempt.sent = make_utc()
attempt.status_code = self.enum.EMAIL_ATTEMPT_CREATED
session.add(attempt)
session.commit()
session.close()
# session.expunge(attempt)
return attempt
def record_failure(self, email, message, attempt, error):
session = self.app.make_session()
attempt = session.merge(attempt)
attempt.status_code = self.enum.EMAIL_ATTEMPT_FAILURE
attempt.status_text = str(error)
session.commit()
session.close()
if self.should_send_email_on_failure():
self.app.send_email('sendmail_failure', {
'email': email,
'message': message,
'attempt': attempt,
'error': error,
})
def record_success(self, email, message, attempt):
session = self.app.make_session()
attempt = session.merge(attempt)
attempt.status_code = self.enum.EMAIL_ATTEMPT_SUCCESS
session.commit()
session.close()
def make_message(self, email, data, **kwargs):
context = self.make_context(**data)
return email.make_message(context, **kwargs)
def make_context(self, **context):
context['rattail_config'] = self.config
context['app'] = self.app
context['app_title'] = self.config.app_title(default="Rattail")
context['localtime'] = localtime
return context
[docs]
def deliver_message(self, email, msg, recipients=UNSPECIFIED):
"""
Deliver an email message using the given SMTP configuration.
"""
if recipients is UNSPECIFIED:
recips = set()
# nb. we use original recip lists, contained in X_RATTAIL
# headers, instead of the simple strings which e.g. the
# To: header contains. but we also then remove custom
# headers b/c they don't belong in the final msg
to = msg['X_RATTAIL_TO']
del msg['X_RATTAIL_TO']
if to:
recips = recips.union(set(to))
cc = msg['X_RATTAIL_CC']
del msg['X_RATTAIL_CC']
if cc:
recips = recips.union(set(cc))
bcc = msg['X_RATTAIL_BCC']
del msg['X_RATTAIL_BCC']
if bcc:
recips = recips.union(set(bcc))
else:
recips = set(recipients)
if not recips:
raise RuntimeError("No recipients for email: {0}".format(repr(msg)))
server = self.config.get('rattail.mail', 'smtp.server', default='localhost')
username = self.config.get('rattail.mail', 'smtp.username')
password = self.config.get('rattail.mail', 'smtp.password')
if self.config.getbool('rattail.mail', 'send_feedback_only', usedb=False, default=False):
send = email.key == 'user_feedback'
else:
send = self.config.getbool('rattail.mail', 'send_emails', usedb=False, default=True)
if send:
log.debug("attempting to send mail of type: %s", email.key)
log.debug("connecting to server: %s", server)
session = smtplib.SMTP(server)
if username and password:
result = session.login(username, password)
log.debug("login() result is: %s", repr(result))
result = session.sendmail(msg['From'], recips, msg.as_string())
log.debug("sendmail() result is: %s", repr(result))
session.quit()
return True
log.debug("config says no emails for '%s', but would have sent one to: %s", email.key, recips)
return False
def get_email_handler(config):
"""
Creates and returns the configured mail handler.
"""
warnings.warn("`get_email_handler()` is deprecated! please "
"use `AppHandler.get_email_handler()` instead",
DeprecationWarning, stacklevel=2)
app = config.get_app()
return app.get_email_handler()
[docs]
class Email(object):
# Note: The docstring of an email is leveraged by code, hence this odd one.
"""
(This email has no description.)
"""
key = None
fallback_key = None
abstract = False
default_prefix = "[rattail]"
default_subject = "Automated message"
universal_subject = "Automated message"
# Whether or not the email's :attr:`to` attribute is dynamically determined
# at run-time, i.e. via some logic other than typical reading from config.
dynamic_to = False
dynamic_to_help = None
def __init__(self, config, key=None, fallback_key=None, default_subject=None):
self.config = config
self.app = config.get_app()
self.enum = config.get_enum()
# only assign model attribute if db libs are installed
try:
import sqlalchemy
except ImportError:
pass
else:
self.model = self.config.get_model()
if key:
self.key = key
elif not self.key:
self.key = self.__class__.__name__
if self.key == 'Email':
raise exceptions.ConfigurationError("Email instance has no key: {0}".format(repr(self)))
if fallback_key:
self.fallback_key = fallback_key
if default_subject:
self.default_subject = default_subject
templates = config.getlist('rattail.mail', 'templates')
if templates:
templates = [resource_path(p) for p in templates]
self.txt_templates = TemplateLookup(directories=templates)
self.html_templates = TemplateLookup(directories=templates,
# nb. this escapes HTML special chars
default_filters=['h'])
def obtain_sample_data(self, request):
"""
This method is responsible for obtaining the full set of sample data,
to be used as context when generating a preview for the email.
Note, you normally should not override this method! Please see also
the :meth:`sample_data()` method.
"""
return self.sample_data(request)
def sample_data(self, request):
"""
This method can return a dict of sample data, to be used as context
when generating a preview for the email. Subclasses are welcome to
override this method.
"""
return {}
def get_enabled(self):
"""
Get the enabled flag for the email's message type.
"""
enabled = self.config.getbool('rattail.mail', '{0}.enabled'.format(self.key))
if enabled is not None:
return enabled
enabled = self.config.getbool('rattail.mail', 'default.enabled')
if enabled is not None:
return enabled
return self.config.getbool('rattail.mail', 'send_emails', default=True)
def get_sender(self):
"""
Returns the value for the message's ``From:`` header.
:rtype: str
"""
sender = self.config.get('rattail.mail', '{0}.from'.format(self.key))
if not sender:
sender = self.config.get('rattail.mail', 'default.from')
if not sender:
raise exceptions.SenderNotFound(self.key)
return sender
def get_replyto(self):
"""
Get the Reply-To address for the message.
"""
replyto = self.config.get('rattail.mail', '{0}.replyto'.format(self.key))
if not replyto:
replyto = self.config.get('rattail.mail', 'default.replyto')
return replyto
def get_recips(self, type_='to'):
"""
Returns a list of recipients of the given type for the message.
:param type_: Must be one of: ``('to', 'cc', 'bcc', 'all')``.
If you specify ``'all'`` then all recipients will be
returned as a single list, with no indication as to which
"slot" (to, cc, bcc) each came from.
:rtype: list
"""
try:
if type_.lower() not in ('to', 'cc', 'bcc', 'all'):
raise Exception
except:
raise ValueError("Recipient type must be one of ('to', 'cc', 'bcc', 'all'); "
"not: {0}".format(repr(type_)))
type_ = type_.lower()
if type_ == 'all':
recips = []
for typ in ('to', 'cc', 'bcc'):
recips.extend(self.get_recips(typ))
else:
recips = self.config.getlist('rattail.mail', '{0}.{1}'.format(self.key, type_))
if not recips:
recips = self.config.getlist('rattail.mail', 'default.{0}'.format(type_))
return recips or []
def get_prefix(self, data={}, magic=True):
"""
Returns a string to be used as the subject prefix for the message.
:rtype: str
"""
# first look for prefix specific to this email type
prefix = self.config.get('rattail.mail', '{0}.prefix'.format(self.key))
if not prefix:
# next look for global default prefix
prefix = self.config.get('rattail.mail', 'default.prefix')
if not prefix:
# otherwise just use app node title
prefix = self.config.node_title()
if prefix:
prefix = "[{}]".format(prefix)
# fall back to hard-coded default if no prefix yet
# TODO: not sure that's possible since we try node title above
prefix = prefix or self.default_prefix
# force the [DEV] prefix unless running in production
if magic and not self.config.production():
prefix = "[DEV] {}".format(prefix)
return prefix
def get_default_subject(self, **data):
return self.default_subject
def get_subject_template(self, **data):
"""
Returns the template to be used to build the subject.
"""
# use explicitly configured subject if there is one
template = self.config.get('rattail.mail', '{}.subject'.format(self.key))
if template:
return template
# or if this email defines a custom default, use that
default = self.get_default_subject(**data)
if default != self.universal_subject:
return default
# otherwise fall back to the global configured default
return self.config.get('rattail.mail', 'default.subject',
default=self.universal_subject)
def get_subject(self, data={}, render=True, template=UNSPECIFIED):
"""
Returns the base value for the message's subject header, i.e. minus
prefix.
:rtype: str
"""
if template is UNSPECIFIED:
template = self.get_subject_template(**data)
if template and render:
return Template(template).render(**data)
return template
def get_complete_subject(self, data={}, render=True, prefix=UNSPECIFIED, template=UNSPECIFIED):
"""
Returns the value for the message's ``Subject:`` header, i.e. the base
subject with the prefix applied. Note that config may provide the
complete subject also, in which case the prefix and base subject are
not considered.
:rtype: str
"""
if prefix is UNSPECIFIED:
prefix = self.get_prefix(data)
prefix = (prefix or "").rstrip()
if prefix:
prefix = "{} ".format(prefix)
return "{}{}".format(prefix, self.get_subject(data, render=render, template=template))
def get_template(self, type_):
"""
Locate and return the Mako email template of the given type
(e.g. 'html'), or ``None`` if no such template can be found.
"""
if type_ == 'txt':
templates = self.txt_templates
else:
assert type_ == 'html'
templates = self.html_templates
try:
return templates.get_template(f'{self.key}.{type_}.mako')
except TopLevelLookupException:
if self.fallback_key:
try:
return templates.get_template(f'{self.fallback_key}.{type_}.mako')
except TopLevelLookupException:
pass
def normalize_attachments(self, attachments):
normalized = []
for attachment in attachments:
if isinstance(attachment, str):
attachment = self.normalize_attachment(attachment)
normalized.append(attachment)
return normalized
ATTACHMENT_MIME_MAP = {
'.doc': 'application/msword',
'.pdf': 'pdf',
'.xls': 'vnd.ms-excel',
'.xlsx': 'vnd.openxmlformats-officedocument.spreadsheetml.sheet',
}
def normalize_attachment(self, path):
root, ext = os.path.splitext(path)
ext = ext.lower()
if ext == '.csv':
with open(path, 'rb') as f:
part = MIMEText(f.read(), 'csv', 'utf_8')
filename = os.path.basename(path)
part.add_header('Content-Disposition', 'attachment; filename="{}"'.format(filename))
return part
else:
mimetype = self.ATTACHMENT_MIME_MAP.get(ext)
if mimetype:
with open(path, 'rb') as f:
part = MIMEApplication(f.read(), mimetype)
filename = os.path.basename(path)
part.add_header('Content-Disposition', 'attachment; filename="{}"'.format(filename))
return part
raise ValueError("Magic is not (yet) supported, please construct your own attachments for file: {}".format(path))
def make_message(self, data={}, attachments=[], inlines=[],
subject_prefix=UNSPECIFIED, subject_template=UNSPECIFIED,
sender=UNSPECIFIED, replyto=UNSPECIFIED,
to=UNSPECIFIED, cc=UNSPECIFIED, bcc=UNSPECIFIED):
"""
Returns a proper email ``Message`` instance which may be sent via SMTP.
"""
txt_template = self.get_template('txt')
html_template = self.get_template('html')
attachments = self.normalize_attachments(attachments)
# TODO: provide more defaults?
data.setdefault('six', six)
data.setdefault('app', self.app)
if txt_template and html_template:
txt_part = MIMEText(txt_template.render(**data), _charset='utf_8')
html_part = MIMEText(html_template.render(**data), _subtype='html', _charset='utf_8')
if inlines:
html_part = MIMEMultipart(_subtype='related', _subparts=[html_part] + inlines)
msg = MIMEMultipart(_subtype='alternative', _subparts=[txt_part, html_part])
if attachments:
msg = MIMEMultipart(_subtype='mixed', _subparts=[msg] + attachments)
elif txt_template:
msg = MIMEText(txt_template.render(**data), _charset='utf_8')
if attachments:
msg = MIMEMultipart(_subtype='mixed', _subparts=[msg] + attachments)
elif html_template:
msg = SafeMIMEText(html_template.render(**data), 'html', utf8_charset)
if inlines:
msg = MIMEMultipart(_subtype='related', _subparts=[msg] + inlines)
if attachments:
msg = MIMEMultipart(_subtype='mixed', _subparts=[msg] + attachments)
else:
raise exceptions.MailTemplateNotFound(self.key)
self.add_headers(msg, data=data,
subject_prefix=subject_prefix, subject_template=subject_template,
sender=sender, replyto=replyto, to=to, cc=cc, bcc=bcc)
return msg
def add_headers(self, msg, data={},
subject_prefix=UNSPECIFIED, subject_template=UNSPECIFIED,
sender=UNSPECIFIED, replyto=UNSPECIFIED,
to=UNSPECIFIED, cc=UNSPECIFIED, bcc=UNSPECIFIED):
"""
Adds headers for to/from addresses etc. to message
"""
# subject/from
msg['Subject'] = self.get_complete_subject(data, prefix=subject_prefix,
template=subject_template)
if sender is UNSPECIFIED:
sender = self.get_sender()
msg['From'] = sender
# reply-to
if replyto is UNSPECIFIED:
replyto = self.get_replyto()
if replyto:
msg.add_header('Reply-To', replyto)
# recipients
force_to = self.config.getlist('rattail.mail', 'force_to', usedb=False)
if force_to:
to = force_to
cc = None
bcc = None
else:
if to is UNSPECIFIED:
to = self.get_recips('to')
if cc is UNSPECIFIED:
cc = self.get_recips('cc')
if bcc is UNSPECIFIED:
bcc = self.get_recips('bcc')
if not (to or cc or bcc):
raise exceptions.RecipientsNotFound(self.key)
# nb. we preserve the original lists within X_RATTAIL headers,
# for easier SMTP sending later on
if to:
msg['To'] = ', '.join(to)
msg['X_RATTAIL_TO'] = to
if cc:
msg['Cc'] = ', '.join(cc)
msg['X_RATTAIL_CC'] = cc
if bcc:
msg['Bcc'] = ', '.join(bcc)
msg['X_RATTAIL_BCC'] = bcc
# NOTE: this bit of magic was stolen from Django
class SafeMIMEText(MIMEText):
def __init__(self, text, subtype, charset):
self.encoding = charset
if charset == 'utf-8':
# Unfortunately, Python doesn't support setting a Charset instance
# as MIMEText init parameter (http://bugs.python.org/issue16324).
# We do it manually and trigger re-encoding of the payload.
MIMEText.__init__(self, text, subtype, None)
del self['Content-Transfer-Encoding']
# TODO: i don't personally need this yet, if ever?
# # Workaround for versions without http://bugs.python.org/issue19063
# if (3, 2) < sys.version_info < (3, 3, 4):
# payload = text.encode(utf8_charset.output_charset)
# self._payload = payload.decode('ascii', 'surrogateescape')
# self.set_charset(utf8_charset)
# else:
# self.set_payload(text, utf8_charset)
self.set_payload(text, utf8_charset)
self.replace_header('Content-Type', 'text/%s; charset="%s"' % (subtype, charset))
else:
MIMEText.__init__(self, text, subtype, charset)