An Asynchronous Task
Following on from Funsize fun times, we're going to make the task run asynchronously. This won't save us CPU time, as the work still needs to be done, but it should save us wall-clock time: we'll be done in fewer minutes by doing more of the work at once.
Anatomy of a partials task
The main loop for work in a funsize task follows this pseudocode:
for every partial we've been told to build:
    Set up a working directory, including downloading tools like 'mar' and 'mbsdiff'
    Download the 'from' MAR
    Verify its signature
    Unpack the MAR
    Virus scan the unpacked contents
    Download the 'to' MAR
    Verify its signature
    Unpack the MAR
    Virus scan the unpacked contents
    Set up some metadata
    Generate the partial using 'make_incremental_update.sh'
    Copy the resulting file to the artifacts area
    Clean up
Because each task generates multiple partials, we have what's known as an embarrassingly parallel workload. Each iteration of that for loop could easily run at the same time, as long as each one is given its own working environment.
Starting out
We know we'll be doing web requests, so let's add the 'aiohttp' library to our virtual environment, and get to work.
Let's keep the set-up in 'main', but move the work out into different functions. This will make things easier to read in any case. Setting up an event loop in place of the earlier 'for' loop:
loop = asyncio.get_event_loop()
manifest = loop.run_until_complete(async_main(args, signing_certs))
loop.close()
async_main can now contain the for loop that iterates over the partials we want to create, but instead of running them sequentially, we can create them as futures and then await them all. We can move the bulk of the work into a new `manage_partial` function.
async def async_main(args, signing_certs):
    tasks = []
    master_env = WorkEnv()
    await master_env.setup()
    for definition in task["extra"]["funsize"]["partials"]:
        workenv = WorkEnv()
        await workenv.clone(master_env)
        tasks.append(asyncio.ensure_future(manage_partial(...all the args...)))
    manifest = await asyncio.gather(*tasks)
    master_env.cleanup()
    return manifest
We could have just written `return await asyncio.gather(*tasks)` if we didn't need to clean up the working environments afterwards. Since setting up a work environment involves downloading some tools, we optimise a bit by only doing that once and then cloning it locally for each task.
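For reference, here's a minimal sketch of what that WorkEnv might look like. The directory layout, tool list, and the download_tool helper are illustrative assumptions, not funsize's actual implementation:

import os
import shutil
import tempfile

class WorkEnv:
    """A disposable working directory holding the tools each partial needs."""

    async def setup(self):
        self.workdir = tempfile.mkdtemp()
        for tool in ('mar', 'mbsdiff'):
            # download_tool is a hypothetical helper that fetches the binary.
            await download_tool(tool, os.path.join(self.workdir, tool))

    async def clone(self, master_env):
        # Copying the tools locally is much cheaper than downloading them again.
        self.workdir = tempfile.mkdtemp()
        for tool in ('mar', 'mbsdiff'):
            shutil.copy(os.path.join(master_env.workdir, tool), self.workdir)

    def cleanup(self):
        shutil.rmtree(self.workdir)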
The heavy(ish) lifting
Most of the work is done in subprocesses, so we need to migrate away from the sh module, useful as it is. Perhaps in the future sh will be async-aware.
Thankfully, the asyncio module has create_subprocess_shell, which works in a similar way and is easy to use. We can call it with the command we're given, still set up the process's environment and current working directory, and capture the output. The result is the run_command function:
async def run_command(cmd, cwd='/', env=None, label=None, silent=False):
    if not env:
        env = dict()
    process = await asyncio.create_subprocess_shell(cmd,
                                                    stdout=asyncio.subprocess.PIPE,
                                                    stderr=asyncio.subprocess.STDOUT,
                                                    cwd=cwd, env=env)
    stdout, stderr = await process.communicate()
    await process.wait()
    if silent:
        return
    if not stderr:
        stderr = ""
    if not stdout:
        stdout = ""
    if label:
        label = "{}: ".format(label)
    else:
        label = ''
    for line in stdout.splitlines():
        log.debug("%s%s", label, line.decode('utf-8'))
    for line in stderr.splitlines():
        log.warn("%s%s", label, line.decode('utf-8'))
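Calling it then looks much like using sh did. This invocation is hypothetical; the real command line and variables come from the partial's definition:

# Hypothetical example: generate one partial in its own work environment.
await run_command(
    'make_incremental_update.sh {} {} {}'.format(dest_mar, from_dir, to_dir),
    cwd=workenv.workdir,
    label=dest_mar)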
We do the same for download functions, and things Just Work.
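A download helper follows the same pattern with aiohttp. This is a minimal sketch, streaming to disk in fixed-size chunks; the chunk size and error handling are illustrative:

import aiohttp

async def download(url, dest):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Fail early on HTTP errors rather than writing a bad file.
            response.raise_for_status()
            with open(dest, 'wb') as f:
                while True:
                    chunk = await response.content.read(64 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)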
Task Durations
We've done well with the overall task time, using the same amount of CPU in a shorter space of time. We still have more optimisation to do within the partials generation itself, though.
