Sending Emails Using asyncio and aiohttp From a Django Application

Hi everyone!

I develop and support the notification service at Ostrovok.ru. The service is written in Python3 and Django. In addition to transactional emails, push notifications, and messages, the service also takes care of mass emailing of marketing offers (not spam! trust me, unsubscribe works better than subscribe on our service) for users who have given their consent. Over time, the database of active recipients grew to more than a million email addresses, which the email service was not ready for. I want to talk about how new Python features allowed us to speed up mass emailing and save resources, and what problems we encountered when working with them.

The original implementation

Initially, we implemented mass emailing with the simplest solution: for each recipient, a task was place in a queue, where one of 60 workers (a feature of our queues is that each workers runs in a separate process) prepared the context, rendered the template, sent an HTTP request to Mailgun to send the email, and created a record in the database that the email was sent. The entire process took up to 12 hours, sending about 0.3 emails per second from each worker and blocking emails for small campaigns.

Asynchronous solution

Quick profiling showed that workers spent a large amount of time on setting up connections with Mailgun, so we started grouping tasks into chunks, one chunk for each worker. Workers began using a single connection with Mailgun, which dropped the time of emailing the list to 9 hours, each worker sending an average of 0.5 emails per second. Subsequent profiling again showed that network requests still took the majority of the time, which led us to the idea of using asyncio.

Before putting all the processing in an asyncio loop, we had to solve several problems:

  1. Django ORM is not yet able to work with asyncio, although it releases GIL during query execution. This means that queries to the database can be executed in a separate thread and not block the main loop.
  2. Current versions of aiohttp require Python versions 3.6 and higher, which required updating the Docker image at the time of implementation. Experiments on older versions of aiohttp and Python 3.5 have shown that the sending speed on these versions is much lower than on newer versions, and is comparable to sequential sending.
  3. Storing a large number of asyncio coroutines quickly consumes all the memory. This means that you can’t prepare all the coroutines for emails in advance and call a loop to process them, you have to prepare data as you send already generated emails.

Taking all this into account, we will create our own asyncio loop inside each of the workers with the ThreadPool type of pattern consisting of:

  • One or more producers working with the database via Django ORM in a separate thread via asyncio.ThreadPoolExecutor. The producer tries to aggregate data requests into small batches, renders templates for the data via Jinja2, and the emailing data to the task queue.
def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]:
    """ We generate email data, here we work with Django ORM and template rendering."""
    return [{'id': id} for id in ids]


async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None:
    """
    We group recipients into subchannels and generate data for them to send, 
    which we place in the queue. Data generation requires working with the 
    database, so we perform it in ThreadPoolExecutor.
    """

    loop = asyncio.get_event_loop()
    total = len(ids)
    for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE):
        subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)]
        send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids)
        for task in send_tasks:
            await task_queue.put(task)
  • Several hundred email senders are asyncio coroutines that read data from the task queue in an infinite loop, send network requests for each of them, and add the result (response, or exception) to the report queue.
async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]:
    """ Sending a request to an external service."""
    async with session.post(REQUEST_URL, data=data) as response:
        if response.status_code != 200:
            raise Exception
    return data


async def mail_campaign_sender(
    task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession
) -> None:
    """
    Getting data from the queue and sending network requests.
    Don't forget to call task_done so that the calling code will know when 
    the email is sent.. 
    """

    while True:
        try:
            task_data = await task_queue.get()
            result = await send_mail(task_data, session)
            await result_queue.put(result)
        except asyncio.CancelledError:
            # Correctly processing cancellation of the coroutine
            raise
        except Exception as exception:
            # Processing errors in email sending
            await result_queue.put(exception)
        finally:
            task_queue.task_done()
  • One or several workers, grouping data from the queue and writing results to the database using a bulk request.
def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None:
    """We process the results of transmission: exception and success and write them to the database"""
    pass


