読者です 読者をやめる 読者になる 読者になる

StatsFragments

Python, R, Rust, 統計, 機械学習とか

簡単なデータ操作を PySpark & pandas の DataFrame で行う

Spark v1.3.0 で追加された DataFrame 、結構いいらしいという話は聞いていたのだが 自分で試すことなく時間が過ぎてしまっていた。ようやく PySpark を少し触れたので pandas との比較をまとめておきたい。内容に誤りや よりよい方法があればご指摘 下さい。

過去に基本的なデータ操作について 以下 ふたつの記事を書いたことがあるので、同じ処理のPySpark 版を加えたい。今回は ひとつめの "簡単なデータ操作〜" に相当する内容。

pandas 版

準備

環境は EC2 に作る。Spark のインストールについてはそのへんに情報あるので省略。サンプルデータは iriscsv でダウンロードしてホームディレクトリにおいた。以降の操作は PySpark のコンソールで行う。

# Spark のパスに移動
$ echo $SPARK_HOME
/usr/local/spark
$ cd $SPARK_HOME
$ pwd
/usr/local/spark

# 既定だとログの出力が邪魔なので抑制
$ cp conf/log4j.properties.template conf/log4j.properties
$ vi conf/log4j.properties
# INFO をすべて WARN に変える

$ bin/pyspark
# Welcome to
#       ____              __
#      / __/__  ___ _____/ /__
#     _\ \/ _ \/ _ `/ __/  '_/
#    /__ / .__/\_,_/_/ /_/\_\   version 1.3.1
#       /_/
# 
# Using Python version 2.7.9 (default, Apr  1 2015 19:28:03)
# SparkContext available as sc, HiveContext available as sqlContext.

ここでは HDFS は使わず、pandasread_csvpandas.DataFrame を作成する。列名に "." が入っていると PySpark でうまく動かないようだったため以下のカラム名にした。

import pandas as pd
# 表示する行数を設定
pd.options.display.max_rows=10

names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species']

# pandas
pdf = pd.read_csv('~/iris.csv', header=None, names=names)
pdf
#      SepalLength  SepalWidth  PetalLength  PetalWidth    Species
# 0            5.1         3.5          1.4         0.2     setosa
# 1            4.9         3.0          1.4         0.2     setosa
# 2            4.7         3.2          1.3         0.2     setosa
# 3            4.6         3.1          1.5         0.2     setosa
# 4            5.0         3.6          1.4         0.2     setosa
# ..           ...         ...          ...         ...        ...
# 145          6.7         3.0          5.2         2.3  virginica
# 146          6.3         2.5          5.0         1.9  virginica
# 147          6.5         3.0          5.2         2.0  virginica
# 148          6.2         3.4          5.4         2.3  virginica
# 149          5.9         3.0          5.1         1.8  virginica
# 
# [150 rows x 5 columns]

これを PySparkDataFrame に変換する。pandasPySpark の基本的な違いとして、

  • PySpark には pandas.Index にあたるものがない
  • PySpark には pandas.Series にあたるものがない ( 代わり? に Row があるが、どの程度 柔軟な操作ができるかは未知 )
# PySpark
sdf = sqlContext.createDataFrame(pdf)

sdf
# DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double,
#           PetalWidth: double, Species: string]

# データの中身を表示するには DataFrame.show()
sdf.show()
# SepalLength SepalWidth PetalLength PetalWidth Species
# 5.1         3.5        1.4         0.2        setosa 
# 4.9         3.0        1.4         0.2        setosa 
# 4.7         3.2        1.3         0.2        setosa 
# 4.6         3.1        1.5         0.2        setosa 
# ..          ...        ...         ...        ...
# 5.4         3.9        1.3         0.4        setosa 
# 5.1         3.5        1.4         0.3        setosa 
# 5.7         3.8        1.7         0.3        setosa 
# 5.1         3.8        1.5         0.3        setosa 

以降、pdfpandassdfPySparkDataFrame をあらわす。

type(pdf)
# <class 'pandas.core.frame.DataFrame'>
type(sdf)
# <class 'pyspark.sql.dataframe.DataFrame'> 

基本

まず基本的な操作を。先頭いくつかのデータを確認するには headPySpark での返り値は Row インスタンスのリストになる。

# pandas
pdf.head(5)
#    SepalLength  SepalWidth  PetalLength  PetalWidth Species
# 0          5.1         3.5          1.4         0.2  setosa
# 1          4.9         3.0          1.4         0.2  setosa
# 2          4.7         3.2          1.3         0.2  setosa
# 3          4.6         3.1          1.5         0.2  setosa
# 4          5.0         3.6          1.4         0.2  setosa

# PySpark
sdf.head(5)
# [Row(SepalLength=5.1, SepalWidth=3.5, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=4.9, SepalWidth=3.0, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa')]

