Thrift is a binary protocol used for cross-language communication. The Nimbus node in Storm is a thrift service, and the structure of a topology is also defined as a thrift structure. Because thrift is used so widely in Storm, we can write code in any language to connect to the Nimbus node.
This section covers how we can collect the cluster details (similar to the details shown on the Storm UI page) using the Nimbus thrift client. The extraction or collection of information through the Nimbus thrift client allows us to plot or show the cluster details in a more visual manner.
The Nimbus thrift API is very rich and it exposes all the necessary information required to monitor the Storm cluster.
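We are going to look at how we can use the Nimbus thrift Java API to collect the Nimbus configuration, the supervisor statistics, the topology statistics, and the spout and bolt statistics of a given topology, and to kill a given topology.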
The following are the steps to fetch the cluster details using the Nimbus thrift client:
Create a Maven project using com.learningstorm as Group Id and monitoring as Artifact Id.
[Screenshot: Maven project with Group Id com.learningstorm and Artifact Id monitoring]
Add the following dependencies to the pom.xml file:

    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <version>0.7.0</version>
    </dependency>
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.9.0.1</version>
    </dependency>

Add the following repository to the pom.xml file:

    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
Create a class, ThriftClient, in the com.learningstorm.monitoring package. The ThriftClient class contains logic to make a connection to the Nimbus thrift server and return the Nimbus client. The following is the code for the ThriftClient class:

    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;

    import backtype.storm.generated.Nimbus.Client;

    public class ThriftClient {
      // IP of the node running the Nimbus thrift server
      // (in this example, the same machine as the Storm UI)
      private static final String STORM_UI_NODE = "127.0.0.1";

      public Client getClient() {
        // Set the IP and port of the thrift server.
        // By default, the thrift server starts on port 6627.
        TSocket socket = new TSocket(STORM_UI_NODE, 6627);
        TFramedTransport tFramedTransport = new TFramedTransport(socket);
        TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tFramedTransport);
        Client client = new Client(tBinaryProtocol);
        try {
          // Open the connection to the thrift server.
          tFramedTransport.open();
        } catch (Exception exception) {
          // Keep the original exception as the cause so the failure can be diagnosed.
          throw new RuntimeException("Error occurred while making connection with nimbus thrift server", exception);
        }
        // Return the Nimbus thrift client.
        return client;
      }
    }
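When the connection fails, it can help to know whether the Nimbus thrift port is reachable at all before looking at thrift itself. The following is a minimal sketch of such a check using only the JDK; the NimbusPortCheck class and its isReachable method are hypothetical helpers that are not part of Storm, and the host and port simply mirror the values used in ThriftClient.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Storm): a plain TCP reachability probe.
final class NimbusPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // 127.0.0.1 and 6627 mirror the host and default port used in ThriftClient.
        boolean reachable = isReachable("127.0.0.1", 6627, 2000);
        System.out.println("Nimbus thrift port reachable: " + reachable);
    }
}
```

A successful probe only shows that something is listening on the port; it does not prove that the listener is a Nimbus thrift server.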
Create a NimbusConfiguration class in the com.learningstorm.monitoring package. This class contains logic to collect the Nimbus configuration using the Nimbus client. The following is the code for the NimbusConfiguration class:

    import backtype.storm.generated.Nimbus.Client;

    public class NimbusConfiguration {

      public void printNimbusStats() {
        try {
          ThriftClient thriftClient = new ThriftClient();
          Client client = thriftClient.getClient();
          // Fetch the Nimbus configuration as a string.
          String nimbusConfiguration = client.getNimbusConf();
          System.out.println("*************************************");
          System.out.println("Nimbus Configuration : " + nimbusConfiguration);
          System.out.println("*************************************");
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while fetching the Nimbus statistics : ", exception);
        }
      }

      public static void main(String[] args) {
        new NimbusConfiguration().printNimbusStats();
      }
    }
The preceding program uses the getNimbusConf() method of the backtype.storm.generated.Nimbus.Client class to fetch the Nimbus configuration.
Create a SupervisorStatistics class in the com.learningstorm.monitoring package to collect information about all the supervisor nodes running in the Storm cluster. The following is the code for the SupervisorStatistics class:

    import java.util.Iterator;

    import backtype.storm.generated.ClusterSummary;
    import backtype.storm.generated.Nimbus.Client;
    import backtype.storm.generated.SupervisorSummary;

    public class SupervisorStatistics {

      public void printSupervisorStatistics() {
        try {
          ThriftClient thriftClient = new ThriftClient();
          Client client = thriftClient.getClient();
          // Get the cluster information.
          ClusterSummary clusterSummary = client.getClusterInfo();
          // Get the SupervisorSummary iterator.
          Iterator<SupervisorSummary> supervisorsIterator = clusterSummary.get_supervisors_iterator();
          while (supervisorsIterator.hasNext()) {
            // Print the information of the supervisor node.
            SupervisorSummary supervisorSummary = supervisorsIterator.next();
            System.out.println("*************************************");
            System.out.println("Supervisor Host IP : " + supervisorSummary.get_host());
            System.out.println("Number of used workers : " + supervisorSummary.get_num_used_workers());
            System.out.println("Number of workers : " + supervisorSummary.get_num_workers());
            System.out.println("Supervisor ID : " + supervisorSummary.get_supervisor_id());
            System.out.println("Supervisor uptime in seconds : " + supervisorSummary.get_uptime_secs());
            System.out.println("*************************************");
          }
        } catch (Exception e) {
          throw new RuntimeException("Error occurred while getting cluster info : ", e);
        }
      }
    }
The SupervisorStatistics class uses the getClusterInfo() method of the backtype.storm.generated.Nimbus.Client class to get an instance of the backtype.storm.generated.ClusterSummary class, and then calls the get_supervisors_iterator() method of the backtype.storm.generated.ClusterSummary class to get an iterator over the backtype.storm.generated.SupervisorSummary class.
[Screenshot: output of the SupervisorStatistics class]
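The used and total worker counts reported for each supervisor can be combined into a single cluster-wide figure, which is often easier to plot than the raw numbers. The following is a minimal sketch of that calculation; SlotUtilization and its nested Slots class are hypothetical stand-ins that hold the two values read from get_num_used_workers() and get_num_workers(), so the example runs without a Storm cluster.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Storm): aggregates worker slot usage.
final class SlotUtilization {

    // Stand-in for the two worker counts read from one SupervisorSummary.
    static final class Slots {
        final int used;
        final int total;

        Slots(int used, int total) {
            this.used = used;
            this.total = total;
        }
    }

    // Percentage of worker slots in use across all supervisors; 0.0 if there are no slots.
    static double usedPercent(List<Slots> supervisors) {
        int used = 0;
        int total = 0;
        for (Slots slots : supervisors) {
            used += slots.used;
            total += slots.total;
        }
        return total == 0 ? 0.0 : 100.0 * used / total;
    }

    public static void main(String[] args) {
        // Made-up sample values: two supervisors with four slots each.
        List<Slots> supervisors = Arrays.asList(new Slots(2, 4), new Slots(1, 4));
        System.out.println("Used worker slots (%): " + usedPercent(supervisors));
    }
}
```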
Create a TopologyStatistics class in the com.learningstorm.monitoring package to collect information about all the topologies running in a Storm cluster, as shown in the following code:

    import java.util.Iterator;

    import backtype.storm.generated.ClusterSummary;
    import backtype.storm.generated.Nimbus.Client;
    import backtype.storm.generated.TopologySummary;

    public class TopologyStatistics {

      public void printTopologyStatistics() {
        try {
          ThriftClient thriftClient = new ThriftClient();
          // Get the thrift client.
          Client client = thriftClient.getClient();
          // Get the cluster info.
          ClusterSummary clusterSummary = client.getClusterInfo();
          // Get the iterator over the TopologySummary class.
          Iterator<TopologySummary> topologiesIterator = clusterSummary.get_topologies_iterator();
          while (topologiesIterator.hasNext()) {
            TopologySummary topologySummary = topologiesIterator.next();
            System.out.println("*************************************");
            System.out.println("ID of topology: " + topologySummary.get_id());
            System.out.println("Name of topology: " + topologySummary.get_name());
            System.out.println("Number of Executors: " + topologySummary.get_num_executors());
            System.out.println("Number of Tasks: " + topologySummary.get_num_tasks());
            System.out.println("Number of Workers: " + topologySummary.get_num_workers());
            System.out.println("Status of topology: " + topologySummary.get_status());
            System.out.println("Topology uptime in seconds: " + topologySummary.get_uptime_secs());
            System.out.println("*************************************");
          }
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while fetching the topologies information", exception);
        }
      }
    }
The TopologyStatistics class uses the get_topologies_iterator() method of the backtype.storm.generated.ClusterSummary class to get an iterator over the backtype.storm.generated.TopologySummary class. For each topology, the TopologyStatistics class prints the ID, name, status, and uptime, along with the number of executors, tasks, and worker processes assigned to it.
[Console output of the TopologyStatistics class]
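The uptime is reported in raw seconds, which becomes hard to read for long-running topologies. The following is a minimal sketch of a formatter that could be applied to the value returned by get_uptime_secs(); UptimeFormatter is a hypothetical helper that is not part of Storm, and the output layout is just one possible choice.

```java
// Hypothetical helper (not part of Storm): turns uptime seconds into a readable string.
final class UptimeFormatter {

    // Formats a non-negative number of seconds as days, hours, minutes, and seconds.
    static String format(int uptimeSecs) {
        int days = uptimeSecs / 86400;
        int hours = (uptimeSecs % 86400) / 3600;
        int minutes = (uptimeSecs % 3600) / 60;
        int seconds = uptimeSecs % 60;
        return days + "d " + hours + "h " + minutes + "m " + seconds + "s";
    }

    public static void main(String[] args) {
        // 93784 seconds is a made-up sample value.
        System.out.println("Topology uptime: " + format(93784)); // prints "Topology uptime: 1d 2h 3m 4s"
    }
}
```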
Create a SpoutStatistics class in the com.learningstorm.monitoring package to get the statistics of spouts. The SpoutStatistics class contains a printSpoutStatistics(String topologyId) method to print the details about all the spouts served by the given topology, as shown in the following code:

    import java.util.Iterator;
    import java.util.Map;

    import backtype.storm.generated.ExecutorSpecificStats;
    import backtype.storm.generated.ExecutorStats;
    import backtype.storm.generated.ExecutorSummary;
    import backtype.storm.generated.Nimbus.Client;
    import backtype.storm.generated.SpoutStats;
    import backtype.storm.generated.TopologyInfo;

    public class SpoutStatistics {
      private static final String DEFAULT = "default";
      private static final String ALL_TIME = ":all-time";

      public void printSpoutStatistics(String topologyId) {
        try {
          ThriftClient thriftClient = new ThriftClient();
          // Get the Nimbus thrift client.
          Client client = thriftClient.getClient();
          // Get the information of the given topology.
          TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
          Iterator<ExecutorSummary> executorSummaryIterator = topologyInfo.get_executors_iterator();
          while (executorSummaryIterator.hasNext()) {
            ExecutorSummary executorSummary = executorSummaryIterator.next();
            ExecutorStats executorStats = executorSummary.get_stats();
            if (executorStats != null) {
              ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
              String componentId = executorSummary.get_component_id();
              // Only spout executors carry SpoutStats; skip bolt executors.
              if (executorSpecificStats.is_set_spout()) {
                SpoutStats spoutStats = executorSpecificStats.get_spout();
                System.out.println("*************************************");
                System.out.println("Component ID of Spout:- " + componentId);
                System.out.println("Transferred:- " + getAllTimeStat(executorStats.get_transferred(), ALL_TIME));
                System.out.println("Total tuples emitted:- " + getAllTimeStat(executorStats.get_emitted(), ALL_TIME));
                System.out.println("Acked: " + getAllTimeStat(spoutStats.get_acked(), ALL_TIME));
                System.out.println("Failed: " + getAllTimeStat(spoutStats.get_failed(), ALL_TIME));
                System.out.println("*************************************");
              }
            }
          }
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while fetching the spout information : " + exception, exception);
        }
      }

      // Returns the value of the default stream for the given time window, or 0 if absent.
      private static Long getAllTimeStat(Map<String, Map<String, Long>> map, String statName) {
        if (map != null) {
          Map<String, Long> tempMap = map.get(statName);
          if (tempMap != null && tempMap.get(DEFAULT) != null) {
            return tempMap.get(DEFAULT);
          }
        }
        return 0L;
      }

      public static void main(String[] args) {
        new SpoutStatistics().printSpoutStatistics("LearningStormClusterTopology-1-1393847956");
      }
    }
The preceding class uses the getTopologyInfo(topologyId) method of the backtype.storm.generated.Nimbus.Client class to fetch the spout information of the given topology. The output of the TopologyStatistics class prints the ID of each topology; we can pass this ID as an argument to the getTopologyInfo(topologyId) method to get information about spouts running inside a topology. The SpoutStatistics class prints the following statistics of the spout: the component ID, the number of tuples transferred and emitted, and the number of tuples acked and failed.
[Console output of the SpoutStatistics class]
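The statistics maps returned by the thrift API are nested: the outer key is a time window and the inner key is a stream name. The following is a minimal sketch of a null-safe lookup over such a map, built by hand so that it runs without a cluster. StatLookup is a hypothetical helper, the sample counts are made up, and the "600" window key is an assumption about how Storm labels its 10-minute window; only ":all-time" and "default" come from the code in this section.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Storm): null-safe read of window -> stream -> count maps.
final class StatLookup {

    // Returns the count for the given window and stream, or 0 if any level is missing.
    static long get(Map<String, Map<String, Long>> stats, String window, String stream) {
        if (stats == null) {
            return 0L;
        }
        Map<String, Long> perStream = stats.get(window);
        if (perStream == null) {
            return 0L;
        }
        Long value = perStream.get(stream);
        return value == null ? 0L : value;
    }

    public static void main(String[] args) {
        // Hand-built sample shaped like the map returned by get_emitted().
        Map<String, Map<String, Long>> emitted = new HashMap<>();
        Map<String, Long> allTime = new HashMap<>();
        allTime.put("default", 1200L);
        emitted.put(":all-time", allTime);
        Map<String, Long> tenMinutes = new HashMap<>();
        tenMinutes.put("default", 40L);
        emitted.put("600", tenMinutes); // assumed key for the 10-minute window

        System.out.println("All time: " + get(emitted, ":all-time", "default"));
        System.out.println("Last 10 minutes: " + get(emitted, "600", "default"));
        System.out.println("Missing window: " + get(emitted, "86400", "default"));
    }
}
```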
Create a BoltStatistics class in the com.learningstorm.monitoring package to get the statistics of bolts. The BoltStatistics class contains a printBoltStatistics(String topologyId) method to print information about all the bolts served by the given topology, as shown in the following code:

    import java.util.Iterator;
    import java.util.Map;

    import backtype.storm.generated.BoltStats;
    import backtype.storm.generated.ExecutorSpecificStats;
    import backtype.storm.generated.ExecutorStats;
    import backtype.storm.generated.ExecutorSummary;
    import backtype.storm.generated.GlobalStreamId;
    import backtype.storm.generated.Nimbus.Client;
    import backtype.storm.generated.TopologyInfo;

    public class BoltStatistics {
      private static final String DEFAULT = "default";
      private static final String ALL_TIME = ":all-time";

      public void printBoltStatistics(String topologyId) {
        try {
          ThriftClient thriftClient = new ThriftClient();
          // Get the Nimbus thrift server client.
          Client client = thriftClient.getClient();
          // Get the information of the given topology.
          TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
          Iterator<ExecutorSummary> executorSummaryIterator = topologyInfo.get_executors_iterator();
          while (executorSummaryIterator.hasNext()) {
            // Get the executor.
            ExecutorSummary executorSummary = executorSummaryIterator.next();
            ExecutorStats executorStats = executorSummary.get_stats();
            if (executorStats != null) {
              ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
              String componentId = executorSummary.get_component_id();
              // Only bolt executors carry BoltStats; skip spout executors.
              if (executorSpecificStats.is_set_bolt()) {
                BoltStats boltStats = executorSpecificStats.get_bolt();
                System.out.println("*************************************");
                System.out.println("Component ID of Bolt " + componentId);
                System.out.println("Transferred: " + getAllTimeStat(executorStats.get_transferred(), ALL_TIME));
                System.out.println("Emitted: " + getAllTimeStat(executorStats.get_emitted(), ALL_TIME));
                System.out.println("Acked: " + getBoltStats(boltStats.get_acked(), ALL_TIME));
                System.out.println("Failed: " + getBoltStats(boltStats.get_failed(), ALL_TIME));
                System.out.println("Executed: " + getBoltStats(boltStats.get_executed(), ALL_TIME));
                System.out.println("*************************************");
              }
            }
          }
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while fetching the bolt information : " + exception, exception);
        }
      }

      // Returns the value of the default stream for the given time window, or 0 if absent.
      private static Long getAllTimeStat(Map<String, Map<String, Long>> map, String statName) {
        if (map != null) {
          Map<String, Long> tempMap = map.get(statName);
          if (tempMap != null && tempMap.get(DEFAULT) != null) {
            return tempMap.get(DEFAULT);
          }
        }
        return 0L;
      }

      // Returns the value of the first input stream for the given time window, or 0 if absent.
      public static Long getBoltStats(Map<String, Map<GlobalStreamId, Long>> map, String statName) {
        if (map != null) {
          Map<GlobalStreamId, Long> tempMap = map.get(statName);
          if (tempMap != null && !tempMap.isEmpty()) {
            Long statValue = tempMap.get(tempMap.keySet().iterator().next());
            if (statValue != null) {
              return statValue;
            }
          }
        }
        return 0L;
      }

      public static void main(String[] args) {
        new BoltStatistics().printBoltStatistics("LearningStormClusterTopology-1-1393847956");
      }
    }
The preceding class uses the getTopologyInfo(topologyId) method of the backtype.storm.generated.Nimbus.Client class to fetch information about the given topology. The output of the TopologyStatistics class prints the ID of each topology; we can pass this ID as an argument to the getTopologyInfo(topologyId) method to get information about bolts running inside a topology. The BoltStatistics class prints the following statistics about a bolt: the component ID, the number of tuples transferred and emitted, and the number of tuples acked, failed, and executed.
[Console output of the BoltStatistics class]
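The getBoltStats method reads the count of only the first input stream it finds, so a bolt that subscribes to several streams would be under-reported. The following is a minimal sketch of an alternative that sums the counts of every stream in a time window. BoltStatTotals is a hypothetical helper, and the generic key type K stands in for backtype.storm.generated.GlobalStreamId so that the example runs without the Storm classes; the stream names and counts are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Storm): totals a bolt statistic over all input streams.
final class BoltStatTotals {

    // Sums the counts of every stream in the given window; returns 0 if the window is missing.
    static <K> long sumAcrossStreams(Map<String, Map<K, Long>> stats, String window) {
        if (stats == null) {
            return 0L;
        }
        Map<K, Long> perStream = stats.get(window);
        if (perStream == null) {
            return 0L;
        }
        long total = 0L;
        for (Long count : perStream.values()) {
            if (count != null) {
                total += count;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // String keys stand in for GlobalStreamId values in this sample.
        Map<String, Long> allTime = new HashMap<>();
        allTime.put("spoutA:default", 100L);
        allTime.put("spoutB:default", 50L);
        Map<String, Map<String, Long>> acked = new HashMap<>();
        acked.put(":all-time", allTime);

        System.out.println("Acked across all streams: " + sumAcrossStreams(acked, ":all-time"));
    }
}
```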
Create a killTopology class in the com.learningstorm.monitoring package to kill a topology. The following is the code for the killTopology class:

    import backtype.storm.generated.Nimbus.Client;

    public class killTopology {

      // Note: Nimbus expects the topology name here, not the topology ID.
      public void kill(String topologyName) {
        try {
          ThriftClient thriftClient = new ThriftClient();
          // Get the Nimbus thrift client.
          Client client = thriftClient.getClient();
          // Kill the given topology.
          client.killTopology(topologyName);
        } catch (Exception exception) {
          throw new RuntimeException("Error occurred while killing the topology : " + exception, exception);
        }
      }

      public static void main(String[] args) {
        // Replace the placeholder with the name of a running topology.
        new killTopology().kill("topologyName");
      }
    }
The preceding class uses the killTopology(topologyName) method of the backtype.storm.generated.Nimbus.Client class to kill the topology.
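Because killTopology works with a topology name while getTopologyInfo works with a topology ID, it is easy to pass the wrong one. The name printed by TopologyStatistics through get_name() is the reliable source. As a fallback, the following minimal sketch derives the name from an ID, assuming that IDs follow the name-counter-timestamp layout seen in the example ID LearningStormClusterTopology-1-1393847956. TopologyIds is a hypothetical helper that is not part of Storm.

```java
// Hypothetical helper (not part of Storm): derives a topology name from a topology ID,
// assuming the ID has the form <name>-<counter>-<timestamp>.
final class TopologyIds {

    // Strips the last two dash-separated parts; returns the input unchanged if they are missing.
    static String nameFromId(String topologyId) {
        int lastDash = topologyId.lastIndexOf('-');
        if (lastDash <= 0) {
            return topologyId;
        }
        int secondLastDash = topologyId.lastIndexOf('-', lastDash - 1);
        if (secondLastDash <= 0) {
            return topologyId;
        }
        return topologyId.substring(0, secondLastDash);
    }

    public static void main(String[] args) {
        // The ID below is the one used in the statistics examples of this section.
        System.out.println(nameFromId("LearningStormClusterTopology-1-1393847956"));
    }
}
```

Searching from the end of the string keeps names that themselves contain dashes intact.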
In this section, we covered several examples that enable you to collect Storm cluster metrics or details using the Nimbus thrift client. The Nimbus thrift API is very rich, and all the metrics that are available on the Storm UI can be collected through it.