Python Dask で 並列 DataFrame 処理
はじめに
先日のエントリで少し記載した Dask について、その使い方を書く。Dask を使うと、NumPy や pandas の API を利用して並列計算/分散処理を行うことができる。また、Dask は Out-Of-Core (データ量が多くメモリに乗らない場合) の処理も考慮した実装になっている。
上にも書いたが、Daskは NumPy や pandas を置き換えるものではない。数値計算のためのバックエンドとして NumPy や pandas を利用するため、むしろこれらのパッケージが必須である。
Dask は NumPy や pandas の API を完全にはサポートしていないため、並列 / Out-Of-Core 処理が必要な場面では Dask を、他では NumPy / pandas を使うのがよいと思う。pandasとDask のデータはそれぞれ簡単に相互変換できる。
補足 とはいえ都度の変換は手間なので、pandas の処理実行時に Dask を利用するオプションをつける という検討はされている。
インストール
pip もしくは conda で。
pip install dask
準備
まずは必要なパッケージを import する。
import numpy as np import pandas as pd import dask.dataframe as dd np.__version__ # '1.9.3' pd.__version__ # '0.16.2' # バージョン表示のためにインポート。 import dask dask.__version__ # '0.7.1'
pandas から Dask への変換
サンプルデータは すでにメモリ上にある pd.DataFrame とする。
df = pd.DataFrame({'X': np.arange(10),
'Y': np.arange(10, 20),
'Z': np.arange(20, 30)},
index=list('abcdefghij'))
df
# X Y Z
# a 0 10 20
# b 1 11 21
# c 2 12 22
# d 3 13 23
# e 4 14 24
# f 5 15 25
# g 6 16 26
# h 7 17 27
# i 8 18 28
# j 9 19 29
pandas のデータ構造から Dask に変換するには dd.from_pandas。2つめの引数で データをいくつのパーティションに分割するかを指定している。結果は dask.dataframe.DataFrame ( dd.DataFrame ) となる。
divisions はデータがどこで分割されたかを示す。表示から、1つ目のパーティションには .index が "a" 〜 "e" までのデータが、2つ目のには "f" 〜 "j" までのデータが含まれていることがわかる。
重要 dd.DataFrame の処理全般について、行の順序は保証されない。各パーティションには divisions で示される .index を持つ行が含まれるが、パーティション内が常にソートされているとは限らない。
ddf = dd.from_pandas(df, 2) ddf # dd.DataFrame<from_pandas-b08addf72f0693a9fa1bb6c21d91f3d4, divisions=('a', 'f', 'j')> # DataFrame の列名 ddf.columns # ('X', 'Y', 'Z') # DataFrame の index ddf.index # dd.Index<from_pandas-b08addf72f0693a9fa1bb6c21d91f3d4-index, divisions=('a', 'f', 'j')> # DataFrame の divisions (パーティションの分割箇所) ddf.divisions # ('a', 'f', 'j') # DataFrame のパーティション数 ddf.npartitions # 2
dd.DataFrame からデータを取得する (計算処理を実行する) には .compute()。結果、元の pd.DataFrame が得られる。
ddf.compute() # X Y Z # a 0 10 20 # b 1 11 21 # c 2 12 22 # d 3 13 23 # e 4 14 24 # f 5 15 25 # g 6 16 26 # h 7 17 27 # i 8 18 28 # j 9 19 29
内部処理
ここから、dd.DataFrame でどういった処理ができるのか、内部動作とあわせて記載する。といっても難しいことは全くやっていない。
まず、データ全体について 1 加算する処理を考える。これは 各パーティションごとに 1 加算して連結するのと同じ。
ddf + 1 # dd.DataFrame<elemwise-5b9ae0407254158903343113fac41421, divisions=('a', 'f', 'j')> (ddf + 1).compute() # 略