# pandas
type(pdf.head(5))
# <class 'pandas.core.frame.DataFrame'>

# PySpark
type(sdf.head(5))
# <type 'list'>

各カラムのデータ型の確認には dtypes

# pandas
pdf.dtypes
# SepalLength    float64
# SepalWidth     float64
# PetalLength    float64
# PetalWidth     float64
# Species         object
# dtype: object

# PySpark
sdf.dtypes
# [('SepalLength', 'double'), ('SepalWidth', 'double'),
#  ('PetalLength', 'double'), ('PetalWidth', 'double'),
#  ('Species', 'string')]

要約統計量の確認は describe

# pandas
pdf.describe()
#        SepalLength  SepalWidth  PetalLength  PetalWidth
# count   150.000000  150.000000   150.000000  150.000000
# mean      5.843333    3.054000     3.758667    1.198667
# std       0.828066    0.433594     1.764420    0.763161
# min       4.300000    2.000000     1.000000    0.100000
# 25%       5.100000    2.800000     1.600000    0.300000
# 50%       5.800000    3.000000     4.350000    1.300000
# 75%       6.400000    3.300000     5.100000    1.800000
# max       7.900000    4.400000     6.900000    2.500000

# PySpark
sdf.describe().show()
# summary SepalLength        SepalWidth         PetalLength        PetalWidth        
# count   150                150                150                150               
# mean    5.843333333333334  3.0540000000000003 3.758666666666666  1.1986666666666668
# stddev  0.8253012917851317 0.4321465800705415 1.7585291834055206 0.760612618588172 
# min     4.3                2.0                1.0                0.1               
# max     7.9                4.4                6.9                2.5     

列操作

列名操作

参照と変更。PySpark ではプロパティへの代入は不可。

# pandas
pdf.columns
# Index([u'SepalLength', u'SepalWidth', u'PetalLength', u'PetalWidth', u'Species'], dtype='object')

# PySpark
sdf.columns
# [u'SepalLength', u'SepalWidth', u'PetalLength', u'PetalWidth', u'Species']

# pandas
pdf.columns = [1, 2, 3, 4, 5]
pdf.columns
# Int64Index([1, 2, 3, 4, 5], dtype='int64')

# PySpark
sdf.columns = [1, 2, 3, 4, 5]
# AttributeError: can't set attribute

マッピングによって列名を変更するには、pandas では DataFrame.rename

# pandas
pdf.columns
# Int64Index([1, 2, 3, 4, 5], dtype='int64')

pdf = pdf.rename(columns={1:'SepalLength', 2:'SepalWidth', 3:'PetalLength', 4:'PetalWidth', 5:'Species'})
pdf
#      SepalLength  SepalWidth  PetalLength  PetalWidth    Species
# 0            5.1         3.5          1.4         0.2     setosa
# 1            4.9         3.0          1.4         0.2     setosa
# 2            4.7         3.2          1.3         0.2     setosa
# 3            4.6         3.1          1.5         0.2     setosa
# 4            5.0         3.6          1.4         0.2     setosa
# ..           ...         ...          ...         ...        ...
# 145          6.7         3.0          5.2         2.3  virginica
# 146          6.3         2.5          5.0         1.9  virginica
# 147          6.5         3.0          5.2         2.0  virginica
# 148          6.2         3.4          5.4         2.3  virginica
# 149          5.9         3.0          5.1         1.8  virginica
# 
# [150 rows x 5 columns]

PySpark では DataFrame.withColumnRenamed

# PySpark
sdf.withColumnRenamed('Species', 'xxx')
# DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double,
#           PetalWidth: double, xxx: string]

補足 非破壊的な処理のため、反映には代入が必要。ここでは列名変更したくないので代入しない。

列名による列選択

pandas.DataFrame の列選択では 当該の列のデータを含む Series が返ってくる。

# pandas
pdf.Species
# 0         setosa
# 1         setosa
# 2         setosa
# 3         setosa
# 4         setosa
#          ...    
# 145    virginica
# 146    virginica
# 147    virginica
# 148    virginica
# 149    virginica
# Name: Species, dtype: object

pdf['Species']
# 略

PySpark では、列の属性を表現する Column インスタンスが返ってくる。

# PySpark
sdf.Species
# Column<Species>
sdf['Species']
# Column<Species>

pandasPySpark いずれも、文字列ではなくリストを渡せば その列を DataFrame としてスライシングする。

# pandas
pdf[['PetalWidth']]
#      PetalWidth
# 0           0.2
# 1           0.2
# 2           0.2
# 3           0.2
# 4           0.2
# ..          ...
# 145         2.3
# 146         1.9
# 147         2.0
# 148         2.3
# 149         1.8
# 
# [150 rows x 1 columns]

