簡単なデータ操作を PySpark & pandas の DataFrame で行う
Spark v1.3.0 で追加された DataFrame
、結構いいらしいという話は聞いていたのだが 自分で試すことなく時間が過ぎてしまっていた。ようやく PySpark
を少し触れたので pandas
との比較をまとめておきたい。内容に誤りや よりよい方法があればご指摘 下さい。
過去に基本的なデータ操作について 以下 ふたつの記事を書いたことがあるので、同じ処理のPySpark
版を加えたい。今回は ひとつめの "簡単なデータ操作〜" に相当する内容。
pandas 版
準備
環境は EC2 に作る。Spark のインストールについてはそのへんに情報あるので省略。サンプルデータは iris を csv でダウンロードしてホームディレクトリにおいた。以降の操作は PySpark
のコンソールで行う。
# Spark のパスに移動 $ echo $SPARK_HOME /usr/local/spark $ cd $SPARK_HOME $ pwd /usr/local/spark # 既定だとログの出力が邪魔なので抑制 $ cp conf/log4j.properties.template conf/log4j.properties $ vi conf/log4j.properties # INFO をすべて WARN に変える $ bin/pyspark # Welcome to # ____ __ # / __/__ ___ _____/ /__ # _\ \/ _ \/ _ `/ __/ '_/ # /__ / .__/\_,_/_/ /_/\_\ version 1.3.1 # /_/ # # Using Python version 2.7.9 (default, Apr 1 2015 19:28:03) # SparkContext available as sc, HiveContext available as sqlContext.
ここでは HDFS は使わず、pandas
の read_csv
で pandas.DataFrame
を作成する。列名に "." が入っていると PySpark
でうまく動かないようだったため以下のカラム名にした。
import pandas as pd # 表示する行数を設定 pd.options.display.max_rows=10 names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species'] # pandas pdf = pd.read_csv('~/iris.csv', header=None, names=names) pdf # SepalLength SepalWidth PetalLength PetalWidth Species # 0 5.1 3.5 1.4 0.2 setosa # 1 4.9 3.0 1.4 0.2 setosa # 2 4.7 3.2 1.3 0.2 setosa # 3 4.6 3.1 1.5 0.2 setosa # 4 5.0 3.6 1.4 0.2 setosa # .. ... ... ... ... ... # 145 6.7 3.0 5.2 2.3 virginica # 146 6.3 2.5 5.0 1.9 virginica # 147 6.5 3.0 5.2 2.0 virginica # 148 6.2 3.4 5.4 2.3 virginica # 149 5.9 3.0 5.1 1.8 virginica # # [150 rows x 5 columns]
これを PySpark
の DataFrame
に変換する。pandas
と PySpark
の基本的な違いとして、
PySpark
にはpandas.Index
にあたるものがないPySpark
にはpandas.Series
にあたるものがない ( 代わり? にRow
があるが、どの程度 柔軟な操作ができるかは未知 )
# PySpark sdf = sqlContext.createDataFrame(pdf) sdf # DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double, # PetalWidth: double, Species: string] # データの中身を表示するには DataFrame.show() sdf.show() # SepalLength SepalWidth PetalLength PetalWidth Species # 5.1 3.5 1.4 0.2 setosa # 4.9 3.0 1.4 0.2 setosa # 4.7 3.2 1.3 0.2 setosa # 4.6 3.1 1.5 0.2 setosa # .. ... ... ... ... # 5.4 3.9 1.3 0.4 setosa # 5.1 3.5 1.4 0.3 setosa # 5.7 3.8 1.7 0.3 setosa # 5.1 3.8 1.5 0.3 setosa
以降、pdf
が pandas
、sdf
が PySpark
の DataFrame
をあらわす。
type(pdf) # <class 'pandas.core.frame.DataFrame'> type(sdf) # <class 'pyspark.sql.dataframe.DataFrame'>
基本
まず基本的な操作を。先頭いくつかのデータを確認するには head
。
PySpark
での返り値は Row
インスタンスのリストになる。
# pandas pdf.head(5) # SepalLength SepalWidth PetalLength PetalWidth Species # 0 5.1 3.5 1.4 0.2 setosa # 1 4.9 3.0 1.4 0.2 setosa # 2 4.7 3.2 1.3 0.2 setosa # 3 4.6 3.1 1.5 0.2 setosa # 4 5.0 3.6 1.4 0.2 setosa # PySpark sdf.head(5) # [Row(SepalLength=5.1, SepalWidth=3.5, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=4.9, SepalWidth=3.0, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa')] # pandas type(pdf.head(5)) # <class 'pandas.core.frame.DataFrame'> # PySpark type(sdf.head(5)) # <type 'list'>
各カラムのデータ型の確認には dtypes
。
# pandas pdf.dtypes # SepalLength float64 # SepalWidth float64 # PetalLength float64 # PetalWidth float64 # Species object # dtype: object # PySpark sdf.dtypes # [('SepalLength', 'double'), ('SepalWidth', 'double'), # ('PetalLength', 'double'), ('PetalWidth', 'double'), # ('Species', 'string')]
要約統計量の確認は describe
。
# pandas pdf.describe() # SepalLength SepalWidth PetalLength PetalWidth # count 150.000000 150.000000 150.000000 150.000000 # mean 5.843333 3.054000 3.758667 1.198667 # std 0.828066 0.433594 1.764420 0.763161 # min 4.300000 2.000000 1.000000 0.100000 # 25% 5.100000 2.800000 1.600000 0.300000 # 50% 5.800000 3.000000 4.350000 1.300000 # 75% 6.400000 3.300000 5.100000 1.800000 # max 7.900000 4.400000 6.900000 2.500000 # PySpark sdf.describe().show() # summary SepalLength SepalWidth PetalLength PetalWidth # count 150 150 150 150 # mean 5.843333333333334 3.0540000000000003 3.758666666666666 1.1986666666666668 # stddev 0.8253012917851317 0.4321465800705415 1.7585291834055206 0.760612618588172 # min 4.3 2.0 1.0 0.1 # max 7.9 4.4 6.9 2.5
列操作
列名操作
参照と変更。PySpark
ではプロパティへの代入は不可。
# pandas pdf.columns # Index([u'SepalLength', u'SepalWidth', u'PetalLength', u'PetalWidth', u'Species'], dtype='object') # PySpark sdf.columns # [u'SepalLength', u'SepalWidth', u'PetalLength', u'PetalWidth', u'Species'] # pandas pdf.columns = [1, 2, 3, 4, 5] pdf.columns # Int64Index([1, 2, 3, 4, 5], dtype='int64') # PySpark sdf.columns = [1, 2, 3, 4, 5] # AttributeError: can't set attribute
マッピングによって列名を変更するには、pandas
では DataFrame.rename
。
# pandas pdf.columns # Int64Index([1, 2, 3, 4, 5], dtype='int64') pdf = pdf.rename(columns={1:'SepalLength', 2:'SepalWidth', 3:'PetalLength', 4:'PetalWidth', 5:'Species'}) pdf # SepalLength SepalWidth PetalLength PetalWidth Species # 0 5.1 3.5 1.4 0.2 setosa # 1 4.9 3.0 1.4 0.2 setosa # 2 4.7 3.2 1.3 0.2 setosa # 3 4.6 3.1 1.5 0.2 setosa # 4 5.0 3.6 1.4 0.2 setosa # .. ... ... ... ... ... # 145 6.7 3.0 5.2 2.3 virginica # 146 6.3 2.5 5.0 1.9 virginica # 147 6.5 3.0 5.2 2.0 virginica # 148 6.2 3.4 5.4 2.3 virginica # 149 5.9 3.0 5.1 1.8 virginica # # [150 rows x 5 columns]
PySpark
では DataFrame.withColumnRenamed
。
# PySpark sdf.withColumnRenamed('Species', 'xxx') # DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double, # PetalWidth: double, xxx: string]
補足 非破壊的な処理のため、反映には代入が必要。ここでは列名変更したくないので代入しない。
列名による列選択
pandas.DataFrame
の列選択では 当該の列のデータを含む Series
が返ってくる。
# pandas pdf.Species # 0 setosa # 1 setosa # 2 setosa # 3 setosa # 4 setosa # ... # 145 virginica # 146 virginica # 147 virginica # 148 virginica # 149 virginica # Name: Species, dtype: object pdf['Species'] # 略
PySpark
では、列の属性を表現する Column
インスタンスが返ってくる。
# PySpark sdf.Species # Column<Species> sdf['Species'] # Column<Species>
pandas
、PySpark
いずれも、文字列ではなくリストを渡せば その列を DataFrame
としてスライシングする。
# pandas pdf[['PetalWidth']] # PetalWidth # 0 0.2 # 1 0.2 # 2 0.2 # 3 0.2 # 4 0.2 # .. ... # 145 2.3 # 146 1.9 # 147 2.0 # 148 2.3 # 149 1.8 # # [150 rows x 1 columns] # PySpark sdf[['PetalWidth']] # DataFrame[PetalWidth: double] # pandas pdf[['PetalWidth', 'PetalLength']] # PetalWidth PetalLength # 0 0.2 1.4 # 1 0.2 1.4 # 2 0.2 1.3 # 3 0.2 1.5 # 4 0.2 1.4 # .. ... ... # 145 2.3 5.2 # 146 1.9 5.0 # 147 2.0 5.2 # 148 2.3 5.4 # 149 1.8 5.1 # # [150 rows x 2 columns] # PySpark sdf[['PetalWidth', 'PetalLength']] # DataFrame[PetalWidth: double, PetalLength: double]
PySpark
では DataFrame.select
でもよい。
# PySpark sdf.select(sdf['PetalWidth'], sdf['PetalLength']) # DataFrame[PetalWidth: double, PetalLength: double]
真偽値リストによる列選択
自分はあまり使わないのだが、R {dplyr} との比較という観点で。
indexer = [False, False, True, True, False] # pandas pdf.loc[:, indexer] # PetalLength PetalWidth # 0 1.4 0.2 # 1 1.4 0.2 # 2 1.3 0.2 # 3 1.5 0.2 # 4 1.4 0.2 # .. ... ... # 145 5.2 2.3 # 146 5.0 1.9 # 147 5.2 2.0 # 148 5.4 2.3 # 149 5.1 1.8 # # [150 rows x 2 columns] # PySpark sdf[[c for c, i in zip(sdf.columns, indexer) if i is True]] # DataFrame[PetalLength: double, PetalWidth: double]
列の属性による列選択
列の型が pandas
の場合は float
、PySpark
の場合は double
の列のみ取り出す。
# pandas pdf.loc[:, pdf.dtypes == float] # SepalLength SepalWidth PetalLength PetalWidth # 0 5.1 3.5 1.4 0.2 # 1 4.9 3.0 1.4 0.2 # 2 4.7 3.2 1.3 0.2 # 3 4.6 3.1 1.5 0.2 # 4 5.0 3.6 1.4 0.2 # .. ... ... ... ... # 145 6.7 3.0 5.2 2.3 # 146 6.3 2.5 5.0 1.9 # 147 6.5 3.0 5.2 2.0 # 148 6.2 3.4 5.4 2.3 # 149 5.9 3.0 5.1 1.8 # # [150 rows x 4 columns] # PySpark sdf[[c for c, type in sdf.dtypes if type == 'double']] # DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double, # PetalWidth: double]
行操作
値の条件による行選択
# pandas pdf[pdf['Species'] == 'virginica'] # SepalLength SepalWidth PetalLength PetalWidth Species # 100 6.3 3.3 6.0 2.5 virginica # 101 5.8 2.7 5.1 1.9 virginica # 102 7.1 3.0 5.9 2.1 virginica # 103 6.3 2.9 5.6 1.8 virginica # 104 6.5 3.0 5.8 2.2 virginica # .. ... ... ... ... ... # 145 6.7 3.0 5.2 2.3 virginica # 146 6.3 2.5 5.0 1.9 virginica # 147 6.5 3.0 5.2 2.0 virginica # 148 6.2 3.4 5.4 2.3 virginica # 149 5.9 3.0 5.1 1.8 virginica # # [50 rows x 5 columns] # PySpark sdf[sdf['Species'] == 'virginica'].show() # SepalLength SepalWidth PetalLength PetalWidth Species # 6.3 3.3 6.0 2.5 virginica # 5.8 2.7 5.1 1.9 virginica # 7.1 3.0 5.9 2.1 virginica # 6.3 2.9 5.6 1.8 virginica # .. ... ... ... ... # 6.5 3.0 5.5 1.8 virginica # 7.7 3.8 6.7 2.2 virginica # 7.7 2.6 6.9 2.3 virginica # 6.0 2.2 5.0 1.5 virginica
PySpark
では DataFrame.filter
でもよい。
# PySpark sdf.filter(sdf['Species'] == 'virginica').show() # 略
行番号による行選択
PySpark
では 冒頭の n 行を抽出したりはできるが、適当な行選択ではできない、、、と思う。PySpark
でどうしてもやりたければ index にあたる列を追加 -> 列の値で選択。
4/27追記 DataFrame.collect
で Row
のリストに変換 -> 再度 DataFrame
化すればスキーマ変更せずにできる、、、が、マスタでデータを処理することになるのでよい方法ではなさそう。
# pandas pdf.loc[[2, 3, 4]] # SepalLength SepalWidth PetalLength PetalWidth Species # 2 4.7 3.2 1.3 0.2 setosa # 3 4.6 3.1 1.5 0.2 setosa # 4 5.0 3.6 1.4 0.2 setosa # PySpark [c for i, c in enumerate(sdf.collect()) if i in (2, 3 ,4)] # [Row(SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa')] sqlContext.createDataFrame([c for i, c in enumerate(sdf.collect()) if i in (2, 3 ,4)]).show() # SepalLength SepalWidth PetalLength PetalWidth Species # 4.7 3.2 1.3 0.2 setosa # 4.6 3.1 1.5 0.2 setosa # 5.0 3.6 1.4 0.2 setosa
ランダムサンプリング
pandas
ではindex
をサンプリングしてスライス。
import random # pandas pdf.loc[random.sample(pdf.index, 5)] # Sepal.Length Sepal.Width Petal.Length Petal.Width Species # 64 6.1 2.9 4.7 1.4 versicolor # 17 5.4 3.9 1.3 0.4 setosa # 14 4.3 3.0 1.1 0.1 setosa # 4 4.6 3.1 1.5 0.2 setosa # 146 6.7 3.0 5.2 2.3 virginica
PySpark
では DataFrame.sample
。この関数、fraction
の確率で行ごとに選択しているようで、サンプリングするたびに抽出される行数が変わる ( 行数指定でランダムサンプリング、という処理はできなさそう )。
# PySpark sdf.sample(False, 5.0 / sdf.count()).show() # SepalLength SepalWidth PetalLength PetalWidth Species # 5.4 3.4 1.7 0.2 setosa # 5.7 2.8 4.5 1.3 versicolor # 6.2 2.8 4.8 1.8 virginica # 6.1 3.0 4.9 1.8 virginica
補足 今後、pandas
にもサンプリングメソッド DataFrame.sample
が実装予定 #9666。
列の追加
pandas
では DataFrame
に対して 直接 新しい列を追加できるが、PySpark
ではできない。
# 破壊的な処理になるのでコピー pdf_temp = pdf.copy() # pandas pdf_temp['PetalMult'] = pdf_temp['PetalWidth'] * pdf_temp['PetalLength'] pdf_temp # SepalLength SepalWidth PetalLength PetalWidth Species PetalMult # 0 5.1 3.5 1.4 0.2 setosa 0.28 # 1 4.9 3.0 1.4 0.2 setosa 0.28 # 2 4.7 3.2 1.3 0.2 setosa 0.26 # 3 4.6 3.1 1.5 0.2 setosa 0.30 # 4 5.0 3.6 1.4 0.2 setosa 0.28 # .. ... ... ... ... ... ... # 145 6.7 3.0 5.2 2.3 virginica 11.96 # 146 6.3 2.5 5.0 1.9 virginica 9.50 # 147 6.5 3.0 5.2 2.0 virginica 10.40 # 148 6.2 3.4 5.4 2.3 virginica 12.42 # 149 5.9 3.0 5.1 1.8 virginica 9.18 # # [150 rows x 6 columns] # PySpark sdf['PetalMult'] = sdf['PetalWidth'] * sdf['PetalLength'] # TypeError: 'DataFrame' object does not support item assignment
PySpark
での列追加は DataFrame.withColumn
。
# PySpark sdf.withColumn('PetalMult', sdf.PetalWidth * sdf.PetalLength).show() # SepalLength SepalWidth PetalLength PetalWidth Species PetalMult # 5.1 3.5 1.4 0.2 setosa 0.27999999999999997 # 4.9 3.0 1.4 0.2 setosa 0.27999999999999997 # 4.7 3.2 1.3 0.2 setosa 0.26 # 4.6 3.1 1.5 0.2 setosa 0.30000000000000004 # .. .. ... ... ... ... # 5.4 3.9 1.3 0.4 setosa 0.52 # 5.1 3.5 1.4 0.3 setosa 0.42 # 5.7 3.8 1.7 0.3 setosa 0.51 # 5.1 3.8 1.5 0.3 setosa 0.44999999999999996
補足 pandas
v0.16.0 以降では 類似のメソッド DataFrame.assign
が追加。
# pandas pdf.assign(PetalMult=pdf['PetalWidth'] * pdf['PetalLength']) # 略
まとめ
PySpark
と pandas
のデータ操作を整理した。
PySpark
、上のような基本的な処理は pandas
と似たやり方で直感的に使える感じだ。
大規模処理は PySpark
、細かい取り回しが必要なものは pandas
でうまく併用できるとよさそう。
4/29追記 続きはこちら。
Learning Spark: Lightning-Fast Big Data Analysis
- 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia
- 出版社/メーカー: O'Reilly Media
- 発売日: 2015/01/28
- メディア: Kindle版
- この商品を含むブログを見る