Cluster statistics using the Nimbus thrift client

Thrift is an RPC framework, typically used with a binary protocol, for cross-language communication. The Nimbus node in Storm is a thrift service, and the structure of a topology is also defined as a thrift structure. Because Storm uses thrift so widely, we can write code in any language that thrift supports to connect to the Nimbus node.

This section covers how to collect cluster details (similar to those shown on the Storm UI page) using the Nimbus thrift client. Collecting this information through the client lets us plot or display the cluster details in a more visual manner.

The Nimbus thrift API is very rich and it exposes all the necessary information required to monitor the Storm cluster.

Fetching information with the Nimbus thrift client

We are going to look at how we can use the Nimbus thrift Java API to perform the following tasks:

  • Collecting the Nimbus configuration
  • Collecting the supervisor statistics
  • Collecting the topology statistics
  • Collecting the spout statistics for a given topology
  • Collecting the bolt statistics for a given topology
  • Killing the given topology

The following are the steps to fetch the cluster details using the Nimbus thrift client:

  1. Create a Maven project using com.learningstorm as Group Id and monitoring as Artifact Id, as shown in the following screenshot:
    [Screenshot: Create a new Maven project]

  2. Add the following dependencies in the pom.xml file:
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <version>0.7.0</version>
    </dependency>
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.9.0.1</version>
    </dependency>
  3. Add the following repository in the pom.xml file:
    <repository>
      <id>clojars.org</id>
      <url>https://clojars.org/repo</url>
    </repository>
  4. Create a utility class, ThriftClient, in the com.learningstorm.monitoring package. The ThriftClient class contains logic to make a connection to the Nimbus thrift server and return the Nimbus client. The following is the code for the ThriftClient class:
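    package com.learningstorm.monitoring;

    // The storm 0.9.0.1 artifact uses the org.apache.thrift7 package for its Thrift classes
    import org.apache.thrift7.protocol.TBinaryProtocol;
    import org.apache.thrift7.transport.TFramedTransport;
    import org.apache.thrift7.transport.TSocket;

    import backtype.storm.generated.Nimbus.Client;

    public class ThriftClient {
      // Host name or IP address of the Nimbus node (the thrift server runs on Nimbus)
      private static final String NIMBUS_HOST = "127.0.0.1";
      // By default, the Nimbus thrift server listens on port 6627 (nimbus.thrift.port)
      private static final int NIMBUS_THRIFT_PORT = 6627;

      public Client getClient() {
        // Nimbus uses a framed transport with the binary protocol.
        TSocket socket = new TSocket(NIMBUS_HOST, NIMBUS_THRIFT_PORT);
        TFramedTransport tFramedTransport = new TFramedTransport(socket);
        TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tFramedTransport);
        Client client = new Client(tBinaryProtocol);
        try {
          // Open the connection to the Nimbus thrift server.
          tFramedTransport.open();
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while making connection with nimbus thrift server", exception);
        }
        // Return the Nimbus thrift client.
        return client;
      }
    }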
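    ThriftClient hard-codes the Nimbus address and port. The following is a minimal sketch, assuming you would rather pass them in as Java system properties (for example, -Dnimbus.host=10.0.0.5). NimbusAddress is a hypothetical helper, not part of Storm; only the property names nimbus.host and nimbus.thrift.port are borrowed from Storm's configuration keys. The resolved host and port would then be passed to new TSocket(host, port) in getClient().

    ```java
    import java.util.Properties;

    // Hypothetical helper (not part of Storm): resolves the Nimbus thrift endpoint
    // from Properties instead of hard-coding it. Defaults match ThriftClient.
    final class NimbusAddress {
        static final String DEFAULT_HOST = "127.0.0.1";
        static final int DEFAULT_THRIFT_PORT = 6627;

        final String host;
        final int port;

        private NimbusAddress(String host, int port) {
            this.host = host;
            this.port = port;
        }

        static NimbusAddress fromProperties(Properties props) {
            String host = props.getProperty("nimbus.host", DEFAULT_HOST).trim();
            if (host.isEmpty()) {
                throw new IllegalArgumentException("nimbus.host is empty");
            }
            String rawPort = props.getProperty("nimbus.thrift.port",
                    String.valueOf(DEFAULT_THRIFT_PORT)).trim();
            int port;
            try {
                port = Integer.parseInt(rawPort);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("nimbus.thrift.port is not a number: " + rawPort, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("nimbus.thrift.port out of range: " + port);
            }
            return new NimbusAddress(host, port);
        }

        public static void main(String[] args) {
            // Reads -Dnimbus.host=... and -Dnimbus.thrift.port=... if they were given.
            NimbusAddress address = fromProperties(System.getProperties());
            System.out.println("Nimbus thrift endpoint: " + address.host + ":" + address.port);
        }
    }
    ```

    Keeping the endpoint outside the code means the same monitoring jar can be pointed at different clusters without recompiling.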
  5. Let's create a NimbusConfiguration class in the com.learningstorm.monitoring package. This class contains logic to collect the Nimbus configuration using the Nimbus client. The following is the code for the NimbusConfiguration class:
    package com.learningstorm.monitoring;

    import backtype.storm.generated.Nimbus.Client;

    public class NimbusConfiguration {

      public void printNimbusStats() {
        try {
          ThriftClient thriftClient = new ThriftClient();
          Client client = thriftClient.getClient();
          // getNimbusConf() returns the Nimbus configuration as a JSON string.
          String nimbusConfiguration = client.getNimbusConf();
          System.out.println("*************************************");
          System.out.println("Nimbus Configuration : " + nimbusConfiguration);
          System.out.println("*************************************");
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while fetching the Nimbus configuration", exception);
        }
      }

      public static void main(String[] args) {
        new NimbusConfiguration().printNimbusStats();
      }
    }

    The preceding program uses the getNimbusConf() method of the backtype.storm.generated.Nimbus.Client class to fetch the Nimbus configuration, which is returned as a JSON-encoded string.

  6. Create a SupervisorStatistics class in the com.learningstorm.monitoring package to collect information about all the supervisor nodes running in the Storm cluster. The following is the code for the SupervisorStatistics class:
    package com.learningstorm.monitoring;

    import java.util.Iterator;

    import backtype.storm.generated.ClusterSummary;
    import backtype.storm.generated.Nimbus.Client;
    import backtype.storm.generated.SupervisorSummary;

    public class SupervisorStatistics {

      public void printSupervisorStatistics() {
        try {
          ThriftClient thriftClient = new ThriftClient();
          Client client = thriftClient.getClient();
          // Get the cluster information.
          ClusterSummary clusterSummary = client.getClusterInfo();
          // Get an iterator over the SupervisorSummary objects
          Iterator<SupervisorSummary> supervisorsIterator = clusterSummary.get_supervisors_iterator();

          while (supervisorsIterator.hasNext()) {
            // Print the information of each supervisor node
            SupervisorSummary supervisorSummary = supervisorsIterator.next();
            System.out.println("*************************************");
            System.out.println("Supervisor Host IP : " + supervisorSummary.get_host());
            System.out.println("Number of used workers : " + supervisorSummary.get_num_used_workers());
            System.out.println("Number of workers : " + supervisorSummary.get_num_workers());
            System.out.println("Supervisor ID : " + supervisorSummary.get_supervisor_id());
            System.out.println("Supervisor uptime in seconds : " + supervisorSummary.get_uptime_secs());
            System.out.println("*************************************");
          }
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while getting cluster info", exception);
        }
      }

      public static void main(String[] args) {
        new SupervisorStatistics().printSupervisorStatistics();
      }
    }

    The SupervisorStatistics class uses the getClusterInfo() method of the backtype.storm.generated.Nimbus.Client class to get an instance of backtype.storm.generated.ClusterSummary, and then calls its get_supervisors_iterator() method to get an iterator over backtype.storm.generated.SupervisorSummary objects. The following screenshot shows the output of the SupervisorStatistics class:

    [Screenshot: The output of the SupervisorStatistics class]
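    SupervisorStatistics prints the number of used workers and the number of workers for each supervisor; the latter is the number of worker slots configured on that supervisor. A cluster-wide summary can be derived from those two counters. The following is a minimal sketch; SlotUsage and SlotCount are hypothetical names, and main() uses made-up sample values rather than live data.

    ```java
    import java.util.Arrays;
    import java.util.List;

    // Sketch: summarizes worker-slot usage across supervisors. In real code each
    // SlotCount (hypothetical) would be built from supervisorSummary.get_host(),
    // get_num_used_workers() and get_num_workers().
    final class SlotUsage {
        static final class SlotCount {
            final String host;
            final int used;   // slots currently running a worker
            final int total;  // slots configured on the supervisor

            SlotCount(String host, int used, int total) {
                this.host = host;
                this.used = used;
                this.total = total;
            }
        }

        /** Used slots as a percentage of all slots; 0.0 when the cluster reports no slots. */
        static double utilizationPercent(List<SlotCount> supervisors) {
            long used = 0;
            long total = 0;
            for (SlotCount s : supervisors) {
                used += s.used;
                total += s.total;
            }
            return total == 0 ? 0.0 : 100.0 * used / total;
        }

        /** Number of slots still available for new workers across the cluster. */
        static int freeSlots(List<SlotCount> supervisors) {
            int free = 0;
            for (SlotCount s : supervisors) {
                free += s.total - s.used;
            }
            return free;
        }

        public static void main(String[] args) {
            List<SlotCount> cluster = Arrays.asList(
                    new SlotCount("supervisor-1", 2, 4),
                    new SlotCount("supervisor-2", 1, 4));
            System.out.printf("Slot utilization: %.1f%%, free slots: %d%n",
                    utilizationPercent(cluster), freeSlots(cluster));
        }
    }
    ```

    A free-slot count of zero is a useful alert condition, because a newly submitted topology cannot get workers until slots are freed.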

  7. Create a TopologyStatistics class in the com.learningstorm.monitoring package to collect information about all the topologies running in a Storm cluster, as shown in the following code:
    package com.learningstorm.monitoring;

    import java.util.Iterator;

    import backtype.storm.generated.ClusterSummary;
    import backtype.storm.generated.Nimbus.Client;
    import backtype.storm.generated.TopologySummary;

    public class TopologyStatistics {

      public void printTopologyStatistics() {
        try {
          ThriftClient thriftClient = new ThriftClient();
          // Get the thrift client
          Client client = thriftClient.getClient();
          // Get the cluster info
          ClusterSummary clusterSummary = client.getClusterInfo();
          // Get an iterator over the TopologySummary objects
          Iterator<TopologySummary> topologiesIterator = clusterSummary.get_topologies_iterator();
          while (topologiesIterator.hasNext()) {
            TopologySummary topologySummary = topologiesIterator.next();
            System.out.println("*************************************");
            System.out.println("ID of topology: " + topologySummary.get_id());
            System.out.println("Name of topology: " + topologySummary.get_name());
            System.out.println("Number of Executors: " + topologySummary.get_num_executors());
            System.out.println("Number of Tasks: " + topologySummary.get_num_tasks());
            System.out.println("Number of Workers: " + topologySummary.get_num_workers());
            System.out.println("Status of topology: " + topologySummary.get_status());
            System.out.println("Topology uptime in seconds: " + topologySummary.get_uptime_secs());
            System.out.println("*************************************");
          }
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while fetching the topologies information", exception);
        }
      }

      public static void main(String[] args) {
        new TopologyStatistics().printTopologyStatistics();
      }
    }

    The TopologyStatistics class uses the get_topologies_iterator() method of the backtype.storm.generated.ClusterSummary class to get an iterator over backtype.storm.generated.TopologySummary objects. For each topology, it prints the ID, name, status, and uptime, along with the number of executors, tasks, and worker processes assigned to it. The following is the console output of the TopologyStatistics class:

    [Screenshot: The output of the TopologyStatistics class]
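    TopologyStatistics prints the raw uptime in seconds, which is hard to read for long-running topologies. The following is a small formatting sketch in the spirit of the Storm UI's uptime display (not a copy of its code); Uptime.format is a hypothetical helper that could wrap topologySummary.get_uptime_secs() or supervisorSummary.get_uptime_secs().

    ```java
    // Sketch: turns a raw uptime in seconds into a compact form such as
    // "1d 2h 3m 4s". Uptime is a hypothetical helper name.
    final class Uptime {
        static String format(int totalSecs) {
            if (totalSecs < 0) {
                throw new IllegalArgumentException("negative uptime: " + totalSecs);
            }
            int days = totalSecs / 86_400;
            int hours = (totalSecs % 86_400) / 3_600;
            int minutes = (totalSecs % 3_600) / 60;
            int seconds = totalSecs % 60;

            // Leading zero units are omitted, so 65 seconds prints as "1m 5s".
            StringBuilder sb = new StringBuilder();
            if (days > 0) {
                sb.append(days).append("d ");
            }
            if (days > 0 || hours > 0) {
                sb.append(hours).append("h ");
            }
            if (days > 0 || hours > 0 || minutes > 0) {
                sb.append(minutes).append("m ");
            }
            sb.append(seconds).append('s');
            return sb.toString();
        }

        public static void main(String[] args) {
            System.out.println(format(93_784)); // prints "1d 2h 3m 4s"
        }
    }
    ```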

  8. Create a SpoutStatistics class in the com.learningstorm.monitoring package to get the statistics of spouts. The SpoutStatistics class contains a printSpoutStatistics(String topologyId) method that prints details about all the spouts in the given topology, as shown in the following code:
    package com.learningstorm.monitoring;

    import java.util.Iterator;
    import java.util.Map;

    import backtype.storm.generated.ExecutorSpecificStats;
    import backtype.storm.generated.ExecutorStats;
    import backtype.storm.generated.ExecutorSummary;
    import backtype.storm.generated.Nimbus.Client;
    import backtype.storm.generated.SpoutStats;
    import backtype.storm.generated.TopologyInfo;

    public class SpoutStatistics {

      // Stream and time-window keys used in the executor statistics maps
      private static final String DEFAULT = "default";
      private static final String ALL_TIME = ":all-time";

      public void printSpoutStatistics(String topologyId) {
        try {
          ThriftClient thriftClient = new ThriftClient();
          // Get the nimbus thrift client
          Client client = thriftClient.getClient();
          // Get the information of the given topology
          TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
          Iterator<ExecutorSummary> executorSummaryIterator = topologyInfo.get_executors_iterator();
          while (executorSummaryIterator.hasNext()) {
            // A spout with several executors is printed once per executor
            ExecutorSummary executorSummary = executorSummaryIterator.next();
            // get_stats() is optional and may be null
            ExecutorStats executorStats = executorSummary.get_stats();
            if (executorStats != null) {
              ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
              String componentId = executorSummary.get_component_id();
              // Only spout executors carry SpoutStats; skip bolts
              if (executorSpecificStats.is_set_spout()) {
                SpoutStats spoutStats = executorSpecificStats.get_spout();
                System.out.println("*************************************");
                System.out.println("Component ID of Spout:- " + componentId);
                System.out.println("Transferred:- " + getAllTimeStat(executorStats.get_transferred(), ALL_TIME));
                System.out.println("Total tuples emitted:- " + getAllTimeStat(executorStats.get_emitted(), ALL_TIME));
                System.out.println("Acked: " + getAllTimeStat(spoutStats.get_acked(), ALL_TIME));
                System.out.println("Failed: " + getAllTimeStat(spoutStats.get_failed(), ALL_TIME));
                System.out.println("*************************************");
              }
            }
          }
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while fetching the spout information", exception);
        }
      }

      // Returns the count for the default stream in the given window, or 0 if absent.
      private static Long getAllTimeStat(Map<String, Map<String, Long>> map, String statName) {
        if (map == null) {
          return 0L;
        }
        Map<String, Long> tempMap = map.get(statName);
        if (tempMap == null) {
          return 0L;
        }
        Long statValue = tempMap.get(DEFAULT);
        return statValue != null ? statValue : 0L;
      }

      public static void main(String[] args) {
        new SpoutStatistics().printSpoutStatistics("LearningStormClusterTopology-1-1393847956");
      }
    }

    The preceding class uses the getTopologyInfo(topologyId) method of the backtype.storm.generated.Nimbus.Client class to fetch the spout information of the given topology. The TopologyStatistics class prints the ID of each topology; we can pass this ID as an argument to the getTopologyInfo(topologyId) method to get information about the spouts running inside that topology. The SpoutStatistics class prints the following statistics for each spout executor:

    • The spout ID
    • The number of tuples emitted and transferred
    • The number of tuples failed
    • The number of tuples acknowledged

    The following is the console output of the SpoutStatistics class:

    [Screenshot: The output of the SpoutStatistics class]
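    SpoutStatistics prints one block per executor, so a spout whose parallelism hint is greater than 1 appears several times. The following sketch sums an all-time counter per component instead. The nested maps mirror the shape of ExecutorStats.get_emitted() and SpoutStats.get_acked() (time window, then stream ID, then count); SpoutTotals is a hypothetical name, and main() uses made-up sample values.

    ```java
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch: aggregates per-executor spout counters by component ID.
    // Map shape: time window -> stream ID -> count.
    final class SpoutTotals {
        static final String ALL_TIME = ":all-time";
        static final String DEFAULT_STREAM = "default";

        /** Null-safe lookup of one window/stream counter; anything missing counts as 0. */
        static long stat(Map<String, Map<String, Long>> stats, String window, String stream) {
            if (stats == null) {
                return 0L;
            }
            Map<String, Long> byStream = stats.get(window);
            if (byStream == null) {
                return 0L;
            }
            Long value = byStream.get(stream);
            return value == null ? 0L : value;
        }

        /** Adds one executor's all-time, default-stream count to its component's total. */
        static void accumulate(Map<String, Long> totals, String componentId,
                               Map<String, Map<String, Long>> executorStat) {
            totals.merge(componentId, stat(executorStat, ALL_TIME, DEFAULT_STREAM), Long::sum);
        }

        public static void main(String[] args) {
            // Made-up sample values for two executors of the same spout.
            Map<String, Map<String, Long>> executor1 = new HashMap<>();
            executor1.put(ALL_TIME, new HashMap<>());
            executor1.get(ALL_TIME).put(DEFAULT_STREAM, 120L);
            Map<String, Map<String, Long>> executor2 = new HashMap<>();
            executor2.put(ALL_TIME, new HashMap<>());
            executor2.get(ALL_TIME).put(DEFAULT_STREAM, 80L);

            Map<String, Long> acked = new LinkedHashMap<>();
            accumulate(acked, "spout", executor1);
            accumulate(acked, "spout", executor2);
            System.out.println(acked); // prints {spout=200}
        }
    }
    ```

    In printSpoutStatistics(), accumulate() would be called with executorSummary.get_component_id() and spoutStats.get_acked() (or get_failed()), and the totals printed once after the loop.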

  9. Create a BoltStatistics class in the com.learningstorm.monitoring package to get the statistics of bolts. The BoltStatistics class contains a printBoltStatistics(String topologyId) method that prints information about all the bolts in the given topology, as shown in the following code:
    package com.learningstorm.monitoring;

    import java.util.Iterator;
    import java.util.Map;

    import backtype.storm.generated.BoltStats;
    import backtype.storm.generated.ExecutorSpecificStats;
    import backtype.storm.generated.ExecutorStats;
    import backtype.storm.generated.ExecutorSummary;
    import backtype.storm.generated.GlobalStreamId;
    import backtype.storm.generated.Nimbus.Client;
    import backtype.storm.generated.TopologyInfo;

    public class BoltStatistics {
      // Stream and time-window keys used in the executor statistics maps
      private static final String DEFAULT = "default";
      private static final String ALL_TIME = ":all-time";

      public void printBoltStatistics(String topologyId) {
        try {
          ThriftClient thriftClient = new ThriftClient();
          // Get the Nimbus thrift server client
          Client client = thriftClient.getClient();

          // Get the information of the given topology
          TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
          Iterator<ExecutorSummary> executorSummaryIterator = topologyInfo.get_executors_iterator();
          while (executorSummaryIterator.hasNext()) {
            // Get the executor; a bolt with several executors is printed once per executor
            ExecutorSummary executorSummary = executorSummaryIterator.next();
            ExecutorStats executorStats = executorSummary.get_stats();
            if (executorStats != null) {
              ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
              String componentId = executorSummary.get_component_id();
              // Only bolt executors carry BoltStats; skip spouts
              if (executorSpecificStats.is_set_bolt()) {
                BoltStats boltStats = executorSpecificStats.get_bolt();
                System.out.println("*************************************");
                System.out.println("Component ID of Bolt: " + componentId);
                System.out.println("Transferred: " + getAllTimeStat(executorStats.get_transferred(), ALL_TIME));
                System.out.println("Emitted: " + getAllTimeStat(executorStats.get_emitted(), ALL_TIME));
                System.out.println("Acked: " + getBoltStats(boltStats.get_acked(), ALL_TIME));
                System.out.println("Failed: " + getBoltStats(boltStats.get_failed(), ALL_TIME));
                System.out.println("Executed: " + getBoltStats(boltStats.get_executed(), ALL_TIME));
                System.out.println("*************************************");
              }
            }
          }
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while fetching the bolt information", exception);
        }
      }

      // Returns the count for the default output stream in the given window, or 0 if absent.
      private static Long getAllTimeStat(Map<String, Map<String, Long>> map, String statName) {
        if (map == null) {
          return 0L;
        }
        Map<String, Long> tempMap = map.get(statName);
        if (tempMap == null) {
          return 0L;
        }
        Long statValue = tempMap.get(DEFAULT);
        return statValue != null ? statValue : 0L;
      }

      // Sums the counts in the given window over all input streams (GlobalStreamId keys),
      // so a bolt that subscribes to several streams reports its full total.
      public static Long getBoltStats(Map<String, Map<GlobalStreamId, Long>> map, String statName) {
        if (map == null) {
          return 0L;
        }
        Map<GlobalStreamId, Long> tempMap = map.get(statName);
        if (tempMap == null) {
          return 0L;
        }
        long total = 0L;
        for (Long value : tempMap.values()) {
          if (value != null) {
            total += value;
          }
        }
        return total;
      }

      public static void main(String[] args) {
        new BoltStatistics().printBoltStatistics("LearningStormClusterTopology-1-1393847956");
      }
    }

    The preceding class uses the getTopologyInfo(topologyId) method of the backtype.storm.generated.Nimbus.Client class to fetch information about the given topology. The TopologyStatistics class prints the ID of each topology; we can pass this ID as an argument to the getTopologyInfo(topologyId) method to get information about the bolts running inside that topology. The BoltStatistics class prints the following statistics for each bolt executor:

    • The bolt ID
    • The number of tuples transferred, emitted, and executed
    • The number of tuples failed
    • The number of tuples acknowledged

    The following is the console output of the BoltStatistics class:

    [Screenshot: The output of the BoltStatistics class]
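    The getBoltStats() helper is only ever called with the ":all-time" window. In Storm 0.9.x the same maps also hold rolling windows keyed "600", "10800", and "86400" (seconds, as strings), which back the 10-minute, 3-hour, and 1-day figures on the Storm UI. The following is a minimal sketch that sums every window across a bolt's input streams; BoltWindowTotals is a hypothetical name, it would be called as totalsPerWindow(boltStats.get_executed()), and main() uses made-up sample values with plain strings standing in for GlobalStreamId keys.

    ```java
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch: BoltStats.get_acked(), get_failed() and get_executed() map a time
    // window to per-input-stream counts. The stream key type K is generic, so the
    // same method accepts GlobalStreamId keys or, as in main(), plain strings.
    final class BoltWindowTotals {
        /** Window -> total over all input streams; a null map yields an empty result. */
        static <K> Map<String, Long> totalsPerWindow(Map<String, Map<K, Long>> stats) {
            Map<String, Long> result = new LinkedHashMap<>();
            if (stats == null) {
                return result;
            }
            for (Map.Entry<String, Map<K, Long>> window : stats.entrySet()) {
                long sum = 0L;
                Map<K, Long> byStream = window.getValue();
                if (byStream != null) {
                    for (Long count : byStream.values()) {
                        if (count != null) {
                            sum += count;
                        }
                    }
                }
                result.put(window.getKey(), sum);
            }
            return result;
        }

        public static void main(String[] args) {
            // Made-up sample: a bolt subscribed to two input streams.
            Map<String, Long> lastTenMinutes = new HashMap<>();
            lastTenMinutes.put("spout:default", 40L);
            lastTenMinutes.put("split-bolt:default", 60L);
            Map<String, Map<String, Long>> executed = new LinkedHashMap<>();
            executed.put("600", lastTenMinutes);
            System.out.println(totalsPerWindow(executed)); // prints {600=100}
        }
    }
    ```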

  10. Create a KillTopology class in the com.learningstorm.monitoring package to kill a topology. The following is the code for the KillTopology class:
    package com.learningstorm.monitoring;

    import backtype.storm.generated.Nimbus.Client;

    public class KillTopology {
      public void kill(String topologyName) {
        try {
          ThriftClient thriftClient = new ThriftClient();
          // Get the nimbus thrift client
          Client client = thriftClient.getClient();
          // Kill the given topology; killTopology() expects the topology name, not its ID
          client.killTopology(topologyName);
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while killing the topology", exception);
        }
      }

      public static void main(String[] args) {
        // Replace with the name of the topology to kill
        new KillTopology().kill("topologyName");
      }
    }

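    The preceding class uses the killTopology(topologyName) method of the backtype.storm.generated.Nimbus.Client class to kill the topology. Note that killTopology() takes the topology name, not the topology ID that getTopologyInfo() expects.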
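    killTopology() expects a topology name, while the spout and bolt examples work with topology IDs. The safest way to get the name is topologySummary.get_name(), as printed by TopologyStatistics. As an alternative, the following sketch recovers the name from an ID, assuming IDs follow the name-counter-timestamp pattern seen in the example ID LearningStormClusterTopology-1-1393847956. TopologyIds is a hypothetical helper, and it rejects any ID that does not match that assumed pattern rather than guessing.

    ```java
    // Sketch: derives a topology name from an ID of the assumed form
    // name-counter-timestamp. Names may themselves contain '-', so only the
    // last two dash-separated segments are stripped.
    final class TopologyIds {
        static String nameFromId(String topologyId) {
            int last = topologyId.lastIndexOf('-');
            int secondLast = last > 0 ? topologyId.lastIndexOf('-', last - 1) : -1;
            if (secondLast <= 0
                    || !isDigits(topologyId.substring(secondLast + 1, last))
                    || !isDigits(topologyId.substring(last + 1))) {
                throw new IllegalArgumentException("Unexpected topology ID format: " + topologyId);
            }
            return topologyId.substring(0, secondLast);
        }

        private static boolean isDigits(String s) {
            if (s.isEmpty()) {
                return false;
            }
            for (int i = 0; i < s.length(); i++) {
                if (!Character.isDigit(s.charAt(i))) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            // prints "LearningStormClusterTopology"
            System.out.println(nameFromId("LearningStormClusterTopology-1-1393847956"));
        }
    }
    ```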

In this section, we covered several examples that show how to collect Storm cluster metrics and details using the Nimbus thrift client. The Nimbus thrift API is rich enough that all the metrics available on the Storm UI can also be collected through it.
