Nothing beats live production data. With this recipe, we will write some code to record the live data. Then we will play it back with delays added to simulate playing back the live data stream.
Create a MySQL database called recipe64, as well as a user with permission to access it.

With these steps, we will see how to record and play back data at a real-time pace.
Write a script called recipe64_livedata.py that simulates live data being sent every one to ten seconds.

import random
import sys
import time
from network import *
from springpython.remoting.pyro import *

print "Sending events to live network app. Ctrl+C to exit..."
proxy = PyroProxyFactory()
proxy.service_url = "PYROLOC://127.0.0.1:7766/network_advised"

while True:
    hostname = random.choice(["pyhost1","pyhost2","pyhost3"])
    condition = random.choice(["serverRestart", "lineStatus"])
    severity = random.choice([1,5])
    evt = Event(hostname, condition, severity)
    stored_event, is_active, updated_services, updated_equipment = proxy.process(evt)
    print "Stored event: %s" % stored_event
    print "Active? %s" % is_active
    print "Services updated: %s" % updated_services
    print "Equipment updated; %s" % updated_equipment
    print "================"
    # Pause one to ten seconds (the upper bound of range() is exclusive)
    time.sleep(random.choice(range(1,11)))
Write a server script called recipe64_server.py that initializes the database using the SQL script recipe62_network.mysql from the Targeting the test server recipe.

from springpython.database.factory import *
from springpython.database.core import *
from springpython.remoting.pyro import *
from springpython.aop import *
from network import *
from datetime import datetime
import os
import os.path
import pickle
import logging

logger = logging.getLogger("springpython.remoting")
loggingLevel = logging.DEBUG
logger.setLevel(loggingLevel)
ch = logging.StreamHandler()
ch.setLevel(loggingLevel)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)

# Initialize the database
factory = MySQLConnectionFactory("user", "password", "localhost", "recipe64")
dt = DatabaseTemplate(factory)
sql = open("recipe62_network.mysql").read().split(";")
for statement in sql:
    dt.execute(statement + ";")

# Create an instance of the network management app
target_service = EventCorrelator(factory)

# Expose the original network app as a Pyro service
unadvised_service = PyroServiceExporter()
unadvised_service.service_name = "network"
unadvised_service.service = target_service
unadvised_service.after_properties_set()

class Recorder(MethodInterceptor):
    """
    An interceptor that catches each event, writes it to disk,
    then proceeds to the network management app.
    """
    def __init__(self):
        self.filename = "recipe64_data.txt"
        self.special_char = "&&&"
        if os.path.exists(self.filename):
            os.remove(self.filename)

    def invoke(self, invocation):
        # Write data to disk, one pickled (event, time stamp) tuple per line.
        # Newlines inside the pickle are swapped for "&&&" so each record
        # stays on a single line.
        with open(self.filename, "a") as f:
            evt = invocation.args[0]
            now = datetime.now()
            output = (evt, now)
            print "Recording %s" % evt
            f.write(pickle.dumps(output).replace("\n", "&&&") + "\n")
        # Now call the advised service
        return invocation.proceed()

# Wrap the network app with an interceptor
advisor = ProxyFactoryObject()
advisor.target = target_service
advisor.interceptors = [Recorder()]

# Expose the advised network app as a Pyro service
advised_service = PyroServiceExporter()
advised_service.service_name = "network_advised"
advised_service.service = advisor
advised_service.after_properties_set()
Start the server by running python recipe64_server.py. Notice that both a network service and a network_advised service are registered with Pyro.

Run python recipe64_livedata.py until it generates a few events, and then hit Ctrl+C to break out of it.

Inspect the recipe64_data.txt data file, noting how each line represents a separate event and time stamp. While it's hard to decipher the data stored in a pickled format, it's possible to spot bits and pieces.

Write a playback script called recipe64_playback.py that de-pickles each line of the data file.

from springpython.remoting.pyro import *
from datetime import datetime
import pickle
import time

with open("recipe64_data.txt") as f:
    lines = f.readlines()
# Restore the newlines that the recorder replaced with "&&&"
events = [pickle.loads(line.replace("&&&", "\n")) for line in lines]

def calc_offset(evt, time_it_happened, previous_time):
    if previous_time is None:
        return time_it_happened - time_it_happened
    else:
        return time_it_happened - previous_time

print "Sending events to live network app. Ctrl+C to exit..."
proxy = PyroProxyFactory()
proxy.service_url = "PYROLOC://127.0.0.1:7766/network"

previous_time = None
for (e, time_it_happened) in events:
    diff = calc_offset(e, time_it_happened, previous_time)
    print "Original: %s Now: %s" % (time_it_happened, datetime.now())
    stored_event, is_active, updated_services, updated_equipment = proxy.process(e)
    print "Stored event: %s" % stored_event
    print "Active? %s" % is_active
    print "Services updated: %s" % updated_services
    print "Equipment updated; %s" % updated_equipment
    print "Next event in %s seconds" % diff.seconds
    print "================"
    time.sleep(diff.seconds)
    previous_time = time_it_happened
Run python recipe64_playback.py and observe how it has the same delays as the original live data simulator.

Normally, we would be recording data coming in from the live network. In this situation, we need a simulator that generates random data. The simulator we coded in this recipe is very similar to the one shown in the Coding a data simulator recipe.
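To capture the data, we coded an interceptor that is embedded between Pyro and the network management application. Every event published to the network_advised Pyro service name seamlessly passes through this interceptor. Inside its invoke method, the incoming event is taken from the first argument of the call, datetime.now() is called in order to capture a time stamp, the event and time stamp are pickled together as a tuple and appended to the data file as a single line, and invocation.proceed() then hands the call on to the network management application.

Finally, we have a playback script that reads in the data file, one event per line. It de-pickles each line into the tuple format it was originally stored in, and builds a list of events.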
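The one-record-per-line format can be sketched in isolation. This is a minimal Python 3 illustration, not the recipe's Python 2 code: the plain tuple stands in for the recipe's Event class, the function names are hypothetical, and protocol 0 with a latin-1 round trip is an assumption chosen so the pickle can be handled as text.

```python
import pickle
from datetime import datetime

NEWLINE_TOKEN = "&&&"  # same placeholder the recipe uses


def encode_record(event, timestamp):
    """Pickle (event, timestamp) and squeeze it onto a single text line."""
    # Protocol 0 is the text-oriented pickle format; it separates opcodes
    # with newlines, which is why they must be replaced before writing.
    raw = pickle.dumps((event, timestamp), protocol=0).decode("latin-1")
    return raw.replace("\n", NEWLINE_TOKEN) + "\n"


def decode_record(line):
    """Reverse encode_record: restore the newlines and de-pickle."""
    raw = line.rstrip("\n").replace(NEWLINE_TOKEN, "\n")
    return pickle.loads(raw.encode("latin-1"))


# Hypothetical stand-in for the recipe's Event(hostname, condition, severity)
event = ("pyhost1", "serverRestart", 5)
stamp = datetime(2011, 1, 1, 12, 0, 0)

line = encode_record(event, stamp)
restored_event, restored_stamp = decode_record(line)
```

One limitation of this scheme is worth keeping in mind: an event whose own text contained the placeholder "&&&" would be altered on the way back in.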
The list of events is then scanned, one at a time. By comparing the current event's time stamp with the previous one, a difference in seconds is calculated in order to use Python's time.sleep() function to play the events back at the same rate they were recorded. The script sleeps for diff.seconds, so the pause is a whole number of seconds and any sub-second part of the gap is dropped.
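The pacing logic can be tried without Pyro or a data file. The sketch below is a variation on the recipe's loop rather than a copy of it: it waits before sending each event instead of after, and it uses timedelta.total_seconds() so that sub-second gaps are kept. The replay function and its send and sleep parameters are hypothetical names introduced here; passing in a fake sleep makes the timing easy to check.

```python
import time
from datetime import datetime, timedelta


def replay(events, send, sleep=time.sleep):
    """Send each event, pausing for the gap recorded between time stamps."""
    previous = None
    for event, happened_at in events:
        if previous is not None:
            gap = (happened_at - previous).total_seconds()
            # Guard against out-of-order time stamps
            sleep(max(gap, 0.0))
        send(event)
        previous = happened_at


start = datetime(2011, 1, 1, 12, 0, 0)
recorded = [
    ("a", start),
    ("b", start + timedelta(seconds=3)),
    ("c", start + timedelta(seconds=3, milliseconds=500)),
]

pauses = []  # collects the requested delays instead of really sleeping
sent = []    # collects the events in the order they were sent
replay(recorded, sent.append, sleep=pauses.append)
```

In real use, send would be something like the proxy's process method and sleep would be left at its default.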
The playback script uses Pyro to send the events into the network management application, but it talks to a different exposure point: the unadvised network service rather than network_advised. This avoids re-recording the same events.
The code in this recipe uses Pyro as the mechanism that lets clients and servers communicate in a publish/subscribe paradigm. This isn't the only way to build such a service. Python has XML-RPC built in as well. It just isn't as flexible as Pyro. A more thorough analysis of real traffic is needed to determine if this interface is good enough. Alternatives include pushing events through a database EVENT table, where the client inserts rows, and the server polls the table for new rows and then removes them as they are consumed.
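The EVENT table alternative can be sketched with Python's built-in sqlite3 module. This is only an illustration under stated assumptions: the recipe itself uses MySQL, and the column names and the publish and poll functions are hypothetical.

```python
import sqlite3

# In-memory database standing in for the shared production database
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE EVENT ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " hostname TEXT, event_condition TEXT, severity INTEGER)"
)


def publish(conn, hostname, condition, severity):
    """Client side: insert one row per event."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO EVENT (hostname, event_condition, severity)"
            " VALUES (?, ?, ?)",
            (hostname, condition, severity),
        )


def poll(conn):
    """Server side: fetch pending rows in order, then remove them."""
    with conn:
        rows = conn.execute(
            "SELECT id, hostname, event_condition, severity"
            " FROM EVENT ORDER BY id"
        ).fetchall()
        conn.executemany(
            "DELETE FROM EVENT WHERE id = ?", [(row[0],) for row in rows]
        )
    return [row[1:] for row in rows]


publish(conn, "pyhost1", "serverRestart", 5)
publish(conn, "pyhost2", "lineStatus", 1)
first_poll = poll(conn)   # both events, oldest first
second_poll = poll(conn)  # nothing left to consume
```

With separate client and server processes, the polling interval and the handling of rows inserted between the SELECT and the DELETE would need more care than this single-connection sketch shows.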
This recipe also makes heavy use of Spring Python for its Aspect Oriented Programming features to insert the data recording code (http://static.springsource.org/spring-python/1.1.x/reference/html/aop.html). This provides a clean way to add the extra layer of functionality we need to sniff and record network traffic without having to touch the already built network management code.
Although it runs against a simulator, this recipe is really about recording live data and controlling the speed of playback. To capture this concept in a reusable recipe requires that we simulate the live system. But the fundamental concept of inserting a tap point in front of the network management processor, as we have done, is just as valid.
The recipe was coded to ensure that stopping the recording would have minimal risk of losing captured data not yet written to disk. Analysis of production data is required to determine the most efficient way of storing data. For example, it may be less I/O intensive to write data in batches of 10, or perhaps 100 events. But the risk is that data can be lost in similar bundles.
If the volume of traffic is low enough, writing each event one-by-one, as shown in this recipe, may not be a problem at all.
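The batching trade-off can be made concrete with a small buffered writer. This is a sketch only: the BatchingWriter class, its batch_size parameter, and the file name are hypothetical and are not part of the recipe, which writes every event immediately.

```python
import os
import tempfile


class BatchingWriter(object):
    """Buffers lines in memory and appends them to a file in batches."""

    def __init__(self, path, batch_size=10):
        self.path = path
        self.batch_size = batch_size
        self.pending = []

    def write(self, line):
        self.pending.append(line)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        # Anything still in self.pending is lost if the process dies first
        if self.pending:
            with open(self.path, "a") as f:
                f.writelines(self.pending)
            self.pending = []


def count_lines(path):
    if not os.path.exists(path):
        return 0
    with open(path) as f:
        return sum(1 for _ in f)


path = os.path.join(tempfile.mkdtemp(), "batched_data.txt")
writer = BatchingWriter(path, batch_size=10)
for i in range(25):
    writer.write("event %d\n" % i)

on_disk_before_flush = count_lines(path)  # two full batches written so far
writer.flush()
on_disk_after_flush = count_lines(path)   # the remainder is now on disk
```

The five events that sit in memory before the final flush are exactly the ones that would be lost if recording stopped abruptly.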
It is not uncommon to have the actual logic of opening the file, appending the data, and then closing the file contained in a separate class. This utility could then be injected into the interceptor we built. This may become important if some more elaborate means of storing or piping the data is needed. For example, another Pyro service may exist in another location that wants a copy of the live data feed.
Injecting the data consumer into the aspect we coded would give us more flexibility. In this recipe, we don't have such requirements, but it's not hard to imagine making such adjustments as new requirements arrive.
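A rough sketch of what that injection might look like follows. It is written without Spring Python so that it runs on its own: in the recipe, Recorder would subclass MethodInterceptor, while ListSink, the consume method, the clock parameter, and FakeInvocation are all hypothetical names used only for this illustration.

```python
from datetime import datetime


class ListSink(object):
    """A trivial data consumer; a file writer or remote feed could replace it."""

    def __init__(self):
        self.records = []

    def consume(self, event, timestamp):
        self.records.append((event, timestamp))


class Recorder(object):
    """Interceptor that delegates storage to whatever sink is injected."""

    def __init__(self, sink, clock=datetime.now):
        self.sink = sink
        self.clock = clock

    def invoke(self, invocation):
        # Hand the event and a time stamp to the sink, then carry on
        self.sink.consume(invocation.args[0], self.clock())
        return invocation.proceed()


class FakeInvocation(object):
    """Stand-in for the invocation object passed to an interceptor."""

    def __init__(self, target, args):
        self.target = target
        self.args = args

    def proceed(self):
        return self.target(*self.args)


def process(event):
    # Stand-in for the network management app's process method
    return "processed %s" % event


sink = ListSink()
fixed_time = datetime(2011, 1, 1, 12, 0, 0)
recorder = Recorder(sink, clock=lambda: fixed_time)
result = recorder.invoke(FakeInvocation(process, ("evt1",)))
```

Swapping ListSink for a class that appends to a file, or one that forwards to another service, would not require any change to the interceptor itself.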