Chapter 15. Migrating and Integrating

Now it’s time to recap what you’ve learned from a different angle: bringing Cassandra into your existing enterprise architecture. First, we’ll apply your knowledge to the task of migrating a relational application to Cassandra, including adapting data models and application code. While the focus is migration from relational databases to Cassandra, the principles apply to migration from other database types as well. We’ll finish up by taking a look at tools and integrations for getting data into and out of Cassandra, and searching and analyzing data stored in Cassandra clusters.

Knowing when to migrate

The first consideration is how to know when you need to migrate an application or use case to Cassandra. A clear indication is when you encounter one or more of the challenges of relational databases highlighted in Chapter 1:

  • Poor performance due to volume and complexity of queries

  • Challenges scaling beyond a single database node

  • Availability risk due to single node or single region deployments

  • High licensing costs associated with sophisticated multi-node solutions

  • High software maintenance cost due to complex queries and stored procedures

  • Limited ability to deploy in hybrid- or multi-cloud architectures

You don’t have to have all of these challenges in order to start looking at Cassandra, but they certainly indicate where Cassandra’s decentralized design, elastic scalability, flexible deployment, high availability, tuneable consistency and high performance could be a great fit for your application, as you learned in Chapter 2.

So, how will you approach a migration project from a legacy database such as an RDBMS to Cassandra? The history of IT is littered with overly ambitious projects that failed by attempting to deliver too much functionality at once. To mitigate this risk, we recommend making your transition incrementally, perhaps one or two use cases at a time.

Adapting the data model

Suppose your task is to migrate a hotel reservation system to a new cloud-based system. You’ll want to start by adapting your data model. Building on the relational data model for the hotel domain introduced in Chapter 5, Figure 15-1 designates primary keys (PK) and foreign keys (FK) as well as the multiplicity of each relationship:

Figure 15-1. Reservation Relational Model

Considering how you might translate this relational model to Cassandra, there are two main approaches: direct translation and indirect translation. In indirect translation, you reverse engineer existing relational data models to produce a conceptual model, and analyze the workflows of the applications that exercise those data models to identify access patterns. As shown in Figure 15-2, the reverse-engineered conceptual data model and access patterns become inputs to the familiar Cassandra data modeling process introduced in Chapter 5. That process begins with creating a conceptual data model and identifying access patterns, and proceeds through logical and physical data models, eventually producing a schema described in CQL.

Figure 15-2. Indirect Translation Process

Since you’ve already learned this Cassandra data modeling process in Chapter 5, let’s focus on direct translation, in which you translate directly from relational data models to Cassandra. In this method, the focus is on translating the entities and relationships from the source data models into Cassandra tables using patterns that we’ll introduce in the following sections. As in the Cassandra data modeling approach, you will still want to review your table designs against access patterns to make sure you’re identifying all the required queries.

Translating Entities

First, let’s consider patterns for mapping entities to Cassandra tables. The relational Hotel table shown at the top of Figure 15-3 is an example entity table. Entities in this table might be accessed by an existing application by the relational key HotelID, and so the first pattern is to create a hotels table in your Cassandra data model using a similar key design.
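A sketch of this first pattern in CQL might look like the following (the column set is illustrative, not a definitive schema):

```sql
-- Hypothetical translation of the relational Hotel table.
-- hotel_id plays the same lookup role as the relational HotelID key.
CREATE TABLE hotel.hotels (
    hotel_id text PRIMARY KEY,
    name text,
    phone text
);
```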

Over time, the legacy application likely identified the need to locate hotels by name, phone number, or other attributes, and may have created one or more indexes on relational tables to support these queries:

/* SQL */
CREATE INDEX idx_name ON Hotels (Name);
SELECT * FROM Hotels WHERE Name='My Hotel';
Figure 15-3. Entity Translation

This leads to the second pattern, which is to create denormalized Cassandra tables representing the different search combinations. For example, the hotels_by_name table uses the name column as the partition key, and the hotel_id as a clustering column as a safeguard to ensure a unique row should another hotel have the same name. As you first learned in Chapter 4, the partition key and clustering columns together make up a Cassandra primary key.
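A sketch of such a denormalized table (column set again illustrative):

```sql
-- Hypothetical table supporting lookup by hotel name. Including
-- hotel_id as a clustering column keeps rows unique should two
-- hotels share the same name.
CREATE TABLE hotel.hotels_by_name (
    name text,
    hotel_id text,
    phone text,
    PRIMARY KEY (name, hotel_id)
);
```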

Use Secondary Indexes and Materialized Views Carefully

As you learned in Chapter 7, Cassandra does provide capabilities as an alternative to denormalization that those with a relational background will find familiar: secondary indexes and materialized views. Cassandra’s pluggable secondary index capability allows you to create indexes on columns not included in a table’s primary key, with multiple index implementations available. Remember that queries based on indexes involve a larger number of nodes and therefore do not scale as well as other queries. You’ll want to stress test any intended usage using the techniques identified in Chapter 13.

Materialized views allow you to offload the work of maintaining multiple denormalized tables to Cassandra. There are some restrictions on views you can create, and this is still considered an experimental feature as of the 4.0 release.

The third pattern shown in Figure 15-3 involves the representation of complex types in Cassandra tables. While the type of the Address column in the SQL Hotels table has been left unspecified, it could be represented as a string (varchar) or user-defined type depending on the SQL variant in use. In Cassandra it would be natural to use UDTs to describe a structure such as the Address type that can be referenced by multiple tables within a keyspace.
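For instance, an address UDT might be defined and then referenced from a table as follows (the field names are illustrative):

```sql
-- Hypothetical address UDT shared by tables in the hotel keyspace.
CREATE TYPE hotel.address (
    street text,
    city text,
    state_or_province text,
    postal_code text,
    country text
);

-- A table can then reference the UDT; frozen<> treats the value
-- as a single unit that is written and read in its entirety.
ALTER TABLE hotel.hotels ADD address frozen<address>;
```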

Translating Relationships

Next, consider that relationships between entities are frequently modeled as tables in relational models; these are also known as join tables. The RoomToAmenity table shown in Figure 15-4 is one such join table that describes the relationship between a hotel room in the Room table and an amenity that room offers to its guests in the Amenity table. This design provides a common definition of an amenity that could apply to rooms across many different hotels.

Note that while the RoomToAmenity table has no additional attributes beyond the RoomID and AmenityID that serve as foreign keys into the Room and Amenity tables, respectively, join tables may contain additional columns. For example, the Reservation table represents a more complex relationship between a hotel room and a guest who is reserving the room.

Figure 15-4. Relationship Translation

The first pattern for translating relationships between entities is to map the relationship to a table. The amenities_by_room table is an example of this approach. Such a table could be used alongside entity tables for amenities and rooms.

The second pattern is to collapse one of the entity types as a user defined type (UDT) within a containing entity table. For example, consider the design shown to the lower right of Figure 15-4. In this design, the rooms_by_hotel table contains a set of the amenity UDT.
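Sketches of the two relationship patterns might look like this (table and column choices are illustrative):

```sql
-- Pattern 1: the join table becomes a denormalized lookup table.
CREATE TABLE hotel.amenities_by_room (
    hotel_id text,
    room_number smallint,
    amenity_name text,
    description text,
    PRIMARY KEY ((hotel_id, room_number), amenity_name)
);

-- Pattern 2: collapse the amenity entity into a UDT held by the
-- containing entity table.
CREATE TYPE hotel.amenity (
    name text,
    description text
);

CREATE TABLE hotel.rooms_by_hotel (
    hotel_id text,
    room_number smallint,
    amenities set<frozen<amenity>>,
    PRIMARY KEY (hotel_id, room_number)
);
```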

Note that nothing prevents you from using both of these approaches in the same data model. You can use the intended access patterns of your application to determine if either or both would be appropriate. For example, the second design pattern would be appropriate if your application needs to retrieve information about hotel rooms and their amenities, since this design allows that information to be retrieved in a single query. You’ll want to balance this against the effort required to update amenities for each room when they change.

Whether you choose a direct or indirect translation approach, the resulting models should be largely the same, especially if you are evaluating your proposed designs against the queries needed by your application.

Adapting the application

After updating your data model, the next step is to adapt (or create) the application code. You might identify inventory and reservation processing as the use cases to begin with, due to their impact on scalability.

You might then choose to use the microservice architecture style for the new implementation. You could identify and design a Reservation Service using the techniques discussed in Chapter 7 and assign responsibility for reservation data and associated Cassandra tables to it. One approach to migration toward a microservice architecture is to use a technique known as the strangler pattern in which capabilities of the legacy system are gradually replaced one at a time by microservice implementations. The legacy system remains operational until all of its capabilities have been replaced, whereupon it can be decommissioned.

Figure 15-5 shows an early stage in this process, in which clients are first modified to talk to an API layer that abstracts access to the legacy application, either by emulating its API or by providing a modern API such as REST or GraphQL. The API layer can delegate reservation-related requests to the Reservation Service while continuing to direct other requests to the legacy application.

Figure 15-5. Application Migration

The Reservation Service maintains its own data store based on Cassandra. Depending on the complexity of the design, the Reservation Service may require some coordination with the legacy application, but your goal should be to isolate the database level as much as possible. We’ll discuss some approaches for message-based interaction below.

Refactoring Data Access

You’ll recall we presented a design for the Reservation Service in Chapter 7 for a Java-based implementation. The view in Figure 15-6 is a more abstract view that highlights our recommendation to separate out layers within each microservice implementation. Provide one or more API endpoints using REST or GraphQL, centralize business logic such as validation and business processes, and use the Data Access Objects pattern to abstract the details of interactions with a specific database.

Figure 15-6. Data Access Object Pattern

Using an object mapper as provided by the DataStax Java Driver is one way to implement a DAO layer.

Maintaining Consistency

As you write or update data access code, you’ll need to consider the consistency needs of your application. Developers familiar with relational technology are accustomed to using transactions to accomplish writes to multiple related tables and often have concerns related to consistency in writing and reading data, including:

  • “I’ve heard Cassandra is ‘eventually consistent’. How can I make sure that I can read data immediately after it is written?”

  • “How can I avoid race conditions when inserting or updating a row, or maintain consistency across writes to multiple tables without ACID transactions?”

  • “How can I efficiently read data from multiple tables without joins?”

As you’ve learned in this book, Cassandra provides several mechanisms that allow us to gain a bit more control over the consistency of our reads and writes. Let’s quickly review them here.

Configuring consistency levels to achieve strong consistency

Let’s recap how you can use Cassandra’s tuneable consistency to achieve the level of consistency you need. Cassandra allows you to specify a replication strategy at the keyspace level which includes the number of replicas of your data that will be stored per data center. You specify a consistency level on each CQL statement that describes how many of those nodes must respond; typically this includes setting a default consistency level in the driver you’re using and overriding on individual statements as needed.

We introduced the available consistency levels in Chapter 9 and discussed how you can achieve strong consistency (that is, the ability to guarantee that a read gets the most recently written data) using QUORUM or LOCAL_QUORUM consistency level for both reads and writes. If your use case doesn’t require this level of consistency, you can use lower consistency levels such as ONE or LOCAL_ONE to increase write throughput.
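In cqlsh, for example, you can set the consistency level for subsequent statements with the CONSISTENCY command:

```sql
CONSISTENCY LOCAL_QUORUM;

-- With a replication factor of 3 per data center, LOCAL_QUORUM
-- reads and writes (2 of 3 local replicas) together provide
-- strong consistency.
SELECT * FROM hotel.hotels WHERE hotel_id='AZ123';
```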

Using batches to coordinate writes to multiple tables

New Cassandra users accustomed to relational databases are often uncomfortable with the idea of storing multiple copies of data in denormalized tables. Typically users become comfortable with the idea that storage is relatively cheap in modern cloud architectures and are less concerned with these additional copies than with how to ensure data is written consistently across these different tables.

Cassandra provides a BATCH query that allows you to group mutations to multiple tables in a single query. You can include CQL INSERT, UPDATE, and even DELETE statements in a batch. The guarantee of a batch is that if all the statements are valid CQL, once any of the statements completes successfully, the coordinator will continue to work in the background to make sure that all the statements are executed successfully, using mechanisms such as “Hinted Handoff” where needed.
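As a sketch, writes to two denormalized reservation tables (table and column names here are illustrative) can be grouped as follows:

```sql
BEGIN BATCH
  INSERT INTO reservation.reservations_by_confirmation
    (confirm_number, hotel_id, start_date, room_number)
    VALUES ('RS3D9B', 'AZ123', '2016-01-05', 101);
  INSERT INTO reservation.reservations_by_hotel_date
    (hotel_id, start_date, room_number, confirm_number)
    VALUES ('AZ123', '2016-01-05', 101, 'RS3D9B');
APPLY BATCH;
```

Either both tables will eventually reflect the reservation, or neither will; the batch does not, however, provide the isolation of an ACID transaction.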

You’ll want to keep in mind the amount of data that is in a batch. Thankfully, Cassandra provides a configurable batch_size_warn_threshold_in_kb property that you can use to detect when clients are submitting large batches, as discussed in Chapter 9 and Chapter 11.

Using lightweight transactions for exclusivity and uniqueness

One of the first things relational users learn about Cassandra is that it does not support transactions with ACID semantics due to the challenges of implementing the required locking in a distributed system. However, Cassandra provides a more limited capability called a lightweight transaction that is scoped to a single partition; a small number of nodes are involved in the lightweight transaction.

As you learned in Chapter 9, Cassandra provides two forms of lightweight transactions: one for guaranteeing unique rows, and one for implementing check-and-set style operations. You can use the IF NOT EXISTS syntax on an INSERT statement to make sure a row with the same primary key does not already exist. For example, when inserting into the reservations_by_confirmation table, you can use this syntax to ensure the confirmation number is unique. You use the IF <conditions> syntax to ensure that one or more values satisfy the conditions you supply before performing an UPDATE, for example, making sure that an available inventory count matches your expected value before decrementing it.
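Sketches of both forms follow; the inventory table and its columns are hypothetical, included only to illustrate the syntax:

```sql
-- Uniqueness: the insert is applied only if no row with this
-- confirmation number already exists.
INSERT INTO reservation.reservations_by_confirmation
  (confirm_number, hotel_id, room_number)
  VALUES ('RS3D9B', 'AZ123', 101)
  IF NOT EXISTS;

-- Check-and-set on a hypothetical inventory table: decrement only
-- if the count still matches the value previously read (here, 5).
UPDATE hotel.room_inventory
  SET available_count = 4
  WHERE hotel_id='AZ123' AND date='2016-01-05'
  IF available_count = 5;
```

In both cases the response includes an [applied] column indicating whether the conditional write succeeded.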

Using denormalization to avoid joins

Working around Cassandra’s lack of joins actually begins back in data modeling, prior to application development. You saw an example of this in the design of the amenities_by_room table, which is intended to allow the retrieval of information about a hotel room and its amenities in a single query. This avoids the need for a join on rooms and amenities tables.

There may be cases where you can’t anticipate the joins that will be needed in the future. In microservice architectures, separate data types may be owned by different services with their own data stores, meaning that you wouldn’t have been able to join the data in any case. In both of these situations you’ll most likely end up implementing application level joins. The emergence of GraphQL as a standard for interfaces has helped application level joins feel less threatening. We’ll address more complex analytics queries in “Analyzing data with Apache Spark”.

Migrating Stored Procedures

A final aspect you’ll want to consider in migrating an application from a relational database is whether some of the business logic might actually be implemented within the database as stored procedures. Many legacy applications make use of stored procedures for reasons including: the desire to promote reuse of common queries, an attempt to achieve some performance benefit, or even because a DBA tasked with helping application developers write queries wanted to abstract some complexity away. The benefits of stored procedures are often traded against reduced application portability and maintainability, as there may be different tools and processes required to deploy, monitor, and debug the stored procedure.

Cassandra 2.2 introduced two features that will look familiar to those looking for stored procedures: user-defined functions (UDFs) and user-defined aggregates (UDAs) allow clients to shift some processing to the coordinator node. Using these features can improve performance in some situations by reducing the amount of data that has to be returned to the client and reducing processing load on the client, at the cost of additional processing on the server.

User-defined functions

UDFs are functions that are applied on Cassandra nodes to stored data as part of query processing. Before using UDFs in your cluster, enable them in the cassandra.yaml file on each node:

enable_user_defined_functions: true

Here’s a quick summary of how this works: create a UDF using the CQL CREATE FUNCTION command, which causes the function to be propagated to every node in the cluster. When you execute a query that references the UDF, it is applied to each row of the query result.

Let’s create an example UDF to count the number of available rooms in our available_rooms_by_hotel_date table:

cqlsh:hotel> CREATE FUNCTION count_if_true(input boolean)
  RETURNS NULL ON NULL INPUT
  RETURNS int
  LANGUAGE java AS 'if (input) return 1; else return 0;';

We’ll dissect this command a bit at a time. We’ve created a UDF named count_if_true which operates on a boolean parameter and returns an integer. We’ve also included a null check to handle the case in which the value is not defined. Note that if a UDF fails, execution of the query is aborted, so this can be an important check.

UDF Security

The 3.0 release added a security feature to run UDF code in a separate sandbox so that malicious functions can’t gain unauthorized access to a node’s Java runtime.

Next, note that we’ve declared this to be a Java implementation with the LANGUAGE clause. Cassandra natively supports functions and aggregates defined in Java and JavaScript. They can also be implemented using any language supported by the Java Scripting API specified in JSR 223, including Python, Ruby, and Scala. Functions defined in these languages require adding additional scripting engine JAR files to Cassandra’s Java CLASSPATH.

Finally, we include the actual Java syntax of the function with the AS clause. Now this function is somewhat trivial by itself, because all we’re doing is counting true values as 1. We’ll do something more powerful with this UDF in a bit.

First, however, try your UDF out on the available_rooms_by_hotel_date table to see how it works:

cqlsh:hotel> SELECT room_number, count_if_true(is_available)
  FROM available_rooms_by_hotel_date
  WHERE hotel_id='AZ123' and date='2016-01-05';

 room_number | hotel.count_if_true(is_available)
-------------+-----------------------------------
         101 |                                 1
         102 |                                 1
         103 |                                 1
         104 |                                 1
         105 |                                 1

(5 rows)

As you can see, the column with the function result is qualified with the hotel keyspace name. This is because each UDF is associated with a specific keyspace. If you were to execute a similar query in the DataStax Java Driver, you would find a Column in each Row with the name hotel_count_if_true_is_available.

User-defined aggregates

As we’ve just learned, user-defined functions operate on a single row. In order to perform operations across multiple rows, we create a user-defined aggregate. The UDA leverages two UDFs: a state function and an optional final function. A state function is executed against every row, while the final function, if present, operates on the results of the state function.

Let’s look at a simple example to help investigate how this works. First, you’ll need a state function. The count_if_true function is close, but you need to make a small change to allow the available count to be summed across multiple rows. Let’s create a new function that allows a running total to be passed in, incremented and returned:

cqlsh:hotel> CREATE FUNCTION state_count_if_true(total int, input boolean)
  RETURNS NULL ON NULL INPUT
  RETURNS int
  LANGUAGE java AS 'if (input) return total+1; else return total;';

Note that the total parameter is passed as the first parameter, with its type matching the return type of the function (int). For a UDF to be used as a state function, the first parameter type and return type must match. The second parameter is the boolean column value on which the function operates, just as in the count_if_true UDF.

Now we can create an aggregate that uses this state function:

cqlsh:hotel> CREATE AGGREGATE total_available (boolean)
  SFUNC state_count_if_true
  STYPE int
  INITCOND 0;

Let’s break down this statement piece by piece: first, you’ve declared a UDA called total_available, which operates on columns of type boolean.

The SFUNC clause identifies the state function used by this query—in this case, state_count_if_true.

Next, you identify the type that is used to accumulate results from the state function by the STYPE clause. Cassandra maintains a value of this type, which it passes to the state function as it is called on each successive row. The STYPE must be the same as the first parameter and return type of the state function. The INITCOND clause allows us to set the initial value of the result; here, you set the initial count to zero.

In this case, we’ve chosen to omit the final function, but we could have included a function that took an argument of the STYPE and returned any other type, such as a function that accepts an integer argument and returns a boolean indicating if the inventory is at a low level that should generate an alert.
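Such a final function and aggregate might look like this (the function name and threshold are illustrative):

```sql
-- Hypothetical final function: flag low inventory. Its parameter
-- type matches the STYPE of the aggregate that uses it.
CREATE FUNCTION low_inventory_alert(count int)
  RETURNS NULL ON NULL INPUT
  RETURNS boolean
  LANGUAGE java AS 'return count < 3;';

CREATE AGGREGATE inventory_alert (boolean)
  SFUNC state_count_if_true
  STYPE int
  FINALFUNC low_inventory_alert
  INITCOND 0;
```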

Now let’s use our aggregate to get a count of available rooms returned by one of our previous queries. Note that our query must only include the UDA, with no other columns or functions:

cqlsh:hotel> SELECT total_available(is_available)
  FROM available_rooms_by_hotel_date
  WHERE hotel_id='AZ123' and date='2016-01-05';

 hotel.total_available(is_available)
-------------------------------------
                                   5

(1 rows)

As you can see, this query yields a result of five available rooms for the specified hotel and date.

Additional UDF/UDA Command Options

You can use the familiar IF NOT EXISTS syntax when creating UDFs and UDAs to avoid error messages for attempts to create functions and aggregates with duplicate signatures. Alternatively, you can use the CREATE OR REPLACE syntax when you actually intend to override the current function or aggregate.
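For example:

```sql
-- Succeeds silently if the function already exists.
CREATE FUNCTION IF NOT EXISTS count_if_true(input boolean)
  RETURNS NULL ON NULL INPUT
  RETURNS int
  LANGUAGE java AS 'if (input) return 1; else return 0;';

-- Overwrites any existing aggregate with this signature.
CREATE OR REPLACE AGGREGATE total_available (boolean)
  SFUNC state_count_if_true
  STYPE int
  INITCOND 0;
```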

Use the DESCRIBE FUNCTIONS command or the DESCRIBE AGGREGATES command to learn which UDFs and UDAs have been defined already. This can be especially useful when there are functions with the same name but different signatures.

Finally, you can delete UDFs and UDAs using the DROP FUNCTION and DROP AGGREGATE commands.

Built-in functions and aggregates

In addition to user-defined functions and aggregates, Cassandra also provides some built-in, or native, functions and aggregates:


The COUNT function counts the number of rows returned by a query. For example, to count the number of hotels in the database:

SELECT COUNT(*) FROM hotel.hotels;

This command can also count the number of non-null values of a specified column. For example, the following could be used to count how many guests provided an email address:

SELECT COUNT(emails) FROM reservation.guests;

The MIN and MAX functions compute the minimum and maximum value returned by a query for a given column. For example, this query could be used to determine the minimum and maximum stay lengths (in nights) for reservations at a given hotel and arrival date:

SELECT MIN(nights), MAX(nights) FROM reservations_by_hotel_date
  WHERE hotel_id='AZ123' AND start_date='2016-09-09';

The SUM function calculates the sum of the values returned by a query for a given column. You could sum the number of nights to be stayed across multiple reservations as follows:

SELECT SUM(nights) FROM reservations_by_hotel_date
  WHERE hotel_id='AZ123' AND start_date='2016-09-09';

The AVG function computes the average of all the values returned by a query for a given column. To get the average stay length in nights, you might execute:

SELECT AVG(nights) FROM reservations_by_hotel_date
  WHERE hotel_id='AZ123' AND start_date='2016-09-09';

These built-in aggregates are technically part of the system keyspace. Therefore, the column name containing results of our last query would be system_avg_nights.

Managing UDF/UDA scope

When migrating an application to Cassandra, it might seem a natural fit to convert each stored procedure into a Cassandra UDF or UDA. That might or might not be a good choice. A good rule of thumb is to avoid using these features to implement business processes, data transformation, or validation. It’s best to confine their usage to very basic analytical and statistical tasks, such as counting the number of records meeting particular criteria, or calculating sums, averages, or other mathematical operations across multiple records.

Planning the deployment

Along with adapting your data model and application code, planning your deployment is an important step in migrating from your existing database to Cassandra. You’ve learned many things throughout the course of this book that will help you in these steps:

Planning your cluster topology

As you learned in Chapter 10, your cluster topology will be primarily driven by the data centers in which you need to deploy your application. In addition to physical data centers, you’ll read below about some cases in which you may want to create additional logical Cassandra data centers within the same physical data centers.

Make sure to configure an appropriate replication strategy for each keyspace that includes the number of replicas you want per data center. The NetworkTopologyStrategy is almost always the right choice unless you are sure your cluster will never extend beyond a single data center. Remember to adjust the replication strategies for Cassandra’s system keyspaces to reflect your cluster topology as well.

Sizing your cluster

You’ll want to size your cluster appropriately so that you have some headroom for growth without over-provisioning. To get an estimate of the data size you can expect for your various denormalized table designs, use the formulas described in Chapter 5.

You should also identify performance goals for your key queries including desired read and write throughput and maximum latencies. Your data size and performance requirements will help you identify the number of nodes you’ll need in each data center to store all your data and ensure response times are within your requirements. Use the stress testing tools and techniques described in Chapter 13 to gain confidence in your estimates.

Integration with your operational environment

Since Cassandra is just one part of your deployed system, you’ll likely have infrastructure in place for collecting metrics and aggregating log files. You can use what you learned in Chapter 11 to incorporate Cassandra’s metrics and logs into your overall monitoring platform.

You may also have scripts or an orchestration framework such as Kubernetes in place for automated deployment and management of cloud applications and infrastructure. You can use what you learned in Chapter 12 to help manage your Cassandra clusters in keeping with your DevOps practices.

Setting your security posture

Your Cassandra clusters will become a key part of your overall enterprise security program, since they will be storing data which is likely of strategic business value and may have regulatory implications. You’ll want to take advantage of features you learned about in Chapter 14 including encryption of data in motion and at rest. Make sure your use of Cassandra’s authentication and authorization is integrated with any enterprise identity management capability you may have in place. Strive to create Cassandra users or roles for specific administrators, microservices, or applications that map to the fewest privileges required to do their work.

Migrating data

Once you’ve planned and deployed a cluster, you’re ready to begin moving your application and its data. There are multiple techniques you can use for data migration. These techniques are useful not only when you are migrating applications in production, but in other cases such as loading test data into a cluster, or when you need to add or modify tables (a process often known as schema migration).

Zero Downtime Migration

Depending on your business requirements, you may need to transition from your current database to Cassandra without taking your systems offline. A common integration design pattern used to perform zero-downtime migrations is to deploy a special version of an application that performs writes to both the old database and the new database. This dual write technique, shown in Figure 15-7, is usually leveraged in combination with an initial data load using a bulk loading tool.

Figure 15-7. Dual Write Example

To execute the data migration, you first deploy the application version performing dual writes in order to capture new data, then migrate existing data using one of the bulk loading approaches discussed below. If there is a failure during the migration and you need to roll it back, the legacy database will still have all the data. Once the data migration is complete, you disable writes to the legacy database and perhaps decommission the legacy application.

This approach could be combined with the application migration example above, in which the API layer or another intermediate service performs the dual writes.

As an alternate approach to dual writes, you could enable the change data capture (CDC) capabilities of your legacy database and configure or write a component to consume the CDC events and push them into Cassandra. You’ll see one example for how to do this below in “Managing data flow with Apache Kafka”.

Bulk Loading

In using Cassandra, you’ll often find it useful to be able to load data into a cluster, whether test data or data used by your application. Fortunately, there are a couple of easy ways to bulk load formatted data to and from Cassandra.

The cqlsh COPY command

As you learned in Chapter 9, cqlsh supports loading and unloading of comma-separated value (CSV) files via the COPY command. For example, you could use the following command to save the contents of the reservations_by_confirmation table to a file:

cqlsh:hotel> COPY reservations_by_confirmation TO 'reservations.csv' WITH HEADER=TRUE;

The TO value specifies the file to write to, and setting the HEADER option to TRUE causes the column names to be printed in the output file. You could edit this file and read the contents back in with this command:

cqlsh:hotel> COPY reservations_by_confirmation FROM 'reservations.csv' WITH HEADER=TRUE;

The COPY command supports other options to configure how quotes, escapes, and times are represented.
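For example, you might override the delimiter and quote characters (the option names are standard cqlsh COPY options; the values are illustrative):

```sql
COPY reservations_by_confirmation TO 'reservations.csv'
  WITH HEADER=TRUE AND DELIMITER='|' AND QUOTE='"';
```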

DataStax Bulk Loader (DSBulk)

The DataStax Bulk Loader is similar to cqlsh COPY in that it provides a command-line interface and inputs and outputs files, but with faster performance. DSBulk is available as a free download from the DataStax Downloads site. While the tool originally only supported unloading from open-source Cassandra clusters, in December 2019 DataStax updated the tool to support loading into open-source Cassandra clusters. DSBulk supports both JSON and CSV formats and can read input from a web URL or stdin, and output to stdout. It provides flexible formats for dates and times and handles user-defined types as well.

Other useful features of DSBulk include its status logging and error handling. Instead of failing immediately on a bad input line (or worse, failing silently and continuing), the bad lines are output to a log file so you can examine them, fix them, and try loading again. You can also configure a threshold for the maximum number of failures encountered before the import aborts. The tool also calculates summary statistics on every run.

The design of DSBulk was inspired by the work of Brian Hess on Cassandra Loader and Brian has produced a series of blog posts with over 40 examples of how to use DSBulk for various use cases.
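To give a flavor of the command line, here is what loading and unloading the reservations data might look like (file paths and the keyspace/table names are placeholders; check the DSBulk documentation for the full option list):

```
# Load a CSV file with a header row into a table
dsbulk load -url reservations.csv -k reservation \
  -t reservations_by_confirmation -header true

# Unload the same table to a directory of CSV files
dsbulk unload -k reservation -t reservations_by_confirmation \
  -url /tmp/reservations-export
```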

Apache Spark

While we’ll cover the integration in more detail below in “Analyzing data with Apache Spark”, it’s worth noting here that you can use Apache Spark to run distributed jobs that spread the work of reading from a source database and writing into Cassandra across multiple machines. Note that the source database could be another Cassandra cluster or even another table in the same cluster, which makes this a popular approach for schema migration.

Validating Data Migration

No matter what tool you use to migrate data, you may want to have some checks in place to make sure all of your data has been moved correctly. The DSBulk error logging features are useful here, but you could also write Spark jobs to compare data between source and target databases a row at a time. If both your source and target systems are Cassandra-based, the cassandra-diff project provides a useful starting point.
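The core of such a check is simple. Here is a minimal Python sketch of a row-by-row comparison; the rows are plain dictionaries standing in for results fetched from the source and target databases, while a production check would page through both tables with their drivers or a Spark job:

```python
# Illustrative sketch: compare rows from a source and target database,
# keyed by primary key, to find rows that were never copied or were
# copied incorrectly. The row data here is simulated.

def diff_rows(source_rows, target_rows, key):
    """Return (missing, mismatched) primary keys."""
    source = {row[key]: row for row in source_rows}
    target = {row[key]: row for row in target_rows}
    missing = [k for k in source if k not in target]
    mismatched = [k for k in source
                  if k in target and source[k] != target[k]]
    return missing, mismatched

source = [
    {"confirm_number": "RS-1001", "guest_id": "g1"},
    {"confirm_number": "RS-1002", "guest_id": "g2"},
    {"confirm_number": "RS-1003", "guest_id": "g3"},
]
target = [
    {"confirm_number": "RS-1001", "guest_id": "g1"},
    {"confirm_number": "RS-1002", "guest_id": "gX"},  # corrupted value
]

missing, mismatched = diff_rows(source, target, "confirm_number")
print(missing)      # rows never copied
print(mismatched)   # rows copied incorrectly
```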

Common Integrations

Whether you’re migrating an existing application to Cassandra or creating something entirely new, you’ll likely have other infrastructure that you need Cassandra to work alongside in order to get the most out of your data. This might already be in place, or you might be adding it for a new application. In this final section we’ll examine a few of the most popular integrations, many of which happen to be with other Apache Software Foundation projects with distributed architectures.

Managing data flow with Apache Kafka

Apache Kafka is a distributed streaming platform that is used to build real-time data pipelines and streaming applications. It supports a publish and subscribe style of messaging in which messages are published to topics in a key-value format. Similar to Cassandra, Kafka partitions data using the key and replicates data across multiple nodes, known as brokers in Kafka.

Cassandra and Kafka are used together frequently in microservice architectures as shown in Figure 15-8. Revisiting the Reservation Service design from Chapter 7, you can see one common interaction pattern. In this design, the Reservation Service receives an API request to perform some action such as creating, updating, or deleting a reservation. After persisting the change to Cassandra, the Reservation Service produces a message to a reservations topic in a Kafka cluster.

Figure 15-8. Cassandra Microservice Integration with Kafka

Other services consume the reservations topic and perform various actions in response to each message: for example, the Inventory Service updating inventory tables to mark the dates as reserved, or perhaps an Email Service sending an email thanking the guest for making a reservation. In this style of interaction, Cassandra and Kafka are not connected directly, but are used in a complementary fashion.

Kafka provides some storage capability for its topics and the ability to perform queries and joins via the KSQL query language. However, it does not provide all of the features of a database and is primarily suitable for short-term storage. In many cases involving larger data sets it will be appropriate to replicate data to a database for longer-term storage and more flexible querying. Kafka Connect is a pluggable framework for building reusable producers or consumers that connect Kafka topics to existing databases or applications. The DataStax Apache Kafka Connector is a sink connector that you deploy in Kafka Connect that automatically takes messages from Kafka topics and writes them to Cassandra or DataStax Enterprise. You could use the sink connector in a live migration of reservation data from another system as shown in Figure 15-9. Configure a source connector for the legacy system database, which will write data into Kafka topics, and the DataStax Apache Kafka Connector as a sink to write data to Cassandra.

Figure 15-9. Streaming Data into Cassandra with Kafka Connect

The connector uses a configuration file to map messages from Kafka topics to one or more Cassandra tables. This is useful for performing writes into multiple denormalized tables such as the various tables in the reservation keyspace. The connector supports multiple serialization formats for the source message including Avro and JSON, and the ability to set the CQL writetime and TTL attributes on writes to Cassandra. Because the DataStax Kafka Connector is built on the DataStax Java Driver, all the configuration options provided by the driver are available as well.
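As an illustration, a mapping for the reservations topic might look something like the following fragment (the topic, table, and field names are hypothetical; check the connector documentation for the exact property syntax):

```
topic.reservations.reservation.reservations_by_confirmation.mapping =
  "confirm_number=value.confirmNumber, guest_id=value.guestId,
   hotel_id=value.hotelId"
```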

Searching with Apache Lucene, Solr, and Elasticsearch

Even if you follow the best practices for Cassandra data modeling and design multiple denormalized tables to support different queries, you may encounter cases where you need more advanced search semantics than just querying data by a primary key. For example, you may require full text search features such as case insensitivity, substring search, or fuzzy mapping. Or you might have location data and need to perform geospatial queries such as finding all the hotels within a certain radius from a specific latitude/longitude.

Distributed search capability can be added to Cassandra via Apache Lucene, which provides an engine for indexing and searching, and its subproject, Apache Solr, which adds distributed search features as well as REST and JSON APIs on top of the Lucene search engine. DataStax Enterprise Search provides an implementation of Cassandra’s pluggable secondary index interface. It maintains Lucene indexes on each node in the cluster and uses Solr’s APIs to implement searching. This integrated approach is more efficient than running a separate search cluster. Stratio has provided a plugin that uses a similar approach.

Elasticsearch is another popular open source search framework built on top of Apache Lucene. It supports multitenancy and provides Java and JSON over HTTP APIs. The Elassandra project provides a forked version of Elasticsearch that works as a secondary index implementation for Cassandra.

Analyzing data with Apache Spark

with Patrick McFadin

In a successful deployment of any application, you can expect your business partners to approach you with questions that require in-depth analysis of your data. There are many commercial analytics and business intelligence products that can ingest data from Cassandra, including Stream Analytix, Tableau, and Teradata. You can also use ETL tools such as Informatica or Talend to extract data from your Cassandra clusters into a data lake or data warehouse for future analysis.

In this section, we’ll focus on the most popular open source analytics integration, Apache Spark. Spark is a data analytics framework that provides massively parallel processing, enabling simple API calls across large volumes of data. Originally developed in 2009 at UC Berkeley as an improvement on MapReduce, Spark was open sourced in 2010 and became an Apache project in 2014.

Unlike Apache Hadoop’s MapReduce, which writes intermediate results to disk, the Spark core processing engine is designed to maximize memory usage while minimizing disk and network access, keeping intermediate results in memory wherever possible. This design allows Spark to achieve processing speeds up to 100 times faster than Hadoop for some workloads. In addition, Spark’s API is much simpler to use than Hadoop’s.

Spark provides multiple APIs for working with data at different levels of abstraction. The base level of data representation in Spark is the Resilient Distributed Dataset (RDD). The RDD is a description of the data to be processed, such as a file or data collection. Once an RDD is created, the data contained can be transformed with API calls as if all of the data were contained in a single machine. However, in reality, the RDD can span many nodes in the network by partitioning. Each partition can be operated on in parallel to produce a final result. The RDD supports the familiar map and reduce operations plus additional operations such as count, filter, union, and distinct. For a full list of transformations, see the Spark documentation.
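As a loose analogy in plain Python (not Spark code), transformations describe a pipeline lazily, and only an action forces evaluation:

```python
# Illustrative analogy: RDD transformations such as map, filter, and
# distinct describe a pipeline over a collection; nothing runs until
# an action (here, collecting the results) forces evaluation.

data = [3, 1, 4, 1, 5, 9, 2, 6, 5]

# Transformations: lazily described with generators, like RDD lineage
doubled = (x * 2 for x in data)           # map
filtered = (x for x in doubled if x > 4)  # filter

# Action: forces the pipeline to run
result = sorted(set(filtered))            # distinct, then collect
print(result)
```

In Spark, the same laziness lets the scheduler plan work across partitions on many nodes before any data is touched.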

Spark provides two additional APIs on top of RDDs: Datasets and DataFrames. A Dataset provides the functionality of an RDD and adds the ability to query data using Spark SQL. A DataFrame is a Dataset that is organized into named columns, similar to a table in a relational database or Cassandra. DataFrames can be constructed from structured data files, existing RDDs, tables in Hive, or external databases.

Spark provides API support in Java, Scala, Python, and the R statistics language. In addition to the core engine, Spark includes libraries for different types of processing requirements, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.

Use cases for Spark with Cassandra

Apache Cassandra is a great choice for transactional workloads that require high scale and maximum availability. Apache Spark is a great choice for analyzing large volumes of data at scale. Combining the two enables many interesting use cases that exploit the power of both technologies.

An example use case is high-volume time-series data. A system for ingesting weather data from thousands of sensors with variable volume is a perfect fit for Cassandra. Once the data is collected, further analysis on data stored in Cassandra may be difficult given that the analytics capabilities available using CQL are limited. At this point, adding Spark to the solution will open many new uses for the collected data. For example, you can pre-build aggregations from the raw sensor data and store those results in Cassandra tables for use in frontend applications. This brings analytics closer to users without the need to run complex data warehouse queries at runtime.

Or consider the hotel application we’ve discussed throughout this book. You can use Spark to implement various analytic tasks on reservation and guest data, such as generating reports on revenue trends, or demographic analysis of anonymized guest records to determine where your company should build a new hotel.

One use case to avoid is using Spark-Cassandra integration as an alternative to a Hadoop workload. Cassandra is suited for transactional workloads at high volume and shouldn’t be considered as a data warehouse. When approaching a use case where both technologies might be needed, first apply Cassandra to solving a problem suited for Cassandra, such as those we discuss in Chapter 2. Then consider incorporating Spark as a way to analyze and enrich the data stored in Cassandra without the cost and complexity of extract, transform, and load (ETL) processing.

Deploying Spark with Cassandra

A Spark cluster consists of a Spark Cluster Manager and Spark Workers. Clients create SparkContext objects to submit jobs to the Spark Cluster Manager, which distributes the work to the Spark Executors on each node in the cluster. Several Cluster Managers are available, including implementations for Apache Mesos, Hadoop YARN, and Kubernetes. There is also a Standalone Cluster Manager useful for test and development work on a single-node cluster.

Now let’s look at deploying Spark and Cassandra together. While it is possible to deploy Spark and Cassandra clusters independently, you can gain performance and efficiency by co-locating a Spark Worker on each Cassandra node in a data center, as shown in Figure 15-10. Because Cassandra places data per node based on token assignment, this existing data distribution can be used as an advantage to parallelize Spark jobs. This is the architecture used by DataStax Enterprise Analytics.

Figure 15-10. Topology of a Spark-Cassandra cluster

Here’s how this works: when a job is submitted to the Spark Cluster Manager, the Spark Workers on each node spawn Spark Executors to complete the work. Using the spark-cassandra-connector as a conduit, the data required for each job is sourced from the local node as much as possible. We’ll learn more about the connector momentarily.

Because each node contains a portion of the entire data in the cluster, each Spark Worker only needs to process its local subset of data. Consider a count action on a table: each node holds a range of the table’s data, the count is calculated locally on that range, and the local counts are then merged across nodes to produce the total count.
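The merge step can be sketched in a few lines of Python (an illustration of the principle, not connector code):

```python
# Illustrative sketch: each Spark Executor counts only the rows stored
# on its local Cassandra node, and the driver merges the partial counts.
# The partition contents here are simulated.

partitions = [                       # rows held by each of three nodes
    ["RS-1001", "RS-1002"],
    ["RS-1003"],
    ["RS-1004", "RS-1005", "RS-1006"],
]

local_counts = [len(p) for p in partitions]  # computed in parallel, per node
total = sum(local_counts)                    # merged on the driver
print(total)
```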

This design maximizes data locality, resulting in improved throughput and lower resource utilization for analytic jobs. The Spark Executors only communicate over the network when data needs to be merged from other nodes. As cluster sizes get larger, the efficiency gains of this design are much more pronounced.

The spark-cassandra-connector

The spark-cassandra-connector is an open source project sponsored by DataStax on GitHub. The connector can be used by clients as a conduit to read and write data in Cassandra tables via Spark. The connector provides features including SQL queries and server-side filtering. The connector is implemented in Scala, but a Java API is available as well. The connector exposes Cassandra tables directly through Spark’s data abstractions, translating between Spark operations and Cassandra reads and writes as data is accessed.

To start using the spark-cassandra-connector, you’ll need to download both the connector and Spark. Although a detailed installation guide is beyond our scope here, we’ll give a quick summary. For a more thorough introduction, we suggest the O’Reilly book Spark: The Definitive Guide. You can download either a pre-built version of Spark, or build Spark yourself from source. You can also download a version of DataStax Enterprise and enable analytics mode.

Let’s review the common API elements used for most Spark jobs accessing data in Cassandra. To get started, run the spark-shell, which is available in the bin directory of your Spark installation (or use the command dse spark for DataStax Enterprise). If you’re more comfortable with Python, you could also try the pyspark shell.

$ spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_232)
Type in expressions to have them evaluated.
Type :help for more information.


To connect your Spark application to Cassandra, you will first need to create a SparkContext containing connection parameters. For example, to connect to a local standalone node, you would execute:

val conf = new SparkConf(true)
   .set("spark.cassandra.connection.host", "127.0.0.1")
   // Optionally, if security is enabled
   .set("spark.cassandra.auth.username", "cassandra")
   .set("spark.cassandra.auth.password", "cassandra")

val sc = new SparkContext(conf)

Establishing a connection between Cassandra and Spark is accomplished by creating a SparkConf configuration pointing to the running Cassandra cluster and Spark Cluster Manager. This example configuration shows how to connect to a local Cassandra node and Cluster Manager. You can also provide Cassandra login credentials if required, as discussed in Chapter 14.

Once the SparkContext is created, you can then operate on Cassandra data by creating an RDD representing a Cassandra table. For example, let’s create an RDD representing the reservations_by_hotel_date table from the reservation keyspace introduced in Chapter 5:

val rdd = sc.cassandraTable("reservation", "reservations_by_hotel_date")

Once you’ve created an RDD, you can perform transformations and actions on it. For example, to get the total number of reservations, create the following action to count every record in the table:

println("Number of reservations: " + rdd.count)

Because this runs as an analytics job in parallel across the cluster, it is much more efficient than running a SELECT COUNT(*) on the same table from cqlsh.

As the underlying structure of the RDD is a Cassandra table, you can use CQL to filter the data and select rows. In Cassandra, filter queries using native CQL require a partition key to be efficient, but that restriction is removed when running queries as Spark jobs.

For example, suppose you need to produce a report listing reservations by end date, so that each hotel can know who is checking out on a given day. In this example, end_date is not a partition key or clustering column, but you can still scan the entire cluster’s data looking for reservations with a checkout date of September 8, 2016:

val rdd = sc.cassandraTable("reservation", "reservations_by_confirmation")
  .select("hotel_id", "confirm_number")
  .where("end_date = ?", "2016-09-08")

// Invoke an action to run the Spark job
rdd.collect.foreach(println)

Finding and retrieving data is only half of the functionality available—you can also save data back to Cassandra. Traditionally, data in a transactional database would require extraction to a separate location in order to perform analytics. With the spark-cassandra-connector, you can extract data, transform in place, and save it directly back to a Cassandra table, eliminating the costly and error-prone ETL process. Saving data back to a Cassandra table is amazingly easy:

// Create a collection of guests with simple identifiers
val collection = sc.parallelize(Seq(("1", "Delaney", "McFadin"),
  ("2", "Quinn", "McFadin")))

// Save to the guests table
collection.saveToCassandra("reservation", "guests",
  SomeColumns("guest_id", "first_name", "last_name"))

This is a simple example, but the basic syntax applies to any data. A more advanced example would be to calculate the average daily revenue for a hotel and write the results to a new Cassandra table. In a sensor application, you might calculate high and low temperatures for a given day and write those results back out to Cassandra.

Querying data is not just limited to Spark APIs. With SparkSQL, you can use familiar SQL syntax to perform complex queries on data in Cassandra, including query options not available in CQL. It’s easy to create enhanced queries such as aggregations, ordering, and joins using the spark object, which is automatically available to you in the spark-shell. For example, if you wanted to create a report for guests arriving on a certain date, you could perform a query such as:

scala> val result = spark.sql("""
  SELECT * from reservation.reservations_by_confirmation
  JOIN reservation.guests
  ON reservations_by_confirmation.guest_id = guests.guest_id
  WHERE end_date = '2020-05-30'""")

scala> result.collect.foreach(println)

The result returned from the spark.sql() operation is a DataFrame, which you can manipulate using the DataFrame APIs. Once you’ve developed queries using spark-shell, you’re ready to implement them in application code. If you’re building an application in Java or Scala and using Maven, you’ll want to add dependencies to your project’s pom.xml file for the Spark core library and the spark-cassandra-connector.
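A representative set of dependencies might look like this fragment (the version numbers and the Scala build suffix are assumptions; match them to your Spark and Scala versions):

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.5</version>
</dependency>
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.11</artifactId>
  <version>2.4.3</version>
</dependency>
```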


We’ve just scratched the surface of what you can accomplish with Cassandra and Spark. For example, you can use Cassandra as input to machine learning using Spark MLlib in order to gain additional insights from your data.


In this chapter, we’ve provided a roadmap for migrating applications to Cassandra and introduced some of the many integration options available for Cassandra. Hopefully we’ve piqued your interest in the wide range of directions you can take your applications using Cassandra and related technologies.

And now we’ve come to the end of our journey together. If we’ve achieved our goal, you now have an in-depth understanding of the right problems to solve using Cassandra, and how to design, implement, deploy, and maintain successful applications.
