PythonにPySparkがあることを最近知ったため、
勉強として試してみました。
PySparkとは
OSSの並行分散処理ができるフレームワークです。
処理が高速のため大規模データ処理に適しています。
SparkはJava、Scala、Pythonで使うことができ、
PySparkはSparkを実行するためのPython APIのことです。
Sparkには2つのプログラミングモデルがあります。
RDD -- 構造化でないデータに対して柔軟な処理ができます。
Dataframe -- SQLのように処理を書くことができ、テーブルのような構造化データの処理はこちらのほうが便利です。
処理速度もRDDより速いです。
今回はインタラクティブシェルでRDD、jupyterでDataframeを使いました。
やったこと
環境構築
インタラクティブシェルを使った操作
Jupyter notebookを使った操作
参考
自分の環境
Sparkのインストール
Sparkの動作にJREが必要なのでJREもインストールしておきます。
$ brew cask install java
$ brew install apache-spark
インストール後に下記コマンドを実行すると、
インタラクティブシェルで対話的に操作できるようになります。
$ pyspark
>>>
インタラクティブシェルでの操作は、こちらを参考にしました。
SparkContextのインスタンスがsc
として既に用意されているようなので、
scを使えばすぐにSparkでの操作ができます。
# ファイルからRDDインスタンスを生成
>>> textfile = sc.textFile("README.md")
# Sparkの文字を含む行を抽出
>>> filtered_rdd = textfile.filter(lambda line: u'Spark' in line)
# スペースで区切る
>>> words = textfile.flatMap(lambda line: line.split())
# タプルにする(出現頻度のカウント用)
>>> words_tuple = words.map(lambda word: (word, 1))
# 単語の出現頻度をカウント
>>> words_count = words_tuple.reduceByKey(lambda a, b: a + b)
>>> words_count.collect()[:10]
# 出現頻度を降順ソート
>>> words_count_sorted = words_count.sortBy(lambda t: t[1], False)
>>> words_count_sorted.collect()[:10]
Jupyterで起動
pysparkコマンドを実行すると、Jupyterを起動するよう設定を変更します。
��jupyterのインストールはpipかanacondaを使って済ましておきます。)
環境設定
Sparkの設定ファイルを作成します。
デフォルトだとtemplateしかないので、複製して新しく作成します。
ディレクトリパスやバージョンは、適宜自分の環境に読み替えてください。
$ cd /usr/local/Cellar/apache-spark/2.2.1/libexec/
$ cp conf/spark-env.sh.template conf/spark-env.sh
$ vi conf/spark-env.sh
jupyterでSparkでの起動ができるよう、spark-env.shの最下部に下記を追加します。
spark-env.sh
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
pysparkコマンドを実行すると、jupyterが起動するようになります。
$ pyspark
データの取得
操作するためのjsonファイルのデータを取得します。
米アマゾンの商品口コミデータが取ってこれるようなので、これを使わせて頂きました。
ターミナルで下記コマンドを実行して圧縮ファイルを取得します。
wget -O /tmp/reviews_Grocery_and_Gourmet_Food_5.json.gz http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Grocery_and_Gourmet_Food_5.json.gz
解凍してできるjsonファイルを任意の場所に置きます。
jupyterでパスを指定せずに読み取れるよう、jupyterを起動するディレクトリに置くと便利です。
Pandasでデータ出力
jsonファイルを読み込んで指定したカラムで出力します。
read_json()のパラメータlines
を指定しないと1行ずつ読み込まれずエラーになります。
import pandas as pd
data = pd.read_json('reviews_Grocery_and_Gourmet_Food_5.json', lines=True)
str_columns = ['asin','reviewText','reviewTime', 'reviewerID','reviewerName','summary']
df = pd.DataFrame.from_dict(data)[str_columns]
df
jsonをDataframeに変換
sqlContext.read.json()
の引数にjsonのファイル名を指定することで、SparkのDataframeを作成することができます。printSchema()
で作成したスキーマを表示することができます。show()
に数値を指定することで、作成した3行のデータを出力することができます。
df_json = sqlContext.read.json('reviews_Grocery_and_Gourmet_Food_5.json')
df_json.printSchema()
df_json.show(3)
絞り込み
filter()
を使って条件に一致したデータのみの取得をすることができます。
df_json = sqlContext.read.json('reviews_Grocery_and_Gourmet_Food_5.json')
df_json.filter(df_json.overall > 4.0).show(10)
抽出
select()
を使って指定した列のみを抽出することができます。
df_json = sqlContext.read.json('reviews_Grocery_and_Gourmet_Food_5.json')
df_json.select('asin', 'reviewText', 'reviewTime').show(10)
レコード数で集計
groupBy()
、count()
、sort()
を使うことができます。
下記は件数が多いasinの上位10件を取得しています。
df_json = sqlContext.read.json('reviews_Grocery_and_Gourmet_Food_5.json')
df_json.groupBy('asin').count().sort('count', ascending=False).show(10)
やってみた感想
SQLのように書けますし、Javaでラムダ式をよく使っていたこともあり、
APIの使い方が似ていて非常に理解しやすかったです。
PySparkの速さについては検証できていませんが、ここで使ったjsonファイルを利用して、
時間があるときに処理速度の比較をしてみようと思います。
Sparkとは関係なく初歩的なことなのですが、Pythonでlambda関数が使えるのを初めて知りました。
Javaで使っていたので仕組みはわかりますが、
書き方がJavaとだいぶ異なっていたので少し戸惑いました。
Pythonの基礎的な部分でまだまだ知らないことがあると自覚できたので、
オンラインサイト等で学習を進めておこうと思います。