Calculating running estimate of mean and standard deviation in Python

Say you have a stream of means and standard deviations for a random variable x that you want to combine. So essentially you’re combining two groups of means and standard deviations, G_{x,1} = (\mu_{x,1}, \sigma_{x,1}) and G_{x,2} = (\mu_{x,2}, \sigma_{x,2}).

If you have access to the random variable x’s values coming in as a stream, you can collect some number n_{x,1} of them and calculate their mean and standard deviation to form a group G_{x,1}, and then combine it with the mean and standard deviation of the next group G_{x,2}, consisting of the next n_{x,2} values of x, to form G_{x,1:2} = (\mu_{x,1:2}, \sigma_{x,1:2}).

The formulas for the combined means and standard deviations are:

n_{x,1:2}=n_{x,1}+n_{x,2}
\mu_{x,1:2}=\frac{(n_{x,1}\mu_{x,1}) + (n_{x,2}\mu_{x,2})}{ n_{x,1:2} }
\sigma_{x,1:2}=\sqrt{\frac{(n_{x,1}-1)\sigma_{x,1}^{2} + (n_{x,2}-1)\sigma_{x,2}^{2} + n_{x,1}(\mu_{x,1} - \mu_{x,1:2})^{2} + n_{x,2}(\mu_{x,2} - \mu_{x,1:2})^{2} }{n_{x,1:2}-1}}

Note that this uses the Bessel-corrected (sample) standard deviation, per https://en.wikipedia.org/wiki/Standard_deviation#Corrected_sample_standard_deviation, which I found leads to a better estimate.

In Python code, this is what it looks like:

import numpy as np
np.random.seed(31337)

def combine_mean_std(mean_x_1, std_x_1, n_x_1, mean_x_2, std_x_2, n_x_2):
    # Combined count and mean
    n_x_1_2 = n_x_1 + n_x_2
    mean_x_1_2 = (mean_x_1 * n_x_1 + mean_x_2 * n_x_2) / n_x_1_2
    # Bessel-corrected combined standard deviation, per the formula above
    std_x_1_2 = np.sqrt(((n_x_1 - 1) * (std_x_1 ** 2)
                         + (n_x_2 - 1) * (std_x_2 ** 2)
                         + n_x_1 * ((mean_x_1_2 - mean_x_1) ** 2)
                         + n_x_2 * ((mean_x_1_2 - mean_x_2) ** 2))
                        / (n_x_1_2 - 1))
    return mean_x_1_2, std_x_1_2, n_x_1_2

total_mean_x = None
total_std_x = None
total_n_x = 0

all_x = None # For getting the actual mean and std for comparison with the running estimate

for i in range(10):
    x = np.random.randint(0, 100, np.random.randint(1, 100))
    if all_x is None:
        all_x = x
    else:
        all_x = np.concatenate((all_x,x),axis=0)
    mean_x = x.mean()
    std_x = x.std(ddof=1)  # Sample (Bessel-corrected) std, to match the formula above
    n_x = x.shape[0]
    if total_mean_x is None and total_std_x is None:
        total_mean_x = mean_x
        total_std_x = std_x
        total_n_x = n_x
    else:
        total_mean_x, total_std_x, total_n_x = combine_mean_std(total_mean_x, total_std_x, total_n_x,
                                                                mean_x, std_x, n_x)

print(total_mean_x, total_std_x, total_n_x)
print(all_x.mean(), all_x.std(ddof=1), all_x.shape[0])

If you run the code above and inspect the values printed at the end, you’ll see that the running estimates in total_mean_x and total_std_x are almost exactly the same as the actual mean and std obtained by collecting all the x values and computing them directly (which may not be possible or feasible in your task).
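
If you need this in more than one place, it can be convenient to wrap the logic in a small helper. Here’s a minimal sketch (the class name and interface are mine, just for illustration) that maintains the running estimate as batches arrive, reusing combine_mean_std() from above:

class RunningStats:
    """Maintains a running mean/std estimate using combine_mean_std()."""
    def __init__(self):
        self.mean, self.std, self.n = None, None, 0

    def update(self, batch):
        # Assumes each batch has at least two values (needed for the sample std)
        batch = np.asarray(batch)
        mean, std, n = batch.mean(), batch.std(ddof=1), batch.shape[0]
        if self.mean is None:
            self.mean, self.std, self.n = mean, std, n
        else:
            self.mean, self.std, self.n = combine_mean_std(
                self.mean, self.std, self.n, mean, std, n)

# Usage:
# stats = RunningStats()
# stats.update(np.random.randint(0, 100, 50))
# print(stats.mean, stats.std, stats.n)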

Inspecting gradients in Chainer

Chainer is my choice of framework when it comes to implementing neural networks. It makes working with, and troubleshooting, deep learning easy.

Printing out the gradients during backpropagation to inspect their values is sometimes useful in deep learning, to check that your gradients are as expected and aren’t either exploding (numbers too large) or vanishing (numbers too small). Fortunately, this is easy to do in Chainer.

Chainer provides access to the parameters in your model through the optimizer (such as SGD or Adam), and for each parameter you can check the gradient computed during the backpropagation step. To access these, you can extend chainer.training.updaters.StandardUpdater to additionally output the gradients, by defining your own updater like so:

import chainer

class CustomStandardUpdater(chainer.training.updaters.StandardUpdater):
    def __init__(self, train_iter, optimizer, device):
        super(CustomStandardUpdater, self).__init__(
            train_iter, optimizer, device=device)

    def update_core(self):
        # Run the usual update step (forward, backward, optimizer update)
        super(CustomStandardUpdater, self).update_core()
        # Then walk over all initialized parameters and print their gradients
        optimizer = self.get_optimizer('main')
        for name, param in optimizer.target.namedparams(include_uninit=False):
            print(name, param.grad)

In the update_core() method you can see the parameters (weights) of your neural network being accessed through the optimizer, and for each parameter, the name and gradient being printed. This updater can be attached to your training setup as follows:

from chainer import training

model = MyChainerModel()
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)  # Create the optimizer first, then attach it to the model
train_iter = chainer.iterators.SerialIterator(train_dataset, batch_size=32, shuffle=True)
gpu = -1  # Device id: 0, 1, ... for a GPU, or -1 to run on the CPU
updater = CustomStandardUpdater(train_iter, optimizer, gpu)
trainer = training.Trainer(updater, stop_trigger=(100, 'epoch'))
trainer.run()
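
Printing full gradient arrays can be overwhelming for larger models. A lighter-weight variant (my own tweak, not something prescribed by Chainer) is to print a summary statistic per parameter, such as the mean absolute gradient, which is usually enough to spot exploding or vanishing gradients:

class GradientNormUpdater(chainer.training.updaters.StandardUpdater):
    def update_core(self):
        super(GradientNormUpdater, self).update_core()
        optimizer = self.get_optimizer('main')
        for name, param in optimizer.target.namedparams(include_uninit=False):
            if param.grad is not None:
                # Very large values suggest exploding gradients, near-zero values vanishing ones
                print(name, float(abs(param.grad).mean()))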

Parallelize Pandas map() and apply() while accounting for future records

A few blog posts ago, I covered how to parallelize Pandas map() and apply(). You can read more about it at http://blog.adeel.io/2016/11/06/parallelize-pandas-map-or-apply/. Essentially it works by breaking the data into smaller chunks, and using Python’s multiprocessing capabilities to call map() or apply() on the individual chunks of data, in parallel.

This works great, but what if it’s time series data, and part of the data you need to process each record lies in a future record? For example, you might be tracking the change in price from one moment to some moment in the future. In this case the chunking approach I laid out will not work as-is, because as you reach the end of a chunk, you will not have the future records you need.

It turns out that there’s a relatively simple way to handle this. Essentially you determine how far into the future you need to go, and include those extra records in each chunk (so some records at the edges are duplicated across chunks), and then drop the duplicates at the very end.

So let’s say that for each record you also need the records from up to 30 seconds in the future for your calculation, and each record in your data represents 1 second. You then include 30 extra records at the end of each chunk so they are available to the parallel calculations, and drop the duplicates later.

You start by setting up your parallel processor function like so:

import pandas as pd
import multiprocessing

cpu_count = multiprocessing.cpu_count()

def parallelize(data, split_interval):
    splits = range(0, cpu_count)
    parallel_arguments = []
    for split in splits:
        # Each worker gets its split number, the full DataFrame, and the chunk size
        parallel_arguments.append([split, data, split_interval])
    pool = multiprocessing.Pool(cpu_count)
    data_array = pool.map(worker, parallel_arguments)
    pool.close()
    pool.join()
    final_data = pd.concat(data_array)
    # Records at the chunk edges appear twice; grouping by index drops the duplicates
    final_data = final_data.groupby(final_data.index).max()
    return final_data.sort_index()

What you’ve done is define a list of argument tuples that are iterated over to spawn each parallel worker. Each tuple passes the worker its split number, a reference to the Pandas DataFrame, and the chunk size it should work on. Note that each worker returns its chunk, and the chunks are concatenated back into a final DataFrame. After that, note the groupby() call: this is where the duplicate records at the edges, which were included in multiple chunks, get dropped.

Here’s what your worker would do to work on its chunk:

def worker(params):
    num = params[0]
    data = params[1]
    split_interval = params[2]
    split_start = num*split_interval
    split_end = ((num+1)*split_interval)+30  # Include 30 extra future records past this chunk's end
    this_data = data.iloc[split_start:split_end].copy()
    # ...do work on this_data chunk, which includes records from 30 seconds in the future
    # Add new columns to this_data, or whatever
    return this_data

Note this line: split_end = ((num+1)*split_interval)+30. In the chunk you’re working on, you’re including the next 30 records, which in this example represent the next 30 seconds that you need in your calculations.
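
For illustration, here’s what the work on a chunk might look like, assuming a hypothetical price column and one record per second (the column names are made up for this example):

def compute_future_change(this_data):
    # Price change from each record to the record 30 rows (30 seconds) ahead.
    # shift(-30) is exactly why each chunk needs the 30 extra future records at its end.
    this_data['price_change_30s'] = this_data['price'].shift(-30) - this_data['price']
    return this_data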

And finally to tie it together, you do:

if __name__ == '__main__':
    data = pd.DataFrame(...) #Load data
    data_count = len(data)
    split_interval = (data_count + cpu_count - 1) // cpu_count #Ceiling division, so no records are missed
    final_data = parallelize(data, split_interval) #This is the data with all the work done on it

Parallelize Pandas map() or apply()

Pandas is a very useful data analysis library for Python, and it is well suited to handling large amounts of data.

Unfortunately Pandas runs on a single thread, and doesn’t parallelize for you. If you’re doing lots of computation on lots of data, such as creating features for Machine Learning, it can be pretty slow depending on what you’re doing.

To tackle this problem, you essentially have to break your data into smaller chunks, and compute over them in parallel, making use of the Python multiprocessing library.

Let’s say you have a large Pandas DataFrame:

import pandas as pd

data = pd.DataFrame(...) #Load data

And you want to apply() a function to the data like so:

def work(x):
    # Do something to x and return the result, e.g.:
    return x * 2

data = data.apply(work)

What you can do is break the DataFrame into smaller chunks using numpy, and use a Pool from the multiprocessing library to do work in parallel on each chunk, like so:

import numpy as np
from multiprocessing import cpu_count, Pool

cores = cpu_count() #Number of CPU cores on your system
partitions = cores #Define as many partitions as you want

def parallelize(data, func):
    data_split = np.array_split(data, partitions)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

And that’s it. One thing to keep in mind: pool.map() hands the function an entire chunk (a smaller DataFrame) rather than a single row or column, so the function you pass to parallelize() should do its work on a whole chunk, typically by calling apply() on it, as in the sketch below.
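
For example, a minimal wrapper (the name work_on_chunk is just for illustration) could look like this:

def work_on_chunk(chunk):
    # chunk is one of the smaller DataFrames produced by np.array_split()
    return chunk.apply(work)

data = parallelize(data, work_on_chunk)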

Run it, and watch your system’s CPU utilization shoot up to 100%! And it should finish much faster, depending on how many cores you have. 8 cores should theoretically be 8x faster. Or you could fire up an AWS EC2 instance with 32 cores and run it 32x faster!

Removing neighboring (consecutive-only) duplicates in a Pandas DataFrame

Pandas, the Python Data Analysis Library, makes it easy to drop duplicates from a DataFrame, using the drop_duplicates() function (http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.drop_duplicates.html).

The problem with this is that it removes ALL duplicates anywhere in your DataFrame. Depending on what you’re doing, you may not want to get rid of all duplicates everywhere, but only neighboring duplicates, that is, duplicates that are consecutive. A duplicate that shows up again later, after a different row, is fine for your purpose.

For example, you may have the following data:

1 2 3
1 2 3
1 5 5
1 2 3
1 5 5

You only want to get rid of consecutive duplicates (which in this case are only the first two rows), and get this result:

1 2 3
1 5 5
1 2 3
1 5 5

You can accomplish this using the pandas shift() function, which shifts all values down by one row, so that each row lines up with the values of the row just before it, like so:

data = data.loc[(data.shift() != data).any(axis=1)]

What this does is compare every row in the DataFrame to the row before it (that’s what the shift provides). A row is kept only if at least one of its columns differs from the previous row, so a row whose columns all match the previous row is dropped.
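
Here’s a small self-contained sketch using the example data above to check the result:

import pandas as pd

data = pd.DataFrame([[1, 2, 3],
                     [1, 2, 3],
                     [1, 5, 5],
                     [1, 2, 3],
                     [1, 5, 5]])

# Keep a row only if at least one column differs from the previous row
data = data.loc[(data.shift() != data).any(axis=1)]
print(data)  # Only the second of the two consecutive [1, 2, 3] rows is dropped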

Note: this only works if the elements in your DataFrame can be compared for equality (in the example above all elements are integers). Otherwise you’ll need element types that implement an equality comparison.