StatsFragments

Python, R, Rust, 統計, 機械学習とか

{flexclust} + DTW で 時系列を k-means クラスタリングする

概要

下の記事のつづき。下の記事では DTW (Dynamic Time Warping) 距離を使って階層的クラスタリングを行った。続けて、 DTW 距離を使って 非階層的クラスタリング (k-means法) を試してみる。

stats::kmeans では任意の距離関数を利用することはできないため、任意の距離関数が利用できる {flexclust} というパッケージを使うことにした。

補足 DTW についてはこちら。

動的時間伸縮法 / DTW (Dynamic Time Warping) を可視化する - StatsFragments

インストール

install.packages('flexclust')
library(flexclust)

{flexclust} の使い方

サンプルデータは前回記事と同一。 まずは既定 (ユークリッド距離) で k-means してみる。サンプルデータは行持ちでわたす必要があるので転置する。

クラスタリングした結果は TSclust::cluster.evaluation を使って正分類率を算出して評価する。そのため、 {TSclust} もロードする。

library(TSclust)

# クラスタ数は 3
res = kcca(t(data), 3)
cluster.evaluation(rep(c(1, 2, 3), rep(N, 3)), res@cluster)
# [1] 0.7705387

任意の距離関数で k-means する場合は、kccafamily オプションで、 kccaFamily インスタンス化した距離関数を渡してやればよい。ここでは、既定の方法と同じ flexclust::distEuclidean を距離関数として渡してみる。

res = kcca(t(data), 3, family = kccaFamily(dist = distEuclidean))
cluster.evaluation(rep(c(1, 2, 3), rep(N, 3)), res@cluster)
# [1] 0.7705387

補足 ここではたまたま 1回目 / 2回目の正分類率が同じだったが、乱数 / 中心点の初期値によっては正分類率が異なることもありうる。

{flexclust} に任意の距離関数を渡す

DTW を使って k-means するためには、上記の distEuclidean を参考にして距離関数を定義してやればよさそうだ。どのような定義であればよいのか調べるため、flexclust::distEuclidean の実装をみてみる。引数と返り値は、

  • x: 入力データ (次元は データ数 x データの長さ)
  • centers : 現在のクラスタの中心点 (次元は クラスタ数 x データの長さ)
  • z: 各入力データと各クラスタの距離 (次元は データ数 x クラスタ数)
distEucledean
# function (x, centers) 
# {
#     if (ncol(x) != ncol(centers)) 
#         stop(sQuote("x"), " and ", sQuote("centers"), " must have the same number of columns")
#     z <- matrix(0, nrow = nrow(x), ncol = nrow(centers))
#     for (k in 1:nrow(centers)) {
#         z[, k] <- sqrt(colSums((t(x) - centers[k, ])^2))
#     }
#     z
# }
# <environment: namespace:flexclust>

# x として渡ってくるもの (データ数 x データの長さ の行列)
# サンプルの場合、実際は 60行目 / 20列目までデータあり
         [,1]       [,2]       [,3]       [,4]     [,5]      [,6]     [,7]     [,8]      [,9]
U1 -0.6327521  2.8410958  3.4529034 3.76949256 4.306223 5.2242329 1.305026 3.493067 8.0427642
U2 -0.9945734  3.4368117 -0.9907990 2.60606620 1.453351 0.8575501 2.541472 1.869034 0.6731607
U3  0.7014436  0.9778474  0.5219079 3.13321606 3.984465 3.8435608 3.707534 4.780511 7.7889760

# center として渡ってくるもの (クラスタ数 x 系列の長さ の行列)
# サンプルの場合、実際は 20列目までデータあり
           [,1]       [,2]      [,3]      [,4]       [,5]       [,6]       [,7]      [,8]
D3   0.62595969  1.1235099  1.592674 -0.949247 -0.8132157 -2.0273653 -4.4877420 -5.137003
N17 -0.05983237  0.2078591  1.261035  1.826098  1.4504397  2.3426289  3.7686407  3.463603
N4  -1.52153133 -2.0982198 -1.930059 -4.379938 -2.9796999 -0.9948817  0.4933508 -3.084777

# z として返すべきもの (データ数 x クラスタ数 の行列)
# サンプルの場合、実際は 60行目 までデータあり
         [,1]     [,2]     [,3]
[1,] 69.75362 16.51187 44.71353
[2,] 71.45904 18.28978 45.20475
[3,] 57.58485  9.22195 32.93481
[4,] 74.50569 21.34163 48.72438
[5,] 74.53808 20.40498 49.45025
[6,] 66.04604 14.45167 40.60733

