We have seen a method to read our personRdd
object from Elasticsearch and this forms a simple and neat solution for our storage requirements. However, when writing commercial applications, we must always be mindful of security and, at the time of writing, Elasticsearch security is still in development; so it would be useful at this stage to introduce a storage mechanism with native security. This is an important consideration even though we are using GDELT data, which is, of course, open source by definition. In a commercial environment, it is very common for datasets to be confidential or commercially sensitive in some way, and clients will often request details of how their data will be secured long before they discuss the data science aspect itself. It is the authors' experience that many a commercial opportunity is lost due to the inability of solution providers to demonstrate a robust and secure data architecture.
Accumulo (http://accumulo.apache.org) is a NoSQL database based on Google's Bigtable design (http://research.google.com/archive/bigtable.html). It was originally developed by the United States National Security Agency and was subsequently released to the Apache community in 2011. Accumulo offers us the usual big data advantages, such as bulk loading and parallel reading, but also has some additional capabilities such as iterators, for efficient server- and client-side precomputation, data aggregation and, most importantly, cell-level security.
For our work in community detection, we will use Accumulo specifically to take advantage of its iterator and cell-level security features. First of all, we should set up an Accumulo instance and then load some data from Elasticsearch to Accumulo; you can find the full code in our GitHub repository.
The steps required to install Accumulo are out of the scope of this book; there are several tutorials available on the Web. A vanilla installation with a root user is all that is required to continue with this chapter, although we need to pay particular attention to the initial security setup in the Accumulo configuration. Once you run the Accumulo shell successfully, you are ready to proceed.
Use the following code as a guideline to creating users. The aim is to create several users with different security labels so that when we load the data, the users will have varying access to it.
# set up some users
createuser matt
createuser ant
createuser dave
createuser andy

# create the persons table
createtable persons

# switch to the persons table
table persons

# ensure all of the users can access the table
grant -s System.READ_TABLE -u matt
grant -s System.READ_TABLE -u ant
grant -s System.READ_TABLE -u dave
grant -s System.READ_TABLE -u andy

# allocate security labels to the users
addauths -s unclassified,secret,topsecret -u matt
addauths -s unclassified,secret -u ant
addauths -s unclassified,topsecret -u dave
addauths -s unclassified -u andy

# display user auths
getauths -u matt

# create a server-side iterator to sum values
setiter -t persons -p 10 -scan -minc -majc -n sumCombiner -class org.apache.accumulo.core.iterators.user.SummingCombiner

# list iterators in use
listiter -all

# once the table contains some records ...
user matt

# we'll see all of the records that match security labels for the user
scan
Accumulo protects its cells using tokens. Tokens are made up of labels; in our case, these are [unclassified], [secret], and [topsecret], but you can use any comma-delimited values. Accumulo rows are written with a visibility field (refer to the following code) that is simply a string representation of the labels required to access a row value. The visibility field can contain Boolean logic to combine different labels and also allows for basic precedence, for instance:
secret&topsecret                 (secret AND topsecret)
secret|topsecret                 (secret OR topsecret)
unclassified&(secret|topsecret)  (unclassified AND secret, or unclassified AND topsecret)
A user has to match at least the visibility field in order to be granted access, and must supply labels that are a subset of the token stored for them in Accumulo (or the query will be rejected). Any values that are not matched are simply not returned by the user's query. This is an important point: if there is any indication to the user that data is missing, it is often possible for them to draw logical conclusions, correct or (often worse) incorrect, about the nature of that data. For example, in a contact chain of people, if some vertices are available to a user and the unavailable vertices are marked as such, the user might be able to determine information about those missing entities from the surrounding graph. Say a government agency investigating organized crime allows senior employees to view an entire graph, but junior employees to view only parts of it: if some well-known persons are shown in the graph alongside a blank entry for a vertex, it might be straightforward to work out who the missing entity is. If that placeholder is absent altogether, there is no obvious indication that the chain stretches any further, thus allowing the agency to control the dissemination of information. The graph is still of use to analysts, however, who are oblivious to the link and can continue working on specific areas of the graph.
Iterators are a very important feature in Accumulo, providing a real-time processing framework that leverages the power and parallelization of Accumulo to produce modified versions of data at very low latency. We won't go into great detail here, as the Accumulo documentation has plenty of examples, but we will use an iterator to keep a sum of the values for the same Accumulo row, that is, the number of times we have seen the same person pair; this sum will be stored in that row's value. The iterator will then take effect whenever the table is scanned; we will also demonstrate how to invoke the same iterator from the client side (for use when it has not been applied on the server).
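The effect of the SummingCombiner can be illustrated in plain Scala (this is only an illustration of the semantics, not the iterator API itself): entries sharing the same row coordinates have their values summed into a single entry.

```scala
// Illustrative sketch of what the SummingCombiner does at scan time:
// entries with identical (row, column family, column qualifier)
// coordinates are collapsed into one entry holding the summed value,
// so repeated "1" inserts for a person pair become a total count.
def combine(entries: Seq[((String, String, String), Long)]): Map[(String, String, String), Long] =
  entries.groupBy(_._1).map { case (key, vs) => key -> vs.map(_._2).sum }
```

For example, three inserts of `(personA, associated, personB, 1)` would be returned by a scan as a single entry with value 3.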
Let's take advantage of Spark's ability to use Hadoop input and output formats, which leverage the native Elasticsearch and Accumulo libraries. It is worth noting that there are different routes we could take here: the first is to use the Elasticsearch code given earlier to produce an array of string tuples and feed that into AccumuloLoader (found in the code repository); the second is to explore an alternative using additional Hadoop input formats, producing code that reads from Elasticsearch using the EsInputFormat class and writes to Accumulo using the AccumuloOutputFormat class.
Before delving into the code, it is worth describing the schema we will be using to store a graph of persons in Accumulo. Each source node (person A) will be stored as a row key, the association name (such as "is also known as") as a column family, the destination node (person B) as a column qualifier, and a default value of 1 as a column value (that will be aggregated thanks to our iterator). This is reported here in Figure 2:
The main advantage of such a model is that, given an input vertex (a person's name), one can quickly access all of its known relationships through a simple GET query. The reader will surely appreciate the cell-level security, where we hide a particular edge triplet [personA] <= [relationB] => [personD] from any Accumulo user with no [secret] authorization granted.
The downside of such a model is that, compared to a graph database (such as Neo4j or OrientDB), traversal queries such as a depth-first search would be terribly inefficient (we would need multiple recursive queries). We delegate any graph processing logic to GraphX later in this chapter.
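The inefficiency can be sketched in plain Scala: each hop in a traversal is a separate lookup, which in Accumulo means a separate round-trip scan, so a depth-n search issues a chain of queries rather than one. The `adjacency` map below is a stand-in for a row-key scan returning the row's column qualifiers.

```scala
// Sketch of why traversals are costly on this key/value model: every hop
// requires its own lookup (in Accumulo, its own scan of a row key), so a
// breadth-limited search of depth n performs O(branching^n) lookups.
def reachable(adjacency: Map[String, Set[String]], start: String, depth: Int): Set[String] =
  if (depth == 0) Set(start)
  else adjacency.getOrElse(start, Set.empty)
    .flatMap(next => reachable(adjacency, next, depth - 1)) + start
```

A graph engine holds the whole edge set in a distributed structure and traverses it in bulk, which is exactly what we let GraphX do later.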
We use the following Maven dependency in order to build both our input/output formats and our Spark client. The version obviously depends on the distribution of Hadoop and Accumulo installed.
<dependency>
  <groupId>org.apache.accumulo</groupId>
  <artifactId>accumulo-core</artifactId>
  <version>1.7.0</version>
</dependency>
We configure the job for reading from Elasticsearch through the EsInputFormat class. We extract a key-value pair RDD of Text and MapWritable, where the key contains the document ID and the value holds the JSON document wrapped inside a serializable HashMap:
val spark = SparkSession
  .builder()
  .appName("communities-loader")
  .getOrCreate()

val sc = spark.sparkContext
val hdpConf = sc.hadoopConfiguration

// set the ES entry points
hdpConf.set("es.nodes", "localhost:9200")
hdpConf.set("es.resource", "gzet/articles")

// Read map writable objects
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.MapWritable
import org.elasticsearch.hadoop.mr.EsInputFormat

val esRDD: RDD[MapWritable] = sc.newAPIHadoopRDD(
  hdpConf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text],
  classOf[MapWritable]
).values
An Accumulo mutation is similar to a put object in HBase, and contains the table's coordinates such as row key, column family, column qualifier, column value, and visibility. This object is built as follows:
def buildMutations(value: MapWritable) = {

  // Extract list of persons
  val people = value
    .get(new Text("person"))
    .asInstanceOf[ArrayWritable]
    .get()
    .map(_.asInstanceOf[Text])
    .map(_.toString)

  // Use a default Visibility
  val visibility = new ColumnVisibility("unclassified")

  // Build mutations on tuples
  buildTuples(people.toArray)
    .map { case (src, dst) =>
      val mutation = new Mutation(src)
      mutation.put("associated", dst, visibility, "1")
      (new Text(accumuloTable), mutation)
    }
}
We use the aforementioned buildTuples method to calculate our person pairs and write them to Accumulo using the Hadoop AccumuloOutputFormat. Note that we can optionally apply a security label to each of our output rows using ColumnVisibility; refer to the cell security we saw earlier.
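For readers following along without the repository to hand, one possible implementation of buildTuples is sketched below. This is an assumption about its behavior, not the book's actual code: we take it to emit each distinct unordered pair of persons once per document, with names sorted to give a canonical (source, destination) direction.

```scala
// Hypothetical sketch of buildTuples: turn the persons mentioned in one
// document into distinct, canonically ordered (source, destination)
// pairs, each carrying an implicit count of 1 per document.
def buildTuples(people: Array[String]): Array[(String, String)] =
  people
    .distinct            // a person mentioned twice yields no self-pair
    .sorted              // canonical direction: lexicographic order
    .combinations(2)     // every unordered pair exactly once
    .map { case Array(a, b) => (a, b) }
    .toArray
```

The summing iterator then aggregates the per-document 1s into total co-occurrence counts when the table is scanned.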
We configure the job for writing to Accumulo. Our output RDD will be a key-value pair RDD of Text and Mutation, where the key contains the Accumulo table name and the value the mutation to insert:
// Build mutations
val accumuloRDD = esRDD flatMap buildMutations

// Save mutations to Accumulo
accumuloRDD.saveAsNewAPIHadoopFile(
  "",
  classOf[Text],
  classOf[Mutation],
  classOf[AccumuloOutputFormat]
)
Now that we have our data in Accumulo, we can use the shell to inspect it (assuming we select a user with enough privileges to see the data). Using the scan command in the Accumulo shell, we can simulate a specific user and query, thereby validating the results of io.gzet.community.accumulo.AccumuloReader. When using the Scala version, we must ensure that the correct authorization is used; it is passed into the read function as a String, for example "secret,topsecret":
def read(
  sc: SparkContext,
  accumuloTable: String,
  authorization: Option[String] = None
)
This method of applying Hadoop input/output formats utilizes static methods within the Java Accumulo library (AbstractInputFormat is subclassed by InputFormatBase, which is subclassed by AccumuloInputFormat). Spark users must pay particular attention to these utility methods, which alter the Hadoop configuration via an instance of a Job object. This can be set as follows:
val hdpConf = sc.hadoopConfiguration
val job = Job.getInstance(hdpConf)

val clientConfig = new ClientConfiguration()
  .withInstance(accumuloInstance)
  .withZkHosts(zookeeperHosts)

AbstractInputFormat.setConnectorInfo(
  job,
  accumuloUser,
  new PasswordToken(accumuloPassword)
)

AbstractInputFormat.setZooKeeperInstance(job, clientConfig)

if (authorization.isDefined) {
  AbstractInputFormat.setScanAuthorizations(
    job,
    new Authorizations(authorization.get)
  )
}

InputFormatBase.addIterator(job, is)
InputFormatBase.setInputTableName(job, accumuloTable)
You will also notice the configuration of an Accumulo iterator:
val is = new IteratorSetting(
  1,
  "summingCombiner",
  "org.apache.accumulo.core.iterators.user.SummingCombiner"
)

is.addOption("all", "")
is.addOption("columns", "associated")
is.addOption("lossy", "TRUE")
is.addOption("type", "STRING")
We can use client-side or server-side iterators; we have previously seen an example of a server-side iterator when configuring Accumulo via the shell. The key difference is that client-side iterators are executed within the client JVM, as opposed to server-side iterators, which leverage the power of the Accumulo tablet servers. A full explanation can be found in the Accumulo documentation, but there are many reasons for selecting a client or server-side iterator, including whether tablet server performance may be compromised, JVM memory usage, and so on. These decisions should be made when designing your Accumulo architecture. At the end of our AccumuloReader code, we can see the calling function that produces an RDD of EdgeWritable:
val edgeWritableRdd: RDD[EdgeWritable] = sc.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[AccumuloGraphxInputFormat],
  classOf[NullWritable],
  classOf[EdgeWritable]
).values
We have implemented our own Accumulo InputFormat to enable us to read Accumulo rows and automatically output our own Hadoop Writable: EdgeWritable. This provides a convenient wrapper holding our source vertex, our destination vertex, and the count as edge weight, which can then be used when building the graph. This is extremely useful, as Accumulo uses the iterator discussed earlier to calculate the total count for each unique row, thereby removing the need to do this manually. As Accumulo is written in Java, our InputFormat uses Java to extend InputFormatBase, thus inheriting all of the default Accumulo InputFormat behavior but outputting our choice of schema.
We are only interested in outputting EdgeWritables; therefore, we set all of the keys to null (NullWritable) and the values to EdgeWritable. An additional advantage is that values in Hadoop only need to inherit from the Writable interface (although we have implemented WritableComparable for completeness, so EdgeWritable can also be used as a key, if required).
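To show what the EdgeWritable contract amounts to, here is a simplified, Hadoop-free sketch in plain Scala. The field names mirror the accessors used in this chapter (getSourceVertex, getDestVertex, getCount), but the serialization layout is an assumption for illustration; the real class implements Hadoop's WritableComparable over DataOutput/DataInput streams.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hadoop-free sketch of the EdgeWritable idea: a small record carrying
// source vertex, destination vertex, and the aggregated count, with
// symmetric write/read methods in the style of Hadoop's Writable.
case class EdgeSketch(sourceVertex: String, destVertex: String, count: Long) {
  def write(out: DataOutputStream): Unit = {
    out.writeUTF(sourceVertex)
    out.writeUTF(destVertex)
    out.writeLong(count)
  }
}

object EdgeSketch {
  def read(in: DataInputStream): EdgeSketch =
    EdgeSketch(in.readUTF(), in.readUTF(), in.readLong())
}
```

A round trip through a byte stream returns the original record, which is exactly the property the Hadoop serialization machinery relies on when shuffling Writables between tasks.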
Because GraphX uses Long objects as the underlying type for storing vertices and edges, we first need to translate all of the persons we fetched from Accumulo into a unique set of IDs. We assume our list of unique persons does not fit in memory (or that it would not be efficient to hold it there anyway), so we simply build a distributed dictionary using the zipWithIndex function, as shown in the following code:
val dictionary = edgeWritableRdd
  .flatMap { edge =>
    List(edge.getSourceVertex, edge.getDestVertex)
  }
  .distinct()
  .zipWithIndex()
  .mapValues { index =>
    index + 1L
  }

dictionary.cache()
dictionary.count()

dictionary
  .take(3)
  .foreach(println)

/*
(david bowie,1)
(yoko ono,2)
(john lennon,3)
*/
We create an edge RDD using two successive join operations on our person tuples, and finally build our weighted, directed graph of persons, with vertices containing the person's name and edge attributes containing the frequency count of each tuple:
val vertices = dictionary.map(_.swap)

val edges = edgeWritableRdd
  .map { edge =>
    (edge.getSourceVertex, edge)
  }
  .join(dictionary)
  .map { case (from, (edge, fromId)) =>
    (edge.getDestVertex, (fromId, edge))
  }
  .join(dictionary)
  .map { case (to, ((fromId, edge), toId)) =>
    Edge(fromId, toId, edge.getCount.toLong)
  }

val personGraph = Graph.apply(vertices, edges)
personGraph.cache()
personGraph.vertices.count()

personGraph
  .triplets
  .take(3)
  .foreach(println)

/*
((david bowie,1),(yoko ono,2),1)
((david bowie,1),(john lennon,3),1)
((yoko ono,2),(john lennon,3),1)
*/