Parallelize Pandas map() and apply() while accounting for future records

A few blog posts ago, I covered how to parallelize Pandas map() and apply(). You can read more about it at http://blog.adeel.io/2016/11/06/parallelize-pandas-map-or-apply/ … Essentially it works by breaking the data into smaller chunks, and using Python’s multiprocessing capabilities you call map() or apply() on the individual chunks of data, in parallel.

This works great, but what if it’s time series data, and part of the data you need to process each record lies in a future record? For example, if you are tracking the change of price from one moment to what it will be in a moment in the future. In this case the approach I laid out about dividing it into chunks will not work, because as you reach the end of a chunk, you will not have the future records to use.

It turns out that there’s a relatively simple way to do this. Essentially you determine how much in the future you need to go, and include those extra records in each chunk (so some records at the edges are duplicated in chunks), and then drop them at the very end.

So let’s say for each record, you also need records from up to 30 seconds in the future, for your calculation. And each record in your data represents 1 second. So essentially you include 30 extra records in each chunk so they are available for the parallel calculations. And then drop them later.

You start by setting up your parallel processor function like so:

import pandas as pd
import multiprocessing

cpu_count = multiprocessing.cpu_count()

def parallelize(data, split_interval):
    splits = range(0, cpu_count)
    parallel_arguments = []
    for split in splits:
        parallel_arguments.append([split, data, split_interval])
    pool = multiprocessing.Pool(cpu_count)
    data_array = pool.map(worker, parallel_arguments)
    pool.close()
    pool.join()
    final_data = pd.concat(data_array)
    final_data = final_data.groupby(final_data.index).max() #This is where duplicates are dropped.
    return final_data.sort_index()

What you’ve done is defined an array of a tuple of arguments (parameters) that can are iterated over, to spawn each parallel worker. In the tuple we pass a reference to the Pandas DataFrame, and the data chunk the worker function should work on. Note that the worker function returns that chunk, and concatenates it back into a final DataFrame. After doing is, note the groupby() function that is called, this is where we drop the duplicate records at the edges that were included in each chunk.

Here’s what your worker would do to work on its chunk:

def worker(params):
    num = params[0]
    data = params[1]
    split_interval = params[2]
    split_start = num*split_interval
    split_end = ((num+1)*split_interval)+30
    this_data = data.iloc[split_start:split_end].copy()
    # ...do work on this_data chunk, which includes records from 30 seconds in the future
    # Add new columns to this_data, or whatever
    return this_data

Note this line: split_end = ((num+1)*split_interval)+30. In the chunk you’re working on, you’re including the next 30 records, which in this example represent the next 30 seconds that you need in your calculations.

And finally to tie it together, you do:

if __name__ == '__main__':
    data = pd.DataFrame(...) #Load data
    data_count = len(data)
    split_interval = data_count / cpu_count
    final_data = handler(data, split_interval) #This is the data with all the work done on it

Parallelize Pandas map() or apply()

Pandas is a very useful data analysis library for Python. It can be very useful for handling large amounts of data.

Unfortunately Pandas runs on a single thread, and doesn’t parallelize for you. And if you’re doing lots of computation on lots of data, such as for creating features for Machine Learning, it can be pretty slow depending on what you’re doing.

To tackle this problem, you essentially have to break your data into smaller chunks, and compute over them in parallel, making use of the Python multiprocessing library.

Let’s say you have a large Pandas DataFrame:

import pandas as pd

data = pd.DataFrame(...) #Load data

And you want to apply() a function to the data like so:

def work(x):
    # Do something to x
    # return something

data = data.apply(work)

What you can do is break the DataFrame into smaller chunks using numpy, and use a Pool from the multiprocessing library to do work in parallel on each chunk, like so:

import numpy as np
from multiprocessing import cpu_count, Parallel

cores = cpu_count() #Number of CPU cores on your system
partitions = cores #Define as many partitions as you want

def parallelize(data, func):
    data_split = np.array_split(data, partitions)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

And that’s it. Now you can call parallelize on your DataFrame like so:

data = parallelize(data, work);

Run it, and watch your system’s CPU utilization shoot up to 100%! And it should finish much faster, depending on how many cores you have. 8 cores should theoretically be 8x faster. Or you could fire up an AWS EC2 instance with 32 cores and run it 32x faster!

Removing neighboring (consecutive-only) duplicates in a Pandas DataFrame

Pandas, the Python Data Analysis Library, makes it easy to drop duplicates from a DataFrame, using the drop_duplicates() function (http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.drop_duplicates.html).

The problem with this is it removes ALL duplicates anywhere in your DataFrame. Depending on what you’re doing, you may not want to get rid of all duplicates everywhere, but only neighboring duplicates. That is, duplicates that are consecutive. But if there’s a duplicate after a non-duplicate row, that’s okay, for your purpose.

For example, you may have the following data:

1 2 3
1 2 3
1 5 5
1 2 3
1 5 5

You only want to get rid of consecutive duplicates (which in this case are only the first two rows), and get this result:

1 2 3
1 5 5
1 2 3
1 5 5

You can accomplish this using the pandas shift() function, which can be used to get the very next element, like so:

data = data.loc[data.shift() != data]

What this does is for every row in the DataFrame, it compares it to the next row. If all columns are equal to the columns in the next row, the row does not get repeated.

Note: this only works if you have simple elements in your DataFrame that can be checked to be equivalent (in the example above all elements are integers). Otherwise you’ll need to extend the type of element, and implement an equivalency function.