データのダウンロード
書籍通り、米国運輸省のサイトからダウンロードします。
https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time
On-Time : Reporting Carrier On-Time Performance (1987-present)
は
- Filter Geography : All
- Filter Year : 2017
- Filter Period : December
項目が若干更新されているので、以下の17項目をチェックする。
- Time Period
- DayofMonth
- DayOfWeek
- Airline
- Reporting_Airline
- Tail_Number
- Flight_Number_Reporting_Airline
- Origin
- OriginAirportID
- Origin
- Destination
- DestAirportID
- Dest
- Departure Performance
- CRSDepTime
- DepTime
- DepDelayMinutes
- Arrival Performance
- CRSArrTime
- ArrTime
- ArrDelayMinutes
- Flight Summaries
- CRSElapsedTime
- Distance
とし、画面右上のDownloadボタンよりダウンロードが行える。
データの前処理
ダウンロードしたデータを解凍後、Sparkで処理し易い用、前処理を施します。
ファイル名を変更
$ mv 469381788_T_ONTIME_REPORTING.csv flight.csv
ヘッダーを削除
$ sed -i '1d' flight.csv
"" ダブルクォート
を削除
$ sed -i "s/\"//g" flight.csv
行末の, カンマ
を削除
$ sed -i 's/,$//g' flight.csv
欠損値のある行、,,
が連続する
$ sed -i "/,,/d" flight.csv
整数値につく浮動少数を削除
$ sed -i "s/\.00//g" flight.csv $ sed -i "s/\.0//g" flight.csv
分析アプリケーションの作成
空港データのRDDとグラフデータの作成
main-core.scala
import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.util.IntParam import org.apache.spark.graphx._ import org.apache.spark.graphx.util.GraphGenerators case class F( dofM:String, dofW:String, carrier:String, tailnum:String, flnum:Int, src_id:Long, origin:String, dest_id:Long, dest:String, crsdeptime:Double, deptime:Double, depdelaymins:Double, crsarrtime:Double, arrtime:Double, arrdelay:Double, crselapsedtime:Double, dist:Int ) def parseF(str: String): F = { val L = str.split(",") F( L(0), L(1), L(2), L(3), L(4).toInt, L(5).toLong, L(6), L(7).toLong, L(8), L(9).toDouble, L(10).toDouble, L(11).toDouble, L(12).toDouble, L(13).toDouble, L(14).toDouble, L(15).toDouble, L(16).toInt ) } val textRDD = sc.textFile("/spark/work/flight.csv") // ノードする空港情報 val airports = textRDD.map(parseF).cache().map(F => (F.src_id, F.origin)).distinct // edgeとする路線情報 val R = textRDD.map(parseF).cache().map(F => ((F.src_id, F.dest_id), F.dist)).distinct val nowhere = "nowhere" R.cache // 空港間を繋ぐ辺を作成。edgeには距離を情報として含める val edges = R.map { case ((src_id, dest_id), distance) => Edge(src_id.toLong, dest_id.toLong, distance)} // graphオブジェクトの作成 val graph = Graph(airports, edges, nowhere)
graphオブジェクトを利用した各分析データの出力
analytics.scala
println("空港の数を表示") graph.numVertices println("ルートの数を表示") graph.numEdges println("最長ルートの上位5件を表示(出発空港コード, 到着空港コード, 距離)") val N = 5 // tripletsはエッジを始点および終点プロパティを含むよう拡張する // http://mogile.web.fc2.com/spark/graphx-programming-guide.html graph.triplets.sortBy(_.attr, ascending=false).map(triplet => triplet.srcAttr + "から" + triplet.dstAttr + "までの距離は" + triplet.attr.toString + "マイル" ).take(N).foreach(println) println("4500マイルを超える距離のルートを表示") println("出発空港ID, 到着空港ID, マイル数") val M = 4500 val N = 100 graph.edges.filter{ case ( Edge(src_id, dest_id, distance)) => distance > M }.take(N).foreach(println) println("出発便数が多い空港上位5箇所を表示") println("空港ID, 出発便数, 空港コード") val N=5 // outDegrees 外へ向かうエッジの数 joinはspark dfの結合 val maxout = graph.outDegrees.join(airports).sortBy(_._2._1, ascending=false) maxout.take(N).foreach(println) println("他の空港との接続が最も多い空港を5件表示") val N=5 // pageRankはノードがリンクしているエッジに応じてランク付けする。みたい。 // https://qiita.com/AKB428/items/bf1cd05d6cf3e23986d9 val impAirports = graph.pageRank(0.1).vertices.join(airports).sortBy(_._2._1, false).map(_._2._2) impAirports.take(N).foreach(println)
プログラムの実行
$ /spark/bin/spark-shell -i ./main-core.scala ... ... ... scala > :load analytics.scala Loading analytics.scala... 空港の数を表示 res1: Long = 294 ルートの数を表示 res3: Long = 4153 最長ルートの上位5件を表示(出発空港コード, 到着空港コード, 距離) N: Int = 5 JFKからHNLまでの距離は4983マイル HNLからJFKまでの距離は4983マイル EWRからHNLまでの距離は4962マイル HNLからEWRまでの距離は4962マイル HNLからIADまでの距離は4817マイル 4500マイルを超える距離のルートを表示 出発空港ID, 到着空港ID, マイル数 M: Int = 4500 N: Int = 100 Edge(10397,12173,4502) Edge(11618,12173,4962) Edge(12173,11618,4962) Edge(12478,12173,4983) Edge(12173,10397,4502) Edge(12173,12264,4817) Edge(12173,12478,4983) Edge(12264,12173,4817) 出発便数が多い空港上位5箇所を表示 空港ID, 出発便数, 空港コード N: Int = 5 maxout: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (Int, String))] = MapPartitionsRDD[51] at sortBy at analytics.scala:35 (10397,(151,ATL)) (13930,(148,ORD)) (11292,(124,DEN)) (13487,(112,MSP)) (11298,(106,DFW)) 他の空港との接続が最も多い空港を5件表示 N: Int = 5 impAirports: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[207] at map at analytics.scala:35 ATL ORD MSP DEN IAH
リンク