Chapter 3. Logs and Message Queues

“Everything is a file” is the powerful, unifying abstraction at the heart of *nix systems. It’s proved surprisingly flexible and effective as a metaphor for over 40 years. In a similar way, “everything is a log” is the powerful abstraction for streaming architectures.

Message queues provide ideal semantics for managing producers that write messages to queues and consumers that read them, thereby joining subsystems together with a level of indirection that provides decoupling. Implementations can provide durable message storage with tunable persistence characteristics and other benefits.

Let’s explore these two concepts, how they are different, their relative strengths and weaknesses, and a merger that provides the best of both worlds.

The Log Is the Core Abstraction

Logs have been used for a long time as a mechanism for services to output information about what they are doing, including implementation details and problems encountered, as well as application state transitions. Log entries may include a timestamp, a notion of “urgency” (e.g., error, warning, or informational), information about the process and/or machine, and an ad hoc text message with more details. The log entries may be written in space-separated text, JSON, or a binary format (useful for efficient transport and storage). Well-structured log entries at appropriate execution points are proxies for system events. The order of entries is significant, as it indicates event sequencing and state transitions. While we often associate logs with files, this is just one possible storage mechanism.
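For example, a structured log entry written as JSON might look like the following. The field names and values here are purely illustrative; every logging library defines its own schema.

```json
{
  "timestamp": "2019-03-14T09:26:53.589Z",
  "level": "WARN",
  "host": "web-07",
  "process": "checkout-service",
  "message": "Payment gateway responded slowly (1243 ms); retrying"
}
```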

The metaphor of a log generalizes to a wide class of data streams, such as these examples:

Service logs

These are the logs that services write to capture implementation details as processing unfolds, especially when problems arise. These details may be invisible to users and not directly associated with the application’s logical state.

Write-ahead logs for database CRUD transactions

Each insert, update, and delete that changes state is an event. Many databases use a WAL (write-ahead log) internally to append such events durably and quickly to a filesystem before acknowledging the change to clients, after which time in-memory data structures and other, more permanent files are updated with the current state of the records. That way, if the database crashes after the WAL write completes, the WAL can be used to reconstruct and complete any in-flight transactions, once the database is running again. (A minimal sketch of this write-ahead pattern follows this list.)

Other state transitions

User web sessions and automated processes, such as manufacturing and chemical processing, are examples of systems that routinely transition from one state to another. Logs are a popular way to capture and propagate these state transitions so that downstream consumers can process them as they see fit.

Telemetry from IoT devices

Many widely deployed devices, including cars, phones, network routers, computers, airplane engines, medical devices, home automation devices, and kitchen appliances, are now capable of sending telemetry back to the manufacturer for analysis. Some of these devices also use remote services to implement their functionality, like location-aware and voice-recognition applications. Manufacturers use the telemetry to better understand how their products are used; to ensure compliance with licenses, laws, and regulations (e.g., obeying road speed limits); and for predictive maintenance, where anomalous behavior is modeled and detected that may indicate pending failures, so that proactive action can prevent service disruption.

Clickstreams

How do users interact with an application? Are there sections that are confusing or slow? Is the process of purchasing goods and services as streamlined as possible? Which application version leads to more purchases, A or B? Logging user activity allows for clickstream analysis.
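To make the write-ahead idea mentioned earlier concrete, here is a minimal Java sketch of a toy key-value store that appends each change to a log file before applying it to its in-memory state, and that replays the file on restart. It is only an illustration of the pattern; a real database WAL uses a binary format, batching, explicit fsync calls, and many other refinements.

```java
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Toy write-ahead log: append the change durably first, then update in-memory state.
public class TinyWal {
    private final Path logFile;
    private final Map<String, String> state = new HashMap<>();

    public TinyWal(Path logFile) throws IOException {
        this.logFile = logFile;
        if (Files.exists(logFile)) {
            for (String line : Files.readAllLines(logFile)) {  // replay the log on recovery
                apply(line);
            }
        }
    }

    public void put(String key, String value) throws IOException {
        String entry = "PUT\t" + key + "\t" + value;
        try (FileWriter out = new FileWriter(logFile.toFile(), true)) {  // append-only
            out.write(entry + System.lineSeparator());
            out.flush();  // a real WAL would also force bytes to disk (fsync) before acknowledging
        }
        apply(entry);     // only now update the in-memory state
    }

    public String get(String key) { return state.get(key); }

    private void apply(String entry) {
        String[] parts = entry.split("\t");
        if (parts.length == 3 && parts[0].equals("PUT")) {
            state.put(parts[1], parts[2]);
        }
    }
}
```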

Logs also enable two general architecture patterns that are popular in the microservice world: event sourcing and command-query responsibility segregation (CQRS).

To understand event sourcing, consider the database write-ahead log. It is a record of all changes (events) that have occurred. This log can be replayed (“sourced”) to reconstruct the state of the database at any point in time, even though the only state visible to queries in most databases is the latest snapshot in time. Hence, an event source provides the ability to replay history and can be used to reconstruct a lost database, to replicate one instance to additional copies, to apply new analytics, and more.

Incremental replication supports CQRS. Having a separate data store for writes (“commands”) versus reads (“queries”) enables each one to be tuned and scaled independently, according to its unique characteristics. For example, I might have a few high-volume writers, but a large number of occasional readers. If the write database goes down, reading can continue, at least for a while. Similarly, if reading becomes unavailable, writes can continue. The trade-off is accepting eventual consistency, as the read data stores will lag behind the write data stores.1
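As a small, hypothetical illustration of event sourcing, the following Java sketch never stores an account balance directly; it reconstructs the balance by folding over the event log. Replaying a prefix of the log reconstructs the state at an earlier point in time, and maintaining such a derived view incrementally is what the read (“query”) side of a CQRS design does.

```java
import java.util.List;

public class AccountEvents {
    interface Event {}
    record Deposited(long cents) implements Event {}
    record Withdrawn(long cents) implements Event {}

    // Fold the event log into the current state. Replaying only a prefix of the
    // log reconstructs the state as it was at that earlier point in time.
    static long balance(List<Event> log) {
        long balance = 0;
        for (Event e : log) {
            if (e instanceof Deposited d) balance += d.cents();
            else if (e instanceof Withdrawn w) balance -= w.cents();
        }
        return balance;
    }

    public static void main(String[] args) {
        List<Event> log = List.of(new Deposited(10_00), new Deposited(5_00), new Withdrawn(3_00));
        System.out.println(balance(log));               // 1200 cents: the current state
        System.out.println(balance(log.subList(0, 2))); // 1500 cents: the state before the withdrawal
    }
}
```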

Hence, an architecture with logs at the core is a flexible architecture for a wide spectrum of applications.

Message Queues and Integration

Traditional message queues are first-in, first-out (FIFO) data structures. The word “message” is used historically here; the data can be any kind of record, event, or the like. The ordering is often by time of arrival, similar to logs. Each message queue can represent a logical concept, allowing readers to focus on the messages they care about and not have to process all of the messages in the system. This also promotes scalability through parallelism, as the processing of each queue can be isolated in its own process or thread. Most implementations allow more than one writer to insert messages and more than one reader to extract them.

All of this is a good way to organize and use logs, too. However, there is a crucial difference in the reading semantics of logs versus queues, which means that queues and logs are not equivalent constructs.

For most message queue implementations, when a message is read, it is also deleted from the queue; that is, the message is “popped.” You might have multiple stateless readers for parallelism, such as a pool of workers, each of which pops a message, processes it, and then comes back for a new one, while the other workers do the same thing concurrently. However, having more than one reader means that none of them will see all the messages in the queue. That’s a disadvantage when we have multiple readers, each of which does something different. We’ll return to this crucial point in a moment.
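The following Java sketch illustrates these pop semantics with a toy in-process queue and a pool of workers: each message is taken by exactly one worker, so no single worker sees the whole stream. (Real message queues are separate services, of course; this is only an analogy.)

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerPool {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        ExecutorService workers = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 3; i++) {
            int workerId = i;
            workers.submit(() -> {
                try {
                    while (true) {
                        String message = queue.take();  // "pop": the message is gone for everyone else
                        System.out.printf("worker %d processed %s%n", workerId, message);
                    }
                } catch (InterruptedException e) {
                    // interrupted during shutdown; exit the worker
                }
            });
        }

        for (String message : List.of("m1", "m2", "m3", "m4", "m5", "m6")) {
            queue.put(message);  // each message will be processed by exactly one worker
        }
        Thread.sleep(500);       // crude: give the workers time to drain the queue
        workers.shutdownNow();   // interrupts the blocked take() calls
    }
}
```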

But first, let’s discuss a few real-world considerations for queues. To ensure that all messages are processed at least once, the queue may wait for acknowledgment from a reader before deleting a message. This means that policies and enforcement mechanisms are required to handle concurrency cases, such as when a second reader tries to pop (read) a message before the queue has received the acknowledgment from the first reader. Should the same message be given to the second reader, effectively implementing at-least-once behavior? (See “At Most Once. At Least Once. Exactly Once.”.) Or should the second reader be given the next message in the queue instead, while we wait for the acknowledgment for the first message? What happens if the acknowledgment for the first message never arrives? How long should we wait? Presumably a timeout occurs and the first message must be made available again to a subsequent reader. But what happens if we want to process the messages in the queue’s original FIFO order? In that case the readers will need to coordinate to ensure proper ordering. Ugh…
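To sketch one possible answer, the following toy queue (again, not a real broker) parks a popped message instead of deleting it and requeues it if no acknowledgment arrives before a timeout. This yields at-least-once behavior at the cost of possible redelivery and reordering.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Toy "at-least-once" queue: a popped message is parked rather than deleted,
// and it is requeued if no acknowledgment arrives before the timeout.
final class AckQueue<T> {
    record Delivery<M>(long id, M message) {}

    private final LinkedBlockingQueue<T> ready = new LinkedBlockingQueue<>();
    private final Map<Long, T> inFlight = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong ids = new AtomicLong();
    private final Duration ackTimeout;

    AckQueue(Duration ackTimeout) { this.ackTimeout = ackTimeout; }

    void publish(T message) { ready.add(message); }

    // Pop the next message and start its acknowledgment timer.
    Delivery<T> pop() throws InterruptedException {
        T message = ready.take();
        long id = ids.getAndIncrement();
        inFlight.put(id, message);
        timer.schedule(() -> {                        // no ack in time: make it available again
            T unacked = inFlight.remove(id);
            if (unacked != null) ready.add(unacked);  // note: this breaks FIFO ordering
        }, ackTimeout.toMillis(), TimeUnit.MILLISECONDS);
        return new Delivery<>(id, message);
    }

    // Delete the message only after the reader confirms it was processed.
    void ack(long id) { inFlight.remove(id); }
}
```

A reader would call pop(), process the message, and then call ack(delivery.id()); if the reader crashes before acknowledging, the message eventually becomes available to another reader.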

Combining Logs and Queues

An implicit goal with logs is that all readers should be able to see the entire log, not just a subset of it. This is crucial for stateful processing of the log for different purposes. For example, for a given log, we might have one reader that updates a database write-ahead log, another that feeds a dashboard, and another that is used for training machine learning models, all of which need to see all entries. With a traditional message queue, only a single reader can see all the messages, so that one reader would have to support all of these downstream processing scenarios itself.

Hence, a log system does not pop entries on reading. They may live forever or the system may provide some mechanism to delete old entries. The log system allows each reader to decide at which offset into the log reading should start, which supports reprocessing part of the log or restarting a failed process where it left off. The reader can then scan the entries, in order, at its own pace, up to the latest entry.

This means that the log system must track the current offset into the log for each reader. The log system may also support configurable at-most-once, at-least-once, or exactly-once semantics.

To get the benefits of message queues, the log system can support multiple logs, each working like a message queue where the entries focus on the same area of interest and typically have the same schema. This provides the same organizational benefits as classic message queues.
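Contrast the queue sketches above with the following toy log in Java: reading never removes anything, each reader keeps its own offset, and therefore every reader can see every entry at its own pace.

```java
import java.util.ArrayList;
import java.util.List;

// Toy log: append-only storage plus per-reader offsets; no entry is ever popped.
public class TinyLog<T> {
    private final List<T> entries = new ArrayList<>();

    public synchronized void append(T entry) { entries.add(entry); }

    // Return entries from the reader's offset up to the end of the log.
    public synchronized List<T> readFrom(int offset) {
        return new ArrayList<>(entries.subList(offset, entries.size()));
    }

    public static void main(String[] args) {
        TinyLog<String> log = new TinyLog<>();
        log.append("e1"); log.append("e2"); log.append("e3");

        int dashboardOffset = 0;  // one reader feeding a dashboard reads from the beginning
        int modelOffset = 2;      // another reader restarted and resumes at offset 2
        System.out.println(log.readFrom(dashboardOffset)); // [e1, e2, e3]
        System.out.println(log.readFrom(modelOffset));     // [e3]
    }
}
```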

The Case for Apache Kafka

Apache Kafka implements the combined log and message queue features just described, providing the best of both models. The Kafka documentation describes it as “a distributed, partitioned, replicated commit log service.”

Kafka was invented at LinkedIn, where it matured into a highly reliable system with impressive scalability.4

Hence, Kafka is ideally suited as the backbone of fast data architectures.

Kafka uses the following terms for the concepts we’ve described in this chapter. I’ll use the term record from now on instead of entries, which I used for logs, and messages, which I used for message queues.

Topic

The analog of a message queue where records of the same “kind” (and usually the same schema) are written.

Partition

A way of splitting a topic into smaller sections for greater parallelism and capacity. While the topic is a logical grouping of records, it can be partitioned randomly or by hashing a key. Note that record order is only guaranteed for a partition, not the whole topic when it has more than one partition. This is often sufficient, as in many cases we just need to preserve ordering for messages with the same key, all of which will get hashed to the same partition (see the producer sketch after this list of terms). Partitions can be replicated across a Kafka cluster for greater resiliency and availability. For durability, each partition is written to a disk file and a record is not considered committed until it has been written to this file.

Producer

Kafka’s term for a writer.

Consumer

Kafka’s term for a reader. It is usually ideal to have one reader instance per partition. See the Kafka documentation for additional details.

Consumer group

A set of consumers that covers the partitions in a topic.
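As a brief illustration of these terms, the following Java snippet uses Kafka’s producer API to write records to a hypothetical device-telemetry topic. Because both records carry the same key, they hash to the same partition, so their relative order is preserved; the topic name, key, and values are made up for this example.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TelemetryProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key are hashed to the same partition,
            // so consumers see them in the order they were written.
            producer.send(new ProducerRecord<>("device-telemetry", "device-42", "{\"temp\": 21.5}"));
            producer.send(new ProducerRecord<>("device-telemetry", "device-42", "{\"temp\": 21.7}"));
        }
    }
}
```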

Kafka will delete blocks of records, oldest first, based on a user-specified retention time (the time to live, or TTL, which defaults to seven days), on a maximum number of bytes retained per partition (the default is unbounded), or on both.
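Retention can be configured per topic. As one hedged example, the following sketch uses Kafka’s AdminClient to create the hypothetical topic above with a three-day retention time and a cap of roughly 1 GiB retained per partition; the partition count and replication factor are arbitrary choices for illustration.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateTopicWithRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("device-telemetry", 8, (short) 3)  // 8 partitions, 3 replicas
                .configs(Map.of(
                    TopicConfig.RETENTION_MS_CONFIG, "259200000",        // 3 days instead of the 7-day default
                    TopicConfig.RETENTION_BYTES_CONFIG, "1073741824"));  // ~1 GiB retained per partition
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```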

The normal case is for each consumer to walk through the partition records in order, but since the consumer controls which offset is read next, it can read the records in any order it wants. The consumer offsets are actually stored by Kafka itself, which makes it easier to restart a failed consumer where it left off.
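On the consuming side, a minimal Java sketch for the hypothetical topic above looks like the following. The consumer polls for batches of records in offset order and commits its offsets back to Kafka, so a restarted instance resumes where the previous one left off.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TelemetryConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "dashboard");        // consumers in the same group divide the partitions
        props.put("enable.auto.commit", "false");  // commit offsets explicitly below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("device-telemetry"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitSync();  // store offsets in Kafka so a restart resumes from here
            }
        }
    }
}
```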

A topic is a big buffer between producers and consumers. It effectively decouples them, providing many advantages. The big buffer means data loss is unlikely when there is only one consumer instance for a particular logical function and it crashes: producers can keep writing data to the topic while a new, replacement consumer instance is started, picking up where the failed one left off.

Decoupling means it’s easy for the numbers of producers and consumers to vary independently, either for scalability or to integrate new application logic with the topic. This is much harder to do if producers and consumers have direct connections to each other, for example using sockets.

Finally, the producer and consumer APIs are simple and narrow. This narrow abstraction makes them easy to use and also effectively hides the implementation, so that many scalability and resiliency features can be implemented behind the scenes. Having one universal way of connecting services like this is very appealing for architectural simplicity and developer productivity.

Alternatives to Kafka

You might have noticed in Chapter 2 that we showed five options for streaming engines and three for microservice frameworks, but only one log-oriented data backplane option, Kafka. In 1979, the only relational database in the world was Oracle, but of course many alternatives have come and gone since then. Similarly, Kafka is by far the most widely used system of its kind today, with a vibrant community and a bright future. Still, there are a couple of emerging alternatives you might consider, depending on your needs: Apache Pulsar, which originated at Yahoo! and is now developed by Streaml.io, and Pravega, developed by Dell EMC.

I don’t have the space here to compare these systems in detail, but to provide motivation for your own investigation, I’ll just mention two advantages of Pulsar compared to Kafka, at least as they exist today.

First, if you prefer a message queue system designed for big data loads, Pulsar is actually implemented as a queue system that also supports the log model.

Second, in Kafka, each partition is explicitly tied to one file on one physical disk, which means that the maximum possible partition size is bounded by the hard drive that stores it. This explicit mapping also complicates scaling a Kafka topic by splitting it into more partitions, because of the data movement to new files and possibly new disks that is required. It also makes scaling down, by consolidating partitions, sufficiently hard that it is almost never done. Because Pulsar treats a partition as an abstraction, decoupled from how the partition is actually stored, the Pulsar implementation is able to store partitions of unlimited size. Scaling up and down is much easier, too.

To be abundantly clear, I’m not arguing that Pulsar is better than Kafka. These two advantages may be of no real value to you, and there are many other pros and cons of these systems to consider.

When Should You Not Use a Log System?

Finally, all choices have disadvantages, including Kafka. Connecting two services through a Kafka topic adds overhead, including disk I/O to persist the log updates, as well as latency between adding a record to the log and a consumer reading it, since there could be many other records ahead of yours in the log.

Put another way, sending a record from one service to another over a direct connection, such as a REST call over a socket, shared memory, or another IPC (interprocess communication) primitive, will usually be faster and consume fewer system resources. However, you will give up all the advantages of Kafka, including decoupling of services and greater resiliency and flexibility.

So, use Kafka topics as the default choice, but if you have extremely tight latency requirements or lots of small services where messaging overhead would be a significant percentage of the overall compute time, consider which connections should happen without using Kafka. On the other hand, remember that premature optimization is the root of all evil.5

Now that we’ve made the case for a data backplane system like Kafka, let’s explore our options for processing this data with various streaming engines.

1 Jay Kreps doesn’t use the term CQRS, but he discusses the advantages and disadvantages in practical terms in his Radar post, “Why Local State Is a Fundamental Primitive in Stream Processing”.

2 See Tyler Treat’s blog post, “You Cannot Have Exactly-Once Delivery”.

3 You can always concoct a failure scenario where some data loss will occur.

4 In 2015, LinkedIn’s Kafka infrastructure surpassed 1.1 trillion messages per day, and it’s been growing since then.

5 Donald Knuth, “Structured Programming with Goto Statements,” Computing Surveys 6, no. 4 (1974): 261–301 (but possibly a rephrasing of an earlier quote from C. A. R. Hoare).
