Asynchronous Multithreading in Python
A practical introduction to blazing-fast performance on large inputs
Intro
A little bit of background on what I’m trying to do here (feel free to skip this section if you just want to learn how this works). It took me a long time and a lot of trial and error, with help from StackOverflow, TDS, and the like, to learn to write parallel code in a practical way. There are great tutorials out there for learning the basics, such as this one, but if you’re trying to parallelize, it probably means you have a ton of data or operations to process, and you might run into problems. For example, the problem I worked on required 15 million API calls, which came with scalability issues. I’m no expert on parallel programming, but I wanted to share what I’ve learned and the roadblocks I’ve encountered. Hopefully this will be a useful guide for other learners like myself!
Asynchronous vs Multithreading
This part was super confusing to me at first, so I’ll try to break it down. Asynchronous programming is generally used when a specific action is slow and would otherwise block the actions that follow it. You see this often in I/O, such as network calls, where a single call can take hundreds of milliseconds to return. If your program waits for each call to return before doing anything else, you’re wasting valuable time. If you perform the calls asynchronously, you can send them one after another immediately and catch the returns later with a callback function. It’s kind of like a pitcher who can throw multiple balls instead of only one at a time. Multithreading, on the other hand, spawns more threads, each of which can run actions in parallel. So if asynchronous programming is the pitcher with multiple balls, multithreading is like having a whole team of pitchers. Check this post out for an in-depth explanation.
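To make that concrete, here is a minimal sketch of the asynchronous style using Python’s built-in asyncio (just an illustration of the idea; the rest of this post takes the multiprocessing route instead). The fake_network_call function is a made-up stand-in for any slow I/O call:

import asyncio

async def fake_network_call(i):
    # pretend this is a network call that takes ~100 ms
    await asyncio.sleep(0.1)
    return i

async def main():
    # all 100 "calls" are in flight at once, so this takes ~0.1 s instead of ~10 s
    results = await asyncio.gather(*(fake_network_call(i) for i in range(100)))
    print(len(results))

if __name__ == "__main__":
    asyncio.run(main())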
How fast is it?
Blazing fast. In my problem, each API call took about 100 milliseconds to complete, meaning I could get about 10 calls per second running sequentially. For 15 million calls, that works out to roughly two and a half weeks of runtime (15,000,000 calls ÷ 10 calls per second ≈ 1.5 million seconds, or about 17 days). With a parallel program I was able to reach 2,500+ calls per second, finishing all 15 million in just under 2 hours (this was run on a Kubernetes pod I was using; the exact speedup will vary from machine to machine due to memory and CPU constraints).
Setup
Let’s say we have a slow function that takes 100 milliseconds per call:
import time

def slow_function(data):
    time.sleep(100 / 1000)  # simulate a slow call (100 ms)
    return data
And we want to run this function many, many times and export the results:
if __name__ == "__main__":
    output = []
    for i in range(10000):
        output.append(slow_function(i))
    write_to_file(output)
For 10000 calls, this would take at least 1000 seconds sequentially (100 ms per call for 10000 calls), or about 16 minutes.
Parallelization
To parallelize, we can use Python’s built-in multiprocessing library:
import multiprocessing as mp
First we’ll want to write a callback function, which tells the program what to do once our slow_function does return. It takes in a result variable, which is the output of slow_function, and does something with it. In this case, we’ll just add it to a list (the callback runs back in the main process, which is why appending to an ordinary Python list works here):
def catch(result):
    global output
    output.append(result)
Finally, we set up a pool of worker processes and call our function asynchronously:
JOBS = 200  # 1

if __name__ == "__main__":
    mp.set_start_method("spawn")  # 2
    pool = mp.Pool(JOBS)  # 3

    output = []
    for i in range(10000):
        pool.apply_async(slow_function, args=(i,), callback=catch)  # 4

    pool.close()  # 5
    pool.join()
    write_to_file(output)
- The JOBS variable defines how many worker processes you want to spawn. The more you spawn, the faster your program will run, but spawning too many can strain your CPU or overflow memory. Play around with it until you find a sweet spot (I tried ranges from 4 to 200, with different optimal numbers on different machines).
- Set the start method to “spawn” to avoid this issue.
- Set up the pool with our chosen number of workers.
- Call our slow_function using pool.apply_async() while passing in our callback function (see the error-handling sketch after this list).
- Use pool.close() to stop the pool from accepting new work and pool.join() to wait for the workers to finish (see this forum for more details).
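One gotcha with pool.apply_async(): it returns an AsyncResult, and if slow_function raises an exception inside a worker, that exception is silently dropped unless you call .get() on the result or pass an error_callback. A minimal sketch of the latter (log_error is a name I made up for illustration):

def log_error(exc):
    # runs in the main process if slow_function raised in a worker
    print("worker failed:", exc)

# inside the submission loop from above
pool.apply_async(slow_function, args=(i,), callback=catch, error_callback=log_error)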
Note: due to the parallel nature of this program, your outputs will not be ordered. It is a good idea to return the input with each output to be able to match them, if needed.
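For example, one way to do that (just a sketch, not the exact code from my project) is to have the worker return an (input, output) pair and store the pairs in the callback:

def slow_function(data):
    time.sleep(100 / 1000)
    result = data  # stand-in for whatever the real call returns
    return (data, result)  # keep the input alongside the output

def catch(pair):
    global output
    output.append(pair)  # each entry is (input, output), so ordering no longer matters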
If you run this code as is, it will work well for medium-sized inputs (~10k–100k actions). However, if your inputs are much bigger, on the order of millions of actions, you might run into two main issues: 1) tasks are submitted much faster than they complete, so the CPU can get overwhelmed, and 2) the output can grow so large that it overflows memory.
CPU Solution
To fix the CPU issue, I ran the process in chunks, with a brief delay between each chunk:
JOBS = 200
CHUNKSIZE = 1000

def process_chunk(chunk, pool):
    for data in chunk:
        pool.apply_async(slow_function, args=(data,), callback=catch)

if __name__ == "__main__":
    mp.set_start_method("spawn")
    pool = mp.Pool(JOBS)

    output = []
    chunk = []
    for i in range(10000):
        chunk.append(i)
        if (i + 1) % CHUNKSIZE == 0:
            process_chunk(chunk, pool)
            chunk = []
            time.sleep(500 / 1000)

    pool.close()
    pool.join()
    write_to_file(output)
Based on this article, there are functions that automatically create chunks from your input, but this relies on having your entire input ready as a list object, which will cause more memory issues on large inputs.
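For reference, I believe this refers to the chunksize argument on helpers like pool.map, roughly as in the sketch below; note that map blocks until everything is done and holds the entire input and result list in memory at once:

# built-in chunking: the pool splits the input into chunks of 100 tasks per worker,
# but the whole input and the whole result list must fit in memory
results = pool.map(slow_function, range(10000), chunksize=100)
write_to_file(results)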
You can also decrease the number of jobs to ease CPU load.
Memory Solution
To solve the memory issue, I exported the outputs in batches (every few chunks) and cleared the in-memory data store (and added a longer delay between each batch):
JOBS = 200
CHUNKSIZE = 1000
BATCHSIZE = 5

def process_chunk(chunk, pool):
    for data in chunk:
        pool.apply_async(slow_function, args=(data,), callback=catch)

if __name__ == "__main__":
    mp.set_start_method("spawn")
    pool = mp.Pool(JOBS)

    output = []
    chunk = []
    for i in range(10000):
        chunk.append(i)
        if (i + 1) % CHUNKSIZE == 0:
            process_chunk(chunk, pool)
            chunk = []
            time.sleep(500 / 1000)
        if (i + 1) % (BATCHSIZE * CHUNKSIZE) == 0:
            write_to_file(output)
            output = []

    pool.close()
    pool.join()
    write_to_file(output)  # export any results that arrived after the last batch
Full code
Here is the full parallelized code, including the CPU and memory workarounds:
import time
import timeit
import multiprocessing as mp

# set parameters here
JOBS = 200
CHUNKSIZE = 1000
BATCHSIZE = 5

def slow_function(data):
    time.sleep(100 / 1000)  # simulate a slow call (100 ms)
    return data

def catch(result):
    global output
    output.append(result)

def process_chunk(chunk, pool):
    for data in chunk:
        pool.apply_async(slow_function, args=(data,), callback=catch)

def write_to_file(results):
    # simple stand-in exporter; swap in your own export logic
    with open("output.txt", "a") as f:
        for r in results:
            f.write(str(r) + "\n")

if __name__ == "__main__":
    mp.set_start_method("spawn")
    pool = mp.Pool(JOBS)
    start = timeit.default_timer()

    output = []
    chunk = []
    for i in range(10000):
        chunk.append(i)
        if (i + 1) % CHUNKSIZE == 0:
            process_chunk(chunk, pool)
            chunk = []
            time.sleep(500 / 1000)
        if (i + 1) % (BATCHSIZE * CHUNKSIZE) == 0:
            write_to_file(output)
            output = []

    pool.close()
    pool.join()
    write_to_file(output)  # export any results that arrived after the last batch

    stop = timeit.default_timer()
    print("##### done in ", stop - start, " seconds #####")
I included the tunable parameters up top as globals: the number of worker processes, the chunk size, and the batch size. Play around with these to get optimal performance for your specific machine and your specific data inputs.
This worked for me up to an order of magnitude of 10,000,000+ calls. For higher magnitudes, there may be more issues and roadblocks to overcome.
Benchmark time
Earlier I calculated that it would take at least 1000 seconds to run this example program sequentially. With the above code, running locally on a standard 2019 8-core MacBook Pro, I benchmarked it at ~11 seconds, roughly a 90x speedup. Not bad.