StatsFragments

Python, R, Rust, 統計, 機械学習とか

簡単な集約/変換処理を PySpark & pandas の DataFrame で行う

こちらの続き。

sinhrks.hatenablog.com

準備

サンプルデータは iris 。今回は HDFScsv を置き、そこから読み取って DataFrame を作成する。

# HDFS にディレクトリを作成しファイルを置く
$ hadoop fs -mkdir /data/
$ hadoop fs -put iris.csv /data/
$ hadoop fs -ls /
Found 1 items
drwxr-xr-x   - ec2-user supergroup          0 2015-04-28 20:01 /data

# Spark のパスに移動
$ echo $SPARK_HOME
/usr/local/spark
$ cd $SPARK_HOME
$ pwd
/usr/local/spark
$ bin/pyspark

補足 前回同様に pandas から直接 PySparkDataFrame を作成した場合、groupBy 時に java.lang.OutOfMemoryError: Java heap space エラーが発生してシェルごと落ちる。

CSV ファイルの読み込み

pandas では前回同様 read_csv

import numpy as np
import pandas as pd
# 表示する行数を設定
pd.options.display.max_rows=10

names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species']

# pandas
pdf = pd.read_csv('~/iris.csv', header=None, names=names)
pdf 
# 略

PySpark は標準では csv から直接 DataFrame を作成できないため、一度 Row のリストを作成して DataFrame に変換する。

from pyspark.sql import Row
lines = sc.textFile("hdfs://127.0.0.1:9000/data/iris.csv")
cells = lines.map(lambda l: l.split(","))
rows = cells.map(lambda x: Row(SepalLength=float(x[0]), SepalWidth=float(x[1]),
                               PetalLength=float(x[2]), PetalWidth=float(x[3]),
                               Species=x[4]))
sdf = sqlContext.createDataFrame(rows)
sdf.show()
# 略

グルーピング/集約

ある列の値ごとに集計

pandas, PySpark で多少 文法は異なる。

列の値でグループ分けし、一列の合計を取得する場合:
# pandas 
pdf.groupby('Species')['SepalLength'].sum()
# Species
# setosa        250.3
# versicolor    296.8
# virginica     329.4
# Name: SepalLength, dtype: float64

# PySpark
sdf.groupBy('Species').sum('SepalLength').show()
# Species    SUM(SepalLength)  
# virginica  329.3999999999999 
# versicolor 296.8             
# setosa     250.29999999999998
指定した複数列の合計を取得する場合:
# pandas
pdf.groupby('Species')[['PetalWidth', 'PetalLength']].sum()
#             PetalWidth  PetalLength
# Species                            
# setosa            12.2         73.2
# versicolor        66.3        213.0
# virginica        101.3        277.6

# PySpark
sdf.groupBy('Species').sum('PetalWidth', 'PetalLength').show()
# Species    SUM(PetalWidth)    SUM(PetalLength)  
# virginica  101.29999999999998 277.59999999999997
# versicolor 66.30000000000001  213.0             
# setosa     12.199999999999996 73.2  
全列の合計を取得する場合:
# pandas
pdf.groupby('Species').sum()
#             SepalLength  SepalWidth  PetalLength  PetalWidth
# Species                                                     
# setosa            250.3       170.9         73.2        12.2
# versicolor        296.8       138.5        213.0        66.3
# virginica         329.4       148.7        277.6       101.3

# PySpark
sdf.groupBy('Species').sum().show()
# Species    SUM(PetalLength)   SUM(PetalWidth)    SUM(SepalLength)   SUM(SepalWidth)   
# virginica  277.59999999999997 101.29999999999998 329.3999999999999  148.7             
# versicolor 213.0              66.30000000000001  296.8              138.5             
# setosa     73.2               12.199999999999996 250.29999999999998 170.90000000000003

補足 pandas では グループ化したデータも DataFrame と同じようにスライシングできたりする。 一方、PySparkGroupedData は集約系のAPI しか持っていない。

# pandas
pdf.groupby('Species')['PetalWidth']
# <pandas.core.groupby.SeriesGroupBy object at 0x7f62f4218d50>

# PySpark (NG!)
sdf.groupBy('Species')[['Species']]
# TypeError: 'GroupedData' object has no attribute '__getitem__'
sdf.groupBy('Species').select('PetalWidth')
# AttributeError: 'GroupedData' object has no attribute 'select'

また、pandas では apply で自作の集約関数 (UDAF) を利用することができるが、PySpark 1.3.1 時点 では非対応らしい。PySparkudf を利用して定義した自作関数を集約時に使うと以下のエラーになる。

# pandas
pdf.groupby('Species')[['PetalWidth', 'PetalLength']].apply(np.sum)
#             PetalWidth  PetalLength
# Species                            
# setosa            12.2         73.2
# versicolor        66.3        213.0
# virginica        101.3        277.6

# PySpark (NG!)
import pyspark.sql.functions
np_sum = pyspark.sql.functions.udf(np.sum, pyspark.sql.types.FloatType())
sdf.groupBy('Species').agg(np_sum(sdf.PetalWidth))
# py4j.protocol.Py4JJavaError: An error occurred while calling o334.agg.
# : org.apache.spark.sql.AnalysisException: expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;

