Friday, 20 June 2014

Submitting a Topology to a Remote Storm Cluster

It is easy to write a topology and submit it from a machine that is part of the Storm cluster.
The problem arises when we need to submit a topology from a local machine to a remote Storm cluster.
What should we do in that case?

Here is an approach for submitting a topology to a remote cluster.

I have a local Windows machine and one Storm cluster (1 Nimbus Linux machine and 2 supervisor Linux machines).
Let's say the following are the machines in the cluster:

Nimbus Machine :
Supervisor Machine 1:
Supervisor Machine 2:

The Storm cluster should be up and running on the above machines.
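
Before submitting anything, it can help to confirm that the Nimbus Thrift port is reachable from the local machine, since a firewall between the two is a common reason for remote submission to fail. The following is only a minimal sketch using plain Java sockets. The class name NimbusPortCheck and the method isReachable are made up for illustration and are not part of Storm. The fallback port 6627 is Storm's default nimbus.thrift.port, which your cluster may override.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Storm: checks that a TCP port accepts connections.
public class NimbusPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host name could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("usage: java NimbusPortCheck <nimbus machine ip> [port]");
            return;
        }
        String host = args[0];
        // Assumption: fall back to 6627, Storm's default nimbus.thrift.port.
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6627;
        boolean ok = isReachable(host, port, 3000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

A successful connection only shows that something is listening on that port. It does not prove that the listener is Nimbus or that the cluster is healthy.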

Now, from the local machine, upload the jar with StormSubmitter and use NimbusClient to submit the topology to the cluster.

NimbusClient nimbus = new NimbusClient(storm_conf,"<nimbus machine ip>",<nimbus port>);
nimbus.getClient().submitTopology(topologyName,uploadedJarLocation,jsonConf, builder.createTopology());

Here is a running example:

import java.util.Map;
import org.json.simple.JSONValue;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class RunningClusterTopology {
    public static void main(String[] args) throws Exception {
        // Placeholders: replace these with the values for your cluster.
        String nimbusHost = "<nimbus machine ip>";
        int nimbusPort = 6627; // assumed: Storm's default nimbus.thrift.port
        String topologyName = "<topology name>";

        TopologyBuilder builder = new TopologyBuilder();
        // Add your spouts and bolts to the builder here.

        // Read the default Storm configuration and point it at the remote Nimbus.
        Map storm_conf = Utils.readStormConfig();
        storm_conf.put(Config.NIMBUS_HOST, nimbusHost);

        String inputJar = "C:\\workspace\\TestStormRunner\\target\\TestStormRunner-0.0.1-SNAPSHOT-jar-with-dependencies.jar";
        NimbusClient nimbus = new NimbusClient(storm_conf, nimbusHost, nimbusPort);

        // upload topology jar to Cluster using StormSubmitter
        String uploadedJarLocation = StormSubmitter.submitJar(storm_conf, inputJar);

        try {
            String jsonConf = JSONValue.toJSONString(storm_conf);
            nimbus.getClient().submitTopology(topologyName,
                    uploadedJarLocation, jsonConf, builder.createTopology());
        } catch (AlreadyAliveException ae) {
            // A topology with this name is already running on the cluster.
            ae.printStackTrace();
        } finally {
            nimbus.close();
        }
    }
}

This submits the topology to the Nimbus machine, which then runs it on the 2 supervisor machines.
To verify it, open the Storm UI in a browser: http://<nimbus-ip>:<port>