StatsFragments

Python, R, Rust, 統計, 機械学習とか

Chainer + Dask で 並列 Deep Learning したい <1>

この記事は Chainer Advent Calendar 2015 17 日目の記事です。


はじめに

サイズが大きいデータを Deep Learning すると学習に時間がかかってつらい。時間がかかってつらいので並列処理して高速化したい。

並列化するのに良さそうなパッケージないかな? と探してみると、Dask という並列 / Out-Of-Core 計算パッケージを見つけた。これと Chainer を組み合わせると並列処理が簡単に書けそうな気がする。

最初は MNIST を並列化してみたが、データが小さすぎるせいか むしろ遅くなってしまった。もう少し大きいデータである CIFAR-10 を使い、より深いネットワーク構造でその効果を確かめたい。

最終的には以下二つの処理を並列化することを目指す。

  1. Data Augmentation
  2. DNN の学習

1. Data Augmentation

層の深い DNN をうまく学習させるため、学習データに対して クロッピング等の画像処理を行い学習データを増やす。予測も Augmentation した画像群に対して行いその平均をとる。具体的な処理は id:ultraist さんのエントリに詳しい。

ultraist.hatenablog.com

これは Dask を使って CPU 側で並列処理したい。Dask についてはこちらを。

補足 もっとも、学習データ/テストデータはそれぞれ前もって 1 度だけ Augmentation しておけばよいため、この並列化の重要度は低い。Daskは むしろ 2. DNN の学習の並列化をシンプルに書くために使う (予定)。

2. DNN の学習

Chainer を使って GPU 側で並列処理したい。Chainer のドキュメントにも記載されているが、学習を並列化する方法は大きく 2 種類ある。

2-1 Model Parallel ある DNN の処理ごとに異なる GPU を割り当てて並列化する。
2-2 Data Parallel ある DNN を複数のデータ/GPUで同時に学習させ、学習したパラメータをなんらかの方法で統合する。

2-1. Model Parallel は モデルごとに実装する必要がある。2-2. Data Parallel はある程度 汎用的に書けそうなので、今回は Data Parallel をやりたい。

2-2. Data Parallel での DNN の学習

Data Parallel にもいくつか方法がある。詳細は "確率的最適化 (機械学習プロフェッショナルシリーズ)" や PFI 岡野原さんの資料に記載されている。

確率的最適化 (機械学習プロフェッショナルシリーズ)

確率的最適化 (機械学習プロフェッショナルシリーズ)

Data Parallel について自分の理解を簡単に整理すると、

手法 概要
Parameter Mixture (単純平均) パラメータを各ノードで並列に最適化し、最後にパラメータの平均をとる。期待誤差が強凸 / 損失関数が十分滑らかなら上手くいくらしい。
Distributed Gradient (同期型・ミニバッチ法) 勾配の計算を各ノードで並列に行い、パラメータの更新は 1台のノードで行う。勾配計算のたびにパラメータの同期が発生する。Chainer のドキュメントに記載されているのはこの方式。
Asynchronous Update (非同期分散) 勾配の計算、パラメータ更新を各ノードで並列に行う。パラメータ更新時にロックしないため、古いパラメータを元に勾配計算 / 更新がされることがある。実装の一種である Hogwild! は スパースな問題を対象とし、パラメータ更新時に勾配が 0 でない部分だけを更新する。
Iterative Parameter Mixture パラメータを各ノードで並列に最適化する。特定の期間ごと (epochごと等) にパラメータの平均をとり、更新されたパラメータを元に各ノードで最適化を繰り返す。

目指す姿

トラブルや金銭的な課題 (EC2) がなければ、以下 5 ステップに分けてやってみるつもり。

1 データの準備 & Distributed Gradient での DNN 学習並列化 (←今回)
2 Iterative Parameter Mixture での DNN 学習並列化
3 Dask による Data Augmentation の並列化
4 ステップ 2 + ステップ 3 の処理を統合
5 blaze/distributed ( 旧 dask.distributed )によるノード間分散処理

利用環境

データのダウンロード

CIFAR-10 は以下サイトから入手できる。スクリプトでダウンロードする。

import numpy as np
np.__version__
# '1.10.2'

import os
import requests

fname = 'cifar-10-python.tar.gz'
datadir = 'data'

os.mkdir(datadir)

reader = requests.get('http://www.cs.toronto.edu/~kriz/{0}'.format(fname), stream=True)
with open(os.path.join(datadir, fname), 'wb') as f:
    for chunk in reader.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)
            f.flush()

