Friday, 2 December 2016

Agile In Data Science and Analytics

Is Agile an effective way to herd the data scientists into the production pen or just an excuse to avoid documentation and planning? What components in Agile do we recommend for Analytics PoCs and full-fledged projects?  

So let's discuss it.

Every organization starts with its business ambitions and then creates a roadmap of the technology, people and investment needed to unlock that potential. To get there, we go through a phase of initial discussions to understand the requirements and technical workloads, like "I need a Linux server, a database, a recommendation engine, tools to handle the big data..."
Technical requirements are straightforward most of the time, but analytical work is vague and uncertain: we don't know upfront which approach will best solve the problem, or how long it will take to reach a good solution.
If we develop it with the traditional waterfall approach, how would it go?

Developing a traditional analytics project:
Let's say we need to build a recommendation engine for users. The use case seems pretty easy. A traditional analytics team would go off and build the engine end to end: take the entire user dataset, run CBR (content-based recommendation) or CF (collaborative filtering), and after a long effort possibly deliver a powerful engine that can serve near-real-time recommendations to users. In this entire development, there was no interaction with business people.

Challenges in Traditional Approach
We developed the entire engine but are not sure about the correctness of the model. What if we used the wrong data, or the wrong variables? We don't even know whether our data exploration and insights were correct. Now assume the stakeholders reject it and give feedback that the model didn't meet their expectations. Time to rework. Wouldn't it have been better to use Agile from the start?
An Agile approach would have played a great role here: rapid, iterative product development with fast customer feedback cycles.
Our problem and opportunity sit at the intersection of two trends: how can we incorporate data science and analytics, which is applied research and needs exhaustive effort on an unpredictable timeline, into agile application development? How can analytics applications do better than the traditional waterfall model? How can we craft applications for unknown, evolving data models?



What is Agile

Agile software development focuses on four values (from the Agile Manifesto):
  • Individuals and interactions over processes and tools
  • Working software over comprehensive documentation
  • Customer collaboration over contract negotiation
  • Responding to change over following a plan
Engineering products and engineering data science are different, because data science is less deterministic. It needs a lot of creativity and thought to derive the best approach. Agile helps manage this in cycles, where the team explores, learns something about the data, shares the insights with the business team/stakeholders, aligns on needs and approach, takes the feedback and continues in an agreed direction.


How the Agile analytics approach unfolds

The main difference between the traditional and the Agile analytics approach is the iterative process: sharing learnings with stakeholders, getting rapid feedback, and learning from new business questions while describing the datasets.
A team of data scientists, business analysts and other SMEs works with the stakeholders to discuss each question until they have:
  • A scope that is clear and as narrow as possible
  • Potential datasets and variables to be used for analysis
  • Questions to be answered
Data scientists provide insights on the nature and quality of the dataset, hone the questions and hypotheses, and provide a concrete list of algorithms that are viable for answering those questions. These outputs turn into proofs of concept or prototypes of an analytics solution.
It is a voyage of discovery. The structure below, known as the data-value pyramid, illustrates this.


Every project needs an investment, and building an analytics solution is generally costlier than developing application software, as each business silo can point to a different domain or a different data source. So there is high risk in the investment.
Agile analytics helps minimize the risk of pursuing blind alleys. With an iterative approach and cyclic interaction with the business team, it mitigates the risk of implementing models that turn out to be garbage.


Friday, 3 June 2016

How to write Spark jobs in Java for Spark Job Server

In the previous post, we learnt about setting up Spark Job Server and running Spark jobs on it. So far, we have used Scala programs on the job server. Now we'll see how to write Spark jobs in Java to run on the job server.

In Scala, a job must implement the SparkJob trait, so the job looks like this:

object SampleJob  extends SparkJob {
    override def runJob(sc:SparkContext, jobConfig: Config): Any = ???
    override def validate(sc:SparkContext, config: Config): SparkJobValidation = ???
}

What these methods are:
  • runJob method contains the implementation of the Job. The SparkContext is managed by the JobServer and will be provided to the job through this method. This relieves the developer from the boiler-plate configuration management that comes with the creation of a Spark job and allows the Job Server to manage and re-use contexts.
  • validate method allows for an initial validation of the context and any provided configuration. If the context and configuration are OK to run the job, returning spark.jobserver.SparkJobValid will let the job execute, otherwise returning spark.jobserver.SparkJobInvalid(reason) prevents the job from running and provides means to convey the reason of failure. In this case, the call immediately returns an HTTP/1.1 400 Bad Request status code.
    validate helps prevent running jobs that would eventually fail due to missing or wrong configuration, and saves both time and resources.

In Java, we need to extend the JavaSparkJob class. It has the following methods, which we override in the program:
  • runJob(jsc: JavaSparkContext, jobConfig: Config)
  • validate(sc: SparkContext, config: Config)
  • invalidate(jsc: JavaSparkContext, config: Config)

The JavaSparkJob class is available in the job-server-api package. Build the job-server-api source code and add the resulting jar to your project. Add Spark and other required dependencies in your pom.xml.
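
If you are building from the cloned spark-jobserver repo (set up in the previous post), packaging the api sub-project with sbt is one way to get that jar; treat the output path as indicative, since it depends on the Scala and job server versions in your checkout:

[xuser@machine123 spark-jobserver]$ sbt job-server-api/package

The jar lands under job-server-api/target (scala-2.10 for a Scala 2.10 build); add it to your project classpath or install it into your local Maven repository.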

Let’s start with the basic WordCount example:

WordCount.java:

package spark.jobserver;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import spark.jobserver.JavaSparkJob;
import spark.jobserver.SparkJobInvalid;
import spark.jobserver.SparkJobValid$;
import spark.jobserver.SparkJobValidation;
import com.typesafe.config.Config;
public class WordCount extends JavaSparkJob implements Serializable {
       private static final long serialVersionUID = 1L;
       private static final Pattern SPACE = Pattern.compile(" ");
       static String fileName = StringUtils.EMPTY;

       // Reads the file named by input.filename in the job config and returns per-word counts.
       public Object runJob(JavaSparkContext jsc, Config config) {
              try {
                     JavaRDD<String> lines = jsc.textFile(
                                  config.getString("input.filename"), 1);
                     JavaRDD<String> words = lines
                                  .flatMap(new FlatMapFunction<String, String>() {
                                         public Iterable<String> call(String s) {
                                                return Arrays.asList(SPACE.split(s));
                                         }
                                  });
                     JavaPairRDD<String, Integer> counts = words.mapToPair(
                                  new PairFunction<String, String, Integer>() {
                                         public Tuple2<String, Integer> call(String s) {
                                                return new Tuple2<String, Integer>(s, 1);
                                         }
                                  }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                           public Integer call(Integer i1, Integer i2) {
                                  return i1 + i2;
                           }
                     });
                     List<Tuple2<String, Integer>> output = counts.collect();
                     System.out.println(output);
                     return output;
              } catch (Exception e) {
                     e.printStackTrace();
                     return null;
              }
       }

       // Rejects the job before it runs if the input.filename parameter is missing or empty.
       public SparkJobValidation validate(SparkContext sc, Config config) {
              if (config.hasPath("input.filename")
                            && !config.getString("input.filename").isEmpty()) {
                     return SparkJobValid$.MODULE$;
              } else {
                     return new SparkJobInvalid(
                                  "Input parameter is missing. Please mention the filename");
              }
       }

       // JavaSparkJob hook; returning null reports no validation error from this method.
       public String invalidate(JavaSparkContext jsc, Config config) {
              return null;
       }
}

The next step is to compile the code and build the jar, then upload it to the job server.
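
As an illustration, assuming a Maven project whose build produces target/wordcount-example.jar (the jar name, app name and input path below are placeholders), the upload and run steps reuse the same REST calls covered in the job server post:

[xuser@machine123 wordcount]$ mvn clean package
[xuser@machine123 wordcount]$ curl --data-binary @target/wordcount-example.jar localhost:8090/jars/javawordcount
[xuser@machine123 ~]$ curl -d 'input.filename = "/tmp/words.txt"' 'localhost:8090/jobs?appName=javawordcount&classPath=spark.jobserver.WordCount&sync=true'

The input.filename value is read in runJob and checked in validate, so a submission without it is rejected before the job runs.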

So your Spark job is ready to run on JobServer....!!!  

Thursday, 26 May 2016

How to run Spark Job server and spark jobs

Spark Job Server provides a RESTful interface for submission and management of Spark jobs, jars and job contexts. It facilitates sharing of jobs and RDD data in a single context, and it can run standalone jobs as well. Job history and configuration are persisted.

Features:
A few of the features are listed here:
  • Simple REST interface
  • Separate JVM per SparkContext for isolation
  • Separate jar uploading step for faster job execution
  • Supports low-latency jobs via long-running job contexts
  • Asynchronous and synchronous job APIs
  • Kill running jobs via stop context and delete job
  • Named objects (RDDs/DataFrames) to cache and retrieve by name, improving object sharing and reuse among jobs
  • Preliminary support for Java

Setup Spark Job Server:

Job Server Version      Spark Version
0.3.1                   0.9.1
0.4.0                   1.0.2
0.4.1                   1.1.0
0.5.0                   1.2.0
0.5.1                   1.3.0
0.5.2                   1.3.1
0.6.0                   1.4.1
0.6.1                   1.5.2
0.6.2                   1.6.1
Master (0.13.x)         1.6.1

Pre-requisites:
To set up the server, the pre-requisites are:

  • 64-bit operating system
  • Java 8
  • sbt
  • curl
  • git
  • Spark

Please make sure the Spark Job Server version you use is compatible with your Spark version. The table above lists the compatible versions.

You can install Java 8 from here.

For sbt, you can refer to the sbt official site.

   For CentOS users:
sudo yum install curl
sudo yum install git
sudo yum install sbt

   For Ubuntu users:
sudo apt-get install curl
sudo apt-get install git
sudo apt-get install sbt

Download the Spark package and set it up. Windows users can refer to the link How to setup Spark on windows. Once the Spark setup is done, run the Spark master and worker daemons.
[xuser@machine123 spark-1.6.1-bin-hadoop2.6]$ sbin/start-all.sh

Now clone the spark job server repo on your local.

[xuser@machine123 ~]$ git clone https://github.com/spark-jobserver/spark-jobserver.git

Run the sbt command in the cloned repo. It will build the project and open the sbt shell. If you are running the sbt command for the first time, it will take a while. Then type re-start in the sbt shell to start the server:

[xuser@machine123 spark-jobserver]$ sbt
[info] Loading project definition from /home/xuser/softwares/spark-jobserver/project
Missing bintray credentials /home/xuser/.bintray/.credentials. Some bintray features depend on this.
Missing bintray credentials /home/xuser/.bintray/.credentials. Some bintray features depend on this.
Missing bintray credentials /home/xuser/.bintray/.credentials. Some bintray features depend on this.
Missing bintray credentials /home/xuser/.bintray/.credentials. Some bintray features depend on this.
[info] Set current project to root (in build file:/home/xuser/spark-jobserver/)
> re-start

If you want to start the server with a specific configuration file, pass it to re-start. You can also specify JVM parameters after "---". Including all the options, it looks like this:
 > re-start config/application.conf  --- -Xmx512m

It will start the Spark Job Server at the URL http://localhost:8090. You can see all the daemons using jps.


Sample SparkJobs Walkthrough:

Spark Job Server has some sample Spark jobs written in Scala. To package the test jar, run the following command:
[xuser@machine123 spark-jobserver]$ sbt job-server-tests/package

It will give you a jar in the job-server-tests/target/scala-2.10 directory. Now upload the jar to the server:

[xuser@machine123 spark-jobserver]$ curl --data-binary @job-server-tests/target/scala-2.10/job-server-tests_2.10-0.7.0-SNAPSHOT.jar localhost:8090/jars/test
OK

This jar is uploaded as the app test. You can view the same information on the web UI.
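If you prefer the command line to the web UI, the jars endpoint lists the uploaded app names along with their last upload time:

[xuser@machine123 ~]$ curl localhost:8090/jars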
We can run jobs in two modes: Transient Context mode and Persistent Context mode.

Unrelated jobs -with Transient Context:

In this mode, each job will create its own Spark context. Let's submit the WordCount job to the server:
[xuser@machine123 ~]$ curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'
{
  "status": "STARTED",
  "result": {
    "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4",
    "context": "b7ea0eb5-spark.jobserver.WordCountExample"
  }
}

Persistent Context mode- Related Jobs:

In this mode, jobs can reuse an existing Spark context. Create a Spark context named 'test-context':
[xuser@machine123 ~]$ curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&memory-per-node=512m'
OK

To see the existing contexts:
[xuser@machine123 ~]$ curl localhost:8090/contexts
["test-context"]

To run the job in existing context:
[xuser@machine123 ~]$ curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true'
{
  "result": {
    "a": 2,
    "b": 2,
    "c": 1,
    "see": 1
  }
}

You can run a job without any input argument by passing -d "":

[xuser@machine123 ~]$ curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.LongPiJob&context=test-context&sync=true'
{
  "result": 3.1403460207612457
}

You can check the job status by giving the job ID in the following command:

[xuser@machine123 ~]$ curl localhost:8090/jobs/<jobID>
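
When you are done, the same REST API lets you kill a running job or stop a whole context with DELETE requests (substitute your own job ID and context name):

[xuser@machine123 ~]$ curl -X DELETE localhost:8090/jobs/<jobID>
[xuser@machine123 ~]$ curl -X DELETE localhost:8090/contexts/test-context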

You can see all the running, completed and failed jobs on the Job Server UI. Now you are ready to write your own jobs to run on Spark Job Server...!!!