How to do it...

Once you have installed the SNAS BMP message API and started ZooKeeper server and Kafka server as shown previously, you are ready to run a simple listener for the BMP messages.

First, start the Python client of the SNAS message API:

$ python 13_3_log_consumer.py
Connecting to kafka... takes a minute to load offsets and topics, please wait
Now consuming/waiting for messages...

13_3_snas_log_consumer.py is adopted from openbmp-python-api-message/examples/log_consumer.py.

Now, if you run the following from another Terminal and send an empty message using the Enter key in your keyboard:

$ kafka_2.11-0.11.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic openbmp.parsed.router
>
>

You will receive the following message:

$ python 13_3_snas_log_consumer.py --conf="config.yaml"
Connecting to kafka... takes a minute to load offsets and topics, please wait
Now consuming/waiting for messages...
    
Received Message (2017-07-21 12:17:53.536705) : ROUTER(V: 0.0)
[]

If you run the following command, you will see a list of topics created for BMP by the 13_3_snas_log_consumer.py:

$ kafka_2.11-0.11.0.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
openbmp.parsed.bmp_stat
openbmp.parsed.collector
openbmp.parsed.l3vpn
openbmp.parsed.ls_link
openbmp.parsed.ls_node
openbmp.parsed.ls_prefix
openbmp.parsed.peer
openbmp.parsed.router
openbmp.parsed.unicast_prefix

Listing 13.3 gives the simple BMP log consumer, adopted from the SNAS examples. This listing omits a few lines of code for brevity. Check the full code at 13_3_snas_log_consumer.py:

#!/usr/bin/env python 
# Python Network Programming Cookbook, Second Edition
-- Chapter - 13 # This program is optimized for Python 2.7.12. # SNAS Message API Requires Python 2.7 to Run. # This program may run on any other version with/without
modifications. # Adopted from openbmp-python-api-message/examples/
log_consumer.py import argparse import yaml import datetime import time import kafka from openbmp.api.parsed.message import Message from openbmp.api.parsed.message import BmpStat from openbmp.api.parsed.message import Collector from openbmp.api.parsed.message import LsLink from openbmp.api.parsed.message import LsNode from openbmp.api.parsed.message import LsPrefix from openbmp.api.parsed.message import Peer from openbmp.api.parsed.message import Router from openbmp.api.parsed.message import UnicastPrefix from openbmp.api.parsed.message import L3VpnPrefix def process_message(msg): m = Message(msg.value) # Gets body of kafka message. t = msg.topic # Gets topic of kafka message. m_tag = t.split('.')[2].upper() t_stamp = str(datetime.datetime.now()) # For various cases of BMP message topics. Omitted
logs for the sake of space. if t == "openbmp.parsed.router": router = Router(m) print (' ' + 'Received Message (' + t_stamp + ')
: ' + m_tag + '(V: ' + str(m.version) + ')') print (router.to_json_pretty()) elif t == "openbmp.parsed.peer": peer = Peer(m) elif t == "openbmp.parsed.collector": collector = Collector(m) elif t == "openbmp.parsed.bmp_stat": bmp_stat = BmpStat(m) elif t == "openbmp.parsed.unicast_prefix": unicast_prefix = UnicastPrefix(m) elif t == "openbmp.parsed.l3vpn": l3vpn_prefix = L3VpnPrefix(m) elif t == "openbmp.parsed.ls_node": ls_node = LsNode(m) elif t == "openbmp.parsed.ls_link": ls_link = LsLink(m) elif t == "openbmp.parsed.ls_prefix": ls_prefix = LsPrefix(m) def main(conf): # Enable to topics/feeds topics = [ 'openbmp.parsed.router', 'openbmp.parsed.peer',
'openbmp.parsed.collector', 'openbmp.parsed.bmp_stat', 'openbmp.parsed.
unicast_prefix', 'openbmp.parsed.ls_node', 'openbmp.parsed.ls_link',
'openbmp.parsed.ls_prefix',
'openbmp.parsed.l3vpn' ] # Read config file with open(conf, 'r') as f: config_content = yaml.load(f) bootstrap_server = config_content['bootstrap_servers'] try: # connect and bind to topics print ("Connecting to kafka... takes a minute to
load offsets and topics, please wait") consumer = kafka.KafkaConsumer( *topics, bootstrap_servers=bootstrap_server, client_id="dev-testing" + str(time.time()), group_id="dev-testing" + str(time.time()), enable_auto_commit=True, auto_commit_interval_ms=1000, auto_offset_reset="largest" ) for m in consumer: process_message(m) except kafka.common.KafkaUnavailableError as err: print ("Kafka Error: %s" % str(err)) except KeyboardInterrupt: print ("User stop requested") if __name__ == '__main__': parser = argparse.ArgumentParser(description='SNAS
Log Consumer') parser.add_argument('--conf', action="store",
dest="conf", default="config.yaml") given_args = parser.parse_args() conf = given_args.conf main (conf)

A configuration file can be passed by the --conf parameter. Default is config.yaml, which just points to the default Kafka server location:

bootstrap_servers: localhost:9092

The following screenshot shows the output of the Python program of the SNAS message API and the Kafka broker, along with the list of topics created by the Python program:

SNAS Message API and Kafka Broker
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.189.186.109