ダウンロードした tar ファイルを解凍する。以下のスクリプトを実行すると、 data/cifar-10-batches-py ディレクトリの下に 訓練用、テスト用のデータを含む pickle ファイルができる (拡張子はない)。

import tarfile
tf = tarfile.open(os.path.join(datadir, fname), 'r')
tf.extractall(datadir)

tarpath = tf.getnames()[0]
tarpath
# 'cifar-10-batches-py'

files = os.listdir(os.path.join(datadir, tarpath))
datafiles = [os.path.join(datadir, tarpath, f) for f in files if f.startswith('data')]
datafiles
# ['data/cifar-10-batches-py/data_batch_5',
#  'data/cifar-10-batches-py/data_batch_4',
#  'data/cifar-10-batches-py/data_batch_1',
#  'data/cifar-10-batches-py/data_batch_3',
#  'data/cifar-10-batches-py/data_batch_2']

データの準備

データは Dask で読み込む。もっとも、今回は学習時には Dask ではなく np.ndarray を直接使うので普通に読み込んでもいい ( 次回以降は Dask が活躍する予定...)。

まずは pickle ファイルから 画像データ / ラベルデータを取得する関数を定義する。

import dask
import dask.array as da
dask.__version__
# '0.7.5'

from six.moves import cPickle as pickle

def load(fn):
    # https://www.cs.toronto.edu/~kriz/cifar.html
    with open(os.path.join(datadir, tarpath, fn), 'rb') as f:
        result = pickle.load(f)
    return result

def load_data(fn):
    return load(fn)['data']

def load_labels(fn):
    # list から np.ndarray に変換
    return np.array(load(fn)['labels'])

これらを使って、訓練用データを読み込む Computational Graph を定義する。詳細はこちらのドキュメントを。

dsk_data = {('data', i, 0): (load_data, 'data_batch_{0}'.format(i + 1)) for i in range(5)}
dsk_data
# {('data', 0, 0): (<function __main__.load_data>, 'data_batch_1'),
#  ('data', 1, 0): (<function __main__.load_data>, 'data_batch_2'),
#  ('data', 2, 0): (<function __main__.load_data>, 'data_batch_3'),
#  ('data', 3, 0): (<function __main__.load_data>, 'data_batch_4'),
#  ('data', 4, 0): (<function __main__.load_data>, 'data_batch_5')}

data = da.Array(dsk_data, 'data', chunks=(10000, 3 * 32 * 32),
                dtype=np.float32, shape=(50000, 3 * 32 * 32))
data
# dask.array<data, shape=(50000, 3072), dtype=float32, chunksize=(10000, 3072)>

data.compute()
# array([[ 59,  43,  50, ..., 140,  84,  72],
#        [154, 126, 105, ..., 139, 142, 144],
#        [255, 253, 253, ...,  83,  83,  84],
#        ..., 
#        [ 35,  40,  42, ...,  77,  66,  50],
#        [189, 186, 185, ..., 169, 171, 171],
#        [229, 236, 234, ..., 173, 162, 161]], dtype=uint8)

ラベルについても同様。

dsk_labels = {('labels', i): (load_labels, 'data_batch_{0}'.format(i + 1)) for i in range(5)}
labels = da.Array(dsk_labels, 'labels', chunks=(10000, ), shape=(50000, ))
labels
# dask.array<labels, shape=(50000,), dtype=None, chunksize=(10000,)>

labels.compute()
# array([6, 9, 9, ..., 9, 1, 1])

テスト用データは np.ndarray として読み込んだ。

test_data = load_data('test_batch')
test_labels = np.array(load_labels('test_batch'))

データの確認

データの中身を確かめるため各画像を描画してみたい。各レコード中には (チャネル, 縦, 横) という次元に対応する画素が順に 1 次元に並んでいる。これを matplotlib で描画するため (縦, 横, チャネル) の 3 次元となるように reshapetranspose する。

nrows = 3
ncols = 10

images = data[0:nrows*ncols, :].compute()
images = images.reshape(nrows*ncols, 3, 32, 32).transpose(0, 2, 3, 1)
images[0].shape
# (32, 32, 3)

import matplotlib.pyplot as plt
fig, axes = plt.subplots(nrows, ncols, figsize=(10, 4))
for im, ax in zip(images, axes.flatten()):
    ax.imshow(im)
    ax.axis('off')

f:id:sinhrks:20151214181824p:plain

DNN の学習

Single GPU での学習

比較のため、まずは並列化せずに一つの GPU だけで学習 / テストがしたい。Chainer をインポートし、 CUDA, cuDNN が利用できることを確かめる。

import chainer 
chainer.__version__
# '1.5.1'

import chainer.cuda as cuda
cuda.available
# True

cuda.cudnn_enabled
# True

学習するモデルとしては、@mitmul さんが GitHub 上で公開されている CIFAR-10 用のモデルを使わせていただく。

github.com

今回は Data Augmentation をしておらずデータ数が少ないため、比較的単純なモデルである models/Cifar10.py を利用した。API は後の処理にあわせて少し変更している。

from chainer import optimizers
from chainer import Variable

import chainer.functions as F
import chainer.links as L

class Cifar10(chainer.Chain):

    def __init__(self):
        super(Cifar10, self).__init__(
            conv1=L.Convolution2D(3, 32, 5, stride=1, pad=2),
            conv2=L.Convolution2D(32, 32, 5, stride=1, pad=2),
            conv3=L.Convolution2D(32, 64, 5, stride=1, pad=2),
            fc4=F.Linear(1344, 4096),
            fc5=F.Linear(4096, 10),
        )

    def _internal_call(self, x, t, train=True):
        h = F.max_pooling_2d(F.relu(self.conv1(x)), 3, stride=2)
        h = F.max_pooling_2d(F.relu(self.conv2(h)), 3, stride=2)
        h = F.relu(self.conv3(h))
        h = F.spatial_pyramid_pooling_2d(h, 3, F.MaxPooling2D)
        h = F.dropout(F.relu(self.fc4(h)), ratio=0.5, train=train)
        h = self.fc5(h)
        return h

    def __call__(self, x, t):
        h = self._internal_call(x, t, train=True)
        self.loss = F.softmax_cross_entropy(h, t)
        self.accuracy = F.accuracy(h, t)
        return self.loss

    def predict(self, x, t):
        """ データに対する予測値 (ラベル) を返す """
        h = self._internal_call(x, t, train=False)
        self.pred = F.softmax(h)
        return self.pred.data.argmax(axis=1)

optimizer = optimizers.Adam(alpha=0.001)
model = Cifar10()

xp = cuda.cupy 
model.to_gpu()

optimizer.setup(model)
optimizer.target
# <__main__.Cifar10 at 0x7f7e9c586150>

ここで 訓練データ/テストデータを np.ndarray に変換する。

def data_transformer(x):
    # 各レコード 1 次元のデータを 3次元 (3, 32, 32) に変換する 
    return x.astype(np.float32).reshape(len(x), 3, 32, 32) 

def labels_transformer(y):
    return y.astype(np.int32)

tr_data = data_transformer(data.compute())
tr_labels = labels_transformer(labels.compute())
te_data = data_transformer(test_data)
te_labels = labels_transformer(test_labels)

学習 / テストを行う。ログの出力など実行に不要な箇所は省略した。

def run_epoch(optimizer, model, data, labels, train=True, batch_size=100):

    sum_accuracy = 0
    sum_loss = 0
    total = len(labels)
    
    if train:
        indexer = np.random.permutation(total)
        data = data[indexer]
        labels = labels[indexer]

    for index in range(0, total, batch_size):
        x = data[index:index+batch_size]
        t = labels[index:index+batch_size]

        volatile = 'off' if train else 'on'    
        x = Variable(xp.asarray(x), volatile=volatile)
        t = Variable(xp.asarray(t), volatile=volatile)

        if train:
            optimizer.update(model, x, t)
            sum_loss += float(model.loss.data) * len(t)
            sum_accuracy += float(model.accuracy.data) * len(t)
        else:
            predicted = model.predict(x, t)
            acc = float((predicted == t.data).sum())
            sum_loss = np.nan
            sum_accuracy += acc

    return sum_accuracy, sum_loss


for epoch in range(0, 20):
    
    print('******************** Epoch {:02d} (Train) ********************'.format(epoch + 1))
    acc, loss = run_epoch(optimizer, model, tr_data, tr_labels, train=True)
  
    print('******************** Epoch {:02d} (Test) *********************'.format(epoch + 1))
    acc, loss = run_epoch(optimizer, model, te_data, te_labels, train=False)

出力されたログを上に添付した。最左列は学習開始からの時間 (秒) である。20 Epoch 経過時点で 355 秒、テストデータに対して 60 % 程度の精度だった。