形式がわかったので、 TSclust::diss.DTWARP を利用して同様のロジックを作成する。

distDTWARP <- function (x, centers) {
  if (ncol(x) != ncol(centers)) 
    stop(sQuote("x"), " and ", sQuote("centers"), " must have the same number of columns")
  z <- matrix(0, nrow = nrow(x), ncol = nrow(centers))
  for (k in 1:nrow(centers)) {
    z[, k] <- apply(x, 1, function(x) diss.DTWARP(x, centers[k, ]))
  }
  z
}

いざ実行。

res_dtw = kcca(t(data), 3, family = kccaFamily(dist = distDTWARP))

、、、。

、、、。

、、、反応ないな、、、。

動いていることを信じて処理時間を計ってみる。

res_dtw = kcca(t(data), 3, family = kccaFamily(dist = distDTWARP))

#     ユーザ    システム        経過  
#  1142.323     44.924   1306.208 

cluster.evaluation(rep(c(1, 2, 3), rep(N, 3)), res_dtw@cluster)
# [1] 0.7653292

なんだこれは、、、。遅いし あんま結果もよくない。

{flexclust} に任意のクラスタ中心点更新関数を渡す

調べてみると、どうも kccaFamily に原因がありそうだ。 kccaFamily には 距離関数 dist のほかに、クラスタ中心点の更新に使う関数 cent が渡せる。ここで、dist には関数を指定 / cent には何も指定しない場合、kccaFamily では以下の関数 centOptim を使ってクラスタ中心点を更新する。DTW は 普通の距離関数に比べると計算コストが高いため、この optim での最適化に非常に時間がかかっていたようだ。

centOptim <- function (x, dist) {
    foo <- function(p) sum(dist(x, matrix(p, nrow = 1)))
    optim(colMeans(x), foo)$par
}

クラスタ中心点の更新処理については ユークリッド距離の最小化 or クラスタの中央値で行うようにしてみる。感覚的には、これらの方法で更新されたクラスタの中心点を使った場合も、クラスタ内の各要素 / 中心点の DTW 距離は小さくなるはずだ。

# ユークリッド距離最小となるように中心点更新
cent <- function(x) centOptim(x, dist = distEuclidean)
res_dtw_ceuc <- kcca(t(data), 3, family = kccaFamily(dist = distDTWARP, cent = cent))

#    ユーザ    システム        経過  
#    3.374      0.118      3.942 

cluster.evaluation(rep(c(1, 2, 3), rep(N, 3)), res_dtw_ceuc@cluster)
# [1] 0.7638889

# クラスタの中央値で中心点更新
cent <- function(x) apply(x, 2, median)
res_dtw_cmed <- kcca(t(data), 3, family = kccaFamily(dist = distDTWARP, cent = cent))

#    ユーザ    システム        経過  
#     5.926      0.209      7.039 

cluster.evaluation(rep(c(1, 2, 3), rep(N, 3)), res_dtw_cmed@cluster)
# [1] 0.7638889

試してみると、処理時間はかなり短くなった。

比較

k-means 法でのクラスタリングは初期値の影響を受けるため、100回ほどループさせて比較してみた。 比較したのは 以下の3手法

横軸に正分類率 / 縦軸に頻度として頻度分布を描いてみた。ベストな値は DTW (Euclidean) で出ていたが、顕著な差があるようには見えない。ループ回数を増やせば / 元データがちょっと違えば逆転しそうである。

ただ、DTW (Euclidean) の結果はばらつきが少ないようには見える。

f:id:sinhrks:20141121073027p:plain

まとめ

  • {flexclust} を使って DTW 距離で k-means してみた。その際、クラスタ中心点の更新を既定 ( DTW距離 + optim ) で行うのは処理速度の点で現実的でない。クラスタ中心点の更新を ユークリッド距離 or 中央値で行うことで処理速度は改善する。しかし、上記の方法/データではユークリッド距離での k-meansと比較して 劇的な差はみられなかった。

  • 確かめられていないのは、

補足

こちらの本に DTW での k-meansというトピック (時系列ではなく手書き文字についてだが) があるようだ。高い、、、。

Developing Concepts in Applied Intelligence (Studies in Computational Intelligence)

Developing Concepts in Applied Intelligence (Studies in Computational Intelligence)

  • 作者: Kishan G. Mehrotra,Chilukuri Mohan,Jae C. Oh,Pramod K. Varshney,Moonis Ali
  • 出版社/メーカー: Springer
  • 発売日: 2011/06/10
  • メディア: ハードカバー
  • この商品を含むブログを見る

