jupyterでPySparkを使ってみました

PythonにPySparkがあることを最近知ったため、
勉強として試してみました。






PySparkとは



OSSの並行分散処理ができるフレームワークです。

処理が高速のため大規模データ処理に適しています。

SparkはJavaScalaPythonで使うことができ、
PySparkはSparkを実行するためのPython APIのことです。





Sparkには2つのプログラミングモデルがあります。




  • RDD -- 構造化でないデータに対して柔軟な処理ができます。


  • Dataframe -- SQLのように処理を書くことができ、テーブルのような構造化データの処理はこちらのほうが便利です。
    処理速度もRDDより速いです。







今回はインタラクティブシェルでRDD、jupyterでDataframeを使いました。






やったこと









参考



spark.apache.org



qiita.com






自分の環境









Sparkのインストール



Sparkの動作にJREが必要なのでJREもインストールしておきます。




$ brew cask install java

$ brew install apache-spark





インストール後に下記コマンドを実行すると、
インタラクティブシェルで対話的に操作できるようになります。




$ pyspark

>>>





インタラクティブシェルでの操作は、こちらを参考にしました。



blog.amedama.jp





SparkContextのインスタンスscとして既に用意されているようなので、

scを使えばすぐにSparkでの操作ができます。




# ファイルからRDDインスタンスを生成
>>> textfile = sc.textFile("README.md")

# Sparkの文字を含む行を抽出
>>> filtered_rdd = textfile.filter(lambda line: u'Spark' in line)

# スペースで区切る
>>> words = textfile.flatMap(lambda line: line.split())

# タプルにする(出現頻度のカウント用)
>>> words_tuple = words.map(lambda word: (word, 1))

# 単語の出現頻度をカウント
>>> words_count = words_tuple.reduceByKey(lambda a, b: a + b)
>>> words_count.collect()[:10]

# 出現頻度を降順ソート
>>> words_count_sorted = words_count.sortBy(lambda t: t[1], False)
>>> words_count_sorted.collect()[:10]






Jupyterで起動



pysparkコマンドを実行すると、Jupyterを起動するよう設定を変更します。

��jupyterのインストールはpipかanacondaを使って済ましておきます。)






環境設定



Sparkの設定ファイルを作成します。

デフォルトだとtemplateしかないので、複製して新しく作成します。

ディレクトリパスやバージョンは、適宜自分の環境に読み替えてください。




$ cd /usr/local/Cellar/apache-spark/2.2.1/libexec/

$ cp conf/spark-env.sh.template conf/spark-env.sh

$ vi conf/spark-env.sh





jupyterでSparkでの起動ができるよう、spark-env.shの最下部に下記を追加します。

spark-env.sh




export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'





pysparkコマンドを実行すると、jupyterが起動するようになります。




$ pyspark






データの取得



操作するためのjsonファイルのデータを取得します。

米アマゾンの商品口コミデータが取ってこれるようなので、これを使わせて頂きました。

ターミナルで下記コマンドを実行して圧縮ファイルを取得します。




wget -O /tmp/reviews_Grocery_and_Gourmet_Food_5.json.gz http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Grocery_and_Gourmet_Food_5.json.gz





解凍してできるjsonファイルを任意の場所に置きます。

jupyterでパスを指定せずに読み取れるよう、jupyterを起動するディレクトリに置くと便利です。






Pandasでデータ出力



jsonファイルを読み込んで指定したカラムで出力します。

read_json()のパラメータlinesを指定しないと1行ずつ読み込まれずエラーになります。




import pandas as pd

data = pd.read_json('reviews_Grocery_and_Gourmet_Food_5.json', lines=True)
str_columns = ['asin','reviewText','reviewTime', 'reviewerID','reviewerName','summary']
df = pd.DataFrame.from_dict(data)[str_columns]
df





f:id:mtomitomi:20180127174251p:plain






jsonをDataframeに変換



sqlContext.read.json()の引数にjsonのファイル名を指定することで、SparkのDataframeを作成することができます。

printSchema()で作成したスキーマを表示することができます。

show()に数値を指定することで、作成した3行のデータを出力することができます。




df_json = sqlContext.read.json('reviews_Grocery_and_Gourmet_Food_5.json')
df_json.printSchema()
df_json.show(3)





f:id:mtomitomi:20180127174100p:plain






絞り込み



filter()を使って条件に一致したデータのみの取得をすることができます。




df_json = sqlContext.read.json('reviews_Grocery_and_Gourmet_Food_5.json')
df_json.filter(df_json.overall > 4.0).show(10)





f:id:mtomitomi:20180127185833p:plain






抽出



select()を使って指定した列のみを抽出することができます。




df_json = sqlContext.read.json('reviews_Grocery_and_Gourmet_Food_5.json')
df_json.select('asin', 'reviewText', 'reviewTime').show(10)





f:id:mtomitomi:20180127190137p:plain






レコード数で集計



groupBy()count()sort()を使うことができます。

下記は件数が多いasinの上位10件を取得しています。




df_json = sqlContext.read.json('reviews_Grocery_and_Gourmet_Food_5.json')
df_json.groupBy('asin').count().sort('count', ascending=False).show(10)





f:id:mtomitomi:20180127194221p:plain






やってみた感想



SQLのように書けますし、Javaラムダ式をよく使っていたこともあり、
APIの使い方が似ていて非常に理解しやすかったです。
PySparkの速さについては検証できていませんが、ここで使ったjsonファイルを利用して、
時間があるときに処理速度の比較をしてみようと思います。



Sparkとは関係なく初歩的なことなのですが、Pythonでlambda関数が使えるのを初めて知りました。
Javaで使っていたので仕組みはわかりますが、
書き方がJavaとだいぶ異なっていたので少し戸惑いました。

Pythonの基礎的な部分でまだまだ知らないことがあると自覚できたので、
オンラインサイト等で学習を進めておこうと思います。