# coding: utf-8 import asyncore import email import os import shutil import smtpd import sys from StringIO import StringIO import tempfile import threading from django.conf import settings from django.core import mail from django.core.mail import EmailMessage, mail_admins, mail_managers, EmailMultiAlternatives from django.core.mail import send_mail, send_mass_mail from django.core.mail.backends import console, dummy, locmem, filebased, smtp from django.core.mail.message import BadHeaderError from django.test import TestCase from django.utils.translation import ugettext_lazy from django.utils.functional import wraps def alter_django_settings(**kwargs): oldvalues = {} nonexistant = [] for setting, newvalue in kwargs.iteritems(): try: oldvalues[setting] = getattr(settings, setting) except AttributeError: nonexistant.append(setting) setattr(settings, setting, newvalue) return oldvalues, nonexistant def restore_django_settings(state): oldvalues, nonexistant = state for setting, oldvalue in oldvalues.iteritems(): setattr(settings, setting, oldvalue) for setting in nonexistant: delattr(settings, setting) def with_django_settings(**kwargs): def decorator(test): @wraps(test) def decorated_test(self): state = alter_django_settings(**kwargs) try: return test(self) finally: restore_django_settings(state) return decorated_test return decorator class MailTests(TestCase): """ Non-backend specific tests. """ def test_ascii(self): email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) message = email.message() self.assertEqual(message['Subject'].encode(), 'Subject') self.assertEqual(message.get_payload(), 'Content') self.assertEqual(message['From'], 'from@example.com') self.assertEqual(message['To'], 'to@example.com') def test_multiple_recipients(self): email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com']) message = email.message() self.assertEqual(message['Subject'].encode(), 'Subject') self.assertEqual(message.get_payload(), 'Content') self.assertEqual(message['From'], 'from@example.com') self.assertEqual(message['To'], 'to@example.com, other@example.com') def test_header_injection(self): email = EmailMessage('Subject\nInjection Test', 'Content', 'from@example.com', ['to@example.com']) self.assertRaises(BadHeaderError, email.message) email = EmailMessage(ugettext_lazy('Subject\nInjection Test'), 'Content', 'from@example.com', ['to@example.com']) self.assertRaises(BadHeaderError, email.message) def test_space_continuation(self): """ Test for space continuation character in long (ascii) subject headers (#7747) """ email = EmailMessage('Long subject lines that get wrapped should use a space continuation character to get expected behaviour in Outlook and Thunderbird', 'Content', 'from@example.com', ['to@example.com']) message = email.message() self.assertEqual(message['Subject'], 'Long subject lines that get wrapped should use a space continuation\n character to get expected behaviour in Outlook and Thunderbird') def test_message_header_overrides(self): """ Specifying dates or message-ids in the extra headers overrides the default values (#9233) """ headers = {"date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers) self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent') def test_from_header(self): """ Make sure we can manually set the From header (#9214) """ email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) message = email.message() self.assertEqual(message['From'], 'from@example.com') def test_multiple_message_call(self): """ Regression for #13259 - Make sure that headers are not changed when calling EmailMessage.message() """ email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) message = email.message() self.assertEqual(message['From'], 'from@example.com') message = email.message() self.assertEqual(message['From'], 'from@example.com') def test_unicode_address_header(self): """ Regression for #11144 - When a to/from/cc header contains unicode, make sure the email addresses are parsed correctly (especially with regards to commas) """ email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" ', 'other@example.com']) self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= , other@example.com') email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" ', 'other@example.com']) self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= , other@example.com') def test_unicode_headers(self): email = EmailMessage(u"Gżegżółka", "Content", "from@example.com", ["to@example.com"], headers={"Sender": '"Firstname Sürname" ', "Comments": 'My Sürname is non-ASCII'}) message = email.message() self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=') self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= ') self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=') def test_safe_mime_multipart(self): """ Make sure headers can be set with a different encoding than utf-8 in SafeMIMEMultipart as well """ headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} subject, from_email, to = 'hello', 'from@example.com', '"Sürname, Firstname" ' text_content = 'This is an important message.' html_content = '

This is an important message.

