Spark graphxで空港路線情報を解析する

個人開発したアプリの宣伝
目的地が設定できる手帳のような使い心地のTODOアプリを公開しています。
Todo with Location

Todo with Location

  • Yoshiko Ichikawa
  • Productivity
  • Free

スポンサードリンク

データのダウンロード

書籍通り、米国運輸省のサイトからダウンロードします。

https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

On-Time : Reporting Carrier On-Time Performance (1987-present)

  • Filter Geography : All
  • Filter Year : 2017
  • Filter Period : December

項目が若干更新されているので、以下の17項目をチェックする。

  • Time Period
    • DayofMonth
    • DayOfWeek
  • Airline
    • Reporting_Airline
    • Tail_Number
    • Flight_Number_Reporting_Airline
  • Origin
    • OriginAirportID
    • Origin
  • Destination
    • DestAirportID
    • Dest
  • Departure Performance
    • CRSDepTime
    • DepTime
    • DepDelayMinutes
  • Arrival Performance
    • CRSArrTime
    • ArrTime
    • ArrDelayMinutes
  • Flight Summaries
    • CRSElapsedTime
    • Distance

とし、画面右上のDownloadボタンよりダウンロードが行える。


データの前処理

ダウンロードしたデータを解凍後、Sparkで処理し易い用、前処理を施します。

ファイル名を変更

$ mv 469381788_T_ONTIME_REPORTING.csv flight.csv

ヘッダーを削除

$ sed -i '1d' flight.csv 

"" ダブルクォートを削除

$ sed -i "s/\"//g" flight.csv

行末の, カンマを削除

$ sed -i 's/,$//g' flight.csv 

欠損値のある行、,,が連続する

$ sed -i "/,,/d" flight.csv 

整数値につく浮動少数を削除

$ sed -i "s/\.00//g" flight.csv 
$ sed -i "s/\.0//g" flight.csv 


分析アプリケーションの作成

空港データのRDDとグラフデータの作成

main-core.scala

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.IntParam
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

case class F(
  dofM:String, dofW:String, carrier:String, tailnum:String, flnum:Int,
  src_id:Long, origin:String, dest_id:Long, dest:String, crsdeptime:Double,
  deptime:Double, depdelaymins:Double, crsarrtime:Double, arrtime:Double,
  arrdelay:Double, crselapsedtime:Double, dist:Int
)

def parseF(str: String): F = {
  val L = str.split(",")
  F(
    L(0), L(1), L(2), L(3), L(4).toInt, L(5).toLong, L(6), L(7).toLong,
    L(8), L(9).toDouble, L(10).toDouble, L(11).toDouble, L(12).toDouble,
    L(13).toDouble, L(14).toDouble, L(15).toDouble, L(16).toInt
  )
}

val textRDD = sc.textFile("/spark/work/flight.csv")
// ノードする空港情報
val airports = textRDD.map(parseF).cache().map(F => (F.src_id, F.origin)).distinct
// edgeとする路線情報
val R = textRDD.map(parseF).cache().map(F => ((F.src_id, F.dest_id), F.dist)).distinct
val nowhere = "nowhere"
R.cache

// 空港間を繋ぐ辺を作成。edgeには距離を情報として含める
val edges = R.map { case ((src_id, dest_id), distance) => Edge(src_id.toLong, dest_id.toLong, distance)}
// graphオブジェクトの作成
val graph = Graph(airports, edges, nowhere)


graphオブジェクトを利用した各分析データの出力

analytics.scala

println("空港の数を表示")
graph.numVertices

println("ルートの数を表示")
graph.numEdges

println("最長ルートの上位5件を表示(出発空港コード, 到着空港コード, 距離)")
val N = 5
// tripletsはエッジを始点および終点プロパティを含むよう拡張する
// http://mogile.web.fc2.com/spark/graphx-programming-guide.html
graph.triplets.sortBy(_.attr, ascending=false).map(triplet =>
  triplet.srcAttr + "から" +
  triplet.dstAttr + "までの距離は" +
  triplet.attr.toString + "マイル" ).take(N).foreach(println)


println("4500マイルを超える距離のルートを表示")
println("出発空港ID, 到着空港ID, マイル数")
val M = 4500
val N = 100
graph.edges.filter{
  case ( Edge(src_id, dest_id, distance)) => distance > M
}.take(N).foreach(println)

println("出発便数が多い空港上位5箇所を表示")
println("空港ID, 出発便数, 空港コード")
val N=5
// outDegrees 外へ向かうエッジの数 joinはspark dfの結合
val maxout = graph.outDegrees.join(airports).sortBy(_._2._1, ascending=false)
maxout.take(N).foreach(println)

println("他の空港との接続が最も多い空港を5件表示")
val N=5
// pageRankはノードがリンクしているエッジに応じてランク付けする。みたい。
// https://qiita.com/AKB428/items/bf1cd05d6cf3e23986d9
val impAirports = graph.pageRank(0.1).vertices.join(airports).sortBy(_._2._1, false).map(_._2._2)
impAirports.take(N).foreach(println)


プログラムの実行

$ /spark/bin/spark-shell -i ./main-core.scala

...
...
...

scala > :load analytics.scala
Loading analytics.scala...
空港の数を表示
res1: Long = 294                                                                
ルートの数を表示
res3: Long = 4153
最長ルートの上位5件を表示(出発空港コード, 到着空港コード, 距離)
N: Int = 5
JFKからHNLまでの距離は4983マイル
HNLからJFKまでの距離は4983マイル
EWRからHNLまでの距離は4962マイル
HNLからEWRまでの距離は4962マイル
HNLからIADまでの距離は4817マイル
4500マイルを超える距離のルートを表示
出発空港ID, 到着空港ID, マイル数
M: Int = 4500
N: Int = 100
Edge(10397,12173,4502)
Edge(11618,12173,4962)
Edge(12173,11618,4962)
Edge(12478,12173,4983)
Edge(12173,10397,4502)
Edge(12173,12264,4817)
Edge(12173,12478,4983)
Edge(12264,12173,4817)
出発便数が多い空港上位5箇所を表示
空港ID, 出発便数, 空港コード
N: Int = 5
maxout: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (Int, String))] = MapPartitionsRDD[51] at sortBy at analytics.scala:35
(10397,(151,ATL))
(13930,(148,ORD))
(11292,(124,DEN))
(13487,(112,MSP))
(11298,(106,DFW))
他の空港との接続が最も多い空港を5件表示
N: Int = 5
impAirports: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[207] at map at analytics.scala:35
ATL
ORD
MSP
DEN
IAH