Running your R analysis on a Spark cluster

Challenge

For one of our clients we are in the process of designing a versatile data platform that can, among other things, be used to run R analyses. In this post I’ll summarise the steps I took to get R running with RStudio 0.99.473, R 3.2.2 and Spark 1.4.0, thereby leveraging the potential of a Spark cluster.

Prerequisites

For the R analysis I used my very recent (2015) MacBook Pro with 16GB of memory and Yosemite installed. Although I have not tested it, to the best of my knowledge you should be able to get it up and running on a recent Linux distribution or Windows as well. You will need a copy of the Spark binaries on your machine; I have them installed (unzipped) in /Users/rutger/Development/spark-1.4.0-bin-hadoop2.6. You can download a copy (use a pre-built version) from

http://spark.apache.org/downloads.html

I’m aware that at the time of writing Spark 1.4.1 has been released but I used the 1.4.0 release to match our current dev-cluster’s version.

Install R

Download the R distribution for Mac OS X from https://cran.r-project.org/bin/macosx/ and install it the regular way.

Install RStudio

Download and install RStudio for your OS from https://www.rstudio.com/products/rstudio/download/

Working Spark cluster

Obviously, to actually run SparkR jobs, you will need a Spark cluster. In my case, I used an Intel NUC-based 3-node ‘take it with you’ cluster provisioned with Apache Hadoop 2.7.0 and Spark 1.4.0. I have one Spark master called nucluster3 and three slaves called nucluster[1-3] (the master is also a slave).

I will not go into detail in this post on installing a development Spark cluster. Getting it up and running should not give you any headache as long as you only edit SPARK_HOME/conf/spark-env.sh (add the JAVA_HOME variable) and SPARK_HOME/conf/slaves (list all slave hostnames), and then start the cluster on the master node (nucluster3 in our case) using start-master.sh and start-slaves.sh; a sketch of both files follows the screenshot below. You should then be greeted on http://nucluster3:8080 by:

[Screenshot: the Spark master web UI at http://nucluster3:8080]
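
For reference, the two files boil down to something like this on my cluster (the JAVA_HOME value is only an example, point it at whatever JDK your nodes have):

# SPARK_HOME/conf/spark-env.sh
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64   # example path, adjust to your nodes

# SPARK_HOME/conf/slaves (one slave hostname per line; the master doubles as a slave)
nucluster1
nucluster2
nucluster3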

Analyze!

That’s all the software taken care of, so let’s head over to the setup and the actual running of R statements.

Setup

We need a file to work on. For this post, I used a file with zipcodes called postcode.csv. You could upload it to your HDFS cluster with HUE, but I did it the hardcore way: I first copied it to one of the nodes and from there used the hadoop fs command to put it on HDFS:

  • scp postcode.csv root@nucluster1:
  • hadoop fs -copyFromLocal postcode.csv /user/root/
  • head -2 postcode.csv

"id","postcode","postcode_id","pnum","pchar","minnumber","maxnumber","numbertype","street","city","city_id","municipality","municipality_id","province","province_code","lat","lon","rd_x","rd_y","location_detail","changed_date"
395614,"7940XX",79408888,7940,"XX",4,12,"mixed","Troelstraplein","Meppel",1082,"Meppel",119,"Drenthe","DR","52.7047653217626","6.1977201775604","209781.52077777777777777778","524458.25733333333333333333","postcode","2014-04-10 13:20:28"

I did not bother to strip or use the header in this case; it’s just there. So, you’re now ready to start your analysis! Fire up RStudio and you will be greeted by the familiar R prompt, accompanied by some warnings you may safely ignore for now.

[Screenshot: the RStudio console showing the R prompt]

Now a few things have to be set up. R needs to know where it can find the SparkR library. In my case I use the following command:

.libPaths(c(.libPaths(), '/Users/rutger/Development/spark-1.4.0-bin-hadoop2.6/R/lib'))

No response will be given. Your R analysis will actually run as an application on the Spark cluster, submitted through the sparkr-shell command, so R(Studio) needs to know where it can find that command. Because a CSV file is to be read, the spark-csv package is also loaded:

Sys.setenv(PATH = paste(Sys.getenv(c('PATH')), '/Users/rutger/Development/spark-1.4.0-bin-hadoop2.6/bin', sep=':'))

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.11:1.1.0" "sparkr-shell"')

Again, these commands should not give you any response unless you make a mistake. You are now ready to load the SparkR library:

library(SparkR)

This will give you some feedback, something like:

Attaching package: 'SparkR'

This is followed by some remarks about masked objects, which are harmless in our case:

[Screenshot: warnings about masked objects after loading SparkR]

The first thing we have to do is create two contexts, the Spark context and the SparkRSQL context:

sc <- sparkR.init(master = "spark://nucluster3:7077", appName = "SparkR")

sqlContext <- sparkRSQL.init(sc)

The first command produces a fair amount of output but should complete without warnings; the second one should finish silently. Remember the postcodes I put on HDFS? We are now ready to load them into a DataFrame. nucluster1 is the cluster’s namenode, by the way.

postcodes <- read.df(sqlContext, "hdfs://nucluster1:9000/user/root/postcode.csv", source = "com.databricks.spark.csv")

This command should finish successfully again with a fair amount of output. You are now ready to do your R magic, but in the examples below I will stick to very simple ones.

Plain R commands

head(postcodes)

[Screenshot: output of head(postcodes)]

count(postcodes)

[Screenshot: output of count(postcodes)]

A slightly more difficult example is to count by one of the fields. In this case only the first few rows are returned:

head(summarize(groupBy(postcodes, postcodes$C3), count = n(postcodes$C3)))

[Screenshot: the first rows of the groupBy/summarize result]

Please note that without the head command nothing will actually be executed, since Spark evaluates lazily; only the type of the resulting DataFrame will be displayed:

DataFrame[C3:string, count:bigint] 
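
head only pulls the first few rows back to the driver; if you want the complete aggregation as a plain local data frame, collect should do the trick. A quick sketch (the variable names are just mine, and I did not run this for the post):

zipcounts <- summarize(groupBy(postcodes, postcodes$C3), count = n(postcodes$C3))
localcounts <- collect(zipcounts)   # collect() triggers execution and returns a regular R data.frame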

SQL commands

One of the powerful features of this setup is the ability to use SQL in your commands. For this to work you’ll first have to create a temporary table definition.

registerTempTable(postcodes, "postcodes")

amsterdam <- sql(sqlContext, "SELECT C1,C8 FROM postcodes WHERE C9 = 'Amsterdam'")
head(amsterdam)

It may sound boring, but the first command will stay silent and the second one will return the query result (all Amsterdam zipcodes):

[Screenshot: the Amsterdam zipcodes returned by the query]

Of course, you can write much more CPU- and disk-intensive queries. This totally bogus one puts the cluster to work for a while:

count(sql(sqlContext, "SELECT p1.C1,p1.C8 FROM postcodes p1, postcodes p2 where p1.C1 < p2.C1"))
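
As an aside I did not measure: if you plan to fire off several of these heavier queries, it may help to ask Spark to keep the DataFrame in cluster memory first:

cache(postcodes)   # request in-memory caching of the DataFrame on the executors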

Note that all activity takes place on the cluster, not on your own machine! Your machine is the job driver and only serves as such. But you can view a lot of information about your job: take a look at http://localhost:4040/jobs/ and find your own running one.

[Screenshot: the Spark job UI at http://localhost:4040/jobs/ showing the running job]

In the screenshot above you will see the ‘sort-of-cartesian-product’ I created hogging the system with a huge input set. Feel free to look around jobs, stages (note the beautiful DAG Visualization Spark provides) and storage information. You may also check out the Spark Master and note your running application named SparkR at http://nucluster3:8080/

[Screenshot: the Spark master web UI listing the running SparkR application]

I’m done, so I quit RStudio using the well-known q command. You may choose whether to save your workspace or not!

q()
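
If you prefer to shut the Spark application down explicitly before quitting, SparkR also offers sparkR.stop() (I did not bother with it here):

sparkR.stop()   # stops the Spark context, ending the SparkR application on the cluster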

Conclusions and final remarks

Analysing your data using familiar R while at the same time leveraging the power of a Spark cluster is a breeze once you get the hang of it. I mainly focussed on ‘getting it up and running‘ and not on ‘beautiful R apps‘. Go and knock yourself out with libraries like Shiny (http://shiny.rstudio.com/)!

Remarks

Of course you always find out after writing a blog post that you have missed something trivial. In the case of loading the CSV: spark-csv will give you the column names for free if you load it using:

postcodes <- read.df(sqlContext, "hdfs://nucluster1:9000/user/root/postcode.csv", source = "com.databricks.spark.csv", header = "true")
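
With the header in place the columns should carry their real names instead of the generated C-names, so something along these lines ought to work (an untested sketch, using the postcode and city columns from the header shown earlier):

printSchema(postcodes)
head(select(postcodes, postcodes$postcode, postcodes$city))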

Apart from that, some minor annoyances popped up:

  • I noticed on the nodes of the cluster that only one core was used instead of the four that are in there. I must have overlooked something during configuration and will reply or update when I find the solution.
  • Apart from that, I would really like Spark to be less chatty (get rid of the INFO messages) and will try to create a log4j.properties file for that; a sketch of what I have in mind follows below.
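
The usual recipe, as far as I know, is to copy the template that ships with Spark and turn the root logger down from INFO to WARN:

cd /Users/rutger/Development/spark-1.4.0-bin-hadoop2.6/conf
cp log4j.properties.template log4j.properties
# then edit log4j.properties and change
#   log4j.rootCategory=INFO, console
# into
#   log4j.rootCategory=WARN, console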