DataRaccoon

Problem Intro

As more companies get used to the idea of building data products (which, in my opinion is a different problem as compared to build a software product), chances are you will need to consider upstream/down streams systems.

If you are working for a big company with resources under your disposal, great. Otherwise, chances are you will need to manage your own product which comes with architectural / design choices that a data scientist is completely new to.

One of the common problems that comes up often enough is integrating with feature stores or multiple data sources. Usually such processing happens on the database side and not your machine - resulting in an IO (input-output) problem. Coupled with the rising popularity of Fastapi, it seems like a good idea to get familiarity with asynchronous work flow.

What is asynchronous?

Personally, I find Fastapi's explanation of async. Citing:

Asynchronous code just means that the language 💬 has a way to tell the computer / program 🤖 that at some point in the code, it 🤖 will have to wait for something else to finish somewhere else. Let's say that something else is called "slow-file" 📝.

Another special note that asynchronous is related to concurrency and is part of a larger topic, and usually studied with parallelism. The Realpython referrences will be a good place to start.

Pre-req & setup

Familiarity with python 3.7+ ! (and the use of generators)

Note, for these examples I executed them with Jupyter notebooks!

These are the packages you need:

asyncio
nest-asyncio
# custom async packages you might need depending on your use case
aiohttp

Normal python code

Consider the following code,

import time


def feat1():
    for i in [10, 20, 40]:
        time.sleep(1)
        yield i


def feat2():
    for i in [50, 500, 5000]:
        time.sleep(1))
        yield i


time_now = time.perf_counter()
value1 = [x for x in feat1()]
value2 = [x for x in feat2()]
time_end = time.perf_counter()
print("total time taken is {}".format(time_end - time_now))

"""
total time taken is 6.0033508999913465
"""

During value1 when the program was executing, each iteration python was taking 1 second before the next data arrives. During this 1 second, we could be receiving output / do data processing instead of waiting and doing nothing.

This is when Async comes into play:

Async Version

This is the equivalent of the above, just in async!

# async generators
async def afeat1():
    for i in [10, 20, 40]:
        await asyncio.sleep(1)
        yield i


async def afeat2():
    for i in [50, 500, 5000]:
        await asyncio.sleep(1)
        yield i


async def process_feat(asyncgenerator):
    empty_list = []
    async for i in asyncgenerator:
        empty_list.append(i)
    return empty_list


data = asyncio.gather(process_feat(afeat1()), process_feat(afeat2()))

# typical examples will not run
s = time.perf_counter()
output = await data
print(output)
elapsed = time.perf_counter() - s
print(elapsed)  # 3.003277199997683
"""
[[10, 20, 40], [50, 500, 5000]]
3.005606100021396
"""

(If you encountered an error, go to the next section)

The first thing to realize is writing async code is expensive!

Always go back to fundamentals! Before deciding to apply async, check that your code has an IO problem or it already fits your required SLA!

We'll go through in details in the Async sections! but generally the following changes are noticeable:

  • functions have an await keyword infront of them
  • time.sleep has become asyncio.sleep
  • for loops also have async in them

Most importantly of all, you need to tell python which tasks to asyncio.gather so the async code can run! Thus, from the earlier example of 6 seconds, by running both asynchronously, taking 3 seconds instead.

Running with Python

If you are running it as a python script or in a python console, you will encounter this error:

SyntaxError: 'await' outside function

To run it in python, you need to change it to:

async def main():
    data = await asyncio.gather(process_feat(afeat1()), process_feat(afeat2()))
    return data

output = asyncio.run(main())

This is how your script should look:

import asyncio
import time

# async generators
async def afeat1():
    for i in [10, 20, 40]:
        await asyncio.sleep(1)
        yield i


async def afeat2():
    for i in [50, 500, 5000]:
        await asyncio.sleep(1)
        yield i


async def process_feat(asyncgenerator):
    empty_list = []
    async for i in asyncgenerator:
        empty_list.append(i)
    return empty_list


async def main():
    data = await asyncio.gather(process_feat(afeat1()), process_feat(afeat2()))
    return data


