Parallelize Pandas map() or apply()

Pandas is a very useful data analysis library for Python, and it’s great for handling large amounts of data.

Unfortunately Pandas runs on a single thread and doesn’t parallelize for you. So if you’re doing lots of computation on lots of data, such as creating features for machine learning, it can be pretty slow depending on what you’re doing.

To tackle this problem, you essentially have to break your data into smaller chunks, and compute over them in parallel, making use of the Python multiprocessing library.

Let’s say you have a large Pandas DataFrame:

import pandas as pd

data = pd.DataFrame(...)  # Load data

And you want to apply() a function to the data like so:

def work(x):
    # Do something to x and return the result
    return x

data = data.apply(work)

What you can do is break the DataFrame into smaller chunks using numpy, and use a Pool from the multiprocessing library to do work in parallel on each chunk, like so:

import numpy as np
from multiprocessing import cpu_count, Pool

cores = cpu_count()  # Number of CPU cores on your system
partitions = cores   # Define as many partitions as you want

def parallelize(data, func):
    data_split = np.array_split(data, partitions)  # Split the DataFrame into chunks
    pool = Pool(cores)                             # One worker process per core
    data = pd.concat(pool.map(func, data_split))   # Run func on each chunk in parallel, then stitch the results back together
    pool.close()
    pool.join()
    return data

And that’s it. Now you can call parallelize on your DataFrame like so:

data = parallelize(data, work)

Run it, and watch your system’s CPU utilization shoot up to near 100%! It should also finish much faster, depending on how many cores you have: 8 cores should theoretically be up to 8x faster, and you could even fire up an AWS EC2 instance with 32 cores and run it up to 32x faster!
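
For reference, here is one way the pieces might fit together as a complete, runnable script. The work function and the column names below are just stand-ins for illustration, and the if __name__ == '__main__': guard matters on platforms that spawn worker processes (such as Windows), since each worker re-imports your module:

import numpy as np
import pandas as pd
from multiprocessing import cpu_count, Pool

cores = cpu_count()
partitions = cores

def parallelize(data, func):
    data_split = np.array_split(data, partitions)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def work(chunk):
    # Stand-in workload: square a numeric column (column names are made up)
    chunk["squared"] = chunk["value"] ** 2
    return chunk

if __name__ == '__main__':
    data = pd.DataFrame({"value": range(100000)})
    data = parallelize(data, work)
    print(data.head())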

8 thoughts on “Parallelize Pandas map() or apply()”

  1. Hey, thanks for the explanation Adeel. I want to create a new column in my dataframe df by applying a function f to an existing column of df. This is embarrassingly parallel, i.e. the function is applied to each row individually and independently to produce the new column, so each row depends only on itself and not on any other rows.

    My previous code (before parallelising) was:

    df["new_column"] = df["existing_column"].apply(f)

    But when I try to apply your code to parallelise it, it seems to get stuck in an infinite loop and never finishes. I even tried to properly define each sub-chunk of the original df and create a test function which explicitly defines what I want to do… still not working. Any tips or help would be greatly appreciated! Thanks

    # doesn't seem to work
    import time
    import multiprocessing
    import numpy as np
    import pandas as pd
    from multiprocessing import Pool

    num_partitions = 5
    num_cores = multiprocessing.cpu_count()
    # print(num_partitions, num_cores)

    def parallelize_dataframe(df, func):
        a, b, c, d, e = np.array_split(df, num_partitions)
        pool = Pool(num_cores)
        df = pd.concat(pool.map(func, [a, b, c, d, e]))
        pool.close()
        pool.join()
        return df

    def test_func(data):
        data["new_column"] = data["existing_column"].apply(f)
        return data

    t0 = time.time()
    test = parallelize_dataframe(df, test_func)
    t1 = time.time()
    print("running in parallel:", t1 - t0, "s")

    1. Hello Killian,

      Sorry about the delayed reply, been really busy. Hmm, I’m not sure without seeing your dataframe or function "f". Try the code below. It’s essentially what you pasted, but with a square function that’s applied to an existing column to create the new column. Seems to work fine, and in parallel. BTW, make sure you call parallelize_dataframe() from inside this if statement: if __name__ == '__main__':

      import multiprocessing
      import pandas as pd
      import numpy as np
      from multiprocessing import Pool
      num_partitions = 5
      num_cores = multiprocessing.cpu_count()
      
      def parallelize_dataframe(df, func):
          a,b,c,d,e = np.array_split(df, num_partitions)
          pool = Pool(num_cores)
          df = pd.concat(pool.map(func, [a,b,c,d,e]))
          pool.close()
          pool.join()
          return df
      
      def square(x):
          return x**2
      
      def test_func(data):
          print("Process working on:", data)
          data["square"] = data["col"].apply(square)
          return data
      
      df = pd.DataFrame({'col': [0,1,2,3,4,5,6,7,8,9]})
      
      if __name__ == '__main__':
          test = parallelize_dataframe(df, test_func)
          print(test)
      

      Edit: Note: the intermediate prints will look funny (overlapped). This is a good sign; it means it’s running in parallel.

      1. Hi Adeel, thanks for the explanation. I am planning to read a large CSV file and load the data into an Oracle database table in parallel. Could you guide me or send me sample code to read the large CSV file in chunks and load it into the Oracle table in parallel?

        Thanks,
        Mark

  2. Hi Adeel.
    My code is running indefinitely and not yielding anything at all.
    Kindly contact me at kuldeep.gautam007@gmail.com as I can’t share the details here due to the privacy policy of the organisation I am working with.

    Thanks.

  3. Hi Adeel
    the pool.map call throws this exception:
    * 'float' object is not iterable
    Any idea why this might happen?

    thanks
    Shay

    1. Hello Shay. It seems you’re passing a float to your pool.map(). The first argument should be the function that will be called; the second should be a Python list or numpy array of arguments that get passed to that function through pool.map().
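
      For reference, a minimal sketch along these lines (the square function and column names are just made up for illustration) should run cleanly; if it does on your machine, the problem is likely in how your list of DataFrames is being built:

      import pandas as pd
      import numpy as np
      from multiprocessing import Pool

      def square_col(chunk):
          # Square a toy numeric column on each chunk
          chunk["sq"] = chunk["x"] ** 2
          return chunk

      if __name__ == '__main__':
          df = pd.DataFrame({'x': range(10)})
          chunks = np.array_split(df, 2)  # a list of DataFrames, not a float
          pool = Pool(2)
          result = pd.concat(pool.map(square_col, chunks))
          pool.close()
          pool.join()
          print(result)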

      1. Hi Adeel
        thanks for your quick response
        I (think) I am passing the right arguments (name of function and list of DataFrames)
        and I have no idea where this float appeared from…
        Shay