2015/06/03 追記: 直近でブクマいただいていたので補足を。DTW は ユークリッド距離と比べると計算量が多いため、 k-means のように距離計算を複数回 繰り返すアルゴリズムとはあまり相性がよくない。今回、自分はトレンドによって時系列を分類したかったのだが、最終的には上昇/下降など 解釈可能なパターンをいくつか教師データとして用意して 最近傍法を使って分類した (距離計算の回数を減らし、かつ 解釈できるパターンに分類する必要があったため)。

pandas でメモリに乗らない 大容量ファイルを上手に扱う

概要

分析のためにデータ集めしていると、たまに マジか!? と思うサイズの CSV に出くわすことがある。なぜこんなに育つまで放っておいたのか、、、? このエントリでは普通には開けないサイズの CSVpandas を使ってうまいこと処理する方法をまとめたい。

サンプルデータ

たまには実データ使おう、ということで WorldBankから GDPデータを落とす。以下のページ右上の "DOWNLOAD DATA" ボタンで CSV を選択し、ローカルに zip を保存する。解凍した "ny.gdp.mktp.cd_Indicator_en_csv_v2.csv" ファイルをサンプルとして使う。

http://data.worldbank.org/indicator/NY.GDP.MKTP.CD?page=1

補足 pandasRemote Data Access で WorldBank のデータは直接 落っことせるが、今回は ローカルに保存した csv を読み取りたいという設定で。

chunksize を使って ファイルを分割して読み込む

まず、pandas で普通に CSV を読む場合は以下のように pd.read_csv を使う。サンプルデータではヘッダが 3行目から始まるため、冒頭の 2行を skiprows オプションで読み飛ばす。

import pandas as pd

fname = 'ny.gdp.mktp.cd_Indicator_en_csv_v2.csv'
df = pd.read_csv(fname, skiprows=[0, 1])
df.shape
# (258, 59)

df.head()
#   Country Name Country Code     Indicator Name  Indicator Code          1961
# 0        Aruba          ABW  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 1      Andorra          AND  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 2  Afghanistan          AFG  GDP (current US$)  NY.GDP.MKTP.CD  5.488889e+08
# 3       Angola          AGO  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 4      Albania          ALB  GDP (current US$)  NY.GDP.MKTP.CD           NaN
#
# [5 rows x 59 columns]

では読み取り対象ファイルが メモリに乗らないほど大きい場合はどうするか? read_csvchunksize オプションを指定することでファイルの中身を 指定した行数で分割して読み込むことができる。chunksize には 1回で読み取りたい行数を指定する。例えば 50 行ずつ読み取るなら、chunksize=50

reader = pd.read_csv(fname, skiprows=[0, 1], chunksize=50)

chunksize を指定したとき、返り値は DataFrame ではなく TextFileReader インスタンスとなる。

type(reader)
# pandas.io.parsers.TextFileReader

TextFileReaderfor でループさせると、ファイルの中身を指定した行数ごとに DataFrame として読み取る。TextFileReader は 現時点での読み取り位置 (ポインタ) を覚えており、ファイルの中身をすべて読み取るとループが終了する。

for r in reader:
    print(type(r), r.shape)
# (<class 'pandas.core.frame.DataFrame'>, (50, 59))
# (<class 'pandas.core.frame.DataFrame'>, (50, 59))
# (<class 'pandas.core.frame.DataFrame'>, (50, 59))
# (<class 'pandas.core.frame.DataFrame'>, (50, 59))
# (<class 'pandas.core.frame.DataFrame'>, (50, 59))
# (<class 'pandas.core.frame.DataFrame'>, (8, 59))

もしくは、TextFileReader.get_chunk で、現在の読み取り位置 (ポインタ) から N 行を読み取る。

# 上のループで全ての要素は読み取り済みになっているので、再度 reader を初期化
reader = pd.read_csv(fname, skiprows=[0, 1], chunksize=50)

# 先頭から 5行を読み込み
reader.get_chunk(5)
#   Country Name Country Code     Indicator Name  Indicator Code          1961
# 0        Aruba          ABW  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 1      Andorra          AND  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 2  Afghanistan          AFG  GDP (current US$)  NY.GDP.MKTP.CD  5.488889e+08
# 3       Angola          AGO  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 4      Albania          ALB  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 
# [5 rows x 59 columns]

