Python Dask.Array で 並列 / Out-Of-Core 処理
この記事は Python Advent Calendar 2015 13 日目の記事です。
Python で手軽に並列 / Out-Of-Core 処理を行うためのパッケージである Dask
について書きたい。Dask
を使うと以下のようなメリットが得られる。
- 環境構築 / インストールが
pip
で簡単にできる - 手軽に並列処理ができる
- Out-Of-Core (メモリに乗らないデータ) 処理ができる
補足 Dask
は手持ちの PC の シングルコア / 物理メモリでは処理が少しきついかな、といった場合に利用するパッケージのため、より大規模 / 高速 / 安定した処理を行いたい場合には Hadoop や Spark を使ったほうがよい。
Dask
は以下 3 つのサブパッケージを持つ。
サブモジュール | ベースパッケージ |
---|---|
dask.array |
NumPy |
dask.bag |
PyToolz (list , set , dict に対する処理) |
dask.dataframe |
pandas |
うち dask.dataframe
については以前にエントリを書いた。
今回は NumPy
API のサブセットをもつ dask.array
について。dask.array
でも基本的な考え方 / 挙動は dask.dataFrame
と同じなので まず上のエントリを読んでください。
インストール
pip
で。
# pip install dask
基本的な操作
import numpy as np np.__version__ # '1.10.1' import dask dask.__version__ # '0.7.5' import dask.array as da
dask.DataFrame
はいくつかの行を chunk としてまとめて並列処理を行っていたが、dask.Array
は各 axis それぞれを chunk として分割することができる。以下では 4 行 4 列 の ndarray
を 2 行 2 列 計 4 つの chunk に分割する。
arr = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]) arr # array([[ 1, 2, 3, 4], # [ 5, 6, 7, 8], # [ 9, 10, 11, 12], # [13, 14, 15, 16]]) darr = da.from_array(arr, chunks=2) darr # dask.array<from-ar..., shape=(4, 4), dtype=int64, chunksize=(2, 2)>
dask.Dataframe
と同じく、dask.Array
のメソッドを呼び出すことで 内部の Computational Graph を更新していく。評価 (計算の実行) を行うには .compute()
。また、.visualize()
で Computational Graph を描画することができる。
各行の合計を取る処理をみると、まずそれぞれの chunk ごとに 行の合計を計算 → その後 隣り合う chunk 同士の行の合計を取ることで最終的な出力を得ている。
darr.sum(axis=1) # dask.array<p_reduc..., shape=(4,), dtype=int64, chunksize=(2,)> darr.sum(axis=1).compute() # array([10, 26, 42, 58]) darr.sum(axis=1).visualize()
転置のように chunk の位置の入れ替えを伴う処理もできる。Computational Graph 中の四角形 (xxx, 0, 1)
は (0, 1) 番目の chunk であることを表す。
darr.T.compute() # array([[ 1, 5, 9, 13], # [ 2, 6, 10, 14], # [ 3, 7, 11, 15], # [ 4, 8, 12, 16]]) darr.T.visualize()
dask.Array
同士の演算もできる。式中では darr.min(axis=1)
が重複しているが、これらは Computational Graph 中でマージされ、評価時には 1 度だけ計算される。
((darr - darr.min(axis=1)) / (darr.max(axis=1) - darr.min(axis=1))).compute() # array([[ 0, -1, -2, -3], # [ 1, 0, -1, -2], # [ 2, 1, 0, -1], # [ 4, 3, 2, 1]]) ((darr - darr.min(axis=1)) / (darr.max(axis=1) - darr.min(axis=1))).visualize()
並列処理のメリット
各 chunk に対する処理は 自動的に並列化して実行されるため、ある程度データが大きい場合 / やりたい処理が並列で実行できる場合には速度メリットがある。以下では 長さ 1 億 の ndarray
の合計を取る処理で比較した ( EC2 c4.2xlarge を利用)。
# NumPy arr = np.arange(100000000) %timeit arr.sum() # 10 loops, best of 3: 76.5 ms per loop # Dask darr = da.from_array(arr, chunks=10000000) %timeit darr.sum().compute() # 10 loops, best of 3: 25.3 ms per loop
補足 NumPy
は主要な処理で GIL を解放しているため、Dask
での並列処理は threading
によって行われる。処理方法として multiprocessing
を指定することもできる。
Out-Of-Core 処理のメリット
Dask
では Computational Graph 中の各ノードを順に計算していくため、処理する全データが同時にメモリに乗っている必要がない。そのため、PC の物理メモリを超える大きさのデータも扱うことができる。以下のドキュメントでは複数の pkl
ファイルを chunk として読み込む方法が記載されている。
関連パッケージ
以下のパッケージでは バックグラウンド処理を dask.array
を利用して行うことができる。
Xray
: 多次元ラベルデータをpandas
のように処理できるパッケージ
まとめ
簡単に並列 / Out-Of-Core 処理を行うためのパッケージである Dask
のサブモジュールのうち NumPy
互換の dask.array
の使い方を記載した。
- 作者: Micha Gorelick,Ian Ozsvald,相川愛三
- 出版社/メーカー: オライリージャパン
- 発売日: 2015/11/20
- メディア: 大型本
- この商品を含むブログ (3件) を見る