Run a pyspark Windows client in YARN client mode on MapR 5.2.1 / MEP 3.0

Introduction

Every now and then there’s a challenge at hand. I recently came across one, luckily. Someone made the decision to hand out a Windows based laptop to a promising data scientist. All the data scientists I have met so far want to run their stuff on Linux or Mac, or at least something that gives them a native ‘Unix’ prompt. The laptop is hardware encrypted with a locked bios, so the chances on getting a dual boot running were slim. Having only 4GB of memory did not give us a feasible virtual machine option either. So, Windows it is. Funny thing was, I have always wanted to get a ‘guest-laptop’ to be able to run jobs on a remote cluster, without having to do log in to the cluster itself.

Of course, there are a couple of prerequisites to get this up and running. For instance, the cluster must be ‘open’ to the laptop; the latter must be able to connect to a variety of ports. For the sake of this setup I assume that the cluster is not locked down from ‘impersonating’ the running Hadoop user. Your cluster may require different setup so your mileage may vary.

At the customer’s site is a MapR 5.2.1 development cluster with MEP 3.0 that we are allowed to use to build models and transform big amounts of data. That MapR cluster will be our target to run (eventually) a pyspark session on. Please bear in mind that this is a locally installed cluster. There are a lot of problems to achieve the below when running on (f.i.) AWS due to the ‘reverse-nat-like’ network setup with this service provider. Believe me, I have tried different tunneling- and socks options but all to no avail. If someone can enlighten me, please do.

So to summarize this is what you’ll need:

  • An installed MapR 5.2.1 cluster with MEP 3.0 (Spark 2.1.0)
  • The shiny new Windows laptop that ‘was not to be’. I assume a 64-bit install.
  • Some patience if it does not run at first startup

Let’s go
As a first step, you are required to download a couple of files you’ll need to setup your client

  • The MapR 5.2.1 Windows client package at http://archive.mapr.com/releases/v5.2.1/. Installation of the MapR cluster itself is out of scope of this post.
  • An installed Python environment to run pyspark. Download it for instance at https://www.python.org/downloads/release/python-2713/. You’ll know how to install this one but make SURE the major version you install matches the version on the cluster. 2.7 on your Windows box with 2.6 on the cluster will fail with ‘Python in worker has different version 2.6 than that in driver 2.7, PySpark cannot run with different minor versions‘. 2.6 is highly deprecated but sadly still in use on older CentOS 6 versions. You will have some trouble getting the pip modules installed mentioned below on 2.6.
  • The Spark 2.1.0 tarball without Hadoop located at (for instance) http://spark.apache.org/downloads.html
  • The latest Java 1.8 JDK from http://www.oracle.com/technetwork/java/javase/downloads/

To prevent Java serialVersionUID or NoClassDefFound errors, you’ll have to copy a few files from the cluster, they are:

  • /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.7.0-mapr-1703.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-api-jdo-4.2.1.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-core-4.1.6.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-rdbms-4.1.7.jar
  • All spark-*-mapr-1703.jar files in /opt/mapr/spark/spark-2.1.0/jars (Just te be safe, copy them all)

Note that since Spark 2.0 there is no longer an assembly that you can copy and use. You may be tempted to create a zip holding all the jars and use Spark’s spark.yarn.archive parameter. I found that not to be working in a mixed (Windows/Linux) environment. To access the Hive metastore on the cluster, download /opt/mapr/hive/hive-1.2/conf/hive-site.xml as well.

Have those ready! Before continuing any further you’ll have to set some environmental variables to get things up and running. You may set them using the horrible Windows GUI method, I prefer to create a command script that does just that. So create a directory c:\opt\mapr and a file called c:\opt\mapr\mapr-env.bat and paste the following contents in there.

@echo off
set JAVA_HOME=C:\Progra~1\Java\jdk1.8.0_121
set MAPR_HOME=C:\opt\mapr
set SPARK_HOME=%MAPR_HOME%\spark\spark-2.1.0
set HADOOP_HOME=%MAPR_HOME%\hadoop\hadoop-2.7.0
set HADOOP_CONF_DIR=%HADOOP_HOME%\etc\hadoop
set YARN_CONF_DIR=%HADOOP_CONF_DIR%
set PATH=%JAVA_HOME%\bin;c:\Python27\;%SPARK_HOME%\bin;%HADOOP_HOME%\bin;%MAPR_HOME%\server;%PATH%
set HADOOP_USER_NAME=mapr
cmd /C hadoop classpath > tmpFile
set /p SPARK_DIST_CLASSPATH= < tmpFile
del tmpFile
set YARN_APPLICATION_CLASSPATH=%SPARK_DIST_CLASSPATH:\=/%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:;=:%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:c:/=/%

