Pandas is a very useful data analysis library for Python, particularly for handling large amounts of data.
Unfortunately, Pandas runs on a single thread and doesn't parallelize your work for you. If you're doing lots of computation on lots of data, such as creating features for machine learning, it can be pretty slow depending on what you're doing.
To tackle this problem, you essentially have to break your data into smaller chunks, and compute over them in parallel, making use of the Python multiprocessing library.
Let’s say you have a large Pandas DataFrame:
import pandas as pd

data = pd.DataFrame(...)  # Load data
And you want to apply() a function to the data like so:
def work(x):
    # Do something to x
    # and return something
    ...

data = data.apply(work)
What you can do is break the DataFrame into smaller chunks using numpy, and use a Pool from the multiprocessing library to do work in parallel on each chunk, like so:
import numpy as np
from multiprocessing import cpu_count, Pool

cores = cpu_count()  # Number of CPU cores on your system
partitions = cores  # Define as many partitions as you want

def parallelize(data, func):
    data_split = np.array_split(data, partitions)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data
And that’s it. Now you can call parallelize on your DataFrame like so:
data = parallelize(data, work)
Run it, and watch your system's CPU utilization shoot up to 100%! And it should finish much faster, depending on how many cores you have: 8 cores could be up to 8x faster, though chunking and inter-process communication add some overhead in practice. Or you could fire up an AWS EC2 instance with 32 cores and scale even further!
8 thoughts on “Parallelize Pandas map() or apply()”
Hey, thanks for the explanation Adeel. I want to create a new column in my dataframe df which is generated by applying a function f to an existing column in df. This is embarrassingly parallel i.e. the function is applied to each row individually and independently to produce the new column, so each row is only dependent on itself and not on any other rows.
My previous code (before parallelising) was:
df["new_column"] = df["existing_column"].apply(f)
But when I try to apply your code to parallelise it, it never finishes, as if it's stuck in some infinite loop. I even tried to properly define each sub-chunk of the original df and create a test function which explicitly defines what I want to do… still not working. Any tips or help would be greatly appreciated! Thanks
# doesn't seem to work
from multiprocessing import Pool

num_partitions = 5
num_cores = multiprocessing.cpu_count()

def parallelize_dataframe(df, func):
    a, b, c, d, e = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, [a, b, c, d, e]))

data["new_column"] = data["existing_column"].apply(f)

t0 = time.time()
test = parallelize_dataframe(df, test_func)
t1 = time.time()
print("running in parallel:", t1 - t0, "s")
Sorry about the delayed reply, I've been really busy. Hmm, I'm not sure without seeing your dataframe or function "f". Try the code below. It's essentially what you pasted, but with a square function that's applied to an existing column to create the new column. Seems to work fine, and in parallel. BTW, make sure you call parallelize_dataframe() from this if statement: if __name__ == '__main__':
Edit: Note: the intermediate prints will look funny (overlapped). This is a good sign: it means it's running in parallel.
Hi Adeel, Thanks for the explanation. I am planning to read a large CSV file and load the data into an Oracle database table in parallel. Could you guide me or send me sample code to read the large CSV file in chunks and load it into the Oracle table in parallel?
My code is entering into an infinite execution and is not yielding anything at all.
Thanks for the previous comments, I found them very interesting. I have applied them to text processing; here is a little example on GitHub: https://github.com/rafaelvalero/ParallelTextProcessing/blob/master/parallelizing_text_processing.ipynb
the pool.map action throws this exception:
* 'float' object is not iterable
any idea why this may happen?
Hello Shay. It seems you're passing a float to your pool.map(). The first argument should be the function that will be called. The second argument should be an iterable, such as a Python list or numpy array, of the arguments that get passed to the function through pool.map().
thanks for your quick response
I (think) I am passing the right arguments (name of function and list of DataFrames)
and I have no idea where this float appeared from…