# PySpark
sdf[['PetalWidth']]
# DataFrame[PetalWidth: double]

# pandas
pdf[['PetalWidth', 'PetalLength']]
#      PetalWidth  PetalLength
# 0           0.2          1.4
# 1           0.2          1.4
# 2           0.2          1.3
# 3           0.2          1.5
# 4           0.2          1.4
# ..          ...          ...
# 145         2.3          5.2
# 146         1.9          5.0
# 147         2.0          5.2
# 148         2.3          5.4
# 149         1.8          5.1
# 
# [150 rows x 2 columns]

# PySpark
sdf[['PetalWidth', 'PetalLength']]
# DataFrame[PetalWidth: double, PetalLength: double]

PySpark では DataFrame.select でもよい。

# PySpark
sdf.select(sdf['PetalWidth'], sdf['PetalLength'])
# DataFrame[PetalWidth: double, PetalLength: double]
真偽値リストによる列選択

自分はあまり使わないのだが、R {dplyr} との比較という観点で。

indexer = [False, False, True, True, False]

# pandas
pdf.loc[:, indexer]
#      PetalLength  PetalWidth
# 0            1.4         0.2
# 1            1.4         0.2
# 2            1.3         0.2
# 3            1.5         0.2
# 4            1.4         0.2
# ..           ...         ...
# 145          5.2         2.3
# 146          5.0         1.9
# 147          5.2         2.0
# 148          5.4         2.3
# 149          5.1         1.8
# 
# [150 rows x 2 columns]

# PySpark
sdf[[c for c, i in zip(sdf.columns, indexer) if i is True]]
# DataFrame[PetalLength: double, PetalWidth: double]
列の属性による列選択

列の型が pandas の場合は floatPySpark の場合は double の列のみ取り出す。

# pandas
pdf.loc[:, pdf.dtypes == float]
#      SepalLength  SepalWidth  PetalLength  PetalWidth
# 0            5.1         3.5          1.4         0.2
# 1            4.9         3.0          1.4         0.2
# 2            4.7         3.2          1.3         0.2
# 3            4.6         3.1          1.5         0.2
# 4            5.0         3.6          1.4         0.2
# ..           ...         ...          ...         ...
# 145          6.7         3.0          5.2         2.3
# 146          6.3         2.5          5.0         1.9
# 147          6.5         3.0          5.2         2.0
# 148          6.2         3.4          5.4         2.3
# 149          5.9         3.0          5.1         1.8
# 
# [150 rows x 4 columns]

# PySpark
sdf[[c for c, type in sdf.dtypes if type == 'double']]
# DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double,
#           PetalWidth: double]

行操作

値の条件による行選択
# pandas
pdf[pdf['Species'] == 'virginica']
#      SepalLength  SepalWidth  PetalLength  PetalWidth    Species
# 100          6.3         3.3          6.0         2.5  virginica
# 101          5.8         2.7          5.1         1.9  virginica
# 102          7.1         3.0          5.9         2.1  virginica
# 103          6.3         2.9          5.6         1.8  virginica
# 104          6.5         3.0          5.8         2.2  virginica
# ..           ...         ...          ...         ...        ...
# 145          6.7         3.0          5.2         2.3  virginica
# 146          6.3         2.5          5.0         1.9  virginica
# 147          6.5         3.0          5.2         2.0  virginica
# 148          6.2         3.4          5.4         2.3  virginica
# 149          5.9         3.0          5.1         1.8  virginica
# 
# [50 rows x 5 columns]

# PySpark
sdf[sdf['Species'] == 'virginica'].show()
# SepalLength SepalWidth PetalLength PetalWidth Species  
# 6.3         3.3        6.0         2.5        virginica
# 5.8         2.7        5.1         1.9        virginica
# 7.1         3.0        5.9         2.1        virginica
# 6.3         2.9        5.6         1.8        virginica
# ..          ...        ...         ...        ...
# 6.5         3.0        5.5         1.8        virginica
# 7.7         3.8        6.7         2.2        virginica
# 7.7         2.6        6.9         2.3        virginica
# 6.0         2.2        5.0         1.5        virginica

PySpark では DataFrame.filter でもよい。

# PySpark
sdf.filter(sdf['Species'] == 'virginica').show() 
# 略
行番号による行選択

PySpark では 冒頭の n 行を抽出したりはできるが、適当な行選択ではできない、、、と思う。PySpark でどうしてもやりたければ index にあたる列を追加 -> 列の値で選択。

4/27追記 DataFrame.collectRow のリストに変換 -> 再度 DataFrame 化すればスキーマ変更せずにできる、、、が、マスタでデータを処理することになるのでよい方法ではなさそう。

# pandas
pdf.loc[[2, 3, 4]]
#    SepalLength  SepalWidth  PetalLength  PetalWidth Species
# 2          4.7         3.2          1.3         0.2  setosa
# 3          4.6         3.1          1.5         0.2  setosa
# 4          5.0         3.6          1.4         0.2  setosa