Adjust the JDK path in JAVA_HOME if necessary but make sure you use the 8.3 notation in stead of the one that uses the spaces (or install to a space-less location like c:\opt\jdk-1.8) I will not explain the above contents, they are needed to get both the Scala as well as the Python shells running. But a remark on the YARN_APPLICATION_CLASSPATH variable: this one is used on the server, not on your Windows machine like the other ones.

Start installation
Install MapR Hadoop Client

  • Unzip the contents of mapr-client-5.2.1.42646GA-1.amd64.zip to c:\opt\mapr
  • Move the copied hadoop-yarn-server-web-proxy-2.7.0-mapr-1703.jar to C:\opt\mapr\hadoop\hadoop-2.7.0\share\hadoop\yarn
  • Run the mapr-env.bat script

Now configure the MapR Hadoop client by invoking

c:\opt\mapr\server\configure.bat
  -N my.cluster.com -c
  -C CLDB-HOST:7222
  -HS HISTORYSERVER-HOST

For instance

c:\opt\mapr\server\configure.bat 
  -N mapr-521-230.whizzkit.nl -c
  -C mapr.whizzkit.nl:7222
  -HS mapr.whizzkit.nl

The cluster name (-N), CLDB-HOST (-C) and HISTORYSERVER-HOST (-HS) are specific to your cluster setup! Note that the Windows configuration does not allow you to enter Zookeeper quorum information (-Z parameter). If all goes well, no output will be given from the script.

You'll have to edit two files before you are ready to submit your first Hadoop-based YARN job. First, you'll have to tell mapreduce that you will be submitting cross-platform so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\mapred-site.xml and add

<property>
  <name>mapreduce.app-submission.cross-platform</name>
  <value>true</value>
</property>

Secondly, you'll have to tell the job that you'll be spoofing another user, so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\core-site.xml and add

<property>
  <name>hadoop.spoofed.user.uid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.gid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.username</name>
  <value>mapr</value>
</property>

If your cluster has different uid, gid or username, edit to your liking. Note that you are not restricted to the use of the mapr user. If there is another named user present, configure that one. If there is a user on the cluster that matches your Windows login name, you don't have to edit core-site.xml. Please note that for a YARN job to successfully run, the user needs to be present on ALL nodes of the cluster with the same uid and gid. The Resource Manager will not accept your job if there are mismatches or you are trying to use an unknown (from Linux perspective) user.

After this installation and configuration you should be able to submit the teragen job bundled with the MapR Hadoop client:

hadoop jar \
  %HADOOP_HOME%\share\hadoop\mapreduce\hadoop-mapreduce-examples-2.7.0-mapr-1703.jar \
  teragen 10000 /tmp/teragen

Delete the directory first if it already exists. You should be able to use your just installed Windows client for that 😉

hadoop fs -ls /tmp
hadoop fs -rm -r -skipTrash /tmp/teragen

Do not continue until you have successfully ran the Teragen hadoop job!

Install Apache Spark

We will use the Spark distribution without Hadoop from the Apache downloads download site, but replace some of the jars with the ones from the cluster. Main reason for that is that the MapR distribution has it's own implementation of HDFS called MapR-FS and you'll need the jars provided by MapR to acces that file system. Same goes for the Hadoop client installed above, that's the reason you need the Spark without Hadoop tarball. So, do the following:

  • Create a directory called c:\opt\mapr\spark\
  • Uncompress spark-2.1.0-bin-without-hadoop.tgz to c:\opt\mapr\spark
  • Rename the directory spark-2.1.0-bin-without-hadoop to spark-2.1.0 so the Spark install will be in c:\opt\mapr\spark\spark-2.1.0
  • From the jars folder in spark-2.1.0 remove all spark-*_2.11-2.1.0.jar files
  • Move or copy the previously fetched jar spark- and datanucleus jar files to the spark-2.1.0/jars folder
  • If you wish to access the Hive metastore on the cluster from Spark, copy the previously downloaded hive-site.xml file to c:\opt\mapr\spark\spark-2.1.0\conf.

The first Spark test will be running the spark-shell in YARN client mode. You should be greeted by the familiar ASCII-art:

spark-shell --master yarn --deploy-mode client
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

You may opt to perform an extra optimization when starting Spark jobs. You may have noticed the Neither spark.yarn.jars nor spark.yarn.archive is set message when starting the shell. This causes your Spark driver to copy all jars to the cluster. To prevent this, you may do the following:

# Create a directory to hold the Spark jars
hadoop fs -mkdir -p /user/mapr/apps/spark/spark-2.1.0
# Copy all jars from the Window machine to MapR-FS
hadoop fs -copyFromLocal %SPARK_HOME%\jars\*jar /user/mapr/apps/spark/spark-2.1.0
# Check if it succeeded, all jars (around 105) should be listed
hadoop fs -ls /user/mapr/apps/spark/spark-2.1.0

