Installing SAP Spark Controller 2.2.0 on MapR MEP 3.0.1

© Notice: I copied this post’s image from one of SAP’s Bob tutorials!
The Challenge
At one of my customers we need to install the SAP Spark Controller to be able to connect our MapR cluster to HANA. If you arrived at this post, you will know why this is a great feature for a lot of use cases. But it also led to a small challenge, which may be clear from the URLs below:

In short: the 2.2.0 Spark Controller needs Spark 1.6, but MapR MEP 3 provides Spark 2.1. Trust me, I have tried to get it running, but due to the absence of the Spark assembly in the 2.x versions and the absence of the Akka dependencies, I think it is nearly impossible unless SAP does a recompile. Luckily, in YARN a Spark job is 'just another job' as long as you provide the Spark assembly when submitting. Because of this, you can still set up the Spark Controller without having to hack too much or leaving the cluster messy.

A few words of warning and a prerequisite

  • Because I like to install things at a minimum, I sometimes cut corners or use a different approach than stated in the fine manuals. This may lead to situations that are unsupported by one or the other vendor. This goes in particular for the installation of the Spark Controller, which I did not install with the rpm command; instead I extracted the rpm and copied the files over. I did this to easily be able to switch between Spark Controller versions. In a production environment, nothing stops you from using the normal rpm installation approach.
  • With regards to MapR, I download one rpm from their repository to extract just the jar file you need, no more. This prevents a full Spark installation from hanging around on your node(s).
  • You will need a MapR 5.2 based cluster with MEP 3.0.1 installed. I will not cover the installation of that.

Actions to be performed as root
I upgraded my old MEP 1.1 to MEP 3.0.1 using yum -y update (after changing /etc/yum.repos.d/mapr_ecosystem.repo to point its base URL at http://package.mapr.com/releases/MEP/MEP-3.0.1/redhat), but it seemed that this upgrade left my YARN installation unusable. To fix it, I had to restore the permissions on the container-executor binary as follows:

chmod -R 6050 /opt/mapr/hadoop/hadoop-2.7.0/bin/container-executor 
chown root.mapr /opt/mapr/hadoop/hadoop-2.7.0/bin/container-executor 

From this point on it's a straightforward install from the zip file downloaded from the SAP download site. Most of the comments are in the code blocks.

# Add hanaes user and groups
# Note that it MUST be hanaes because it seems to be hardcoded in the jars
groupadd sapsys
useradd -g sapsys -d /home/hanaes -s /bin/bash hanaes

# Create some directories we need. These are the defaults when you use rpm to install the controller
# You are able to change all of these, but then you'll have to configure much more later on
mkdir -p /var/log/hanaes/ /var/run/hanaes/ /usr/sap/spark/

# Extract installation rpm from the 2.2.0 Spark Controller
unzip HANASPARKCTRL02_0-70002101.zip sap.hana.spark.controller-2.2.0-1.noarch.rpm
rpm2cpio sap.hana.spark.controller-2.2.0-1.noarch.rpm|cpio -idmv

# Now remove (be careful if you have an install present!) a possibly existing installation
rm -rf /usr/sap/spark/controller
# Move the extracted files to the target directory
mv usr/sap/spark/controller/ /usr/sap/spark/

# I remove this template because I do not want it to be interfering in any way with the installation
rm -f /usr/sap/spark/controller/conf/hanaes-template.xml

# Now download the 'old' mapr-spark rpm package and extract the assembly jar we need
wget http://archive.mapr.com/releases/MEP/MEP-1.1/redhat/mapr-spark-1.6.1.201707241448-1.noarch.rpm
mkdir -p ./opt/mapr/spark/spark-1.6.1/lib/
# The below command leaves us with the assembly in ./opt/mapr/spark/spark-1.6.1/lib/
rpm2cpio mapr-spark-1.6.1.201707241448-1.noarch.rpm| cpio -icv "*spark-assembly-1.6.1-mapr-1707-hadoop2.7.0-mapr-1602.jar"

# Remove possibly present old jars from Hive or Spark.
# We need fresh copies from the /opt/mapr/hive directory and the extracted rpm file
rm -f /usr/sap/spark/controller/lib/spark-assembly*jar /usr/sap/spark/controller/lib/datanucleus-*.jar /usr/sap/spark/controller/lib/bonecp-*.jar

# Copy needed files to lib dir of the Spark Controller installation
# Because the controller uses Hive as the metadata provider, these libs are needed to access the Hive Thrift Server 
# (as per Spark Controller installation instructions)
cp /opt/mapr/hive/hive-2.1/lib/datanucleus-*.jar /opt/mapr/hive/hive-2.1/lib/bonecp-*.jar ./opt/mapr/spark/spark-1.6.1/lib/spark-assembly-1.6.1-mapr-1707-hadoop2.7.0-mapr-1602.jar /usr/sap/spark/controller/lib/

# Set ownership of files needed for hanaes
chown -R hanaes.sapsys /var/log/hanaes/ /var/run/hanaes/ /usr/sap/spark

# Make directory on HDFS for hanaes user, mostly to be able to stage the application to YARN
hadoop fs -mkdir -p /user/hanaes
hadoop fs -chown hanaes:sapsys /user/hanaes

# Now allow the hanaes user to actually submit an app to YARN. Add the below:
vi /opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/core-site.xml
  <property>
    <name>hadoop.proxyuser.hanaes.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hanaes.groups</name>
    <value>*</value>
  </property>

# You might need to tweak values like yarn.nodemanager.resource.memory-mb, but I'll leave that to the reader.

# Restart node- and resourcemanager to reflect the change above
maprcli node services resourcemanager restart -nodes <your cluster nodes with resource managers>
maprcli node services nodemanager restart -nodes <your cluster nodes with node managers>

Actions to be performed as the hanaes user
Once the root part is done, the hanaes user is able to configure the controller and start it. The most important part is the configuration. Pay particular attention to the location of the Java home directory stored in the JAVA_HOME parameter; it will for sure be different on your install.
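
If you are not sure where the JDK lives on your node, a quick way to find it (a sketch, assuming a package-installed OpenJDK) is to resolve the java binary and strip the trailing /jre/bin/java or /bin/java from the result:

readlink -f $(which java)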

# Become hanaes
su - hanaes

# Configure Spark Controller
cd /usr/sap/spark/controller/conf/

# Contents of log4j.properties. This may be too chatty in production; you may want to change the rootCategory from INFO to WARN or ERROR.
# The INFO level on the spark.sql packages will log the SQL requested and executed, which may come in handy when looking for errors.

log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.com.sap=INFO
log4j.logger.org.apache.spark.sql.hana=INFO
log4j.logger.org.apache.spark.sql.hive.hana=INFO

# Contents of hana_hadoop-env.sh. These are the bare minimum lines you will need to get started on MapR

#!/bin/bash
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.141-1.b16.el7_3.x86_64/
export HANA_SPARK_ASSEMBLY_JAR=/usr/sap/spark/controller/lib/spark-assembly-1.6.1-mapr-1707-hadoop2.7.0-mapr-1602.jar
export HADOOP_CONF_DIR=/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/
export HADOOP_CLASSPATH=`hadoop classpath`
export HIVE_CONF_DIR=/opt/mapr/hive/hive-2.1/conf

# Contents of hanaes-site.xml. Tweak to your need, especially the executor instances and memory
# The below is for my one node testing VM

<?xml version="1.0"?>
<configuration>
  <property>
    <name>sap.hana.es.server.port</name>
    <value>7860</value>
  </property>
  <property>
    <name>spark.executor.memory</name>
    <value>1g</value>
  </property>
  <property>
    <name>spark.executor.instances</name>
    <value>1</value>
  </property>
  <property>
    <name>spark.executor.cores</name>
    <value>1</value>
  </property>
  <property>
    <name>sap.hana.enable.compression</name>
    <value>true</value>
  </property>
</configuration>

cd ../bin/

# If you are on a memory-restricted node, you might want to change HANA_ES_HEAPSIZE
# This got me baffled for a while when using this latest Spark Controller version
# The main but undescriptive error I got was something like 'network.Server did not start, check YARN logs'.
# But the driver did not start, so there was nothing in YARN ;)

vi hanaes   # add the line below as the first line after the shebang line
export HANA_ES_HEAPSIZE=2048

# I think you WILL need the larger amount of memory if you want to use the caching feature, but I'm not too sure about that.

# Now start the controller and cross your fingers
# I like to clear the log first when setting this up. In production situations this may not be handy.
echo > /var/log/hanaes/hana_controller.log && ./hanaes restart

# And monitor
tail -f /var/log/hanaes/hana_controller.log

The result
Well, that wasn’t too hard! Spark Controller is up and running and accessible from HANA Studio.

[hanaes@mapr ~]$ cat /var/log/hanaes/hana_controller.log 
17/08/17 12:18:00 INFO Server: Starting Spark Controller
17/08/17 12:18:02 INFO CommandRouter$$anon$1: Added JAR /usr/sap/spark/controller/lib/controller.common-2.1.1.jar at ...
17/08/17 12:18:02 INFO CommandRouter$$anon$1: Added JAR /usr/sap/spark/controller/lib/spark1_6/spark.shims_1.6.2-2.1.1.jar at ...
17/08/17 12:18:02 INFO CommandRouter$$anon$1: Added JAR /usr/sap/spark/controller/lib/controller.core-2.1.1.jar at ...
17/08/17 12:18:02 INFO ZooKeeper: Client environment:java.class.path=/usr/sap/spark/controller/lib/spark-assembly-1.6.1-mapr-1707-hadoop2.7.0-mapr-1602.jar: ...
17/08/17 12:18:03 INFO Client: Uploading resource file:/usr/sap/spark/controller/lib/spark-assembly-1.6.1-mapr-1707-hadoop2.7.0-mapr-1602.jar -> maprfs:/...
17/08/17 12:18:03 INFO Client: Uploading resource file:/usr/sap/spark/controller/lib/spark.extension-2.1.1.jar -> maprfs:/...
17/08/17 12:18:03 INFO Client: Uploading resource file:/usr/sap/spark/controller/lib/controller.common-2.1.1.jar -> maprfs:/...
17/08/17 12:18:10 INFO CommandRouterDefault: Running Spark Controller on Spark 1.6.1 with Application Id application_1502967355788_0022
17/08/17 12:18:10 INFO CommandRouterDefault: Connecting to Hive MetaStore!
17/08/17 12:18:11 INFO HanaHiveSQLContext: Initializing execution hive, version 1.2.1
17/08/17 12:18:17 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/08/17 12:18:18 INFO HanaHiveSQLContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/08/17 12:18:18 INFO metastore: Trying to connect to metastore with URI thrift://mapr.whizzkit.nl:9083
17/08/17 12:18:18 INFO metastore: Connected to metastore.
17/08/17 12:18:18 INFO CommandRouterDefault: Server started

[root@mapr ~]# yarn application -list
17/08/17 15:44:58 INFO client.MapRZKBasedRMFailoverProxyProvider: Updated RM address to mapr.whizzkit.nl/10.80.91.143:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
                Application-Id Application-Name                   Application-Type User   Queue       State   Final-State Progress Tracking-URL
application_1502967355788_0022 SAP HANA Spark Controller:sparksql SPARK            hanaes root.hanaes RUNNING UNDEFINED   10%      http://mapr.whizzkit.nl:4040

Have fun installing!

Run a pyspark Windows client in YARN client mode on MapR 5.2.1 / MEP 3.0

Introduction

Every now and then there's a challenge at hand. I recently came across one, luckily. Someone made the decision to hand out a Windows based laptop to a promising data scientist. All the data scientists I have met so far want to run their stuff on Linux or Mac, or at least something that gives them a native 'Unix' prompt. The laptop is hardware encrypted with a locked BIOS, so the chances of getting a dual boot running were slim. Having only 4GB of memory did not give us a feasible virtual machine option either. So, Windows it is. The funny thing was, I have always wanted to get a 'guest' laptop to run jobs on a remote cluster, without having to log in to the cluster itself.

Of course, there are a couple of prerequisites to get this up and running. For instance, the cluster must be 'open' to the laptop; the latter must be able to connect to a variety of ports. For the sake of this setup I assume that the cluster is not locked down from 'impersonating' the running Hadoop user. Your cluster may require a different setup, so your mileage may vary.

At the customer's site is a MapR 5.2.1 development cluster with MEP 3.0 that we are allowed to use to build models and transform large amounts of data. That MapR cluster will be our target to (eventually) run a pyspark session on. Please bear in mind that this is a locally installed cluster. It is much harder to achieve the below when running against (for instance) AWS, due to the 'reverse-NAT-like' network setup of that service provider. Believe me, I have tried different tunneling and SOCKS options, but all to no avail. If someone can enlighten me, please do.

So, to summarize, this is what you'll need:

  • An installed MapR 5.2.1 cluster with MEP 3.0 (Spark 2.1.0)
  • The shiny new Windows laptop that ‘was not to be’. I assume a 64-bit install.
  • Some patience if it does not run at first startup

Let’s go
As a first step, you are required to download a couple of files you'll need to set up your client:

  • The MapR 5.2.1 Windows client package at http://archive.mapr.com/releases/v5.2.1/. Installation of the MapR cluster itself is out of scope of this post.
  • An installed Python environment to run pyspark. Download it, for instance, at https://www.python.org/downloads/release/python-2713/. You'll know how to install this one, but make SURE the major version you install matches the version on the cluster. 2.7 on your Windows box with 2.6 on the cluster will fail with 'Python in worker has different version 2.6 than that in driver 2.7, PySpark cannot run with different minor versions'. 2.6 is highly deprecated but sadly still in use on older CentOS 6 versions, and you will have some trouble getting the pip modules mentioned below installed on 2.6.
  • The Spark 2.1.0 tarball without Hadoop located at (for instance) http://spark.apache.org/downloads.html
  • The latest Java 1.8 JDK from http://www.oracle.com/technetwork/java/javase/downloads/

To prevent Java serialVersionUID or NoClassDefFound errors, you'll have to copy a few files from the cluster; they are:

  • /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.7.0-mapr-1703.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-api-jdo-4.2.1.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-core-4.1.6.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-rdbms-4.1.7.jar
  • All spark-*-mapr-1703.jar files in /opt/mapr/spark/spark-2.1.0/jars (just to be safe, copy them all)

Note that since Spark 2.0 there is no longer an assembly jar that you can copy and use. You may be tempted to create a zip holding all the jars and use Spark's spark.yarn.archive parameter; I found that it does not work in a mixed (Windows/Linux) environment. To access the Hive metastore on the cluster, download /opt/mapr/hive/hive-1.2/conf/hive-site.xml as well.
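
How you get these files onto your Windows machine is up to you; as a sketch (assuming an scp client, for instance the one bundled with Git Bash, and a hypothetical cluster node called clusternode1), something like this will do:

scp mapr@clusternode1:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.7.0-mapr-1703.jar .
scp "mapr@clusternode1:/opt/mapr/spark/spark-2.1.0/jars/datanucleus-*.jar" .
scp "mapr@clusternode1:/opt/mapr/spark/spark-2.1.0/jars/spark-*-mapr-1703.jar" .
scp mapr@clusternode1:/opt/mapr/hive/hive-1.2/conf/hive-site.xml .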

Have those ready! Before continuing any further you'll have to set some environment variables to get things up and running. You may set them using the horrible Windows GUI method, but I prefer to create a command script that does just that. So create a directory c:\opt\mapr and a file called c:\opt\mapr\mapr-env.bat and paste the following contents in there.

@echo off
set JAVA_HOME=C:\Progra~1\Java\jdk1.8.0_121
set MAPR_HOME=C:\opt\mapr
set SPARK_HOME=%MAPR_HOME%\spark\spark-2.1.0
set HADOOP_HOME=%MAPR_HOME%\hadoop\hadoop-2.7.0
set HADOOP_CONF_DIR=%HADOOP_HOME%\etc\hadoop
set YARN_CONF_DIR=%HADOOP_CONF_DIR%
set PATH=%JAVA_HOME%\bin;c:\Python27\;%SPARK_HOME%\bin;%HADOOP_HOME%\bin;%MAPR_HOME%\server;%PATH%
set HADOOP_USER_NAME=mapr
cmd /C hadoop classpath > tmpFile
set /p SPARK_DIST_CLASSPATH= < tmpFile
del tmpFile
set YARN_APPLICATION_CLASSPATH=%SPARK_DIST_CLASSPATH:\=/%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:;=:%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:c:/=/%

Adjust the JDK path in JAVA_HOME if necessary, but make sure you use the 8.3 notation instead of the one with spaces (or install to a space-less location like c:\opt\jdk-1.8). I will not explain the above contents; they are needed to get both the Scala and the Python shells running. One remark on the YARN_APPLICATION_CLASSPATH variable, though: it is used on the server, not on your Windows machine like the other ones.

Start installation
Install MapR Hadoop Client

  • Unzip the contents of mapr-client-5.2.1.42646GA-1.amd64.zip to c:\opt\mapr
  • Move the copied hadoop-yarn-server-web-proxy-2.7.0-mapr-1703.jar to C:\opt\mapr\hadoop\hadoop-2.7.0\share\hadoop\yarn
  • Run the mapr-env.bat script

Now configure the MapR Hadoop client by invoking

c:\opt\mapr\server\configure.bat
  -N my.cluster.com -c
  -C CLDB-HOST:7222
  -HS HISTORYSERVER-HOST

For instance

c:\opt\mapr\server\configure.bat 
  -N mapr-521-230.whizzkit.nl -c
  -C mapr.whizzkit.nl:7222
  -HS mapr.whizzkit.nl

The cluster name (-N), CLDB-HOST (-C) and HISTORYSERVER-HOST (-HS) are specific to your cluster setup! Note that the Windows configuration does not allow you to enter Zookeeper quorum information (the -Z parameter). If all goes well, the script will produce no output.
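
A quick sanity check that the client can actually reach the cluster is to list the root of MapR-FS; if this returns a directory listing, the basic configuration is fine:

hadoop fs -ls /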

You'll have to edit two files before you are ready to submit your first Hadoop-based YARN job. First, you'll have to tell mapreduce that you will be submitting cross-platform so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\mapred-site.xml and add

<property>
  <name>mapreduce.app-submission.cross-platform</name>
  <value>true</value>
</property>

Secondly, you'll have to tell the job that you'll be spoofing another user, so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\core-site.xml and add

<property>
  <name>hadoop.spoofed.user.uid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.gid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.username</name>
  <value>mapr</value>
</property>

If your cluster has different uid, gid or username, edit to your liking. Note that you are not restricted to the use of the mapr user. If there is another named user present, configure that one. If there is a user on the cluster that matches your Windows login name, you don't have to edit core-site.xml. Please note that for a YARN job to successfully run, the user needs to be present on ALL nodes of the cluster with the same uid and gid. The Resource Manager will not accept your job if there are mismatches or you are trying to use an unknown (from Linux perspective) user.
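
If you want to verify up front that the user you are spoofing exists with the same uid and gid everywhere, a small loop over the cluster nodes (run from the cluster or any shell that can ssh to the nodes; host names are just examples) does the trick:

for host in node1 node2 node3; do ssh $host "hostname; id mapr"; done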

After this installation and configuration you should be able to submit the teragen job bundled with the MapR Hadoop client:

hadoop jar \
  %HADOOP_HOME%\share\hadoop\mapreduce\hadoop-mapreduce-examples-2.7.0-mapr-1703.jar \
  teragen 10000 /tmp/teragen

Delete the directory first if it already exists. You should be able to use your just installed Windows client for that 😉

hadoop fs -ls /tmp
hadoop fs -rm -r -skipTrash /tmp/teragen

Do not continue until you have successfully run the Teragen Hadoop job!

Install Apache Spark

We will use the Spark distribution without Hadoop from the Apache download site, but replace some of the jars with the ones from the cluster. The main reason is that the MapR distribution has its own implementation of HDFS, called MapR-FS, and you'll need the jars provided by MapR to access that file system. The same goes for the Hadoop client installed above; that's the reason you need the Spark-without-Hadoop tarball. So, do the following:

  • Create a directory called c:\opt\mapr\spark\
  • Uncompress spark-2.1.0-bin-without-hadoop.tgz to c:\opt\mapr\spark
  • Rename the directory spark-2.1.0-bin-without-hadoop to spark-2.1.0 so the Spark install will be in c:\opt\mapr\spark\spark-2.1.0
  • From the jars folder in spark-2.1.0 remove all spark-*_2.11-2.1.0.jar files
  • Move or copy the previously fetched jar spark- and datanucleus jar files to the spark-2.1.0/jars folder
  • If you wish to access the Hive metastore on the cluster from Spark, copy the previously downloaded hive-site.xml file to c:\opt\mapr\spark\spark-2.1.0\conf.

The first Spark test will be running the spark-shell in YARN client mode. You should be greeted by the familiar ASCII-art:

spark-shell --master yarn --deploy-mode client
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

You may opt to perform an extra optimization when starting Spark jobs. You may have noticed the 'Neither spark.yarn.jars nor spark.yarn.archive is set' message when starting the shell. This causes your Spark driver to copy all jars to the cluster on every start. To prevent this, you may do the following:

# Create a directory to hold the Spark jars
hadoop fs -mkdir -p /user/mapr/apps/spark/spark-2.1.0
# Copy all jars from the Windows machine to MapR-FS
hadoop fs -copyFromLocal %SPARK_HOME%\jars\*jar /user/mapr/apps/spark/spark-2.1.0
# Check if it succeeded, all jars (around 105) should be listed
hadoop fs -ls /user/mapr/apps/spark/spark-2.1.0

Now you are able to start the spark-shell in a slightly different, but quicker, way. Note that a small zip file called __spark_conf__.zip will still be created and uploaded; it contains a snapshot of your Hadoop and Spark config, as well as any jars the spark-shell sees fit to upload. They are located in /user/mapr/.sparkStaging/application__.

spark-shell --master yarn --deploy-mode client
  --conf spark.yarn.jars=maprfs:///user/mapr/apps/spark/spark-2.1.0/*
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

scala> val count = sc.parallelize(1 to 100).filter { _ =>
     |   val x = math.random
     |   val y = math.random
     |   x*x + y*y < 1
     | }.count()
count: Long = 76

scala> println(s"Pi is roughly ${4.0 * count / 100}")
Pi is roughly 3.04
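
As a side note: if you are curious what actually ends up in the staging area for a run like the one above, you can simply list it from the Windows client:

hadoop fs -ls /user/mapr/.sparkStaging/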

Run pyspark
Actions to perform on the cluster only
To run the pyspark example below, you will have to make sure numpy is installed on the cluster. You'll get an error soon enough if it's missing, so have your admin install it for you using:

sudo yum -y install numpy
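
If you (or your admin) want to check up front which nodes already have numpy present, a small loop like this (run from any shell that can ssh to the nodes; host names are examples) will tell you:

for host in node1 node2 node3; do ssh $host "python -c 'import numpy; print(numpy.__version__)'"; done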

Actions to perform on your Windows client
On your Windows client you will need a couple of jars to get the databricks csv package working. I have found out that passing --packages does not work with the approach in this post, but using the --jars option does. The jars you need are spark-csv_2.10-1.3.0.jar, univocity-parsers-1.5.1.jar and commons-csv-1.1.jar (the same ones passed via --jars below).

Download them to c:\opt\libext (or something like that). Next to that, you'll have to install a couple of Python modules as well:

python -m pip install -U pip setuptools
python -m pip install matplotlib
python -m pip install pandas
python -m pip install numpy

I have copied and modified a small part (the actual training of a model, not the evaluation) of the pyspark ML blog post example from MapR to reflect the changes needed for Spark 2.1.0. But first, download a file you need and put it on MapR-FS:

churn-bigml-80.csv

Put it on MapR-FS in maprfs:///tmp/

hadoop fs -copyFromLocal -f churn-bigml-80.csv /tmp/

You are now ready to start a Python Spark shell using the command below. You may notice the similarities between the conf parameters used for spark-shell and pyspark.

pyspark --master yarn --deploy-mode client
  --conf spark.yarn.jars=maprfs:///user/mapr/apps/spark/spark-2.1.0/*
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%
  --jars C:\opt\libext\spark-csv_2.10-1.3.0.jar,C:\opt\libext\univocity-parsers-1.5.1.jar,C:\opt\libext\commons-csv-1.1.jar

You may copy and paste the code below to check if you have succeeded in following along.

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

binary_map = {'Yes':1.0, 'No':0.0, True:1.0, False:0.0, 'True':1.0, 'False':0.0}
toBinary = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

def labelData(data):
    return data.rdd.map(lambda row: LabeledPoint(row[-1], row[:-1]))

churn_data = sqlContext \
    .read \
    .load('maprfs:///tmp/churn-bigml-80.csv',format='com.databricks.spark.csv',header='true',inferSchema='true')

churn_data = churn_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toBinary(churn_data['Churn'])) \
    .withColumn('International plan', toBinary(churn_data['International plan'])) \
    .withColumn('Voice mail plan', toBinary(churn_data['Voice mail plan']))

training_data, testing_data = labelData(churn_data).randomSplit([0.8, 0.2])

decisiontree_model = DecisionTree.trainClassifier(
    training_data, 
    numClasses=2,
    maxDepth=2,
    categoricalFeaturesInfo={1:2, 2:2},
    impurity='gini',
    maxBins=32
)

print decisiontree_model.toDebugString()

It will finally print out something like:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/

Using Python version 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016 20:53:40)
SparkSession available as 'spark'.
...
DecisionTreeModel classifier of depth 2 with 7 nodes
  If (feature 12 <= 3.0)
   If (feature 4 <= 262.8)
    Predict: 0.0
   Else (feature 4 > 262.8)
    Predict: 1.0
  Else (feature 12 > 3.0)
   If (feature 4 <= 153.4)
    Predict: 1.0
   Else (feature 4 > 153.4)
    Predict: 0.0

Conclusion and wrapup
So, although it's a bit of a hassle, you can get Spark 2.1.0 up and running using a Windows client. What we achieved is a horizontally scalable environment for the data scientists to work with, without being tied to the vertical limitations of their own laptop: running your job on a 64-core, 48GB, 3-node cluster with SAS drives seems better to me! (Remember, this is a development cluster!)

As a last note: you may have noticed that I am not using one of the most distinctive features of MapR, namely being able to just copy files to and from MapR-FS using MapR's NFS server. That's not possible in this setup, as we are (and that was the purpose) not running the spark-shell or pyspark on a cluster node. Have fun, feel free to comment and ask questions!

Running Spark in yarn-client mode on a MapR cluster with your Windows based laptop

Introduction

Every now and then there's a challenge at hand. I recently came across one, luckily. Someone made the decision to hand out a Windows based laptop to a promising data scientist. All the data scientists I have met so far want to run their stuff on Linux or Mac, or at least something that gives them a native 'Unix' prompt. The laptop is hardware encrypted with a locked BIOS, so the chances of getting a dual boot running were slim. Having only 4GB of memory did not give us a feasible virtual machine option either. So, Windows it is. The funny thing was, I have always wanted to get a 'guest' laptop to run jobs on a remote cluster, without having to log in to the cluster itself.

Of course, there are a couple of prerequisites to get this up and running. For instance, the cluster must be 'open' to the laptop; the latter must be able to connect to a variety of ports. For the sake of this setup I assume that the cluster is not locked down from 'impersonating' the running Hadoop user. Your cluster may require a different setup, so your mileage may vary.

At the customer's site is a MapR 5.2 development cluster that we are allowed to use to build models and transform large amounts of data. That MapR cluster will be our target to (eventually) run a pyspark session on. Please bear in mind that this is a locally installed cluster. It is much harder to achieve the below when running against (for instance) AWS, due to the 'reverse-NAT-like' network setup of that service provider. Believe me, I have tried different tunneling and SOCKS options, but all to no avail. If someone can enlighten me, please do.

For various reasons we (still) use Spark 1.6.1. I have not tested the below with Spark 2, but I already know it will be different due to the absence of the Spark assembly jar in the later versions.

So, to summarize, this is what you'll need:

  • An installed MapR 5.2 cluster with MEP 1.1 (Spark 1.6.1)
  • The shiny new Windows laptop that ‘was not to be’. I assume a 64-bit install.
  • Some patience if it does not run at first startup

Let’s go
As a first step, you are required to download a couple of files you'll need to set up your client:

  • The MapR 5.2 Windows client package at mapr-client-5.2.0.39122GA-1.amd64.zip. Installation of the MapR cluster is out of scope of this post.
  • An installed Python environment to run pyspark. Download it, for instance, as python-2.7.13.amd64.msi or python-2.6.6.amd64.msi. You'll know how to install this one, but make SURE the version you install matches the version on the cluster. 2.7 on your Windows box with 2.6 on the cluster will fail with 'Python in worker has different version 2.6 than that in driver 2.7, PySpark cannot run with different minor versions'. 2.6 is highly deprecated but sadly still in use on older CentOS 6 versions, and I believe you will have some trouble getting the pip modules installed on it.
  • The Spark 1.6.1 tarball without Hadoop, for instance spark-1.6.1-bin-without-hadoop.tgz
  • The latest Java 1.8 JDK from jdk8-downloads-2133151.html

To prevent Java serialVersionUID or NoClassDefFound errors, you'll have to copy a few files from the cluster; they are:

  • /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.7.0-mapr-1607.jar
  • /opt/mapr/spark/spark-1.6.1/lib/datanucleus-api-jdo-4.2.1.jar
  • /opt/mapr/spark/spark-1.6.1/lib/datanucleus-core-4.1.6.jar
  • /opt/mapr/spark/spark-1.6.1/lib/datanucleus-rdbms-4.1.7.jar
  • /opt/mapr/spark/spark-1.6.1/lib/javax.servlet-api-3.1.0.jar
  • /opt/mapr/spark/spark-1.6.1/lib/spark-1.6.1-mapr-1611-yarn-shuffle.jar
  • /opt/mapr/spark/spark-1.6.1/lib/spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar
  • /opt/mapr/spark/spark-1.6.1/lib/spark-examples-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar

To access the Hive metastore on the cluster, download /opt/mapr/hive/hive-1.2/conf/hive-site.xml as well.

Have those ready! Before continuing any further you'll have to set some environment variables to get things up and running. You may set them using the horrible Windows GUI method, but I prefer to create a command script that does just that. So create a directory c:\opt\mapr and a file called c:\opt\mapr\mapr-env.bat and paste the following contents in there.

@echo off
set JAVA_HOME=C:\Progra~1\Java\jdk1.8.0_121
set SPARK_HOME=C:\opt\mapr\spark\spark-1.6.1
set MAPR_HOME=C:\opt\mapr
set HADOOP_HOME=%MAPR_HOME%\hadoop\hadoop-2.7.0
set HADOOP_CONF_DIR=%HADOOP_HOME%\etc\hadoop
set YARN_CONF_DIR=%HADOOP_CONF_DIR%
set PATH=%JAVA_HOME%\bin;c:\Python27\;%SPARK_HOME%\bin;%HADOOP_HOME%\bin;%MAPR_HOME%\server;%PATH%
set HADOOP_USER_NAME=mapr
cmd /C hadoop classpath > tmpFile
set /p SPARK_DIST_CLASSPATH= < tmpFile
del tmpFile
set YARN_APPLICATION_CLASSPATH=%SPARK_DIST_CLASSPATH:\=/%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:;=:%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:c:/=/%

Adjust the JDK path in JAVA_HOME if necessary, but make sure you use the 8.3 notation instead of the one with spaces (or install to a space-less location like c:\opt\jdk-1.8). I will not explain the above contents; they are needed to get both the Scala and the Python shells running.

Start installation

MapR Hadoop Client

  • Unzip the contents of mapr-client-5.2.0.39122GA-1.amd64.zip to c:\opt\mapr
  • Move the copied hadoop-yarn-server-web-proxy-2.7.0-mapr-1607.jar to C:\opt\mapr\hadoop\hadoop-2.7.0\share\hadoop\yarn
  • Run the mapr-env.bat script

Now configure the MapR Hadoop client by invoking

c:\opt\mapr\server\configure.bat -N my.cluster.com -c -C CLDB-HOST:7222 -HS HISTORYSERVER-HOST

The cluster name (-N), CLDB-HOST (-C) and HISTORYSERVER-HOST (-HS) are specific to your cluster setup! Note that the Windows configuration does not allow you to enter Zookeeper quorum information (the -Z parameter). If all goes well, the script will produce no output.

You'll have to edit two files before you are ready to submit your first Hadoop-based YARN job. First, you'll have to tell mapreduce that you will be submitting cross-platform so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\mapred-site.xml and add

<property>
  <name>mapreduce.app-submission.cross-platform</name>
  <value>true</value>
</property>

Secondly, you'll have to tell the job that you'll be spoofing another user, so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\core-site.xml and add

<property>
  <name>hadoop.spoofed.user.uid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.gid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.username</name>
  <value>mapr</value>
</property>

If your cluster has different uid, gid or username, edit to your liking. Note that you are not restricted to the use of the mapr user. If there is another named user present, configure that one. If there is a user on the cluster that matches your Windows login name, you don't have to edit core-site.xml. Please note that for a YARN job to successfully run, the user needs to be present on ALL nodes of the cluster with the same uid and gid. The Resource Manager will not accept your job if there are mismatches or you are trying to use an unknown (from a Linux perspective) user.

After this installation and configuration you should be able to submit the teragen job bundled with the MapR Hadoop client:

hadoop jar %HADOOP_HOME%\share\hadoop\mapreduce\hadoop-mapreduce-examples-2.7.0-mapr-1607.jar teragen 10000 /tmp/teragen

Delete the directory first if it already exists. You should be able to use your just installed Windows client for that 😉

hadoop fs -ls /tmp
hadoop fs -rm -r /tmp/teragen

Do not continue until you have successfully run the Teragen Hadoop job!

Install Apache Spark

We will use the Spark distribution without Hadoop from the Apache download site, but replace some of the jars with the ones from the cluster. The main reason is that the MapR distribution has its own implementation of HDFS, called MapR-FS, and you'll need the classes to access that file system. The same goes for the Hadoop client installed above; that's the reason you need the Spark-without-Hadoop tarball. So, do the following:

  • Create a directory called c:\opt\mapr\spark\
  • Uncompress spark-1.6.1-bin-without-hadoop.tgz to c:\opt\mapr\spark
  • Rename the directory spark-1.6.1-bin-without-hadoop to spark-1.6.1 so the Spark install will be in c:\opt\mapr\spark\spark-1.6.1
  • From the lib folder in spark-1.6.1 remove the files spark-assembly-1.6.1-hadoop2.2.0.jar, spark-1.6.1-yarn-shuffle.jar and spark-examples-1.6.1-hadoop2.2.0.jar
  • Move or copy the previously fetched jar files to the spark-1.6.1/lib folder : spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar, spark-1.6.1-mapr-1611-yarn-shuffle.jar, datanucleus-api-jdo-4.2.1.jar, datanucleus-core-4.1.6.jar and datanucleus-rdbms-4.1.7.jar
  • Optionally, but preferred, also copy javax.servlet-api-3.1.0.jar and spark-examples-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar to lib

To prevent the Spark assembly jar from being copied to the cluster on every submit, you may do the following:

hadoop fs -mkdir -p /user/mapr/apps/spark
hadoop fs -copyFromLocal %SPARK_HOME%\lib\spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar /user/mapr/apps/spark
hadoop fs -ls /user/mapr/apps/spark

On the cluster only
To run the pyspark example below, you will have to make sure numpy is installed on the cluster. You'll get an error soon enough if it's missing, so have your admin install it for you using:

sudo yum -y install numpy

On your Windows client you will need a couple of jars to get the databricks csv package working. I have found out that passing --packages does not work with the approach in this post, but using the --jars option does. The jars you need are spark-csv_2.10-1.3.0.jar, univocity-parsers-1.5.1.jar and commons-csv-1.1.jar (the same ones passed via --jars below).

Download them to c:\opt\libext (or something like that). Next to that, you'll have to install a couple of Python modules as well:

python -m pip install -U pip setuptools
python -m pip install matplotlib
python -m pip install pandas
python -m pip install numpy

If you wish to access the Hive metastore from Spark, copy the hive-site.xml file to c:\opt\mapr\spark\spark-1.6.1\conf. Now you should be ready to start a Scala Spark shell:

spark-shell --master yarn-client --conf spark.yarn.jar=maprfs:///user/mapr/apps/spark/spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

To start a Python Spark shell use the following command:

pyspark --master yarn-client --conf spark.yarn.jar=maprfs:///user/mapr/apps/spark/spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

Now, to run the pyspark ML example from MapR, first download the two files it needs: churn-bigml-80.csv and churn-bigml-20.csv.

Put them on MapR-FS in maprfs:///tmp/

hadoop fs -copyFromLocal churn-bigml-80.csv /tmp/
hadoop fs -copyFromLocal churn-bigml-20.csv /tmp/

Now run pyspark and copy and paste the code below. It should run nicely on the cluster!

Start pyspark

pyspark --master yarn-client --conf spark.yarn.jar=maprfs:///user/mapr/apps/spark/spark-assembly-1.6.1-mapr-1611-hadoop2.7.0-mapr-1602.jar --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH% --jars C:\opt\libext\spark-csv_2.10-1.3.0.jar,C:\opt\libext\univocity-parsers-1.5.1.jar,C:\opt\libext\commons-csv-1.1.jar

Code

CV_data = sqlContext.read.load('maprfs:///tmp/churn-bigml-80.csv',format='com.databricks.spark.csv',header='true',inferSchema='true')
final_test_data = sqlContext.read.load('maprfs:///tmp/churn-bigml-20.csv', format='com.databricks.spark.csv', header='true', inferSchema='true')						  

CV_data.cache()
final_test_data.cache()

CV_data.printSchema()
final_test_data.printSchema()

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction

binary_map = {'Yes':1.0, 'No':0.0, 'True':1.0, 'False':0.0}
toNum = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

CV_data = CV_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toNum(CV_data['Churn'])) \
    .withColumn('International plan', toNum(CV_data['International plan'])) \
    .withColumn('Voice mail plan', toNum(CV_data['Voice mail plan'])).cache()

final_test_data = final_test_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toNum(final_test_data['Churn'])) \
    .withColumn('International plan', toNum(final_test_data['International plan'])) \
    .withColumn('Voice mail plan', toNum(final_test_data['Voice mail plan'])).cache()

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

def labelData(data):
    # label: row[end], features: row[0:end-1]
    return data.map(lambda row: LabeledPoint(row[-1], row[:-1]))

training_data,testing_data = labelData(CV_data).randomSplit([0.8, 0.2])

model = DecisionTree.trainClassifier(training_data, numClasses=2, maxDepth=2,
                                     categoricalFeaturesInfo={1:2, 2:2},
                                     impurity='gini', maxBins=32)

print model.toDebugString()

Conclusion and wrapup
So, although it's a bit of a hassle, you can get things up and running using a Windows client. The one thing I could not get working was the driver's web UI on port 4040. It seems to work as long as the Spark contexts are not loaded yet; after that, the web UI fails with a NullPointerException. Luckily, you can get to a lot of info via the application master on the cluster (port 8088). Have fun!

Running your R analysis on a Spark cluster

Challenge

For one of our clients we are in the process of designing a versatile data platform that can be used, among other things, to run R analyses on. In this post I'll summarise the actions I took to get R running using RStudio 0.99.473, R 3.2.2 and Spark 1.4.0, hereby leveraging the potential of a Spark cluster.

Prerequisites

For the R analysis I used my fairly recent (2015) MacBook Pro with 16GB of memory and Yosemite installed. Although not tested, to the best of my knowledge you should be able to get it up and running on a recent Linux distribution and on Windows as well. You will need a copy of the Spark binaries on your machine. I have it installed (unzipped) in /Users/rutger/Development/spark-1.4.0-bin-hadoop2.6. You can download a copy (use a pre-built version) from

http://spark.apache.org/downloads.html

I’m aware that at the time of writing Spark 1.4.1 has been released but I used the 1.4.0 release to match our current dev-cluster’s version.

Install R

Download and install the R distribution for Mac OS X from https://cran.r-project.org/bin/macosx/ and install using ‘the’ regular method.

Install RStudio

Download and install RStudio for your OS from https://www.rstudio.com/products/rstudio/download/

Working Spark cluster

Obviously, to actually run SparkR jobs, you will need a Spark cluster. In my case, I used an Intel NUC-based 3-node 'take it with you' cluster provisioned with Apache Hadoop 2.7.0 and Spark 1.4.0. I have one Spark master called nucluster3 and three slaves called nucluster[1-3] (the master is also a slave).

I will not go into detail in this post on installing a development Spark cluster, apart from stating that it should not give you any headaches getting it up and running as long as you only edit SPARK_HOME/conf/spark-env.sh (add the JAVA_HOME variable) and SPARK_HOME/conf/slaves (list all slave hostnames) and start the cluster on the master node (nucluster3 in our case) using start-master.sh and start-slaves.sh. You should then be greeted on http://nucluster3:8080 by:

[Screenshot: the Spark Master web UI at http://nucluster3:8080]
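
For reference, a minimal sketch of those configuration steps on the master (the JAVA_HOME path is just an example for your environment):

# on nucluster3, inside SPARK_HOME
echo 'export JAVA_HOME=/usr/lib/jvm/java-8-oracle' >> conf/spark-env.sh
printf 'nucluster1\nnucluster2\nnucluster3\n' > conf/slaves
sbin/start-master.sh
sbin/start-slaves.sh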

Analyze!

All’s set and done from the software perspective, so let’s head over to the setup and the actual running of R statements.

Setup

We need a file to work on. For this post I used a file with zipcodes called postcode.csv. You may upload it to your HDFS cluster with HUE, but I used the hardcore way by first copying it to one of the nodes and, from there, using the hadoop fs command to put it on HDFS:

  • scp postcode.csv root@nucluster1:
  • hadoop fs -copyFromLocal postcode.csv /user/root/
  • head -2 postcode.csv

"id","postcode","postcode_id","pnum","pchar","minnumber","maxnumber","numbertype","street","city","city_id","municipality","municipality_id","province","province_code","lat","lon","rd_x","rd_y","location_detail","changed_date"
395614,"7940XX",79408888,7940,"XX",4,12,"mixed","Troelstraplein","Meppel",1082,"Meppel",119,"Drenthe","DR","52.7047653217626","6.1977201775604","209781.52077777777777777778","524458.25733333333333333333","postcode","2014-04-10 13:20:28"

I did not bother to strip or use the header in this case; it's just there. So, you're now ready to start your analysis! Fire up RStudio and you will be greeted by the familiar R prompt, accompanied by some warnings you may safely ignore for now.

[Screenshot: the RStudio console showing the R prompt and startup warnings]

Now a few things have to be set up. R needs to know where it can find the SparkR library. In my case I use the following command:

.libPaths(c(.libPaths(), '/Users/rutger/Development/spark-1.4.0-bin-hadoop2.6/R/lib'))

No response will be given. Your R analysis will actually be an application running on the Spark cluster, submitted by the sparkr-shell, so R(Studio) should know where it can find this command. Because a CSV file is to be read, the spark-csv package is also loaded:

Sys.setenv(PATH = paste(Sys.getenv(c('PATH')), '/Users/rutger/Development/spark-1.4.0-bin-hadoop2.6/bin', sep=':'))

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.11:1.1.0" "sparkr-shell"')

Again, these commands should not give you any response unless you make a mistake. You are now ready to load the SparkR library:

library(SparkR)

This will give you some feedback, something like

Attaching package: ‘SparkR’

Followed by some remarks about masked packages. They are harmless in our case:

[Screenshot: output of library(SparkR) with the masked-objects warnings]

The first thing we’ll have to do is create two contexts; the Spark context and the SparkRSQL context:

sc <- sparkR.init(master = "spark://nucluster3:7077", appName = "SparkR")

sqlContext <- sparkRSQL.init(sc)

The first command produces a fair amount of output but should complete without warnings; the second one should finish silently. Remember the postcodes I put on HDFS? We are now ready to load them into a dataframe. nucluster1 is the cluster's namenode, by the way.

postcodes <- read.df(sqlContext, "hdfs://nucluster1:9000/user/root/postcode.csv", source = "com.databricks.spark.csv")

This command should finish successfully again with a fair amount of output. You are now ready to do your R magic, but in the examples below I will stick to very simple ones.

Plain R commands

head(postcodes)

[Screenshot: output of head(postcodes)]

count(postcodes)

[Screenshot: output of count(postcodes)]

A slightly more difficult example is to count by one of the fields. In this case only the first few lines are returned

head(summarize(groupBy(postcodes, postcodes$C3), count = n(postcodes$C3)))

[Screenshot: output of the groupBy/summarize expression]

Please note that without the 'head' command, nothing will actually be executed; only the return type will be displayed:

DataFrame[C3:string, count:bigint] 

SQL commands

One of the powerful features of this setup is the ability to use SQL in your commands. For this to work you’ll first have to create a temporary table definition.

registerTempTable(postcodes, "postcodes")

amsterdam <- sql(sqlContext, "SELECT C1,C8 FROM postcodes WHERE C9 = 'Amsterdam'")
head(amsterdam)

It may sound boring, but the first command will stay silent and the second one will return the query result (all Amsterdam zipcodes):

[Screenshot: the query result listing the Amsterdam zipcodes]

Of course, you are able to write much more CPU- and disk-intensive queries. This totally bogus one puts the cluster to work for a while:

count(sql(sqlContext, "SELECT p1.C1,p1.C8 FROM postcodes p1, postcodes p2 where p1.C1<p2.C1"))

Note that all activity is done on the cluster, not your own machine! Your machine is the job driver and only serves as such. But you can view a lot of information about your job: take a look at http://localhost:4040/jobs/ and find your own running one.

[Screenshot: the Spark application UI at http://localhost:4040/jobs/]

In the screenshot above you will see the 'sort-of-cartesian-product' I created hogging the system with a huge input set. Feel free to look around the jobs, stages (note the beautiful DAG visualization Spark provides) and storage information. You may also check out the Spark Master and note your running application named SparkR at http://nucluster3:8080/

[Screenshot: the Spark Master UI showing the running SparkR application]

I'm done, so I quit RStudio using the well-known q command. You may choose whether to save your workspace or not!

q()

Conclusions and final remarks

Analysing your data using familiar R while at the same time leveraging the power of a Spark cluster is a breeze once you get the hang of it. I mainly focussed on ‘getting it up and running‘ and not on ‘beautiful R apps‘. Go and knock yourself out with libraries like Shiny (http://shiny.rstudio.com/)!

Remarks

Of course you always find out after writing a blog post that you have missed something trivial. In case of loading the csv, spark-csv will give you column names for free if you load using:

postcodes <- read.df(sqlContext, "hdfs://nucluster1:9000/user/root/postcode.csv", source = "com.databricks.spark.csv", header = "true")

Apart from that, some minor annoyances popped up:

  • I noticed on the nodes of the cluster that only one core was used instead of the four that are in there. I must have overlooked something during configuration and will reply or update when I have found the solution.
  • Apart from that, I would really like Spark to be less chatty (get rid of the INFO messages) and will try to create a log4j.properties file for that.

Executing existing R scripts from Spark

Introduction

In one of my previous posts I illustrated how to perform R analysis using a Spark cluster. This is all safe and sound if the developer is starting from scratch and/or no existing, proven R scripts have already been developed. But most (data) scientists already have a handful of scripts that they use during their day-to-day work and need some extra power around them. Or, even better, the developed models are to be used during the actual realtime processing of the data, for instance to support a recommendation engine or some other complex analysis.

In this post I'll illustrate a hybrid solution leveraging the power of both Spark (by which I mean, in this post, Spark/Scala) and R, this time using a powerful feature called 'piping'. RDDs have the ability to pipe themselves out of the Spark context using regular standard-in (stdin) and standard-out (stdout). This behavior closely resembles the ability of map/reduce to use Python, Perl, Bash or other scripts to do the mapping or reducing.

Once you get to know how to actually do it, it’s surprisingly simple!

Setup

A proper R environment is required, for which the setup can be found in one of my previous posts. Note that to follow along with this particular post you will not need RStudio, but it will come in handy if you would like to extend the scripts provided. You will, however, need a couple of extra packages (text mining, NLP and the Snowball stemmer). Start R and issue the following commands:

install.packages('tm', repos='http://cran.xl-mirror.nl')
install.packages('NLP', repos='http://cran.xl-mirror.nl')
install.packages('SnowballC', repos='http://cran.xl-mirror.nl')

You may now quit the R session by invoking

q()

See the screenshot below for full details of the output.

[Screenshot: full output of the install.packages commands]

Let’s go!

In the example below, the responsibilities of R and Spark are clearly separated:

  • R is used only to sanitize the incoming data
  • Spark performs the actual wordcount

The picture below represents the dataflow. Once the Spark context has started up, it starts reading a file called 'alice.txt'. Every line is piped out on stdout to an R wrapper script containing only the logic to read the lines, pass them to a function called 'sanitize' (which is loaded from the corresponding R file) and return them, via stdout of the R runtime, to the Spark context.

Using a wrapper keeps the code in sanitize.R reusable and independent of being called just from Spark. In this case, a simple 'source' command is executed to load the function in sanitize.R; in production environments it may be wise to create reusable and easily distributable packages contained in a repository.

[Diagram: the dataflow between the Spark context, the R wrapper script and sanitize.R]

The Scala context picks up the sanitized lines and uses map and reduce logic to calculate the appearances of the words. These will probably differ from the original input because, for instance, stemming is done (note the appearance of the word 'alic' instead of the original input 'Alice'). Finally, the result is written to stdout.

Let's start with the Scala code. Only the relevant code is shown for brevity. Also, the code below is a functional one-liner without any intermediate variables, which makes it slightly less readable for a programmer trying to 'get' what's happening.

File : ExternalR.scala

[Screenshot: the contents of ExternalR.scala]

The comments in the code describe what's taking place. After the SparkContext has been created, a text file called 'alice.txt' is read using the 'textFile' method of the context and its contents are piped to stdout using the RDD's 'pipe' function. In this particular case, the receiving end is an R script, but it may be any script or application that is able to read data from stdin and write to stdout.

The script or application that is called must be executable. On a Linux box that means its permissions are at least 500, so that it is executable and readable by the user that is executing the Scala code. If it's not, an error will be raised:

java.io.IOException: Cannot run program “./wrapper.R”: error=13, Permission denied
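
Making the wrapper executable is a one-liner; the sketch below uses 755, which satisfies the 'at least 500' requirement mentioned above:

chmod 755 wrapper.R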

After the R script returns its data, the 'classic' wordcount is done. A few tricks have been added, for instance sorting the set on count (which implies doing a dual swap, because there is no sortByValue at the moment) and filtering out empty words. Finally, the top 100 words are taken and output to the console.

Note that the above construction creates a scalable solution from both the Spark and the R perspective. As long as the R script does not need ALL the data from the input file, execution of the R script will scale linearly (within the boundaries of your cluster, that is). The Spark context will collect all data together and reduce it to its final state.

The R code is made up of two scripts, explained below.

File : wrapper.R

[Screenshot: the contents of wrapper.R]

Again, the code above is pretty self-explanatory. First, a check is done whether the requested function (sanitize) exists and, if not, the script containing it (sanitize.R) is included. Stdin is opened, followed by reading it one line at a time (note the 'n=1' parameter to readLines). The line is passed to the sanitize function and its output is written to stdout. Finally, stdin is closed. No return value is needed. The sha-bang (#!) at the head of the R script tells the system that this file is a set of commands to be fed to the indicated command interpreter, in this case R. Without this sha-bang line, execution will most likely fail.

As said, the wrapper is called using two parameters, the library file to be loaded and the function to be called inside that library file, hereby creating a very versatile wrapper (not only for Spark) that can read from stdin, perform some logic and write to stdout. A small performance penalty may be incurred by the 'eval' construct; that has not been validated. Some example output from calling the wrapper standalone follows:

[Screenshot: example output of calling the wrapper standalone]
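
Such a standalone call could look something like this (a hypothetical example; the two arguments are the library file and the function name, as described above):

echo "Alice was beginning to get very tired of sitting by her sister" | ./wrapper.R sanitize.R sanitize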

The wrapper may be extended to include multiple libraries and function flows, but that exercise is left to the reader. The actual 'business logic' is done in sanitize.R. The needed libraries are loaded and the input line is sanitized according to our needs. The beauty of this is that the function may be part of a much bigger R library which has no clue that it will be called from a Spark context one day.

File : sanitize.R

[Screenshot: the contents of sanitize.R]

Successfully running the Spark/Scala job will result in something like the following screen, which can be found at http://localhost:4040 (or at the location of your Spark driver host):

[Screenshot: the Spark UI showing the completed stages of the job]

Summary

As said, leveraging the power of R from Spark is a breeze. It took me a little while to get it up and running, but the result is a scalable, generic solution that enables data scientists to have their R code used from a Spark context.

How to connect to your Kerberos secured cluster using FoxyProxy, SSH and a Dynamic Proxy

Introduction

So here you find yourself with a shiny new smartcard that enables you to log in to the client's network. The IT guys did a great job setting up your corporate laptop, you happen to be a local admin on it, and you are dying to start the installation of your preferred Hadoop distribution and run all those fancy Spark jobs the data scientists made. It's still your first day, so naively you ask one of your co-workers for the IP address and credentials of one of the VMs the (other) IT guys have prepared for you.

BUMP…

See that smartcard you got? That’s your ticket to cluster heaven. Never forget to bring it with you, or your day will be worthless. Not only does it unlock your laptop, the security infrastructure this company has laid down also forbids local users on servers and relies on Kerberos tickets. I’m far from an expert on that matter, so I will stick to saying that it worked flawlessly (but it’s important later on). Ah, another challenge… Root access on the VMs? No way, José.

So what about the IP addresses of the cluster’s servers, you ask. Wait a minute here, sparky. You find yourself at a pretty large company with all kinds of sensitive customer information. The cluster’s VMs are only to be accessed through a stepping server, giving IT the ability to lock you out of ALL VMs at once if you misbehave. Slowly you get the feeling that one day will possibly not suffice to install your cluster. Because of the lack of root privileges and the minimal set of cluster components needed, I decided to do a plain Apache Hadoop install instead of that wonderful all-open-source distribution with the light green logo.

IT provided me with a dedicated user called ‘cluster’ which I was able to sudo to. This user became the owner of all software beneath the /opt/cluster directory. For the sake of simplicity I assume in this article that this user starts all processes, instead of dedicated users for HDFS, YARN, Elasticsearch and Kibana (which would be preferred). The installation of the components is out of scope for this article.

Let’s go

I decided that for the moment I would be happy if all frontend web UIs (e.g. HDFS, YARN, the Spark drivers and Kibana) were accessible from my laptop. During an earlier project I struggled to submit Spark jobs over SSH forwarding, so I set that wish aside. Enough hurdles ahead, but I never shy away from them. So let me first introduce the overall picture.

[Image: overview of the office LAN, the Stepping Server and the cluster LAN]

There is an office LAN in the 192.168.x.x range with a DNS server that serves requests for that LAN only. You are authenticated on your (Windows) workstation using a smartcard, which gives you a Kerberos ticket (via NTLM) to access servers on the complete network. Access to the cluster LAN in the 10.0.0.x range goes through the dual-homed Stepping Server. Your workstation runs Firefox, PuTTY and the FoxyProxy plugin. IT has granted you access to the Stepping Server and the nodes of the cluster. Inter-cluster-node firewalling is turned OFF for simplicity. (There are already two firewalls in front of the cluster, right?)

So the basic challenges listed:

  1. Securely access the web UIs on the cluster nodes from the workstation
  2. Access the nodes by hostname
  3. Respect the Kerberos infrastructure
  4. Prevent exposure of those UIs and data to other users
  5. Don’t do illegal things. I repeat: do not do illegal things

Before you go ahead and do this yourself, a few assumptions are made:

  1. You are able to edit the hosts file on your machine (which needs root on Unix-like systems and administrator rights on Windows). For another client I actually wrote a Java program which is able to use a non-root private DNS server; I may write another article on that in the future
  2. You are able to install Firefox plugins (if bullet 1 above is not possible, there might be Firefox plugins that can override or add DNS records, but I have not looked into that)
  3. On the Stepping Server, SSH forwarding is allowed. This may seem trivial, but some IT departments might disable this feature to prevent exactly what I am about to show you.
  4. You are comfortable with a state of mind that is summarised as ‘if it’s possible, it’s allowed’. But remember bullet 5 above!

DNS resolving

This is actually one of the easiest bits. Just edit your hosts file (c:\windows\system32\drivers\etc\hosts on Windows or /etc/hosts on most Unix-like systems) to include

10.0.0.151 node-1
10.0.0.152 node-2
...
10.0.0.155 node-5

You will now be able to resolve the hostnames of the cluster nodes, for instance with ping (note that nslookup queries the DNS server directly and bypasses the hosts file). Bear in mind that ping will resolve the hostname but will not get a reply, because there is no route from your workstation to the cluster nodes.

Install and configure FoxyProxy

Use the Firefox extensions installer or go to https://getfoxyproxy.org/ to download the extension. I used the Basic version, which has enough functionality to support this use case. You can configure it as shown in the picture below:

[Image: FoxyProxy proxy configuration]

The important bits here are to check the ‘SOCKS proxy?’ box and the ‘SOCKS v5’ bullet. The host or IP address is ‘localhost’ and the port is set to any unprivileged one, 6666 in my case. Remember this number, because it is needed when the connection to the cluster is made. FoxyProxy will then look like the screenshot below. Please note that once you enable FoxyProxy in this scenario, you will not be able to browse other websites; it becomes a dedicated connection to the cluster! So enable and disable FoxyProxy as needed.

[Image: FoxyProxy with the SOCKS proxy enabled]

Connect to the cluster

Connecting to the cluster involves two steps: connecting to the Stepping Server followed by connecting to any one of the cluster-nodes.

While connecting to the Stepping Server it is important that a few configurations are made:

1 – In PuTTY you will need ‘Allow GSSAPI credential delegation’ to be able to take the Kerberos ticket with you all the way down to the cluster-nodes.

2 – In PuTTY’s tunnel configuration add a local forward from the FoxyProxy port (6666) to localhost:6666. ‘Localhost’ in this case refers to the Stepping Server, not your workstation!

3 – Do NOT check ‘Local ports accept connections from other hosts’. If you do that and you have no firewall installed, other users are able to use your workstation as a gateway to the cluster!
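For completeness: on a Linux or macOS workstation, the same three settings translate roughly to a single OpenSSH command (the user and host names below are placeholders):

# -K enables GSSAPI authentication and delegates your Kerberos credentials to
#    the server, the equivalent of PuTTY's 'Allow GSSAPI credential delegation'
# -L forwards local port 6666 to localhost:6666 on the Stepping Server;
#    without -g, the forwarded port only accepts connections from your own machine
ssh -K -L 6666:localhost:6666 your-user@stepping-server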

Setting GSSAPI options

[Image: PuTTY GSSAPI settings]

Setting up port forwarding

[Image: PuTTY tunnel/port forwarding settings]

After a successful connection to the Stepping Server you are now ready to continue to one of the cluster nodes. On our Linux host it proved to be enough to enter

ssh -D 6666 node-1

The magic is hidden in the -D command-line switch. This starts a dynamic proxy on the connection, which is able to forward requests to ANY node/port combination. FoxyProxy sends its requests via the PuTTY connection and the Linux ssh connection above to the corresponding server, and passes the results back again. I had to keep this connection alive with a command like ‘ping -i 10 localhost’, because it looked like IT had disabled connection keep-alives in sshd. The chain now looks like this:

[Image: the resulting connection chain from workstation to cluster node]
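As a side note on the keep-alive trick mentioned above: depending on how sshd is configured, a client-side keep-alive option may achieve the same thing as the ping loop. This is a suggestion I did not verify on that particular cluster:

# Ask the ssh client to send an application-level keep-alive every 30 seconds
ssh -o ServerAliveInterval=30 -D 6666 node-1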

Voilà

The end result should be that you are now able to browse to the various web ui’s of your cluster:

1 – Kibana : http://node-1:5601/app/kibana#/dashboard/cluster

2 – HUE : http://node-2:8888/jobbrowser/

3 – HDFS : http://node-3:50070/explorer.html#/

4 – YARN : http://node-4:8088/cluster/nodes

Note the use of the cluster’s hostnames and the ability to actually use ANY port on the cluster nodes. This differs greatly from a scenario where you do local port forwarding. When browsing YARN, for instance, you are now able to connect to any node and to any YARN port/service, such as the NodeManager HTTP port 8042, making the cluster easily browsable.

Conclusion

At first glance, I just shook my head and wondered why things had to be so overcomplicated and so heavily secured. But as with any client, there is a reason for most aspects of security. In the end I managed to satisfy the need to connect to the web UIs of the cluster in a legal, secure and maintainable way.

After I figured this out, other users of the cluster were able to connect as well and do their magic. Next stop, of course: submitting a Spark job through the proxy!

Heading picture re-used from : http://neurosciencenews.com/neuron-cluster-mapping-neuroscience-1807