' msg = EmailMultiAlternatives('Message from Firstname Sürname', text_content, from_email, [to], headers=headers) msg.attach_alternative(html_content, "text/html") msg.encoding = 'iso-8859-1' self.assertEqual(msg.message()['To'], '=?iso-8859-1?q?S=FCrname=2C_Firstname?= ') self.assertEqual(msg.message()['Subject'].encode(), u'=?iso-8859-1?q?Message_from_Firstname_S=FCrname?=') def test_encoding(self): """ Regression for #12791 - Encode body correctly with other encodings than utf-8 """ email = EmailMessage('Subject', 'Firstname Sürname is a great guy.', 'from@example.com', ['other@example.com']) email.encoding = 'iso-8859-1' message = email.message() self.assertTrue(message.as_string().startswith('Content-Type: text/plain; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: other@example.com')) self.assertEqual(message.get_payload(), 'Firstname S=FCrname is a great guy.') # Make sure MIME attachments also works correctly with other encodings than utf-8 text_content = 'Firstname Sürname is a great guy.' html_content = '

Firstname Sürname is a great guy.

' msg = EmailMultiAlternatives('Subject', text_content, 'from@example.com', ['to@example.com']) msg.encoding = 'iso-8859-1' msg.attach_alternative(html_content, "text/html") self.assertEqual(msg.message().get_payload(0).as_string(), 'Content-Type: text/plain; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\n\nFirstname S=FCrname is a great guy.') self.assertEqual(msg.message().get_payload(1).as_string(), 'Content-Type: text/html; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\n\n

Firstname S=FCrname is a great guy.

') def test_attachments(self): """Regression test for #9367""" headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} subject, from_email, to = 'hello', 'from@example.com', 'to@example.com' text_content = 'This is an important message.' html_content = '

This is an important message.