行持ち / 列持ち変換

複数列持ちの値を行持ちに展開 (unpivot / melt)

pandas では pd.meltDataFrame.melt ではないので注意。

# pandas
pmelted = pd.melt(pdf, id_vars=['Species'], var_name='variable', value_name='value')
pmelted
#        Species     variable  value
# 0       setosa  SepalLength    5.1
# 1       setosa  SepalLength    4.9
# 2       setosa  SepalLength    4.7
# 3       setosa  SepalLength    4.6
# 4       setosa  SepalLength    5.0
# ..         ...          ...    ...
# 595  virginica   PetalWidth    2.3
# 596  virginica   PetalWidth    1.9
# 597  virginica   PetalWidth    2.0
# 598  virginica   PetalWidth    2.3
# 599  virginica   PetalWidth    1.8
# 
# [600 rows x 3 columns]

同様の処理を PySpark でやるには、DataFrame.flatMap。1行の入力に対して複数行 (この例では4行) のデータを返すことができる。fratMap の返り値は RDD インスタンスになるため、必要なら再度 DataFrame 化する。

# PySpark

def mapper(row):
    return [Row(Species=row[4], variable='PetalLength', value=row[0]),
            Row(Species=row[4], variable='PetalWidth', value=row[1]),
            Row(Species=row[4], variable='SepalLength', value=row[2]),
            Row(Species=row[4], variable='SepalWidth', value=row[3])] 

smelted = sqlContext.createDataFrame(sdf.flatMap(mapper))
smelted.show()
# Species value variable   
# setosa  1.4   PetalLength
# setosa  0.2   PetalWidth 
# setosa  5.1   SepalLength
# setosa  3.5   SepalWidth 
# ...     ..    ...
# setosa  1.4   PetalLength
# setosa  0.2   PetalWidth 
# setosa  5.0   SepalLength
# setosa  3.6   SepalWidth 
smelted.count()
# 600L

複数行持ちの値を列持ちに変換 (pivot)

pandas では DataFrame.pivot。pivotするデータは列にする値 (以下では Species ) と行にする値 (以下では variable ) の組がユニークになっている必要がある。そのため、まず pivot 用データを作成 -> その後 pivot する。

# pandas

# pivot 用データを作成
punpivot = pmelted.groupby(['Species', 'variable']).sum()
punpivot = punpivot.reset_index()
punpivot
#        Species     variable  value
# 0       setosa  PetalLength   73.2
# 1       setosa   PetalWidth   12.2
# 2       setosa  SepalLength  250.3
# 3       setosa   SepalWidth  170.9
# 4   versicolor  PetalLength  213.0
# ..         ...          ...    ...
# 7   versicolor   SepalWidth  138.5
# 8    virginica  PetalLength  277.6
# 9    virginica   PetalWidth  101.3
# 10   virginica  SepalLength  329.4
# 11   virginica   SepalWidth  148.7
# 
# [12 rows x 3 columns]

# pivot
punpivot.pivot(index='variable', columns='Species', values='value')
# Species      setosa  versicolor  virginica
# variable                                  
# PetalLength    73.2       213.0      277.6
# PetalWidth     12.2        66.3      101.3
# SepalLength   250.3       296.8      329.4
# SepalWidth    170.9       138.5      148.7

PySparkDataFrame のままでは同じ処理はできないようなので、一度 RDD に変換してから、 groupBy -> map

# PySpark

# pivot 用データを作成 
sunpivot = smelted.groupBy('Species', 'variable').sum()
sunpivot.show()
# Species    variable    SUM(value)        
# versicolor SepalWidth  138.5             
# versicolor SepalLength 296.8             
# setosa     PetalLength 73.2              
# virginica  PetalWidth  101.29999999999998
# versicolor PetalWidth  66.30000000000001 
# setosa     SepalWidth  170.90000000000003
# virginica  PetalLength 277.59999999999997
# setosa     SepalLength 250.29999999999998
# versicolor PetalLength 213.0             
# setosa     PetalWidth  12.199999999999996
# virginica  SepalWidth  148.7             
# virginica  SepalLength 329.3999999999999 
 
def reducer(obj):
    # variable : value の辞書を作成
    result = {o[1]:o[2] for o in obj[1]}
    return Row(Species=obj[0], **result)

# pivot
spivot = sunpivot.rdd.groupBy(lambda x: x[0]).map(reducer)
spivot.collect()
# [Row(PetalLength=277.59999999999997, PetalWidth=101.29999999999998, SepalLength=329.3999999999999, SepalWidth=148.7, Species=u'virginica'),
#  Row(PetalLength=73.2, PetalWidth=12.199999999999996, SepalLength=250.29999999999998, SepalWidth=170.90000000000003, Species=u'setosa'),
#  Row(PetalLength=213.0, PetalWidth=66.30000000000001, SepalLength=296.8, SepalWidth=138.5, Species=u'versicolor')]

