Friday, 3 June 2016

How to write Spark jobs in Java for Spark Job Server

In the previous post, we learned how to set up Spark Job Server and run Spark jobs on it. So far, we have used Scala programs on the job server. Now we'll see how to write Spark jobs in Java to run on the job server.

In Scala, a job must implement the SparkJob trait, so a job skeleton looks like this:

object SampleJob extends SparkJob {
    override def runJob(sc: SparkContext, jobConfig: Config): Any = ???
    override def validate(sc: SparkContext, config: Config): SparkJobValidation = ???
}

Here is what these methods do:
  • runJob method contains the implementation of the Job. The SparkContext is managed by the JobServer and will be provided to the job through this method. This relieves the developer from the boiler-plate configuration management that comes with the creation of a Spark job and allows the Job Server to manage and re-use contexts.
  • validate method allows for an initial validation of the context and any provided configuration. If the context and configuration are OK to run the job, returning spark.jobserver.SparkJobValid will let the job execute, otherwise returning spark.jobserver.SparkJobInvalid(reason) prevents the job from running and provides means to convey the reason of failure. In this case, the call immediately returns an HTTP/1.1 400 Bad Request status code.
    validate helps prevent running jobs that would eventually fail due to missing or wrong configuration, saving both time and resources.

In Java, we need to extend the JavaSparkJob class instead. It has the following methods, which can be overridden in the program:
  • runJob(jsc: JavaSparkContext, jobConfig: Config)
  • validate(sc: SparkContext, config: Config)
  • invalidate(jsc: JavaSparkContext, config: Config)

The JavaSparkJob class is available in the job-server-api package. Build the job-server-api source code and add the resulting jar to your project, then add Spark and the other required dependencies to your pom.xml, for example as sketched below.
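A minimal sketch of the dependency section, assuming Scala 2.10 builds of Spark 1.6.1 to match the rest of this walkthrough; the versions and the job-server-api coordinates here are assumptions, and you can instead add the jar you built directly to the project's build path:

<dependencies>
  <!-- Spark core for the Java API (version assumed to match your cluster) -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
    <scope>provided</scope>
  </dependency>
  <!-- Typesafe Config, used for the job configuration object -->
  <dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.0</version>
  </dependency>
  <!-- The job-server-api jar you built, installed to your local repository;
       these coordinates are hypothetical -->
  <dependency>
    <groupId>spark.jobserver</groupId>
    <artifactId>job-server-api_2.10</artifactId>
    <version>0.7.0-SNAPSHOT</version>
  </dependency>
</dependencies>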

Let’s start with the basic WordCount example:

package spark.jobserver;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import com.typesafe.config.Config;

public class Wordcount extends JavaSparkJob implements Serializable {

       private static final long serialVersionUID = 1L;
       private static final Pattern SPACE = Pattern.compile(" ");

       // Reads the input file, splits each line on spaces and counts word occurrences.
       public Object runJob(JavaSparkContext jsc, Config config) {
              try {
                     JavaRDD<String> lines = jsc.textFile(
                                  config.getString("input.filename"), 1);
                     JavaRDD<String> words = lines
                                  .flatMap(new FlatMapFunction<String, String>() {
                                         public Iterable<String> call(String s) {
                                                return Arrays.asList(SPACE.split(s));
                                         }
                                  });
                     JavaPairRDD<String, Integer> counts = words.mapToPair(
                                  new PairFunction<String, String, Integer>() {
                                         public Tuple2<String, Integer> call(String s) {
                                                return new Tuple2<String, Integer>(s, 1);
                                         }
                                  }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                                         public Integer call(Integer i1, Integer i2) {
                                                return i1 + i2;
                                         }
                                  });
                     List<Tuple2<String, Integer>> output = counts.collect();
                     return output;
              } catch (Exception e) {
                     return null;
              }
       }

       // Rejects the job up front when the input filename is missing or empty,
       // so the server returns 400 Bad Request instead of starting a doomed job.
       public SparkJobValidation validate(SparkContext sc, Config config) {
              if (config.hasPath("input.filename")
                            && !config.getString("input.filename").isEmpty()) {
                     return SparkJobValid$.MODULE$;
              } else {
                     return new SparkJobInvalid(
                                  "Input parameter is missing. Please mention the filename");
              }
       }

       public String invalidate(JavaSparkContext jsc, Config config) {
              return null;
       }
}
The next step is to compile the code, build the jar, and upload it to the Job Server; example commands are shown below.
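As a sketch, assuming the built jar sits at target/wordcount-example.jar and using a placeholder input file path, uploading and running the job uses the same REST endpoints as in the previous post:

[xuser@machine123 ~]$ curl --data-binary @target/wordcount-example.jar localhost:8090/jars/wordcount
[xuser@machine123 ~]$ curl -d "input.filename = /tmp/input.txt" 'localhost:8090/jobs?appName=wordcount&classPath=spark.jobserver.Wordcount&sync=true'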

So your Spark job is ready to run on the Job Server...!!!

Thursday, 26 May 2016

How to run Spark Job server and spark jobs

Spark Job Server provides a RESTful interface for submission and management of Spark jobs, jars, and job contexts. It facilitates sharing of jobs and RDD data in a single context, and it can run standalone jobs as well. Job history and configuration are persisted.

A few of its features are listed here:
    ·       Simple REST interface
    ·       Separate JVM per SparkContext for isolation
    ·       Separate jar uploading step for faster job execution
    ·       Supports low-latency jobs via long-running job contexts
    ·       Asynchronous and synchronous job APIs
    ·       Kill running jobs via stop context and delete job
    ·       Named Objects (RDD/DataFrames) to cache and retrieve by name, improving object sharing and reuse among jobs
    ·       Preliminary support for Java

Setup Spark Job Server:


To set up the server, the prerequisites are:

        ·      64-bit operating system
        ·      Java 8
        ·      sbt
        ·      curl
        ·      git
        ·      Spark

Please make sure the sbt version is compatible with the Spark version. Here is the list of compatible versions.

You can install Java 8 from here.

For sbt, you can refer to the sbt official site.

   For CentOS users:
sudo yum install curl
sudo yum install git
sudo yum install sbt

   For Ubuntu users:
sudo apt-get install curl
sudo apt-get install git
sudo apt-get install sbt

Download the Spark package and set it up. Windows users can refer to the post How to setup Spark on windows. Once the Spark setup is done, run the Spark master and worker daemons, as shown below.
[xuser@machine123 spark-1.6.1-bin-hadoop2.6]$ sbin/
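Assuming the standard scripts that ship with the Spark distribution, sbin/ launches both a master and a worker; alternatively, start them separately (the master host below is a placeholder):

[xuser@machine123 spark-1.6.1-bin-hadoop2.6]$ sbin/
[xuser@machine123 spark-1.6.1-bin-hadoop2.6]$ sbin/ spark://machine123:7077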

Now clone the spark-jobserver repo to your local machine.

[xuser@machine123 ~]$ git clone
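The repository lives on GitHub at spark-jobserver/spark-jobserver, so the full command is typically:

[xuser@machine123 ~]$ git clone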

Run the sbt command in the cloned repo. It will build the project and drop you into the sbt shell. If you are running the sbt command for the first time, it will take quite a while. Then type re-start on the sbt shell to start the server:

[xuser@machine123 spark-jobserver]$ sbt
[info] Loading project definition from /home/xuser/softwares/spark-jobserver/project
Missing bintray credentials /home/xuser/.bintray/.credentials. Some bintray features depend on this.
[info] Set current project to root (in build file:/home/xuser/spark-jobserver/)
> re-start

If you want to use a specific configuration file to start the server, you can pass it to re-start. You can also specify JVM parameters after "---". Including both options, it looks like this:
 > re-start config/application.conf  --- -Xmx512m

It will start the Spark Job Server at http://localhost:8090. You can see all the running daemons using jps.

Sample SparkJobs Walkthrough:

Spark Job Server ships with some sample Spark jobs written in Scala. To package the test jar, run the following command:
[xuser@machine123 spark-jobserver]$ sbt job-server-tests/package

It will produce a jar in the job-server-tests/target/scala-2.10 directory. Now upload the jar to the server:

[xuser@machine123 spark-jobserver]$ curl --data-binary @job-server-tests/target/scala-2.10/job-server-tests_2.10-0.7.0-SNAPSHOT.jar localhost:8090/jars/test

This jar is uploaded as the app named test. You can view the same information on the web UI, or through the REST API as shown below.
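For example, you can list the uploaded jars through the REST API's GET /jars endpoint:

[xuser@machine123 ~]$ curl localhost:8090/jars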
We can run jobs in two modes: Transient Context mode and Persistent Context mode.

Unrelated jobs - with Transient Context:

In this mode, each job creates its own Spark context. Let's submit the WordCount job to the server:
[xuser@machine123 ~]$ curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'
  "status": "STARTED",
  "result": {
    "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4",
    "context": "b7ea0eb5-spark.jobserver.WordCountExample"

Persistent Context mode - Related Jobs:

In this mode, jobs share an existing Spark context. Create a Spark context named ‘test-context’:
[xuser@machine123 ~]$ curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&memory-per-node=512m'

To see the existing contexts:
[xuser@machine123 ~]$ curl localhost:8090/contexts
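Later, when a context is no longer needed, it can be deleted through the same REST resource:

[xuser@machine123 ~]$ curl -X DELETE localhost:8090/contexts/test-context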

To run the job in existing context:
[xuser@machine123 ~]$ curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true'
  "result": {
    "a": 2,
    "b": 2,
    "c": 1,
    "see": 1

You can run a job without any input arguments by passing -d "":

[xuser@machine123 ~]$ curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.LongPiJob&context=test-context&sync=true'
  "result": 3.1403460207612457

You can check the job status by giving the job ID in the following command:

[xuser@machine123 ~]$ curl localhost:8090/jobs/<jobID>
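A running job can also be killed by issuing a DELETE on the same resource:

[xuser@machine123 ~]$ curl -X DELETE localhost:8090/jobs/<jobID>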

You can see all the running, completed, and failed jobs on the Job Server UI.  Now you are ready to write your own jobs to run on Spark Job Server..!!!

Friday, 15 April 2016

Frequent Issues occurred during Spark Development

While coding, we face many issues, be it at compilation or at execution time. So I have tried to collate some frequently faced issues in Spark development here.

  •    When we run Spark on Windows, sometimes the following error is displayed:
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwxrwxr-x
at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(
at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(
at org.apache.hadoop.hive.ql.session.SessionState.start(
... 7 more

You need to give 777 permissions to this directory.
Let's say /tmp/hive is present on your D: drive; then run the following command:

D:\winutils\bin\winutils.exe chmod 777 D:\tmp\hive
For complete installation steps, you can refer to the previous post.

  •    How to launch the master and worker on Windows manually?
Open a command prompt, go to the %SPARK_HOME%/bin folder, and run the following commands:

spark-class org.apache.spark.deploy.master.Master     <= for master node
spark-class org.apache.spark.deploy.worker.Worker spark://masternode:7077  <= for worker node

  •     How to get rid of the “A master URL must be set in your configuration” error?
From command line:

Set -Dspark.master=spark://hostname:7077 as a JVM parameter.

From code, use the SparkConf.setMaster() method:
SparkConf conf = new SparkConf().setAppName("App_Name").setMaster("spark://hostname:7077");

  •     How to solve the following “System memory ... Please use a larger heap size” error?
Exception in thread "main" java.lang.IllegalArgumentException: System memory 259522560 must be at least 4.718592E8. Please use a larger heap size.
at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:175)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:354)
       at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
       at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
       at org.apache.spark.SparkContext.<init>(SparkContext.scala:457)
       at com.spark.example.SimpleApp.main(

Add -Xmx1024m -Xms512m to the VM arguments of the driver process, for example:
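For instance, launching the example class from the stack trace above directly with java (the classpath is a placeholder):

java -Xmx1024m -Xms512m -cp <app-and-spark-classpath> com.spark.example.SimpleApp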

             Stay tuned for further updates..!!!