Tuesday, September 26, 2017

Mean Imputation in Apache Spark

If you are interested in building predictive models on Big Data, then there is a good chance you are looking to use Apache Spark, either with MLlib or with one of the growing number of machine learning extensions built to work with Spark, such as Elephas, which lets you use Keras and Spark together.

For all its flexibility, you will find certain things about working with Spark a little tedious.

For one thing, you will have to write a lot of boilerplate code to clean and prepare features before training models. MLlib only accepts numerical data and will not handle NULL values. There are some great library functions for string indexing and one-hot encoding, but you still need to apply all of these functions explicitly yourself. Compared with a product like H2O, which does all this via configuration options on the model, writing this code can be a chore.
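
For example, here is a rough sketch of the kind of boilerplate involved, assuming a DataFrame df with a string column named 'colour' that needs to be indexed and one-hot encoded before modelling (the column names are purely illustrative, and this uses the Spark 2.x API):

import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}

// Map the string values in 'colour' to numeric category indices
val indexer = new StringIndexer()
    .setInputCol("colour")
    .setOutputCol("colour_idx")

// Expand the indices into a one-hot encoded vector column
val encoder = new OneHotEncoder()
    .setInputCol("colour_idx")
    .setOutputCol("colour_vec")

val indexed = indexer.fit(df).transform(df)
val encoded = encoder.transform(indexed)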

As frustrating as it is, the flexibility of being able to build reusable Machine Learning Pipelines, in which your feature generation becomes part of the meta-parameters, will make it all worthwhile in the end.

So, to this end, here is a little bit of re-usable code that will help with one common data preparation task: Mean Imputation of Missing Values.


If you are lucky enough to be on Spark 2.2.0 or later, then there is a built-in Imputer transformer you can use to do this. The following was lifted directly from a StackOverflow post:

import org.apache.spark.ml.feature._

// Impute the column mean into any NULLs in the given feature columns.
// Note: in Spark 2.2 the Imputer expects the input columns to be Double or Float.
def imputeMeans(df: org.apache.spark.sql.DataFrame, feats: Array[String]): (org.apache.spark.sql.DataFrame, Array[String]) = {
        val outcols = feats.map(c => s"${c}_imputed")
        val imputer = new Imputer().setInputCols(feats).setOutputCols(outcols).setStrategy("mean")
        val resdf = imputer.fit(df).transform(df)
        (resdf, outcols)
}

This should be fairly self-explanatory: you pass the function a DataFrame and an array of column names, and you get back a new DataFrame with additional columns to which the '_imputed' suffix has been added.
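
As a rough sketch of how you might call it, assuming a DataFrame df with numeric columns 'age' and 'income' that contain NULLs (the names are just placeholders):

val (imputedDf, imputedCols) = imputeMeans(df, Array("age", "income"))
imputedDf.show   // now includes 'age_imputed' and 'income_imputed' columns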


If you are on an older version of Spark, then you can do the following:

import org.apache.spark.sql.functions.mean

// Replace NULLs in a single column with that column's mean value
def imputeToMean(df: org.apache.spark.sql.DataFrame, col: String): org.apache.spark.sql.DataFrame = {
        val meanVal = df.select(mean(df(col))).collectAsList().get(0).getDouble(0)
        df.na.fill(meanVal, Seq(col))
}

// Fold over the column names, imputing the mean into each in turn
def imputeMeans(df: org.apache.spark.sql.DataFrame, cols: Array[String]): org.apache.spark.sql.DataFrame = {
        cols.foldLeft(df)( (dfin, col) => imputeToMean(dfin, col) )
}

Again, the imputeMeans function takes a DataFrame and an array of column names. This time it simply returns a new DataFrame in which the required columns have had their NULLs replaced with the column mean. This version can be time-consuming to run, so I suggest you cache the resulting DataFrame once it is done.
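
A minimal sketch of how you might use it, again with placeholder column names:

val cleanDf = imputeMeans(df, Array("age", "income")).cache()
cleanDf.count()   // force the imputation to run and populate the cache
cleanDf.show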

Hope that helps.

Wednesday, September 6, 2017

Apache Spark - Cumulative Aggregations over Groups using Window Functions

Many of the tasks you will want to perform over a data set correspond to some form of aggregation over groups of your data, but done in a way that takes an ordering into consideration.

For example, calculating the cumulative total sales per salesperson over the course of each financial year: you want the data grouped by salesperson and financial year, then transformed to calculate the cumulative total up to and including each sale.

These kinds of functions are possible in SQL syntax, but they can be complicated and difficult to read. Apache Spark, by comparison, has a very elegant and easy-to-use API for generating these kinds of results.

Here is a simple example in which we want the average sales per salesperson, leading up to and including their current sale.

First let's create a dummy data set and look at it. (In reality you will usually be loading an enormous set from your cluster, but this lets us experiment with the API.)

// Create an RDD of dummy sales records: (id, salesperson, sales)
val redy = sc.parallelize(
    Seq((201601, "Jane", 10), (201602, "Tim", 20), (201603, "Jane", 30),
        (201604, "Tim", 40), (201605, "Jane", 50), (201606, "Tim", 60),
        (201607, "Jane", 70), (201608, "Jane", 80), (201609, "Tim", 90),
        (201610, "Tim",  100), (201611, "Jane", 110), (201612, "Tim", 120)
    )
)

// Name the columns by mapping onto a case class, then convert to a DataFrame
case class X(id: Int, name: String, sales: Int)
val redy2 = redy.map( in => X(in._1, in._2, in._3) )
val df = sqlContext.createDataFrame(redy2)

df.show

...and this is what it looks like:

+------+----+-----+
|    id|name|sales|
+------+----+-----+
|201601|Jane|   10|
|201602| Tim|   20|
|201603|Jane|   30|
|201604| Tim|   40|
|201605|Jane|   50|
|201606| Tim|   60|
|201607|Jane|   70|
|201608|Jane|   80|
|201609| Tim|   90|
|201610| Tim|  100|
|201611|Jane|  110|
|201612| Tim|  120|
+------+----+-----+

To add our additional column with the aggregation over the grouped and ordered data, it is as simple as:

df.withColumn("avg_sales", avg(df("sales"))
   .over( Window.partitionBy("name").orderBy("id") )
).show


...which will produce the following output:


+------+----+-----+------------------+
|    id|name|sales|         avg_sales|
+------+----+-----+------------------+
|201602| Tim|   20|              20.0|
|201604| Tim|   40|              30.0|
|201606| Tim|   60|              40.0|
|201609| Tim|   90|              52.5|
|201610| Tim|  100|              62.0|
|201612| Tim|  120| 71.66666666666667|
|201601|Jane|   10|              10.0|
|201603|Jane|   30|              20.0|
|201605|Jane|   50|              30.0|
|201607|Jane|   70|              40.0|
|201608|Jane|   80|              48.0|
|201611|Jane|  110|58.333333333333336|
+------+----+-----+------------------+


Voila: an additional column containing the average sales per salesperson, leading up to and including the current sale. You can modify this to change the aggregation function, or to add additional columns to the grouping or the ordering. It is clean, readable, and fast.
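
For instance, to get the cumulative total sales from the opening example rather than the running average, you can simply swap the aggregation function (a sketch using the same dummy data):

import org.apache.spark.sql.functions.sum

// Running total of sales per salesperson, in id order
df.withColumn("total_sales", sum(df("sales"))
   .over( Window.partitionBy("name").orderBy("id") )
).show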


Tuesday, May 23, 2017

Inspecting Changed Files in a Git Commit


When you are getting set up with Git for data science work, you will likely have some teething issues getting your process and flow right.

One common one is that you have a Git repo that controls your production code, but you make config changes in the prod code and code changes in your working copy. If you have not graduated to pull requests from DEV branches, then you might find yourself unable to push the changes you have made without first merging with what is in PROD.

Here are some small things that are useful in understanding what has changed before you flick that switch.

First, look at which commits are on origin/master but not in your current working version (make sure you have run a git fetch first so that origin/master is up to date):

git log HEAD..origin/master

This will give you something like

commit 30d1cb6a3564e09753078288a9317f1b2309ab81
Author: John Hawkins <johawkins@blahblahblah.com>
Date:   Tue May 23 10:03:10 2017 +1000

    clean up

commit d69a1765bcfa4257573707e9af2bb014c412d7d8
Author: Added on 2016-12-06i <johawkins@blahblahblah.com>
Date:   Tue May 16 05:39:14 2017 +0000

    adding deployment notes

You can then inspect the changes in the individual commits using:

 git show --pretty="" --name-only d69a1765bcfa4257573707e9af2bb014c412d7d8

Which will give you something like

commit d69a1765bcfa4257573707e9af2bb014c412d7d8
Author: Added on 2016-12-06i <johawkins@blahblahblah.com>
Date:   Tue May 16 05:39:14 2017 +0000

    adding deployment notes

drill_udfs/README.md

Note that it will show you the changed files at the end, which is often the most important thing to know. At least this helps me understand what I am bringing in.
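
If you then want to see the actual diff for one of those files, rather than just its name, you can pass the path to git show (using the commit and file from the example above):

git show d69a1765bcfa4257573707e9af2bb014c412d7d8 -- drill_udfs/README.md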