sqlContext.createDataFrame(spivot).show()
# PetalLength        PetalWidth         SepalLength        SepalWidth         Species   
# 277.59999999999997 101.29999999999998 329.3999999999999  148.7              virginica 
# 73.2               12.199999999999996 250.29999999999998 170.90000000000003 setosa    
# 213.0              66.30000000000001  296.8              138.5              versicolor

列の分割 / 結合

列の値を複数列に分割

ある列の値を適当に文字列処理して、新しい列を作成したい。pandas には 文字列処理用のアクセサがあるため、 assign と組み合わせて以下のように書ける。

# pandas
psplitted = pmelted.assign(Parts=pmelted['variable'].str.slice(0, 5),
                           Scale=pmelted['variable'].str.slice(5))
psplitted
#        Species     variable  value  Parts   Scale
# 0       setosa  SepalLength    5.1  Sepal  Length
# 1       setosa  SepalLength    4.9  Sepal  Length
# 2       setosa  SepalLength    4.7  Sepal  Length
# 3       setosa  SepalLength    4.6  Sepal  Length
# 4       setosa  SepalLength    5.0  Sepal  Length
# ..         ...          ...    ...    ...     ...
# 595  virginica   PetalWidth    2.3  Petal   Width
# 596  virginica   PetalWidth    1.9  Petal   Width
# 597  virginica   PetalWidth    2.0  Petal   Width
# 598  virginica   PetalWidth    2.3  Petal   Width
# 599  virginica   PetalWidth    1.8  Petal   Width
# 
# [600 rows x 5 columns]

PySparkには上記のようなメソッドはないので map で処理する。

# PySpark
def splitter(row):
    parts = row[2][:5]
    scale = row[2][5:]
    return Row(Species=row[0], value=row[1], Parts=parts, Scale=scale)

ssplitted = sqlContext.createDataFrame(smelted.map(splitter))
ssplitted.show()
# Parts Scale  Species value
# Petal Length setosa  1.4  
# Petal Width  setosa  0.2  
# Sepal Length setosa  5.1  
# Sepal Width  setosa  3.5  
# Petal Length setosa  1.4  
# ..    ..     ...     ..
# Petal Length setosa  1.4  
# Petal Width  setosa  0.2  
# Sepal Length setosa  5.0  
# Sepal Width  setosa  3.6  

複数列の値を一列に結合

pandas では普通に文字列結合すればよい。

# pandas
psplitted['variable2'] = psplitted['Parts'] + psplitted['Scale']
psplitted
#        Species     variable  value  Parts   Scale    variable2
# 0       setosa  SepalLength    5.1  Sepal  Length  SepalLength
# 1       setosa  SepalLength    4.9  Sepal  Length  SepalLength
# 2       setosa  SepalLength    4.7  Sepal  Length  SepalLength
# 3       setosa  SepalLength    4.6  Sepal  Length  SepalLength
# 4       setosa  SepalLength    5.0  Sepal  Length  SepalLength
# ..         ...          ...    ...    ...     ...          ...
# 595  virginica   PetalWidth    2.3  Petal   Width   PetalWidth
# 596  virginica   PetalWidth    1.9  Petal   Width   PetalWidth
# 597  virginica   PetalWidth    2.0  Petal   Width   PetalWidth
# 598  virginica   PetalWidth    2.3  Petal   Width   PetalWidth
# 599  virginica   PetalWidth    1.8  Petal   Width   PetalWidth
# 
# [600 rows x 6 columns]

PySpark では map

# PySpark
def unite(row):
    return Row(Species=row[2], value=row[3], variable=row[0] + row[1])

sqlContext.createDataFrame(splitted.map(unite)).show()

# Species value variable   
# setosa  1.4   PetalLength
# setosa  0.2   PetalWidth 
# setosa  5.1   SepalLength
# setosa  3.5   SepalWidth 
# ..      ..    ..
# setosa  1.4   PetalLength
# setosa  0.2   PetalWidth 
# setosa  5.0   SepalLength
# setosa  3.6   SepalWidth 

補足 withColumn の場合、オペレータは 数値の演算として扱われてしまうようなのでここでは使えない。

# PySpark (NG!)
ssplitted.withColumn('variable', splitted.Parts + splitted.Scale).show()
# Parts Scale  Species value variable
# Petal Length setosa  1.4   null    
# Petal Width  setosa  0.2   null    
# ..    ..     ..      ..    ..

まとめ

PySparkpandas のデータ集約/変形処理を整理した。

データ分析用途で利用したい場合、(ごく当たり前だが) データ量が少なく手元でさっといろいろ試したい場合は pandas、データ量が比較的多く 単純な処理を全体にかけたい場合は Spark がよい。

Spark は map 系の API が充実するとさらに使いやすくなりそうだ。が、小回りの効く文法/機能が充実していくことは考えにくいので 完全に Spark だけでデータ分析をする、、という状態には将来もならないのではないかと思う。小さいデータは pandas 使いましょう。

Learning Spark: Lightning-Fast Big Data Analysis

Learning Spark: Lightning-Fast Big Data Analysis