Python simpy による離散イベントシミュレーション
この記事は Python Advent Calendar 2014 の14日目の記事です。
この記事では、離散イベントシミュレーション用の Python パッケージ simpy
について書きたい。simpy
の現在のバージョンは 3.0.5、イニシャルリリースは 2002 年とかなり歴史のあるパッケージだ。
この simpy
、けっこう面白いなーと思っていて、自分は 毎朝 起きるたび、今日は simpy
について書かれたブログあるかな?と探しては 裏切られる日々を数年つづけてきた。もう誰かにこんな思いはさせたくない、、、こうなったら自分で書くしかない。
離散イベントシミュレーション (Discrete event simulation) とは
離散イベントシミュレーションの説明はこちらがよくまとまっている。
簡単にいうと 発生が離散的なイベントをモデル化 / シミュレーションするもの。例えば、
- 小売店のレジ: 顧客は1列に並び、3台あるレジのうち先に空いたもので会計を行う。レジの個数をひとつ増やすと、顧客の平均待ち時間はどのように変化するか?
- 工場の生産工程: ある製品をいくつかの連続する工程で作業している。各工程の処理時間がそれぞれ別の確率分布に従ってばらついているとき、時間あたりの生産量を安定させるためにカイゼンすべきなのはどの工程か?
- Webサーバ応答: クライアントからの処理は 多重化されたアプリケーションサーバ / DBにて逐次処理される。応答時間を 一定以下にしたいとき、それぞれ何台ずつ設置すればよいか?
いずれも、数学的には 待ち行列としてモデル化される。
インストール
まずはパッケージのインストール。pip
でインストールできるのでカンタン。
pip install simpy
最初のシミュレーション: 等速直線運動
SimPy in 10 Minutes — SimPy 3.0.5 documentation
をベースに自動車の例でいく。まずモデル化するのは 1台の自動車での等速直線運動。条件は、
- 1 台の自動車がある
- 自動車は 時速 72 km = 秒速 20 m で移動している
- t 秒経過したとき、自動車はどの位置にあるか?
というものにする。これを simpy
で シミュレーションするには、以下のような手順で処理を書いていけばよい。
- シミュレーション環境
simpy.Environment
を作る - 単位時間あたりの処理を行う関数 (プロセスと呼ぶ) を generator として書く。返り値は、次にこのプロセスを呼ぶまでのタイムアウトを表す
simpy.Environment.timeout
インスタンス。 - 2で作成した generator を、
simpy.Environment.process
を使ってシミュレーション環境に配置する simpy.Environment.run
を使ってシミュレーション環境を実行する。単位時間が経過するたび、2で作成したプロセスが呼ばれる
# おまじない from __future__ import unicode_literals, division import simpy import numpy as np # 1. シミュレーション環境を作成 env = simpy.Environment() def car(env): # 時速 72 km = 秒速 20 m velocity = 72 / 3600 * 1000 # 現在位置 location = 0.0 # プロセス実行時に行われる処理 while True: print('現在時刻 {0:2d} 位置: {1} m'.format(env.now, location)) location += velocity # 次にプロセスを実行するまでのタイムアウト (ここでは毎回実行するため 1 を返す) yield env.timeout(1) # 2. シミュレーション対象のプロセス (generator) を作成 car(env) # <generator object car at 0x10c333640> # 3. シミュレーション環境に プロセスを追加 env.process(car(env)) # 4. シミュレーション環境の実行 (15単位時間分) env.run(until=15) # 現在時刻 0 位置: 0.0 m # 現在時刻 1 位置: 20.0 m # 現在時刻 2 位置: 40.0 m # 現在時刻 3 位置: 60.0 m # 現在時刻 4 位置: 80.0 m # 現在時刻 5 位置: 100.0 m # 現在時刻 6 位置: 120.0 m # 現在時刻 7 位置: 140.0 m # 現在時刻 8 位置: 160.0 m # 現在時刻 9 位置: 180.0 m # 現在時刻 10 位置: 200.0 m # 現在時刻 11 位置: 220.0 m # 現在時刻 12 位置: 240.0 m # 現在時刻 13 位置: 260.0 m # 現在時刻 14 位置: 280.0 m
シミュレーション環境における経過単位時間は simpy.Environment.now
で取得できる。この例では この単位時間 を 1 秒としてみている。単位時間が経過するたびにプロセスとして定義した generator が呼ばれ、自動車の現在位置がアップデートされているのがわかる。
2つ目のシミュレーション: ランダムな速度変化
最初の例では「これsimpy
使う必要ないよね?」という感想を抱いていただけたと思う。
2 番目の例では自動車の速度が正規分布に従う / 速度が適当な単位時間で変化するものにしてみる。条件は、
ここからは Car
をクラスとして定義する。自身のシミュレーションを実行するプロセス (generator) を返すメソッドは Car.run
。
class Car(object): # 時速が更新される単位時間 step = 5 def __init__(self, env, mean, std): # シミュレーション環境への参照 self.env = env # 時速の平均値 self.mean = mean # 時速の標準偏差 self.std = std # 現在の時速 self.velocity = 0.0 # 現在位置 self.location = 0.0 # 直前の位置 self.prev_location = 0.0 def update_velocity(self): # 現在時速を更新 # 現在時速は 平均 mean, 標準偏差 std の正規分布に従う v = np.random.normal(self.mean, self.std) if v < 0: v = 0 return v def update_location(self): # 現在位置を更新 self.prev_location = self.location self.location += self.velocity / 3600 * 1000 return self.location def run(self): # 自身のプロセス (generator) を返すメソッド while True: if env.now % self.step == 0: # step が経過するたび、現在時速を更新 (乱数をふりなおす) self.velocity = self.update_velocity() form = '現在時刻 {0:2d} 位置: {1:.1f} m 時速 {2:.1f} km' print(form.format(self.env.now, self.location, self.velocity)) self.update_location() yield self.env.timeout(1) np.random.seed(1) # 1. シミュレーション環境を作成 env = simpy.Environment() # Car インスタンスを平均時速 72 km、標準偏差 10 km で作成 c = Car(env, mean=72, std=10) # 2. シミュレーション対象のプロセス (generator) を作成 # 3. シミュレーション環境に プロセスを追加 env.process(c.run()) # 4. シミュレーション環境の実行 (100単位時間分) env.run(until=100) # 現在時刻 0 位置: 0.0 m 時速 88.2 km # 現在時刻 1 位置: 24.5 m 時速 88.2 km # 現在時刻 2 位置: 49.0 m 時速 88.2 km # 現在時刻 3 位置: 73.5 m 時速 88.2 km # 現在時刻 4 位置: 98.0 m 時速 88.2 km # 現在時刻 5 位置: 122.6 m 時速 65.9 km # 現在時刻 6 位置: 140.9 m 時速 65.9 km # ..... # 現在時刻 93 位置: 1814.6 m 時速 72.4 km # 現在時刻 94 位置: 1834.7 m 時速 72.4 km # 現在時刻 95 位置: 1854.9 m 時速 77.8 km # 現在時刻 96 位置: 1876.5 m 時速 77.8 km # 現在時刻 97 位置: 1898.1 m 時速 77.8 km # 現在時刻 98 位置: 1919.7 m 時速 77.8 km # 現在時刻 99 位置: 1941.3 m 時速 77.8 km
出力をみると 5 単位時間ごとに時速が変化し、時速にあわせて現在位置が更新されていくことがわかる。
3つ目のシミュレーション: 複数台でのシミュレーション
ここからは自動車が複数台ある場合のシミュレーションを行う。
- 5 台の自動車がある。最初の車間距離はそれぞれ 20 m である
- 自動車の時速は、それぞれ 平均 72 km、標準偏差 10 km の正規分布に従う。この速度は 5 単位時間ごとに変化する
- 車線は 1 車線であり、前の車を追い越すことはできない
- t 秒経過したとき、自動車はそれぞれどの位置にあるか?
まず、上で定義した Car
クラスを継承し、"前の車を追い抜けないロジック" を加えた Car2
クラスを定義する。Car2
は、進みたい位置の手前に 別の車が存在する場合 車間距離 1 m まで近づくが追い越さない。ちょっと車間距離 詰めすぎだが、まあ例なので。
class Car2(Car): def __init__(self, env, number, mean, std, fdist=20, ahead=None): super(Car2, self).__init__(env, mean, std) # 車の番号 self.number = number # 自分の前にいる Car への参照 self.ahead = ahead # fdist は初期化時の車間距離 self.location = - number * fdist def run(self): while True: if env.now % self.step == 0: self.velocity = self.update_velocity() form = '現在時刻 {0:2d} 番号 {1} 位置: {2:.1f} m 時速 {3:.1f} km' message = form.format(self.env.now, self.number, self.location, self.velocity) self.update_location() # 前方に車が存在するときは、前の車の位置 -1 m まで詰める if self.ahead is not None: if self.location >= self.ahead.location - 1: self.location = self.ahead.location - 1 # 前の車によってブロックされたときには アスタリスクを表示 message += ' *' print(message) yield self.env.timeout(1) @property def actual_velocity(self): # 現在地、直前の位置をもとに、実際に進めた距離から時速を計算 v = (self.location - self.prev_location) * 3600 / 1000 return v
また、シミュレーション環境の条件をいろいろ変えるため、シミュレーション環境の初期化用の関数 init_env
を作る。
def init_env(env, num_cars=5, mean=72, std=10, fdist=20): cars = [] prev = None # num_cars で指定された数だけ Car2 インスタンスを作成 for i in range(num_cars): c = Car2(env, number=i, mean=mean, std=std, fdist=fdist, ahead=prev) env.process(c.run()) prev = c cars.append(c) return env, cars
いざ実行。
各自動車は、自分の前にいる車のせいで進みたい位置まで進めなかった場合にメッセージの末尾に "*" を表示する。最初は車間距離があるため それぞれの時速に応じた位置まで進めているが、シミュレーションが進むに従って 車間が狭まってくる = 渋滞が発生していることがわかる。
np.random.seed(1) env = simpy.Environment() env, cars = init_env(env, num_cars=5, std=50) env.run(until=200) # 現在時刻 0 番号 0 位置: 0.0 m 時速 88.2 km # 現在時刻 0 番号 1 位置: -20.0 m 時速 65.9 km # 現在時刻 0 番号 2 位置: -40.0 m 時速 66.7 km # 現在時刻 0 番号 3 位置: -60.0 m 時速 61.3 km # 現在時刻 0 番号 4 位置: -80.0 m 時速 80.7 km # 現在時刻 1 番号 0 位置: 24.5 m 時速 88.2 km # 現在時刻 1 番号 1 位置: -1.7 m 時速 65.9 km # 現在時刻 1 番号 2 位置: -21.5 m 時速 66.7 km # 現在時刻 1 番号 3 位置: -43.0 m 時速 61.3 km # 現在時刻 1 番号 4 位置: -57.6 m 時速 80.7 km # 現在時刻 2 番号 0 位置: 49.0 m 時速 88.2 km # 現在時刻 2 番号 1 位置: 16.6 m 時速 65.9 km # 現在時刻 2 番号 2 位置: -2.9 m 時速 66.7 km # 現在時刻 2 番号 3 位置: -26.0 m 時速 61.3 km # 現在時刻 2 番号 4 位置: -35.2 m 時速 80.7 km # 現在時刻 3 番号 0 位置: 73.5 m 時速 88.2 km # 現在時刻 3 番号 1 位置: 34.9 m 時速 65.9 km # ..... # 現在時刻 197 番号 2 位置: 3773.7 m 時速 76.2 km * # 現在時刻 197 番号 3 位置: 3772.7 m 時速 80.1 km * # 現在時刻 197 番号 4 位置: 3750.1 m 時速 82.4 km # 現在時刻 198 番号 0 位置: 3791.2 m 時速 55.7 km # 現在時刻 198 番号 1 位置: 3790.2 m 時速 78.0 km * # 現在時刻 198 番号 2 位置: 3789.2 m 時速 76.2 km * # 現在時刻 198 番号 3 位置: 3788.2 m 時速 80.1 km * # 現在時刻 198 番号 4 位置: 3773.0 m 時速 82.4 km # 現在時刻 199 番号 0 位置: 3806.7 m 時速 55.7 km # 現在時刻 199 番号 1 位置: 3805.7 m 時速 78.0 km * # 現在時刻 199 番号 2 位置: 3804.7 m 時速 76.2 km * # 現在時刻 199 番号 3 位置: 3803.7 m 時速 80.1 km * # 現在時刻 199 番号 4 位置: 3796.0 m 時速 82.4 km *
4つ目のシミュレーション: 車間距離の効果
上の例での動きは、こちらの本にある ASEP (Asymmetric Simple Exclusion Process / 非対称単純排除過程) を模したもの。これは自分の前が空いている時だけ前に進める、という過程。ちゃんと知りたい方は以下の書籍 ならびに スライドを。
- 作者: 西成活裕
- 出版社/メーカー: 新潮社
- 発売日: 2006/09/21
- メディア: 単行本
- 購入: 8人 クリック: 100回
- この商品を含むブログ (145件) を見る
http://www.silvertwilight.gr.jp/bsj/Past_Activity_files/NIshinari_20090131.pdf
こちらによると、各自動車間の車間距離が十分にあいている場合には 前の車にあわせてブレーキをかけることが減るので 渋滞軽減の効果があるようだ。
ということで車間距離の初期値を それぞれ 20m, 60m とした 2 グループで比較してみる。今回のモデルで変更できるのは 車間距離の初期値だけ (走行中の車間距離を調整する / 前の車の時速が後ろに伝播するようなロジックは入ってない) だが、雰囲気はつかめるのではないかと。
np.random.seed(1) env = simpy.Environment() # 1グループ目: 車間距離の初期値 20m で 20 台 env, cars1 = init_env(env, num_cars=20, fdist=20) # 2グループ目: 車間距離の初期値 60m で 20 台 env, cars2 = init_env(env, num_cars=20, fdist=60) # 描画用の figure, axes を作成 fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(7, 4)) ax1.set_ylim(0, 80) ax2.set_xlim(-1000, 2000) ax2.xaxis.set_visible(False) ax2.yaxis.set_visible(False) objs = [] velocities1 = [] velocities2 = [] # 200 単位時間 x 40 台分のループ for i in range(200 * 40): if i % 40 == 0: # 各グループの時速平均値を求める velocities1.append(np.mean([c.actual_velocity for c in cars1])) velocities2.append(np.mean([c.actual_velocity for c in cars2])) # 折れ線グラフを描画 l1 = ax1.plot(range(len(velocities1)), velocities1, color='#c92b2b') l2 = ax1.plot(range(len(velocities2)), velocities2, color='#005ec4') # 各グループの位置を描画 pt1 = ax2.scatter([c.location for c in cars1], [2] * len(cars1), color='#c92b2b') pt2 = ax2.scatter([c.location for c in cars2], [1] * len(cars2), color='#005ec4') objs.append(tuple(l1) + tuple(l2) + (pt1, pt2)) # シミュレーション環境を 1 ステップ進める (1単位時間でなく、次のプロセスが実行されるまで進める) env.step() ax1.legend([l1[0], l2[0]], ['車間初期値20m', '車間初期値60m'], loc=4) # アニメーション設定 ani = animation.ArtistAnimation(fig, objs, interval=1, repeat=False) plt.show()
上のグラフは、単位時間の経過を横軸 / 各グループの平均時速を縦軸としたもの。下は 各自動車の位置を横軸にプロットしたもの。
車間距離の初期値が小さいグループ (赤) は 前の車に追いつきやすい = 速度が殺されやすい。結果、グループの平均時速は小さくなってしまう。
全体として 車間距離の初期値が大きいグループの方が平均時速は高めと言えそうだ。後半あまり差がないのは、どちらのグループでも 車間距離が詰まってしまって差がでにくくなったからかな、、、。
その他のシミュレーション
公式ドキュメントにはさらに複雑なシミュレーションが Example として載っている。上には入れられなかったが、特定のオブジェクトによってリソースが占有される形での 待ち行列も simpy.Resource
を使えば比較的 簡単にモデル化できる。
Examples — SimPy 3.0.5 documentation
まとめ
simpy
を使うと、複数のオブジェクトが相互に影響するような離散イベント / 待ち行列を 比較的簡単にモデル化できる。特に、一部のプロセスの挙動を変えた場合の動きをシミュレーションしたい、という場合は非常に便利。