Using the Accumulo database

We have seen a method to read our personRdd object from Elasticsearch, and this forms a simple and neat solution for our storage requirements. However, when writing commercial applications, we must always be mindful of security and, at the time of writing, Elasticsearch security is still in development; so it would be useful at this stage to introduce a storage mechanism with native security. This is an important consideration even though we are using GDELT data that is, of course, open source by definition. In a commercial environment, it is very common for datasets to be confidential or commercially sensitive in some way, and clients will often request details of how their data will be secured long before they discuss the data science itself. It is the authors' experience that many a commercial opportunity is lost due to the inability of solution providers to demonstrate a robust and secure data architecture.

Accumulo (http://accumulo.apache.org) is a NoSQL database based on Google's Bigtable design (http://research.google.com/archive/bigtable.html). It was originally developed by the United States National Security Agency and was subsequently released to the Apache community in 2011. Accumulo offers us the usual big data advantages, such as bulk loading and parallel reading, but also some additional capabilities, such as iterators for efficient server- and client-side precomputation, data aggregation and, most importantly, cell-level security.

For our work in community detection, we will use Accumulo specifically to take advantage of its iterator and cell-level security features. First of all, we should set up an Accumulo instance and then load some data from Elasticsearch into Accumulo; you can find the full code in our GitHub repository.

Setting up Accumulo

The steps required to install Accumulo are out of the scope of this book; there are several tutorials available on the Web. A vanilla installation with a root user is all that is required to continue with this chapter, although we need to pay particular attention to the initial security setup in the Accumulo configuration. Once you run the Accumulo shell successfully, you are ready to proceed.

Use the following commands as a guideline for creating users. The aim is to create several users with different security labels so that, when we load the data, the users will have varying access to it.

# set up some users
createuser matt
createuser ant
createuser dave
createuser andy

# create the persons table
createtable persons

# switch to the persons table
table persons

# ensure all of the users can access the table
grant Table.READ -t persons -u matt
grant Table.READ -t persons -u ant
grant Table.READ -t persons -u dave
grant Table.READ -t persons -u andy

# allocate security labels to the users
addauths -s unclassified,secret,topsecret -u matt
addauths -s unclassified,secret -u ant
addauths -s unclassified,topsecret -u dave
addauths -s unclassified -u andy

# display user auths
getauths -u matt

# create a server side iterator to sum values
setiter -t persons -p 10 -scan -minc -majc -n sumCombiner -class org.apache.accumulo.core.iterators.user.SummingCombiner

# list iterators in use
listiter -all

# once the table contains some records ...
user matt

# we'll see all of the records that match security labels for the user
scan

Cell security

Accumulo protects its cells using tokens. Tokens are made up of labels; in our case, these are [unclassified], [secret], and [topsecret], but you can use any comma-delimited values. Accumulo rows are written with a visibility field (refer to the following code) that is simply a string representation of the labels required to access a row value. The visibility field can contain Boolean logic to combine different labels and also allows for basic precedence, for instance:

secret&topsecret (secret AND topsecret)
secret|topsecret (secret OR topsecret)
unclassified&(secret|topsecret) (unclassified AND secret, or unclassified AND topsecret)
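
As a minimal illustration of how such an expression is attached to data, the Accumulo client API wraps the visibility string in a ColumnVisibility object when building a Mutation; the row, family, and qualifier used here are placeholders:

import org.apache.accumulo.core.data.Mutation
import org.apache.accumulo.core.security.ColumnVisibility

// Only users holding unclassified plus either secret or topsecret will see this cell
val visibility = new ColumnVisibility("unclassified&(secret|topsecret)")

val mutation = new Mutation("personA")
mutation.put("associated", "personB", visibility, "1")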

A user's labels must satisfy the visibility expression of a cell in order to be granted access to it, and the labels supplied with a query must be a subset of the user's tokens stored in Accumulo (otherwise the query will be rejected). Any values that are not matched are simply not returned by the user's query. This is an important point: if there is some indication to the user that data is missing, it is often possible for the user to draw logical, correct (or, often worse, incorrect) conclusions about the nature of the missing data. In a contact chain of people, for instance, if some vertices are available to a user and the unavailable vertices are marked as such, the user might be able to determine information about those missing entities from the surrounding graph.

For example, a government agency investigating organized crime may allow senior employees to view an entire graph, but junior employees to view only parts of it. If some well-known persons are shown in the graph and there is a blank entry for a vertex, it might be straightforward to work out who the missing entity is; if this placeholder is absent altogether, there is no obvious indication that the chain stretches any further, thus allowing the agency to control the dissemination of information. The graph is still of use to analysts who are oblivious to the hidden link and can continue working on specific areas of the graph.

Iterators

Iterators are a very important feature in Accumulo. They provide a real-time processing framework that leverages the power and parallelization of Accumulo to produce modified views of the data at very low latency. We won't go into great detail here, as the Accumulo documentation has plenty of examples, but we will use an iterator to keep a running sum of the values for the same Accumulo row, that is, the number of times we have seen the same person pair; this sum is stored in the row value. The iterator takes effect whenever the table is scanned; we will also demonstrate how to invoke the same iterator from the client side (for use when it has not been applied on the server).

Elasticsearch to Accumulo

Let's take advantage of Spark's ability to use Hadoop input and output formats, which leverage the native Elasticsearch and Accumulo libraries. It is worth noting that there are different routes we could take here: the first is to use the Elasticsearch code given earlier to produce an array of string tuples and feed that into AccumuloLoader (found in the code repository); the second is to explore an alternative using additional Hadoop input and output formats, that is, code that reads from Elasticsearch using the EsInputFormat class and writes to Accumulo using the AccumuloOutputFormat class. We take the second route in what follows.

A graph data model in Accumulo

Before delving into the code, it is worth describing the schema we will be using to store a graph of persons in Accumulo. Each source node (person A) will be stored as a row key, the association name (such as "is also known as") as a column family, the destination node (person B) as a column qualifier, and a default value of 1 as a column value (that will be aggregated thanks to our iterator). This is reported here in Figure 2:

Figure 2: Graph data model on Accumulo

The main advantage of such a model is that, given an input vertex (a person's name), one can quickly access all of its known relationships through a simple GET query. The reader will surely appreciate the cell-level security here, where we hide a particular edge triplet, [personA] <= [relationB] => [personD], from any Accumulo user who has not been granted the [SECRET] authorization.
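
The "simple GET query" mentioned above amounts to a scan restricted to a single row key. The following is a minimal sketch using the Accumulo client API; the instance name, ZooKeeper host, credentials, and person name are placeholders:

import org.apache.accumulo.core.client.ZooKeeperInstance
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.Range
import org.apache.accumulo.core.security.Authorizations
import scala.collection.JavaConverters._

// Placeholder connection details
val connector = new ZooKeeperInstance("accumulo", "localhost:2181")
  .getConnector("matt", new PasswordToken("password"))

// Restrict the scan to a single row key: each entry returned is one outgoing edge
val scanner = connector.createScanner("persons", new Authorizations("unclassified", "secret"))
scanner.setRange(Range.exact("david bowie"))

scanner.asScala.foreach { entry =>
  val destination = entry.getKey.getColumnQualifier.toString
  println(s"david bowie -> $destination (${entry.getValue.toString})")
}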

The downside of such a model is that, compared to a graph database (such as Neo4J or OrientDB), traversal queries such as a depth-first search would be terribly inefficient (we would need multiple recursive queries). We delegate any graph processing logic to GraphX later in this chapter.

Hadoop input and output formats

We use the following Maven dependency in order to build both our input/output formats and our Spark client. The version obviously depends on the distributions of Hadoop and Accumulo installed.

<dependency>
  <groupId>org.apache.accumulo</groupId>
  <artifactId>accumulo-core</artifactId>
  <version>1.7.0</version>
</dependency>

We configure the read from Elasticsearch through the EsInputFormat class. We extract a key-value pair RDD of Text and MapWritable, where the key contains the document ID and the value contains the JSON document wrapped inside a serializable HashMap:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("communities-loader")
  .getOrCreate()
 
val sc = spark.sparkContext
val hdpConf = sc.hadoopConfiguration
 
// set the ES entry points
hdpConf.set("es.nodes", "localhost:9200")
hdpConf.set("es.resource", "gzet/articles")
 
// Read map writable objects
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.MapWritable
import org.elasticsearch.hadoop.mr.EsInputFormat
 
val esRDD: RDD[MapWritable] = sc.newAPIHadoopRDD(
  hdpConf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text],
  classOf[MapWritable]
).values

An Accumulo Mutation is similar to a Put object in HBase, and contains the cell coordinates such as the row key, column family, column qualifier, column visibility, and value. This object is built as follows:

def buildMutations(value: MapWritable) = {
 
  // Extract list of persons
  val people = value
    .get("person")
    .asInstanceOf[ArrayWritable]
    .get()
    .map(_.asInstanceOf[Text])
    .map(_.toString)

  // Use a default Visibility
  val visibility = new ColumnVisibility("unclassified")

  // Build mutation on tuples
  buildTuples(people.toArray)
    .map {
      case (src, dst) =>
        val mutation = new Mutation(src)
        mutation.put("associated", dst, visibility, "1")
        (new Text(accumuloTable), mutation)
    }
}

We use the aforementioned buildTuples method to compute our person pairs and write them to Accumulo using the Hadoop AccumuloOutputFormat. Note that we can optionally apply a security label to each of our output rows using ColumnVisibility; refer to the Cell security section we saw earlier.
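
The buildTuples implementation itself lives in the repository; a minimal sketch of what it might look like, assuming it simply emits every unordered pair of distinct persons mentioned in the same document, is as follows:

// Hypothetical sketch of buildTuples: the real implementation lives in the repository.
// Here we assume it emits every unordered pair of distinct persons from one document.
def buildTuples(people: Array[String]): Array[(String, String)] = {
  people
    .distinct
    .combinations(2)
    .map { case Array(src, dst) => (src, dst) }
    .toArray
}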

Next, we configure the write to Accumulo. Our output RDD will be a key-value pair RDD of Text and Mutation, where the key contains the Accumulo table name and the value the mutation to insert:

// Build Mutations
val accumuloRDD = esRDD flatMap buildMutations

// Save Mutations to Accumulo
accumuloRDD.saveAsNewAPIHadoopFile(
  "",
  classOf[Text],
  classOf[Mutation],
  classOf[AccumuloOutputFormat]
)
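
Note that the AccumuloOutputFormat itself needs to know how to reach the cluster before this write will succeed. A sketch of that configuration follows; the instance name, ZooKeeper hosts, and credentials are placeholders, and the resulting job.getConfiguration can be passed to saveAsNewAPIHadoopFile as its optional configuration argument:

import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.hadoop.mapreduce.Job

// Placeholder connection details and credentials
val job = Job.getInstance(sc.hadoopConfiguration)

AccumuloOutputFormat.setConnectorInfo(job, "accumuloUser", new PasswordToken("accumuloPassword"))
AccumuloOutputFormat.setZooKeeperInstance(job,
  new ClientConfiguration().withInstance("accumulo").withZkHosts("localhost:2181"))
AccumuloOutputFormat.setDefaultTableName(job, "persons") // fallback table when the output key does not specify one
AccumuloOutputFormat.setCreateTables(job, true)          // create the table if it does not exist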

Reading from Accumulo

Now that we have our data in Accumulo, we can use the shell to inspect it (assuming we select a user that has sufficient privileges to see the data). Using the scan command in the Accumulo shell, we can simulate a specific user and query, thereby validating the results of io.gzet.community.accumulo.AccumuloReader. When using the Scala version, we must ensure that the correct authorization is used; it is passed into the read function as a String, for example "secret,topsecret":

def read(
  sc: SparkContext,
  accumuloTable: String,
  authorization: Option[String] = None
)
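
Assuming read is exposed by the AccumuloReader object in the repository, a hypothetical invocation might look like this, with the authorization string mirroring the labels granted via addauths:

// Hypothetical call: the table name and labels must match the Accumulo setup above
val personEdges = AccumuloReader.read(sc, "persons", Some("secret,topsecret"))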

This method of applying Hadoop input/output format utilizes static methods within the Java Accumulo library (AbstractInputFormat is subclassed by InputFormatBase, which is subclassed by AccumuloInputFormat). Spark users must pay particular attention to these utility methods that alter the Hadoop configuration via an instance of a Job object. This can be set as follows:

val hdpConf = sc.hadoopConfiguration
val job = Job.getInstance(hdpConf)

val clientConfig = new ClientConfiguration()
  .withInstance(accumuloInstance)
  .withZkHosts(zookeeperHosts)

AbstractInputFormat.setConnectorInfo(
  job,
  accumuloUser,
  new PasswordToken(accumuloPassword)
)

AbstractInputFormat.setZooKeeperInstance(
  job,
  clientConfig
)

if(authorization.isDefined) {
  AbstractInputFormat.setScanAuthorizations(
    job,
    new Authorizations(authorization.get.split(","): _*) // each label becomes its own authorization
  )
}

InputFormatBase.addIterator(job, is)
InputFormatBase.setInputTableName(job, accumuloTable)

You will also notice the configuration of an Accumulo iterator:

// Configure a SummingCombiner to aggregate the "1" counts stored against each person pair
val is = new IteratorSetting(
  1,
  "summingCombiner",
  "org.apache.accumulo.core.iterators.user.SummingCombiner"
)

is.addOption("all", "")               // leave "all" disabled so only the listed columns are combined
is.addOption("columns", "associated") // combine the "associated" column family
is.addOption("lossy", "TRUE")         // silently drop values that cannot be parsed
is.addOption("type", "STRING")        // values are encoded as strings

We can use client- or server-side iterators; we have previously seen a server-side example when configuring Accumulo via the shell, and a client-side sketch above. The key difference is that client-side iterators are executed within the client JVM, as opposed to server-side iterators, which leverage the power of the Accumulo tablet servers. A full explanation can be found in the Accumulo documentation; however, there are many reasons for selecting a client- or server-side iterator, including whether tablet server performance can be compromised, JVM memory usage, and so on. These decisions should be made when designing your Accumulo architecture. At the end of our AccumuloReader code, we can see the calling function that produces an RDD of EdgeWritable:

val edgeWritableRdd: RDD[EdgeWritable] = sc.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[AccumuloGraphxInputFormat],
  classOf[NullWritable],
  classOf[EdgeWritable]
).values

AccumuloGraphxInputFormat and EdgeWritable

We have implemented our own Accumulo InputFormat to enable us to read Accumulo rows and automatically output our own Hadoop Writable: EdgeWritable. This provides a convenient wrapper to hold our source vertex, our destination vertex, and the count as an edge weight, which can then be used when building the graph. This is extremely useful, as Accumulo uses the iterator discussed earlier to calculate the total count for each unique row, thereby removing the need to do this manually. As Accumulo is written in Java, our InputFormat is also written in Java and extends InputFormatBase, thus inheriting all of the default Accumulo InputFormat behavior but outputting our choice of schema.

We are only interested in outputting EdgeWritables; therefore, we set all of the keys to null (NullWritable) and the values to EdgeWritable. An additional advantage is that values in Hadoop only need to implement the Writable interface (although we have implemented WritableComparable for completeness, so EdgeWritable can also be used as a key if required).
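
The actual EdgeWritable in the repository is written in Java; for illustration only, a minimal Scala sketch of an equivalent structure, assuming the three fields and getters used later in this chapter, might look like this:

import java.io.{DataInput, DataOutput}
import org.apache.hadoop.io.WritableComparable

// Illustrative sketch only: field names and ordering logic are assumptions;
// the repository version is implemented in Java.
class EdgeWritableSketch(var sourceVertex: String,
                         var destVertex: String,
                         var count: Long)
    extends WritableComparable[EdgeWritableSketch] {

  // Hadoop requires a no-argument constructor for deserialization
  def this() = this("", "", 0L)

  def getSourceVertex: String = sourceVertex
  def getDestVertex: String = destVertex
  def getCount: Long = count

  override def write(out: DataOutput): Unit = {
    out.writeUTF(sourceVertex)
    out.writeUTF(destVertex)
    out.writeLong(count)
  }

  override def readFields(in: DataInput): Unit = {
    sourceVertex = in.readUTF()
    destVertex = in.readUTF()
    count = in.readLong()
  }

  // Ordering by weight allows the writable to double as a key if needed
  override def compareTo(other: EdgeWritableSketch): Int =
    java.lang.Long.compare(count, other.count)
}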

Building a graph

Because GraphX uses Long objects as the underlying type for storing vertices and edges, we first need to translate all of the persons we fetched from Accumulo into a unique set of IDs. We assume that our list of unique persons does not fit in memory, or that it would not be efficient to collect it anyway, so we simply build a distributed dictionary using the zipWithIndex function, as shown in the following code:

val dictionary = edgeWritableRdd
  .flatMap {
    edge =>
      List(edge.getSourceVertex, edge.getDestVertex)
  }
  .distinct()
  .zipWithIndex()
  .mapValues {
    index =>
      index + 1L
  }

dictionary.cache()
dictionary.count()

dictionary
  .take(3)
  .foreach(println)

/*
(david bowie, 1L)
(yoko ono, 2L)
(john lennon, 3L)
*/

We create an edge RDD using two successive join operations against our dictionary and finally build our weighted, directed graph of persons, with vertices containing the person names and edge attributes holding the frequency count of each pair:

val vertices = dictionary.map(_.swap)

val edges = edgeWritableRdd
  .map {
    edge =>
      (edge.getSourceVertex, edge)
  }
  .join(dictionary)
  .map {
    case (from, (edge, fromId)) =>
      (edge.getDestVertex, (fromId, edge))
  }
  .join(dictionary)
  .map {
    case (to, ((fromId, edge), toId)) =>
      Edge(fromId, toId, edge.getCount.toLong)
  }

val personGraph = Graph.apply(vertices, edges)

personGraph.cache()
personGraph.vertices.count()

personGraph
  .triplets
  .take(3)
  .foreach(println)

/*
((david bowie,1),(yoko ono,2),1)
((david bowie,1),(john lennon,3),1)
((yoko ono,2),(john lennon,3),1)
*/