Running your R analysis on a Spark cluster

Challenge

For one of our clients we are in the process of designing a versatile data platform that can be used, among other things, to run R analyses on. In this post I'll summarise the steps I took to get R running with RStudio 0.99.473, R 3.2.2 and Spark 1.4.0, thereby leveraging the potential of a Spark cluster.

Prerequisites

For the R analysis I used my fairly recent (2015) MacBook Pro with 16GB of memory and Yosemite installed. Although I have not tested it, to the best of my knowledge you should be able to get it up and running on a recent Linux distribution or Windows as well. You will need a copy of the Spark binaries on your machine. I have it installed (unzipped) in /Users/rutger/Development/spark-1.4.0-bin-hadoop2.6. You can download a copy (use a pre-built version) from

http://spark.apache.org/downloads.html

I’m aware that at the time of writing Spark 1.4.1 has been released but I used the 1.4.0 release to match our current dev-cluster’s version.

Install R

Download the R distribution for Mac OS X from https://cran.r-project.org/bin/macosx/ and install it using the regular method.

Install RStudio

Download and install RStudio for your OS from https://www.rstudio.com/products/rstudio/download/

Working Spark cluster

Obviously, to actually run SparkR jobs, you will be needing a Spark cluster. In my case, I used an Intel NUC-based 3-node 'take it with you' cluster provisioned with Apache Hadoop 2.7.0 and Spark 1.4.0. I have one Spark master called nucluster3 and three slaves called nucluster[1-3] (the master is also a slave).

I will not go into detail in this post on installing a development Spark cluster, apart from stating that getting it up and running should not give you any headaches, as long as you only edit SPARK_HOME/conf/spark-env.sh (add the JAVA_HOME variable) and SPARK_HOME/conf/slaves (list all slave hostnames), and start the cluster on the master node (nucluster3 in our case) using start-master.sh and start-slaves.sh. You should then be greeted on http://nucluster3:8080 by:

[Screenshot: the Spark Master web UI at http://nucluster3:8080 showing the registered workers]
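
For reference, the configuration boils down to something like the sketch below (the JAVA_HOME path is an assumption, adjust it to your own environment):

# SPARK_HOME/conf/spark-env.sh
export JAVA_HOME=/usr/lib/jvm/java-8-oracle

# SPARK_HOME/conf/slaves
nucluster1
nucluster2
nucluster3

# on the master node (nucluster3)
sbin/start-master.sh
sbin/start-slaves.sh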

Analyze!

Everything is set up from the software perspective, so let's head over to the preparation and the actual running of R statements.

Setup

We need a file to work on. For this post, I used a file with zipcodes called postcode.csv. You may upload it to your HDFS cluster with HUE, but I did it the hardcore way: first copying it to one of the nodes and, from there, using the hadoop fs command to put it on HDFS:

  • scp postcode.csv root@nucluster1:
  • hadoop fs -copyFromLocal postcode.csv /user/root/
  • head -2 postcode.csv

"id","postcode","postcode_id","pnum","pchar","minnumber","maxnumber","numbertype","street","city","city_id","municipality","municipality_id","province","province_code","lat","lon","rd_x","rd_y","location_detail","changed_date"
395614,"7940XX",79408888,7940,"XX",4,12,"mixed","Troelstraplein","Meppel",1082,"Meppel",119,"Drenthe","DR","52.7047653217626","6.1977201775604","209781.52077777777777777778","524458.25733333333333333333","postcode","2014-04-10 13:20:28"

I did not bother to strip or use the header in this case. It's just there. So, you're now ready to start your analysis! Fire up RStudio and you will be greeted by the familiar R prompt, accompanied by some warnings you may safely ignore for now.

[Screenshot: RStudio at startup showing the R prompt and some ignorable warnings]

Now a few things have to be set up. R needs to know where it can find the SparkR library. In my case I use the following command:

.libPaths(c(.libPaths(), '/Users/rutger/Development/spark-1.4.0-bin-hadoop2.6/R/lib'))

No response will be given. Your R analysis will actually be an application running on the Spark cluster, submitted by the sparkr-shell, so R(Studio) needs to know where it can find this command. Because a CSV file is to be read, the spark-csv package is also loaded:

Sys.setenv(PATH = paste(Sys.getenv(c('PATH')), '/Users/rutger/Development/spark-1.4.0-bin-hadoop2.6/bin', sep=':'))

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.11:1.1.0" "sparkr-shell"')

Again, these commands should not give you any response unless you make a mistake. You are now ready to load the SparkR library:

library(SparkR)

This will give you some feedback, something like

Attaching package: 'SparkR'

followed by some remarks about masked packages. They are harmless in our case:

[Screenshot: output of library(SparkR), including the notes about masked objects]

The first thing we'll have to do is create two contexts: the Spark context and the SparkRSQL context:

sc <- sparkR.init(master = "spark://nucluster3:7077", appName = "SparkR")

sqlContext <- sparkRSQL.init(sc)

The first command produces a fair amount of output but should complete without warnings; the second one should finish silently. Remember the postcodes I put on HDFS? We are now ready to load them into a dataframe. nucluster1 is the cluster's namenode, by the way.

postcodes <- read.df(sqlContext, "hdfs://nucluster1:9000/user/root/postcode.csv", source = "com.databricks.spark.csv")

This command should finish successfully again with a fair amount of output. You are now ready to do your R magic, but in the examples below I will stick to very simple ones.

Plain R commands

head(postcodes)

[Screenshot: output of head(postcodes)]

count(postcodes)

[Screenshot: output of count(postcodes)]

A slightly more difficult example is to count by one of the fields. In this case only the first few lines are returned:

head(summarize(groupBy(postcodes, postcodes$C3), count = n(postcodes$C3)))

[Screenshot: output of the groupBy/summarize example]

Please note that without the head command, nothing will actually be executed; only the return type will be displayed:

DataFrame[C3:string, count:bigint] 
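
To make this lazy behaviour explicit, compare the following two statements (a small sketch; 'grouped' is just a variable name I picked):

grouped <- summarize(groupBy(postcodes, postcodes$C3), count = n(postcodes$C3))  # returns immediately, nothing runs on the cluster yet
head(grouped)  # only now a job is submitted to the cluster and results come back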

SQL commands

One of the powerful features of this setup is the ability to use SQL in your commands. For this to work you’ll first have to create a temporary table definition.

registerTempTable(postcodes, "postcodes")

amsterdam <- sql(sqlContext, "SELECT C1,C8 FROM postcodes WHERE C9 = 'Amsterdam'")
head(amsterdam)

It may sound boring, but the first command will stay silent and the second one will return the query result (all Amsterdam zipcodes):

[Screenshot: head(amsterdam) output listing Amsterdam zipcodes]

Of course, you are able to write much more CPU- and disk-intensive queries. This totally bogus one puts the cluster to work for a while:

count(sql(sqlContext, "SELECT p1.C1,p1.C8 FROM postcodes p1, postcodes p2 where p1.C1<p2.C1"))

Note that all activity is done on the cluster, not your own machine! Your machine is the job driver and only serves as such. But you can view a lot of information about your job: take a look at http://localhost:4040/jobs/ and find your own running one.

[Screenshot: the Spark job UI at http://localhost:4040/jobs/ showing the running job]

In the screenshot above you will see the 'sort-of-cartesian-product' I created hogging the system with a huge input set. Feel free to look around the jobs, stages (note the beautiful DAG visualization Spark provides) and storage information. You may also check out the Spark Master and note your running application named SparkR at http://nucluster3:8080/

[Screenshot: the Spark Master web UI showing the running SparkR application]

I'm done, so I quit RStudio using the well-known q command. You may choose whether to save your workspace or not!

q()

Conclusions and final remarks

Analysing your data using familiar R while at the same time leveraging the power of a Spark cluster is a breeze once you get the hang of it. I mainly focussed on 'getting it up and running' and not on 'beautiful R apps'. Go and knock yourself out with libraries like Shiny (http://shiny.rstudio.com/)!

Remarks

Of course you always find out after writing a blog post that you have missed something trivial. In the case of loading the CSV, spark-csv will give you column names for free if you load it using:

postcodes <- read.df(sqlContext, "hdfs://nucluster1:9000/user/root/postcode.csv", source = "com.databricks.spark.csv", header = "true")

Apart from that, some minor annoyances popped up:

  • I noticed on the nodes of the cluster that only one core was used instead of the four that are in there. I must have overlooked something during configuration and will reply or update when I find the solution.
  • Apart from that, I would really like Spark to be less chatty (get rid of the INFO messages) and will try to create a log4j.properties file for that; a sketch of what that might look like follows below.
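
As a starting point, copying SPARK_HOME/conf/log4j.properties.template to log4j.properties and lowering the root log level should do the trick; a minimal sketch (the WARN level is my own choice):

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n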

Executing existing R scripts from Spark

Introduction

In one of my previous posts I illustrated how to perform R analysis using a Spark cluster. This is all well and good if the developer is starting from scratch and/or no existing, proven R scripts have already been developed. But most (data) scientists already have a handful of scripts that they use during their day-to-day work and need some extra power around them. Or, even better, the developed models are to be used during the actual real-time processing of the data, for instance to support a recommendation engine or some other complex analysis.

In this post I'll illustrate a hybrid solution leveraging the power of both Spark (by which I mean, in this post, Spark/Scala) and R, this time using a powerful feature called 'piping'. RDDs have the ability to pipe themselves out of the Spark context using regular standard-in (stdin) and standard-out (stdout). This behaviour closely resembles the ability of map/reduce to use Python, Perl, Bash or other scripts to do mapping or reducing.

Once you get to know how to actually do it, it’s surprisingly simple!

Setup

A proper R environment is required, for which the setup can be found in one of my previous posts. Note that to follow along with this particular post you will not need RStudio, but it will come in handy if you would like to extend the scripts provided. You will, however, need a couple of extra packages (text mining, NLP and the Snowball stemmer). Start R and issue the following commands:

install.packages('tm', repos='http://cran.xl-mirror.nl')
install.packages('NLP', repos='http://cran.xl-mirror.nl')
install.packages('SnowballC', repos='http://cran.xl-mirror.nl')

You may now quit the R session by invoking

q()

See the screenshot below for full details of the output.

[Screenshot: output of the install.packages commands]

Let’s go!

In the example below, the responsibilities of R and Spark are clearly separated:

  • R is used only to sanitize the incoming data
  • Spark performs the actual wordcount

The picture below represents the dataflow. Once the Spark context is started up, it starts reading a file called 'alice.txt'. Every line is piped out on stdout to an R wrapper script containing only the logic to read the lines, pass them to a function called 'sanitize' (which is loaded from the corresponding R file) and return them on the R runtime's stdout to the Spark context.

Using a wrapper keeps the code in sanitize.R reusable and independent of being called just from Spark. In this case, a simple 'source' command is executed to load the function in sanitize.R; in production environments it may be wise to create reusable and easily distributable packages contained in a repository.

[Figure: dataflow between the Spark context, the R wrapper script and sanitize.R]

The Scala context picks up the sanitized lines and uses map and reduce logic to calculate the number of appearances of each word. These will probably differ from the original input because, for instance, stemming is done (note the appearance of the word 'alic' instead of the original input 'Alice'). Finally, the result is written to stdout.

Let's start with the Scala code. Only relevant code is shown for brevity. Also, the code below is a functional one-liner without any intermediate variables, thereby making it slightly less readable for a programmer to actually 'get' what's happening.

File : ExternalR.scala

[Screenshot: ExternalR.scala source]
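
Since the code in the screenshot is hard to read here, the sketch below reconstructs its essence from the description that follows; it is not the original file, and details such as the word splitting and the pipe arguments are assumptions:

import org.apache.spark.{SparkConf, SparkContext}

object ExternalR {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ExternalR"))

    // Read the input, pipe every line through the R wrapper (stdin/stdout),
    // do a classic wordcount, sort on the count by swapping the pair twice
    // (there is no sortByValue), filter out empty words and print the top 100.
    sc.textFile("alice.txt")
      .pipe("./wrapper.R sanitize.R sanitize")
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .map(_.swap)
      .sortByKey(ascending = false)
      .map(_.swap)
      .take(100)
      .foreach(println)

    sc.stop()
  }
}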

The comments in the code describe what's taking place. After the SparkContext has been created, a text file called 'alice.txt' is read using the 'textFile' method of the context and its contents are piped to stdout using the RDD's 'pipe' function. In this particular case, the receiving end is an R script, but it may be any script or application that is able to read data from stdin and write to stdout.

The script or application that is called must be executable. On a Linux box that means its permissions are at least 500, so that it is executable and readable by the user that is executing the Scala code. If it's not, an error will be raised:

java.io.IOException: Cannot run program "./wrapper.R": error=13, Permission denied

After the R script returns its data, the 'classic' wordcount is done. A few tricks have been added, for instance sorting the set on count (which implies doing a dual swap because there is no sortByValue at the moment) and filtering out empty words. Finally, the top 100 words are taken and output to the console.

Note that the above construction creates a scalable solution from both the Spark and the R perspective. As long as the R script does not need ALL the data from the input file, execution of the R script will scale linearly (within the boundaries of your cluster, that is). The Spark context will collect all data together and reduce the data to its final state.

The R code consists of two scripts, explained below.

File : wrapper.R

[Screenshot: wrapper.R source]

Again, the code above is pretty self-explanatory. First, a check is done to see whether the requested function (sanitize) exists and, if not, the script containing it (sanitize.R) is sourced. Stdin is opened, followed by reading it one line at a time (note the 'n=1' parameter to readLines). The line is passed to the sanitize function and its output is written to stdout. Finally, stdin is closed. No return value is needed. The shebang (#!) at the head of the R script tells the system that this file is a set of commands to be fed to the command interpreter indicated, in this case R. Without this shebang line, execution will most likely fail.
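
A sketch of what such a wrapper could look like, reconstructed from the description (the exact shebang line and error handling are assumptions):

#!/usr/bin/env Rscript

# wrapper.R <library file> <function name>
# Reads lines from stdin, applies the requested function and writes the result to stdout.
args <- commandArgs(trailingOnly = TRUE)
libFile <- args[1]
funcName <- args[2]

# Source the library file if the requested function is not available yet
if (!exists(funcName)) {
  source(libFile)
}

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1)) > 0) {
  # Call the function by name and write its output to stdout
  writeLines(eval(call(funcName, line)))
}
close(con)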

As said, the wrapper is called using two parameters, the library file to be loaded and the function to be called inside that library file, thereby creating a very versatile wrapper (not only for Spark) which can read from stdin, perform some logic, and write to stdout. A small performance penalty may be incurred by the 'eval' construct, but that has not been validated. Some example output from calling the wrapper standalone follows:

[Screenshot: example output of calling wrapper.R standalone]

The wrapper may be extended to include multiple libraries and function flows, but that exercise is left to the reader. The actual 'business logic' is done in sanitize.R. The needed libraries are loaded and the input line is sanitized according to our needs. The beauty of this is that the function may be part of a much bigger R library which has no clue that it will be called from a Spark context one day.

File : sanitize.R

[Screenshot: sanitize.R source]
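
The original is in the screenshot above; a rough equivalent, assuming a typical tm/SnowballC cleanup (lowercasing, removing punctuation, numbers and stopwords, and stemming, which matches the 'alic' example mentioned earlier), could look like:

library(tm)
library(SnowballC)

# Sanitize a single line of text: lowercase, strip punctuation, numbers and stopwords, then stem the words
sanitize <- function(line) {
  cleaned <- tolower(line)
  cleaned <- removePunctuation(cleaned)
  cleaned <- removeNumbers(cleaned)
  cleaned <- removeWords(cleaned, stopwords("english"))
  cleaned <- stripWhitespace(cleaned)
  words <- unlist(strsplit(cleaned, " "))
  paste(wordStem(words[words != ""], language = "english"), collapse = " ")
}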

Successfully running the Spark/Scala job will result in something like the following screen, which can be found at http://localhost:4040 (or at the location of your Spark driver host):

[Screenshot: the Spark UI at http://localhost:4040 after a successful run]

Summary

As said, leveraging the power of R from Spark is a breeze. It took me a little while to get it up and running, but the result is a scalable, generic solution that enables data scientists to have their R code used from a Spark context.

How to connect to your Kerberos secured cluster using FoxyProxy, SSH and a Dynamic Proxy

Introduction

So here you find yourself with a shiny new smartcard which enables you to log in to the client's network. The IT guys did a great job setting up your corporate laptop, you happen to be a local admin on it, and you are dying to start the installation of your preferred Hadoop distribution and run all those fancy Spark jobs the data scientists made. It's still your first day, so naively you ask one of your co-workers for the IP address and credentials of one of the VMs the (other) IT guys have prepared for you.

BUMP…

See that smartcard you got? That's your ticket to cluster heaven. Never forget to bring it with you, or your day will be worthless. Not only does it unlock your laptop; the security infrastructure this company has laid down forbids local users on servers and relies on the use of Kerberos tickets. I'm by no means an expert on that matter, so I will stick to saying that this was working without a flaw (but it's important later on). Ah, another challenge… Root access on the VMs? No way, José.

So what about the IP addresses of the cluster's servers, you ask. Wait a minute here, sparky. You find yourself at a pretty large company with all kinds of sensitive customer information. The cluster's VMs are only to be accessed using a stepping server, giving IT the possibility to lock you out of ALL VMs at once if you misbehave. Slowly you get the feeling that one day to install your cluster will possibly not suffice. Because of the lack of root privileges and the minimal set of cluster components needed, I decided to do a plain Apache Hadoop install instead of that wonderful all-open-source distribution with the light green logo.

IT provided me with a dedicated user called 'cluster' which I was able to sudo to. This user became the owner of all software beneath the /opt/cluster directory. For the sake of simplicity I assume in this article that this user starts all processes, instead of dedicated users for HDFS, YARN, Elasticsearch and Kibana (which is preferred). The installation of the components is out of scope for this article.

Let’s go

I decided that for the moment I would be happy if all frontend web UIs (e.g. HDFS, YARN, the Spark drivers and Kibana) were accessible from my laptop. During an earlier project I struggled to submit Spark jobs using SSH forwarding, so I set that wish aside. Enough hurdles ahead, but I never fear to take them. So let me first introduce the global picture.

[Figure: network overview with the office LAN, the dual-headed Stepping Server and the cluster LAN]

There is an office LAN in the 192.168.x.x range with a DNS server that serves requests for that LAN only. You are authenticated on your (Windows) workstation using a smartcard which will give you a Kerberos ticket via NTLM to access servers on the complete network. Access to the cluster LAN in the 10.0.0.x range is via the dual-headed Stepping Server. Your workstation holds Firefox, PuTTY and the FoxyProxy plugin. IT has granted you access to the Stepping Server and the nodes of the cluster. Inter-cluster-node firewalling is turned OFF for simplicity. (There are already two firewalls in front of the cluster, right?)

So here are the basic challenges, listed:

  1. Securely access the web UIs on the cluster nodes from the workstation
  2. Access the nodes by hostname
  3. Respect the Kerberos infrastructure
  4. Prevent exposure of those UIs and data to other users
  5. Don’t do illegal things. I repeat: do not do illegal things

Before you continue to do this yourself, a few assumptions are made:

  1. You are able to edit the hosts file on your machine (which needs root on Unix-like systems and administrator rights on Windows). For another client I actually wrote a Java program which is able to use a non-root private DNS server, which I may write another article on in the future
  2. You are able to install Firefox plugins (if bullet 1 above is not possible, there might be Firefox plugins that can overrule or add DNS records, but I have not looked into that)
  3. On the stepping server, SSH forwarding is allowed. This may seem trivial, but some IT departments might disable this feature to actually prevent you from doing what I am about to show you.
  4. You are comfortable with a state of mind that is summarised as 'if it's possible, it's allowed'. But remember bullet 5 above!

DNS resolving

This is actually one of the easiest bits. Just edit your hosts file (c:\windows\system32\drivers\etc\hosts on Windows or /etc/hosts on most Unix-like systems) to include

10.0.0.151 node-1
10.0.0.152 node-2
...
10.0.0.155 node-5

You will now be able to do an nslookup on the hostnames of the cluster nodes. Bear in mind that a ping will also resolve the hostname but will not get a successful reply because of the lack of proper routing to the cluster nodes.

Install and configure FoxyProxy

Use the Firefox extensions installer or go to https://getfoxyproxy.org/ to download the extension. I used the Basic version, which has enough functionality to support this use case. You may configure it as in the picture below:

[Screenshot: FoxyProxy proxy settings]

The important bits here are to make sure you check the 'SOCKS proxy?' box and the 'SOCKS v5' bullet. The host or IP address is 'localhost' and the port is set to any unprivileged one, like 6666 in my case. Remember this number because it is needed when the connection to the cluster is made. FoxyProxy will then look like the screenshot below. Please note that once you enable FoxyProxy in this scenario, you will not be able to browse other websites; it will be a dedicated connection to the cluster! So enable and disable FoxyProxy as needed.

[Screenshot: FoxyProxy with the dedicated cluster proxy enabled]

Connect to the cluster

Connecting to the cluster involves two steps: connecting to the Stepping Server, followed by connecting to any one of the cluster nodes.

When connecting to the Stepping Server it is important that a few settings are made:

1 – In PuTTY you will need 'Allow GSSAPI credential delegation' to be able to take the Kerberos ticket with you all the way down to the cluster nodes.

2 – In PuTTY's tunnel configuration add a local forward from the FoxyProxy port (6666) to localhost:6666. 'localhost' in this case refers to the Stepping Server, not your workstation!

3 – Do NOT check 'Local ports accept connections from other hosts'. If you do that and you have no firewall installed, other users will be able to use your workstation as a gateway to the cluster!

Setting GSSAPI options

[Screenshot: PuTTY GSSAPI settings]

Setting up port forwarding

[Screenshot: PuTTY tunnel configuration]

After a successful connection to the Stepping Server you are now ready to continue to one of the cluster nodes. On our Linux host it proved to be enough to enter

ssh -D 6666 node-1

The magic is hidden in the -D command-line switch. This starts a dynamic proxy on the connection which is able to forward requests to ANY node/port combination. FoxyProxy sends its requests via the PuTTY connection and the Linux ssh connection above to the corresponding server and passes the results back again. I had to keep this connection alive by using a command like 'ping -i 10 localhost' because it looked like IT had disabled connection keep-alives in SSHD. The chain now looks like this:

[Figure: the full connection chain from the workstation via the Stepping Server to the cluster nodes]
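
For those not using PuTTY, a rough OpenSSH equivalent of the two hops described above would be the following (the username and hostname of the stepping server are of course environment-specific):

# Hop 1: workstation -> stepping server, with GSSAPI credential delegation and the local forward for FoxyProxy
ssh -K -L 6666:localhost:6666 user@stepping-server

# Hop 2: on the stepping server, open the dynamic (SOCKS) proxy towards a cluster node
ssh -D 6666 node-1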

Voilà

The end result should be that you are now able to browse to the various web UIs of your cluster:

1 – Kibana : http://node-1:5601/app/kibana#/dashboard/cluster

2 – HUE : http://node-2:8888/jobbrowser/

3 – HDFS : http://node-3:50070/explorer.html#/

4 – YARN : http://node-4:8088/cluster/nodes

Note the use of the hostnames of the cluster and the ability to actually use ANY port on the cluster nodes. This differs HIGHLY from a scenario where you do local port forwarding. When browsing YARN, for instance, you are now able to connect to any node and to any port/service of YARN, such as the node HTTP port 8042, making the cluster easily browsable.

Conclusion

At first glance, I just shook my head and wondered why things had to be so overcomplicated and secured so (too?) much. But as with any client, there is a reason for most aspects of security. In the end I managed to satisfy the need to connect to the web UIs of the cluster in a legal, secure and maintainable way.

After figuring this one out, other users of the cluster were able to connect as well and do their magic. Next stop, of course: submitting a Spark job using the proxy!

Heading picture re-used from: http://neurosciencenews.com/neuron-cluster-mapping-neuroscience-1807