Recording and playing back live data in real time

Nothing beats live production data. With this recipe, we will write some code to record the live data. Then we will play it back with the original delays between events, simulating the live data stream.

Getting ready

  1. Make sure the MySQL production database server is up and running.
  2. Open a command-line MySQL client shell as the root user.
  3. Create a database for this recipe called recipe64 as well as a user with permission to access it.
  4. Exit the shell.

How to do it...

With these steps, we will see how to record and play back data at a real-time pace.

  1. Write a script that simulates live data being sent every one to ten seconds.
    import random
    import sys
    import time
    from network import *
    from springpython.remoting.pyro import *
    print "Sending events to live network app. Ctrl+C to exit..."
    proxy = PyroProxyFactory()
    proxy.service_url = "PYROLOC://"
    while True:
        hostname = random.choice(["pyhost1","pyhost2","pyhost3"])
        condition = random.choice(["serverRestart", "lineStatus"])
        severity = random.choice([1,5])
        evt = Event(hostname, condition, severity)
        stored_event, is_active, updated_services, \
             updated_equipment = proxy.process(evt)
        print "Stored event: %s" % stored_event
        print "Active? %s" % is_active
        print "Services updated: %s" % updated_services
        print "Equipment updated: %s" % updated_equipment
        print "================"
        # Wait one to ten seconds before sending the next event
        time.sleep(random.randint(1, 10))
  2. Write a server script that initializes the database using the SQL script recipe62_network.mysql from the Targeting the test server recipe.
    from springpython.database.factory import *
    from springpython.database.core import *
    from springpython.remoting.pyro import *
    from springpython.aop import *
    from network import *
    from datetime import datetime
    import os
    import os.path
    import pickle
    import logging
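    logger = logging.getLogger("springpython.remoting")
    loggingLevel = logging.DEBUG
    logger.setLevel(loggingLevel)
    ch = logging.StreamHandler()
    ch.setLevel(loggingLevel)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)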
    # Initialize the database
    factory = MySQLConnectionFactory("user", "password",
                                     "localhost", "recipe64")
    dt = DatabaseTemplate(factory)
    sql = open("recipe62_network.mysql").read().split(";")
    for statement in sql:
        # Skip the empty fragment left after the final semicolon
        if statement.strip():
            dt.execute(statement + ";")
  3. Add some code that creates an instance of the network management application and advertises it using Pyro and Spring Python.
    # Create an instance of the network management app
    target_service = EventCorrelator(factory)
    # Expose the original network app as a Pyro service
    unadvised_service = PyroServiceExporter()
    unadvised_service.service_name = "network"
    unadvised_service.service = target_service
    unadvised_service.after_properties_set()
  4. Add an interceptor that writes each incoming event, along with a time stamp, to disk before passing it on.
    class Recorder(MethodInterceptor):
        """
        An interceptor that catches each event,
        writes it to disk, then proceeds to the
        network management app.
        """
        def __init__(self):
            self.filename = "recipe64_data.txt"
            self.special_char = "&&&"
            # Start each run with a fresh data file
            if os.path.exists(self.filename):
                os.remove(self.filename)
        def invoke(self, invocation):
            # Write data to disk
            with open(self.filename, "a") as f:
                evt = invocation.args[0]
                now = datetime.now()
                output = (evt, now)
                print "Recording %s" % evt
                # Escape newlines so each pickled event fits on one line
                f.write(pickle.dumps(output).replace(
                        "\n", self.special_char) + "\n")
            # Now call the advised service
            return invocation.proceed()
  5. Add some code that wraps the network management application with the interceptor and advertises it using Pyro.
    # Wrap the network app with an interceptor
    advisor = ProxyFactoryObject()
    advisor.target = target_service
    advisor.interceptors = [Recorder()]
    # Expose the advised network app as a Pyro service
    advised_service = PyroServiceExporter()
    advised_service.service_name = "network_advised"
    advised_service.service = advisor
    advised_service.after_properties_set()
  6. Start up the server app by running the server script with python. Notice in the following screenshot that there are both a network service and a network_advised service registered with Pyro.
  7. Run the live data simulator with python until it generates a few events, and then press Ctrl+C to break out of it.
  8. Look at the server's output, and notice that it recorded several events.
  9. Inspect the recipe64_data.txt data file, noting how each line represents a separate event and time stamp. While it's hard to decipher the data stored in a pickled format, it's possible to spot bits and pieces.
  10. Create a playback script that de-pickles each line of the data file.
    from springpython.remoting.pyro import *
    from datetime import datetime
    import pickle
    import time
    with open("recipe64_data.txt") as f:
        lines = f.readlines()
    # Turn the escape sequence back into newlines before unpickling
    events = [pickle.loads(line.replace("&&&", "\n"))
              for line in lines]
  11. Add a function that finds the time interval between the current event and the previous one.
    def calc_offset(evt, time_it_happened, previous_time):
        if previous_time is None:
            # First event: no delay (a zero timedelta)
            return time_it_happened - time_it_happened
        else:
            return time_it_happened - previous_time
  12. Define a client proxy to connect to the unadvised interface of our network management application.
    print "Sending events to live network app. Ctrl+C to exit..."
    proxy = PyroProxyFactory()
    proxy.service_url = "PYROLOC://"
  13. Add code that iterates over each event, calculates the difference from the previous event, waits that many seconds, and then sends the event.
    previous_time = None
    for (e, time_it_happened) in events:
        # Wait as long as the original gap before sending this event
        diff = calc_offset(e, time_it_happened, previous_time)
        time.sleep(diff.seconds)
        print "Original: %s Now: %s" % (time_it_happened,
                                        datetime.now())
        stored_event, is_active, updated_services, \
             updated_equipment = proxy.process(e)
        print "Stored event: %s" % stored_event
        print "Active? %s" % is_active
        print "Services updated: %s" % updated_services
        print "Equipment updated: %s" % updated_equipment
        print "Waited %s seconds" % diff.seconds
        print "================"
        previous_time = time_it_happened
  14. Run the playback script with python and observe how it reproduces the delays of the original live data simulator.

How it works...

Normally, we would be recording data coming in from the live network. In this situation, we need a simulator that generates random data. The simulator we coded in this recipe is very similar to the one shown in the Coding a data simulator recipe.
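Because the simulator is what stands in for the live feed, it can help to separate the random choices from the network call. The following is a minimal stdlib-only sketch, assuming events are plain tuples rather than the recipe's Event class; the names `simulate_events` and `rng` are hypothetical. It yields each event together with the one-to-ten-second delay the simulator would wait before sending the next one.

```python
import random


def simulate_events(count, rng=None):
    """Yield (hostname, condition, severity, delay) tuples.

    A stand-in for the recipe's Event objects; delay is the number of
    seconds the simulator would sleep before sending the next event.
    """
    if rng is None:
        rng = random.Random()
    for _ in range(count):
        hostname = rng.choice(["pyhost1", "pyhost2", "pyhost3"])
        condition = rng.choice(["serverRestart", "lineStatus"])
        severity = rng.choice([1, 5])
        delay = rng.randint(1, 10)  # inclusive on both ends
        yield hostname, condition, severity, delay


if __name__ == "__main__":
    # A seeded generator makes a simulated "live" run repeatable.
    for event in simulate_events(3, random.Random(42)):
        print(event)
```

Passing a seeded `random.Random` instance makes a simulated run repeatable, which is handy when comparing a recording against its playback.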

To capture the data, we coded an interceptor that is embedded between Pyro and the network management application. Every event published to the network_advised Pyro service name seamlessly passes through this interceptor:

  • Each event that comes in is appended to the data file that was initialized when the interceptor was first created
  • The event is stored along with a copy of datetime.now() in order to capture a time stamp
  • The event and time stamp are combined into a tuple and pickled, making it easy to write to disk and later read back; because pickled data can contain newlines, each newline is replaced with &&& so that every event occupies exactly one line
  • After writing it to disk, the interceptor calls the target service and passes the results back to the original caller
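The interception itself does not depend on Spring Python. The sketch below swaps Spring Python's ProxyFactoryObject and MethodInterceptor for a plain `__getattr__`-based proxy to show the same sequence of steps: time-stamp the first argument, pickle the pair, escape newlines as `&&&`, append one line to the file, then forward the call and return the target's result. The classes `RecordingProxy` and `FakeCorrelator` are hypothetical, and it assumes `&&&` never occurs inside the pickled data.

```python
import os
import pickle
import tempfile
from datetime import datetime


class RecordingProxy(object):
    """Wrap a target object and record the first argument of every call."""

    def __init__(self, target, filename, special_char=b"&&&"):
        self._target = target
        self._filename = filename
        self._special_char = special_char
        # Start each recording session with an empty data file.
        if os.path.exists(filename):
            os.remove(filename)

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself,
        # i.e. the target's methods.
        method = getattr(self._target, name)

        def intercepted(*args, **kwargs):
            # Pair the event with the moment it arrived.
            output = (args[0], datetime.now())
            # Protocol 0 separates opcodes with newlines, so escape them
            # to keep exactly one record per line.
            line = pickle.dumps(output, protocol=0).replace(b"\n", self._special_char)
            with open(self._filename, "ab") as f:
                f.write(line + b"\n")
            # Now call the real target and hand back its result.
            return method(*args, **kwargs)

        return intercepted


class FakeCorrelator(object):
    """Hypothetical stand-in for the network management app."""

    def process(self, evt):
        return "stored %s" % (evt,)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "recording.txt")
    proxy = RecordingProxy(FakeCorrelator(), path)
    print(proxy.process(("pyhost1", "lineStatus", 5)))
```

The caller sees only the target's return value; the recording is a side effect, which is what lets the tap sit in front of the application without changing it.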

Finally, we have a playback script that reads in the data file, one event per line. It de-pickles each line into the tuple format it was originally stored in, and builds a list of events.
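Reading the file back is the inverse of recording: strip the line terminator, turn each `&&&` back into a newline, and unpickle. This is a stdlib-only sketch, assuming the file was written with pickle protocol 0 and `&&&` as the escape marker; the function name `load_events` is hypothetical.

```python
import pickle


def load_events(filename, special_char=b"&&&"):
    """Return the list of (event, timestamp) tuples stored one per line."""
    events = []
    with open(filename, "rb") as f:
        for line in f:
            line = line.rstrip(b"\n")
            if not line:
                continue  # tolerate a blank trailing line
            # Undo the newline escaping before unpickling.
            events.append(pickle.loads(line.replace(special_char, b"\n")))
    return events
```

Opening the file in binary mode keeps the bytes exactly as the recorder wrote them, so no platform newline translation can interfere with the escaping.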

The list of events is then scanned, one at a time. By comparing the current event's time stamp with the previous one, a difference in seconds is calculated so that Python's time.sleep() function can play the events back at the same rate they were recorded.
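The pacing logic can be checked without a network or a real clock. This sketch assumes events are (event, datetime) tuples as in the recipe and injects the send and sleep functions (the function `play_back` and its parameters are hypothetical) so a test can observe the delays. It waits before sending each event, so the first goes out immediately and each later one follows after the same gap that separated it from its predecessor. It uses `timedelta.total_seconds()`, since `.seconds` drops any whole days and the fractional part of the gap.

```python
import time
from datetime import datetime


def play_back(events, send, sleep=time.sleep):
    """Send each recorded event, reproducing the original gaps between them."""
    previous_time = None
    for evt, time_it_happened in events:
        if previous_time is not None:
            # Wait as long as the original gap before sending this event.
            sleep((time_it_happened - previous_time).total_seconds())
        send(evt)
        previous_time = time_it_happened


if __name__ == "__main__":
    recorded = [("evt1", datetime(2010, 1, 1, 12, 0, 0)),
                ("evt2", datetime(2010, 1, 1, 12, 0, 3)),
                ("evt3", datetime(2010, 1, 1, 12, 0, 10))]
    # Print the delays instead of sleeping, to preview a playback.
    play_back(recorded, send=print, sleep=lambda s: print("waiting %.1f s" % s))
```

In the recipe, `send` would be `proxy.process` and `sleep` would stay as `time.sleep`.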

The playback script also uses Pyro to send the events into the network management application, but it talks to the unadvised network service rather than network_advised. This way the replayed events bypass the Recorder interceptor and are not recorded a second time.

There's more...

The code in this recipe uses Pyro as the mechanism that lets clients and servers communicate in a publish/subscribe paradigm. This isn't the only way to build such a service. Python also has XML-RPC built in, although it isn't as flexible as Pyro. A more thorough analysis of real traffic is needed to determine whether this interface is good enough. Alternatives include pushing events through a database EVENT table, where the client inserts rows and the server polls the table for new rows, removing them as they are consumed.
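To make the database alternative concrete, here is a sketch of the EVENT-table approach that uses the standard library's sqlite3 module in place of MySQL. The table layout and the functions `create_schema`, `publish`, and `poll` are hypothetical. The client inserts rows; the server selects whatever is waiting, deletes exactly those rows by id, and commits, so each event is consumed once even if new rows arrive between the select and the delete.

```python
import sqlite3


def create_schema(conn):
    conn.execute("CREATE TABLE IF NOT EXISTS event ("
                 "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                 "hostname TEXT, condition TEXT, severity INTEGER)")
    conn.commit()


def publish(conn, hostname, condition, severity):
    """Client side: queue one event as a row."""
    conn.execute("INSERT INTO event (hostname, condition, severity) "
                 "VALUES (?, ?, ?)", (hostname, condition, severity))
    conn.commit()


def poll(conn):
    """Server side: fetch pending events in arrival order, then remove them."""
    rows = conn.execute("SELECT id, hostname, condition, severity "
                        "FROM event ORDER BY id").fetchall()
    if rows:
        # Delete only the rows we read, not anything inserted since.
        conn.executemany("DELETE FROM event WHERE id = ?",
                         [(row[0],) for row in rows])
        conn.commit()
    return [row[1:] for row in rows]
```

This sketch assumes a single polling server; with several consumers, the select and delete would need a stronger locking or claiming scheme to avoid processing the same row twice.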

This recipe also makes heavy use of Spring Python's Aspect Oriented Programming features to insert the data recording code (the Recorder interceptor). This provides a clean way to add the extra layer of functionality we need to sniff and record network traffic without touching the already built network management code.

I thought this recipe was about live data!

Well, the recipe is more about recording the live data and controlling the speed of playback. Capturing this concept in a reusable recipe requires that we simulate the live system. But the fundamental concept of inserting a tap point in front of the network management processor, as we have done, is just as valid against a real live feed.

Is opening and closing a file for every event a good idea?

The recipe was coded so that stopping the recording carries minimal risk of losing captured data that has not yet been written to disk. Analysis of production data is required to determine the most efficient way of storing it. For example, writing data in batches of 10, or perhaps 100, events may be less I/O-intensive. The risk is that data can be lost in batches of the same size.
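The trade-off can be made explicit with a small writer that buffers lines and appends them in batches. `BatchedWriter` and `batch_size` are hypothetical, not part of the recipe. With `batch_size=1` it behaves like the recipe's open-append-close per event; larger values mean fewer file operations, at the cost of losing up to `batch_size - 1` unflushed lines if the process dies.

```python
class BatchedWriter(object):
    """Buffer lines in memory and append them to a file in batches."""

    def __init__(self, filename, batch_size=10):
        self.filename = filename
        self.batch_size = batch_size
        self._pending = []

    def write(self, line):
        self._pending.append(line)
        if len(self._pending) >= self.batch_size:
            self.flush()

    def flush(self):
        # One open/append/close per batch instead of per event.
        if self._pending:
            with open(self.filename, "a") as f:
                f.writelines(self._pending)
            self._pending = []
```

Whatever drives the writer should call `flush()` on shutdown (for example, when handling Ctrl+C) so that a partial final batch is not lost.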

If the volume of traffic is low enough, writing each event one-by-one, as shown in this recipe, may not be a problem at all.

What about offloading the storage of data?

It is not uncommon to move the actual logic of opening the file, appending the data, and closing the file into a separate class. This utility could then be injected into the interceptor we built. This may become important if a more elaborate means of storing or piping the data is needed. For example, another Pyro service in a different location may want a copy of the live data feed.

Injecting the data consumer into the aspect we coded would give us more flexibility. In this recipe, we don't have such requirements, but it's not hard to imagine making such adjustments as new requirements arrive.
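A sketch of that injection, in plain Python rather than through Spring Python's container: the recorder only builds the (event, timestamp) pair and hands it to whatever consumer it was given. `InjectedRecorder`, `FileConsumer`, and `ListConsumer` are hypothetical names, and a consumer is assumed to be any object with a `consume(record)` method, so a file appender, an in-memory list for tests, or a forwarder to another service could be swapped in without touching the recorder.

```python
import pickle
from datetime import datetime


class FileConsumer(object):
    """Append each record to a file, one escaped pickle per line."""

    def __init__(self, filename, special_char=b"&&&"):
        self.filename = filename
        self.special_char = special_char

    def consume(self, record):
        line = pickle.dumps(record, protocol=0).replace(b"\n", self.special_char)
        with open(self.filename, "ab") as f:
            f.write(line + b"\n")


class ListConsumer(object):
    """Keep records in memory; handy for tests."""

    def __init__(self):
        self.records = []

    def consume(self, record):
        self.records.append(record)


class InjectedRecorder(object):
    """Time-stamp each event, pass it to the consumer, then call the target."""

    def __init__(self, target, consumer, clock=datetime.now):
        self.target = target
        self.consumer = consumer
        self.clock = clock  # injectable so tests can use a fixed time

    def process(self, evt):
        self.consumer.consume((evt, self.clock()))
        return self.target.process(evt)
```

Injecting the clock as well as the consumer keeps the recorder deterministic under test, while production code simply relies on the `datetime.now` default.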