# 次の4行 (6行目 - 9行目)を読み込み
reader.get_chunk(4)
#            Country Name Country Code     Indicator Name  Indicator Code  1961
# 0         Andean Region          ANR  GDP (current US$)  NY.GDP.MKTP.CD   NaN
# 1            Arab World          ARB  GDP (current US$)  NY.GDP.MKTP.CD   NaN
# 2  United Arab Emirates          ARE  GDP (current US$)  NY.GDP.MKTP.CD   NaN
# 3             Argentina          ARG  GDP (current US$)  NY.GDP.MKTP.CD   NaN
# 
# [4 rows x 59 columns]

補足 chunksize オプションは、 read_csvread_table で利用できる 。

補足 pandasread_csv はかなり速いので、パフォーマンス面でも pandas を使うのはよいと思う。

A new high performance, memory-efficient file parser engine for pandas | Wes McKinney

python - Fastest way to parse large CSV files in Pandas - Stack Overflow

でも、分割された DataFrame を扱うのはめんどうなんだけど?

分析に必要なレコードは 全データのごく一部、なんてこともよくある。chunk させて読み込んだ 各グループに対して前処理をかけると、サイズが劇的に減ってメモリに乗る。そんなときは、前処理をかけた後、各 DataFrame を結合してひとつにしたい。

DataFrame の結合には pd.concatTextFileReader から読み込んだ 各 DataFrame はそれぞれが 0 から始まる index を持つ。pd.concat は既定では index を維持して DataFrame を結合するため、そのままでは index が重複してしまう。 重複を避けるためには、結合後に index を振りなおす = ignore_index=True を指定する必要がある。

よく使う表現は、

def preprocess(x):
    # 不要な行, 列のフィルタなど、データサイズを削減する処理
    return x

reader = pd.read_csv(fname, skiprows=[0, 1], chunksize=50)
df = pd.concat((preprocess(r) for r in reader), ignore_index=True)

df.shape
# (258, 59)

df.head()
#   Country Name Country Code     Indicator Name  Indicator Code          1961
# 0        Aruba          ABW  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 1      Andorra          AND  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 2  Afghanistan          AFG  GDP (current US$)  NY.GDP.MKTP.CD  5.488889e+08
# 3       Angola          AGO  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 4      Albania          ALB  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 
# [5 rows x 59 columns]

メモリ上の DataFrame のサイズを確認したい

DataFrame を適切な大きさに分割するために、DataFrame のメモリ上の占有サイズが知りたい。これは v0.15.0 以降であれば可能。

DataFrame 全体のメモリ上のサイズを表示するには DataFrame.info()。 表示された情報の最終行、 "memory size" をみる。ただし、この表示には object 型のカラムが占めているメモリは含まれないので注意。"+" がついているのは実際の占有領域が表示よりも大きいことを示す。

df.info()
# <class 'pandas.core.frame.DataFrame'>
# Int64Index: 258 entries, 0 to 257
# Data columns (total 59 columns):
# Country Name      258 non-null object
# Country Code      258 non-null object
# Indicator Name    258 non-null object
# Indicator Code    258 non-null object
# 1961              127 non-null float64
# ...
# 2012              222 non-null float64
# 2013              216 non-null float64
# 2014              0 non-null float64
# Unnamed: 58       0 non-null float64
# dtypes: float64(55), object(4)
# memory usage: 116.9+ KB

ちゃんと表示する場合は DataFrame.memory_usage(index=True)index=Trueindex の占有メモリも表示に含めるオプション。表示はカラム別になるため、全体のサイズを見たい場合は sum をとる。

df.memory_usage(index=True)
# Index             2064
# Country Name      1032
# Country Code      1032
# Indicator Name    1032
# Indicator Code    1032
# 1961              2064
# ...
# 2013              2064
# 2014              2064
# Unnamed: 58       2064
# Length: 60, dtype: int64

df.memory_usage(index=True).sum()
# 119712

DataFrame を複製せずに処理する

DataFrameメソッドを呼び出した場合、一般には DataFrame をコピーしてから処理を行って返す (非破壊的な処理を行う)。大容量の DataFrame ではこのコピーによって 実メモリのサイズを超え MemoryError になる / もしくはコピーに非常に時間がかかることがある。

例えば、欠損値の補完を fillna で行う処理をみてみる。fillna の返り値では NaN が 0 でパディングされているが、元データは変更されていない。fillna では、コピーした DataFrame に対して処理を行っていることが "1961" カラムの値を比較するとわかる。