async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None:
    """
    We group reports into a list and pass them to ThreadPoolExecutor for processing, to write emailing results to the database.
    """
    loop = asyncio.get_event_loop()
    results_chunk = []
    while True:
        try:
            results_chunk.append(await result_queue.get())
            if len(results_chunk) >= REPORTER_BATCH_SIZE:
                await loop.run_in_executor(None, process_campaign_results, results_chunk)
                results_chunk.clear()
        except asyncio.CancelledError:
            await loop.run_in_executor(None, process_campaign_results, results_chunk)
            results_chunk.clear()
            raise
        finally:
            result_queue.task_done()
  • A task queue, of instance asyncio.Queue, limited to a maximum number of items, so that the producer doesn’t overfill it, taking all the memory.
  • A report queue is also an instance of asyncio.Queue with a limit on the maximum number of items.
  • An asynchronous method that creates queues, workers, and finish transmission when they are stopped .
async def send_mail_campaign(
    recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None
) -> None:
    """
    Creates a queue and starts workers for processing.
    Waits for recipients to be generated, then waits for reports to be sent and saved. 
    """
    executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1)
    loop = loop or asyncio.get_event_loop()
    loop.set_default_executor(executor)

    task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop)
    result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop)

    producers = [
        asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT)
    ]
    consumers = [
        asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT)
    ]
    reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue))

    # We are waiting for all the letters to be prepared
    done, _ = await asyncio.wait(producers)

    # When all sends are completed, we stop the workers
    await task_queue.join()
    while consumers:
        consumers.pop().cancel()

    # When report saving is complete, we also stop the corresponding worker
    await result_queue.join()
    reporter.cancel()
  • Synchronous code that creates a loop and starts the distribution
async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None:
    """
    Close the session when all processing is complete.
    The aiohttp documentation recommends adding a delay before closing the session. 

    """
    await asyncio.wait([future])
    await asyncio.sleep(0.250)
    await session.close()


def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None:
    """
    Entry point for starting a mailing list.
    Accepts recipient IDs, creates an asyncio loop, and starts the sending coroutine.

    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Session
    connector = aiohttp.TCPConnector(limit_per_host=0, limit=0)
    session = aiohttp.ClientSession(
        connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60
    )

    send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop))
    cleanup_future = asyncio.ensure_future(close_session(send_future, session))
    loop.run_until_complete(asyncio.wait([send_future, cleanup_future]))
    loop.close()

After implementing this solution, the time for sending mass emails was reduced to an hour with the same volume of emails and 12 workers involved. That is, each worker sends 20-25 emails per second, which is 50-80 faster than the original solution. The memory consumption of the workers remained at the same level, the processor load increased slightly, and the network utilization increased by many times, which is the expected effect. The number of connections to the database has also increased since each of the threads of production workers and workers who save reports is actively working with the database. At the same time, the released workers can send out smaller emailing lists while the mass campaign is being sent.

Despite all the advantages, this implementation has a number of issues that must be taken into account:

  • You must be careful when handling errors. An unhandled exception may terminate the worker, causing the campaign to fail.
  • When sending is completed, you must not lose reports on recipients who went to the chunk towards the end, and save them to the database.
  • The logic of forcibly stopping-resuming of campaigns becomes more complicated because after stopping the sending workers, it is necessary to compare which recipients were sent emails and which ones were not.
  • After a while, the Mailgun support staff contacted us and asked us to reduce our speed, because mail services start temporarily rejecting emails if the frequency of sending them exceeds the threshold. This is easy to do by reducing the number of workers.
  • You would not be able to use asyncio if some of the stages of sending emails were performing CPU-intensive operations. Rendering templates using jinja2 was not a very resource intensive operation and has almost no effect on the speed of sending.
  • Using asyncio for emailing requires that the mail queue handlers are started by separate processes.

I hope our experience will be useful to you! If you have any questions or ideas, please write in the comments!


Written by Sergey, translated from here