Analyzing flight data using GraphX

Let's analyze flight data by representing airports as vertices and routes as edges. We will run some basic graph analytics to find the airports with the most arriving and departing routes, and then use the Pregel API to find the cheapest fares. Download the flight data from the following location:

http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

The steps to analyze the data are as follows:

  1. Select the OriginAirportID, Origin, DestAirportID, Dest, and Distance fields, and then click Download. Copy the ZIP file onto the cluster, unzip it, and then copy the contents to HDFS:
    unzip 355968671_T_ONTIME.zip
    
    hadoop fs -put 355968671_T_ONTIME.csv
    
  2. Get into the Scala shell using the spark-shell command and then import all dependencies, as follows:
    scala> import org.apache.spark.graphx._
    scala> import org.apache.spark.rdd.RDD
    
  3. Define a Scala case class for the flight schema corresponding to the CSV data file:
    scala> case class Flight(org_id:Long, origin:String, dest_id:Long, dest:String, dist:Float)
    
  4. Let's define a function to parse CSV data into the Flight class:
    scala> def parseFlightCsv(str: String): Flight = {
      val line = str.split(",")
      Flight(line(0).toLong, line(1), line(2).toLong, line(3), line(4).toFloat)
    }
    
  5. Let's create csvRDD by reading the input file and then remove the header:
    scala> val csvRDD = sc.textFile("355968671_T_ONTIME.csv")
    scala> val noHdrRDD = csvRDD.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
    
    scala> val flightsRDD = noHdrRDD.map(parseFlightCsv)
    
  6. Let's define the airports as vertices (airport ID and airport code), along with a default vertex property called nowhere. Also build a map from airport ID to three-letter code for lookups later:
    scala> val airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct
    
    scala> airports.take(3)
    res26: Array[(Long, String)] = Array((14122,"PIT"), (10141,"ABR"), (13158,"MAF"))
    
    scala> val default = "nowhere"
    
    scala> val airportMap = airports.map { case ((org_id), name) => (org_id -> name) }.collect.toList.toMap
    
  7. The edges are the routes between airports. An edge must have a source, a destination, and a property, which is the distance in this case:
    scala> val flightRoutes = flightsRDD.map(flight => ((flight.org_id, flight.dest_id), flight.dist)).distinct
    
    scala> val edges = flightRoutes.map {
    case ((org_id, dest_id), distance) =>Edge(org_id.toLong, dest_id.toLong, distance) }
    
    scala> edges.take(1)
    
    // Array(Edge(10299,10926,160))
    
  8. Create a graph from the vertex RDD, the edge RDD, and the default vertex property:
    scala> val graph = Graph(airports, edges, default)
    
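  9. Let's do some basic analytics using this graph. Because the edges are distinct routes, the in-degree of a vertex is the number of routes arriving at that airport. Find which airports have the most incoming routes: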
    scala> val maxIncoming = graph.inDegrees.collect.sortWith(_._2 > _._2).map(x => (airportMap(x._1), x._2)).take(3)
    
    scala> maxIncoming.foreach(println)
    (ATL,152)
    (ORD,145)
    (DFW,143)
    
  10. Now, let's find out which airports have the most outgoing routes:
    scala> val maxout= graph.outDegrees.join(airports).sortBy(_._2._1, ascending=false).take(3)
    
    scala> maxout.foreach(println)
    (10397,(153,ATL))
    (13930,(146,ORD))
    (11298,(143,DFW))
    
  11. Find the three longest routes. The edge attribute is the distance, so sorting the triplets by it in descending order lists the longest routes first:
    scala> graph.triplets.sortBy(_.attr, ascending=false).map(triplet => "Distance " + triplet.attr.toInt + " from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(3).foreach(println)
    
    Distance 4983 from "JFK" to "HNL".
    Distance 4983 from "HNL" to "JFK".
    Distance 4962 from "EWR" to "HNL".
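
The degree calculations in steps 9 and 10 can be checked by hand on a small sample. The following is a minimal sketch using plain Scala collections (no Spark required). The Route class, the stripQuotes and parseRoute helpers, and the sample rows are all made up for illustration and are not part of the dataset. It also shows one possible way to strip the double quotes that appear around the airport codes in the output above.

```scala
// Hypothetical record with the same column order as the Flight case class.
case class Route(orgId: Long, origin: String, destId: Long, dest: String, dist: Float)

// Remove a leading and a trailing double quote, if present.
def stripQuotes(s: String): String = s.replaceAll("^\"|\"$", "")

def parseRoute(str: String): Route = {
  val f = str.split(",")
  Route(f(0).toLong, stripQuotes(f(1)), f(2).toLong, stripQuotes(f(3)), f(4).toFloat)
}

// Made-up sample rows; the second row repeats the first route.
val sample = Seq(
  "1,\"AAA\",2,\"BBB\",100.00",
  "1,\"AAA\",2,\"BBB\",100.00",
  "1,\"AAA\",3,\"CCC\",250.00",
  "2,\"BBB\",3,\"CCC\",175.00",
  "3,\"CCC\",1,\"AAA\",250.00"
)

// Distinct routes play the role of the graph's edges.
val routes = sample.map(parseRoute).distinct

// In-degree: number of distinct routes arriving at each airport.
val inDegrees: Map[String, Int] =
  routes.groupBy(_.dest).map { case (code, rs) => code -> rs.size }

// Out-degree: number of distinct routes leaving each airport.
val outDegrees: Map[String, Int] =
  routes.groupBy(_.origin).map { case (code, rs) => code -> rs.size }

println(inDegrees)
println(outDegrees)
```

Because the duplicate row is removed by distinct, the degrees count routes rather than individual flights, which is also how the graph in the steps above is built.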
    

Pregel API

Many important graph algorithms are iterative, since the properties of a vertex depend on the properties of its neighbors, which in turn depend on the properties of their own neighbors. Pregel is an iterative graph processing model, developed at Google, which uses a sequence of iterations of message passing between vertices in a graph. GraphX implements a Pregel-like bulk-synchronous message-passing API.

With the Pregel implementation in GraphX, vertices can only send messages to neighboring vertices.

The Pregel operator is executed in a series of supersteps. In each superstep, the vertices:

  • Receive their inbound messages from the previous superstep, combined by the merge function (a sum, a minimum, and so on)
  • Compute a new value for the vertex property
  • Send messages to their neighboring vertices, to be received in the next superstep
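
The superstep loop described above can be sketched in plain Scala on a toy graph. This is an illustrative, single-machine sketch of the message-passing idea, not GraphX's actual implementation: it skips the initial message and the tracking of active vertices, and the names edges, vprog, sendMsg, and mergeMsg are chosen here only to mirror the roles of the three Pregel functions. The vertex IDs and costs are made up.

```scala
// Toy directed graph as (source, destination, cost). Values are made up.
val edges = Seq((1L, 2L, 4.0), (1L, 3L, 1.0), (3L, 2L, 2.0), (2L, 4L, 5.0))
val sourceId = 1L

// Vertex program: keep the smaller of the current value and the merged message.
def vprog(dist: Double, msg: Double): Double = math.min(dist, msg)

// Merge function: combine two messages bound for the same vertex.
def mergeMsg(a: Double, b: Double): Double = math.min(a, b)

// Initial state: 0 at the source, infinity everywhere else.
val vertexIds = edges.flatMap(e => Seq(e._1, e._2)).distinct
var dist: Map[Long, Double] =
  vertexIds.map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity)).toMap

// Send function: emit a message along an edge only if it improves the
// destination, then merge the messages addressed to the same vertex.
def sendMsg(d: Map[Long, Double]): Map[Long, Double] =
  edges
    .collect { case (src, dst, cost) if d(src) + cost < d(dst) => dst -> (d(src) + cost) }
    .groupBy(_._1)
    .map { case (dst, msgs) => dst -> msgs.map(_._2).reduce(mergeMsg) }

var messages = sendMsg(dist)
var supersteps = 0

// Iterate until no messages remain.
while (messages.nonEmpty) {
  dist = dist.map { case (id, d) => id -> messages.get(id).map(m => vprog(d, m)).getOrElse(d) }
  messages = sendMsg(dist)
  supersteps += 1
}

println(dist)
println(supersteps)
```

On this toy graph, vertex 2 is first reached directly from vertex 1 at cost 4.0 and later improved to 3.0 through vertex 3, which shows why several supersteps can be needed before the values settle.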

When there are no more messages remaining, the Pregel operator will end the iteration and the final graph is returned. The following code computes the cheapest airfare using the Pregel API:

scala> val sourceId: VertexId = 13024 //  starting vertex

The following creates a graph whose edge attribute is an estimated airfare, calculated as 50 + distance / 20:

scala> val gg = graph.mapEdges(e => 50.toDouble + e.attr.toDouble/20  )
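
As a quick check of the formula, here is a small sketch in plain Scala. The fare helper name is an assumption made for illustration, and the distances are assumed to be in miles, as reported in the BTS data.

```scala
// Hypothetical helper mirroring the edge mapping: a fixed 50 plus distance / 20.
def fare(distance: Double): Double = 50.0 + distance / 20.0

val shortHop = fare(100.0)   // 50 + 5  = 55.0
val longHaul = fare(1000.0)  // 50 + 50 = 100.0

println(shortHop)
println(longHaul)
```

The fixed 50 is charged once per edge, so under this formula a multi-leg path pays that base amount for every leg it includes.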

Initialize the graph so that every vertex except the source starts with a cost of infinity:

scala> val initialGraph = gg.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

Now, call pregel on the graph:

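scala> val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program: keep the cheaper of the current cost and the incoming cost
  (id, dist, newDist) => math.min(dist, newDist),
  // Send message: offer the destination a cheaper cost if this edge provides one
  triplet => {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  // Merge message: keep the cheapest of the costs sent to the same vertex
  (a, b) => math.min(a, b)
)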

Now, print a sample of routes with their fares. Note that take(4) returns four arbitrary edges, not the four cheapest:

scala> println(sssp.edges.take(4).mkString("\n"))
Edge(10135,10397,84.6)
Edge(10135,13930,82.7)
Edge(10140,10397,113.45)
Edge(10140,10821,133.5)

Find the ten cheapest routes, shown with their airport codes:

scala> sssp.edges.map{ case ( Edge(org_id, dest_id,price))=> ( (airportMap(org_id), airportMap(dest_id), price)) }.takeOrdered(10)(Ordering.by(_._3))

Array((WRG,PSG,51.55), (PSG,WRG,51.55), (CEC,ACV,52.8), (ACV,CEC,52.8), (ORD,MKE,53.35), (IMT,RHI,53.35), (MKE,ORD,53.35), (RHI,IMT,53.35), (STT,SJU,53.4), (SJU,STT,53.4))

Print a sample of airports with the cheapest total fare from the source airport to each of them:

scala> println(sssp.vertices.take(4).mkString("\n"))

(10208,277.79)
(10268,260.7)
(14828,261.65)
(14698,125.25)

List the airport codes sorted by the cheapest total fare from the source airport, lowest first:

scala> sssp.vertices.collect.map(x => (airportMap(x._1), x._2)).sortWith(_._2 < _._2)
res21: Array[(String, Double)] = Array((PDX,62.05), (SFO,65.75), (EUG,117.35), ...
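
One caveat: airportMap is built from origin airports only, so if a vertex appears only as a destination, airportMap(id) throws a NoSuchElementException. The following is a hedged sketch of a safer lookup in plain Scala. It uses the three sample entries shown in step 6, while the airportName helper, the ID 99999, and the costs are made up for illustration.

```scala
val default = "nowhere"

// The three entries printed by airports.take(3) in step 6.
val airportMap: Map[Long, String] =
  Map(14122L -> "PIT", 10141L -> "ABR", 13158L -> "MAF")

// Hypothetical helper: fall back to the default name instead of throwing.
def airportName(id: Long): String = airportMap.getOrElse(id, default)

// Made-up (vertex ID, cost) pairs; 99999 has no entry in airportMap.
val costs = Seq((99999L, 260.7), (14122L, 125.25))

val named = costs.map { case (id, cost) => (airportName(id), cost) }.sortBy(_._2)

println(named)
```

The same getOrElse call could replace airportMap(x._1) in the sorting expression above if unknown IDs are a concern.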