PythonでSpark Streamingの動作を試す

個人開発したアプリの宣伝
目的地が設定できる手帳のような使い心地のTODOアプリを公開しています。
Todo with Location

Todo with Location

  • Yoshiko Ichikawa
  • Productivity
  • Free

スポンサードリンク

書籍で紹介があったSpark Streamingの動作を試してみます。

HDFSは構築するのが億劫なので、masterノード上にある/tmp/dir01をウォッチします。ファイルの作成を検知した時、" "で区切られた単語を集計する簡単なプログラム。

streaming.py

# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark import StorageLevel
from pyspark.streaming import StreamingContext
sc = SparkContext("local", "myAppName")
ssc = StreamingContext(sc, 10)
L = ssc.textFileStream("/tmp/dir01")
W = L.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
WC = W.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
WC.pprint()
ssc.start()
ssc.awaitTermination()


プログラムの実行

$ spark/bin/spark-submit --master local streaming.py


ファイルが、StreamingContext(sc, 10) #10秒間隔内に作成されなかった場合は確認した時間のみ表示されていく。

-------------------------------------------
Time: 2020-08-13 05:43:30
-------------------------------------------

-------------------------------------------
Time: 2020-08-13 05:43:40
-------------------------------------------


ウォッチディレクトリにファイルを作成

Spark実行コンソールとは別のコンソールから、ファイルを作成

$ echo "Yui Ichikawa - City: Kochi - Country: Japan" > /tmp/dir01/file04
-------------------------------------------
Time: 2020-08-13 05:48:20
-------------------------------------------
(u'Yui', 1)
(u'Ichikawa', 1)
(u'-', 2)
(u'City:', 1)
(u'Kochi', 1)
(u'Country:', 1)
(u'Japan', 1)

のように単語ごとの出現数が出力される。


別ファイルを作成

$  echo "Yui Ichikawa Rin Ichikawa  - City: Kochi - Country: Japan" > /tmp/dir01/file05
-------------------------------------------
Time: 2020-08-13 05:49:00
-------------------------------------------
(u'City:', 1)
(u'Ichikawa', 2)
(u'-', 2)
(u'Yui', 1)
(u'Rin', 1)
(u'Kochi', 1)
(u'Country:', 1)
(u'Japan', 1)