Simplified grid computing

Cloud computing gives us enough power and technology to do almost anything. That statement holds only until we run into real-world limits. As discussed in the Parallel programming section in Chapter 1, Performance Thoughts, some overhead that limits the system's scalability always exists. This is Amdahl's law. Although the law always applies, careful design lets us avoid many of its practical limitations. Grid computing is the art of parallelizing computation across a large number of systems.
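To make the limit stated by Amdahl's law concrete, here is a minimal sketch that computes the theoretical speedup for a given parallel fraction of the work. The class and method names are our own and are used only for illustration:

```csharp
using System;

public static class Amdahl
{
    // Theoretical speedup when a fraction of the work (0..1) can be
    // spread across the given number of workers; the remainder stays serial.
    public static double Speedup(double parallelFraction, int workers)
    {
        if (parallelFraction < 0 || parallelFraction > 1)
            throw new ArgumentOutOfRangeException(nameof(parallelFraction));
        if (workers < 1)
            throw new ArgumentOutOfRangeException(nameof(workers));

        // speedup = 1 / ((1 - p) + p / n)
        return 1.0 / ((1.0 - parallelFraction) + parallelFraction / workers);
    }
}
```

With 95 percent of the work parallelizable, Amdahl.Speedup(0.95, 100) returns roughly 16.8, and no number of workers can push the result past 20, because the serial 5 percent always remains.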

Although specific frameworks or languages do exist, we will see how to create a small grid computing system from scratch in C#.

The first thing to do is divide a huge dataset into smaller datasets. This improves scalability at the highest level. Once we have the data, we also have to bind each data portion to the related execution logic. By creating multiple messages, one per business logic step, we can handle each step independently. This fine granularity helps to achieve the highest scalability and adds great reliability, because anything can go wrong at any time. Such a message architecture lets an unconfirmed message flow to another available engine, which can resume from the last failed point. Indeed, designs that frequently save partial states refer to those messages as persistence points.
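As a rough illustration of dividing a dataset into smaller datasets, the following sketch splits any sequence into fixed-size chunks that could each become one message or one batch of messages. The DatasetSplitter name and the chunk size are assumptions made for this example:

```csharp
using System;
using System.Collections.Generic;

public static class DatasetSplitter
{
    // Splits a sequence into consecutive chunks of at most chunkSize items.
    public static IEnumerable<List<T>> Split<T>(IEnumerable<T> source, int chunkSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (chunkSize < 1) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        return SplitIterator(source, chunkSize);
    }

    // Separate iterator so that argument checks above run immediately.
    private static IEnumerable<List<T>> SplitIterator<T>(IEnumerable<T> source, int chunkSize)
    {
        var chunk = new List<T>(chunkSize);
        foreach (var item in source)
        {
            chunk.Add(item);
            if (chunk.Count == chunkSize)
            {
                yield return chunk;
                chunk = new List<T>(chunkSize);
            }
        }

        // the last chunk may be smaller than chunkSize
        if (chunk.Count > 0)
            yield return chunk;
    }
}
```

For instance, splitting ten items with a chunk size of four yields chunks of four, four, and two items. The smaller the chunk, the finer the granularity, at the price of more messages to move.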

Let's imagine that we have to work on an automotive solution. We need to handle millions of position packets per day and, for each packet, calculate the full street address if it is not already available in our system.

Different designs are available, with increasing difficulty and, eventually, different performance results. Usually, the more scalable a design is, the more overhead it brings when executed on a small system. However, such overhead is a necessary cost if we want to scale to hundreds of systems or more.

A queue-based automotive solution for grid computing

The message structure is the main thing to design. Although the structure itself is almost the same for every stage, using different message names helps make the business needs very clear. Alternatively, a single message name is also possible with the addition of a message status. The three messages represent the three computational stages: data input, reverse geocode, and data saving. Here is an example:

/// <summary>
/// stage 1
/// a position is found in history db
/// a street address search starts across streets database
/// </summary>
public class SearchForAddressMessage
{
    public SearchForAddressMessage()
    {
    }

    public SearchForAddressMessage(float latitude, float longitude)
    {
        Latitude = (float)Math.Round(latitude, 4);
        Longitude = (float)Math.Round(longitude, 4);
    }

    public float Latitude { get; set; }
    public float Longitude { get; set; }

    public override int GetHashCode()
    {
        return new { Latitude, Longitude }.GetHashCode();
    }
}

/// <summary>
/// stage 2
/// a new address needs to be resolved from
/// the found coordinates through a reverse-geocode service
/// </summary>
public class GeocodeNewAddressMessage : SearchForAddressMessage
{
}

/// <summary>
/// stage 3
/// a street address has been found and must
/// flow to the street database
/// </summary>
public class SaveNewAddressMessage : SearchForAddressMessage
{
    public string StreetAddress { get; set; }
}
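
The alternative mentioned earlier, a single message name plus a message status, could look like the following sketch. The AddressMessage and AddressStage names and the MoveTo helper are hypothetical and are not part of the solution shown in this section:

```csharp
public enum AddressStage
{
    SearchForAddress,   // stage 1
    GeocodeNewAddress,  // stage 2
    SaveNewAddress      // stage 3
}

public class AddressMessage
{
    public float Latitude { get; set; }
    public float Longitude { get; set; }
    public string StreetAddress { get; set; }
    public AddressStage Stage { get; set; }

    // Creates the message for the next stage, carrying the position along
    // and optionally attaching the street address found so far.
    public AddressMessage MoveTo(AddressStage nextStage, string streetAddress = null)
    {
        return new AddressMessage
        {
            Latitude = Latitude,
            Longitude = Longitude,
            StreetAddress = streetAddress ?? StreetAddress,
            Stage = nextStage
        };
    }
}
```

With this shape, the receiver would branch on the Stage value instead of the message type name. The trade-off is that a single class must carry every field that any stage needs.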

Some helper methods reduce boilerplate and make the code easier to understand:

using System.Collections.Generic;
using System.Linq;
using Microsoft.ServiceBus.Messaging;

public static class QueueClientExtensions
{
    /// <summary>
    /// Helper method for sending any message
    /// to the queue with type name as ContentType
    /// </summary>
    public static void SendBody<T>(this QueueClient client, T arg)
    {
        client.Send(new BrokeredMessage(arg)
        {
            //by specifying the message type
            //the receiver will know what logic to execute
            //and how to deserialize the message body
            ContentType = arg.GetType().Name
        });
    }

    /// <summary>
    /// Helper method for sending a batch of messages
    /// to the queue with type name as ContentType
    /// </summary>
    public static void SendBody<T>(this QueueClient client, IEnumerable<T> args)
    {
        client.SendBatch(args.Select(arg => new BrokeredMessage(arg)
        {
            //by specifying the message type
            //the receiver will know what logic to execute
            //and how to deserialize the message body
            ContentType = arg.GetType().Name
        }));
    }
}

To make the demonstration more realistic, I read thousands of positions from a CSV file that I created with pseudo-random values. The CSV file contains only two columns: latitude and longitude. Here's an example of how to read it:

//the queue client needs connection string and queue name
var queue = QueueClient.CreateFromConnectionString(
    SERVICEBUS_CONNECTIONSTRING, QUEUE_NAME);

//emulate data packet input
Task.Factory.StartNew(() =>
    {
        //a csv file with positions
        var fname = @"C:\Temp\positions_export.csv";

        //skip the first line, which contains the column names
        //take only 100 items for testing purposes
        //split each row on the semicolon char
        var positions = File.ReadAllLines(fname).Skip(1).Take(100)
            .Select(row => row.Split(';'))
            //parse csv data as "latitude;longitude"
            .Select(x => new { Latitude = float.Parse(x[0]), Longitude = float.Parse(x[1]) })
            //avoid unnecessary duplications
            .Distinct()
            //create stage 1 messages
            .Select(x => new SearchForAddressMessage(x.Latitude, x.Longitude));

        //upload all messages to the queue
        queue.SendBody(positions);
    });

Each process step sends a message to the following one through a queued message. Based on those steps, engines execute their internal logic without interfering with other engines, because each engine instance handles only a single message at a time.

Now, start the sequence of sending messages to the queue and retrieving them from it. The queue acts as a persistence system for temporary data, adding failover logic and asynchronous distributed programming. If an engine does not confirm that a message has been successfully handled, the queue automatically delivers that message to another engine. Such logic execution is a type of milestone-based programming.
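
The confirm-or-release behavior can be sketched without a real queue. In the sketch below, IReceivedMessage is a hypothetical abstraction that mirrors the Complete and Abandon operations of a received queue message, and RecordingMessage is an in-memory stand-in used only to try the helper:

```csharp
using System;

// Hypothetical abstraction over a message received from a queue.
public interface IReceivedMessage
{
    void Complete(); // confirm: the queue can delete the message
    void Abandon();  // release: the queue can deliver it to another engine
}

// In-memory stand-in that only records what happened to the message.
public class RecordingMessage : IReceivedMessage
{
    public bool Completed { get; private set; }
    public bool Abandoned { get; private set; }

    public void Complete() { Completed = true; }
    public void Abandon() { Abandoned = true; }
}

public static class MessageProcessor
{
    // Runs the handler and confirms the message only on success.
    // On failure the message is released so that another engine can retry it.
    public static bool TryHandle(IReceivedMessage message, Action<IReceivedMessage> handler)
    {
        try
        {
            handler(message);
            message.Complete();
            return true;
        }
        catch (Exception)
        {
            message.Abandon();
            return false;
        }
    }
}
```

Wrapping each stage in such a helper keeps the confirmation in one place, so a failing stage never deletes a message it did not finish handling.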

Those small persistence messages are often referred to as persistence points when used in other technologies. Microsoft BizTalk Server, for example, has a similar design. For this example, a simple database with the following table structure has been created:

CREATE TABLE [dbo].[StreetAddress](
  [Latitude] [real] NOT NULL,
  [Longitude] [real] NOT NULL,
  [Position] [geography] NOT NULL,
  [FullAddress] [varchar](250) NOT NULL,
 CONSTRAINT [PK_StreetAddress] PRIMARY KEY CLUSTERED
(
  [Latitude] ASC,
  [Longitude] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

All grid engines run the same looping logic, which reads messages from the queue and then checks the message content type name to route each message to the right stage logic.

while (true)
{
    //try to dequeue a message, waiting up to one second

    var msg = queue.Receive(TimeSpan.FromSeconds(1));
    if (msg == null)
        break;

//continue…
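
The routing by content type name can also be expressed as a lookup table instead of a chain of conditions. This is only a sketch under our own assumptions: StageRouter is a hypothetical helper, generic over the message type so that it can be tried without a real queue:

```csharp
using System;
using System.Collections.Generic;

public class StageRouter<TMessage>
{
    // content type name -> stage logic
    private readonly Dictionary<string, Action<TMessage>> handlers =
        new Dictionary<string, Action<TMessage>>();

    // Associates a content type name with the logic of one stage.
    public void Register(string contentType, Action<TMessage> handler)
    {
        if (contentType == null) throw new ArgumentNullException(nameof(contentType));
        if (handler == null) throw new ArgumentNullException(nameof(handler));
        handlers[contentType] = handler;
    }

    // Executes the registered stage logic.
    // Returns false when no stage is registered for the content type.
    public bool Route(string contentType, TMessage message)
    {
        Action<TMessage> handler;
        if (contentType == null || !handlers.TryGetValue(contentType, out handler))
            return false;

        handler(message);
        return true;
    }
}
```

An engine would register one handler per message type name at startup and call Route for every dequeued message. A false result signals a message type that this engine does not know how to process.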

In the following example, Stage 1 takes the message from the queue and checks whether the address is present in our database. If not, a new Stage 2 message is produced and sent to the queue:

if (msg.ContentType == "SearchForAddressMessage")
{
    //stage 1
    var body = msg.GetBody<SearchForAddressMessage>();

    // existence check for address
    using (var db = new StreetAddressDBEntities())
        if (!db.StreetAddress.Any(a => a.Latitude == body.Latitude && a.Longitude == body.Longitude))
            //if the address is unavailable in our database
            //a stage 2 message is sent to the queue
            //to trigger the reverse geocode logic
            queue.SendBody(new GeocodeNewAddressMessage
            {
                Latitude = body.Latitude,
                Longitude = body.Longitude
            });

    //signal the message as completed
    //so it will be deleted by the queue
    msg.Complete();
}

Once a message arrives at Stage 2, a reverse geocode request is sent to a Geographic Information System (GIS), such as Bing Maps. Once the street address is found, we will enqueue (add to the queue) a new message for Stage 3:

else if (msg.ContentType == "GeocodeNewAddressMessage")
{
    Console.WriteLine("# {0} -> Found GeocodeNewAddressMessage", i);

    //stage 2
    var body = msg.GetBody<GeocodeNewAddressMessage>();

    //a request for reverse-geocode is available
    //a WS is available for Bing Maps here
    //http://dev.virtualearth.net/webservices/v1/geocodeservice/geocodeservice.svc

    //the found street
    string street = null;

    using (var client = new GeocodeServiceClient("BasicHttpBinding_IGeocodeService"))
        try
        {
            //ask bing for the street address
            var response = client.ReverseGeocode(new ReverseGeocodeRequest
            {
                //bing credential key
                Credentials = new Credentials { ApplicationId = BING_KEY },
                //given position
                Location = new Location { Latitude = body.Latitude, Longitude = body.Longitude }
            }).Results.FirstOrDefault();

            if (response != null)
                street = response.Address.FormattedAddress;
        }
        catch (Exception ex)
        {
            //do not hide the failure: trace it and leave street as null
            Debug.WriteLine(ex);
        }

    //if reverse geocode succeeded
    if (street != null)
        //send a message for saving position to the street db
        queue.SendBody(new SaveNewAddressMessage
            {
                Latitude = body.Latitude,
                Longitude = body.Longitude,
                StreetAddress = street,
            });
    else
        Debug.WriteLine(string.Format("No street found for {0} {1}", body.Latitude, body.Longitude));

    //signal the message as completed
    //so it will be deleted by the queue
    msg.Complete();
}

Once a message reaches Stage 3, we save the newly found street address to our database, where it serves as a cache for future lookups and for any other need:

else if (msg.ContentType == "SaveNewAddressMessage")
{
    Console.WriteLine("# {0} -> Found SaveNewAddressMessage", i);

    //stage 3
    var body = msg.GetBody<SaveNewAddressMessage>();

    using (var db = new StreetAddressDBEntities())
    using (var tr = db.Database.BeginTransaction())
    {
        //double-check that another engine has not already saved this address
        if (!db.StreetAddress.Any(a => a.Latitude == body.Latitude && a.Longitude == body.Longitude))
            db.StreetAddress.Add(new StreetAddress
                {
                    Latitude = body.Latitude,
                    Longitude = body.Longitude,
                    FullAddress = body.StreetAddress,
                });

        db.SaveChanges();
        tr.Commit();
    }

    //signal the message as completed
    //so it will be deleted by the queue
    msg.Complete();
}

This solution, although a simple prototype, can handle even millions of messages, because more engines reading from the same queue can be added as the load grows.

Further optimizations could split the single queue into multiple queues, one per stage or per business logic. Often, a grid system executes entirely different logic, or different versions of the same logic, side by side.