# PySpark
[c for i, c in enumerate(sdf.collect()) if i in (2, 3 ,4)]
# [Row(SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa')]
sqlContext.createDataFrame([c for i, c in enumerate(sdf.collect()) if i in (2, 3 ,4)]).show()
# SepalLength SepalWidth PetalLength PetalWidth Species
# 4.7         3.2        1.3         0.2        setosa 
# 4.6         3.1        1.5         0.2        setosa 
# 5.0         3.6        1.4         0.2        setosa 
ランダムサンプリング

pandas ではindex をサンプリングしてスライス。

import random

# pandas
pdf.loc[random.sample(pdf.index, 5)]
#      Sepal.Length  Sepal.Width  Petal.Length  Petal.Width     Species
# 64            6.1          2.9           4.7          1.4  versicolor
# 17            5.4          3.9           1.3          0.4      setosa
# 14            4.3          3.0           1.1          0.1      setosa
# 4             4.6          3.1           1.5          0.2      setosa
# 146           6.7          3.0           5.2          2.3   virginica

PySpark では DataFrame.sample。この関数、fraction の確率で行ごとに選択しているようで、サンプリングするたびに抽出される行数が変わる ( 行数指定でランダムサンプリング、という処理はできなさそう )。

# PySpark
sdf.sample(False, 5.0 / sdf.count()).show()
# SepalLength SepalWidth PetalLength PetalWidth Species   
# 5.4         3.4        1.7         0.2        setosa    
# 5.7         2.8        4.5         1.3        versicolor
# 6.2         2.8        4.8         1.8        virginica 
# 6.1         3.0        4.9         1.8        virginica 

補足 今後、pandas にもサンプリングメソッド DataFrame.sample が実装予定 #9666

列の追加

pandas では DataFrame に対して 直接 新しい列を追加できるが、PySpark ではできない。

# 破壊的な処理になるのでコピー
pdf_temp = pdf.copy()

# pandas
pdf_temp['PetalMult'] = pdf_temp['PetalWidth'] * pdf_temp['PetalLength']
pdf_temp
#      SepalLength  SepalWidth  PetalLength  PetalWidth    Species  PetalMult
# 0            5.1         3.5          1.4         0.2     setosa       0.28
# 1            4.9         3.0          1.4         0.2     setosa       0.28
# 2            4.7         3.2          1.3         0.2     setosa       0.26
# 3            4.6         3.1          1.5         0.2     setosa       0.30
# 4            5.0         3.6          1.4         0.2     setosa       0.28
# ..           ...         ...          ...         ...        ...        ...
# 145          6.7         3.0          5.2         2.3  virginica      11.96
# 146          6.3         2.5          5.0         1.9  virginica       9.50
# 147          6.5         3.0          5.2         2.0  virginica      10.40
# 148          6.2         3.4          5.4         2.3  virginica      12.42
# 149          5.9         3.0          5.1         1.8  virginica       9.18
# 
# [150 rows x 6 columns]

# PySpark
sdf['PetalMult'] = sdf['PetalWidth'] * sdf['PetalLength']
# TypeError: 'DataFrame' object does not support item assignment

PySpark での列追加は DataFrame.withColumn

# PySpark
sdf.withColumn('PetalMult', sdf.PetalWidth * sdf.PetalLength).show()
# SepalLength SepalWidth PetalLength PetalWidth Species PetalMult          
# 5.1         3.5        1.4         0.2        setosa  0.27999999999999997
# 4.9         3.0        1.4         0.2        setosa  0.27999999999999997
# 4.7         3.2        1.3         0.2        setosa  0.26               
# 4.6         3.1        1.5         0.2        setosa  0.30000000000000004
# ..          ..         ...         ...        ...     ...  
# 5.4         3.9        1.3         0.4        setosa  0.52               
# 5.1         3.5        1.4         0.3        setosa  0.42               
# 5.7         3.8        1.7         0.3        setosa  0.51               
# 5.1         3.8        1.5         0.3        setosa  0.44999999999999996

補足 pandas v0.16.0 以降では 類似のメソッド DataFrame.assign が追加。

# pandas
pdf.assign(PetalMult=pdf['PetalWidth'] * pdf['PetalLength'])
# 略

まとめ

PySparkpandas のデータ操作を整理した。

PySpark、上のような基本的な処理は pandas と似たやり方で直感的に使える感じだ。 大規模処理は PySpark、細かい取り回しが必要なものは pandas でうまく併用できるとよさそう。

4/29追記 続きはこちら。

sinhrks.hatenablog.com

Learning Spark: Lightning-Fast Big Data Analysis

Learning Spark: Lightning-Fast Big Data Analysis