Routing destination IPs through OpenVPN on DD-WRT routers

DD-WRT is an excellent router OS. It comes with an OpenVPN client, so you can route all (or selected) outbound traffic through a VPN.

The OpenVPN client in DD-WRT makes it easy to specify the source IP addresses that need to have all their traffic routed through the VPN connection. This is done by specifying the source IP addresses (corresponding to devices on your internal network) in policy based routing.

However, the interface gives you no way to specify which destination IP addresses should be routed through the VPN. For example, you may not care which device on your network the traffic originates from, but want only a certain set of destination addresses (out on the Internet) routed through the VPN connection.

Doing this is pretty straightforward. You can ssh into your router (you’ll need to enable ssh management) and run these commands:

ip rule add to table 10
ip route flush cache

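And that’s it. Table 10 is the routing table for the VPN connection, so this rule makes any traffic destined for the IP address given after the "to" keyword route through the VPN connection. The second command flushes the routing cache so the new rule takes effect right away.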

A couple of caveats: the OpenVPN client needs to be connected already before you create this routing rule, the routing rule won’t persist through system reboots, and if the OpenVPN connection drops for whatever reason, the client software will reconnect it, but the rule will need to be created again.

A crude way around all this is to create a script that runs at bootup/startup for the router. You can define this startup script under Administration -> Commands in the DD-WRT interface. The script loops forever and re-creates the rule every 5 minutes. So when the router is restarted, or if the OpenVPN client connection drops and reconnects, the routing rule will just get re-created. Note: this is obviously a very inelegant way of getting this done, but it does the job. I’m sure you can improve on this in many ways.

You can add this to your bootup/startup commands for the router:

echo '#!/bin/sh' > /tmp/
echo 'while true; do' >> /tmp/
echo '	ip rule add to table 10' >> /tmp/
echo '	ip route flush cache' >> /tmp/
echo '	sleep 300' >> /tmp/
echo 'done' >> /tmp/
chmod +x /tmp/
nohup /tmp/ >> /dev/null 2>&1 &

Skip certain file extensions from Morgan HTTP logger for Express.js

Morgan is a useful HTTP request logger middleware for Express.js, which plugs in nicely to Node.js and the MEAN stack. More info on Morgan at

One useful feature is the ability to add a filter that skips certain files you don’t want logged. For example, you may not want a log entry for every single GET of an image file.

First you define a filter function that returns a boolean for certain file extensions:

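function skipLog (req, res) {
  var url = req.url;
  // Strip the query string (if there is one) so only the path is checked
  if (url.indexOf('?') > 0) {
    url = url.substr(0, url.indexOf('?'));
  }
  if (url.match(/\.(js|jpg|png|ico|css|woff|woff2|eot)$/i)) {
    return true;
  }
  return false;
}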

The function above will return true for any files with the extension .js, .jpg, .png, (and so on…). Note: you’ll want to return true for skips because you want to evaluate it to skip=true. Also note that the code extracts out the filename from the URL in case there are request parameters attached to it.

Then to use it, you would initiate Morgan like so when setting it up in express.js:

var morgan = require('morgan');
var express = require('express');
var app = express();
// accessLogStream is assumed to be a writable stream you have already created for the log file
app.use(morgan('combined', {stream: accessLogStream, skip: skipLog}));

And that’s it!

Redirecting all stdout and stderr to Logger in Java

This would seem obvious, but it wasn’t to me, so I thought I’d write about it to help out anyone else attempting to accomplish the same. It’s pretty straightforward actually.

Let’s say you have a java.util.logging.Logger object that you’ve initialized, and you want to redirect all stderr (exceptions, System.err.print()’s) and stdout (System.out.print()) to it. You’ll want to use System.setErr and System.setOut to a custom object which writes to your Logger object.

Let’s first define a class to do this for us, and then I’ll explain how it works:

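import java.io.*;
import java.util.logging.*;
class CustomOutputStream extends OutputStream {
	Logger logger;
	Level level;
	StringBuilder stringBuilder;
	public CustomOutputStream(Logger logger, Level level) {
		this.logger = logger;
		this.level = level;
		stringBuilder = new StringBuilder();
	}
	public final void write(int i) throws IOException {
		char c = (char) i;
		if(c == '\r' || c == '\n') {
			// End of line: send the buffered text to the logger and start over
			if(stringBuilder.length() > 0) {
				logger.log(level, stringBuilder.toString());
				stringBuilder = new StringBuilder();
			}
		} else {
			stringBuilder.append(c);
		}
	}
}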

The way this works is by extending OutputStream and overriding the write() method. But write() only takes one byte at a time, so essentially you want to buffer each character into a StringBuilder, to build up the whole line, until you encounter a \r or \n (carriage return, new line), and then submit it to the logger.

To attach CustomOutputStream to your logger:

Logger logger = Logger.getLogger(...);
System.setErr(
		new PrintStream(
			new CustomOutputStream(logger,Level.SEVERE) //Or whatever logger level you want
		));
System.setOut(
		new PrintStream(
			new CustomOutputStream(logger,Level.FINE) //Or whatever logger level you want
		));

Note: if you’ve configured your logger to always include the class/method with the log message, a side effect of this is that the output will not show the original method that wrote to stderr or stdout, but instead your.package.CustomOutputStream.write().

Happy logging!

Using a SOCKS proxy in Java’s HttpURLConnection

Doing a Google Search on how to get Java’s URLConnection or HttpURLConnection to use a SOCKS proxy yields many results on how to pass in arguments to the JVM to set it up, or to call System.setProperty(), which then sets the SOCKS proxy to be used for all HTTP connections through Java. But what if you want to limit it to only certain connections started from HttpURLConnection, or if the proxy address isn’t available until later on?

Here’s a code snippet showing how you’d go about doing that programmatically.

String proxyString = ""; //ip:port
String proxyAddress[] = proxyString.split(":");
Proxy proxy = new Proxy(Proxy.Type.SOCKS, new InetSocketAddress(proxyAddress[0], Integer.parseInt(proxyAddress[1])));
URL url = new URL("");
HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection(proxy);
//do whatever with httpUrlConnection, it will be connected through the SOCKS proxy!

And that’s it.

Creating your own thread dispatcher in Java

Java offers ExecutorService, which can be used for managing and dispatching threads. But ExecutorService has limitations. One of them is that if you create a fixed thread pool, you need to define all your threads up front. What if you’ll be spawning a million threads (but only a handful will be running at a given moment)? That would take up a lot of memory. And in some cases, you may need to know the outcome of certain threads before you can schedule new ones. It’s hard and cumbersome to do this using ExecutorService.

On the other hand, you can easily create your own thread dispatcher service, which can be limited to run only a certain number of threads at a time. See code snippet below:

final static int MAX_THREADS_AT_A_TIME = 10;
static int currentlyRunningThreadsCount = 0;
static Object dispatcherLock = new Object();

public static void main(String args[]) {
	for(int i=0; i<100; i++) {
		final int thisThreadCount = i+1;
		new Thread(new Runnable() {
			public void run() {
				//Do something
				System.out.println("Thread "+thisThreadCount+" starting.");
				try { Thread.sleep(5000); } catch(InterruptedException e) { }
				System.out.println("Thread "+thisThreadCount+" finished.");
				synchronized(dispatcherLock) {
					currentlyRunningThreadsCount--;
					dispatcherLock.notify(); //Wake up the dispatcher
				}
			}
		}).start();
		synchronized(dispatcherLock) {
			currentlyRunningThreadsCount++;
			while(currentlyRunningThreadsCount >= MAX_THREADS_AT_A_TIME) { //while guards against spurious wakeups
				try { dispatcherLock.wait(); } catch(InterruptedException e) { }
			}
		}
	}
}

Let’s break it down. In this case we’re spawning 100 threads, but limiting it to run only 10 threads at a time. Everything outside of the thread runnable is part of the “dispatcher” service. The dispatcher loop uses a counter currentlyRunningThreadsCount to track how many threads are running at a time. If there are 10, it wait()’s on a lock object. And as each thread finishes its work, it decrements currentlyRunningThreadsCount and calls notify() on the lock object, which wakes up the dispatcher and it moves on to spawn more.

Pretty simple, right?!

Scheduling Tasks in the MEAN stack

As I was still learning my way around the MEAN stack, I couldn’t wrap my mind around how to schedule future tasks in it (such as sending an email). This was because MEAN is so stateless and transactional, or at least it seems to be. But recall that we have ExpressJS and Node.js in the picture, which are always running on the server side.

Given this, the solution turns out to be very simple. You can schedule tasks in Node using the node-schedule package. To install node-schedule, run the command:

npm install node-schedule

Then in ExpressJS you can set up tasks like so:

var schedule = require("node-schedule");
var dailyRule = new schedule.RecurrenceRule();
dailyRule.second = 0;
dailyRule.minute = 0;
dailyRule.hour = 0;
var onceADay = schedule.scheduleJob(dailyRule, function(){
  //Do something here
});

I’ve found that it’s easy to put this in ExpressJS’s app.js (or wherever your server startup script is), since that is called on the MEAN application’s server side startup.

Parallelize Pandas map() and apply() while accounting for future records

A few blog posts ago, I covered how to parallelize Pandas map() and apply(). You can read more about it at … Essentially it works by breaking the data into smaller chunks, and using Python’s multiprocessing capabilities you call map() or apply() on the individual chunks of data, in parallel.

This works great, but what if it’s time series data, and part of the data you need to process each record lies in a future record? For example, if you are tracking the change of price from one moment to what it will be in a moment in the future. In this case the approach I laid out about dividing it into chunks will not work, because as you reach the end of a chunk, you will not have the future records to use.

It turns out that there’s a relatively simple way to do this. Essentially you determine how much in the future you need to go, and include those extra records in each chunk (so some records at the edges are duplicated in chunks), and then drop them at the very end.

So let’s say for each record, you also need records from up to 30 seconds in the future, for your calculation. And each record in your data represents 1 second. So essentially you include 30 extra records in each chunk so they are available for the parallel calculations. And then drop them later.
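The idea can be checked on a toy example before bringing in Pandas or multiprocessing. The sketch below is only an illustration in plain Python: the names forward_change and chunked_forward_change are made up for this example, the chunks are processed one after another rather than in parallel, and the look-ahead is 3 records instead of 30 to keep the data small.

```python
def forward_change(prices, lookahead):
    """For each position i, return prices[i + lookahead] - prices[i],
    or None when the future record is not available."""
    out = []
    for i in range(len(prices)):
        j = i + lookahead
        out.append(prices[j] - prices[i] if j < len(prices) else None)
    return out


def chunked_forward_change(prices, chunk_size, lookahead):
    """Same calculation, done chunk by chunk with `lookahead` extra records."""
    result = []
    for start in range(0, len(prices), chunk_size):
        end = start + chunk_size
        # Include the overlap so the last rows of the chunk can see the future
        chunk = prices[start:end + lookahead]
        changes = forward_change(chunk, lookahead)
        # Keep only the rows this chunk owns; the overlap rows are dropped
        result.extend(changes[:chunk_size])
    return result


prices = [100, 101, 103, 102, 105, 107, 106, 110, 111, 109]
serial = forward_change(prices, lookahead=3)
chunked = chunked_forward_change(prices, chunk_size=4, lookahead=3)
print(serial == chunked)  # True
```

Without the extra records, the last 3 rows of every chunk would come back as None, even though the full data set does contain their future values.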

You start by setting up your parallel processor function like so:

import pandas as pd
import multiprocessing

cpu_count = multiprocessing.cpu_count()

def parallelize(data, split_interval):
    splits = range(0, cpu_count)
    parallel_arguments = []
    for split in splits:
        parallel_arguments.append([split, data, split_interval])
    pool = multiprocessing.Pool(cpu_count)
    data_array = pool.map(worker, parallel_arguments)
    final_data = pd.concat(data_array)
    final_data = final_data.groupby(final_data.index).max() #This is where duplicates are dropped.
    return final_data.sort_index()

What you’ve done is define a list of argument lists (parameters), one per parallel worker, that gets iterated over to spawn the workers. Each one carries the chunk number, a reference to the Pandas DataFrame, and the chunk size, which together tell the worker function which chunk of data to work on. Note that each worker returns its chunk, and the chunks are then concatenated back into a final DataFrame. After doing this, note the groupby() call: this is where we drop the duplicate records at the edges that were included in each chunk.

Here’s what your worker would do to work on its chunk:

def worker(params):
    num = params[0]
    data = params[1]
    split_interval = params[2]
    split_start = num*split_interval
    split_end = ((num+1)*split_interval)+30
    this_data = data.iloc[split_start:split_end].copy()
    # work on this_data chunk, which includes records from 30 seconds in the future
    # Add new columns to this_data, or whatever
    return this_data

Note this line: split_end = ((num+1)*split_interval)+30. In the chunk you’re working on, you’re including the next 30 records, which in this example represent the next 30 seconds that you need in your calculations.
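To see which rows each worker ends up with, it can help to print the slice boundaries on their own. The helper below, chunk_bounds, is a hypothetical name for this illustration, not part of the code above. It also makes two assumptions that differ from the worker: it rounds the chunk size up so that no rows are left over when the row count doesn’t divide evenly, and it clamps the end of each slice to the size of the data.

```python
import math


def chunk_bounds(row_count, workers, lookahead):
    """Return (start, end) positions for each worker's slice.

    Each slice is extended by `lookahead` rows and clamped to the data size.
    """
    split_interval = math.ceil(row_count / workers)
    bounds = []
    for num in range(workers):
        split_start = num * split_interval
        split_end = min((num + 1) * split_interval + lookahead, row_count)
        # Skip workers that would start past the end of the data
        if split_start < row_count:
            bounds.append((split_start, split_end))
    return bounds


for start, end in chunk_bounds(row_count=100, workers=4, lookahead=30):
    print(start, end)
```

For 100 rows and 4 workers this prints 0 55, 25 80, 50 100 and 75 100: every slice except the last ones reaches 30 rows into its neighbour’s territory, and those shared rows are the duplicates that get dropped after the chunks are concatenated.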

And finally to tie it together, you do:

if __name__ == '__main__':
    data = pd.DataFrame(...) #Load data
    data_count = len(data)
    split_interval = data_count // cpu_count #Integer division, since iloc needs whole numbers
    final_data = parallelize(data, split_interval) #This is the data with all the work done on it