Tuesday, September 26, 2017

Mean Imputation in Apache Spark

If you are interested in building predictive models on Big Data, then there is a good chance you are looking to use Apache Spark, either with MLlib or with one of the growing number of machine learning extensions built to work with Spark, such as Elephas, which lets you use Keras and Spark together.

For all its flexibility, you will find certain things about working with Spark a little tedious.

For one thing, you will have to write a lot of boilerplate code to clean and prepare features before training models. MLlib only accepts numerical data and will not handle NULL values. There are some great library functions for string indexing and one-hot encoding (StringIndexer and OneHotEncoder), but you still need to apply them all explicitly yourself. Compared with a product like H2O, which does all of this via configuration options on the model, writing this code can be a chore.
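
To give a flavour of that boilerplate, here is a minimal sketch. The DataFrame df and its string column 'colour' are invented for illustration:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Map the string categories to numeric indices...
val indexer = new StringIndexer()
        .setInputCol("colour")
        .setOutputCol("colour_idx")

// ...then expand each index into a one-hot encoded vector.
val encoder = new OneHotEncoder()
        .setInputCol("colour_idx")
        .setOutputCol("colour_vec")

val encoded = encoder.transform(indexer.fit(df).transform(df))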

As frustrating as it is, the flexibility of being able to build reusable machine learning Pipelines, in which feature generation becomes part of the tunable meta-parameters, will make it all worthwhile in the end.

So, to this end, here is a little bit of reusable code that will help with one common data preparation task: Mean Imputation of Missing Values.


If you are lucky enough to be on Spark 2.2.0 or later, then there is a library function, Imputer, that will do this for you. The code below was adapted from this StackOverflow post:

import org.apache.spark.ml.feature._

// Returns a copy of the DataFrame with one extra "<column>_imputed" column
// per input column, along with the array of new column names.
def imputeMeans(df: org.apache.spark.sql.DataFrame, cols: Array[String]): (org.apache.spark.sql.DataFrame, Array[String]) = {
        val outCols = cols.map(c => s"${c}_imputed")
        val imputer = new Imputer()
                .setInputCols(cols)
                .setOutputCols(outCols)
                .setStrategy("mean")  // replace missing values with the column mean
        val resdf = imputer.fit(df).transform(df)
        (resdf, outCols)
}

This should be fairly self-explanatory: you pass the function a DataFrame and an array of column names, and you get back a new DataFrame with additional columns carrying the '_imputed' suffix, together with the names of those new columns.
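
For example, here is a hypothetical snippet; spark is assumed to be your SparkSession, and the column names are invented:

import spark.implicits._

// Two numeric columns with some missing values.
val df = Seq(
        (Some(1.0), Some(10.0)),
        (None, Some(20.0)),
        (Some(3.0), None)
).toDF("height", "weight")

val (imputed, newCols) = imputeMeans(df, Array("height", "weight"))
// newCols is Array("height_imputed", "weight_imputed"); 'imputed' holds
// the original columns plus the two new imputed ones.
imputed.show()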


If you are on an older version of Spark, then you can do the following:

import org.apache.spark.sql.functions.mean

// Replace the NULLs in a single column with that column's mean.
// Assumes the column has at least one non-null value, otherwise
// getDouble will fail.
def imputeToMean(df: org.apache.spark.sql.DataFrame, col: String): org.apache.spark.sql.DataFrame = {
        val meanVal = df.select(mean(df(col))).first().getDouble(0)
        df.na.fill(meanVal, Seq(col))
}

// Apply imputeToMean to each column in turn, threading the DataFrame through.
def imputeMeans(df: org.apache.spark.sql.DataFrame, cols: Array[String]): org.apache.spark.sql.DataFrame = {
        cols.foldLeft(df)( (dfin, col) => imputeToMean(dfin, col) )
}

Again, the imputeMeans function takes a DataFrame and an array of column names. This time it simply returns a new DataFrame in which the NULLs in the requested columns have been replaced with the column mean. Because computing each mean triggers a separate aggregation job over the data, this version can be time-consuming to run, so I suggest you cache the resulting DataFrame once it is done.
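
For example, reusing the hypothetical DataFrame from earlier:

// Each column mean is a separate aggregation job, so cache the result
// before reusing it in model training.
val cleaned = imputeMeans(df, Array("height", "weight")).cache()
cleaned.count()  // force evaluation so the cache is populated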

Hope that helps.
