Chapter 7. Reading and Writing Data

Now that we understand the data model, we’ll look at the different kinds of queries you can perform in Cassandra to read and write data. In this chapter, we use Cassandra 0.7.0 beta 1, which is the most recent release at the time of this writing.

Query Differences Between RDBMS and Cassandra

There are several differences between Cassandra’s model and query methods and what’s available in RDBMS, and these are important to keep in mind.

No Update Query

There is no first-order concept of an update in Cassandra, meaning that there is no client query called an “update.” You can readily achieve the same effect, however, by simply performing an insert using an existing row key. If you issue an insert statement for a key that already exists, Cassandra will overwrite the values for any matching columns; if your query contains additional columns that don’t already exist for that row key, then the additional columns will be inserted. This is all seamless.

Record-Level Atomicity on Writes

Cassandra automatically gives you record-level atomicity on every write operation. In RDBMS, you would have to specify row-level locking. Although Cassandra offers atomicity at the column family level, it does not guarantee isolation.

No Server-Side Transaction Support

Because you need to denormalize tables to create secondary indexes, you might need to insert data into two or more tables (one for your primary table and one for your inverted or secondary index). This means issuing two insert statements. So if it is applicable for your use case, you’ll need to manually “roll back” writes by issuing a delete if one of the insert operations fails.

More to the point, perhaps, is that Cassandra doesn’t have transactions, because it just wasn’t built with that goal in mind.

No Duplicate Keys

It is possible in SQL databases to insert more than one row with identical values if you have not defined a unique primary key constraint on one of the columns. This is not possible in Cassandra. If you write a new record with a key that already exists in a column family, the values for any existing columns will be overwritten, and any columns that previously were not present for that row will be added to the row.

Basic Write Properties

There are a few basic properties of Cassandra’s write ability that are worth noting. First, writing data is very fast in Cassandra, because its design does not require performing disk reads or seeks. The memtables and SSTables save Cassandra from having to perform these operations on writes, which slow down many databases. All writes in Cassandra are append-only.

Because of the database commit log and hinted handoff design, the database is always writeable, and within a column family, writes are always atomic.

Consistency Levels

Cassandra’s tuneable consistency levels mean that you can specify in your queries how much consistency you require. A higher consistency level means that more nodes need to respond to the query, giving you more assurance that the values present on each replica are the same. If two nodes respond with different timestamps, the newest value wins, and that’s what will be returned to the client. In the background, Cassandra will then perform what’s called a read repair: it takes notice of the fact that one or more replicas responded to a query with an outdated value, and updates those replicas with the most current value so that they are all consistent.

There are several consistency levels that you can specify, and they mean something different for read operations than for write operations. The possible consistency levels, and the implications of specifying each one for read queries, are shown in Table 7-1.

Note

The consistency levels are based on the replication factor specified in the configuration file, not on the total number of nodes in the system.

Table 7-1. Read consistency levels
Consistency level | Implication
ZERO | Unsupported. You cannot specify CL.ZERO for read operations because it doesn’t make sense. This would amount to saying “give me the data from no nodes.”
ANY | Unsupported. Use CL.ONE instead.
ONE | Immediately return the record held by the first node that responds to the query. A background thread is created to check that record against the same record on other replicas. If any are out of date, a read repair is then performed to sync them all to the most recent value.
QUORUM | Query all nodes. Once a majority of replicas ((replication factor / 2) + 1) respond, return to the client the value with the most recent timestamp. Then, if necessary, perform a read repair in the background on all remaining replicas.
ALL | Query all nodes. Wait for all nodes to respond, and return to the client the record with the most recent timestamp. Then, if necessary, perform a read repair in the background. If any nodes fail to respond, fail the read operation.

As you can see from the table, there are certain consistency levels that are not supported for read operations: ZERO and ANY. Notice that the implication of consistency level ONE is that the client gets whatever value is held by the first node to respond to the read operation, even if that value is out of date. The read repair operation is performed after the record is returned, so any subsequent reads will all have a consistent value, regardless of the responding node.

Another item worth noting is in the case of consistency level ALL. If you specify CL.ALL, then you’re saying that you require all replicas to respond, so if any node with that record is down or otherwise fails to respond before the timeout, the read operation fails.

Note

A node is considered unresponsive if it does not respond to a query before the value specified by rpc_timeout_in_ms in the configuration file. The default is 10 seconds.

You can specify these consistency levels for write operations as well, though their meanings are very different. The implications of using the different consistency levels on writes are shown in Table 7-2.

Table 7-2. Write consistency levels
Consistency level | Implication
ZERO | The write operation will return immediately to the client before the write is recorded; the write will happen asynchronously in a background thread, and there are no guarantees of success.
ANY | Ensure that the value is written to a minimum of one node, allowing hints to count as a write.
ONE | Ensure that the value is written to the commit log and memtable of at least one node before returning to the client.
QUORUM | Ensure that the write was received by at least a majority of replicas ((replication factor / 2) + 1).
ALL | Ensure that the number of nodes specified by replication factor received the write before returning to the client. If even one replica is unresponsive to the write operation, fail the operation.

The most notable consistency level for writes is the ANY level. This level means that the write is guaranteed to reach at least one node, but it allows a hint to count as a successful write. That is, if you perform a write operation and the node that the operation targets for that value is down, the server will make a note to itself, called a hint, which it will store until that node comes back up. Once the node is up, the server will detect this, look to see whether it has any writes that it saved for later in the form of a hint, and then write the value to the revived node. In many cases, the node that makes the hint actually isn’t the node that stores it; instead, it sends it off to one of the nonreplica neighbors of the node that is down.

Using the consistency level of ONE on writes means that the write operation will be written to both the commit log and the memtable. That means that writes at CL.ONE are durable, so this level is the minimum level to use to achieve fast performance and durability. If this node goes down immediately after the write operation, the value will have been written to the commit log, which can be replayed when the server is brought back up to ensure that it still has the value.

For both reads and writes, the consistency levels of ZERO, ANY, and ONE are considered weak, whereas QUORUM and ALL are considered strong. Consistency is tuneable in Cassandra because clients can specify the desired consistency level on both reads and writes. There is an equation that is popularly used to represent the way to achieve strong consistency in Cassandra: R + W > N = strong consistency. In this equation, R, W, and N are the read replica count, the write replica count, and the replication factor, respectively; all client reads will see the most recent write in this scenario, and you will have strong consistency.
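As a quick worked example: with a replication factor of N = 3, reading and writing at QUORUM gives R = 2 and W = 2, so R + W = 4 > 3, and every read overlaps at least one replica that holds the latest write. The sketch below borrows the client, key, and column variables from Example 7-1, which appears later in this chapter:

// a sketch of strong consistency with N = 3: QUORUM writes (W = 2)
// plus QUORUM reads (R = 2) satisfy R + W > N
ConsistencyLevel quorum = ConsistencyLevel.QUORUM;

client.insert(userIDKey, cp,
        new Column("name".getBytes(UTF8),
                "George Clinton".getBytes(), clock), quorum);

// any QUORUM read that follows is guaranteed to see this write
Column col = client.get(userIDKey, colPathName, quorum).getColumn();

By contrast, reading and writing at ONE (1 + 1 = 2, which is not greater than 3) is weakly consistent: a read may land on a replica that has not yet received the write.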

Basic Read Properties

There are a few basic properties of reading data from Cassandra that are worth noting. First, it’s easy to read data because clients can connect to any node in the cluster to perform reads, without having to know whether a particular node acts as a replica for that data. If a client connects to a node that doesn’t have the data it’s trying to read, the node it’s connected to will act as coordinator node to read the data from a node that does have it, identified by token ranges.

To fulfill read operations, Cassandra does have to perform seeks, but you can speed these up by adding RAM. Adding RAM will help you if you find the OS doing a lot of paging on reads (in general, it is better to enable the various caches Cassandra has). Cassandra has to wait for a number of responses synchronously (based on consistency level and replication factor), and then perform read repairs as necessary.

So reads are clearly slower than writes, for these various reasons. The partitioner doesn’t influence the speed of single-row reads; its responsibility is to perform the consistent hash operation that maps keys to nodes. For range queries, however, using an order-preserving partitioner (OPP) is significantly faster because it allows you to easily determine which nodes don’t have your data. In addition, you can choose row caching and key caching strategies to give you a performance boost (see Chapter 11).

The API

This section presents an overview of the basic Cassandra API so that once we start reading and writing data, some of these exotic terms won’t seem quite so difficult. We already know what a column, super column, and column family are: a column family contains columns or super columns; a super column contains only columns (you can’t nest one super column inside another); and columns contain a name/value pair and a timestamp.

In a relational database, the terms SELECT, INSERT, UPDATE, and DELETE mean just what they mean colloquially, in regular life. But working with Cassandra’s API structures is not exactly straightforward, and can be somewhat daunting to newcomers; after all, there’s no such thing as a “slice range” in regular life, so these terms may take some getting used to.

There are two basic concepts that you’ll want to learn quickly: ranges and slices. Many queries are defined using these terms, and they can be a bit confusing at first.

Note

Columns are sorted by their type (as specified by CompareWith), and rows are sorted by their partitioner.

Ranges and Slices

A range basically refers to a mathematical range, where you have a set of ordered elements and you want to specify some subset of those elements by defining a start element and a finish element. The range is the representation of all the elements between start and finish, inclusive.

Ranges typically refer to ranges of keys (rows). The term slice is used to refer to a range of columns within a row.

The range works according to the column family’s comparator. That is, given columns a, b, c, d, and e, the range of (a,c) includes columns a, b, and c. So, if you have 1,000 columns with names that are long integers, you can see how you could easily specify a range of columns between 35 and 45. By using ranges, you can retrieve all of the columns in a range that you define (called a range slice), or you can perform the same update to the items in the range using a batch.
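For instance, assuming a column family whose comparator sorts column names as raw bytes, the (a,c) slice above could be described with a SliceRange like this (the predicate classes are covered in detail later in this chapter, so treat this as a preview sketch):

// a sketch: ask for every column sorting between "a" and "c", inclusive
SlicePredicate predicate = new SlicePredicate();
SliceRange range = new SliceRange();
range.setStart("a".getBytes());
range.setFinish("c".getBytes());
range.setCount(100);          // upper bound on how many columns to return
predicate.setSlice_range(range);
// handed to a read operation, this would return columns a, b, and c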

You may have many hundreds of columns defined on a row, but you might not want to retrieve all of them in a given query. Columns are stored in sorted order, so the range query is provided so that you can fetch columns within a range of column names.

Note

Range queries require using an OrderPreservingPartitioner, so that keys are returned in the order defined by the collation used by the partitioner.

When specifying a range query and using Random Partitioner, there’s really no way to specify a range more narrow than “all”. This is obviously an expensive proposition, because you might incur additional network operations. It can also potentially result in missed keys, because an update that happens concurrently with your row scan may land earlier in the index than the point you are currently processing, so your scan will miss it.

There is another thing that can be confusing at first. When you are using Random Partitioner, you must recall that range queries first hash the keys. So if you are using a range of “Alice” to “Alison”, the query will first run a hash on each of those keys and return not simply the natural values between Alice and Alison, but rather the values between the hashes of those values.

Here is the basic flow of a read operation that looks for a specific key when using Random Partitioner. First, the key is hashed, and then the client connects to any node in the cluster. That node will route the request to the node with that key. The memtable is consulted to see whether your key is present; if it’s not found, then a scan is performed on the Bloom filter for each file, starting with the newest one. Once the key is found in the Bloom filter, it is used to consult the corresponding datafile and find the column values.

Setup and Inserting Data

We’ll look at inserts first, because you need to have something in the database to query. In this section we set up a Java project and walk through a complete example that does an insert and then reads the data back.

First, download Cassandra from http://cassandra.apache.org. It’s easiest to get started with the binary version. See Chapter 2 if you’re having any trouble.

Now let’s create a new project to test some of our work. For now, we’ll use a Java project in Eclipse. First, create a new project, and then add a few JARs to your classpath: a Log4J JAR to output log statements (don’t forget to set the properties file for your own classes); the Thrift library called libthrift-r917130.jar, which contains the org.apache.thrift classes; and the Cassandra JAR apache-cassandra-x.x.x.jar, which contains the org.apache.cassandra classes. We also need to add the SLF4J logger (both the API and the implementation JAR), which Cassandra requires. Finally, add an argument to your JVM to add the log4j.properties file to your classpath:

-Dlog4j.configuration=file:///home/eben/books/cassandra/log4j.properties

Note

In Eclipse, you can add the log4j.properties file by creating a new Run Configuration. Click the Arguments tab, and then in the VM Arguments text field, enter the parameter specified in the previous code sample—of course using the actual path to your properties file.

My log4j.properties file looks like this:

# output messages into a rolling log file as well as stdout
log4j.rootLogger=DEBUG,stdout,R

# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n

# rolling log file
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.maxFileSize=5MB
log4j.appender.R.maxBackupIndex=5
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %C %F (line %L) %m%n

# This points to your logs directory
log4j.appender.R.File=cass-client.log

To keep this simple, we’ll use the default keyspace and configuration. Now start the server, and create a class that looks like the one shown in Example 7-1. This example will open a connection to Cassandra and write a new row with two columns: name and age. We then read back a single column value for that row, and then read the entire row.

Example 7-1. SimpleWriteRead.java
package com.cassandraguide.rw;

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Clock;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class SimpleWriteRead {

	private static final Logger LOG = Logger.getLogger(SimpleWriteRead.class);
	
	//set up some constants 
	private static final String UTF8 = "UTF8";
	private static final String HOST = "localhost";
	private static final int PORT = 9160;
	private static final ConsistencyLevel CL = ConsistencyLevel.ONE;

	//not paying attention to exceptions here
	public static void main(String[] args) throws UnsupportedEncodingException,
			InvalidRequestException, UnavailableException, TimedOutException,
			TException, NotFoundException {

		TTransport tr = new TSocket(HOST, PORT);
		//new default in 0.7 is framed transport
		TFramedTransport tf = new TFramedTransport(tr);
		TProtocol proto = new TBinaryProtocol(tf);
		Cassandra.Client client = new Cassandra.Client(proto);
		tf.open();
		client.set_keyspace("Keyspace1");

		String cfName = "Standard1";
		byte[] userIDKey = "1".getBytes(); //this is a row key

		Clock clock = new Clock(System.currentTimeMillis());

		//create a representation of the Name column
		ColumnPath colPathName = new ColumnPath(cfName);
		colPathName.setColumn("name".getBytes(UTF8));
		
		ColumnParent cp = new ColumnParent(cfName);

		//insert the name column
		LOG.debug("Inserting row for key " + new String(userIDKey));
		client.insert(userIDKey, cp, 
				new Column("name".getBytes(UTF8), 
						"George Clinton".getBytes(), clock), CL);

		//insert the Age column
		client.insert(userIDKey, cp, 
				new Column("age".getBytes(UTF8), 
						"69".getBytes(), clock), CL);
				
		LOG.debug("Row insert done.");

		// read just the Name column
		LOG.debug("Reading Name Column:");
		Column col = client.get(userIDKey, colPathName,
				CL).getColumn();

		LOG.debug("Column name: " + new String(col.name, UTF8));
		LOG.debug("Column value: " + new String(col.value, UTF8));
		LOG.debug("Column timestamp: " + col.clock.timestamp);

		//create a slice predicate representing the columns to read
		//start and finish are the range of columns--here, all
		SlicePredicate predicate = new SlicePredicate();
		SliceRange sliceRange = new SliceRange();
		sliceRange.setStart(new byte[0]);
		sliceRange.setFinish(new byte[0]);
		predicate.setSlice_range(sliceRange);

		LOG.debug("Complete Row:");
		// read all columns in the row
		ColumnParent parent = new ColumnParent(cfName);
		List<ColumnOrSuperColumn> results = 
			client.get_slice(userIDKey, 
					parent, predicate, CL);
		
		//loop over columns, outputting values
		for (ColumnOrSuperColumn result : results) {
			Column column = result.column;
			LOG.debug(new String(column.name, UTF8) + " : "
					+ new String(column.value, UTF8));
		}
		tf.close();
		
		LOG.debug("All done.");
	}
}

Running this example will output the following:

DEBUG 14:02:09,572 Inserting row for key 1
DEBUG 14:02:09,580 Row insert done.
DEBUG 14:02:09,580 Reading Name Column:
DEBUG 14:02:09,585 Column name: name
DEBUG 14:02:09,586 Column value: George Clinton
DEBUG 14:02:09,586 Column timestamp: 1284325329569
DEBUG 14:02:09,589 Complete Row:
DEBUG 14:02:09,594 age : 69
DEBUG 14:02:09,594 name : George Clinton
DEBUG 14:02:09,594 All done.

Note

This isn’t Cassandra-specific, but in Java you can easily get a more user-friendly representation of a date by wrapping the long timestamp output with a new Date object, like this: new Date(col.clock.timestamp);.

Let’s unpack what we’ve done here. First, we create a connection to the Cassandra server:

TTransport tr = new TSocket(HOST, PORT);
//new default in 0.7 is framed transport
TFramedTransport tf = new TFramedTransport(tr);
TProtocol proto = new TBinaryProtocol(tf);
Cassandra.Client client = new Cassandra.Client(proto);
tf.open();
client.set_keyspace("Keyspace1");

Here we’re using the framed transport, which is the new default in Cassandra 0.7. This code connects to Cassandra at the specified keyspace.

Then, we create representations for the column family we’ll use, and convenience values for the row key and clock that indicate when this insert was performed:

String cfName = "Standard1";
byte[] userIDKey = "1".getBytes(); //this is a row key

Clock clock = new Clock(System.currentTimeMillis());

Next, we use the client object with the column path to insert a new value:

ColumnParent cp = new ColumnParent(cfName);

//insert the name column
LOG.debug("Inserting row for key " + new String(userIDKey));
client.insert(userIDKey, cp, 
		new Column("name".getBytes(UTF8), 
				"George Clinton".getBytes(), clock), CL);

The insert operation requires a row key, as well as the column object that includes the column name and the value we want to assign to it for this row key. We also specify the clock representing when this insert was performed, and the consistency level to apply.

We then basically repeat this operation to write to the same row, but now to the age column, giving it a value of 69:

client.insert(userIDKey, cp, 
		new Column("age".getBytes(UTF8), 
				"69".getBytes(), clock), CL);

So at this point we have inserted two columns into a single row and are ready to read it back to verify.

To ensure our insert went well by reading it back, we use the client get method, passing the row key and the path to the column we want to read (the name column below), and then specify the consistency level we require for this operation:

ColumnPath colPathName = new ColumnPath(cfName);
colPathName.setColumn("name".getBytes(UTF8));
Column col = client.get(userIDKey, colPathName,
		CL).getColumn();

LOG.debug("Column name: " + new String(col.name, UTF8));
LOG.debug("Column value: " + new String(col.value, UTF8));
LOG.debug("Column timestamp: " + col.clock.timestamp);

So each column value has its own timestamp (wrapped in a clock) because it’s the column, not the row, that is the atomic unit. This can be confusing when you first come to Cassandra if you’re used to adding a timestamp column to a relational table to indicate when it was last updated. But there’s no such thing in Cassandra as the last time a row was updated; it’s granular at the column level.

Because Cassandra returns a byte array for column names and values, we create a String object around the byte array so we can do application stuff with it (like write it to a log here). The clock is stored as a long (representing the milliseconds since the Unix epoch), so we could wrap this in a new java.util.Date object if we wanted to.

So using the get method and specifying the column path and other parameters, we read a single column’s value. But now we want to get a “range” of columns for a single row (called a slice). So we use a slice predicate:

SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);

The slice predicate is a container object that allows us to specify the range of columns that we want to read between a start and a finish. By specifying new byte[0] as the start and finish positions of the range, we’re saying that we want all of the columns.

Now that our predicate is set up, we can execute the range slice query that will get all of the columns for our row, so we can loop over them one by one:

ColumnParent parent = new ColumnParent(cfName);
List<ColumnOrSuperColumn> results = 
  client.get_slice(userIDKey, parent, predicate, CL);

This get_slice query uses the predicate we created as well as two new things: the ColumnOrSuperColumn class and a column parent. The ColumnOrSuperColumn class is just what it says: it represents either a column or a super column returned by Thrift. Thrift does not have support for inheritance, so this class is used to pack up both columns and super columns in this one object (depending on what you’re querying). The client just reads the values if a column is returned; if a super column is returned, the client gets a column out of the super column and reads that.

The column parent is the path to the parent of a set of columns. Because we’re retrieving a set of columns in a get_slice by definition, we need to specify the column family that is the parent of the columns we’re looking up. Now we can loop over columns for this row, printing out each column’s three attributes, and then close the connection:

for (ColumnOrSuperColumn result : results) {
  Column column = result.column;

  LOG.debug(new String(column.name, UTF8) + " : "
    + new String(column.value, UTF8));
}
tf.close();

You use the insert operation to add values or to overwrite existing values. So to update a value, use the insert operation with the new column values you want for the same key.
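For example, to change the age we wrote earlier from 69 to 70, you might simply insert the same column name again with a newer clock; this sketch reuses the variables from Example 7-1:

// the "update": another insert against the same row key and column name,
// with a fresher clock so the new value wins
Clock newerClock = new Clock(System.currentTimeMillis());
client.insert(userIDKey, cp,
        new Column("age".getBytes(UTF8), "70".getBytes(), newerClock), CL);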

You can also insert many values at once, which we see how to do later in this chapter in Batch Mutates.

Using a Simple Get

Use the get operation to retrieve columns or super columns, using a column path to access them:

ColumnOrSuperColumn get(byte[] key, ColumnPath column_path, 
  ConsistencyLevel consistency_level)

Example 7-2 shows how to do this.

Example 7-2. Using the get operation
package com.cassandraguide.rw;

//imports left out

public class GetExample {

	private static final Logger LOG = Logger.getLogger(GetExample.class);
	
	private static final String UTF8 = "UTF8";
	private static final String HOST = "localhost";
	private static final int PORT = 9160;
	private static final ConsistencyLevel CL = ConsistencyLevel.ONE;

	public static void main(String[] args) throws UnsupportedEncodingException,
			InvalidRequestException, UnavailableException, TimedOutException,
			TException, NotFoundException {

		TTransport tr = new TSocket(HOST, PORT);
		//new default in 0.7 is framed transport
		TFramedTransport tf = new TFramedTransport(tr);
		TProtocol proto = new TBinaryProtocol(tf);
		Cassandra.Client client = new Cassandra.Client(proto);
		tf.open();
		client.set_keyspace("Keyspace1");
		
		String cfName = "Standard1";
		byte[] userIDKey = "1".getBytes(); //this is the row key

		Clock clock = new Clock(System.currentTimeMillis());

		//create a representation of the Name column
		ColumnParent cp = new ColumnParent(cfName);

		//insert the name column
		LOG.debug("Inserting row for key " + new String(userIDKey));
		client.insert(userIDKey, cp, 
				new Column("name".getBytes(UTF8), 
						"George Clinton".getBytes(), clock), CL);
		
		LOG.debug("Row insert done.");

		/** Do the GET */

		LOG.debug("Get result:");
		// read all columns in the row
		ColumnPath path = new ColumnPath();
		path.column_family = cfName;
		path.column = "name".getBytes();
		
		ColumnOrSuperColumn cosc = client.get(userIDKey, path, CL);
		Column column = cosc.column;
		LOG.debug(new String(column.name, UTF8) + " : "
				+ new String(column.value, UTF8));
		//END GET
		
		tr.close();
		
		LOG.debug("All done.");
	}
}

Here, we perform an insert so that we have something to get. We create a client object and then call its get method, which takes the row key, a column path, and a consistency level as arguments. The column path sets the name of the column that we’re looking for. Remember that the column names and values are binary (in a Java client they’re byte arrays), so we have to convert the string column name to a byte array for the query. Then when we get the column’s value, it’s a byte array too, so we convert it to a string to work with the result.

In this example, we insert a value only for the name column, and because our column path represents just that single column, that is what we get back. The output is shown here:

DEBUG 14:36:42,265 Inserting row for key 1
DEBUG 14:36:42,273 Row insert done.
DEBUG 14:36:42,273 Get result:
DEBUG 14:36:42,282 name : George Clinton
DEBUG 14:36:42,282 All done.

Seeding Some Values

Here we’ll just use the command-line interface to quickly create a couple of keys with some different columns to serve as data for the following queries:

[default@Keyspace1] set Standard1['k1']['a']='1'
Value inserted.
[default@Keyspace1] set Standard1['k1']['b']='2'
Value inserted.
[default@Keyspace1] set Standard1['k1']['c']='3'
Value inserted.
[default@Keyspace1] set Standard1['k2']['a']='2.1'
Value inserted.
[default@Keyspace1] set Standard1['k2']['b']='2.2'
Value inserted.

So we have two rows; the first has three columns and the second has two columns.

Slice Predicate

A slice predicate is used in both read and write operations, acting as a limiting factor for specifying a set of columns. You can specify the predicate one of two ways: with a list of column names or with a slice range. If you know the names of a few columns that you want to retrieve, then you can specify them explicitly by name. If you don’t know their names, or for another reason want to retrieve a range of columns, you use a slice range object to specify the range.

Note

I’m using contained examples to be concise. But it is not uncommon to see many, many columns defined per row in Cassandra, so don’t let these examples mislead you. It’s a big data store and can hold two billion columns per row in version 0.7.

To use the slice predicate, you create a predicate object populated with the column names you want, and pass it to your read operation.

Getting Particular Column Names with Get Slice

If you want to get just the columns called “a” and “b” in a single row, you can use the predicate with the column names specified.

You can get a set of columns contained in a column parent using a get_slice operation. It will retrieve values by column name or a range of names, and will return either columns or super columns. The get_slice operation has this signature:

List<ColumnOrSuperColumn> get_slice(byte[] key, 
ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel cl)

This is shown in Example 7-3.

Example 7-3. SlicePredicate.java
package com.cassandraguide.rw;

// imports omitted

public class SlicePredicateExample {
	
	public static void main(String[] args) throws Exception {
		Connector conn = new Connector();
		Cassandra.Client client = conn.connect();
		
		SlicePredicate predicate = new SlicePredicate();
		List<byte[]> colNames = new ArrayList<byte[]>();
		colNames.add("a".getBytes());
		colNames.add("b".getBytes());
		predicate.column_names = colNames;
		
		ColumnParent parent = new ColumnParent("Standard1");

		byte[] key = "k1".getBytes();
		List<ColumnOrSuperColumn> results = 
			client.get_slice(key, parent, predicate, ConsistencyLevel.ONE);
		
		for (ColumnOrSuperColumn cosc : results) {	
			Column c = cosc.column;
			System.out.println(new String(c.name, "UTF-8") + " : "
					+ new String(c.value, "UTF-8"));
		}
				
		conn.close();
		
		System.out.println("All done.");
	}
}

In this example, only the specified columns will be retrieved, and the other columns will be ignored. The query returns a list of ColumnOrSuperColumn objects. Because we know that we’re querying a regular column family, we get the column out of the ColumnOrSuperColumn data structure returned by the underlying RPC mechanism (Thrift), and finally retrieve the names and values from it in a loop.

The output is as follows:

a : 1
b : 2
All done.

Getting a Set of Columns with Slice Range

Sometimes you don’t want to specify each and every column you want to retrieve, perhaps because you have a lot of columns to retrieve or because you don’t know all of their names.

To read a range of the columns in a row, you can specify the start and finish columns, and Cassandra will give you the start and finish columns as well as any columns in between, according to the sorting order based on the comparator for that column family. Create your slice predicate, then create your range, and then set the range into the predicate before passing the predicate to your read operation. Here’s an example:

SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart("age".getBytes());
sliceRange.setFinish("name".getBytes());
predicate.setSlice_range(sliceRange);

When executed with a get_slice operation, this query will return the two columns specified, as well as any columns that a comparison operation would sort within that range (between them lexicographically, numerically, or whatever). For example, if this row also had an “email” column, it would be returned in the results as well.

You must consider the column names according to their comparator, and specify start and finish in the proper order. For example, trying to set a start of name and a finish of age will throw an exception like this:

InvalidRequestException(why:range finish must come after start in the order of 
traversal)

Note

Recall that “returning a column” doesn’t mean that you get the value, as in SQL; it means that you get the complete column data structure, which is the name, the value, and the timestamp.

Counts

You can limit the number of columns returned by your slice range by using the count attribute of the Slice Range structure. Let’s say we have a row with hundreds of columns. We could specify a range that might include many of these columns, but limit our result set to only the first 10 columns like this:

SliceRange sliceRange = new SliceRange();
sliceRange.setStart("a".getBytes());
sliceRange.setFinish("d".getBytes());
sliceRange.count = 10;

Again, the “first” columns are those according to the order dictated by the column family’s comparator.

Reversed

You can also reverse the order in which the columns are fetched by setting the reversed = true attribute on the slice range. If you have columns age, email, and name, then setting reversed to true returns them in the order name, email, age.
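A minimal sketch of a reversed slice over those columns follows; note that because traversal runs backward, the start of the range is the column that sorts last:

SliceRange sliceRange = new SliceRange();
sliceRange.setStart("name".getBytes());   // with reversed = true, start is the "last" column
sliceRange.setFinish("age".getBytes());
sliceRange.setReversed(true);
sliceRange.count = 10;

SlicePredicate predicate = new SlicePredicate();
predicate.setSlice_range(sliceRange);
// a get_slice with this predicate returns name, email, age in that order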

Getting All Columns in a Row

To read all of the columns in a row, you still need the predicate using a slice range, but you pass it empty byte arrays for the start and finish parameters, like this:

SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);

Then, you can pass this populated predicate object to your get_slice operation along with the other necessary parameters (such as consistency level and so on).

Get Range Slices

In the same way that you access a set of columns by using a range, you can also access a range of keys or tokens. Using the get_range_slices operation, you can pass it a KeyRange object that defines the range of keys you want.

One major distinction here is that you can get either keys or tokens in a range using the same KeyRange data structure as the parameter to get_range_slices. Key ranges are start-inclusive, whereas token ranges are start-exclusive; because of ring distribution, tokens can wrap around so that the end token is less than the start token.
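The sketch below shows both flavors; a single KeyRange should populate either the key pair or the token pair, never both, and the token strings here are arbitrary placeholders rather than real ring positions:

// range of keys: start-inclusive
KeyRange byKeys = new KeyRange();
byKeys.start_key = "k1".getBytes();
byKeys.end_key = "k2".getBytes();
byKeys.count = 100;

// range of tokens: start-exclusive; tokens are passed as strings
// (these values are illustrative placeholders)
KeyRange byTokens = new KeyRange();
byTokens.start_token = "0";
byTokens.end_token = "85070591730234615865843651857942052864";
byTokens.count = 100;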

The API defines the operation get_range_slices for you to use. The operation is structured like this:

List<KeySlice> results = client.get_range_slices(parent, predicate, keyRange, 
  ConsistencyLevel);

For a complete listing that shows using a range slice, see Example 7-4.

Example 7-4. GetRangeSliceExample.java
package com.cassandraguide.rw;

//imports omitted

public class GetRangeSliceExample {
	
	public static void main(String[] args) throws Exception {
		Connector conn = new Connector();
		Cassandra.Client client = conn.connect();
		
		System.out.println("Getting Range Slices.");
		
		SlicePredicate predicate = new SlicePredicate();
		List<byte[]> colNames = new ArrayList<byte[]>();
		colNames.add("a".getBytes());
		colNames.add("b".getBytes());
		predicate.column_names = colNames;
		
		ColumnParent parent = new ColumnParent("Standard1");
		
		KeyRange keyRange = new KeyRange();
		keyRange.start_key = "k1".getBytes();
		keyRange.end_key = "k2".getBytes();

		//a key slice is returned
		List<KeySlice> results = 
			client.get_range_slices(parent, predicate, keyRange, 
					ConsistencyLevel.ONE);
		
		for (KeySlice keySlice : results) {	
			List<ColumnOrSuperColumn> cosc = keySlice.getColumns();
						
			System.out.println("Current row: " + 
					new String(keySlice.getKey()));
			
			for (int i = 0; i < cosc.size(); i++) {	
				Column c = cosc.get(i).getColumn();
				System.out.println(new String(c.name, "UTF-8") + " : "
						+ new String(c.value, "UTF-8"));
			}
		}
				
		conn.close();
		
		System.out.println("All done.");
	}
}

Note

This program assumes that you’ve added a few values for a few different row keys to work with, as shown in Seeding Some Values.

The program outputs the following:

Getting Range Slices.
Current row: k1
a : 1
b : 2
Current row: k2
a : 2.1
b : 2.2
All done.

Though the names involving “slice” and “range” may seem unusual at first, they turn out to be innocuous. Just remember that slices mean sets of columns and ranges mean sets of keys. So in this example, we’re getting multiple columns for multiple row keys.

Multiget Slice

With get_slice, we saw how to get a set of column names for a single specified row key. multiget_slice lets you retrieve a subset of columns for a set of row keys based on a column parent and a predicate. That is, given more than one row key, retrieve the value of the named columns for each key. So a multiget slice is more than one named column for more than one row.

Note

There used to be a method called multiget, but it is now deprecated in favor of multiget_slice.

The operation looks like this:

Map<byte[],List<ColumnOrSuperColumn>> results = 
			client.multiget_slice(rowKeys, parent, predicate, CL);

You specify the parent and the predicate as usual, and also provide the operation with the set of row keys you want to query. The row keys are just specified as a list of byte arrays, which are the key names.

The results come back to us as a Map<byte[],List<ColumnOrSuperColumn>>. This may seem like a complicated data structure, but it’s actually simple. A map is a key/value pair, and in this case the byte array key is the row key, so in this example there will be two keys: one for each row key we get. Each byte[] key in the results map points to a list containing one or more ColumnOrSuperColumn objects. This structure is used because Thrift does not support inheritance. You have to know whether your column family is of type Standard or Super, and then you just get the right one from the data structure. From the ColumnOrSuperColumn, you extract the column (in this case, the super_column will be empty), and then use the column object to get the name and value. You could also get the timestamp from it if you wanted to.

An example of using multiget slice is shown in Example 7-5.

Example 7-5. MultigetSliceExample.java
package com.cassandraguide.rw;

//imports omitted

public class MultigetSliceExample {
	
	private static final ConsistencyLevel CL = ConsistencyLevel.ONE;
	
	private static final String columnFamily = "Standard1";

	public static void main(String[] args) throws UnsupportedEncodingException,
			InvalidRequestException, UnavailableException, TimedOutException,
			TException, NotFoundException {

		Connector conn = new Connector();
		Cassandra.Client client = conn.connect();
		
		System.out.println("Running Multiget Slice.");

		SlicePredicate predicate = new SlicePredicate();
		List<byte[]> colNames = new ArrayList<byte[]>();
		colNames.add("a".getBytes());
		colNames.add("c".getBytes());
		predicate.column_names = colNames;

		ColumnParent parent = new ColumnParent(columnFamily);

		//instead of one row key, we specify many
		List<byte[]> rowKeys = new ArrayList<byte[]>();
		rowKeys.add("k1".getBytes());
		rowKeys.add("k2".getBytes());
		
		//instead of a simple list, we get a map, where the keys are row keys
		//and the values the list of columns returned for each
		Map<byte[],List<ColumnOrSuperColumn>> results = 
			client.multiget_slice(rowKeys, parent, predicate, CL);
		
		for (byte[] key : results.keySet()) {	
			List<ColumnOrSuperColumn> row = results.get(key);
			
			System.out.println("Row " + new String(key) + " --> ");
			for (ColumnOrSuperColumn cosc : row) {
				Column c = cosc.column;
				System.out.println(new String(c.name, "UTF-8") + " : "
						+ new String(c.value, "UTF-8"));
			}
		}
				
		conn.close();
		
		System.out.println("All done.");
	}
}

So we have a couple of row keys in the database, with identifiers I’ve kept purposefully short and clear in order to help you visualize the structure. We have a variety of column sets between the two rows but we’re only interested in retrieving the a and c columns, so we use a slice predicate and specify the column_names to limit the results. We also want to specify more than one row, so we use a list of byte arrays to indicate which row keys we’re after.

Running this code elicits the following result:

Running Multiget Slice.
Row k2 --> 
a : 2.1
Row k1 --> 
a : 1
c : 3
All done.

As you can see, there was no column named “c” defined for the row with key “k2”, so Cassandra didn’t return anything for that column. There was a value for column “b” in row “k1”, but we didn’t ask for it in our slice, so we didn’t get it.

Note

Because we’re using Thrift as the underlying RPC mechanism to communicate with Cassandra, the results will come back to you unordered and you’ll have to sort them on the client. That’s because Thrift can’t preserve order. A multiget is actually just a wrapper over a series of get requests.
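If your client needs the rows back in key order, one approach (a sketch, assuming the row keys are UTF-8 strings and reusing the results map from Example 7-5) is to sort the entries yourself before printing:

List<Map.Entry<byte[], List<ColumnOrSuperColumn>>> rows =
        new ArrayList<Map.Entry<byte[], List<ColumnOrSuperColumn>>>(results.entrySet());

// order the rows by their key, compared as strings
Collections.sort(rows, new Comparator<Map.Entry<byte[], List<ColumnOrSuperColumn>>>() {
    public int compare(Map.Entry<byte[], List<ColumnOrSuperColumn>> a,
                       Map.Entry<byte[], List<ColumnOrSuperColumn>> b) {
        return new String(a.getKey()).compareTo(new String(b.getKey()));
    }
});

for (Map.Entry<byte[], List<ColumnOrSuperColumn>> row : rows) {
    System.out.println("Row " + new String(row.getKey()) + " --> ");
}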

Deleting

Deleting data is not the same in Cassandra as it is in a relational database. In RDBMS, you simply issue a delete statement that identifies the row or rows you want to delete. In Cassandra, a delete does not actually remove the data immediately. There’s a simple reason for this: Cassandra’s durable, eventually consistent, distributed design. If Cassandra simply removed the data on a delete and one of the replica nodes were down at the time, that node would never receive the delete. Once it came back online, it would mistakenly think that all of the nodes that had received the delete had actually missed a write (the data that it still has because it missed the delete), and it would start repairing all of the other nodes. So Cassandra needs a more sophisticated mechanism to support deletes. That mechanism is called a tombstone.

A tombstone is a special marker issued in a delete that overwrites the deleted values, acting as a placeholder. If any replica did not receive the delete operation, the tombstone can later be propagated to those replicas when they are available again. The net effect of this design is that your data store will not immediately shrink in size following a delete. Each node keeps track of the age of all its tombstones. Once they reach the age as configured in gc_grace_seconds (which is 10 days by default), then a compaction is run, the tombstones are garbage-collected, and the corresponding disk space is recovered.

Note

Remember that SSTables are immutable, so the data is not deleted from the SSTable. On compaction, tombstones are accounted for, merged data is sorted, a new index is created over the sorted data, and the freshly merged, sorted, and indexed data is written to a single new file.

The assumption is that 10 days is plenty of time for you to bring a failed node back online before compaction runs. If you feel comfortable doing so, you can reduce that grace period to reclaim disk space more quickly.

Let’s run an example that will delete some data that we previously inserted. Note that there is no “delete” operation in Cassandra: the client operation is called remove, and even that is really just a write (of a tombstone flag). Because a remove operation is really a tombstone write, you still have to supply a timestamp with the operation; if there are multiple clients writing, the highest timestamp wins, and those writes might include a tombstone or a new value. Cassandra doesn’t discriminate here; whichever operation has the highest timestamp will win.

A simple delete looks like this:

Connector conn = new Connector();
Cassandra.Client client = conn.connect();

String columnFamily = "Standard1";
byte[] key = "k2".getBytes(); //this is the row key

Clock clock = new Clock(System.currentTimeMillis());

ColumnPath colPath = new ColumnPath();
colPath.column_family = columnFamily;
colPath.column = "b".getBytes();

client.remove(key, colPath, clock, ConsistencyLevel.ALL);

System.out.println("Remove done.");

conn.close();

Batch Mutates

There were many examples of using batch mutate to perform multiple inserts in Chapter 4, so I won’t rehash that here. I’ll just present an overview.

To perform many insert or update operations at once, use the batch_mutate method instead of the insert method. Like a batch update in the relational world, the batch_mutate operation allows grouping calls on many keys into a single call in order to save on the cost of network round trips. If batch_mutate fails in the middle of its list of mutations, there will be no rollback, so any updates that have already occurred up to that point will remain intact. In the case of such a failure, the client can retry the batch_mutate operation.
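To give a flavor of the structure without repeating Chapter 4, here is a minimal sketch of a single insert expressed as a mutation. It assumes a connected client as in the other examples, the value written is purely illustrative, and the map layout (row key, then column family name, then a list of mutations) is the same one used for the batch delete shown later in this section:

Clock clock = new Clock(System.currentTimeMillis());

// wrap the column to insert in a ColumnOrSuperColumn, then in a Mutation
Column col = new Column("b".getBytes(), "2.2-updated".getBytes(), clock);
ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
cosc.setColumn(col);
Mutation insertMutation = new Mutation();
insertMutation.setColumn_or_supercolumn(cosc);

List<Mutation> mutations = new ArrayList<Mutation>();
mutations.add(insertMutation);

Map<String, List<Mutation>> cfMutations = new HashMap<String, List<Mutation>>();
cfMutations.put("Standard1", mutations);

Map<byte[], Map<String, List<Mutation>>> mutationMap =
        new HashMap<byte[], Map<String, List<Mutation>>>();
mutationMap.put("k2".getBytes(), cfMutations);

client.batch_mutate(mutationMap, ConsistencyLevel.ONE);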

Note

There used to be an operation called batch_insert, but it is deprecated.

Batch Deletes

The sample application doesn’t include any delete operations, so let’s look at that in a little more depth.

You use remove to delete a single column, but you can use a Deletion structure with a batch_mutate operation to perform a set of complex delete operations at once.

You can create a list of column names that you want to delete, and then indirectly pass it to batch_mutate. I say “indirectly” because there are several data structures that you need to create in order to run a deletion.

First, create the list of column names to delete. Pass that list to a SlicePredicate, pass the SlicePredicate to a Deletion object, pass that to a Mutation object, and finally, pass that to a batch_mutate.

The following code snippet shows how to do this. First, you create a SlicePredicate object to hold the names of the columns you want to delete. Here we just want to delete the “b” column. Then, you create a Deletion object that sets this predicate, and create a Mutation that sets this Deletion.

Once you have the Deletion object set up, you can create your mutation map. This map uses a byte array key to point to the deletions you want to make, so you can use different keys with the same or different mutation objects to perform the batch delete. These keys point to another map, which itself uses a string as the key to name the column family to modify. That map key points to a list of mutations that should be performed.

String columnFamily = "Standard1";
byte[] key = "k2".getBytes(); //this is the row key

Clock clock = new Clock(System.currentTimeMillis());

SlicePredicate delPred = new SlicePredicate();
List<byte[]> delCols = new ArrayList<byte[]>();

//let's delete the column named 'b', though we could add more
delCols.add("b".getBytes());
delPred.column_names = delCols;

Deletion deletion = new Deletion();
deletion.predicate = delPred;
deletion.clock = clock;
Mutation mutation = new Mutation();
mutation.deletion = deletion;

Map<byte[], Map<String, List<Mutation>>> mutationMap = 
new HashMap<byte[], Map<String, List<Mutation>>>();

List<Mutation> mutationList = new ArrayList<Mutation>();
mutationList.add(mutation);

Map<String, List<Mutation>> m = new HashMap<String, List<Mutation>>();
m.put(columnFamily, mutationList);

//just for this row key, though we could add more
mutationMap.put(key, m);
client.batch_mutate(mutationMap, ConsistencyLevel.ALL);

There is a second way to specify items to delete using the Deletion structure: you can use a SliceRange instead of a List of columns, so you can delete by range instead of explicitly listing column names.
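Following that description, a sketch of the structures might look like this; everything sorting between the start and finish columns would be marked with tombstones:

SlicePredicate delPred = new SlicePredicate();
SliceRange delRange = new SliceRange();
delRange.setStart("a".getBytes());
delRange.setFinish("c".getBytes());
delPred.setSlice_range(delRange);

Deletion deletion = new Deletion();
deletion.setPredicate(delPred);
deletion.setClock(new Clock(System.currentTimeMillis()));
// wrap in a Mutation and a mutation map exactly as in the batch delete above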

Range Ghosts

You may sometimes hear people refer to “range ghosts” in Cassandra. This means that even if you have deleted all of the columns for a given row, you will still see a result returned for that row in a range slice, but the column data will be empty. This is valid, and is just something to keep in mind as you iterate result sets on the client.
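For example, when looping over get_range_slices results as in Example 7-4, you might simply skip any row that comes back with no columns:

for (KeySlice keySlice : results) {
    // a row whose columns have all been deleted still shows up, but with no columns
    if (keySlice.getColumns().isEmpty()) {
        continue; // range ghost; nothing to process
    }
    // ...handle the row as usual
}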

Programmatically Defining Keyspaces and Column Families

You can create keyspaces and column families through the API as well. Example 7-6 shows you how.

Example 7-6. DefineKeyspaceExample.java
package com.cassandraguide.rw;

//imports omitted

/**
 * Shows how to define a keyspace and CF programmatically.
 */
public class DefineKeyspaceExample {

	public static void main(String[] args) throws UnsupportedEncodingException,
			InvalidRequestException, UnavailableException, TimedOutException,
			TException, NotFoundException, InterruptedException {

		Connector conn = new Connector();
		Cassandra.Client client = conn.connect();

		System.out.println("Defining new keyspace.");
		
		KsDef ksdef = new KsDef();
		ksdef.name = "ProgKS";
		ksdef.replication_factor = 1;
		ksdef.strategy_class = 
			"org.apache.cassandra.locator.RackUnawareStrategy";
		
		List<CfDef> cfdefs = new ArrayList<CfDef>();
		CfDef cfdef1 = new CfDef();
		cfdef1.name = "ProgCF1";
		cfdef1.keyspace = ksdef.name;
		cfdefs.add(cfdef1);
		
		ksdef.cf_defs = cfdefs;
		
		client.system_add_keyspace(ksdef);
		
		System.out.println("Defining new cf.");
		CfDef cfdef2 = new CfDef();
		cfdef2.keyspace = ksdef.name;
		cfdef2.column_type = "Standard";
		cfdef2.name = "ProgCF";
		
		client.system_add_column_family(cfdef2);
		
		conn.close();
		
		System.out.println("All done.");
	}
}
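To confirm that the definitions took effect, you could ask the server to describe the keyspace back to you. This sketch assumes the 0.7 Thrift call describe_keyspace, which returns a KsDef:

// ask the server to echo the new keyspace definition back
KsDef created = client.describe_keyspace("ProgKS");
System.out.println("Keyspace: " + created.name);
for (CfDef cf : created.cf_defs) {
    System.out.println("  column family: " + cf.name);
}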

Summary

In this chapter we saw how to read and write data using a variety of operations offered by Cassandra’s rich API.