次に、列ごとの合計をとる処理。これは、各パーティションごとに列の合計をとって連結し、もう一度 合計をとる処理と同じ。
列の合計をとるため、結果は Series になる。ddf.sum() の時点では .compute() が呼ばれていないため実際の計算は行われていないが、Dask が結果の型 や divisions を推測し正しく表示している。
ddf.sum() # dd.Series<dataframe-sum--second-7ba12c9d58c17f61406b84b6c30d7a26, divisions=(None, None)> ddf.sum().compute() # X 45 # Y 145 # Z 245 # dtype: int64

Dask ではこのような形で、計算処理をパーティションごとに並列 / Out-Of-Core 実行できる形に読み替えている。これらの処理は内部的には Computational Graph ( Dask Graph ) として表現され、.compute() によって実行される。
各処理の Dask Graph は、.visualize() メソッドを利用して確認できる。Graph 上で縦につながっていない処理同士は並列で実行できる。
ddf.sum().visualize()

各列の平均をとる場合、内部的には各列の .sum()と 各列の .count() をそれぞれ計算して除算。
ddf.mean().compute() # X 4.5 # Y 14.5 # Z 24.5 # dtype: float64 ddf.mean().visualize()

DataFrame 同士の演算や、演算をチェインすることもできる。互いのパーティションが異なる場合はそれらが一致するよう調整が行われる。
((ddf - (ddf * 2)) == - ddf).visualize()

また、累積関数 ( cumxxx ) や ウィンドウ関数 ( rolling_xxx ) なども利用できる。
ddf.cumsum().compute() # X Y Z # a 0 10 20 # b 1 21 41 # c 3 33 63 # d 6 46 86 # e 10 60 110 # f 15 75 135 # g 21 91 161 # h 28 108 188 # i 36 126 216 # j 45 145 245 ddf.cumsum().visualize()

concat, join などの 連結 / 結合もできる。通常の演算と同じく、dd.DataFrame 同士のパーティションは適当に調整される。
df2 = pd.DataFrame({'A': np.arange(5),
'B': np.arange(10, 15)},
index=list('adefg'))
df2
# A B
# a 0 10
# d 1 11
# e 2 12
# f 3 13
# g 4 14
ddf2 = dd.from_pandas(df2, 2)
ddf2
# dd.DataFrame<from_pandas-667963fc37e22688843f02da80df5963, divisions=('a', 'f', 'g')>
ddf.join(ddf2).compute()
# X Y Z A B
# a 0 10 20 0 10
# b 1 11 21 NaN NaN
# c 2 12 22 NaN NaN
# d 3 13 23 1 11
# e 4 14 24 2 12
# f 5 15 25 3 13
# g 6 16 26 4 14
# h 7 17 27 NaN NaN
# i 8 18 28 NaN NaN
# j 9 19 29 NaN NaN
ddf.join(ddf2).visualize()

サポートされている処理の一覧は以下のAPIドキュメントを。一部利用できない引数が明記されていないが、次バージョンにて改訂。
9/26 追記 処理結果については、行の順序以外は pandas の処理と一致するはず。例外は quantile のような percentile をとる処理。これらは Out-Of-Core 処理のための近似アルゴリズムを使っており、正確な値とずれることがある。
実データでの利用例
こちらが良エントリ (英語)。
- Analyzing Reddit Comments with Dask and Castra
- Out-of-Core Dataframes in Python: Dask and OpenStreetMap
まとめ
Dask を利用して DataFrame を並列処理する方法を記載した。手順は、
dd.from_pandasを利用してpd.DataFrameをdd.DataFrameへ変換。- 実行したいメソッド / 演算を
dd.DataFrameに対して適用。 .compute()で計算を実行し、結果を取得する。計算処理はDaskにて自動的に並列化される。
最後、pandas 0.16.2 時点では並列処理による速度向上は大きくはない。これは Python の GIL (Global Interpreter Lock ) により並列実行できる処理が限定されているため。今月中にリリース予定の pandas 0.17.0 では いくつかの処理で Cython から明示的に GIL 解放するよう実装を変更しており、並列化による速度向上は大きくなる。