StatsFragments

Python, R, Rust, 統計, 機械学習とか

Python Dask.Array で 並列 / Out-Of-Core 処理

この記事は Python Advent Calendar 2015 13 日目の記事です。


Python で手軽に並列 / Out-Of-Core 処理を行うためのパッケージである Dask について書きたい。Dask を使うと以下のようなメリットが得られる。

  • 環境構築 / インストールが pip で簡単にできる
  • 手軽に並列処理ができる
  • Out-Of-Core (メモリに乗らないデータ) 処理ができる

補足 Dask は手持ちの PC の シングルコア / 物理メモリでは処理が少しきついかな、といった場合に利用するパッケージのため、より大規模 / 高速 / 安定した処理を行いたい場合には Hadoop や Spark を使ったほうがよい。

Dask は以下 3 つのサブパッケージを持つ。

サブモジュール ベースパッケージ
dask.array NumPy
dask.bag PyToolz (list, set, dict に対する処理)
dask.dataframe pandas

うち dask.dataframe については以前にエントリを書いた。

今回は NumPy API のサブセットをもつ dask.array について。dask.array でも基本的な考え方 / 挙動は dask.dataFrame と同じなので まず上のエントリを読んでください。

インストール

pip で。

# pip install dask

基本的な操作

import numpy as np
np.__version__
# '1.10.1'

import dask
dask.__version__
# '0.7.5'

import dask.array as da

dask.DataFrame はいくつかの行を chunk としてまとめて並列処理を行っていたが、dask.Array は各 axis それぞれを chunk として分割することができる。以下では 4 行 4 列 の ndarray を 2 行 2 列 計 4 つの chunk に分割する。

arr = np.array([[1, 2, 3, 4], [5, 6, 7, 8],
                [9, 10, 11, 12], [13, 14, 15, 16]])
arr
# array([[ 1,  2,  3,  4],
#        [ 5,  6,  7,  8],
#        [ 9, 10, 11, 12],
#        [13, 14, 15, 16]])

darr = da.from_array(arr, chunks=2)
darr
# dask.array<from-ar..., shape=(4, 4), dtype=int64, chunksize=(2, 2)>

dask.Dataframe と同じく、dask.Arrayメソッドを呼び出すことで 内部の Computational Graph を更新していく。評価 (計算の実行) を行うには .compute()。また、.visualize() で Computational Graph を描画することができる。

各行の合計を取る処理をみると、まずそれぞれの chunk ごとに 行の合計を計算 → その後 隣り合う chunk 同士の行の合計を取ることで最終的な出力を得ている。

darr.sum(axis=1)
# dask.array<p_reduc..., shape=(4,), dtype=int64, chunksize=(2,)>

darr.sum(axis=1).compute()
# array([10, 26, 42, 58])

darr.sum(axis=1).visualize()

f:id:sinhrks:20151213180352p:plain

転置のように chunk の位置の入れ替えを伴う処理もできる。Computational Graph 中の四角形 (xxx, 0, 1) は (0, 1) 番目の chunk であることを表す。

darr.T.compute()
# array([[ 1,  5,  9, 13],
#        [ 2,  6, 10, 14],
#        [ 3,  7, 11, 15],
#        [ 4,  8, 12, 16]])

darr.T.visualize()

f:id:sinhrks:20151213111615p:plain

dask.Array 同士の演算もできる。式中では darr.min(axis=1) が重複しているが、これらは Computational Graph 中でマージされ、評価時には 1 度だけ計算される。

((darr - darr.min(axis=1)) / (darr.max(axis=1) - darr.min(axis=1))).compute()
# array([[ 0, -1, -2, -3],
#        [ 1,  0, -1, -2],
#        [ 2,  1,  0, -1],
#        [ 4,  3,  2,  1]])

((darr - darr.min(axis=1)) / (darr.max(axis=1) - darr.min(axis=1))).visualize()

f:id:sinhrks:20151213113555p:plain

並列処理のメリット

各 chunk に対する処理は 自動的に並列化して実行されるため、ある程度データが大きい場合 / やりたい処理が並列で実行できる場合には速度メリットがある。以下では 長さ 1 億 の ndarray の合計を取る処理で比較した ( EC2 c4.2xlarge を利用)。

# NumPy
arr = np.arange(100000000)
%timeit arr.sum()
# 10 loops, best of 3: 76.5 ms per loop

# Dask
darr = da.from_array(arr, chunks=10000000)
%timeit darr.sum().compute()
# 10 loops, best of 3: 25.3 ms per loop

補足 NumPy は主要な処理で GIL を解放しているため、Dask での並列処理は threading によって行われる。処理方法として multiprocessing を指定することもできる。

Out-Of-Core 処理のメリット

Dask では Computational Graph 中の各ノードを順に計算していくため、処理する全データが同時にメモリに乗っている必要がない。そのため、PC の物理メモリを超える大きさのデータも扱うことができる。以下のドキュメントでは複数pkl ファイルを chunk として読み込む方法が記載されている。

関連パッケージ

以下のパッケージでは バックグラウンド処理を dask.array を利用して行うことができる。

まとめ

簡単に並列 / Out-Of-Core 処理を行うためのパッケージである Dask のサブモジュールのうち NumPy 互換の dask.array の使い方を記載した。

ハイパフォーマンスPython

ハイパフォーマンスPython