Showing posts with label big data. Show all posts
Showing posts with label big data. Show all posts

Tuesday, September 26, 2017

Mean Imputation in Apache Spark

If you are interested in building predictive models on Big Data, then there is a good chance you are looking to use Apache Spark. Either with MLLib or one of the growing number of machine learning extensions built to work with Spark such as Elephas which lets you use Keras and Spark together.

For all its flexibility, you will find certain things about working with Spark a little tedious.

For one thing you will have to write a lot of boilerplate code to clean and prepare features before training models. MLLib only accepts numerical data, and will not handle NULL values. There are some great library functions for String indexing and One Hot Encoding. However, you will still need to explicitly apply all these functions yourself. Compared with using a product like H2O, which does all this via configuration options in the model, writing this code can be a chore.

As frustrating as it is, the flexibility of being able to build reusable Machine Learning Pipelines in which you can make your feature generation part of the Meta-Parameters, will make it all worthwhile in the end.

So, to this end, here is a little bit of re-usable code that will help with one common data preparation task: Mean Imputation of Missing Values.


If you are lucky enough to be on Spark 2.2.0 then there is a library function you can use to do this. This was lifted directly from this StackOverflow post:

import org.apache.spark.ml.feature._

def imputeMeans(df: org.apache.spark.sql.DataFrame, feet: Array[String]): (org.apache.spark.sql.DataFrame, Array[String]) = {
        val outcols = feet.map(c => s"${c}_imputed")
        val imputer = new Imputer().setInputCols(feet).setOutputCols(outcols).setStrategy("mean")
        val resdf = imputer.fit(df).transform(df)
        (resdf, outcols)
}

Which should be fairly self explanatory, you simply pass the function a dataframe and an array of column names. You will get back a new data frame with additional columns in which the '_imputed' suffix has been added.


If you are on an older version of Spark then you can do the following

def imputeToMean(df: org.apache.spark.sql.DataFrame, col: String): org.apache.spark.sql.DataFrame = {
        val meanVal = df.select(mean(df(col))).collectAsList().get(0).getDouble(0)
        df.na.fill(meanVal, Seq(col))
}

def imputeMeans(df: org.apache.spark.sql.DataFrame, cols: Array[String]): org.apache.spark.sql.DataFrame = {
        cols.foldLeft(df)( (dfin, col) => imputeToMean(dfin, col) )
}

Again the imputeMeans function takes a DataFrame and an array of column names. This time it just returns a new DataFrame in which the required columns have had their NULLs replaced with the column mean. This version can be time consuming to run, so I suggest that you cache it once it is done.

Hope that helps.

Wednesday, September 6, 2017

Apache Spark - Cumulative Aggregations over Groups using Window Functions

There are many tasks you want to perform over a data set that correspond to some form of aggregation over groups of your data, but in a way that takes some kind of ordering into consideration.

For example calculating the cumulative total sales per salesperson over the course of each financial year. You want the data grouped by salesperson and the financial year, but then transformed to calculate the cumulative total up to and including each sale.

These kinds of functions are possible in SQL syntax, but they are complicated, and difficult to read. Apache Spark by comparison has a very elegant and easy to use API for generating these kinds of results.

Here is a simple example in which we want the average sales per salesperson, leading up to and including their current sale.

First lets create a dummy data set and look at it. (In reality you will usually be loading an enormous set from your cluster, but this lets us experiment with the API).

val redy = sc.parallelize(
    Seq((201601, "Jane", 10), (201602, "Tim", 20), (201603, "Jane", 30),
        (201604, "Tim", 40), (201605, "Jane", 50), (201606, "Tim", 60),
        (201607, "Jane", 70), (201608, "Jane", 80), (201609, "Tim", 90),
        (201610, "Tim",  100), (201611, "Jane", 110), (201612, "Tim", 120)
    )
)

case class X(id: Int, name: String, sales: Int)
val redy2 = redy.map( in => X(in._1, in._2, in._3) )
val df = sqlContext.createDataFrame(redy2)

df.show

...and this it what it looks like

+------+----+-----+
|    id|name|sales|
+------+----+-----+
|201601|Jane|   10|
|201602| Tim|   20|
|201603|Jane|   30|
|201604| Tim|   40|
|201605|Jane|   50|
|201606| Tim|   60|
|201607|Jane|   70|
|201608|Jane|   80|
|201609| Tim|   90|
|201610| Tim|  100|
|201611|Jane|  110|
|201612| Tim|  120|
+------+----+-----+

To add our additional column with the aggregation over the grouped and ordered data, it is as simple as:

df.withColumn("avg_sales", avg(df("sales"))
   .over( Window.partitionBy("name").orderBy("id") )
).show


...which will produce the following output:


+------+----+-----+------------------+
|    id|name|sales|         avg_sales|
+------+----+-----+------------------+
|201602| Tim|   20|              20.0|
|201604| Tim|   40|              30.0|
|201606| Tim|   60|              40.0|
|201609| Tim|   90|              52.5|
|201610| Tim|  100|              62.0|
|201612| Tim|  120| 71.66666666666667|
|201601|Jane|   10|              10.0|
|201603|Jane|   30|              20.0|
|201605|Jane|   50|              30.0|
|201607|Jane|   70|              40.0|
|201608|Jane|   80|              48.0|
|201611|Jane|  110|58.333333333333336|
+------+----+-----+------------------+


Voila. An additional column containing the average sales per salesperson leading up to and including the current sale. You can modify this to change the aggregation function, add additional columns to the grouping or the ordering. It is clean and readable, and fast.


Monday, November 10, 2014

Basic Guide to Setting Up Single Node Hadoop 2.5.1 Cluster on Ubuntu



So, you have decided you are interested in big data and data science and exploring what you can do with Hadoop and Map Reduce.

But... you find most of the tutorials too hard to wade through, inconsistent, or you simply encounter problems that you just can't solve. Hadoop is evolving so fast that often the documentation is unable to keep up. 

Here I will run you through the process I followed to get the latest version of Hadoop (2.5.1) running so I could use it to test my Map Reduce programs. 

You can see the official Apache Docs here.


Part One: Java

You need to make sure you have a compatible version of Java on your machine.

Jump into your terminal and type
java -version
You preferably need an installation of Java 7.
When I run this I get:

java version "1.7.0_55"
OpenJDK Runtime Environment (IcedTea 2.4.7) (7u55-2.4.7-1ubuntu1~0.12.04.2)
OpenJDK 64-Bit Server VM (build 24.51-b03, mixed mode)


Part Two: Other Software

You will need ssh and rsync installed. Chances are that they already are, but if not just run:
sudo apt-get install ssh
sudo apt-get install rsync


Part Three: Grab a Release

Head to the Apache Hadoop Releases page, choose a mirror and grab the tarball (.tar.gz). Make sure you do not grab the source file by mistake (src).
Remember: in this walk-through I have grabbed release: 2.5.1

Part Four: Unpack & Configure

Copy the tarball to wherever you want Hadoop to reside. For me I like to put it in the directory
/usr/local/hadoop
and then extract the contents with
tar -xvf hadoop-2.5.1.tar.gz
Then you will need to do some configuration. Open the file
vi hadoop-2.5.1/etc/hadoop/hadoop-env.sh
You will need to modify the line that currently looks like this
export JAVA_HOME=${JAVA_HOME}

You need to point this to your java installation. If you are not sure where that it just run
which java

and then copy the path (minus the bin/java at the end) into the hadoop config file to replace the text ${JAVA_HOME}.



Part Five: Test

First run a quick to check that you have configured java correctly. The following command should show you the version of hadoop and its compilation information.

hadoop-2.5.1/bin/hadoop version

Part Six: Run Standalone

The simplest thing you can do with hadoop is run a map reduce job as a stand alone script.

The Apache Docs give a great simple example: grepping a collection of files.

Run these commands:
mkdir input
cp hadoop-2.5.1/etc/hadoop/*.xml input
hadoop-2.5.1/bin/hadoop jar hadoop-2.5.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.1.jar grep input output 'dfs[a-z.]+'

When hadoop completes that process you can open up the results file and have a look.
vi output/part-r-00000
You should see a single line for each match of the regular expression. Trying changing the expression and seeing what you get. Now you can use this installation to test your map reduce jars against Hadoop 2.5.1.


Coming Next: Running Hadoop 2.5.1 in Pseudo Distributed Mode

Thursday, October 31, 2013

Maintaining Constant Probability of Data Loss with Increased Cluster Size in HDFS

In a conversation with a colleague some months ago I was asked if I knew how to scale the replication factor of a Hadoop Distributed File System (HDFS) cluster as the number of nodes increased in order to keep the probability of experiencing any data loss below a certain threshold. My initial reaction to the question was that it would not be affected, I was naively thinking the data loss probability was a product of the replication factor only.

Thankfully, it didn't take me long to realize I was wrong. What is confusing, is that for a constant replication factor as the cluster grows the probability of data loss increases, but the quantity of data lost decreases (if the quantity of data remains constant).

To see why consider a situation in which we have N nodes in a cluster with replication factor K. We let the probability of a single node failing in a given time period be X. This time period needs to be sufficiently small so that we know that the server administrator will not have enough time to replace the machine or drive and recover the data. The probability of experiencing data loss in that time period is the probability of getting K or more nodes failing. The exact value of which is calculated with the following sum:

Although in general a good approximation (a consistent overestimate) is simply:


Clearly as the size of N increases this probability must get bigger.

This got me thinking about how to determine the way the replication factor should scale with the cluster size in order to keep the probability of data loss constant (ignoring the quantity). This problem may have been solved elsewhere, but it was an enjoyable mathematical exercise to go through.

In essence we want to know if the number of nodes in the cluster increases by some value n, then what is the minimum number k such that the probability of data loss remains the same or smaller. Using the approximation from above we can express this as:


Now if we substitute in the formulas for N-choose-K and perform some simplifications we can transform this into:


I optimistically thought that it might be possible to simplify this using Stirling's Approximation, but I am now fairly certain that this is not possible. Ideally we would be able to express k in terms of N,n,K,X, but I do not think that it is possible. If you are reading this and can see that I am wrong please show me how.

In order to get a sense of the relationship between n and k I decided to do some quick numerical simulations in R to have a look at how k scales with n.

I tried various combinations of X, N and K. Interestingly for a constant X the scaling was fairly robust when you varied the initial values of N and K. I have plotted the results for three different values of X so you can see the effect of different probability of machine failure. In all three plots the baseline case was a cluster of 10 nodes with a replication factor of 3.

You can grab the R code used to generate these plots from my GitHub repository.









Tuesday, September 3, 2013

Using AWK for Data Science

Over the years I have become convinced that one of the essential tools needed by anyone whose job consists of working with data is the unix scripting language AWK. It will save you an awful lot of time when it comes to processing raw text data files.

For example, taking a large delimited file of some sort and pre-processing its columns to pull out just the data you want, perform basic calculations or prepare it for entry into a program that requires a specific format.

AWK has saved me countless hours over the years, so now I am writing a super brief primer that should not only convince you it is worth learning but show you some examples.

The first thing you need to know about AWK is that it is data driven, unlike most other languages for which execution is constrained largely by procedural layout of the instructions. AWK instructions are defined by patterns in the data to which actions should be applied. If you are familiar with the regular expression type control structures available in PERL then this should seem like a comfortable idea.

The programs are also data driven in the sense that the entire program is applied to every line of the file (as long as there are patterns that match) and furthermore the program has inbuilt access to the columns of data inside the file through the $0, $1, 2 ... variables: where $0 contains the entire line and $1 upwards has the data from individuals columns. By default the columns are expected to be TAB separated, but you can follow your AWK script with FS=',' to use a comma or any other field separator.

To run a simple AWK script type:

   >awk 'AWK_SCRIPT_HERE' FILE_TO_PROCESS

The basic syntax of the scripts themselves consists of mutiple pattern action pairs defined like this:
PATTERN {ACTION}

One need not include an PATTERN, in which case the action will be applied to every line inside the file to which the program is applied.

So for example, the following program will out the sum of columns 3 and 4

>awk '{print $3+$4}' FILENAME

If we only wanted this to happen when column 1 contained the value 'COSTS' we have a number of options. We could simply use the pattern equivalent of an IF statement as follows:

>awk '$1=="COSTS" {print $3+$4} FILENAME

Alternatively we could use a PATTERN expression as follows

>awk '/COSTS/ {print $3+$4} FILENAME

The problem with the second solution is that it if for some reason the word COSTS can appear in other fields or places in the file then may not get the results you are looking for. There can be a trade off for using the power and flexibility of the regular expression patterns and their ability to lull us into a false sense of security about what they are doing.

There are several special execution paths that can be included in the program. In place of the pattern you can include the reserved words BEGIN or END in order to execute routine before or after the file processing occurs. This is particularly useful for doing something like calculating a MEAN, shown below:

>awk '{sum+=$1; count+=1} END {print sum/count}' FILENAME

By now you should be seeing the appeal of AWK. You can manipulate your data quickly with small scripts that do not require loading an enormous file into a spreadsheet, or writing a more complicated JAVA or PYTHON program.

Finally here are a few of the kinds of tasks that I do with AWK all the time

1)  Convert some file with 10 or more columns into one with a sum of a few and reformating the others:

>awk '{print toupper($1) "," ($3/100) "," ($2+$4-$5)}' FILENAME


2) Calculate the Mean and Standard Deviation on a column. (The following is fora sample, just change the n-1 to n for a complete population.

> awk 'pass==1 {sum+=$1; n+=1} pass==2 {mean=sum/(n-1); ssd+=($1-mean)*($1-mean)} END {print sqrt(ssd/(n-1))}' pass=1 FILENAME pass=2 FILENAME


3) Calculate the Pearson correlation coefficient between a pair of columns. Again for a sample of the data. Change n-1 to n to do the calculation on the entire population data.

> awk 'pass==1 {sx+=$1; sy+=$2; n+=1} pass==2 {mx=sx/(n-1); my=sy/(n-1); cov+=($1-mx)*($2-my); ssdx+=($1-mx)*($1-mx); ssdy+=($2-my)*($2-my);} END {print cov / ( sqrt(ssdx) * sqrt(ssdy) ) }' pass=1 FILENAME pass=2 FILENAME

If you have any great tips for getting more out of AWK let me know, I am always looking for shortcuts.