Recently at my workplace, our IT team finally upgraded our distributed Python version to 3.5.0. While this is a huge upgrade from 2.6, it still came with growing pains: Python 3.5.0 doesn't meet the minimum requirements of some popular libraries, including aiohttp.
Despite these restrictions, I still needed to write a script that could pull hundreds of .csv files from our APIs and manipulate the data. Python by itself isn't event-driven and natively asynchronous the way NodeJS is, but the same effect can still be achieved. This article details what I learned along the way while also showing the benefits of asynchronous operations.
Disclaimer: If you have a higher version of Python available (3.5.2+), I highly recommend using aiohttp instead. It’s an incredibly robust library and a great solution for this kind of problem. There are many tutorials online detailing how best to use the library.
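For a rough sense of what that alternative looks like, here is a minimal aiohttp sketch (the URL is just a placeholder, and this requires the newer Python mentioned above):
import aiohttp
import asyncio

async def fetch(session, url):
    # aiohttp awaits the request instead of blocking the whole program
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        data = await fetch(session, "https://example.com/data.csv")
        print(data[:100])

asyncio.get_event_loop().run_until_complete(main())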
Assumptions
This article makes the following assumptions:
You already have familiarity with Python and most of its syntax
You already have familiarity with basic web requests
You have a loose concept of asynchronous operations
If you're just looking for the solution, scroll down to the bottom, where the full code is posted. Enjoy!
Setup
Before getting started, ensure that you have requests installed on your machine. The easiest way to install is by typing the following command into your terminal:
$ python -m pip install requests
Alternatively, if you don't have administrative permissions, you can install the library with this command:
$ python -m pip install requests --user
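To quickly confirm the install worked, you can print the library's version straight from the interpreter:
$ python -c "import requests; print(requests.__version__)"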
The wrong approach: synchronous requests
To demonstrate the benefits of our parallel approach, let's first look at approaching the problem in a synchronous manner. I'll also give an overview of what's going on in the code. Ultimately, we want to be able to perform a GET request to the URL containing each .csv file and measure the time it takes to read the text inside.
We’ll be downloading multiple .csv files of varying sizes from https://people.sc.fsu.edu/~jburkardt/data/csv/, which provides plenty of data for our example.
Note that we'll be using the Session object from the requests library to perform our GET requests.
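One nice property of a Session is that anything you configure on it, such as headers or authentication, is applied to every request it makes. A minimal sketch (the header value here is a placeholder, not something our example needs):
import requests

session = requests.Session()
# Every request made through this session now carries this header
session.headers.update({"Authorization": "Bearer <your-token>"})
response = session.get("https://example.com/data.csv")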
First, we'll need a function that executes the web request:
def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        # Return .csv data for future consumption
        return data
This function takes in a Session object and the name of the desired .csv file, performs the web request, then returns the text inside the response.
Next, we need a function that can efficiently loop over a list of our desired files and measure the time it takes to perform each request:
import requests
from timeit import default_timer
def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))
        # Set any session parameters here before calling `fetch`
        # For instance, if you need to set headers or authentication,
        # do so before starting the loop
        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))
This function creates our Session object and then loops through each .csv file in the csvs_to_fetch list. Once each fetch operation is completed, the measured time is calculated and displayed in an easy-to-read format.
Finally, our main function will be simple (for now) and just call our function:
def main():
    # Simple for now
    get_data_synchronous()

main()
Once we put it all together, here is what the code looks like for our synchronous example:
import requests
from timeit import default_timer

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        # Return .csv data for future consumption
        return data

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))
        # Set any session parameters here before calling `fetch`
        # For instance, if you need to set headers or authentication,
        # do so before starting the loop
        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))

def main():
    # Simple for now
    get_data_synchronous()

main()
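By the way, since fetch returns the raw .csv text, the standard library's csv module is enough to start manipulating the data. A minimal sketch (assuming the fetch function and an open session from above):
import csv

# Hypothetical follow-up: parse the text returned by `fetch`
data = fetch(session, "cities.csv")
rows = list(csv.reader(data.splitlines()))
print(rows[0])  # The file's header row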
Let’s take a look at the results when we run this script:
Synchronous example. Notice how each operation doesn't start until the previous one has completed.
Thankfully, we can vastly improve this performance with Python 3's built-in asyncio library!
The right approach: performing multiple requests at once asynchronously
In order to get this to work, we'll have to rework some of our existing functions, beginning with fetch:
import requests
from timeit import default_timer

# We'll need access to this variable later
START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        # Now we print how long it took to complete the operation from
        # within the `fetch` function itself
        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))
        return data
Next, we need to make our get_data function asynchronous:
import asyncio
from timeit import default_timer
from concurrent.futures import ThreadPoolExecutor
async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))
    # Note: max_workers is set to 10 simply for this example;
    # you'll have to tweak this number for your own projects
    # as you see fit
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            # Get a reference to the event loop
            loop = asyncio.get_event_loop()
            # Set the START_TIME for the `fetch` function; declaring it
            # `global` ensures we reset the module-level variable instead
            # of creating a local one
            global START_TIME
            START_TIME = default_timer()
            # Use a list comprehension to create a list of
            # tasks to complete. The executor will run the `fetch`
            # function for each csv in the csvs_to_fetch list
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv)  # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            # Schedule the tasks and await their results
            for response in await asyncio.gather(*tasks):
                pass
This code submits a fetch call to the thread pool for each .csv file that needs to be downloaded, so up to ten requests can run at once instead of one after another.
Finally, our main function needs a small tweak to properly initialize our async function:
def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
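As a side note, asyncio.gather returns the results of every task in order, so if you want to keep the downloaded text instead of discarding it, the final loop inside get_data_asynchronous could become something like this (a sketch, not part of the measured example):
# Inside get_data_asynchronous, replacing the `for ... pass` loop:
results = await asyncio.gather(*tasks)
# Map each file name to its downloaded text for later manipulation
all_csv_data = dict(zip(csvs_to_fetch, results))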
Now, let’s run the new code and see the results:
Asynchronous example. Notice how the files are not being obtained in order.
With this small change, all 12 .csv files were downloaded in 3.43s versus 10.84s. That's nearly a 70% decrease in download time!
The Asynchronous Code
import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer

START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))
        return data

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()
            # `global` ensures we reset the module-level START_TIME
            global START_TIME
            START_TIME = default_timer()
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv)  # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                pass

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
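One final note: the async def / await syntax itself requires Python 3.5.0 or newer. If you're stuck on 3.4, asyncio's older generator-based syntax achieves the same thing; a tiny self-contained sketch:
import asyncio

# On Python 3.4, decorate coroutines instead of using `async def`,
# and use `yield from` wherever this article uses `await`
@asyncio.coroutine
def say_after(delay, text):
    yield from asyncio.sleep(delay)
    print(text)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(say_after(1, "hello"), say_after(1, "world")))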
I hope you enjoyed this article and can use these skills for any projects that require an older Python version (or that can't take on as many dependencies). Although Python may not make the async/await pattern as effortless as NodeJS does, it isn't difficult at all to achieve fantastic results.
Enjoy!