Aggregation use case

In this rather lengthy section, we will use the aggregation framework to process data from the Ethereum blockchain.

Using our Python code from https://github.com/agiamas/mastering-mongodb/tree/master/chapter_5, we have extracted data from Ethereum and loaded it into our MongoDB database.

Our data resides in two collections, blocks and transactions.

A sample block document has the following fields:

  • Number of transactions
  • Number of contract internal transactions
  • Block hash
  • Parent block hash
  • Mining difficulty
  • Gas used
  • Block height
> db.blocks.findOne()
{
"_id" : ObjectId("595368fbcedea89d3f4fb0ca"),
"number_transactions" : 28,
"timestamp" : NumberLong("1498324744877"),
"gas_used" : 4694483,
"number_internal_transactions" : 4,
"block_hash" : "0x89d235c4e2e4e4978440f3cc1966f1ffb343b9b5cfec9e5cebc331fb810bded3",
"difficulty" : NumberLong("882071747513072"),
"block_height" : 3923788
}

A sample transaction document has the following fields:

  • Transaction hash
  • Block height it belongs to
  • From hash address
  • To hash address
  • Transaction value
  • Transaction fee
> db.transactions.findOne()
{
"_id" : ObjectId("59535748cedea89997e8385a"),
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"txfee" : 28594,
"timestamp" : ISODate("2017-06-06T11:23:10Z"),
"value" : 0,
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"block" : 3923794
}

Sample data for our database is available on GitHub: https://github.com/agiamas/mastering-mongodb.

The code used to import this data into MongoDB is available at https://github.com/agiamas/mastering-mongodb/tree/master/chapter_5.

As developers curious about this novel blockchain technology, we want to analyze Ethereum transactions. We are especially keen to:

  • Find the top 10 addresses that transactions originate from
  • Find the top 10 addresses that transactions end in
  • Find the average value per transaction, with statistics around deviation
  • Find the average fee required per transaction, with statistics around deviation
  • Find the time of day at which the network is most active, by number of transactions or value of transactions
  • Find the day of week on which the network is most active, by number of transactions or value of transactions

First, we find the top 10 addresses that transactions originate from. To calculate this metric, we group the transactions by the value of the from field and add 1 to a new field called count for every document in a group, which gives us the number of transactions sent from each address.

Following that, we sort by the value of the count field in descending (-1) order and finally we limit the output to the first 10 documents that pass through the pipeline. These documents are the top 10 addresses that we are looking for.

Sample Python code is shown here:

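    # Requires "from bson.son import SON" at module level;
    # SON preserves the key order of the sort specification.
    def top_ten_addresses_from(self):
        pipeline = [
            {"$group": {"_id": "$from", "count": {"$sum": 1}}},
            {"$sort": SON([("count", -1)])},
            {"$limit": 10},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)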

{u'count': 38, u'_id': u'miningpoolhub_1'}
{u'count': 31, u'_id': u'Ethermine'}
{u'count': 30, u'_id': u'0x3c540be890df69eca5f0099bbedd5d667bd693f3'}
{u'count': 27, u'_id': u'0xb42b20ddbeabdc2a288be7ff847ff94fb48d2579'}
{u'count': 25, u'_id': u'ethfans.org'}
{u'count': 16, u'_id': u'Bittrex'}
{u'count': 8, u'_id': u'0x009735c1f7d06faaf9db5223c795e2d35080e826'}
{u'count': 8, u'_id': u'Oraclize'}
{u'count': 7, u'_id': u'0x1151314c646ce4e0efd76d1af4760ae66a9fe30f'}
{u'count': 7, u'_id': u'0x4d3ef0e8b49999de8fa4d531f07186cc3abe3d6e'}
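To make the group, sort, and limit logic concrete, here is a minimal in-memory sketch of the same computation in plain Python. It is only an illustration of what the pipeline computes, not of how MongoDB executes it; the addresses and the top_senders helper are hypothetical.

```python
from collections import Counter

# Hypothetical sample documents; only the "from" field matters here.
transactions = [
    {"from": "0xaaa"},
    {"from": "0xbbb"},
    {"from": "0xaaa"},
    {"from": "0xccc"},
    {"from": "0xaaa"},
    {"from": "0xbbb"},
]


def top_senders(docs, limit=10):
    """In-memory equivalent of $group with $sum: 1, $sort (-1) and $limit."""
    # Counting occurrences per address plays the role of {"$sum": 1}.
    counts = Counter(doc["from"] for doc in docs)
    # most_common() sorts by count in descending order and truncates.
    return [{"_id": address, "count": count}
            for address, count in counts.most_common(limit)]


top = top_senders(transactions, limit=2)
print(top)
```

For a real collection, the aggregation pipeline remains the better choice, because the counting happens on the server and only the 10 resulting documents travel over the network.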

Next, we find the top 10 addresses that transactions end in. The calculation is exactly the same as for the from addresses, except that we group by the to field instead of from:

    def top_ten_addresses_to(self):
        pipeline = [
            {"$group": {"_id": "$to", "count": {"$sum": 1}}},
            {"$sort": SON([("count", -1)])},
            {"$limit": 10},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'count': 33, u'_id': u'0x6090a6e47849629b7245dfa1ca21d94cd15878ef'}
{u'count': 30, u'_id': u'0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81'}
{u'count': 25, u'_id': u'0x69ea6b31ef305d6b99bb2d4c9d99456fa108b02a'}
{u'count': 23, u'_id': u'0xe94b04a0fed112f3664e45adb2b8915693dd5ff3'}
{u'count': 22, u'_id': u'0x8d12a197cb00d4747a1fe03395095ce2a5cc6819'}
{u'count': 18, u'_id': u'0x91337a300e0361bddb2e377dd4e88ccb7796663d'}
{u'count': 13, u'_id': u'0x1c3f580daeaac2f540c998c8ae3e4b18440f7c45'}
{u'count': 12, u'_id': u'0xeef274b28bd40b717f5fea9b806d1203daad0807'}
{u'count': 9, u'_id': u'0x96fc4553a00c117c5b0bed950dd625d1c16dc894'}
{u'count': 9, u'_id': u'0xd43d09ec1bc5e57c8f3d0c64020d403b04c7f783'}

Let's find the average value per transaction, with statistics around deviation. In this example, we apply the $avg and $stdDevPop operators to the value field. Using a single $group stage with a constant _id of our choice (here, the string value), we collapse all transactions into one document that holds the average (averageValues) and the population standard deviation (stdDevValues).

    def average_value_per_transaction(self):
        pipeline = [
            {"$group": {"_id": "value",
                        "averageValues": {"$avg": "$value"},
                        "stdDevValues": {"$stdDevPop": "$value"}}},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'averageValues': 5.227238976440972, u'_id': u'value', u'stdDevValues': 38.90322689649576}
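As a reminder of what these two accumulators compute, the following sketch reproduces the average and the population standard deviation in plain Python. The input numbers are made up for illustration and the helper name is hypothetical; $stdDevPop divides by the number of values n, whereas its sibling $stdDevSamp divides by n - 1.

```python
import math

# Hypothetical transaction values, chosen so that the results are round numbers.
values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]


def average_and_pop_stddev(numbers):
    """Return (mean, population standard deviation) of a non-empty list."""
    mean = sum(numbers) / len(numbers)
    # Population variance: mean of squared deviations, dividing by n.
    variance = sum((x - mean) ** 2 for x in numbers) / len(numbers)
    return mean, math.sqrt(variance)


mean, stddev = average_and_pop_stddev(values)
print(mean, stddev)
```

A standard deviation much larger than the average, as in the output shown above, suggests that a few large transactions dominate the distribution.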

Let's find the average fee required per transaction, with statistics around deviation. Average fees are similar to average values, replacing $value with $txfee:

    def average_fee_per_transaction(self):
        pipeline = [
            {"$group": {"_id": "value",
                        "averageFees": {"$avg": "$txfee"},
                        "stdDevValues": {"$stdDevPop": "$txfee"}}},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'_id': u'value', u'averageFees': 320842.0729166667, u'stdDevValues': 1798081.7305142984}

Next, we find the time of day at which the network is most active, by number of transactions or by value of transactions.

To find the most active hour for transactions, we use the $hour operator to extract the hour from the timestamp field, in which we stored our datetime values as ISODate() objects.

    def active_hour_of_day_transactions(self):
        pipeline = [
            {"$group": {"_id": {"$hour": "$timestamp"}, "transactions": {"$sum": 1}}},
            {"$sort": SON([("transactions", -1)])},
            {"$limit": 1},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'_id': 11, u'transactions': 34}


    def active_hour_of_day_values(self):
        pipeline = [
            {"$group": {"_id": {"$hour": "$timestamp"}, "transaction_values": {"$sum": "$value"}}},
            # Sort on the field computed by the $group stage
            {"$sort": SON([("transaction_values", -1)])},
            {"$limit": 1},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'transaction_values': 33.17773841, u'_id': 20}

Let's find the day of week on which the network is most active, by number of transactions or by value of transactions. As with the hour of day, we use the $dayOfWeek operator to extract the day of the week from ISODate() objects. Days are numbered from 1 for Sunday to 7 for Saturday, following the US convention.

    def active_day_of_week_transactions(self):
        pipeline = [
            {"$group": {"_id": {"$dayOfWeek": "$timestamp"}, "transactions": {"$sum": 1}}},
            {"$sort": SON([("transactions", -1)])},
            {"$limit": 1},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'_id': 3, u'transactions': 92}


    def active_day_of_week_values(self):
        pipeline = [
            {"$group": {"_id": {"$dayOfWeek": "$timestamp"}, "transaction_values": {"$sum": "$value"}}},
            # Sort on the field computed by the $group stage
            {"$sort": SON([("transaction_values", -1)])},
            {"$limit": 1},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'transaction_values': 547.62439312, u'_id': 2}
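The numeric _id values returned by $dayOfWeek can be hard to read, and they differ from Python's own weekday numbering. The following sketch, with hypothetical helper names, converts a Python datetime to the 1 (Sunday) to 7 (Saturday) convention and maps a result back to a day name. It assumes the datetimes are in UTC, which is what $hour and $dayOfWeek use unless a timezone is specified.

```python
import datetime

DAY_NAMES = ["Sunday", "Monday", "Tuesday", "Wednesday",
             "Thursday", "Friday", "Saturday"]


def mongo_day_of_week(dt):
    """Return the number $dayOfWeek would yield: 1 (Sunday) to 7 (Saturday)."""
    # isoweekday() runs from 1 (Monday) to 7 (Sunday), so move Sunday to the front.
    return dt.isoweekday() % 7 + 1


def day_name(mongo_dow):
    """Translate a $dayOfWeek number into an English day name."""
    return DAY_NAMES[mongo_dow - 1]


# Timestamp of the sample transaction document shown earlier.
sample = datetime.datetime(2017, 6, 6, 11, 23, 10)
print(sample.hour, mongo_day_of_week(sample), day_name(mongo_day_of_week(sample)))
```

Read this way, the _id of 3 in the first result corresponds to Tuesday and the _id of 2 in the second result corresponds to Monday.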

The aggregations that we calculated can be described in the figure here:

In terms of blocks, we would like to know:

  • Average number of transactions per block, for both total overall transactions and also total contract internal transactions
  • Average gas used per block
  • Average difficulty per block and how it deviates

First, we calculate the average number of transactions per block, both in total and for contract internal transactions. Averaging over the number_transactions field gives us the total number of transactions per block, and averaging over number_internal_transactions gives us the internal ones, as illustrated here:

    def average_number_transactions_total_block(self):
        pipeline = [
            {"$group": {"_id": "average_transactions_per_block",
                        "count": {"$avg": "$number_transactions"}}},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'count': 39.458333333333336, u'_id': u'average_transactions_per_block'}


    def average_number_transactions_internal_block(self):
        pipeline = [
            {"$group": {"_id": "average_transactions_internal_per_block",
                        "count": {"$avg": "$number_internal_transactions"}}},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'count': 8.0, u'_id': u'average_transactions_internal_per_block'}

Average gas used per block:

    def average_gas_block(self):
        pipeline = [
            {"$group": {"_id": "average_gas_used_per_block",
                        "count": {"$avg": "$gas_used"}}},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'count': 2563647.9166666665, u'_id': u'average_gas_used_per_block'}

Average difficulty per block and how it deviates:

    def average_difficulty_block(self):
        pipeline = [
            {"$group": {"_id": "average_difficulty_per_block",
                        "count": {"$avg": "$difficulty"},
                        "stddev": {"$stdDevPop": "$difficulty"}}},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

{u'count': 881676386932100.0, u'_id': u'average_difficulty_per_block', u'stddev': 446694674991.6385}
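The four block statistics above each make a separate pass over the blocks collection. As a possible alternative, a single $group stage can carry several accumulators at once. The sketch below only builds such a pipeline; the function name and the output field names are assumptions chosen for illustration, and running it requires a collection object such as the one used in the methods above.

```python
def build_block_stats_pipeline():
    """Build one $group stage that computes all block averages together."""
    return [
        {"$group": {
            # A constant _id (here None) puts every block into a single group.
            "_id": None,
            "avg_transactions": {"$avg": "$number_transactions"},
            "avg_internal_transactions": {"$avg": "$number_internal_transactions"},
            "avg_gas_used": {"$avg": "$gas_used"},
            "avg_difficulty": {"$avg": "$difficulty"},
            "stddev_difficulty": {"$stdDevPop": "$difficulty"},
        }},
    ]


pipeline = build_block_stats_pipeline()
# With a live connection, the pipeline would be passed to aggregate(), for example:
#     for res in self.collection.aggregate(pipeline):
#         print(res)
print(sorted(pipeline[0]["$group"]))
```

Keeping the statistics separate, as the chapter does, is easier to read; combining them mainly saves round trips and collection scans.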

Our aggregations are described in the following schema:

Now that we have basic statistics calculated, we want to up our game and identify more information about our transactions. Through our sophisticated machine learning algorithms, we have identified some of the transactions as a scam, an initial coin offering (ICO), or both.

In these documents, we have marked these attributes in an array called tags, like this:

{
    "_id" : ObjectId("59554977cedea8f696a416dd"),
    "to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
    "txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
    "from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
    "block" : 3923794,
    "txfee" : 28594,
    "timestamp" : ISODate("2017-06-10T09:59:35Z"),
    "tags" : [
        "scam",
        "ico"
    ],
    "value" : 0
}

Now we want to get the transactions from June 2017, remove the _id field, and produce a separate document for each tag that we have identified. So, for the document shown previously, we would output two documents in our new collection, scam_ico_documents, for separate processing.

The way to do this via the aggregation framework is shown here:

    # Requires "import datetime" at module level
    def scam_or_ico_aggregation(self):
        pipeline = [
            # Half-open range: June 1st (inclusive) up to July 1st (exclusive)
            {"$match": {"timestamp": {"$gte": datetime.datetime(2017, 6, 1),
                                      "$lt": datetime.datetime(2017, 7, 1)}}},
            {"$project": {
                "to": 1,
                "txhash": 1,
                "from": 1,
                "block": 1,
                "txfee": 1,
                "tags": 1,
                "value": 1,
                "report_period": "June 2017",
                "_id": 0,
            }},
            {"$unwind": "$tags"},
            {"$out": "scam_ico_documents"},
        ]
        result = self.collection.aggregate(pipeline)
        for res in result:
            print(res)

Here we have four distinct steps in our aggregation framework pipeline:

  1. Using $match, we only extract documents whose timestamp field falls within June 2017, that is, between June 1st, 2017 and July 1st, 2017.
  2. Using $project, we add a new report_period field with a value of June 2017 and remove the _id field by setting its value to 0. We keep the rest of the fields intact by using the value 1, as shown.
  3. Using $unwind, we output one new document per tag in our tags array.
  4. Finally, using $out, we output all of our documents to a new scam_ico_documents collection.
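The $unwind step is the least intuitive of the four, so here is a small in-memory sketch of its default behavior in plain Python. The unwind helper and the shortened sample document are hypothetical; the sketch mirrors only the basic case, in which documents without the array, or with an empty array, produce no output.

```python
import copy


def unwind(docs, field):
    """Yield one shallow copy of each document per element of doc[field]."""
    for doc in docs:
        value = doc.get(field)
        if value is None:
            continue  # documents without the field produce no output
        # A non-array value is treated as a single-element array.
        elements = value if isinstance(value, list) else [value]
        for element in elements:
            out = copy.copy(doc)
            out[field] = element
            yield out


# Hypothetical, shortened document shaped like the tagged transaction above.
tagged = {"from": "0xsender", "to": "0xreceiver", "value": 0,
          "tags": ["scam", "ico"]}

# Stand-in for the $project stage: add report_period (there is no _id to drop).
projected = dict(tagged, report_period="June 2017")

unwound = list(unwind([projected], "tags"))
for doc in unwound:
    print(doc)
```

The two printed documents are identical except for the tags field, which holds the string scam in the first and ico in the second.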

Since we used the $out operator we will get no results in the command line. If we comment out {"$out": "scam_ico_documents"}, we get result documents that look like this:

{u'from': u'miningpoolhub_1', u'tags': u'scam', u'report_period': u'June 2017', u'value': 0.52415349, u'to': u'0xdaf112bcbd38d231b1be4ae92a72a41aa2bb231d', u'txhash': u'0xe11ea11df4190bf06cbdaf19ae88a707766b007b3d9f35270cde37ceccba9a5c', u'txfee': 21.0, u'block': 3923785}

The final result in our database will look like this:

{
"_id" : ObjectId("5955533be9ec57bdb074074e"),
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"block" : 3923794,
"txfee" : 28594,
"tags" : "scam",
"value" : 0,
"report_period" : "June 2017"
}

Now that we have documents clearly separated in the scam_ico_documents collection, we can perform further analysis pretty easily. An example of this analysis would be to append more information on some of these scammers. Luckily, our data scientists have come up with some additional information, which we have extracted into a new collection scam_details, looking like this:

{
    "_id" : ObjectId("5955510e14ae9238fe76d7f0"),
    "scam_address" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
    "email_address" : "[email protected]"
}

We can now create a new aggregation pipeline job to join our scam_ico_documents with the scam_details collection and output these extended results in a new collection, scam_ico_documents_extended, like this:

    # Requires "from pymongo import MongoClient" at module level
    def scam_add_information(self):
        client = MongoClient()
        db = client.mongo_book
        scam_collection = db.scam_ico_documents
        pipeline = [
            {"$lookup": {"from": "scam_details",
                         "localField": "from",
                         "foreignField": "scam_address",
                         "as": "scam_details_data"}},
            {"$match": {"scam_details_data": {"$ne": []}}},
            {"$out": "scam_ico_documents_extended"},
        ]
        scam_collection.aggregate(pipeline)

Here we are using a three-step aggregation pipeline:

  1. Use the $lookup stage to join each document in our local collection (scam_ico_documents) with the documents in the scam_details collection whose scam_address field is equal to the local from field.
    The pipeline stores the matching scam_details documents in a new array field named scam_details_data; this array is empty when nothing matches.
  2. Next, we keep only the documents whose scam_details_data array is not empty, that is, the ones that found a match in the $lookup step.
  3. Finally, we output these documents in a new collection called scam_ico_documents_extended.
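The join semantics described in these steps can be sketched in plain Python as a left outer join followed by a filter. The lookup helper, the shortened addresses, and the note field are hypothetical and exist only for illustration; the real work is done by the $lookup and $match stages on the server.

```python
def lookup(local_docs, foreign_docs, local_field, foreign_field, as_field):
    """Attach to each local document the list of matching foreign documents."""
    joined = []
    for doc in local_docs:
        matches = [other for other in foreign_docs
                   if other.get(foreign_field) == doc.get(local_field)]
        out = dict(doc)
        out[as_field] = matches  # an empty list when nothing matches
        joined.append(out)
    return joined


# Hypothetical, shortened stand-ins for the two collections.
scam_ico_documents = [
    {"from": "0xscammer", "tags": "scam"},
    {"from": "0xunknown", "tags": "ico"},
]
scam_details = [{"scam_address": "0xscammer", "note": "hypothetical detail"}]

joined = lookup(scam_ico_documents, scam_details,
                "from", "scam_address", "scam_details_data")

# Equivalent of {"$match": {"scam_details_data": {"$ne": []}}}
extended = [doc for doc in joined if doc["scam_details_data"] != []]
print(extended)
```

Every local document survives the join itself; it is the filter on the empty array that turns the left outer join into what is effectively an inner join.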

These documents now look like this:

> db.scam_ico_documents_extended.findOne()
{
    "_id" : ObjectId("5955533be9ec57bdb074074e"),
    "to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
    "txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
    "from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
    "block" : 3923794,
    "txfee" : 28594,
    "tags" : "scam",
    "value" : 0,
    "report_period" : "June 2017",
    "scam_details_data" : [
        {
            "_id" : ObjectId("5955510e14ae9238fe76d7f0"),
            "scam_address" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
            "email_address" : "[email protected]"
        }
    ]
}

Using the aggregation framework, we have identified our data and can process it rapidly and efficiently.

The previous examples can be summed up in the following diagram:
