StatsFragments

Python, R, Rust, 統計, 機械学習とか

Python Dask で 並列 DataFrame 処理

はじめに

先日のエントリで少し記載した Dask について、その使い方を書く。Dask を使うと、NumPypandasAPI を利用して並列計算/分散処理を行うことができる。また、Dask は Out-Of-Core (データ量が多くメモリに乗らない場合) の処理も考慮した実装になっている。

sinhrks.hatenablog.com

上にも書いたが、DaskNumPypandas を置き換えるものではない。数値計算のためのバックエンドとして NumPypandas を利用するため、むしろこれらのパッケージが必須である。

DaskNumPypandasAPI を完全にはサポートしていないため、並列 / Out-Of-Core 処理が必要な場面では Dask を、他では NumPy / pandas を使うのがよいと思う。pandasDask のデータはそれぞれ簡単に相互変換できる。

補足 とはいえ都度の変換は手間なので、pandas の処理実行時に Dask を利用するオプションをつける という検討はされている。

インストール

pip もしくは conda で。

pip install dask

準備

まずは必要なパッケージを import する。

import numpy as np
import pandas as pd
import dask.dataframe as dd

np.__version__
# '1.9.3'

pd.__version__
# '0.16.2'

# バージョン表示のためにインポート。
import dask
dask.__version__
# '0.7.1'

pandas から Dask への変換

サンプルデータは すでにメモリ上にある pd.DataFrame とする。

df = pd.DataFrame({'X': np.arange(10), 
                   'Y': np.arange(10, 20),
                   'Z': np.arange(20, 30)},
                  index=list('abcdefghij'))
df
#    X   Y   Z
# a  0  10  20
# b  1  11  21
# c  2  12  22
# d  3  13  23
# e  4  14  24
# f  5  15  25
# g  6  16  26
# h  7  17  27
# i  8  18  28
# j  9  19  29

pandas のデータ構造から Dask に変換するには dd.from_pandas。2つめの引数で データをいくつのパーティションに分割するかを指定している。結果は dask.dataframe.DataFrame ( dd.DataFrame ) となる。

divisions はデータがどこで分割されたかを示す。表示から、1つ目のパーティションには .index が "a" 〜 "e" までのデータが、2つ目のには "f" 〜 "j" までのデータが含まれていることがわかる。

重要 dd.DataFrame の処理全般について、行の順序は保証されない。各パーティションには divisions で示される .index を持つ行が含まれるが、パーティション内が常にソートされているとは限らない。

ddf = dd.from_pandas(df, 2)
ddf
# dd.DataFrame<from_pandas-b08addf72f0693a9fa1bb6c21d91f3d4, divisions=('a', 'f', 'j')>

# DataFrame の列名
ddf.columns
# ('X', 'Y', 'Z')

# DataFrame の index
ddf.index
# dd.Index<from_pandas-b08addf72f0693a9fa1bb6c21d91f3d4-index, divisions=('a', 'f', 'j')>

# DataFrame の divisions (パーティションの分割箇所)
ddf.divisions
# ('a', 'f', 'j')
 
# DataFrame のパーティション数
ddf.npartitions
# 2

dd.DataFrame からデータを取得する (計算処理を実行する) には .compute()。結果、元の pd.DataFrame が得られる。

ddf.compute()
#    X   Y   Z
# a  0  10  20
# b  1  11  21
# c  2  12  22
# d  3  13  23
# e  4  14  24
# f  5  15  25
# g  6  16  26
# h  7  17  27
# i  8  18  28
# j  9  19  29

内部処理

ここから、dd.DataFrame でどういった処理ができるのか、内部動作とあわせて記載する。といっても難しいことは全くやっていない。

まず、データ全体について 1 加算する処理を考える。これは 各パーティションごとに 1 加算して連結するのと同じ。

ddf + 1
# dd.DataFrame<elemwise-5b9ae0407254158903343113fac41421, divisions=('a', 'f', 'j')>

(ddf + 1).compute()
# 略

f:id:sinhrks:20150924210655p:plain

次に、列ごとの合計をとる処理。これは、各パーティションごとに列の合計をとって連結し、もう一度 合計をとる処理と同じ。

列の合計をとるため、結果は Series になる。ddf.sum() の時点では .compute() が呼ばれていないため実際の計算は行われていないが、Dask が結果の型 や divisions を推測し正しく表示している。

ddf.sum()
# dd.Series<dataframe-sum--second-7ba12c9d58c17f61406b84b6c30d7a26, divisions=(None, None)>

ddf.sum().compute()
# X     45
# Y    145
# Z    245
# dtype: int64

f:id:sinhrks:20150924210729p:plain

Dask ではこのような形で、計算処理をパーティションごとに並列 / Out-Of-Core 実行できる形に読み替えている。これらの処理は内部的には Computational Graph ( Dask Graph ) として表現され、.compute() によって実行される。

各処理の Dask Graph は、.visualize() メソッドを利用して確認できる。Graph 上で縦につながっていない処理同士は並列で実行できる。

ddf.sum().visualize()

f:id:sinhrks:20150924212007p:plain

各列の平均をとる場合、内部的には各列の .sum()と 各列の .count() をそれぞれ計算して除算。

ddf.mean().compute()
# X     4.5
# Y    14.5
# Z    24.5
# dtype: float64

ddf.mean().visualize()

f:id:sinhrks:20150924212112p:plain

DataFrame 同士の演算や、演算をチェインすることもできる。互いのパーティションが異なる場合はそれらが一致するよう調整が行われる。

((ddf - (ddf * 2)) == - ddf).visualize() 

f:id:sinhrks:20150924212136p:plain

また、累積関数 ( cumxxx ) や ウィンドウ関数 ( rolling_xxx ) なども利用できる。

ddf.cumsum().compute()
#     X    Y    Z
# a   0   10   20
# b   1   21   41
# c   3   33   63
# d   6   46   86
# e  10   60  110
# f  15   75  135
# g  21   91  161
# h  28  108  188
# i  36  126  216
# j  45  145  245

ddf.cumsum().visualize()

f:id:sinhrks:20150924213152p:plain

concat, join などの 連結 / 結合もできる。通常の演算と同じく、dd.DataFrame 同士のパーティションは適当に調整される。

df2 = pd.DataFrame({'A': np.arange(5), 
                    'B': np.arange(10, 15)},
                   index=list('adefg'))
df2
#    A   B
# a  0  10
# d  1  11
# e  2  12
# f  3  13
# g  4  14

ddf2 = dd.from_pandas(df2, 2)
ddf2
# dd.DataFrame<from_pandas-667963fc37e22688843f02da80df5963, divisions=('a', 'f', 'g')>

ddf.join(ddf2).compute()
#    X   Y   Z   A   B
# a  0  10  20   0  10
# b  1  11  21 NaN NaN
# c  2  12  22 NaN NaN
# d  3  13  23   1  11
# e  4  14  24   2  12
# f  5  15  25   3  13
# g  6  16  26   4  14
# h  7  17  27 NaN NaN
# i  8  18  28 NaN NaN
# j  9  19  29 NaN NaN

ddf.join(ddf2).visualize()

f:id:sinhrks:20150924214828p:plain

サポートされている処理の一覧は以下のAPIドキュメントを。一部利用できない引数が明記されていないが、次バージョンにて改訂

9/26 追記 処理結果については、行の順序以外は pandas の処理と一致するはず。例外は quantile のような percentile をとる処理。これらは Out-Of-Core 処理のための近似アルゴリズムを使っており、正確な値とずれることがある。

実データでの利用例

こちらが良エントリ (英語)。

まとめ

Dask を利用して DataFrame を並列処理する方法を記載した。手順は、

  • dd.from_pandas を利用して pd.DataFramedd.DataFrame へ変換。
  • 実行したいメソッド / 演算を dd.DataFrame に対して適用。
  • .compute() で計算を実行し、結果を取得する。計算処理は Dask にて自動的に並列化される。

最後、pandas 0.16.2 時点では並列処理による速度向上は大きくはない。これは Python の GIL (Global Interpreter Lock ) により並列実行できる処理が限定されているため。今月中にリリース予定の pandas 0.17.0 では いくつかの処理で Cython から明示的に GIL 解放するよう実装を変更しており、並列化による速度向上は大きくなる。

参考 Pandas Releasing the GIL