Spark in local mode

The easiest way to try out Apache Spark from Python on SherlockML is in local mode, where all processing happens on a single server. You still benefit from parallelisation across all the cores of that server, but not across several servers.

Spark runs on the Java virtual machine and exposes Python, R and Scala interfaces. You can use all of these on SherlockML, but the installation procedure differs slightly for each.

Using PySpark

To use PySpark on SherlockML, create a custom environment that installs PySpark. The environment should include:

  • openjdk-8-jdk in the system section;
  • pyspark in the Python section, under pip.

../../_images/pyspark-env.png

Start a new Jupyter server with this environment. Unfortunately, PySpark does not play well with Anaconda environments. You therefore need to set environment variables telling Spark which Python executable to use. Add these lines to the top of your notebook:

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

You can now import pyspark and create a Spark context:

import pyspark

number_cores = 8  # number of worker threads for local mode
memory_gb = 24    # memory allocated to the Spark driver
conf = (
    pyspark.SparkConf()
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)
sc = pyspark.SparkContext(conf=conf)

PySpark does not support restarting the Spark context, so if you need to change these settings, you will need to restart the Jupyter kernel.
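
If you accidentally re-run the cell that creates the context, PySpark will complain that a context is already running. One way to guard against this, sketched below and reusing the conf object defined above, is SparkContext.getOrCreate; note that it simply returns the existing context if there is one, so changed settings still only take effect after a kernel restart:

# A sketch: getOrCreate returns the running context if one already exists and
# only creates a new one (using `conf`) otherwise, so re-running this cell is safe.
sc = pyspark.SparkContext.getOrCreate(conf=conf)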

Now that we have instantiated a Spark context, we can use it to run calculations:

rdd = sc.parallelize([1, 4, 9])
sum_squares = rdd.map(lambda elem: float(elem)**2).sum()
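
For this RDD, sum_squares evaluates to 98.0 (1 + 16 + 81).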

This example hard-codes the number of threads and the memory. You may want to set these dynamically based on the size of the server. You can use the NUM_CPUS and AVAILABLE_MEMORY_MB environment variables to determine the size of the server the notebook is currently running on:

number_cores = int(os.environ['NUM_CPUS'])
memory_gb = int(os.environ['AVAILABLE_MEMORY_MB']) // 1024
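
Combining the two snippets above gives a context whose resources match the server it runs on. The following is only a sketch that reuses the names introduced earlier:

import os
import pyspark

number_cores = int(os.environ['NUM_CPUS'])
memory_gb = int(os.environ['AVAILABLE_MEMORY_MB']) // 1024  # integer division rounds down, leaving some headroom

conf = (
    pyspark.SparkConf()
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)
sc = pyspark.SparkContext(conf=conf)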

Using the Spark shell and Scala APIs

To interact with Spark from Scala, create a new server (of any type) and an environment with openjdk-8-jdk in the system section and the following in the scripts section:

SPARK_VERSION=2.3.1
HADOOP_VERSION=2.7
SPARK_HOME=/tmp/spark

cd /tmp
wget http://apache.mirror.anlx.net/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz

dist_name=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
archive_name=$dist_name.tgz
tar xzf $archive_name
rm $archive_name
mv $dist_name $SPARK_HOME

cat <<-EOF > /etc/sherlockml_environment.d/spark.sh
export SPARK_HOME=/tmp/spark
export PATH=$SPARK_HOME/bin:\$PATH

alias spark-shell="spark-shell --master=local[$NUM_CPUS] --driver-memory ${AVAILABLE_MEMORY_MB}M"
alias spark-submit="spark-submit --master=local[$NUM_CPUS] --driver-memory ${AVAILABLE_MEMORY_MB}M"
EOF

Apply this environment to a Jupyter or to an RStudio server. If you now open a new terminal, you can run spark-shell to open a Spark shell.

../../_images/spark-shell.png

While the Spark shell allows for rapid prototyping and iteration, it is not suitable for more significant Scala programs. The normal route for developing these is to create a Scala application, package it as a jar and run it with spark-submit. To write a Scala application, you will need to install sbt. You can install sbt reproducibly by creating an environment with the following commands in the scripts section:

wget -O /tmp/sbt https://raw.githubusercontent.com/paulp/sbt-extras/master/sbt
sudo mv /tmp/sbt /usr/local/bin/sbt
sudo chmod a+x /usr/local/bin/sbt

For an overview of a modern Scala and Spark setup that works well on SherlockML, we recommend this blog post.

In particular, the Spark session should be instantiated as follows:

import org.apache.spark.sql.SparkSession

trait SparkSessionProvider {

  lazy val spark: SparkSession = {
    val numberCpus = sys.env.getOrElse("NUM_CPUS", "*")
    val availableMemoryMaybe = sys.env.get("AVAILABLE_MEMORY_MB")

    val builder = SparkSession
      .builder()
      .master(s"local[$numberCpus]")
      .appName("spark-on-sherlock")

    availableMemoryMaybe.foreach { availableMemory =>
      builder.config("spark.driver.memory", s"${availableMemory}m")
    }
    builder.getOrCreate()
  }

}

You can then mix this trait into your application. Note that Spark applications should define a main method rather than extend scala.App, so the example below uses an explicit main:

object App extends SparkSessionProvider {
  def main(args: Array[String]): Unit = {
    val rdd = spark.sparkContext.parallelize(List(1, 4, 9))
    println(rdd.map { _ * 2 }.sum)
  }
}

Once you have an application ready, you can package it by running sbt package. This creates a jar in the target directory. You can run your application using the local scheduler with spark-submit:

spark-submit target/scala-2.11/your-app_2.11-0.1-SNAPSHOT.jar

Spark and R

To use SparkR from an RStudio server on SherlockML, create the Spark installation environment described in the previous section. Once you have applied that environment to an RStudio server, you can access Spark by executing the following lines in your R session:

Sys.setenv(SPARK_HOME = "/tmp/spark")
master <- paste("local[", Sys.getenv("NUM_CPUS"), "]", sep="")
memory <- paste(Sys.getenv("AVAILABLE_MEMORY_MB"), "m", sep="")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = master, sparkConfig = list(spark.driver.memory = memory))

This will start a SparkR session. You can now try out examples from the SparkR documentation:

df <- as.DataFrame(faithful)
head(df)

Accessing the Spark UI

Spark runs a dashboard that gives information about jobs which are currently executing. To access this dashboard, you can use the command line client sml from your local computer to open a tunnel to the server:

sml shell <project> <server> -L 4040:localhost:4040

You will now be able to see the Spark UI in your browser at http://localhost:4040.

../../_images/spark-ui.png