Example Python Machine Learning Algorithm on Spark

Combining software frameworks that do machine learning with ones that do parallel cluster computing is tricky. Because the tools are always changing, you want these tasks to be decoupled, but not so decoupled that it’s hard to put them together. Inevitably there will be some glue code in the middle. What follows is an example of glue that runs Python’s scikit-learn code in a Spark parallel environment.

Built on top of Python’s SciPy scientific computing module, scikit-learn contains implementations of a broad range of machine learning algorithms along with helpful utilities for generating datasets, performing evaluations, working with standard corpora, and so forth. It is well-documented and relatively easy to use. Scikit-learn has support for running work in parallel on multiple cores of a single machine, but no support for distributing work across a cluster.
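As a taste of the library, here is the basic scikit-learn workflow in a few lines: generate a synthetic dataset, split it, fit a classifier, and score it. (This is a minimal sketch; note that newer scikit-learn releases expose train_test_split from sklearn.model_selection rather than the sklearn.cross_validation module used in the listing below.)

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split  # older releases: sklearn.cross_validation
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

# Generate a synthetic binary classification problem.
X, y = make_classification(n_samples=200, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

# Fit a single decision tree and score it on held-out data.
clf = DecisionTreeClassifier(random_state=0).fit(X_train, y_train)
print(accuracy_score(y_test, clf.predict(X_test)))
```

Everything here runs on one machine; the point of the rest of this post is getting many such fits to run in parallel.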

Spark is a successor to Hadoop that takes the Map/Reduce paradigm back to its functional programming roots. Instead of writing Map and Reduce Java classes as you would in Hadoop, you apply anonymous map and reduce functions to a distributed data set, allowing you to focus on the data transformations you are interested in while minimizing boilerplate. Spark is implemented in Scala, but has support for other languages such as Java and Python.
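The functional flavor is easy to see without a cluster at all. Spark’s map and fold behave like the familiar functional-programming primitives applied to an ordinary collection; a rough single-machine analogue (plain Python, no Spark) looks like this:

```python
from functools import reduce

# A Spark RDD is conceptually a distributed collection. rdd.map(f) and
# rdd.fold(zero, op) behave like map and reduce on an ordinary sequence.
data = range(1, 6)                               # stand-in for sc.parallelize([1, 2, 3, 4, 5])
squared = map(lambda x: x * x, data)             # like rdd.map(...)
total = reduce(lambda a, b: a + b, squared, 0)   # like rdd.fold(0, add)
print(total)  # 55
```

In Spark the same two calls distribute the work across machines; the program text barely changes.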

Here is a program that puts these two things together to run a basic boosting task in Spark.

from pyspark import SparkContext

import numpy as np

from sklearn.cross_validation import train_test_split, Bootstrap
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier

def run(sc):
	def zero_matrix(n, m):
		return np.zeros(n*m, dtype = int).reshape(n, m)

	def vote_increment(y_est):
		increment = zero_matrix(y_est.size, n_ys)
		increment[np.arange(y_est.size), y_est] = 1
		return increment # test point x class matrix with 1s marking the estimator prediction

	X, y = make_classification()
	X_train, X_test, y_train, y_test = train_test_split(X, y)

	n_test = X_test.shape[0]
	n_ys = np.unique(y_train).size

	model = DecisionTreeClassifier()
	# Partition the training data into random sub-samples with replacement.
	samples = sc.parallelize(Bootstrap(y_train.size))
	# Train a model for each sub-sample and apply it to the test data.
	vote_tally = samples.map(lambda (index, _):
		model.fit(X_train[index], y_train[index]).predict(X_test)
	).map(vote_increment).fold(zero_matrix(n_test, n_ys), np.add) # Take the learner majority vote.
	y_estimate_vote = np.argmax(vote_tally, axis = 1)
	return accuracy_score(y_test, y_estimate_vote)

if __name__ == '__main__':
	print run(SparkContext("local", "Boost"))

spark_parallel_boost.py randomly generates test and training data sets, randomly selects subsets of the training set with replacement using Bootstrap, uses the Spark framework to train an ensemble of decision trees on these subsets and apply them to the test set, then takes the ensemble majority vote and uses it to calculate a test prediction accuracy. To see this script in action, set up Spark on your machine, then run it using the pyspark script that comes with the Spark distribution. It will generate a bunch of logger output followed by an accuracy score between 0 and 1.
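The least obvious step is the vote tally: each estimator’s predictions are turned into a 0/1 matrix of test points by classes, the matrices are summed across estimators (that is what the fold with np.add does), and a per-row argmax gives the majority vote. In plain numpy, with three hypothetical estimators, four test points, and two classes:

```python
import numpy as np

n_ys = 2  # number of classes
predictions = [np.array([0, 1, 1, 0]),   # estimator 1's labels for the 4 test points
               np.array([0, 1, 0, 0]),   # estimator 2
               np.array([1, 1, 1, 0])]   # estimator 3

def vote_increment(y_est):
    # One-hot matrix: a 1 marks the estimator's predicted class for each test point.
    increment = np.zeros((y_est.size, n_ys), dtype=int)
    increment[np.arange(y_est.size), y_est] = 1
    return increment

# Sum the one-hot matrices across estimators, then take the per-row
# argmax to get the ensemble majority vote for each test point.
vote_tally = sum(vote_increment(p) for p in predictions)
print(np.argmax(vote_tally, axis=1))  # [0 1 1 0]
```

In the Spark version the sum happens across the cluster via fold; the argmax is the only step left for the driver.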

Since the inputs are random, the accuracy results don’t mean anything. Also, the code is all running on your local machine. (That’s what the “local” argument to the SparkContext constructor means.) Nevertheless, this can serve as a template for applications that do real learning on a cluster, and it illustrates both the advantages and frustrations of combining these two tools. The advantage is that the machine learning part itself is extremely simple: all the heavy lifting is done in the single map call that fits a model and predicts on the test set. The frustration is that although scikit-learn already implements various parallelizable techniques (boosting, cross validation, decision forests), it only does so for a single machine, so you have to write Spark versions yourself. (Note how the most complicated part of the code here is the part that takes the majority vote among the learners.) Still, it’s probably easier to roll your own ensemble techniques in Python than in Java Map and Reduce classes, so this is a good place to start.


10 Responses to Example Python Machine Learning Algorithm on Spark

  1. Interesting – I’ve just been playing at implementing ensembles in Python with scikit-learn, but haven’t come across spark for the parallelisation. Will have to try it out!

  2. idosht says:

    Very Interesting – sklearn with spark and I tried this example on IPython notebook – I think these 3 will make life much easier 🙂

  3. spark is not a successor to “hadoop”. what do you mean by hadoop? you probably wanted to say spark is a successor to mapreduce.

    • W.P. McNeill says:

      I’m using Hadoop equivalently to MapReduce. You’re right: MapReduce is the idea, Hadoop is an implementation, but it’s the default implementation for anyone who doesn’t work at Google, so you can see how I’d make the equivalence.

  4. T.E says:

    Thanks for the example; I don’t suppose you’ve been able to retrieve the tree instances for later use? I had previously used Apache Pig’s python UDFs to train a bunch of Random Forests and persisted the models by pickling the model and returning it to pig as a chararray; is there an effective way to do this with spark?

  5. MJ says:

    Thanks for the example. While I am starting with pyspark, I wondered how to translate your code to use real data set from a file.
    I started with this :

    import numpy as np
    import pandas as pd
    from sklearn import cross_validation
    from sklearn.cluster import KMeans

    inputDf = pd.read_csv("auto.csv", sep='\t')
    import patsy
    formula = 'origin ~ mpg + cylinders + displace + hp + accel + C(year)'
    y, X = patsy.dmatrices(formula, data=inputDf)#, return_type='dataframe')
    X_train, X_test, y_train, y_test = cross_validation.train_test_split(X, y, test_size=0.2)
    algoModel = KMeans(n_clusters=3)
    samples = sc.parallelize(cross_validation.Bootstrap(len(y)))
    model = samples.map( lambda (index, _): algoModel.fit(X_train[index], y_train[index]).predict(X_test) )
    Predict = model.predict(X_test)

    Here I have two questions :
    – Is there an easy way to compare predicted and true values from y_test?
    – I would like to parallelize the data without bootstrapping, or at least without replacement. Is it possible?

    Thanks a lot

  6. Pingback: Example Python Machine Learning Algorithm on Spark | Analytics Team

  7. Pingback: 6 points to compare Python and Scala for Data Science using Apache Spark | Vademecum of Practical Data Science

  8. yumi says:

    This code is great but what if your data (i.e., X, y) is so large that it does not fit in a driver? I would like to use your code but Nrow > 10 millions… :\