Now you are able to start the spark-shell in a slightly different way, but quicker. Note that there will still be created and uploaded a small zip file called __spark_conf__.zip that contains a snapshot of your Hadoop and Spark config, as well as jars that the spark-shell seems fit to upload. They are located in /user/mapr/.sparkStaging/application__.

spark-shell --master yarn --deploy-mode client
  --conf spark.yarn.jars=maprfs:///user/mapr/apps/spark/spark-2.1.0/*
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

scala> val count = sc.parallelize(1 to 100).filter { _ =>
     |   val x = math.random
     |   val y = math.random
     |   x*x + y*y < 1
     | }.count()
count: Long = 76

scala> println(s"Pi is roughly ${4.0 * count / 100}")
Pi is roughly 3.04

Run pyspark
Actions to perform on the cluster only
To run the pyspark example below, you will have to make sure numpy is installed on the cluster. You'll get an error soon enough if it's missing, so have your admin install it for you using:

sudo yum -y install numpy

Actions to perform on you Windows client
On your Windows client you will need a couple of jars to get the databricks csv jar working. I have found out that passing --packages does not work with the approach in this post, but using the --jars option does. So these are the jars you need:

Download them to c:\opt\libext (or something like that). Next to that, you'll have to install a couple of Python modules as well:

python -m pip install -U pip setuptools
python -m pip install matplotlib
python -m pip install pandas
python -m pip install numpy

I have copied and modified a small part (the actual training a model, not the evaluation) of the blogpost pyspark ML example from MapR to reflect changes needed for Spark 2.1.0. But first download a file you need and put it on MapR-FS:

churn-bigml-80.csv

Put it on MapR-FS in maprfs:///tmp/

hadoop fs -copyFromLocal -f churn-bigml-80.csv /tmp/

You are now ready to start a Python Spark shell by using the command. You may notice the similarities between the used conf parameters in spark-shell and pyspark.

pyspark --master yarn --deploy-mode client
  --conf spark.yarn.jars=maprfs:///user/mapr/apps/spark/spark-2.1.0/*
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%
  --jars C:\opt\libext\spark-csv_2.10-1.3.0.jar,C:\opt\libext\univocity-parsers-1.5.1.jar,C:\opt\libext\commons-csv-1.1.jar

You may copy and paste the code below to check if you have succeeded in following along.

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

binary_map = {'Yes':1.0, 'No':0.0, True:1.0, False:0.0, 'True':1.0, 'False':0.0}
toBinary = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

def labelData(data):
    return data.rdd.map(lambda row: LabeledPoint(row[-1], row[:-1]))

churn_data = sqlContext \
    .read \
    .load('maprfs:///tmp/churn-bigml-80.csv',format='com.databricks.spark.csv',header='true',inferSchema='true')

churn_data = churn_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toBinary(churn_data['Churn'])) \
    .withColumn('International plan', toBinary(churn_data['International plan'])) \
    .withColumn('Voice mail plan', toBinary(churn_data['Voice mail plan']))

training_data, testing_data = labelData(churn_data).randomSplit([0.8, 0.2])

decisiontree_model = DecisionTree.trainClassifier(
    training_data, 
    numClasses=2,
    maxDepth=2,
    categoricalFeaturesInfo={1:2, 2:2},
    impurity='gini',
    maxBins=32
)

print decisiontree_model.toDebugString()

It will finally print out something like:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/

Using Python version 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016 20:53:40)
SparkSession available as 'spark'.
...
DecisionTreeModel classifier of depth 2 with 7 nodes
  If (feature 12 <= 3.0)
   If (feature 4 <= 262.8)
    Predict: 0.0
   Else (feature 4 > 262.8)
    Predict: 1.0
  Else (feature 12 > 3.0)
   If (feature 4 <= 153.4)
    Predict: 1.0
   Else (feature 4 > 153.4)
    Predict: 0.0

Conclusion and wrapup
So, allthough it's a bit of a hassle, you can get Spark 2.1.0 up and running using a Windows client. What we achieved is a horizontally scalable environment for the data scientists to work with, without being tied to the vertical limitations of their own laptop: running your job on a 64-core, 48GB, 3-node cluster with SAS drives seems better to me! (Remember this is a development cluster!)

As a last note: you may have noticed that I am not using one of the most distinctive features of MapR; being able to just copy files from and to MapR-FS by using MapR's NFS server. That's not possible in this setup, as we are (and that was the purpose) not running the spark-shell or pyspark on a cluster node. Have fun, feel free to comment and ask questions!

Posted in Uncategorized.