An Asynchronous Task

Following on from Funsize fun times, we’re going to make the task run asynchronously. This won’t save us CPU time, as the work still needs to be done, but it should save us wall-clock time: we’ll be done in fewer minutes, but we’ll be doing more work in each of those minutes.

Anatomy of a partials task

The main loop of a funsize task follows this pseudocode:

for every partial we've been told to build
    Set up a working directory, including downloading tools like 'mar' and 'mbsdiff'

    Download the 'from' MAR
    Verify its signature
    Unpack the MAR
    Virus scan the unpacked contents

    Download the 'to' MAR
    Verify its signature
    Unpack the MAR
    Virus scan the unpacked contents

    Set up some metadata
    Generate the partial using ''
    Copy the resulting file to the artifacts area
    Clean up

Because each task generates multiple partials, we have what’s known as an embarrassingly parallel task. Each iteration in that for loop could easily run at the same time, as long as they’re given a different working environment.
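To see why this helps with wall-clock time, here is a minimal sketch rather than the funsize code itself. `build_partial` is a hypothetical stand-in that only sleeps, simulating time spent waiting on a download or an external process; the names and durations are made up for illustration.

```python
import asyncio
import time


async def build_partial(name, seconds):
    # Hypothetical stand-in for one loop iteration: the sleep simulates
    # time spent waiting on downloads or external processes.
    await asyncio.sleep(seconds)
    return name


async def build_sequentially(names, seconds):
    # One after the other: total time is roughly len(names) * seconds.
    return [await build_partial(name, seconds) for name in names]


async def build_concurrently(names, seconds):
    # All at once: total time is roughly that of the slowest partial.
    return await asyncio.gather(*(build_partial(n, seconds) for n in names))


def timed(coro):
    # Run a coroutine to completion and report how long it took.
    start = time.monotonic()
    result = asyncio.run(coro)
    return result, time.monotonic() - start


if __name__ == "__main__":
    names = ["partial-1", "partial-2", "partial-3"]
    print(timed(build_sequentially(names, 0.2)))
    print(timed(build_concurrently(names, 0.2)))
```

The concurrent version should finish in about the time of a single partial here because sleeping costs no CPU. With real CPU-heavy subprocesses, the saving depends on how many cores are free.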

Starting out

We know we’ll be doing web requests, so let’s add the ‘aiohttp’ library to our virtual environment, and get to work.

Let’s keep the set-up in ‘main’, but move the work out into different functions. This will make things easier to read in any case. In place of the earlier ‘for’ loop, we set up an event loop:

    loop = asyncio.get_event_loop()
    manifest = loop.run_until_complete(async_main(args, signing_certs))
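On Python 3.7 and later, `asyncio.run` does the same job in a single call and also closes the loop when the coroutine finishes. A minimal sketch, with a placeholder `async_main` standing in for the real one:

```python
import asyncio


async def async_main(args, signing_certs):
    # Placeholder body: the real coroutine builds the partials.
    await asyncio.sleep(0)
    return []


def main(args=None, signing_certs=None):
    # asyncio.run creates a new event loop, runs the coroutine to
    # completion, and closes the loop afterwards.
    manifest = asyncio.run(async_main(args, signing_certs))
    return manifest
```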

async_main can now contain the for loop that iterates over the partials we want to create, but instead of building them sequentially, we create them as futures and then await them all. We can move the bulk of the work into a new `manage_partial` function.

async def async_main(args, signing_certs):
    tasks = []
    master_env = WorkEnv()
    await master_env.setup()
    for definition in task["extra"]["funsize"]["partials"]:
        workenv = WorkEnv()
        await workenv.clone(master_env)
        tasks.append(asyncio.ensure_future(manage_partial(...all the args...)))

    manifest = await asyncio.gather(*tasks)
    return manifest

We could have simply written return await asyncio.gather(*tasks) if we didn’t need to clean up the working environments afterwards. Since setting up a work environment involves downloading some tools, we optimise a bit by doing that only once and then cloning it locally for each task.
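The set-up-once, clone-many idea could look something like the sketch below. This `WorkEnv` is a hypothetical illustration, not the funsize class: the placeholder file stands in for the downloaded tools, and the `cleanup` method and `demo` function are assumptions added to show where cleaning up would fit.

```python
import asyncio
import shutil
import tempfile
from pathlib import Path


class WorkEnv:
    """Hypothetical sketch of a per-partial working directory."""

    def __init__(self):
        self.path = Path(tempfile.mkdtemp(prefix="workenv-"))

    async def setup(self):
        # Assumption: the real setup downloads tools such as 'mar' and
        # 'mbsdiff'. Here we just create a placeholder file.
        (self.path / "mar").write_text("placeholder tool\n")

    async def clone(self, other):
        # Copy the prepared environment locally rather than downloading
        # the tools again. copytree blocks, so run it in a worker thread.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None, lambda: shutil.copytree(other.path, self.path, dirs_exist_ok=True)
        )

    def cleanup(self):
        shutil.rmtree(self.path, ignore_errors=True)


async def demo():
    master_env = WorkEnv()
    await master_env.setup()
    envs = [WorkEnv() for _ in range(3)]
    try:
        await asyncio.gather(*(env.clone(master_env) for env in envs))
        return [(env.path / "mar").exists() for env in envs]
    finally:
        # Clean up only after every task has finished.
        for env in envs + [master_env]:
            env.cleanup()
```

Keeping a reference to each environment is what makes the later clean-up possible, which is why the gathered result is held in a variable rather than returned directly.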

The heavy(ish) lifting

Most of the work is done in external processes, so we need to migrate away from the sh module, useful as it is. Perhaps sh will be async-aware in the future.

Thankfully, the asyncio module has create_subprocess_shell, which works in a similar way and is easy to use. We can call it with the command we’re given, still set up the process’s environment and current working directory, and get the output. The result is the run_command function.

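async def run_command(cmd, cwd='/', env=None, label=None, silent=False):
    if not env:
        env = dict()
    # Assumption: pipe both streams so that communicate() returns their
    # contents; without PIPE it returns None for each stream
    process = await asyncio.create_subprocess_shell(cmd,
                                                    stdout=asyncio.subprocess.PIPE,
                                                    stderr=asyncio.subprocess.PIPE,
                                                    cwd=cwd, env=env)
    stdout, stderr = await process.communicate()

    await process.wait()

    # Skip logging entirely when asked to be quiet
    if silent:
        return

    # Output arrives as bytes; normalise missing streams to empty bytes
    if not stderr:
        stderr = b""
    if not stdout:
        stdout = b""

    if label:
        label = "{}: ".format(label)
    else:
        label = ''

    for line in stdout.splitlines():
        log.debug("%s%s", label, line.decode('utf-8'))
    for line in stderr.splitlines():
        log.warning("%s%s", label, line.decode('utf-8'))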
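As a smaller illustration of the same call, the sketch below runs two shell commands concurrently and collects their output. `run_and_capture` and `demo` are hypothetical names, and the commands assume a POSIX shell.

```python
import asyncio


async def run_and_capture(cmd, cwd=None, env=None):
    # Hypothetical helper: pipe both streams so communicate() returns
    # their contents as bytes instead of None.
    process = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=cwd,
        env=env,
    )
    stdout, stderr = await process.communicate()
    return process.returncode, stdout.decode("utf-8"), stderr.decode("utf-8")


async def demo():
    # Both commands run concurrently; results come back in call order.
    return await asyncio.gather(
        run_and_capture("echo first"),
        run_and_capture("echo second 1>&2"),
    )
```

Returning the exit code alongside the output lets the caller decide whether a non-zero status should abort the partial or merely be logged.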

We do the same for download functions, and things Just Work.

Task Durations

We’ve done well with the overall task time, using the same amount of CPU in a shorter space of time. We still have more optimisation to do within the partials generation itself, though.