# ******************** Epoch 01 (Train) ********************
#    4.091:    10000/50000 loss=10.164 acc=0.123
#    7.329:    20000/50000 loss=6.162  acc=0.150
#   10.570:    30000/50000 loss=4.788  acc=0.177
#   13.812:    40000/50000 loss=4.064  acc=0.205
#   17.056:    50000/50000 loss=3.612  acc=0.231
# ******************** Epoch 01 (Test) *********************
#   18.140:    10000/10000 loss=nan    acc=0.380
#
#      ...              ...             ...           ...
#
# ******************** Epoch 20 (Train) ********************
#  340.570:    10000/50000 loss=0.993  acc=0.650
#  343.832:    20000/50000 loss=0.993  acc=0.650
#  347.095:    30000/50000 loss=1.001  acc=0.650
#  350.350:    40000/50000 loss=1.005  acc=0.650
#  353.611:    50000/50000 loss=1.014  acc=0.649
# ******************** Epoch 20 (Test) *********************
#  354.691:    10000/10000 loss=nan    acc=0.621

Multi GPU での学習 (Distributed Gradient)

上に記載のとおり、今回は Distributed Gradient (同期型・ミニバッチ法) を利用した並列学習を行いたい。勾配計算は各 GPU で行い、パラメータの更新は 1 つの GPU で行う。

実装は Chainer のドキュメントに記載されている内容をなぞればよい。利用するメソッドの概要を整理する。

メソッド 概要
Link.zerograds() Link に含まれる各 Variable が持つ勾配 ._grad を 0 で埋める。
Link.__call__() 最適化する Variable を返す。Link を継承したクラスで定義する。
Variable.backward() Variable から誤差逆伝播を行う
Link.addgrads(link) 引数の Link に含まれる Variable の勾配を自身の対応する Variable に加算する。
Optimizer.update() (引数なしで呼ばれた場合) 計算済みの勾配をもとに、モデルの Variable を更新する。実装は GradientMethod.update() 中にある。
Link.copyparams() 引数の Link に含まれる Variable の値を自身の対応する Variable にコピーする。

とはいえ、ドキュメントの書き方だと GPU 数が増えると少し手間だ。簡単な wrapper を作り、 Distributed Gradient を以下のように定型的に書けるようにしたい。

    with model as m:       # 各 GPU 上のモデルで .zerograds() を実行
        m(x, t)            # 各 GPU 上のモデルで .__call__() を実行
                           # 各 GPU 上のモデルで .backward() を実行し、
                           # 計算された勾配を .addgrads() でマスターに加算
    optimizer.update()
    model.syncparams()     # マスターのパラメータを .copyparams() で各 GPU 上のモデルにコピー

そのための wrapper として GPUDistributedGradient クラスを定義する。

class GPUDistributedGradient(chainer.Chain):

    def __init__(self, chain, ngpus):

        if isinstance(ngpus, int):
            ngpus = list(range(ngpus))

        self.ngpus = ngpus
        self._chains = [chain.copy().to_gpu(i) for i in ngpus]
    
    @property
    def master(self):
        """ 最初の GPU にあるモデルをマスターとする """
        return self._chains[0]
    
    def __call__(self, *args):
        """ 引数毎に 'Variable のリスト' を渡す"""
        self._losses = [chain(*arg) for chain, arg in zip(self._chains, zip(*args))]
        return self._losses[0]
    
    def predict(self, x, t):
        """ 予測値はマスターで計算し、(まだ) 並列化しない """
        return self.master.predict(x, t)
    
    def zerograds(self):
        for chain in self._chains:
            chain.zerograds()

    def backward(self):
        for loss in self._losses:
            loss.backward()
            
    def syncgrads(self):
        """ 各 GPU からの勾配をマスターに加算する """
        for chain in self._chains[1:]:
            self.master.addgrads(chain)
    
    def __enter__(self):
        self.zerograds()
        return self
    
    def __exit__(self, exception_type, exception_value, traceback):
        self.backward()
        self.syncgrads()
        return True

    def syncparams(self):
        """ マスターのパラメータを他 GPU のモデルにコピーする """
        for chain in self._chains[1:]:
            chain.copyparams(self.master)

上で作成したクラスを利用して、並列化したいモデルを wrap する。optimizer へは マスターとなるモデル model.master を渡す。

NGPU = 4

optimizer = optimizers.Adam(alpha=0.001)
model = Cifar10()
model = GPUDistributedGradient(model, NGPU)

optimizer.setup(model.master)
optimizer.target
# <__main__.Cifar10 at 0x7f7e9c6ede10>

run_epoch を書き換えて実行してみる。run_epoch の呼び出しは先ほどと同じため省略する。

