8 Extensible Record Stores

Extensible record stores are database systems akin to Google’s BigTable system. These databases have tables as their basic data structure, although with highly flexible column management: they implement the concept of column families that act as containers for subsets of columns. Alternative names are tabular data stores, columnar data stores, wide column stores or column family stores. To avoid confusion with the column stores introduced in the previous chapter, we will stick with the more generic name extensible record store here. Although extensible record stores reuse some terminology of the relational data model, they are also akin to key-value stores because they map a unique, multidimensional key to a value.

8.1 Logical Data Model

Extensible record stores bid farewell to the strict normalization paradigm known from RDBMSs. Instead they encourage a certain amount of data duplication for the sake of better query locality and more efficient query execution. That is, while the design in the relational model is centered around entities (see the Entity-Relationship modeling in Section 1.3.1) and normalization (see Section 2.2) is later applied to obtain several tables with fewer anomalies, for an extensible record store a typical query workload should first be identified and the data then modeled around this workload. As an illustration, let us revisit our library example with three tables similar to the normalized tables in Section 2.2 (see Table 8.1.).

Table 8.1. Library tables revisited

image

Now let us assume that a typical query on this data set would be: “What are the names of those readers who have to return books on or before 31-10-2016?”. When we store these data in a row store, we would have to execute a join operation in order to recombine the return date and the corresponding reader’s name. A row store would also load more data than necessary into main memory when answering this query; more precisely, it would also load the reader ID columns which are irrelevant in the query. Similar overhead occurs when we store the data in a column store, because it would have to execute a tuple reconstruction as well as a join on the reader ID: it has to combine values from the Name column and the ReturnDate column and hence it has to access and combine data from many data locations.

For all of these reasons, extensible record stores represent cells in the table as key-value pairs: every column name is the key which is mapped to the column value in a row. In other words, the table’s values are stored as a collection of key-value pairs where the column name is repeated for every value in the column and rows are identified by a row key. Due to this repetition the general advice is to choose only short column names as otherwise storage space would be increased significantly for larger tables. On the other hand, the key-value pair representation makes the table structure much more flexible: we are free to choose different column names in every row. For example, we could decide to use the return date as the column name and map it to the reader’s name; in other words, we combine the names and return dates in a single key-value pair for each row. We can ignore the reader ID for the moment as we are not interested in it in our example workload (it might be stored in another table together with more data of the reader). To continue the example, our table would correspond to the following collection of key-value pairs (let us ignore the BookID column for the moment, too):

image

What extensible record stores do to organize these key-value pairs is adding another structural dimension (apart from the usual two dimensions column and row): the column family. A column family groups columns that are often accessed simultaneously in a typical query. The columns inside a column family are identified by a so-called column qualifier. The full column name consists of the column family name and the column qualifier. When processing such a query, only those column families are fetched into main memory that contain the columns that are required by the query; the remainder of a row need not be loaded. Column families hence provide data locality by storing data inside a column family together on disk. One further advantage of column families is the dynamic addition of new columns: while column families must be created before using them and hence are fixed for a table, inside a column family arbitrary columns can be added at runtime – and theoretically there can be infinitely many columns in each column family. In our example it might make sense to separate the bibliographic information of a book from the lending information. So we introduce one column family called BookInfo and one column family called LendingInfo:

image

Lastly, the columns (that is, the key-value pairs) that belong to the same entity are grouped together according to a unique identifier (the row key). In our example, the entities are books and we use the book ID as the row key. Most importantly, the row keys have to be unique for each entity inside each column family. However, there is no way of specifying foreign key constraints between different column families and no referential integrity is ensured. That is, an extensible record store has no means of ensuring that for a book ID that appears as a row key in the LendingInfo column family, there is also an entry for the same row key in the BookInfo column family. Hence, all kinds of referential integrity checks have to be done by the application using the extensible record store.

image

At this point we also see the flexibility of how columns can be added to rows: one row can have different columns (with different column qualifiers like the due dates) than other rows inside the same column family. This is exactly why extensible record stores are good at storing sparse data: in contrast to a relational table that would record null values, an extensible record store just ignores values that are not present and no null values are included in rows. All in all, the concatenation of the row key, the column family name and the column qualifier identifies a cell in the extensible record store; that is, the full key to access a column value is of the form rowkey:columnfamily:columnqualifier. For example, the full key 1008:BookInfo:Author uniquely identifies the value “Smith”.
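To make this addressing scheme concrete, the following minimal Java sketch models a table as nested sorted maps (row key → column family → column qualifier → value). This is only an illustration of the logical structure, not the API of any particular system, and the inserted reader name is made up for the example.

import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the logical data model: rows are identified by a row key, each row
// groups its columns into column families, and inside a family the column
// qualifiers are mapped to values. Missing cells are simply absent (sparse rows).
public class SparseTable {

    // rowKey -> (columnFamily -> (columnQualifier -> value))
    private final NavigableMap<String, NavigableMap<String, NavigableMap<String, String>>> rows =
            new TreeMap<>();

    // Upsert the cell addressed by rowkey:columnfamily:columnqualifier.
    public void put(String rowKey, String family, String qualifier, String value) {
        rows.computeIfAbsent(rowKey, r -> new TreeMap<>())
            .computeIfAbsent(family, f -> new TreeMap<>())
            .put(qualifier, value);
    }

    // Read a single cell; returns null if the cell does not exist.
    public String get(String rowKey, String family, String qualifier) {
        NavigableMap<String, NavigableMap<String, String>> row = rows.get(rowKey);
        if (row == null) return null;
        NavigableMap<String, String> cf = row.get(family);
        return (cf == null) ? null : cf.get(qualifier);
    }

    public static void main(String[] args) {
        SparseTable book = new SparseTable();
        book.put("1008", "BookInfo", "Author", "Smith");             // full key 1008:BookInfo:Author
        book.put("1008", "LendingInfo", "31-10-2016", "SomeReader"); // illustrative reader name
        System.out.println(book.get("1008", "BookInfo", "Author"));  // prints: Smith
    }
}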

Extensible record stores offer the convenient feature of ordered storage: while the relational data model is set-based and the order of the output basically depends on the DBMS, extensible record stores sort the data internally. Inside a column family, the rows are sorted by their row keys; and inside a row the columns are sorted by their qualifiers. Some extensible record stores (like HBase) just represent row keys and column qualifiers as byte arrays and hence order row keys and column qualifiers by their binary representation. Other extensible record stores (like Cassandra) offer data types for row keys and column qualifiers and hence sorting can be done according to the data type and may differ from the binary order. In particular, for different column families, different sort orders can be chosen. For example, the column qualifiers in the BookInfo column family can be sorted alphabetically descending, while the column qualifiers in LendingInfo can be sorted chronologically ascending, and the row keys are ordered numerically ascending.

image

image

With these ordering features, extensible record stores are particularly well-suited for identifying contiguous sequences of columns – like the one we considered at the beginning of this section: “What are the names of those readers who have to return books on or before 31-10-2016?”. To answer this query we have to find out the columns with column qualifiers less than or equal to 31-10-2016. Due to the ordering we know that the matching columns are stored in a consecutive range and that we do not have to search further once we have reached a column qualifier greater than 31-10-2016. In a similar manner, row keys should be chosen in such a way that rows that are often accessed together have row keys that are close according to the chosen ordering – so that they could be fetched from disk in the same slice of rows.
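As a small illustration of this point, the following sketch stores the columns of one row of the LendingInfo column family in a sorted map keyed by the column qualifier (the return date) and answers the example query with a head-map lookup over the contiguous matching range. It assumes that the dates are encoded in a form whose sort order is chronological (for example year-month-day); the reader names are purely illustrative.

import java.util.NavigableMap;
import java.util.TreeMap;

// Columns of one row of the LendingInfo column family, sorted by qualifier.
public class ReturnDateScan {
    public static void main(String[] args) {
        NavigableMap<String, String> lendingInfo = new TreeMap<>();
        lendingInfo.put("2016-10-05", "ReaderA");
        lendingInfo.put("2016-10-31", "ReaderB");
        lendingInfo.put("2016-11-15", "ReaderC");

        // All columns whose qualifier is <= 2016-10-31: because the map is sorted
        // by qualifier, this is a contiguous range and the scan can stop at the
        // first qualifier greater than the bound.
        NavigableMap<String, String> due = lendingInfo.headMap("2016-10-31", true);
        due.forEach((returnDate, reader) ->
                System.out.println(reader + " must return a book by " + returnDate));
    }
}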

Last but not least, some extensible record stores add one more dimension to the row-columnfamily-column space: time. Every insert or update of a value is accompanied by a timestamp. This timestamp can be specified by the user in the insert or update command; otherwise the current system time (in milliseconds) is used. Extensible record stores hence provide an automatic versioning of column values. When a read command without an explicit timestamp is issued on a cell, the most recent version is returned; a user can however also specify a timestamp in a read command to retrieve older versions. Versioning can further be configured by specifying a maximum number of stored versions: the oldest version will automatically be discarded once a new version is stored and the maximum number is exceeded. Another option for automatic discarding is to specify a time-to-live value for each cell: when the specified time span has elapsed, the corresponding version of the cell is deleted.

One last thing to mention is that extensible record stores usually do not make a distinction between inserts and updates. Instead, a put command is provided that checks whether there is an existing cell for the given key; if this is the case, the value for the key is updated; otherwise a new cell is inserted. This is why this operation is sometimes called an upsert. With versioning enabled, the upsert also takes the timestamp provided in the put command into account: if a version of the cell with exactly this timestamp exists, it is updated; otherwise a new version for the provided timestamp (that is different from the existing timestamps) is inserted.
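The versioning and upsert behavior can be pictured with the following simplified sketch (not the code of any particular store), assuming one sorted map of timestamps per cell and a configurable maximum number of versions.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of a single versioned cell: timestamps are mapped to values. A put with
// an existing timestamp overwrites that version (update); a put with a new
// timestamp adds a version (insert) -- hence "upsert".
public class VersionedCell {
    private final NavigableMap<Long, String> versions = new TreeMap<>();
    private final int maxVersions;

    public VersionedCell(int maxVersions) { this.maxVersions = maxVersions; }

    public void put(String value) { put(System.currentTimeMillis(), value); }

    public void put(long timestamp, String value) {
        versions.put(timestamp, value);          // upsert for this timestamp
        while (versions.size() > maxVersions) {
            versions.pollFirstEntry();           // discard the oldest version
        }
    }

    // Read without a timestamp: the most recent version is returned.
    public String get() {
        return versions.isEmpty() ? null : versions.lastEntry().getValue();
    }

    // Read "as of" a timestamp: the newest version that is not younger than it.
    public String getAsOf(long timestamp) {
        Map.Entry<Long, String> e = versions.floorEntry(timestamp);
        return (e == null) ? null : e.getValue();
    }
}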

8.2 Physical Storage

Under the hood, extensible record stores use several techniques for efficient query answering and recovery. We survey some of them in this section.

8.2.1 Memtables and immutable sorted data files

Extensible record stores implement a write-optimized storage model: all data records written to the on-disk store will only be appended to the existing records. Once written, these records are read-only and cannot be modified: they are immutable data files. Any modification of a record must hence also be simulated by appending a new record in the store. More specifically, writing to and reading from an extensible record store comprises the following steps.

image

Fig. 8.1. Writing to memory tables and data files

Memtable: The most recent writes are collected in a main memory table (a so-called memtable) of fixed size. Usually there is one memtable per column family. A record in the memtable is identified by its key (row key, column family, column qualifier and timestamp in milliseconds). In case of an upsert, the value part of this record also contains the new value that should be assigned to this key; in case of a deletion the data part of the record is empty. Recall that the timestamp can be chosen by the user; if no particular timestamp is specified in the put request, the current system time is used by default.

Tombstones: Deletions are treated by writing a new record for a key. However, this record has no value assigned to it; instead a delete marker (called tombstone) is attached to the record. The tombstone masks all previous versions (with timestamps prior to the timestamp of the tombstone) which will then be invisible to the user. Later on however, newer versions for the same key can again be inserted and will be accessible by the user. Several types of tombstones can be defined that differ in the scope of records that they mark as deleted: a tombstone can for example mark as deleted either a single version of a column (defined by its exact timestamp), an entire column (with all its versions), or an entire column family (with all versions of all columns).

Sorted data files: Once the memtable is filled (at least up to a certain percentage), it is written to disk (flushed) and an entirely new memtable is started. When flushing a memtable, its records are sorted by key. Recall that in some record stores the sorting order of the row key and column qualifiers can be configured; by default, binary ordering (byte order) is used. The flushed sorted data files on disk are immutable: they are accessed when reading data but no writes are executed on them. Modifications (upserts or deletions) for a given key will be contained in other sorted data files that are flushed at a later point of time. The advantage of immutable data files is that buffer management is a lot easier: there are no “dirty pages” in the page buffer that contain modifications that have to be translated to writes on the on-disk records. Internally, the sorted data files carry a sequence number to maintain the chronological order of write operations which is important for the read process.
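The write path just described (memtable, tombstones, flush into immutable sorted files) can be summarized in the following sketch. It is a single-threaded illustration under simplifying assumptions: the "data files" are kept in memory and a null value plays the role of a tombstone; all names are made up for the example.

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the write path: memtable -> flush -> immutable sorted "data file".
public class WritePath {

    // A record is identified by row key, column family, qualifier and timestamp.
    record Key(String row, String family, String qualifier, long timestamp)
            implements Comparable<Key> {
        public int compareTo(Key o) {
            int c = row.compareTo(o.row);
            if (c == 0) c = family.compareTo(o.family);
            if (c == 0) c = qualifier.compareTo(o.qualifier);
            if (c == 0) c = Long.compare(timestamp, o.timestamp);
            return c;
        }
    }

    // Immutable, sorted snapshot of a flushed memtable plus its sequence number.
    record DataFile(long sequenceNumber, NavigableMap<Key, String> records) {}

    private final TreeMap<Key, String> memtable = new TreeMap<>();
    private final List<DataFile> dataFiles = new ArrayList<>();
    private final int flushThreshold;
    private long nextSequenceNumber = 0;

    WritePath(int flushThreshold) { this.flushThreshold = flushThreshold; }

    void put(Key key, String value) { append(key, value); }

    void delete(Key key) { append(key, null); }      // tombstone: key without a value

    private void append(Key key, String value) {
        memtable.put(key, value);
        if (memtable.size() >= flushThreshold) flush();
    }

    // Flush: write the (already key-sorted) memtable as a new immutable data file
    // and start an empty memtable; existing data files are never modified.
    void flush() {
        dataFiles.add(new DataFile(nextSequenceNumber++, new TreeMap<>(memtable)));
        memtable.clear();
    }
}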

Combine upon read: The downside of immutable data files is that they complicate the read process: retrieving all the relevant data that match a user query requires combining records from several on-disk data files and the memtable. This combination may affect records for different search keys that are spread out across several data files; but it may also apply to records for the same key of which different versions exist in different data files. In other words, all sorted data files have to be searched for records matching the read request (see Figure 8.2.).

image

Fig. 8.2. Reading from memory tables and data files

Roughly, there are two types of read requests: get (also called point query) and scan (also called range query). A get request accesses a particular row (identified by its row key). A scan iterates over a contiguous range of rows depending on some condition on the row key; for example, a starting row key can be specified and then the next consecutive 10 rows can be retrieved with a scan. Due to the sorting of the data files, scans for a contiguous range of keys can be done efficiently.

Once the row keys to be accessed are identified, the result can be restricted to a subset of the columns inside each row (by specifying their column qualifiers). In some extensible record stores a set of versions of a cell (by specifying its column qualifier and a condition on the timestamp) can also be accessed. For example, for a particular column qualifier and a user-defined threshold k, the records with the k most recent timestamps are returned – provided that at least k versions of the cell exist. Usually, a range of timestamps can also be specified on a per-query basis; for example from 0 up to a certain timestamp to retrieve the oldest records.

Combining records from various on-disk data files and the memtable and identifying the most recent version of a column is not trivial. One difficulty is that there may be a clash of timestamps: several records for exactly the same key and version (with identical row key, column family, column qualifier and timestamp) but differing in their value portion may exist in the immutable data files. These clashes may occur because the timestamp is part of the key: if the user specifies a key that already exists in a data file, a new record with the same key is appended to the memtable and later on flushed to a new data file. When reading this key, several values for exactly the same key could be returned from different data files. It is however desirable to determine a unique most recent value for each key. One way to handle such a clash is to use the unique sequence number of the stored data files: the record for a key that is contained in the data file with the highest sequence number is the most recently written one. A further difficulty of reads is that the extensible record store has to interpret the time-to-live (TTL) values of records as well as tombstones when retrieving and combining data from multiple sorted data files. Because on-disk data files are immutable, records with an expired TTL still remain in the store and hence have to be skipped when combining records into the result set. In a similar manner, deletions mask all records with earlier timestamps than the timestamp of the tombstone; these masked records have to be filtered out of the result set.

Some extra information can be maintained for each data file to speed up the combine process; for example, the range of row keys in the file or the minimum and maximum timestamp can be stored as metadata of the file. With the help of these metadata, some data files may be excluded from the combine process straight away.
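A hedged sketch of this combine step is given below: among all records for the requested cell, the most recently written one wins (ties on the timestamp are broken by the sequence number of the containing data file), a tombstone masks the cell, and expired records are skipped. The record layout is simplified and all names are illustrative.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Sketch of "combine upon read" over records gathered from several immutable
// sorted data files (and the memtable).
public class CombineUponRead {

    // One stored record: simplified key plus value; a null value marks a tombstone.
    record StoredRecord(String rowKey, String qualifier, long timestamp,
                        long fileSequenceNumber, String value, long ttlMillis) {

        boolean isTombstone() { return value == null; }

        boolean isExpired(long now) { return ttlMillis > 0 && timestamp + ttlMillis <= now; }
    }

    // Visible value of a cell, or empty if the cell is deleted, expired or absent.
    static Optional<String> read(List<StoredRecord> candidates, String rowKey,
                                 String qualifier, long now) {
        Optional<StoredRecord> newest = candidates.stream()
                .filter(r -> r.rowKey().equals(rowKey) && r.qualifier().equals(qualifier))
                .filter(r -> !r.isExpired(now))          // skip records with passed TTL
                .max(Comparator.comparingLong(StoredRecord::timestamp)
                        .thenComparingLong(StoredRecord::fileSequenceNumber));
        if (newest.isEmpty() || newest.get().isTombstone()) {
            return Optional.empty();                     // deleted or no record at all
        }
        return Optional.of(newest.get().value());
    }
}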

8.2.2 File format

Extensible record stores store the data in the on-disk data files in a certain format with the following properties:

Data blocks: An on-disk data file is composed of several data blocks. The block size can usually be configured in the extensible record store settings. Moreover, in some extensible record stores, a different block size can be used in each column family and hence the block size can be specified by the user when creating the column family. A block may also span multiple conventional memory pages; recall that the size of a memory page is usually fixed and dictated by the memory buffers and the operating system. On top of this, memory management is even more flexible in extensible record stores: the block size can also be exceeded by some records in a data file. Indeed, if the size of a record is larger than the block size, the record is nevertheless handled as one coherent block although it spans several memory pages.
Key-value pairs: As shown in Figure 8.3., a data block may contain one or more key-value pairs. Each key-value pair contains the entire key – that is, row key, column family, column qualifier and timestamp. This format hence is the foundation for the flexibility of extensible record stores because no fixed database schema is needed to interpret the data. This flexibility however comes at the price of repetitious occurrences of portions of keys: when a row consists of several columns, the row key is contained in every record for each of these columns; analogously, column families and column qualifiers are usually parts of keys in several different records. For this reason, longer row keys, column families and column qualifiers have a negative impact on storage consumption in extensible record stores. The key is followed by type information: the type determines if the record is a put (in other words, an upsert) – in which case the new value is appended – or if it is a deletion – in which case the type also determines the scope of the deletion like single version, entire column or entire column family. Some extensible record stores additionally provide an increment type: in this case the column is called a counter column and, upon an insert, a certain increment value is added to the most recent previous value of the record.

image

Fig. 8.3. File format of data files

Index: Obviously, data files usually contain records for several keys and may become quite large. Reading the records of a data file sequentially is a very inefficient method when searching for a single record for a given key in such a large data file. In order to speed up the retrieval of records from data files, an index structure is maintained at the end of each file. Indexing is done for row keys in a block-wise manner; that is, the first row key of each block is inserted into the index. The retrieval process is then supported by the index as follows. When searching for a given key in a data file, first of all the entire index of the data file is loaded into main memory. As not all row keys are maintained in the index (only the first row key in each block), the index has to return either the entry for the exact search key (in which case the search key is the first key in a block), or the index entry for the largest row key preceding the search key. In the former case, when the exact search key is contained in the index, the index entry is used to offset into the correct block in the data file and load the block into memory to access the value for the search key. In the latter case the exact search key is not found in the index and we hence cannot be sure whether a record for the search key is contained in the data file or not; due to this, the index entry for the largest preceding row key is used to offset into the data file, load the block into main memory and parse it sequentially either until the exact search key is found and its record is accessed – or until the end of the block is reached without finding a record for the search key. A small code sketch of such an index lookup is given below, after the description of the multilevel index.

image

Fig. 8.4. Multilevel index in data files

Trailer: As the last component of a data file, a trailer contains management information (for example, where the index starts; see Figure 8.3.).

Multi-level index: Even though indexing is done at the block level and hence not all row keys are maintained in the index, the single index at the end of each data file might become quite large. This might slow down the read process because the entire index has to be loaded into main memory before accessing any key-value pairs. This is where multilevel indexes come to the rescue. A multilevel index splits the single index into several sub-indexes: one sub-index (called leaf index) is stored at the end of each block, and only a small super-index (called root index) pointing to the sub-indexes is stored at the end of the data file. As leaf indexes are contained in each block, they now allow for a more fine-grained indexing: each key inside a block can be indexed in the leaf index of the block such that its index entry determines an offset into the block where the record for the key can be found. This extended file format is sketched in Figure 8.4. For extremely large data files, even more intermediate index levels are possible to help keep the root level small; the root index entries then point to the intermediate indexes and the intermediate indexes themselves point to leaf indexes.
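A minimal sketch of the index-assisted lookup (for a single-level, block-wise index as described above): the index maps the first row key of each block to the block’s offset, a floor lookup selects the only block that can contain the search key, and that block is then scanned sequentially. Loading a block is stubbed out by a function parameter; all names are illustrative.

import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.LongFunction;

// Sketch of a key lookup in one data file via its block-level index.
public class BlockIndexLookup {

    // Index: first row key of each block -> offset of that block in the file.
    private final NavigableMap<String, Long> blockIndex = new TreeMap<>();
    // Stand-in for "seek to the offset and load the block": offset -> key-value pairs.
    private final LongFunction<List<Map.Entry<String, String>>> loadBlock;

    BlockIndexLookup(NavigableMap<String, Long> blockIndex,
                     LongFunction<List<Map.Entry<String, String>>> loadBlock) {
        this.blockIndex.putAll(blockIndex);
        this.loadBlock = loadBlock;
    }

    Optional<String> get(String searchKey) {
        // Largest indexed row key <= search key: the only block that may contain it.
        Map.Entry<String, Long> entry = blockIndex.floorEntry(searchKey);
        if (entry == null) return Optional.empty();      // key precedes the first block
        // Scan the block sequentially; keys inside a block are sorted, so we can
        // stop as soon as we pass the search key.
        for (Map.Entry<String, String> kv : loadBlock.apply(entry.getValue())) {
            int cmp = kv.getKey().compareTo(searchKey);
            if (cmp == 0) return Optional.of(kv.getValue());
            if (cmp > 0) break;
        }
        return Optional.empty();
    }
}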

Depending on the exact organization of the index and the on-disk management of the data files (see the notion of compaction below) there are different implementations and manifestations of these data files. In particular, a typical form of data file is the Log-Structured Merge Tree (LSM tree) or one of its variants like, for example, a Sorted Array Merge Tree (SAMT) or a Cache-Oblivious Look-ahead Array (COLA). All these tree structures have in common that they favor a high write throughput but also behave more efficiently for scans because the data in the leaf nodes are stored in contiguous data blocks ordered by their keys (as opposed to the scattered leaf nodes of other tree structures like the B-tree).

8.2.3 Redo logging

The memtable is kept in volatile memory until it is eventually flushed to disk. Data are only durable (ensured by backups and replication) when stored in the on-disk data files; hence, data that are contained in the memtable are vulnerable to failures of the database server. For example, a crash of the server may wipe out the entire memory, or write errors may occur when flushing the memtable to disk. When restarting the server (or when trying to rewrite the data to disk) the data in the memtable have to be recovered. Recovery is achieved with the help of an on-disk log file that keeps track of all records that are appended to the memtable but have not yet been flushed to disk. Note that this means that all data have to be written twice: once to the log file and then to the memtable. Inside the log file, each record receives a log sequence number (LSN) that maintains the chronological order of write operations. Because data are stored in the on-disk log file before (that is, ahead of) appending them to the memtable, this process is often called write-ahead logging (see Figure 8.5.).

image

Fig. 8.5. Write-ahead log on disk

One peculiarity of extensible record stores is that they assume redo-logging to be sufficient: when restarting the database server after a failure, the memtable is reconstructed from the log file by re-executing all the operations in the log as ordered by their LSNs. This restriction is justified by the fact that extensible record stores support neither transactions that range over several rows nor long-lived transactions, which would have to be logged completely before appending their operations to the memtable. Redo-logging such complex transactions would lead to slow write performance, and undo-logging would become necessary whenever such a transaction aborts prematurely: as the transaction did not complete, all operations that it has executed so far would have to be rolled back.

Although the logging mechanism adds an additional overhead to the write process, it in fact improves overall write performance: while appending a record to the log file in chronological order is fast, sorting the records by key is slower and can be deferred until flushing the memtable; moreover, flushing the memtable corresponds to a batch write of all records to sequential data blocks of a new data file – this is much more efficient than inserting each record at its correct position one at a time. Last but not least, writes to the same key can be coalesced and only the most recent record has to be flushed to disk – in particular, no record for a key needs to be written to disk at all if an upsert for the key is masked by a tombstone for that key in the memtable.
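The interplay of log and memtable can be made concrete with the following sketch of write-ahead logging with redo-only recovery; the log is modeled as an in-memory list instead of an actual file, and all names are illustrative.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Sketch of write-ahead logging: every operation is appended to the log (with a
// log sequence number) before it is applied to the memtable; after a crash the
// memtable is rebuilt by redoing the logged operations in LSN order.
public class WriteAheadLog {

    record LogEntry(long lsn, String key, String value) {}   // value == null -> tombstone

    private final List<LogEntry> log = new ArrayList<>();    // stands in for the on-disk log file
    private final TreeMap<String, String> memtable = new TreeMap<>();
    private long nextLsn = 0;

    void put(String key, String value) {
        log.add(new LogEntry(nextLsn++, key, value));   // 1. append redo information to the log
        memtable.put(key, value);                       // 2. apply to the volatile memtable
    }

    // Redo-only recovery: replay the log in chronological (LSN) order.
    TreeMap<String, String> recover() {
        TreeMap<String, String> rebuilt = new TreeMap<>();
        log.stream()
           .sorted(Comparator.comparingLong(LogEntry::lsn))
           .forEach(e -> rebuilt.put(e.key(), e.value()));
        return rebuilt;
    }

    // After the memtable has been flushed successfully, the corresponding log
    // entries are no longer needed and the log can be truncated.
    void truncateAfterFlush() { log.clear(); }
}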

image

Fig. 8.6. Compaction on disk

8.2.4 Compaction

After some time, several flushes of memtables will have occurred and hence there will be quite a lot of data files stored on disk. These data files will most probably contain some outdated records: records whose time-to-live value has passed, records for which more recent versions exist and for which the maximum number of stored versions is exceeded, or records which are masked by a tombstone. Outdated records not only unnecessarily occupy disk space, they also slow down read processes because they have to be loaded and compared with other records in the combine process of data retrieval (see Section 8.2.1). This is why a process called compaction was devised to remove any unwanted records and merge a set of data files into a new one. As sketched in Figure 8.6., a set of data files is chosen for compaction, their records are merged and the result is written to a new larger data file (at a new location on disk); finally, the small input data files can be deleted. More specifically, a minor compaction merges only a small subset of all data files, whereas a major compaction merges all data files into a single new one.

Several things have to be considered during compaction:

The records of all key-value pairs in the data files have to be sorted by their keys and hence reordering and restructuring of the index is necessary.

At the same time, time-to-live values have to be interpreted so that expired records can simply be ignored.

If one of the data files contains a tombstone, all data that are masked by the tombstone and have been written prior to the tombstone can be ignored. Note that records that are masked by the tombstone but have been written after the insertion of the tombstone (because they are contained in a more recent data file as identified by the data file sequence number) are handled differently: these records are merged into the new data file but will still be masked by the tombstone if it is a minor compaction. Tombstones themselves can only be deleted during a major compaction; this means that only after a major compaction will more recent records for a key become visible, because they were previously masked by the tombstone. This somewhat incoherent behavior is usually chosen to simplify the compaction process and the interpretation of tombstones during a read process. Other semantics of deletions can be enforced but this would require data retrieval as well as minor compactions to be more involved.

In some extensible record stores, versioning settings are also enforced during compaction: at most a specified number of versions is kept for each key. For example, if the maximum number of versions to be stored is set to three, for each key the records with the three most recent timestamps are copied to the compacted data file while all records with older timestamps are ignored (a small sketch of this pruning is given after this list).

Last but not least, changing column family settings can be done during major compaction: when settings (like the data type or sorting order of columns) have been modified, the new settings will be applied during compaction to the records in the older sorted data files. That is, after major compaction all records are consolidated to the new settings. This can be seen as an automatic support for schema evolution .
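The filtering steps above (time-to-live values, tombstones and version limits) can be sketched roughly as follows for a major compaction; a single tombstone timestamp per column family is assumed for brevity and all names are illustrative.

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of record pruning during (major) compaction: drop expired and
// tombstone-masked records, group the rest by cell (row key, column family,
// column qualifier) and keep at most maxVersions records per cell.
public class CompactionPruning {

    record CellRecord(String row, String family, String qualifier,
                      long timestamp, String value) {}

    static List<CellRecord> compact(List<CellRecord> merged, int maxVersions,
                                    long now, long ttlMillis, long tombstoneTimestamp) {
        return merged.stream()
                // records whose time-to-live has passed are simply not copied
                .filter(r -> ttlMillis <= 0 || r.timestamp() + ttlMillis > now)
                // records masked by the tombstone are not copied either
                .filter(r -> r.timestamp() > tombstoneTimestamp)
                // group by cell and keep only the maxVersions newest timestamps
                .collect(Collectors.groupingBy(
                        r -> r.row() + ":" + r.family() + ":" + r.qualifier()))
                .values().stream()
                .flatMap(records -> records.stream()
                        .sorted(Comparator.comparingLong(CellRecord::timestamp).reversed())
                        .limit(maxVersions))
                .collect(Collectors.toList());
    }
}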

Compaction is demanding with respect to disk space: sufficient disk capacity is needed when smaller data files are merged into a new larger data file. While the compaction is run, roughly twice the disk space as occupied by the smaller data files is needed. The smaller data files will however be discarded after compaction (as soon as all read processes on them have finished) effectively releasing the disk space.

Several heuristics can be applied when choosing data files for minor compaction.
Oldest files first: Data files may be chronologically ordered (for example, by their data file sequence number) and the oldest files with the lowest sequence numbers are chosen for compaction; that is, older records migrate into larger compacted data files first. If the data file sequence number is needed during data retrieval to avoid timestamp clashes, only data files with a contiguous range of sequence numbers can be merged to maintain the global chronological order.
Small files first: In order to obtain a homogeneous set of data files, several similar sized smaller files are merged into one larger file.
Compaction threshold: The user can configure a minimum number of files for which a compaction is run. For smaller amount of files compaction is deemed unnecessary and inefficient.

Note that with these heuristics, the same record may be chosen for minor compaction several times and records unnecessarily often migrate from smaller files into larger files. Furthermore, the size of the resulting compacted files cannot be controlled. To avoid these issues, leveled compaction has proved to be advantageous. With leveled compaction, the data files are organized into levels as shown in Figure 8.7. Each level contains a fixed number of data files; data files in lower levels are smaller than data files in higher levels. A flush always moves the memtable to a data file in the lowest level L0. Subsequent compaction steps move a record only from one level to the next, so that the number of merges for each record is bounded by the number of levels. It is also helpful for an efficient merge process to assign non-overlapping key ranges to the data files inside each level; in this way the merge only involves one of the data files in the next level while all other data files in that level remain unaffected. Key ranges also improve the read process because not all data files have to be accessed to search for a key.

image

Fig. 8.7. Leveled compaction

8.2.5 Bloom filters

A Bloom filter is a probabilistic method to determine set membership for a given value. To be more specific, for a given value c and a set S of values, the Bloom filter is a small bit vector with which we can decide whether c is included in S without actually searching for c in S – but it comes with a small probability of error. Hence, the following cases of outcomes can be considered:

True positive: A true positive means that the Bloom filter correctly reports a match and confirms that c is an element of S ; hence c ∈ S indeed holds.

False positive: A false positive means that the Bloom filter wrongly reports a match; that is, it assumes that c is included in S although it is not.

True negative: A true negative means that the Bloom filter correctly reports a miss and states that c is not an element of S ; hence c ∉ S indeed holds.

False negative: A false negative means that the Bloom filter erroneously reports a miss saying that c is not included in S although in fact c ∈ S holds.

What makes Bloom filters a good choice for a quick membership pre-test is that only one kind of error arises: False positives indeed happen with a certain probability. Hence, when the Bloom filter reports a match (saying c is an element of S ), we cannot be sure if this is true: we have to actually search for c in S to verify whether c ∈ S holds or whether the Bloom filter wrongly believed c to be an element of S although in fact c ∉ S holds. In contrast, false negatives will never occur with a Bloom filter. In other words, there will only be true negatives: when the Bloom filter decides that c ∉ S , we can simply skip searching for c in S .

image

Fig. 8.8. Bloom filter for a data file

For extensible record stores, Bloom filters can be maintained for all the row keys in a data file with the following positive effect: When searching for a given query key in the file, first the Bloom filter is accessed with the query key. If the Bloom filter reports a miss, this will be a true negative; hence, we do not have to access any data (and not even the index) inside this data file. In case the Bloom filter reports a match, we have to load the index and search it for the query key (and potentially access the appropriate block and scan it for the query key) to check whether the match was a true positive or a false positive. In case of a false positive, the key will not be found in the data file. For small data files, a single Bloom filter can be appended at the end of the data file. See Figure 8.8. for an illustration with the trailer having a pointer to the start of the Bloom filter entry. For larger files with lots of keys, a single Bloom filter will be large, too. This will result in performance delays when searching for a row key in the data file. To remedy this and in conjunction with an index, a Bloom filter can be broken into pieces (similar to the multi-level index): a small Bloom filter “chunk” is maintained for the keys in each block; the Bloom filter chunk is then queried for the existence of a key before actually accessing data in the corresponding block. A further extension is to not only have a Bloom filter for row keys but instead to maintain a Bloom filter for the combination of row key and column qualifier. With such a row-key-column-qualifier Bloom filter we only have to search the row for the given column qualifier in case of a match; yet, in case the filter reports a miss, we do not access the data record at all.

More formally, a Bloom filter is a bit vector of a chosen length $m$ with every position initialized to 0. The bit vector is accompanied by $k$ different hash functions $h_1, \ldots, h_k$ where each hash function is assumed to map an arbitrary value $c$ randomly to a number between 1 and $m$ – that is, $h_i(c) \in \{1, \ldots, m\}$. More precisely, these hash functions should not only map values randomly but also uniformly to the range $1, \ldots, m$: for each number $d$ between 1 and $m$, the probability that an input value $c$ is mapped to $d$ should be equal to $\frac{1}{m}$; this can be written as $\mathrm{Prob}(h_i(c) = d) = \frac{1}{m}$. The case that two different input values $c$ and $c'$ are mapped to the same value (that is, $h_i(c) = h_i(c')$) is called a collision. Collisions are the reason why false positives can occur with Bloom filters as we will see shortly.

Let us now have a look at how Bloom filters improve query performance in extensible record stores. When a record for a row key is inserted, we compute all $k$ hash values of the key and, for $h_i(\mathit{key}) = d$, the $d$-th bit in the Bloom filter is set to 1. See Figure 8.9. for a Bloom filter of length $m = 16$ that uses three hash functions. The Bloom filter is initially all-zero. When the first record with key $\mathit{key}_1$ is added, the three hash values for $\mathit{key}_1$ are calculated and the corresponding bits (1, 3 and 8) in the Bloom filter are set to 1. The same steps are executed when the second record with key $\mathit{key}_2$ is added; this time the hash values of $\mathit{key}_2$ result in the bits 6, 10 and 13 being set. When querying for a certain row key, the $k$ hash functions are also applied to the query key. All these $k$ hash values are then compared with the Bloom filter: when for each $i = 1, \ldots, k$ and each hash value $h_i(\mathit{query}) = d'$, the $d'$-th bit of the Bloom filter is 1, then we have a match. Note in Figure 8.9. that for $\mathit{query}_1$ all three bits corresponding to the query’s hash values in the Bloom filter are 1; in Figure 8.9. (d) this corresponds to a true positive if indeed $\mathit{key}_1 = \mathit{query}_1$. Due to collisions of the hash functions, this match can however be a false positive: the matching bits could have been set by a set of keys other than $\mathit{query}_1$ which however happen to have one or more hash values identical with those of $\mathit{query}_1$; this is shown in Figure 8.9. (e). This is why we have to access the appropriate data block in the data file and search for the query key there. On the other hand, when for a query key one of its hash values corresponds to a 0 bit in the Bloom filter, then we can be sure that this particular key so far has not been added to the data file.
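A minimal Bloom filter along these lines can be sketched in a few lines of Java; the k hash functions are derived here from the key’s hashCode combined with the function index, which is sufficient for illustration but not a high-quality choice for production use.

import java.util.BitSet;

// Minimal sketch of a Bloom filter with m bits and k hash functions.
public class SimpleBloomFilter {
    private final BitSet bits;
    private final int m;   // length of the bit vector
    private final int k;   // number of hash functions

    public SimpleBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // i-th "hash function": mix the function index into the key's hash code.
    private int hash(String key, int i) {
        int h = key.hashCode() * 31 + i * 0x9E3779B9;
        return Math.floorMod(h, m);            // bit position in 0 .. m-1
    }

    public void add(String key) {
        for (int i = 0; i < k; i++) bits.set(hash(key, i));   // set the k bits
    }

    // true  -> key *might* be contained (possible false positive)
    // false -> key is definitely not contained (no false negatives)
    public boolean mightContain(String key) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(hash(key, i))) return false;
        }
        return true;
    }
}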

This kind of basic Bloom filter works nicely (and without false negatives) as long as keys are only added to the store. To support deletions of keys, the basic Bloom filter has to be extended. For example, counting Bloom filters keep a counter of how many times a bit was set to 1: for an insertion the counter is incremented and for a deletion it is decremented. Luckily, with extensible record stores, such extensions are not needed: the data files are immutable and hence Bloom filters only have to be computed once when creating a new data file – that is, when flushing the memtable or when compacting a set of data files.

image

Fig. 8.9. A Bloom filter of length m = 16 with three hash functions

The probability of false positives for given Bloom filter settings (the number $n$ of elements in the input set $S$, the length $m$ of the Bloom filter bit vector and the number $k$ of hash functions) can be approximated. With such an approximation, we can choose the parameters $n$, $m$ and $k$ such that the occurrence of false positives is minimized – and hence we have a high accuracy of the Bloom filter. We reiterate a common approximation for the probability of false positives below. This approximation is only valid for large Bloom filters (that is, large values of the bit vector length $m$); in other words, for small Bloom filters, the expected number of false positives can be significantly higher. To obtain a lower bound for the false positive probability, recall that the probability that hashing an input value with one hash function sets a certain bit to 1 is $\frac{1}{m}$ (due to the uniformity of the hash functions). By reversing this argument, the probability that a bit in the bit vector is not set to 1 is

$$1 - \frac{1}{m}$$

After applying all $k$ hash functions to an input value, the probability of a bit not being set to 1 is

$$\left(1 - \frac{1}{m}\right)^{k}$$

And after hashing all $n$ input values this probability turns into

$$\left(1 - \frac{1}{m}\right)^{kn}$$

Again by reversing the argument, the probability that one bit is indeed set to 1 is

$$1 - \left(1 - \frac{1}{m}\right)^{kn}$$

Given this probability, what is now the probability of a false positive? A false positive happens when applying all $k$ hash functions to the test value leads to $k$ collisions. In other words, the $k$ bits in the bit vector that are computed for the test value are already set to 1. The probability of this case happening is usually taken to be

$$\left(1 - \left(1 - \frac{1}{m}\right)^{kn}\right)^{k} \qquad (8.1)$$

This is a simplified lower bound which is not entirely correct; this approximation is nevertheless used often, and for sufficiently large values of $m$ and relatively small values of $k$ there is only a slight difference to the correct false positive rate (which is however derived by a much more complex formula).

In abstract terms, we learn the following rules of thumb: the more elements ($n$) are added to a Bloom filter, the lower the accuracy; the more hash functions ($k$) are used, the higher the accuracy (but more computation needs to be done); the larger the bit vector ($m$), the higher the accuracy (but more space is used).

Equation 8.1 is then often approximated by using the Euler number $e$ and the fact that asymptotically (that is, for large $m$) the expression $\left(1 - \frac{1}{m}\right)^{m}$ tends towards $e^{-1}$:

$$\left(1 - e^{-kn/m}\right)^{k} \qquad (8.2)$$

By doing some math with this formula, one can on the one hand derive the optimal value for $k$ – that is, the number of hash functions for which the false positive probability is minimized. This value is calculated to be around the fraction of $m$ divided by $n$, multiplied with the natural logarithm of 2: $k_{\mathrm{opt}} = \frac{m}{n} \cdot \ln 2$. As the result of this calculation is usually not a natural number, we can choose the next integer that is greater than or less than the result. For example, assume that $m = 16$ and $n = 4$ (and hence $m = 4n$); then $k_{\mathrm{opt}} = 4 \cdot \ln 2 \approx 2.77$. By inserting several values for $k$ into Equation 8.2, we see that indeed $k = 3$ has the lowest approximate false positive probability, as shown in Table 8.2. In general however, we see that $m = 4n$ is obviously not a good choice because the probabilities are much too high for any practical application. Fortunately, these probabilities quickly decrease for larger $m$. For example, for $m = 8n$ we see that $k_{\mathrm{opt}} \approx 5.55$ and the approximate false positive probability is a lot lower, as shown in Table 8.3.

Table 8.2. False positive probability for m = 4 • n

m = 4 • n approximate false positive probability by Equation 8.2
k = 1 0.221 (22.1%)
k = 2 0.155 (15.5%)
k = 3 0.147 (14.7%)
k = 4 0.160 (16.0%)

Table 8.3. False positive probability for m = 8 • n

m = 8 • n approximate false positive probability by Equation 8.2
k = 3 0.031 (3.1%)
k = 4 0.024 (2.4%)
k = 5 0.022 (2.2%)
k = 6 0.022 (2.2%)
k = 7 0.023 (2.3%)

On the other hand, we can also fix our desired false positive probability and then calculate the size m of the Bloom filter that is needed to ensure the desired probability. Indeed, in some extensible record stores, the desired probability can be configured by the user; decreasing this probability then ultimately results in larger data file sizes because the Bloom filters occupy more space.
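Assuming the approximation of Equation 8.2, the usual closed-form consequences can be used for such a sizing calculation: for $n$ expected keys and a desired false positive probability $p$, about $m = -\frac{n \ln p}{(\ln 2)^2}$ bits and $k = \frac{m}{n} \ln 2$ hash functions are needed. A small sketch:

// Sketch: derive Bloom filter parameters from the expected number of keys n
// and the desired false positive probability p (based on Equation 8.2).
public class BloomFilterSizing {

    // number of bits m such that the approximate false positive rate is about p
    static long bitsNeeded(long n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // optimal number of hash functions k = (m / n) * ln 2
    static int optimalHashCount(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 1_000_000;   // expected number of row keys in the data file
        double p = 0.01;      // desired false positive probability (1%)
        long m = bitsNeeded(n, p);
        int k = optimalHashCount(n, m);
        // roughly 9.6 bits per key and 7 hash functions for a 1% error rate
        System.out.printf("m = %d bits (%.1f bits/key), k = %d%n", m, (double) m / n, k);
    }
}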

The bottleneck of practical Bloom filters is the abundant computation of hash values. To alleviate this problem, a partition scheme for the Bloom filter can be used, resulting in a partitioned Bloom filter (as introduced in [KM08]): two hash functions $h_1$ and $h_2$ can simulate the entire set of $k$ hash functions. More precisely, $k$ hash functions $g_0, \ldots, g_{k-1}$ can be derived by letting

$$g_i(x) = h_1(x) + i \cdot h_2(x)$$

for $i$ ranging from 0 to $k-1$. Asymptotically, for large values of $m$, the same approximation for the false positive probability can be achieved as in Equation 8.2. Hence we can get an (approximately) equally accurate Bloom filter by computing only two hash functions instead of $k$. The partition scheme derives its name from the fact that the bit vector is split into $k$ different partitions, each of size $m' = \frac{m}{k}$. By further assuming that $h_1$ and $h_2$ range over $\{0, \ldots, m'-1\}$ and computing the sum $g_i$ modulo $m'$, the $i$-th hash value (that is, $g_i$) indeed occupies a bit in the $i$-th partition. Figure 8.10. shows an example of four partitions, each of length four.

image

Fig. 8.10. A partitioned Bloom filter with k = 4 and partition length m′ = 4
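A sketch of this double-hashing scheme: only two base hash values are computed per key, and the $i$-th derived hash $g_i$ addresses one bit inside the $i$-th partition. The two base hash functions below are simple illustrative choices, not taken from [KM08].

import java.util.BitSet;

// Sketch of a partitioned Bloom filter: k partitions of m' bits each; the i-th
// derived hash g_i(x) = h1(x) + i*h2(x) (mod m') sets one bit in partition i.
public class PartitionedBloomFilter {
    private final BitSet bits;
    private final int k;             // number of partitions (= simulated hash functions)
    private final int partitionSize; // m' = m / k

    public PartitionedBloomFilter(int k, int partitionSize) {
        this.k = k;
        this.partitionSize = partitionSize;
        this.bits = new BitSet(k * partitionSize);
    }

    private int h1(String key) {
        return Math.floorMod(key.hashCode(), partitionSize);
    }

    private int h2(String key) {
        return Math.floorMod(key.hashCode() * 0x85EBCA6B + 1, partitionSize);
    }

    private int bitPosition(String key, int i) {
        int gi = Math.floorMod(h1(key) + i * h2(key), partitionSize);  // g_i(x) mod m'
        return i * partitionSize + gi;                                  // bit in partition i
    }

    public void add(String key) {
        for (int i = 0; i < k; i++) bits.set(bitPosition(key, i));
    }

    public boolean mightContain(String key) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(bitPosition(key, i))) return false;
        }
        return true;
    }
}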

8.3 Implementations and Systems

This section briefly surveys some features of currently available open source extensible record stores.

8.3.1 Apache Cassandra

Cassandra stores column families in a keyspace. The Cassandra query language (CQL) can be used to interact with the database system and CQL commands can be input into the CQL shell (cqlsh).

image Web resources:

Apache Cassandra: http://cassandra.apache.org/

documentation page: http://docs.datastax.com/en/getting_started/

GitHub repository: https://github.com/apache/cassandra

A keyspace can be created with certain settings for the replication.

CREATE KEYSPACE library

WITH REPLICATION = {

'class' : 'SimpleStrategy',

'replication_factor' : 3 };

The create table command creates a new column family in the keyspace (this command is identical to the create column family command). That is, in Cassandra the term table corresponds to a column family – which is in contrast to other extensible record stores that use the term table as a container for column families.

CREATE TABLE bookinfo (

bookid int PRIMARY KEY,

title text,

author text

);

Insertions are done with the insert command:

INSERT INTO bookinfo (bookid, title, author)

VALUES (1002, 'Databases', 'Miller');

An index can be created on columns other than the primary key to enable filtering on column values:

CREATE INDEX ON bookinfo (title);

CREATE INDEX ON bookinfo (author);

Each Cassandra column family (table) has a primary key. The primary key can be composed of several columns (a so-called compound key) where the first component (column) of the primary key is called the partition key; based on the partition key, the data is split into partitions that will be distributed among the database servers. The other components of the primary key are used to sort (cluster) the data inside a partition. Going even further, the partition key itself (that is, the first component of the primary key) can be composite and hence consist of a tuple of columns. In particular, if the partitions defined by a simple partition key are too large, a composite partition key can help split the data into smaller partitions, as in the following example.
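As a hedged illustration of these options (the table layout and column names are made up for this example and do not appear in the text above), a table for the lending information could use the reader ID together with the year as a composite partition key and the return date as a clustering column, so that the rows inside a partition are stored sorted by return date:

CREATE TABLE lendinginfo (
  readerid int,
  year int,
  returndate date,
  bookid int,
  PRIMARY KEY ((readerid, year), returndate, bookid)
) WITH CLUSTERING ORDER BY (returndate ASC, bookid ASC);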

Cassandra supports the collection types set, list and map for cell values; internally, each element of a collection is stored inside a separate column. For example, we can create an attribute as a set of texts and then update values in the set (where addition (+) denotes insertion of an element and subtraction (-) denotes removal of an element). After creating an index on its values, the set can be searched with the contains statement. For the book example, we can change the author column to allow a set of texts and then issue a select statement to find those books where a certain author is contained in the set:

CREATE TABLE bookinfo (

bookid int PRIMARY KEY,

title text,

author set<text>

);

INSERT INTO bookinfo (bookid, title, author)

VALUES(1002, 'Databases', {'Miller'});

UPDATE bookinfo SET author = author + {'Smith'} WHERE bookid = 1002;

CREATE INDEX ON bookinfo (author);

SELECT title, author FROM bookinfo WHERE author CONTAINS 'Miller';

Moreover, Cassandra allows the definition of nested tuples and user-defined types.

The Cassandra Java Driver comes with some features including asynchronous communication, load balancing, and node discovery. In particular, a client application can connect to a server (identified by its server name as a string) in a cluster. A session object can be obtained from the cluster and CQL statements can then be executed on the session object.

Cluster cluster = Cluster.builder()

.addContactPoint("localhost").build();

Session session = cluster.connect();

session.execute(

"CREATE TABLE bookinfo ("+

" bookid int PRIMARY KEY,"+

" title text,"+

" author text"+

");");

The execute method returns a ResultSet object that can be used to access the results of a selection query:

ResultSet results = session.execute("SELECT * FROM bookinfo "+

"WHERE title = 'Databases';");

for (Row row : results) {

System.out.println("Title: " + row.getString("title")+

", Author: "+row.getString("author"));

}

Instead of filling in values in the CQL query string, statements can be parameterized: several “?” markers can be used in the CQL string. When executing the parameterized statement, the markers are replaced by the remaining parameters of the execute method in the order of occurrence:

session.execute("INSERT INTO bookinfo (bookid, title, author)"+

" VALUES (?, ?, ?)", 1002, "Databases", "Miller");

Moreover, the session object has a prepare method to declare a prepared statement. A prepared statement is a statement that is registered and precompiled into an execution plan on the server side such that an invocation of the statement only needs to pass the name of the statement and its parameters. Prepared statements can be combined with the “?” parameterization such that parameters passed to the prepared statement are inserted in the order of their occurrence in the invocation:

PreparedStatement prepStatement = getSession().prepare("INSERT" + " INTO bookinfo (bookid, title, author) VALUES (?, ?, ?);");

A BoundStatement object is then used to invoke a prepared statement and insert (bind) parameters into it:

BoundStatement boundStatement = new BoundStatement(prepStatement);

getSession().execute(boundStatement.bind(1002, "Databases", "Miller"));

The query builder API might be even more convenient to use in a Java program since no CQL statements must be written but instead a Select, Update, Insert or Delete object can be used to represent a query:

– the QueryBuilder.select method returns a Select object.

– the QueryBuilder.insert method returns an Insert object.

– the QueryBuilder.update method returns an Update object.

– the QueryBuilder.delete method returns a Delete object.

Method chaining can be used to set more restrictions for each operation. The Select object has for example a from method that defines the keyspace and the table (column family) name and a where method that specifies restrictions on columns:

Select select = QueryBuilder.select().all()

.distinct().from("library", "bookinfo")

.where(eq("title", "Databases"));

ResultSet results = session.execute(select);

Similarly, an Insert object is obtained with the QueryBuilder.insertInto method, and its value method adds the data for the individual columns of the column family:

Insert insert = QueryBuilder.insertInto("library", "bookinfo")

.value("bookid", 1002)

.value("title", "Databases")

.value("author", "Miller");

ResultSet results = session.execute(insert);

A mapping manager in conjunction with annotations (similar to JPA and JDO as described in Section 9.3) can be used to map Java objects to Cassandra tables.

8.3.2 Apache HBase

HBase stores tables in namespaces.

image Web resources:

Apache HBase: http://hbase.apache.org/

documentation page: http://hbase.apache.org/book.html

GitHub repository: https://github.com/apache/hbase

HBase offers a command line interface with its own commands like create, put and get. The table name and the column family name have to be specified in the create command; in our book example we create a table Book and a column family BookInfo:

create 'book', 'bookinfo'

With the put command we can add information for a book where the BookID is used as the row key and the column name consists of the column family name (BookInfo) and the column qualifier (Author and Title) separated by a colon:

put 'book', '1002', 'bookinfo:author', 'Miller'

put 'book', '1002', 'bookinfo:title', 'Databases'

With the get command, the values of all columns in a row (for a row key) can be obtained:

get 'book', '1002'

With the scan command all rows in a table (or a subset of all rows) can be obtained:

scan 'book'

In the Java API, the Admin class is used to create a table (HTableDescriptor) and a column family as well as a column inside the table (both with HColumnDescriptor).

TableName tableName = TableName.valueOf("book");

HTableDescriptor table = new HTableDescriptor(tableName);

table.addFamily(new HColumnDescriptor("bookinfo"));

Connection connection = ConnectionFactory.createConnection();

Admin admin = connection.getAdmin();

admin.createTable(table);

HColumnDescriptor newColumn = new HColumnDescriptor("author");

admin.addColumn(tableName, newColumn);

A Put object writes values to the database; note that it is more efficient to keep the column family name and the column qualifier in static byte arrays to avoid repeated conversions from String to byte array:

public static final byte[] CF = "bookinfo".getBytes();

public static final byte[] COL = "author".getBytes();

Put put = new Put("1002".getBytes());

put.add(CF, COL, "Miller".getBytes());

table.put(put);

Note that put works as an upsert: it adds a new row to a table (if the key is new to the column family) or it updates an existing row (if the key already exists in the column family). A Get and a Scan object read values from the database:

Get get = new Get("1002".getBytes());

get.addColumn(CF, COL);

Result result = table.get(get);

byte[] b = result.getValue(CF, COL);

Scan scan = new Scan();

scan.addColumn(CF, COL);

ResultScanner scanner = table.getScanner(scan);

try {

for (Result r = scanner.next(); r != null; r = scanner.next()) {

System.out.println(r.toString()); }

} finally {

scanner.close();

}

table.close();

So-called client request filters enforce conditions on a query. For instance, a single column value filter can implement an equality comparison, for example searching for an author called Miller:

public static final byte[] CF = "bookinfo".getBytes();

public static final byte[] COL = "author".getBytes();

SingleColumnValueFilter filter = new SingleColumnValueFilter(

CF, COL, CompareOp.EQUAL, Bytes.toBytes("Miller") );

scan.setFilter(filter);

Other filters and comparators include RegexStringComparator, SubstringComparator, ColumnPrefixFilter, and ColumnRangeFilter.

Several settings can be configured by the user. For example, by changing the value of HColumnDescriptor.DEFAULT_VERSIONS the maximum number of stored versions for a cell can be altered. The default is 1, so that only one version of a cell will be returned for a query. Any versions other than the most recent one can be physically deleted during a compaction. In a get command, the number of returned versions as well as the recency of returned versions can be configured via Get.setMaxVersions() and Get.setTimeRange(). A minimum number of versions can also be specified that is interpreted in combination with time-to-live values: the database must maintain at least this minimum number of versions (the most recent ones according to their timestamp) so that a cell might even be retained (and not deleted during compaction) when its time-to-live value has expired. Compaction can be influenced by the administrator, too; for example by setting the minimum and maximum sizes that a file should have to be considered for compaction. Bloom filters are by default defined on the row keys. However, they can be configured to work on the combination of row key and column qualifier in a column family; this setting can be changed by calling HColumnDescriptor.setBloomFilterType or by setting the property on the command line when creating a table; for example:

create 'book', {NAME => 'bookinfo', BLOOMFILTER => 'ROWCOL'}

The row+column Bloom filter provides a benefit when individual columns are often accessed – it is however not effective when data are only accessed by row key without restricting the column qualifier. Due to the increased number of keys to be maintained by the Bloom filter, its space demand increases, too. Other settings that can be changed for Bloom filters are the error rate and the maximum number of keys per Bloom filter.

HBase nicely integrates with the features offered by Hadoop MapReduce: HBase can act as a data source and sink for MapReduce jobs. In this case, the Hadoop jobs should be defined in subclasses of the TableMapper and TableReducer classes. In addition, HBase can run code on the server side in so-called coprocessors which lets developers and administrators add functionality.

8.3.3 Hypertable

Hypertable stores tables in a namespace. Hypertable’s query language is called Hypertable Query Language (HQL) and is similar to SQL. The Hypertable shell is Hypertable’s command line interface. HQL queries can be interpreted by the Hypertable shell, the Thrift API or the HqlInterpreter C++ class.

image Web resources:

Hypertable: http://hypertable.org/

documentation page: http://hypertable.com/documentation/

GitHub repository: https://github.com/hypertable/hypertable

Namespaces can be created with an HQL command and namespaces can be nested so that a namespace can be a subnamespace of another namespace.

CREATE NAMESPACE "/mynamespace";

USE "/mynamespace";

CREATE NAMESPACE "subnamespace";

When creating a table, the column family names have to be specified:

CREATE TABLE book (bookinfo, lendinginfo);

Optionally, column families can be assigned to access groups; column families in the same access group will be stored together on disk.

The insert command inserts data; several tuples can be specified in one command, and each tuple consists of a row key, the column name (consisting of column family name and column qualifier) and a value; a timestamp can be added as an optional property:

INSERT INTO book VALUES ("1002", "bookinfo:title", "Databases"),
    ("1002", "bookinfo:author", "Miller");

Selection statements can be specified at the column family level or at the column level. When the column family is specified, the entire set of columns inside the family is returned; for example:

SELECT bookinfo FROM book;

At the column level, all values in the column are returned:

SELECT bookinfo:title FROM book;

Moreover, conditions on the row key as well as conditions on the entire cell key (row key plus column name) can be specified; for example:

SELECT * FROM book WHERE ROW = '1002';
SELECT * FROM book WHERE CELL = '1002','bookinfo:title';

Other comparison operators like < and <= can be used to specify ranges; regular expressions for string matching can be used, too. Hypertable supports indexes on cell values as well as on column qualifiers: a cell value index speeds up searches with selection conditions on cell values, whereas a column qualifier index speeds up searches with conditions on column qualifiers.
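For example, a range condition on the row key and a regular expression on cell values might be written as follows, and value and qualifier indexes can be declared when the table is created (the exact syntax of the REGEXP and INDEX clauses is assumed here and should be checked against the Hypertable documentation):

SELECT * FROM book WHERE ROW > '1000' AND ROW <= '2000';
SELECT bookinfo:author FROM book WHERE VALUE REGEXP 'Mil.*';
CREATE TABLE book (bookinfo, lendinginfo, INDEX bookinfo, QUALIFIER INDEX bookinfo);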

Compaction can be scheduled with the compact command; in Hypertable, compaction can even be executed for a single row key specified in the command, for example:

COMPACT book "1002"

8.3.4 Apache Accumulo

Apache Accumulo stores tables in namespaces. In order to switch between namespaces, the namespace is prepended to the table name, separated by a dot.

image Web resources:

Apache Accumulo: http://accumulo.apache.org/

documentation page: http://accumulo.apache.org/1.7/accumulo_user_manual.html

GitHub repository: https://github.com/apache/accumulo

Accumulo offers a command line interface called the Accumulo Shell. The shell can be used for data management (creating tables, writing values, and scanning for values) and for user management (creating users, granting privileges to users, and logging in as a user). Some example commands are:

createtable book
insert 1002 bookinfo title "Databases"
scan
createuser alice
grant System.CREATE_TABLE -s -u alice
revoke System.CREATE_TABLE -s -u alice
user alice

Accumulo comes with an authentication and authorization framework for users. Accumulo requires an authentication token to be passed along with every request. In the simplest case, this token is a PasswordToken object that represents the password of the user; more sophisticated authentication mechanisms like Kerberos can be plugged in. A connection to the database is established by creating a ZooKeeperInstance (for a given Accumulo instance name and a string containing one or more host names or IP addresses of the underlying ZooKeeper servers) and then obtaining a connector with the user name and the password token.

Instance inst = new ZooKeeperInstance("db1", "localhost:2181");
Connector conn = inst.getConnector("username", new PasswordToken("password"));

The connector object is the interface to operate with the database: it offers table operations (for example to create a table):

conn.tableOperations().create("book");

Data is written with a BatchWriter created from the connector; the writer in addition requires a Mutation object that stores the requested updates.

// create a writer for the book table
BatchWriter bw = conn.createBatchWriter("book", new BatchWriterConfig());
// a mutation collects all updates for the row with key 1002
Mutation mut = new Mutation("1002".getBytes());
mut.put("bookinfo".getBytes(), "title".getBytes(),
    System.currentTimeMillis(), "Databases".getBytes());
mut.put("bookinfo".getBytes(), "author".getBytes(),
    System.currentTimeMillis(), "Miller".getBytes());
bw.addMutation(mut);
bw.flush();
bw.close();

A Scanner object can be configured (setting the range of rows and the columns to read) to fetch data from the database:

Scanner scanner = conn.createScanner("book", Authorizations.EMPTY);
scanner.setRange(new Range(new Text("1002")));
scanner.fetchColumn(new IteratorSetting.Column("bookinfo", "title"));
for (Entry<Key, Value> entry : scanner)
    System.out.println("Key: " + entry.getKey().toString()
        + " Value: " + entry.getValue().toString());

When using the authentication framework, the authentication token is passed to the database as part of any command that the user issues. Accumulo's authorization system is based on security labels: each cell has a visibility assigned to it, and each user has a set of authorizations assigned. The visibility of a cell is set when it is written or updated in the Mutation object; for example, the visibility can be set to public:

mut.put("bookinfo".getBytes(), "title".getBytes(),
    new ColumnVisibility("public"),
    System.currentTimeMillis(), "Databases".getBytes());

When a scanner is created, it can be passed a set of authorizations that the user requests to see. If the requested authorizations are not a subset of the authorizations assigned to the user, execution stops with an exception being thrown.

Authorizations auths = new Authorizations("public");
Scanner scan = conn.createScanner("book", auths);
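The authorizations that a user actually holds are assigned administratively; a minimal sketch using the connector's SecurityOperations interface (assuming that the user alice already exists) could be:

conn.securityOperations().changeUserAuthorizations("alice", new Authorizations("public"));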

Insertion constraints can be registered with a table. Such a constraint restricts the insert commands that are allowed for the table; insertions that violate the specified constraints are rejected. Each constraint has to implement the interface org.apache.accumulo.core.constraints.Constraint; it is deployed in a JAR file placed in the lib folder of the Accumulo installation and then registered with the table via the constraint command on the command line.
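As an illustration, a minimal constraint that rejects mutations containing empty values might look as follows (the class name and the violation code are chosen here for illustration only):

import java.util.Collections;
import java.util.List;
import org.apache.accumulo.core.constraints.Constraint;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Mutation;

public class NonEmptyValueConstraint implements Constraint {

  // human-readable explanation for a violation code
  public String getViolationDescription(short violationCode) {
    return "value must not be empty";
  }

  // return null if the mutation is valid, otherwise a list of violation codes
  public List<Short> check(Environment env, Mutation mutation) {
    for (ColumnUpdate update : mutation.getUpdates()) {
      if (update.getValue().length == 0)
        return Collections.singletonList((short) 1);
    }
    return null;
  }
}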

Compaction can be configured in Accumulo by setting the compaction ratio and the maximum number of files per tablet (a tablet is a partition of a table whose data is stored in a set of files on disk). The compaction ratio is applied to a set of files considered for compaction: the ratio (3 by default) is multiplied with the size of the largest file in the set; if this value is smaller than the sum of the sizes of all files in the set, then the set is compacted. The process starts by considering all files of a tablet as the compaction set and keeps disregarding the largest file until the condition holds. For example, with the default ratio of 3 and four files of 2 MB each, 3 × 2 MB = 6 MB is smaller than the total of 8 MB and all four files are compacted into one; for files of sizes 10 MB, 4 MB, 3 MB and 2 MB, the condition holds for no subset and no compaction is triggered.
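For instance, both settings might be adjusted per table in the Accumulo shell; the property names table.compaction.major.ratio and table.file.max used here should be checked against the documentation of the installed version:

config -t book -s table.compaction.major.ratio=2
config -t book -s table.file.max=10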

8.4 Bibliographic Notes

The Google system called BigTable is described in [CDG+ 06]; it can be seen as the foundation of extensible record stores. The Cassandra system started as a project at Facebook and was described by Lakshman and Malik [LM10]. HBase is part of the Hadoop project (which also includes a distributed file system and a map-reduce framework); an application of Hadoop in a distributed setting at Facebook is detailed in [BGS+ 11]. A further open source extensible record store is Accumulo [BAB+ 12].

Several experimental approaches for sorted and indexed storage structures (including discussions of compaction strategies) can be found in [GHMT12, Spi12, TBM+ 11, MA11, BFCF+ 07, BDF+ 10]; the log-structured merge tree in [OCGO96] can be seen as the predecessor of these approaches. Write-ahead logging was recently analyzed by Sears and Brewer [SB09]. Bloom filters are named after the author of the seminal article [Blo70]. Lower and upper bounds for the false positive rate of Bloom filters are examined by Bose et al. [BGK+ 08]. Tarkoma, Rothenberg and Lagerspetz [TRL12] provide a comprehensive overview of advanced Bloom filters (like counting Bloom filters and deletable Bloom filters) and describe recent applications of Bloom filters in distributed systems. Partitioning for Bloom filters was introduced by Kirsch and Mitzenmacher [KM08].