df.fillna(0).head()
#   Country Name Country Code     Indicator Name  Indicator Code          1961
# 0        Aruba          ABW  GDP (current US$)  NY.GDP.MKTP.CD  0.000000e+00
# 1      Andorra          AND  GDP (current US$)  NY.GDP.MKTP.CD  0.000000e+00
# 2  Afghanistan          AFG  GDP (current US$)  NY.GDP.MKTP.CD  5.488889e+08
# 3       Angola          AGO  GDP (current US$)  NY.GDP.MKTP.CD  0.000000e+00
# 4      Albania          ALB  GDP (current US$)  NY.GDP.MKTP.CD  0.000000e+00

# dfはそのまま
df.head()
#   Country Name Country Code     Indicator Name  Indicator Code          1961
# 0        Aruba          ABW  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 1      Andorra          AND  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 2  Afghanistan          AFG  GDP (current US$)  NY.GDP.MKTP.CD  5.488889e+08
# 3       Angola          AGO  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 4      Albania          ALB  GDP (current US$)  NY.GDP.MKTP.CD           NaN
# 
# [5 rows x 59 columns]

このコピー処理を避けたい場合は inplace=True を指定する。こいつが指定されると、各メソッドDataFrame のコピーを行わず、呼び出し元の DataFrame を直接処理する (破壊的な処理を行う)。また、inplace=True が指定されたメソッドは返り値をもたない ( None を返す ) ため、代入を行ってはならない。

# 返り値はないため、代入してはダメ
df.fillna(0, inplace=True)

# df 自体が変更された
df.head()
#   Country Name Country Code     Indicator Name  Indicator Code          1961
# 0        Aruba          ABW  GDP (current US$)  NY.GDP.MKTP.CD  0.000000e+00
# 1      Andorra          AND  GDP (current US$)  NY.GDP.MKTP.CD  0.000000e+00
# 2  Afghanistan          AFG  GDP (current US$)  NY.GDP.MKTP.CD  5.488889e+08
# 3       Angola          AGO  GDP (current US$)  NY.GDP.MKTP.CD  0.000000e+00
# 4      Albania          ALB  GDP (current US$)  NY.GDP.MKTP.CD  0.000000e+00
# 
# [5 rows x 59 columns]

やりたいメソッドinplace オプションを持つかどうかは API ガイド で確認。

eval による処理の最適化

numexpr のインストール

pd.eval を使うには numexpr パッケージが必要なので、入っていなければインストールする。

pip install numexpr

eval のメリット

pd.eval では文字列表現された式を受け取って評価できる。pd.eval を使うと以下 2つのメリットがある。

  • 大容量の DataFrame を効率的に処理できる
  • 数値演算を一括して処理できる

補足 numexpr のソースは読んだことがないので詳細不明だが、 pandas では連続するオペレータは逐次処理されていく。例えば "A + B + C" であれば まず "A + B" の結果から DataFrame を作って、さらに "+ C" して DataFrame を作る。DataFrame 生成にはそれなりにコストがかかるので この辺の内部処理が最適化されるのかな、と思っている。

補足 逆に、小さい処理では eval から numexpr を呼び出すコストの方が メリットよりも大きくなるため、pd.eval を使う場合は通常処理とのパフォーマンス比較を行ってからにすべき。今回のサンプルくらいのデータサイズであれば eval よりも 通常の演算のほうが速い。

eval でサポートされる式表現は、公式ドキュメントにあるとおり、

  • 数値演算オペレータ。ただし、シフト演算 <<>> を除く。
  • 比較演算オペレータ。連結も可能 ( 2 < df < df2 とか)
  • 論理演算オペレータ ( not も含む)。
  • listtupleリテラル表現 ( [1, 2](1, 2) )
  • プロパティアクセス ( df.a )
  • __getitem__ でのアクセス ( df[0] )

eval の利用

例えばサンプルデータの "2011" 〜 "2013" カラムの値を足し合わせたいときはこう書ける。

pd.eval("df['2011'] + df['2012'] + df['2013']")
# 0     2.584464e+09
# 1     0.000000e+00
# 2     5.910162e+10
# 3     3.411516e+11
# 4     3.813926e+10
# ...
# 256    6.218183e+10
# 257    3.623061e+10
# Length: 258, dtype: float64

また、pd.eval ではなく DataFrame.eval として呼んだ場合、式表現中はデータのカラム名を変数名としてもつ名前空間で評価される ( DataFrame.query と同じ )。DataFrame.query については以下の記事参照。

まとめ

pandas で大容量ファイルを扱うときは、

  • chunksize で分割して読み込み
  • メソッド呼び出しは inplace=True
  • オペレータを使う処理は pd.eval

これさえ覚えておけば、大容量ファイルが送りつけられた場合でも安心。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理