s = time.perf_counter()
output = asyncio.run(main())
print(output)
elapsed = time.perf_counter() - s
print(elapsed)  

Running it as a python executable in terminal:

(base) /workspaces/asyncio# python example.py 
[[10, 20, 40], [50, 500, 5000]]
3.0077412000100594

However, this code will not work if you are using jupyter notebook.

Running with Jupyter

Running this in a notebook will generate this error:

RuntimeError: asyncio.run() cannot be called from a running event loop

Turns out, Jupyter is already using an event loop. More information on this can be found in stackoverflow here

Use await

The first way to solve it, is not to use asyncio.run and use await directly.

s = time.perf_counter()
# output = asyncio.run(main()) # this line is commented out 
output = await main()
print(output)
elapsed = time.perf_counter() - s
print(elapsed) 

Nested-asyncio

If you need to use it in notebook environment, you can use nest_asyncio and run it in the jupyer cell

import nest_asyncio
nest_asyncio.apply()

Async!

Async is actually a concurrent programming design that received dedicated in Python, evolving rapidly from Python 3.4 . The keywrods async/await are new python-keywords that are used to define coroutines!

Async Functions

Previously, to use async, you need to apply a decorator. Since python 35 onwards it has evolved greatly, perhaps following the javascript design.

import asyncio

@asyncio.coroutine
def py_old_way():
    yield from stuff()

# since py35++
async def py_new_way():
    """Native coroutine, modern syntax"""
    await stuff()

Async Sleep, Loops

For most of your code, there is some learning curve / debugging to figure out the async equivalent.

sleep

For example, time.sleep(x) is now changed to asyncio.sleep(x). With for loops or with statements you will need to add async infront of them.

async with aiohttp.ClientSession() as client:
    pass

async for i in asyncgenerator(x):
    pass

Async Gather

Once you define the tasks to run concurrently, you need to gather them with asyncio.gather - In layman terms its sort of telling python to gather all the tasks (or coroutine) so it can executes in an event loop.

Await

Once the event loop is defined, then you inform python to gather all the results before proceeding.

Async processing

Typically when you hit databases, the response is being returned as a generator. The example below shows an example where you retrieve multiple generators, process the data and the output in an async fashion.

import asyncio
import time


async def afeat1():
    for i in [10, 20, 40]:
        await asyncio.sleep(0.1)
        yield i


async def afeat2():
    for i in [50, 500, 5000]:
        await asyncio.sleep(0.1)
        yield i


async def task(asyncgenerator, input_func):
    output = input_func([i async for i in asyncgenerator])
    return output


async def complex_task(asyncgenerator):
    min_, max_, sum, count = 0, 0, 0, 0
    async for i in asyncgenerator:
        min_ = min(min_, i)
        max_ = max(max_, i)
        sum += i
        count += 1
    mean = sum / count
    return [min_, max_, mean]


s = time.perf_counter()
output = asyncio.gather(
    task(afeat1(), sum), task(afeat2(), max), complex_task(afeat2())
)
# asyncio.run(output)
task1, task2, task3 = await output
print(task1, task2, task3)  # 70 5000 [0, 5000, 1850.0]
elapsed = time.perf_counter() - s
print(elapsed)  # 3.0022729000047548

"""
70 5000 [0, 5000, 1850.0]
0.3041836000047624
"""

Other Modules

There are many other async packages, I usually refer to the resources section over at realpython asyncio tutorial. There are a whole bunch of packages/resources such as using async with files, redis, postgres, kafka etc.

I will be updating the following sections in the future as I explore more async features!

AIOHTTP

AIOHTTP stands for async io http requests, that supports your typical rest comamnds such as get, post, put etc.

This is useful when calling multiple websites and waiting for a response:

import aiohttp
import asyncio


async def fetch(client):
    async with client.get("http://python.org") as resp:
        assert resp.status == 200
        return await resp.text()


async def main():
    async with aiohttp.ClientSession() as client:
        html = await fetch(client)
        print(html)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Referrences

Python Docs

Real Python Tutorials

Other Tutorials: