Running Spark in yarn-client mode on a MapR cluster with your Windows based laptop

Introduction

Every now and then there’s a challenge at hand. I recently came across one, luckily. Someone made the decision to hand out a Windows based laptop to a promising data scientist. All the data scientists I have met so far want to run their stuff on Linux or Mac, or at least something that gives them a native ‘Unix’ prompt. The laptop is hardware encrypted with a locked bios, so the chances on getting a dual boot running were slim. Having only 4GB of memory did not give us a feasible virtual machine option either. So, Windows it is. Funny thing was, I have always wanted to get a ‘guest-laptop’ to be able to run jobs on a remote cluster, without having to do log in to the cluster itself.

Of course, there are a couple of prerequisites to get this up and running. For instance, the cluster must be ‘open’ to the laptop; the latter must be able to connect to a variety of ports. For the sake of this setup I assume that the cluster is not locked down from ‘impersonating’ the running Hadoop user. Your cluster may require different setup so your mileage may vary.

At the customer’s site is a MapR 5.2 development cluster that we are allowed to use to build models and transform big amounts of data. That MapR cluster will be our target to run (eventually) a pyspark session on. Please bear in mind that this is a locally installed cluster. There are a lot of problems to achieve the below when running on (f.i.) AWS due to the ‘reverse-nat-like’ network setup with this service provider. Believe me, I have tried different tunneling- and socks options but all to no avail. If someone can enlighten me, please do.

For various reasons we (still) use Spark 1.6.1. I have not tested the below with Spark 2 but I already know it will be different due to the absence of the spark assembly jar in the later versions.

So to summarize this is what you’ll need

  • An installed MapR 5.2 cluster with MEP 1.1 (Spark 1.6.1)
  • The shiny new Windows laptop that ‘was not to be’. I assume a 64-bit install.
  • Some patience if it does not run at first startup

Let’s go
As a first step, you are required to download a couple of files you’ll need to setup your client

  • The MapR 5.2 Windows client package at mapr-client-5.2.0.39122GA-1.amd64.zip. Installation of the MapR cluster is out of scope of this post.
  • An installed Python environment to run pyspark. Download it at for instance python-2.7.13.amd64.msi or python-2.6.6.amd64.msi. You’ll know how to install this one but make SURE the version you install matches the version on the cluster. 2.7 on your Windows box with 2.6 on the cluster will fail with ‘Python in worker has different version 2.6 than that in driver 2.7, PySpark cannot run with different minor versions‘. 2.6 is highly deprecated but sadly still in use on older CentOS 6 versions. You will have some trouble getting pip modules installed I believe.
  • The Spark 1.6.1 tarball without Hadoop located at (for instance spark-1.6.1-bin-without-hadoop.tgz
  • The latest Java 1.8 JDK from jdk8-downloads-2133151.html

To prevent Java serialVersionUID or NoClassDefFound errors, you’ll have to copy a few files from the cluster, they are:

  • /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.7.0-mapr-1607.jar
  • /opt/mapr/spark/spark-1.6.1/lib/datanucleus-api-jdo-4.2.1.jar
  • /opt/mapr/spark/spark-1.6.1/lib/datanucleus-core-4.1.6.jar
  • /opt/mapr/spark/spark-1.6.1/lib/datanucleus-rdbms-4.1.7.jar
  • /opt/mapr/spark/spark-1.6.1/lib/javax.servlet-api-3.1.0.jar
  • /opt/mapr/spark/spark-1.6.1/lib/spark-1.6.1-mapr-1611-yarn-shuffle.jar
  • /opt/mapr/spark/spark-1.6.1/lib/spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar
  • /opt/mapr/spark/spark-1.6.1/lib/spark-examples-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar

To access the Hive metastore on the cluster, download /opt/mapr/hive/hive-1.2/conf/hive-site.xml as well.

Have those ready! Before continuing any further you’ll have to set some environmental variables to get things up and running. You may set them using the horrible Windows GUI method, I prefer to create a command script that does just that. So create a directory c:\opt\mapr and a file called c:\opt\mapr\mapr-env.bat and paste the following contents in there.

@echo off
set JAVA_HOME=C:\Progra~1\Java\jdk1.8.0_121
set SPARK_HOME=C:\opt\mapr\spark\spark-1.6.1
set MAPR_HOME=C:\opt\mapr
set HADOOP_HOME=%MAPR_HOME%\hadoop\hadoop-2.7.0
set HADOOP_CONF_DIR=%HADOOP_HOME%\etc\hadoop
set YARN_CONF_DIR=%HADOOP_CONF_DIR%
set PATH=%JAVA_HOME%\bin;c:\Python27\;%SPARK_HOME%\bin;%HADOOP_HOME%\bin;%MAPR_HOME%\server;%PATH%
set HADOOP_USER_NAME=mapr
cmd /C hadoop classpath > tmpFile
set /p SPARK_DIST_CLASSPATH= < tmpFile
del tmpFile
set YARN_APPLICATION_CLASSPATH=%SPARK_DIST_CLASSPATH:\=/%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:;=:%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:c:/=/%

Adjust the JDK path in JAVA_HOME if necessary but make sure you use the 8.3 notation in stead of the one that uses the spaces (or install to a space-less location like c:\opt\jdk-1.8) I will not explain the above contents, they are needed to get both the Scala as well as the Python shells running.

Start installation

MapR Hadoop Client

  • Unzip the contents of mapr-client-5.2.0.39122GA-1.amd64.zip to c:\opt\mapr
  • Move the copied hadoop-yarn-server-web-proxy-2.7.0-mapr-1607.jar to C:\opt\mapr\hadoop\hadoop-2.7.0\share\hadoop\yarn
  • Run the mapr-env.bat script

Now configure the MapR Hadoop client by invoking

c:\opt\mapr\server\configure.bat -N my.cluster.com -c -C CLDB-HOST:7222 -HS HISTORYSERVER-HOST

The cluster name (-N), CLDB-HOST (-C) and HISTORYSERVER-HOST (-HS) are specific to your cluster setup! Note that the Windows configuration does not allow you to enter Zookeeper quorum information (-Z parameter) . If all goes well, no output will be given from the script.

You'll have to edit two files before you are ready to submit your first Hadoop-based YARN job. First, you'll have to tell mapreduce that you will be submitting cross-platform so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\mapred-site.xml and add

<property>
  <name>mapreduce.app-submission.cross-platform</name>
  <value>true</value>
</property>

Secondly, you'll have to tell the job that you'll be spoofing another user, so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\core-site.xml and add

<property>
  <name>hadoop.spoofed.user.uid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.gid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.username</name>
  <value>mapr</value>
</property>

If your cluster has different uid, gid or username, edit to your liking. Note that you are not restricted to the use of the mapr user. If there is another named user present, configure that one. If there is a user on the cluster that matches your Windows login name, you don'thave to edit core-site.xml. Please note that for a YARN job to succesfuly run, the user needs to be present on ALL nodes of the cluster with the same uid and gid. The Resource Manager will not accept your job if there are mismatches or you are trying to use an unknown (from Linux perspective) user.

After this installation and configuration you should be able to submit the teragen job bundled with the MapR Hadoop client:

hadoop jar %HADOOP_HOME%\share\hadoop\mapreduce\hadoop-mapreduce-examples-2.7.0-mapr-1607.jar teragen 10000 /tmp/teragen

Delete the directory first if it already exists. You should be able to use your just installed Windows client for that 😉

hadoop fs -ls /tmp
hadoop fs -rm -r /tmp/teragen

Do not continue until you have successfully ran the Teragen hadoop job!

Install Apache Spark

We will use the Spark distribution without Hadoop from the Apache downloads download site, but replace some of the jars with the ones from the cluster. Main reason for that is that the MapR distribution has it's own implementation of HDFS called MapR-FS and you'll need the classes to acces that file system. Same goes for the Hadoop client installed above, that's the reason you need the Spark without Hadoop tarball. So, do the following:

  • Create a directory called c:\opt\mapr\spark\
  • Uncompress spark-1.6.1-bin-without-hadoop.tgz to c:\opt\mapr\spark
  • Rename the directory spark-1.6.1-bin-without-hadoop to spark-1.6.1 so the Spark install will be in c:\opt\mapr\spark\spark-1.6.1
  • From the lib folder in spark-1.6.1 remove the files spark-assembly-1.6.1-hadoop2.2.0.jar, spark-1.6.1-yarn-shuffle.jar and spark-examples-1.6.1-hadoop2.2.0.jar
  • Move or copy the previously fetched jar files to the spark-1.6.1/lib folder : spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar, spark-1.6.1-mapr-1611-yarn-shuffle.jar, datanucleus-api-jdo-4.2.1.jar, datanucleus-core-4.1.6.jar and datanucleus-rdbms-4.1.7.jar
  • Optionally, but preferred, also copy javax.servlet-api-3.1.0.jar and spark-examples-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar to lib

To prevent the Spark jar being copied all the time, you may do the following:

hadoop fs -mkdir -p /user/mapr/apps/spark
hadoop fs -copyFromLocal %SPARK_HOME%\lib\spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar /user/mapr/apps/spark
hadoop fs -ls /user/mapr/apps/spark

On the cluster only
To run the pyspark example below, you will have to make sure numpy is installed on the cluster. You'll get an error soon enough if it's missing, so have your admin install it for you using:

sudo yum -y install numpy

On your Windows client you will need a couple of jars to get the databricks csv jar working. I have found out that passing --packages does not work with the approach in this post, but using the --jars option does. So these are the jars you need:

Download them to c:\opt\libext (or something like that). Next to that, you'll have to install a couple of Python modules as well:

python -m pip install -U pip setuptools
python -m pip install matplotlib
python -m pip install pandas
python -m pip install numpy

If you wish to access the Hive metastore from Spark, copy the hive-site.xml file to c:\opt\mapr\spark\spark-1.6.1\conf. Now you should be ready to start a Scala Spark shell:

spark-shell --master yarn-client --conf spark.yarn.jar=maprfs:///user/mapr/apps/spark/spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

To start a Python Spark shell use the following command:

pyspark --master yarn-client --conf spark.yarn.jar=maprfs:///user/mapr/apps/spark/spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

Now to run the pyspark ML example from MapR, download some files first and put them on MapR-FS:

Put them on MapR-FS in maprfs://tmp/

hadoop fs -copyFromLocal churn-bigml-80.csv /tmp/
hadoop fs -copyFromLocal churn-bigml-20.csv /tmp/

Now run pyspark and copy and paste the code below. It should run nicely on the cluster!

Start pyspark

pyspark --master yarn-client --conf spark.yarn.jar=maprfs:///user/mapr/apps/spark/spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH% --jars C:\opt\libext\spark-csv_2.10-1.3.0.jar,C:\opt\libext\univocity-parsers-1.5.1.jar,C:\opt\libext\commons-csv-1.1.jar

Code

CV_data = sqlContext.read.load('maprfs:///tmp/churn-bigml-80.csv',format='com.databricks.spark.csv',header='true',inferSchema='true')
final_test_data = sqlContext.read.load('maprfs:///tmp/churn-bigml-20.csv', format='com.databricks.spark.csv', header='true', inferSchema='true')						  

CV_data.cache()
final_test_data.cache()

CV_data.printSchema()
final_test_data.printSchema()

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction

binary_map = {'Yes':1.0, 'No':0.0, 'True':1.0, 'False':0.0}
toNum = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

CV_data = CV_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toNum(CV_data['Churn'])) \
    .withColumn('International plan', toNum(CV_data['International plan'])) \
    .withColumn('Voice mail plan', toNum(CV_data['Voice mail plan'])).cache()

final_test_data = final_test_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toNum(final_test_data['Churn'])) \
    .withColumn('International plan', toNum(final_test_data['International plan'])) \
    .withColumn('Voice mail plan', toNum(final_test_data['Voice mail plan'])).cache()

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

def labelData(data):
    # label: row[end], features: row[0:end-1]
    return data.map(lambda row: LabeledPoint(row[-1], row[:-1]))

training_data,testing_data = labelData(CV_data).randomSplit([0.8, 0.2])

model = DecisionTree.trainClassifier(training_data, numClasses=2, maxDepth=2,
                                     categoricalFeaturesInfo={1:2, 2:2},
                                     impurity='gini', maxBins=32)

print model.toDebugString()

Conclusion and wrapup
So, allthough it's a bit of a hassle, you can get things up and running using a Windows client. The one thing I could not get up and running was the driver's web UI on port 4040 It seems to be working as long as the Spark contexts are not loaded yet, after that the web UI fails with a nullpointer exception. Luckily, you can get to a lot of info at the application master on the cluster (port 8088). Have fun!

Posted in Uncategorized.