def run_epoch(optimizer, model, data, labels, train=True, batch_size=100):

    sum_accuracy = 0
    sum_loss = 0
    total = len(labels)

    if train:
        indexer = np.random.permutation(total)
        data = data[indexer]
        labels = labels[indexer]
        batch_size = batch_size * NGPU

    for index in range(0, total, batch_size):
        x = data[index:index+batch_size]
        t = labels[index:index+batch_size]

        if train:
            # train 時には 利用する GPU と同じ長さの Variable のリストを渡す
            x = [Variable(cuda.to_gpu(v, i)) for i, v in enumerate(np.array_split(x, NGPU))]
            t = [Variable(cuda.to_gpu(v, i)) for i, v in enumerate(np.array_split(t, NGPU))]
        else: 
            x = Variable(xp.asarray(x), volatile='on')
            t = Variable(xp.asarray(t), volatile='on')

        if train:
            
            with model as m:
                m(x, t)
            optimizer.update()
            model.syncparams()   

            n = sum([len(_t) for _t in t])
            # マスターで計算された loss / acc をレコード数倍して近似
            sum_loss += float(model.master.loss.data) * n
            sum_accuracy += float(model.master.accuracy.data) * n
        else:
            predicted = model.predict(x, t)
            acc = float((predicted == t.data).sum())
            sum_loss = np.nan
            sum_accuracy += acc

    return sum_accuracy, sum_loss

Single GPU の場合と同じくログを添付する。20 Epoch までの処理時間は、並列化なしでの 355 秒に対し 並列化した場合は 277 秒と 20% 程度 速くなっている。また、テストデータに対する精度も並列化なしの場合と同程度で問題はなさそうだ。

# ******************** Epoch 01 (Train) ********************
#    2.870:    10000/50000 loss=1.438  acc=0.479
#    5.439:    20000/50000 loss=1.431  acc=0.480
#    8.008:    30000/50000 loss=1.417  acc=0.488
#   10.595:    40000/50000 loss=1.416  acc=0.491
#   13.155:    50000/50000 loss=1.405  acc=0.492
# ******************** Epoch 01 (Test) *********************
#   14.243:    10000/10000 loss=nan        acc=0.517
#
#      ...              ...             ...           ...
#
# ******************** Epoch 20 (Train) ********************
#  265.769:    10000/50000 loss=0.498  acc=0.824
#  268.249:    20000/50000 loss=0.509  acc=0.821
#  270.738:    30000/50000 loss=0.518  acc=0.818
#  273.230:    40000/50000 loss=0.524  acc=0.814
#  275.717:    50000/50000 loss=0.528  acc=0.813
# ******************** Epoch 20 (Test) *********************
#  276.785:    10000/10000 loss=nan        acc=0.634

処理中に watch nvidia-smi すると 全 GPU が利用されていることが確かめられる。眺めていると GPU 1 〜 3 の利用率はしばしば 0% になっている (パラメータ更新のため)。

$ watch nvidia-smi

f:id:sinhrks:20151215204312p:plain

GPU の利用率が少ないのは モデルが単純だった / バッチサイズが小さかったせいだと思う。このあたりを適切に設定すればパフォーマンス向上の余地がありそうだ。

結果のプロット

縦軸をテストデータにおける精度、横軸を Epoch の経過 ( 100 Ticks で 1 Epoch ) としてグラフを描いた。

f:id:sinhrks:20151216205908p:plain

同じく、横軸に経過時間 (秒) として学習の過程をプロットした。訓練データの情報も追加した。形が異なるのは Multi-GPU で訓練データへの精度を正確に計算していないためかと思う。こうしてみると、テスト部分は並列化しなくてもあまり影響なさそうだ。

f:id:sinhrks:20151216205915p:plain

まとめ

Chainer のドキュメントの手順に従い、DNN の学習を 4 GPU / Distributed Gradient で並列化した。次は Dask の処理も使って Iterative Parameter Mixture をやりたい。

1 データの準備 & Distributed Gradient での DNN 学習並列化 (←済み)
2 Iterative Parameter Mixture での DNN 学習並列化 (←次回)
3 Dask による Data Augmentation の並列化
4 ステップ 2 + ステップ 3 の処理を統合
5 blaze/distributed ( 旧 dask.distributed )によるノード間分散処理

利用したスクリプトは以下のリポジトリに置いた。Jupyter Notebook なので GitHub 上でレンダリングして確認することができる。

何かおかしいことやっていたらご指摘ください。

github.com

深層学習 (機械学習プロフェッショナルシリーズ)

深層学習 (機械学習プロフェッショナルシリーズ)

確率的最適化 (機械学習プロフェッショナルシリーズ)

確率的最適化 (機械学習プロフェッショナルシリーズ)