' msg = EmailMultiAlternatives(subject, text_content, from_email, [to], headers=headers) msg.attach_alternative(html_content, "text/html") msg.attach("an attachment.pdf", "%PDF-1.4.%...", mimetype="application/pdf") msg_str = msg.message().as_string() message = email.message_from_string(msg_str) self.assertTrue(message.is_multipart()) self.assertEqual(message.get_content_type(), 'multipart/mixed') self.assertEqual(message.get_default_type(), 'text/plain') payload = message.get_payload() self.assertEqual(payload[0].get_content_type(), 'multipart/alternative') self.assertEqual(payload[1].get_content_type(), 'application/pdf') def test_dummy_backend(self): """ Make sure that dummy backends returns correct number of sent messages """ connection = dummy.EmailBackend() email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) self.assertEqual(connection.send_messages([email, email, email]), 3) def test_arbitrary_keyword(self): """ Make sure that get_connection() accepts arbitrary keyword that might be used with custom backends. """ c = mail.get_connection(fail_silently=True, foo='bar') self.assertTrue(c.fail_silently) def test_custom_backend(self): """Test custom backend defined in this suite.""" conn = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') self.assertTrue(hasattr(conn, 'test_outbox')) email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) conn.send_messages([email]) self.assertEqual(len(conn.test_outbox), 1) def test_backend_arg(self): """Test backend argument of mail.get_connection()""" self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.smtp.EmailBackend'), smtp.EmailBackend)) self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.locmem.EmailBackend'), locmem.EmailBackend)) self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend)) self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend)) tmp_dir = tempfile.mkdtemp() try: self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend)) finally: shutil.rmtree(tmp_dir) self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend)) @with_django_settings( EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend', ADMINS=[('nobody', 'nobody@example.com')], MANAGERS=[('nobody', 'nobody@example.com')]) def test_connection_arg(self): """Test connection argument to send_mail(), et. al.""" mail.outbox = [] # Send using non-default connection connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) self.assertEqual(mail.outbox, []) self.assertEqual(len(connection.test_outbox), 1) self.assertEqual(connection.test_outbox[0].subject, 'Subject') connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') send_mass_mail([ ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']), ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']), ], connection=connection) self.assertEqual(mail.outbox, []) self.assertEqual(len(connection.test_outbox), 2) self.assertEqual(connection.test_outbox[0].subject, 'Subject1') self.assertEqual(connection.test_outbox[1].subject, 'Subject2') connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') mail_admins('Admin message', 'Content', connection=connection) self.assertEqual(mail.outbox, []) self.assertEqual(len(connection.test_outbox), 1) self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message') connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') mail_managers('Manager message', 'Content', connection=connection) self.assertEqual(mail.outbox, []) self.assertEqual(len(connection.test_outbox), 1) self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message') class BaseEmailBackendTests(object): email_backend = None def setUp(self): self.__settings_state = alter_django_settings(EMAIL_BACKEND=self.email_backend) def tearDown(self): restore_django_settings(self.__settings_state) def assertStartsWith(self, first, second): if not first.startswith(second): self.longMessage = True self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.") def get_mailbox_content(self): raise NotImplementedError def flush_mailbox(self): raise NotImplementedError def get_the_message(self): mailbox = self.get_mailbox_content() self.assertEqual(len(mailbox), 1, "Expected exactly one message, got %d.\n%r" % (len(mailbox), [ m.as_string() for m in mailbox])) return mailbox[0] def test_send(self): email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) num_sent = mail.get_connection().send_messages([email]) self.assertEqual(num_sent, 1) message = self.get_the_message() self.assertEqual(message["subject"], "Subject") self.assertEqual(message.get_payload(), "Content") self.assertEqual(message["from"], "from@example.com") self.assertEqual(message.get_all("to"), ["to@example.com"]) def test_send_many(self): email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com']) email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com']) num_sent = mail.get_connection().send_messages([email1, email2]) self.assertEqual(num_sent, 2) messages = self.get_mailbox_content() self.assertEquals(len(messages), 2) self.assertEqual(messages[0].get_payload(), "Content1") self.assertEqual(messages[1].get_payload(), "Content2") def test_send_verbose_name(self): email = EmailMessage("Subject", "Content", '"Firstname Sürname" ', ["to@example.com"]) email.send() message = self.get_the_message() self.assertEqual(message["subject"], "Subject") self.assertEqual(message.get_payload(), "Content") self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= ") @with_django_settings(ADMINS=[('nobody', 'nobody+admin@example.com')], MANAGERS=[('nobody', 'nobody+manager@example.com')]) def test_manager_and_admin_mail_prefix(self): """ String prefix + lazy translated subject = bad output Regression for #13494 """ mail_managers(ugettext_lazy('Subject'), 'Content') message = self.get_the_message() self.assertEqual(message.get('subject'), '[Django] Subject') self.flush_mailbox() mail_admins(ugettext_lazy('Subject'), 'Content') message = self.get_the_message() self.assertEqual(message.get('subject'), '[Django] Subject') @with_django_settings(ADMINS=(), MANAGERS=()) def test_empty_admins(self): """ Test that mail_admins/mail_managers doesn't connect to the mail server if there are no recipients (#9383) """ mail_admins('hi', 'there') self.assertEqual(self.get_mailbox_content(), []) mail_managers('hi', 'there') self.assertEqual(self.get_mailbox_content(), []) def test_idn_send(self): """ Regression test for #14301 """ self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com'])) message = self.get_the_message() self.assertEqual(message.get('subject'), 'Subject') self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') self.flush_mailbox() m = EmailMessage('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com']) m.send() message = self.get_the_message() self.assertEqual(message.get('subject'), 'Subject') self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') def test_recipient_without_domain(self): """ Regression test for #15042 """ self.assertTrue(send_mail("Subject", "Content", "tester", ["django"])) message = self.get_the_message() self.assertEqual(message.get('subject'), 'Subject') self.assertEqual(message.get('from'), "tester") self.assertEqual(message.get('to'), "django") class LocmemBackendTests(BaseEmailBackendTests, TestCase): email_backend = 'django.core.mail.backends.locmem.EmailBackend' def get_mailbox_content(self): return [m.message() for m in mail.outbox] def flush_mailbox(self): mail.outbox = [] def tearDown(self): super(LocmemBackendTests, self).tearDown() mail.outbox = [] def test_locmem_shared_messages(self): """ Make sure that the locmen backend populates the outbox. """ connection = locmem.EmailBackend() connection2 = locmem.EmailBackend() email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) connection.send_messages([email]) connection2.send_messages([email]) self.assertEqual(len(mail.outbox), 2) class FileBackendTests(BaseEmailBackendTests, TestCase): email_backend = 'django.core.mail.backends.filebased.EmailBackend' def setUp(self): super(FileBackendTests, self).setUp() self.tmp_dir = tempfile.mkdtemp() self.__settings_state = alter_django_settings(EMAIL_FILE_PATH=self.tmp_dir) def tearDown(self): restore_django_settings(self.__settings_state) shutil.rmtree(self.tmp_dir) super(FileBackendTests, self).tearDown() def flush_mailbox(self): for filename in os.listdir(self.tmp_dir): os.unlink(os.path.join(self.tmp_dir, filename)) def get_mailbox_content(self): messages = [] for filename in os.listdir(self.tmp_dir): session = open(os.path.join(self.tmp_dir, filename)).read().split('\n' + ('-' * 79) + '\n') messages.extend(email.message_from_string(m) for m in session if m) return messages def test_file_sessions(self): """Make sure opening a connection creates a new file""" msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) connection = mail.get_connection() connection.send_messages([msg]) self.assertEqual(len(os.listdir(self.tmp_dir)), 1) message = email.message_from_file(open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0]))) self.assertEqual(message.get_content_type(), 'text/plain') self.assertEqual(message.get('subject'), 'Subject') self.assertEqual(message.get('from'), 'from@example.com') self.assertEqual(message.get('to'), 'to@example.com') connection2 = mail.get_connection() connection2.send_messages([msg]) self.assertEqual(len(os.listdir(self.tmp_dir)), 2) connection.send_messages([msg]) self.assertEqual(len(os.listdir(self.tmp_dir)), 2) msg.connection = mail.get_connection() self.assertTrue(connection.open()) msg.send() self.assertEqual(len(os.listdir(self.tmp_dir)), 3) msg.send() self.assertEqual(len(os.listdir(self.tmp_dir)), 3) class ConsoleBackendTests(BaseEmailBackendTests, TestCase): email_backend = 'django.core.mail.backends.console.EmailBackend' def setUp(self): super(ConsoleBackendTests, self).setUp() self.__stdout = sys.stdout self.stream = sys.stdout = StringIO() def tearDown(self): del self.stream sys.stdout = self.__stdout del self.__stdout super(ConsoleBackendTests, self).tearDown() def flush_mailbox(self): self.stream = sys.stdout = StringIO() def get_mailbox_content(self): messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n') return [email.message_from_string(m) for m in messages if m] def test_console_stream_kwarg(self): """ Test that the console backend can be pointed at an arbitrary stream. """ s = StringIO() connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s) send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) class FakeSMTPServer(smtpd.SMTPServer, threading.Thread): """ Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from: http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup """ def __init__(self, *args, **kwargs): threading.Thread.__init__(self) smtpd.SMTPServer.__init__(self, *args, **kwargs) self._sink = [] self.active = False self.active_lock = threading.Lock() self.sink_lock = threading.Lock() def process_message(self, peer, mailfrom, rcpttos, data): m = email.message_from_string(data) maddr = email.Utils.parseaddr(m.get('from'))[1] if mailfrom != maddr: return "553 '%s' != '%s'" % (mailfrom, maddr) self.sink_lock.acquire() self._sink.append(m) self.sink_lock.release() def get_sink(self): self.sink_lock.acquire() try: return self._sink[:] finally: self.sink_lock.release() def flush_sink(self): self.sink_lock.acquire() self._sink[:] = [] self.sink_lock.release() def start(self): assert not self.active self.__flag = threading.Event() threading.Thread.start(self) self.__flag.wait() def run(self): self.active = True self.__flag.set() while self.active and asyncore.socket_map: self.active_lock.acquire() asyncore.loop(timeout=0.1, count=1) self.active_lock.release() asyncore.close_all() def stop(self): assert self.active self.active = False self.join() class SMTPBackendTests(BaseEmailBackendTests, TestCase): email_backend = 'django.core.mail.backends.smtp.EmailBackend' def setUp(self): super(SMTPBackendTests, self).setUp() self.server = FakeSMTPServer(('127.0.0.1', 0), None) self.settings = alter_django_settings( EMAIL_HOST="127.0.0.1", EMAIL_PORT=self.server.socket.getsockname()[1]) self.server.start() def tearDown(self): self.server.stop() super(SMTPBackendTests, self).tearDown() def flush_mailbox(self): self.server.flush_sink() def get_mailbox_content(self): return self.server.get_sink()