書籍で紹介があったSpark Streamingの動作を試してみます。
HDFSは構築するのが億劫なので、masterノード上にある/tmp/dir01
をウォッチします。ファイルの作成を検知した時、" "
で区切られた単語を集計する簡単なプログラム。
streaming.py
# -*- coding:utf-8 -*- from pyspark.context import SparkContext from pyspark import StorageLevel from pyspark.streaming import StreamingContext sc = SparkContext("local", "myAppName") ssc = StreamingContext(sc, 10) L = ssc.textFileStream("/tmp/dir01") W = L.flatMap(lambda line: line.split(" ")).filter(lambda x:x) WC = W.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y) WC.pprint() ssc.start() ssc.awaitTermination()
プログラムの実行
$ spark/bin/spark-submit --master local streaming.py
ファイルが、StreamingContext(sc, 10) #10秒間隔
内に作成されなかった場合は確認した時間のみ表示されていく。
------------------------------------------- Time: 2020-08-13 05:43:30 ------------------------------------------- ------------------------------------------- Time: 2020-08-13 05:43:40 -------------------------------------------
ウォッチディレクトリにファイルを作成
Spark実行コンソールとは別のコンソールから、ファイルを作成
$ echo "Yui Ichikawa - City: Kochi - Country: Japan" > /tmp/dir01/file04
------------------------------------------- Time: 2020-08-13 05:48:20 ------------------------------------------- (u'Yui', 1) (u'Ichikawa', 1) (u'-', 2) (u'City:', 1) (u'Kochi', 1) (u'Country:', 1) (u'Japan', 1)
のように単語ごとの出現数が出力される。
別ファイルを作成
$ echo "Yui Ichikawa Rin Ichikawa - City: Kochi - Country: Japan" > /tmp/dir01/file05
------------------------------------------- Time: 2020-08-13 05:49:00 ------------------------------------------- (u'City:', 1) (u'Ichikawa', 2) (u'-', 2) (u'Yui', 1) (u'Rin', 1) (u'Kochi', 1) (u'Country:', 1) (u'Japan', 1)