StatsFragments

Python, R, Rust, 統計, 機械学習とか

Python spyre によるデータ分析結果のWebアプリ化

R を使っている方はご存知だと思うが、R には {Shiny} というパッケージがあり、データ分析の結果を インタラクティブな Web アプリとして共有することができる。{Shiny} って何?という方には こちらの説明がわかりやすい。

qiita.com

Python でも {Shiny} のようなお手軽可視化フレームワークがあるといいよね、とたびたび言われていたのだが、spyre という なんかそれっぽいパッケージがあったので触ってみたい。

github.com

インストール

pip で。

pip install dataspyre

使い方

現時点で ドキュメンテーションはない ので、README と examples ディレクトリを見る。サンプルとして株価を取得してプロットするWebアプリを作ってみたい。spyre で Webアプリを作る手順は以下の3つ。

  1. spyre.server.App を継承したクラスを作る。
  2. 描画をコントロール/指示するクラス変数を指定する。
  3. 描画を行うメソッド getData, getPlot を書く。メソッド名は描画内容 (output_type) ごとに固定。

最初は examples のものを書き換えながら作るのが楽。各クラス変数/メソッドは相互に関連するため、プログラム全体を示した上で必要と思われる箇所にコメントを入れた。

補足 株価の取得には以下のパッケージを使う。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from spyre import server

import pandas as pd
pd.options.display.mpl_style = 'default'

# あらかじめデータを取得しておく
# 終値のみを取得し、一つのDataFrameに結合
import japandas as jpd
toyota = jpd.DataReader(7203, 'yahoojp', start='2015-01-01')[[u'終値']]
toyota.columns = [u'トヨタ']
honda = jpd.DataReader(7267, 'yahoojp', start='2015-01-01')[[u'終値']]
honda.columns = [u'ホンダ']
df = toyota.join(honda)


# spyre.server.App を継承したクラスを作る
class StockExample(server.App):
    title = u"株価のプロット"

    # 左側のペインに表示する UI 要素を辞書のリストで指定
    # ここではドロップダウン一つだけを表示
    inputs = [{"input_type":'dropdown', 
               # ドロップダウン自体の表示ラベル
               "label": 'Frequency',
               # ドロップダウンの選択項目を指定
               # label はドロップダウン項目の表示ラベル
               # value は各項目が選択された時にプログラム中で利用される値
               "options" : [ {"label": "月次", "value":"M"},
                             {"label": "週次", "value":"W"},
                             {"label": "日次", "value":"B"}],
               # 各 UI 要素の入力は各描画メソッド (getData, getPlot) に
               # 辞書型の引数 params として渡される
               # その辞書から値を取り出す際のキー名
               "variable_name": 'freq',
               "action_id": "update_data" }]

    # 画面を更新する設定
    controls = [{"control_type" : "hidden",
                 "label" : "update",
                 "control_id" : "update_data"}]

    # 描画するタブの表示ラベルを文字列のリストで指定
    tabs = [u"トヨタ", u"ホンダ", u"データ"]

    # tabs で指定したそれぞれのタブに描画する内容を辞書のリストで指定
    outputs = [{"output_type" : "plot", # matplotlib のプロットを描画する
                "output_id" : "toyota", # 描画するタブに固有の id
                "control_id" : "update_data",
                "tab" : u"トヨタ",       # 描画するタブの表示ラベル (tabs に含まれるもの)
                "on_page_load" : True },

               {"output_type" : "plot",
                "output_id" : "honda",
                "control_id" : "update_data",
                "tab" : u"ホンダ",
                "on_page_load" : True },

               {"output_type" : "table", # DataFrameを描画する
                "output_id" : "table_id",
                "control_id" : "update_data",
                "tab" : u"データ",
                "on_page_load" : True }]

    def getData(self, params):
        """
        output_type="table" を指定したタブを描画する際に呼ばれるメソッド
        DataFrameを返すこと

        params は UI 要素の入力 + いくつかのメタデータを含む辞書
        UI 要素の入力は inputs で指定した variable_name をキーとして行う
        """
        # ドロップダウンの値を取得
        # 値にはユーザの選択によって、options -> value で指定された M, W, B いずれかが入る
        freq = params['freq']

        # freq でグループ化し平均をとる
        tmp = df.groupby(pd.TimeGrouper(freq)).mean()
        return tmp

    def getPlot(self, params):
        """
        output_type="plot" を指定したタブを描画する際に呼ばれるメソッド
        matplotlib.Figureを返すこと
        """
        tmp = self.getData(params)

        # 同じ output_type で複数のタブを描画したい場合は、 params に含まれる
        # output_id で分岐させる
        # output_id は タブの表示ラベルではなく、outputs 中で指定した output_id
        if params['output_id'] == 'toyota':
            ax = tmp[[u'トヨタ']].plot(legend=False)
            return ax.get_figure()
        elif params['output_id'] == 'honda':
            ax = tmp[[u'ホンダ']].plot(legend=False)
            return ax.get_figure()
        else:
            raise ValueError


app = StockExample()
# port 9093 で Webサーバ + アプリを起動
app.launch(port=9093)

実行後、ブラウザで http://127.0.0.1:9093 を開くと以下のような画面が表示される。inputs で指定した項目が左側のメニューに、 tabs で指定した項目がタブとして表示されている。

f:id:sinhrks:20150613001354p:plain

ドロップダウンやタブの選択を切り替えると、動的にグラフやデータが更新されることがわかる。

f:id:sinhrks:20150613001400p:plain

DataFrame の描画はちょっとおかしく、index である日時自体が描画されずにその名前 ("日付") だけが表示されている。これは後日 issue あげて直そうかと思う。

f:id:sinhrks:20150613001406p:plain

06/13追記 index を描画しないのは spyre の仕様で、そのとき名前が残ってしまうのは pandas のバグです。v0.17.0で修正予定。

まとめ

お手軽可視化フレームワーク spyre を試してみた。現行の {Shiny} と比べると 機能/UI とも十分ではないが、いくつかのデータ / プロットをインタラクティブに共有する用途であれば十分使えそうだ。

使いごこちの印象として、はじめて {Shiny} を触ったあの日の気持ちを思い出したような気がする、、。

R で 状態空間モデル: {dlm} の最尤推定を可視化する

{dlm} において、状態空間モデルが最尤推定される過程がみたい。以下内容の補足的なエントリ。

sinhrks.hatenablog.com

「状態空間時系列分析入門」では 引き続き 第8章 に相当。

状態空間時系列分析入門

状態空間時系列分析入門

  • 作者: J.J.F.コマンダー,S.J.クープマン,Jacques J.F. Commandeur,Sime Jan Koopman,和合肇
  • 出版社/メーカー: シーエーピー出版
  • 発売日: 2008/09
  • メディア: 単行本
  • 購入: 2人 クリック: 4回
  • この商品を含むブログを見る

データの準備

データは著者のサポートサイト、An Introduction to State Space Time Series Analysis から入手できる。

library(dlm)
# install_github('sinhrks/ggfortify')
library(ggfortify)

ukdrivers <- read.table('UKdriversKSI.txt', skip = 1)
ukdrivers <- ts(ukdrivers[[1]], start = c(1969, 1), frequency = 12)
ukdrivers <- log(ukdrivers)

y <- ukdrivers

モデルの作成と状態推定値のプロット

サンプルとして、第3章3節 確率的レベルと確定的傾きをもつローカル線形トレンドモデル を作成する。

parm <- log(c(var(y), 0.001))
buildFun <- function(parm){
  dlmModPoly(order = 2, dV = exp(parm[1]), dW = c(exp(parm[2]), 0))
}
dlmFit <- dlmMLE(y, parm = parm, build = buildFun)

# カルマンフィルタ
dlmFiltered <- dlmFilter(y, buildFun(dlmFit$par))

# 平滑化
dlmSmoothed <- dlmSmooth(y, buildFun(dlmFit$par))

第8章ではモデルの推定値として 3 つの値が出てくるため、その差異がわかるようにプロットする。用語は状態空間時系列分析入門のものを利用。

p <- autoplot(y)
p <- autoplot(dlmFiltered$a[, 1], colour = 'red', linetype = 'dashed', p = p)
p <- autoplot(dlmFiltered$m[, 1], colour = 'red', p = p)
p <- autoplot(dlmSmoothed, colour = 'blue', p = p)
p + ylim(c(6.8, 8))
  • 黒実線 ( y ): 観測時系列
  • 赤破線 ( dlmFiltered$a ): カルマンフィルタの1期先予測 (予測ステップでの予測値)
  • 赤実線 ( dlmFiltered$m ): カルマンフィルタのフィルタ化状態 (更新ステップでの予測値)
  • 青実線 ( dlmSmoothed ): 平滑化状態

f:id:sinhrks:20150530114345p:plain

一部期間を拡大するとそれぞれの差異がわかる。これらのうち、最尤推定時の尤度計算式中で直接 利用されるのは "1期先予測" 時の予測誤差 (赤破線) とその標準偏差

p + xlim(as.Date('1975', '%Y'), as.Date('1977', '%Y')) + ylim(c(6.8, 8))

f:id:sinhrks:20150530114355p:plain

補足 上のモデルでは FF = [1, 0] のため、観測値の推定値 == 状態推定値。

dlmFiltered$mod$FF
#      [,1] [,2]
# [1,]    1    0

all((dlmFiltered$a %*% t(dlmFiltered$mod$FF)) == dlmFiltered$a[, 1])
# [1] TRUE

補足 カルマンフィルタの動きについては以前こんなエントリを書いた。2つ目のエントリにカルマンフィルタの予測 / フィルタ化状態がどういった感じで動いているか記載がある (平滑化は記載ない)。

予測誤差と予測誤差分散

最尤推定の尤度計算式は先のエントリに記載。計算に必要な (1期先) 予測誤差と予測誤差標準偏差dlm:::residuals.dlmFiltered で求めることができる。

# 既定では標準化されるため、 type = 'raw' を指定して実値をとる
resid <- residuals(dlmFiltered, type = 'raw')

# 予測誤差
resid$res[1:12]
#  [1]  7.430707083 -3.827523556  0.111504168 -0.021616238  0.225665269 -0.044983932
#  [7]  0.045295381  0.061233329 -0.021088564  0.049603994  0.270853650  0.007138991

# 予測誤差標準偏差
resid$sd[1:12]
#  [1] 4472.1359566 2236.0679824    0.1922632    0.1596384    0.1484746    0.1429560
#  [7]    0.1396797    0.1375109    0.1359691    0.1348166    0.1339224    0.1332084

ふつうに計算して求めるなら、dlm::dlmFiltered インスタンスの各プロパティを利用して、

# 予測誤差
(y - (matrix(as.numeric(dlmFiltered$a), 192, 2) %*% t(dlmFiltered$mod$FF)))[1:12]
#  [1]  7.430707083 -3.827523556  0.111504168 -0.021616238  0.225665269 -0.044983932
#  [7]  0.045295381  0.061233329 -0.021088564  0.049603994  0.270853650  0.007138991

# もしくは
(y - dlmFiltered$f)[1:12]
#  [1]  7.430707083 -3.827523556  0.111504168 -0.021616238  0.225665269 -0.044983932
#  [7]  0.045295381  0.061233329 -0.021088564  0.049603994  0.270853650  0.007138991

# 予測誤差標準偏差
f <-  function(i) {
  FF <- dlmFiltered$mod$FF
  V <- dlmFiltered$mod$V
  diag(crossprod(dlmFiltered$D.R[i, ] * t(FF %*% dlmFiltered$U.R[[i]])) + V)
}
drop(t(sqrt(sapply(seq(along = dlmFiltered$U.R), f))))[1:12]
#  [1] 4472.1359566 2236.0679824    0.1922632    0.1596384    0.1484746    0.1429560
#  [7]    0.1396797    0.1375109    0.1359691    0.1348166    0.1339224    0.1332084

最尤推定の可視化

最尤推定時のパラメータの変化を記録するため、dlm::dlmMLE を書き換えた関数を定義する。LL_ckbookcompat はテキスト記載の対数尤度 (に近いもの) を計算するための関数 ( 詳細はこちら )。

# optim 1 ステップごとに作成されたモデルを保存
models <<- list()

dlmMLE_wrapper <- function (y, parm, build, method = "L-BFGS-B",
                            ..., debug = FALSE) {
  logLik <- function(parm, ...) {
    mod <- build(parm, ...)
    # グローバル変数に追加
    models[[length(models) + 1]] <<- mod
    return(dlmLL(y = y, mod = mod, debug = debug))
  }
  return(optim(parm, logLik, method = method, ...))
}

# 推定するパラメータ数が 2 のため、総和をとる範囲を変更
LL_ckbookcompat <- function(y, mod) {
  n <- length(y)
  filtered <- dlmFilter(y, mod)
  resid <- residuals_dlmcompat(filtered) 
  F <- (as.numeric(resid$sd)^2)[3:n]
  v <- as.numeric(resid$res)[3:n]
  return(-0.5 * (n * log(2 * pi) + sum(log(F) + (v^2/F))))
}

続けて、アニメーション用の処理を定義して実行。

library(animation)

models <<- list()
dlmMLE_wrapper(y, parm = parm, build = buildFun)

saveGIF({
  for (mod in models[1:65]) {
    filtered <- dlmFilter(y, mod)
    p <- autoplot(filtered, fitted.colour = 'red', fitted.linetype = 'dashed')
    p <- autoplot(dlmSmooth(y, mod), colour = 'blue', p = p)
    label <- paste('CKbook LL:', round(LL_ckbookcompat(y, mod), 2), 'dlmLL:', round(dlmLL(y, mod), 2))
    p <- p + annotate('text', label = label, x = as.Date('1980', '%Y'), y = 7.8, size = 6)
    print(p)
    Sys.sleep(0.01)
  }
}, interval = 0.3, movie.name =  "dlm.gif",
ani.width = 500, ani.height = 300)

最尤推定時に各状態推定値がどのように変化していくかをみる。左側 "CKbookLL" はテキスト記載の対数尤度計算式 LL_ckbookcompat の値。右側 "dlmLL" は dlm::dlmLL の値 ( optim で最適化される値 )。

テキストに記載された数値より、収束時には左側の値が 0.6247935 x 観測時系列の長さ (192) = 119.9604 程度になるはずだ。

f:id:sinhrks:20150530130231g:plain

f:id:sinhrks:20150530130254g:plain

補足 {dlm} は既定では optim の際に method = 'L-BFGS-B' を利用する。可視化目的であれば Nelder-Mead 法なんかを使ったほうが 素人目には "最適化らしい" 動きになる気がする。

models <<- list()
dlmMLE_wrapper(y, parm = parm, build = buildFun,  method = 'Nelder-Mead')
# 以降略

f:id:sinhrks:20150530130005g:plain

f:id:sinhrks:20150530130117g:plain

R で 状態空間モデル: 状態空間時系列分析入門を {rstan} で再現したい

前の記事でもリンクさせていただいているが、サイト 「状態空間時系列分析入門」をRで再現する では以下のテキストを {dlm}, {KFAS} で再現されており非常にありがたい。これらのパッケージの使い方については リンク先を読めば困らない感じだ。

自分も勉強のために似たことやりたい、、でも同じことやるのもなあ、、と考えた結果 同テキストの内容 {rstan} を使ってやってみた。

補足 Stan には状態空間表現用の関数 gaussian_dlm_obs ( 利用例 ) があるのだが、自分は使ったことがない。7章までのモデルは全て漸化式で表現されているため、それらを Stan のモデルとして記述した。

状態空間時系列分析入門

状態空間時系列分析入門

  • 作者: J.J.F.コマンダー,S.J.クープマン,Jacques J.F. Commandeur,Sime Jan Koopman,和合肇
  • 出版社/メーカー: シーエーピー出版
  • 発売日: 2008/09
  • メディア: 単行本
  • 購入: 2人 クリック: 4回
  • この商品を含むブログを見る

github リポジトリ

スクリプト / モデルは GitHub にあげた。現在 7 章までのモデルと kick する R スクリプト, markdown ファイルを置いている。

github.com

モデルの一覧

各章のモデルについて、その実行結果を RPubs にあげている。各モデルの概要とリンクを以下に記載する。自分のモデリング能力不足のため いくつか課題があると考えており、特に怪しいものには * をつけた。

かんたんな説明

必要パッケージ

まず、実行には以下のCRAN非公開パッケージが必要である。後者はどうでもよいが {pforeach} はいろいろ捗るのでインストールしていない方はただちに入れるべきだ。

データの読み込み

データは著者のサポートサイト、An Introduction to State Space Time Series Analysis から入手できる。各データは適宜 ts インスタンス化しておく。

Stan の実行

Stan の呼び出しは全て以下のように並列化して行う。

model_file <- 'モデルを記述したファイルのパス'

# モデルのコンパイルを実行
stan_fit <- stan(file = model_file, chains = 0)

# Stan を並列実行する。実行結果に sflist2stanfit を適用し、stanfit インスタンスを得る
fit <- pforeach(i = 1:4, .final = sflist2stanfit)({
  stan(fit = stan_fit, data = standata, chains = 1, seed = i)
})

また、モデルが収束したかどうかは 全ての Rhat が 1.1 未満となっているかどうかで確認する。そのままでは収束しない / しにくいモデルには、初期値 / 事前分布 / 制約を入れて収束させた。

is.converged <- function(stanfit) {
  summarized <- summary(stanfit)  
  all(summarized$summary[, 'Rhat'] < 1.1)
}

stopifnot(is.converged(fit))

また、テキストに最尤推定の結果が記載されている場合は、以下のようにして近い値になっているかを確認した。値がずれる場合は stopifnot を外し、差異を出力する。

# is.almost.fitted の定義は各リンク先を参照
stopifnot(is.almost.fitted(mu[[1]], 7.4157))
結果のプロット

テキストのグラフを再現する形で行う。使っているパッケージの説明はこちら。

課題

以下の2つについてはもう少し Stan に習熟できたら直したい。

複数の確率的状態をもつモデルをうまく収束させたい

このようなモデルは 収束しにくく、また 最尤推定した状態推定値 ( テキスト、{dlm}{KFAS}の結果 ) とずれやすい。特に 現状 制約を入れて収束させているモデルについて、事前分布 / 初期値 / 尤度関数の範囲でなんとかしたい。

モデルの一覧で * をつけたものがこれにあたる。

テキストの尤度関数を再現したい

テキストに記載の式 (概要は以下記事参照) に揃えようとすると、自分の記述では収束しにくくなってしまう。

sinhrks.hatenablog.com

R で 状態空間モデル: {dlm} で単変量モデルの状態空間表現を利用する

こちらの続きで、状態空間時系列分析入門の第8章の内容。

sinhrks.hatenablog.com

状態空間時系列分析入門

状態空間時系列分析入門

  • 作者: J.J.F.コマンダー,S.J.クープマン,Jacques J.F. Commandeur,Sime Jan Koopman,和合肇
  • 出版社/メーカー: シーエーピー出版
  • 発売日: 2008/09
  • メディア: 単行本
  • 購入: 2人 クリック: 4回
  • この商品を含むブログを見る

状態空間モデルでは、トレンド、季節効果、説明変数効果などさまざまな要因を考慮することができる。また これらの要因を単一の状態空間表現で統一的に扱える。時刻 { t } における観測値 { y_t } と状態 { a_t } は以下の式で書ける (前回と同じ)。

{ y_t = z'_t a_t + \epsilon_{t} \ \ \ \ \ \ \epsilon_t \sim NID(0, \sigma^{2}_{ \epsilon } ) }

{ a_{ t+1 } = T_t a_t + R_t \eta_t \ \ \ \ \ \ \eta_t \sim NID(0, Q_t) }

この式であらわされたモデルを {dlm} で利用する際にどうすればよいのかをまとめたい。

vignette にあるとおり、{dlm} ではモデルを dlm::dlmModPoly などのモデル記述関数の組み合わせで定義できる。これらの関数は便利だが、上の式との関係は外側からはよくわからない。

上式から {dlm} のモデルを作成する際には dlm::dlm を直接使うのがよさそうだ。dlm::dlm の引数は上式にほぼ対応しているため、式表現されたモデルをそのまま実装できる。また、モデル記述関数も内部的には dlm::dlm を使っているため、これら関数の理解にも役立つ。dlm::dlm の引数と上式の対応は前回書いたとおり。

  • FF: { z'_t } に対応。
  • V: { \sigma^{2} } に対応。
  • GG: { T_t } に対応。
  • W: { R_t Q_t } に対応。
  • m0: { a_1 } の事前分布の平均。
  • C0: { a_1 } の事前分布の分散。
  • J...: 各パラメータが時間変化するとき利用。

参考 サイト 「状態空間時系列分析入門」をRで再現する はこのテキストを {dlm}, {KFAS} で再現している。こちらは dlm::dlm ではなく dlm::dlmModPoly などの関数を使ってモデル作成している。

準備

まずデータを読み込む。データは著者のサポートサイト、An Introduction to State Space Time Series Analysis から入手できる。

library(dlm)
# install_github('sinhrks/ggfortify')
library(ggfortify)

ukdrivers <- read.table('UKdriversKSI.txt', skip = 1)
ukdrivers <- ts(ukdrivers[[1]], start = c(1969, 1), frequency = 12)
ukdrivers <- log(ukdrivers)

以降、テキストに記載のモデル順に dlm::dlm から利用する場合のプログラムを列挙する。値 / 推定するパラメータを変更したい場合は buildObj 内の対応する要素を適当に修正すればよい。

1. ローカルレベルモデル (確定的レベル)

それぞれのモデルについて 状態 { a_t } として何が考慮されているかを書いておく。

  • { a_t = u }: { u } は確定的レベル (経時変化しない)。

補足 それぞれのモデルの式表現を書こうとしたのだが、はてなマークダウン記法だと TeX 記法中 の "&" がうまく処理できないようなので諦めた。詳細はテキストと見比べてください。

y <- ukdrivers

parm<- log(var(y))
buildObj <- function(parm) {
  dlm(FF = matrix(1, 1, 1),
      V = matrix(exp(parm), 1, 1),
      GG = matrix(1, 1, 1),
      W = matrix(0, 1, 1),
      m0 = 0,
      C0 = matrix(1e+07, 1, 1))
}
dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj)
(dlmModObj <- buildObj(dlmFitObj$par))
# $FF
#      [,1]
# [1,]    1
# 
# $V
#            [,1]
# [1,] 0.02935256
# 
# $GG
#      [,1]
# [1,]    1
# 
# $W
#      [,1]
# [1,]    0
# 
# $m0
# [1] 0
# 
# $C0
#       [,1]
# [1,] 1e+07

p <- autoplot(dlmFilter(y, dlmModObj), fitted.colour = 'red', fitted.linetype = 'dashed')
autoplot(dlmSmooth(y, dlmModObj), colour = 'blue', p = p)

f:id:sinhrks:20150524153631p:plain

赤破線がカルマンフィルタ、青破線が平滑化した値。以降もプロットするが、確率的状態を使うと 実データにほぼフィットするため、グラフからでは差がよくわからない。

2. ローカル線形トレンドモデル (確率的レベルと確率的傾き)

  • { a_t = ( u_t, v_t )^T }: { u_t } は確率的レベル (経時変化する)、{ v_t } は確率的傾き(経時変化する)。
parm <- log(c(var(y), 0.001, 0.001))
buildObj <- function(parm) {
  dlm(FF = matrix(c(1, 0), 1, 2),
      V = matrix(exp(parm[1]), 1, 1),
      GG = matrix(c(1, 1,
                    0, 1), 2, 2, byrow = TRUE),
      W = matrix(c(exp(parm[2]), 0,
                   0, exp(parm[3])), 2, 2, byrow = TRUE),
      m0 = c(0, 0),
      C0 = matrix(c(1e+07, 0,
                    0, 1e+07), 2, 2, byrow = TRUE))
}
dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj)
(dlmModObj <- buildObj(dlmFitObj$par))
# $FF
#      [,1] [,2]
# [1,]    1    0
# 
# $V
#             [,1]
# [1,] 0.002118549
# 
# $GG
#      [,1] [,2]
# [1,]    1    1
# [2,]    0    1
# 
# $W
#            [,1]        [,2]
# [1,] 0.01212741 0.00000e+00
# [2,] 0.00000000 1.92431e-10
# 
# $m0
# [1] 0 0
# 
# $C0
#       [,1]  [,2]
# [1,] 1e+07 0e+00
# [2,] 0e+00 1e+07

p <- autoplot(dlmFilter(y, dlmModObj), fitted.colour = 'red', fitted.linetype = 'dashed')
autoplot(dlmSmooth(y, dlmModObj), colour = 'blue', p = p)

f:id:sinhrks:20150524153644p:plain

3. 季節要素のあるローカルレベルモデル (確率的レベルと確率的季節)

  • { a_t = ( u_t, \gamma_{ 1, t }, \gamma_{ 2, t }, \gamma_{ 3, t } )^T }: { u_t } は確率的レベル、{ \gamma } は 四半期単位の確率的季節のダミー項。
ukinflation <- read.table('UKinflation.txt', skip = 1)
ukinflation <- ts(ukinflation[[1]], start = c(1950, 1), frequency = 4)
y <- ukinflation

parm <- log(c(var(y), 0.00001, 0.00001))
buildObj <- function(parm) {
  dlm(FF = matrix(c(1, 1, 0, 0), 1, 4),
      V = matrix(c(exp(parm[1])), 1, 1),
      GG = matrix(c(1,  0,  0,  0,
                    0, -1, -1, -1,
                    0,  1,  0,  0,
                    0,  0,  1,  0),
                  4, 4, byrow = TRUE),
      W = matrix(c(exp(parm[2]),            0, 0, 0,
                   0,            exp(parm[3]), 0, 0,
                   0,                       0, 0, 0,
                   0,                       0, 0, 0),
                 4, 4, byrow = TRUE),
      m0 = c(0, 0, 0, 0),
      C0 = matrix(c(1e+07,     0,     0,     0,
                    0,     1e+07,     0,     0,
                    0,         0, 1e+07,     0,
                    0,         0,     0, 1e+07),
                  4, 4, byrow = TRUE))
}
dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj)
dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj)
(dlmModObj <- buildObj(dlmFitObj$par))
# $FF
#      [,1] [,2] [,3] [,4]
# [1,]    1    1    0    0
# 
# $V
#             [,1]
# [1,] 3.37127e-05
# 
# $GG
#      [,1] [,2] [,3] [,4]
# [1,]    1    0    0    0
# [2,]    0   -1   -1   -1
# [3,]    0    1    0    0
# [4,]    0    0    1    0
# 
# $W
#              [,1]         [,2] [,3] [,4]
# [1,] 2.124158e-05 0.000000e+00    0    0
# [2,] 0.000000e+00 4.345176e-07    0    0
# [3,] 0.000000e+00 0.000000e+00    0    0
# [4,] 0.000000e+00 0.000000e+00    0    0
# 
# $m0
# [1] 0 0 0 0
# 
# $C0
#       [,1]  [,2]  [,3]  [,4]
# [1,] 1e+07 0e+00 0e+00 0e+00
# [2,] 0e+00 1e+07 0e+00 0e+00
# [3,] 0e+00 0e+00 1e+07 0e+00
# [4,] 0e+00 0e+00 0e+00 1e+07

p <- autoplot(dlmFilter(y, dlmModObj), fitted.colour = 'red', fitted.linetype = 'dashed')
autoplot(dlmSmooth(y, dlmModObj), colour = 'blue', p = p)

f:id:sinhrks:20150524153704p:plain

4. 説明変数のあるローカルレベルモデル (確率的レベル)

  • { a_t = ( u_t, \beta_t )^T }: { u_t } は確率的レベル、{ \beta_t } は説明変数の係数。

説明変数は dlm::dlm へ引数 X として渡す。

ukpetrol <- read.table('logUKpetrolprice.txt', skip = 1)
ukpetrol <- ts(ukpetrol, start = start(ukdrivers), frequency = frequency(ukdrivers))

y <- ukdrivers
x <- ukpetrol

parm <- c(var(y), 0.001)  

buildObj <- function(parm) {
  dlm(FF = matrix(c(1, 0), 1, 2),
      V = matrix(exp(parm[1]), 1, 1),
      GG = matrix(c(1, 0,
                    0, 1),
                  2, 2, byrow = TRUE),
      W = matrix(c(exp(parm[2]), 0,
                   0,            0),
                 2, 2, byrow = TRUE),
      m0 = c(0, 0),
      C0 = matrix(c(1e+07,     0,
                    0,     1e+07),
                  2, 2, byrow = TRUE),
      JFF = matrix(c(0, 1), 1, 2),
      X = as.matrix(x))
}
dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj)
(dlmModObj <- buildObj(dlmFitObj$par))
# $FF
#      [,1] [,2]
# [1,]    1    1
# 
# $V
#             [,1]
# [1,] 0.002347965
# 
# $GG
#      [,1] [,2]
# [1,]    1    0
# [2,]    0    1
# 
# $W
#            [,1] [,2]
# [1,] 0.01166743    0
# [2,] 0.00000000    0
# 
# $JFF
#      [,1] [,2]
# [1,]    0    1
# 
# $X
#      V1    
# [1,] -2.273
# [2,] -2.279
# [3,] ...   
# 
# $m0
# [1] 0 0
# 
# $C0
#       [,1]  [,2]
# [1,] 1e+07 0e+00
# [2,] 0e+00 1e+07

filtered <- dlmFilter(y, dlmModObj)$m
smoothed <- dlmSmooth(y, dlmModObj)$s
p <- autoplot(y)
p <- autoplot(filtered[, 1] + filtered[, 2] * x, colour = 'red', linetype = 'dashed', p = p)
autoplot(smoothed[, 1] + smoothed[, 2] * x, colour = 'blue', p = p)

f:id:sinhrks:20150524153715p:plain

5. 干渉変数のあるローカルレベルモデル (確率的レベル)

  • { a_t = ( u_t, \lambda_t )^T }: { u_t } は確率的レベル、{ \lambda_t } は干渉変数の係数。

変数名は異なるが、式は説明変数のあるローカルレベルモデルと同一。

ukseats <- c(rep(0, (1982 - 1968) * 12 + 1), rep(1, (1984 - 1982) * 12 - 1))
ukseats <- ts(ukseats, start = start(ukdrivers), frequency = frequency(ukdrivers))

x <- ukseats
parm <- c(var(y), 0.001)  

buildObj <- function(parm) {
  dlm(FF = matrix(c(1, 0), 1, 2),
      V = matrix(exp(parm[1]), 1, 1),
      GG = matrix(c(1, 0,
                    0, 1),
                  2, 2, byrow = TRUE),
      W = matrix(c(exp(parm[2]), 0,
                   0,            0),
                 2, 2, byrow = TRUE),
      m0 = c(0, 0),
      C0 = matrix(c(1e+07,     0,
                    0,     1e+07),
                  2, 2, byrow = TRUE),
      JFF = matrix(c(0, 1), 1, 2),
      X = as.matrix(x))
}

dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj)
(dlmModObj <- buildObj(dlmFitObj$par))
# $FF
#      [,1] [,2]
# [1,]    1    0
# 
# $V
#             [,1]
# [1,] 0.002692686
# 
# $GG
#      [,1] [,2]
# [1,]    1    0
# [2,]    0    1
# 
# $W
#            [,1] [,2]
# [1,] 0.01041175    0
# [2,] 0.00000000    0
# 
# $JFF
#      [,1] [,2]
# [1,]    0    1
# 
# $X
#      [,1]
# [1,] 0   
# [2,] 0   
# [3,] ... 
# 
# $m0
# [1] 0 0
# 
# $C0
#       [,1]  [,2]
# [1,] 1e+07 0e+00
# [2,] 0e+00 1e+07

filtered <- dlmFilter(y, dlmModObj)$m
smoothed <- dlmSmooth(y, dlmModObj)$s
p <- autoplot(y)
p <- autoplot(filtered[, 1] + filtered[, 2] * x, colour = 'red', linetype = 'dashed', p = p)
autoplot(smoothed[, 1] + smoothed[, 2] * x, colour = 'blue', p = p)

f:id:sinhrks:20150524153729p:plain

6. 季節要素と干渉変数のあるローカルレベルモデル (確率的レベルと確率的季節)

  • { a_t = ( u_t, \gamma_{ 1, t }, \gamma_{ 2, t }, \gamma_{ 3, t }, \lambda_t )^T }: { u_t } は確率的レベル、{ \gamma } は確率的季節、{ \lambda_t } は干渉変数の係数。

補足 テキスト記載の数式は3項の季節要素、説明変数、干渉変数を含む 6 状態が記載されている。季節要素が3項のため英国インフレーションモデルについて記載していると思われるが、このモデルには説明変数がないため状態数があわない。そのため、テキスト中の式を英国インフレーションモデルに合うよう書き換えている。

y <- ukinflation

ukpulse <- rep(0, length.out = length(ukinflation))
ukpulse[4*(1975-1950)+2] <- 1
ukpulse[4*(1979-1950)+3] <- 1
ukpulse <- ts(ukpulse, start = start(ukinflation), frequency = frequency(ukinflation))
x <- ukpulse

parm <- log(c(var(y), 0.00001, 0.00001))
buildObj <- function(parm) {
  dlm(FF = matrix(c(1, 1, 0, 0, 1), 1, 5),
      V = matrix(c(exp(parm[1])), 1, 1),
      GG = matrix(c(1,  0,  0,  0, 0,
                    0, -1, -1, -1, 0, 
                    0,  1,  0,  0, 0, 
                    0,  0,  1,  0, 0,
                    0,  0,  0,  0, 1),
                  5, 5, byrow = TRUE),
      W = matrix(c(exp(parm[2]),            0, 0, 0, 0,
                   0,            exp(parm[3]), 0, 0, 0,
                   0,                       0, 0, 0, 0,
                   0,                       0, 0, 0, 0,
                   0,                       0, 0, 0, 0),
                 5, 5, byrow = TRUE),
      m0 = c(0, 0, 0, 0, 0),
      C0 = matrix(c(1e+07,     0,     0,     0,     0,
                    0,     1e+07,     0,     0,     0,
                    0,         0, 1e+07,     0,     0, 
                    0,         0,     0, 1e+07,     0,
                    0,         0,     0,     0, 1e+07),
                  5, 5, byrow = TRUE),
      JFF = matrix(c(0, 0, 0, 0, 1), 1, 5),
      X = as.matrix(x))
}
dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj)
(dlmModObj <- buildObj(dlmFitObj$par))
# $FF
#      [,1] [,2] [,3] [,4] [,5]
# [1,]    1    1    0    0    1
# 
# $V
#             [,1]
# [1,] 2.27569e-05
# 
# $GG
#      [,1] [,2] [,3] [,4] [,5]
# [1,]    1    0    0    0    0
# [2,]    0   -1   -1   -1    0
# [3,]    0    1    0    0    0
# [4,]    0    0    1    0    0
# [5,]    0    0    0    0    1
# 
# $W
#              [,1]         [,2] [,3] [,4] [,5]
# [1,] 1.783797e-05 0.000000e+00    0    0    0
# [2,] 0.000000e+00 4.310206e-07    0    0    0
# [3,] 0.000000e+00 0.000000e+00    0    0    0
# [4,] 0.000000e+00 0.000000e+00    0    0    0
# [5,] 0.000000e+00 0.000000e+00    0    0    0
# 
# $JFF
#      [,1] [,2] [,3] [,4] [,5]
# [1,]    0    0    0    0    1
# 
# $X
#      [,1]
# [1,] 0   
# [2,] 0   
# [3,] ... 
# 
# $m0
# [1] 0 0 0 0 0
# 
# $C0
#       [,1]  [,2]  [,3]  [,4]  [,5]
# [1,] 1e+07 0e+00 0e+00 0e+00 0e+00
# [2,] 0e+00 1e+07 0e+00 0e+00 0e+00
# [3,] 0e+00 0e+00 1e+07 0e+00 0e+00
# [4,] 0e+00 0e+00 0e+00 1e+07 0e+00
# [5,] 0e+00 0e+00 0e+00 0e+00 1e+07

p <- autoplot(dlmFilter(y, dlmModObj), fitted.colour = 'red', fitted.linetype = 'dashed')
autoplot(dlmSmooth(y, dlmModObj), colour = 'blue', p = p)

f:id:sinhrks:20150524155321p:plain

モデルの作り方

とはいえ、最初から上式のような形でモデルが定義できていることはないと思う。まずは簡単なモデルを dlm::dlmModPoly などで作って、それをベースに書き換えていくのが良さそう。作ったモデルに適当なパラメータを与えて初期化すれば、モデルの次元 / 中身を確認できる。

y <- ukdrivers
parm<- log(c(var(y), 0.001))
buildFun <- function(parm){
  dlmModPoly(order = 2, dV = exp(parm[1]), dW = c(exp(parm[2]), 0))
}
buildFun(parm)
# $FF
#      [,1] [,2]
# [1,]    1    0
# 
# $V
#            [,1]
# [1,] 0.02935256
# 
# $GG
#      [,1] [,2]
# [1,]    1    1
# [2,]    0    1
# 
# $W
#       [,1] [,2]
# [1,] 0.001    0
# [2,] 0.000    0
# 
# $m0
# [1] 0 0
# 
# $C0
#       [,1]  [,2]
# [1,] 1e+07 0e+00
# [2,] 0e+00 1e+07

R で 状態空間モデル: {dlm} の対数尤度計算について

状態空間モデルを扱う R パッケージである {dlm} では、モデルのパラメータ推定を最尤法で行なう。その際に使われる対数尤度の計算式はパッケージによって違いがあるようで、推定結果が自分の参照しているテキストとずれる。その違いを確かめたい。

当該のテキストはこちら。今回はこれを持っている方向けの内容。

状態空間時系列分析入門

状態空間時系列分析入門

  • 作者: J.J.F.コマンダー,S.J.クープマン,Jacques J.F. Commandeur,Sime Jan Koopman,和合肇
  • 出版社/メーカー: シーエーピー出版
  • 発売日: 2008/09
  • メディア: 単行本
  • 購入: 2人 クリック: 4回
  • この商品を含むブログを見る

参考 サイト 「状態空間時系列分析入門」をRで再現する はこのテキストを {dlm}, {KFAS} で再現しており、対数尤度の差異についても記載がある。

テキストでは単変量状態空間モデルの 時刻 { t } における観測値 { y_t } と状態 { a_t } を以下の式であらわしている。

{ y_t = z'_t a_t + \epsilon_{t} \ \ \ \ \ \ \epsilon_t \sim NID(0, \sigma^{2}_{ \epsilon } ) }

{ a_{ t+1 } = T_t a_t + R_t \eta_t \ \ \ \ \ \ \eta_t \sim NID(0, Q_t) }

1期先の状態を更新するカルマンフィルタは、

{ a_{ t+1 } = T_t a_t + K_t (y_t - z'_t a_t ) }

{ K_t = \frac{P_t}{F_t} }

  • { K_t }: カルマンゲイン
  • { P_t }: 状態推定誤差分散
  • { F_t }: カルマンフィルタでの1期先予測誤差分散

これらを用いて、対数尤度関数は式 (8.7) で以下のように定義されている。

{ \displaystyle \log L_d = - \frac{n}{2} log(2 \pi) - \frac{1}{2} \sum_{t=d+1}^n ( \log F_t +  \frac{ v_{t}^{2} }{ F_t }) }

  • { n }: 観測時系列の長さ
  • { d }: 推定する状態数
  • { v_t }: カルマンフィルタの1期先予測誤差

この式で計算される対数尤度は {dlm} のものと異なる。差異がある理由として記載する 2 点がわかったが、これらを解消してもまだ結果はあわない。

1. テキストの記載による差異

まず、テキストの各モデルに対数尤度として記載されている数値は 上式の計算結果そのものでない。P162 に添付されているプログラムに 対数尤度を系列の長さ { n } で割る処理がしれっと入っており、処理後の値が対数尤度として記載されている。そのため、対数尤度に戻すためには各モデルに記載されている数値に 系列の長さ { n } をかける必要がある。

例えば 2.1 ローカルレベルモデル (確定的レベルモデル) に対数尤度として記載されている数値 0.3297597 に、系列の長さ 192 をかけた 63.31386 が 式 (8.7) の計算結果にあたる。

2. {dlm}対数尤度計算式との差異

テキストで利用しているデータ利用して、{dlm} との差異を確かめたい。まずデータを読み込む。データは著者のサポートサイト、An Introduction to State Space Time Series Analysis から入手できる。

ukdrivers <- read.table('UKdriversKSI.txt', skip = 1)
ukdrivers <- ts(ukdrivers[[1]], start = c(1969, 1), frequency = 12)
ukdrivers <- log(ukdrivers)

y <- ukdrivers

パラメータを受け取ってローカルレベルモデルを返す関数 buildFun を定義し、dlm::dlmMLE最尤推定する。結果は リストになり、推定されたパラメータ par対数尤度 (を代替するもの/後述) value がプロパティとして含まれている。

library(dlm)
parm <- c(var(y))
buildFun <- function(x) {
  dlmModPoly(order = 1, dV = parm, dW = 0)
}
dlmFit <- dlmMLE(y, parm = parm, build = buildFun)
class(dlmFit)
# [1] "list"
names(dlmFit)
# [1] "par"         "value"       "counts"      "convergence" "message"    
dlmFit$par
# [1] 0.02935256

続けて、最尤推定したパラメータ dlmFit$par を使ってモデルを作成する。プロパティの意味は vignette の記載がわかりやすい。テキストと式/添字をあわせると

  • FF: { z'_t } に対応。
  • V: { \sigma^{2} } に対応。
  • GG: { T_t } に対応。
  • W: { R_t Q_t } に対応。
  • m0: { a_1 } の事前分布の平均。
  • C0: { a_1 } の事前分布の分散。
  • J...: 各パラメータが時間変化するとき利用。
dlmMod <- buildFun(dlmFit$par)
class(dlmMod)
# [1] "dlm"
names(dlmMod)
#  [1] "m0"  "C0"  "FF"  "V"   "GG"  "W"   "JFF" "JV"  "JGG" "JW" 
V(dlmMod)
#            [,1]
# [1,] 0.02935256

ここで、{dlm} の内部で最尤推定がどのように行なわれているのか確かめたい。 dlm::dlmMLE の定義をみると、最尤推定dlm::dlmLLoptim で最適化することで行っている。

dlmMLE
# function (y, parm, build, method = "L-BFGS-B", ..., debug = FALSE) 
# {
#     logLik <- function(parm, ...) {
#         mod <- build(parm, ...)
#         return(dlmLL(y = y, mod = mod, debug = debug))
#     }
#     out <- optim(parm, logLik, method = method, ...)
#     return(out)
# }
# <environment: namespace:dlm>

dlm::dlmLL はその名前から対数尤度のようにも見えるが、対数尤度とは符号が逆 ( optim で最適化するため、尤もらしいほど値が小さくなる )。

dlmLL(y = y, mod = buildFun(dlmFit$par))
# [1] -230.7721

補足 パラメータ推定時の dlm::dlmLL の値は dlm::dlmMLE の結果にも含まれている。

dlmFit$value
# [1] -230.7721

dlm::dlmLL の処理は少し条件分岐が多いため省略するが、上のモデルについて必要な処理は以下のようなものであることがわかる。カルマンフィルタ dlm::dlmFilter を適用した結果から予測誤差、予測誤差標準偏差を求めて 対数尤度の代替となる値を算出している。

# 予測誤差、予測誤差標準偏差を計算
residuals_dlmcompat <- function (object) {
  mod <- object$mod
  # yは観測値 (入力), a は状態の予測値
  res <- drop(object$y - object$a %*% t(mod$FF))
  f <-  function(i) {
    # 予測誤差標準偏差
    diag(crossprod(object$D.R[i, ] * t(mod$FF %*% object$U.R[[i]])) + mod$V)
  }
  SD <- drop(t(sqrt(sapply(seq(along = object$U.R), f))))
  return(list(res = res, sd = SD))
}

# dlmLLと同じ値を計算
LL_dlmcompat <- function(y, mod) {
  filtered <- dlmFilter(y, mod)
  resid <- residuals_dlmcompat(filtered) 
  # 予測誤差分散
  F <- as.numeric(resid$sd)^2
  v <- as.numeric(resid$res)
  return(0.5 * sum(log(F) + v^2/F))
}

LL_dlmcompat(y = y, mod = buildFun(dlmFit$par))
# [1] -230.7721

これを上式 (8.7) と比べると以下の差異がある。

  • 符号が逆 ( optim での処理のため )
  • 第1項目がない ( パラメータ非依存のため optim には影響ない )
  • 第2項目の総和の範囲が異なる

これをテキストに準じて書き換えると、

LL_ckbookcompat <- function(y, mod) {
  n <- length(y)
  filtered <- dlmFilter(y, mod)
  resid <- residuals_dlmcompat(filtered) 
  F <- (as.numeric(resid$sd)^2)[2:n]
  v <- as.numeric(resid$res)[2:n]
  return(-0.5 * (n * log(2 * pi) + sum(log(F) + v^2/F)))
}
LL_ckbookcompat(y = y, mod = buildFun(dlmFit$par))
# [1] 62.39492

となり、テキストの対数尤度 63.31386 に結構 近づいた (が、あわない、、)。対数尤度の計算結果はパッケージによって異なる、ということは覚えておいたほうがよさそうだ。

補足: このモデルにおいては、 {KFAS} の結果も上記修正後の結果と一致している。

library(KFAS)
kfasMod <- SSModel(y ~ SSMtrend(degree=1, Q=list(matrix(0))), H = matrix(NA))
kfasFit <- fitSSM(kfasMod, inits = parm, method = "BFGS")

kfasFit$optim.out$value
# [1] -62.39492
KFS(kfasFit$model)$logLik
# [1] 62.39492

pandas 日時まわりのリサンプリング/オフセット処理

こちらの続き。

今回のサンプルデータには自分の歩数のデータを使いたい。インスパイヤ元は以下のサイトだ。

d.hatena.ne.jp

データの読み込み

歩数データは iPhone の Health アプリから Export できる。形式は XML なので、そのままでは pandas で読み込めない。一度 XML から必要な属性を辞書のリストとして取り出した後、pandas に読み込ませる。

import pandas as pd
from xml.etree import ElementTree
tree = ElementTree.parse('export.xml')
root = tree.getroot()
# 属性の辞書のリストを作る
data = [e.attrib for e in root.findall('.//Record')]
data[0]
# {'endDate': '20150511220900+0900',
#  'recordCount': '104',
#  'source': u'xxx',
#  'startDate': '20150511210900+0900',
#  'type': 'HKQuantityTypeIdentifierStepCount',
#  'unit': 'count',
#  'value': '531'}

df = pd.DataFrame(data)
# 必要な列のみにフィルタ
df = df[['startDate', 'type', 'unit', 'value']]
df.head()
#              startDate                               type   unit value
# 0  20150511210900+0900  HKQuantityTypeIdentifierStepCount  count   531
# 1  20150512000900+0900  HKQuantityTypeIdentifierStepCount  count     6
# 2  20150512060900+0900  HKQuantityTypeIdentifierStepCount  count   629
# 3  20150512070900+0900  HKQuantityTypeIdentifierStepCount  count   312
# 4  20150512080900+0900  HKQuantityTypeIdentifierStepCount  count  1483

Series.value_counts() を使うと、ある列で出現する要素の個数を集計できる。"type" 列を集計すると、Export したデータには歩数以外のデータも含まれていることがわかる。歩数以外は不要なので削除する。

df['type'].value_counts()
# HKQuantityTypeIdentifierStepCount                 375
# HKQuantityTypeIdentifierDistanceWalkingRunning    374
# HKQuantityTypeIdentifierFlightsClimbed            216
# dtype: int64

# 歩数のみにフィルタ
df = df[df['type'] == 'HKQuantityTypeIdentifierStepCount']

また、XML で読み込んだデータはすべて文字列型になっているため、適宜型変換を行う。

# 歩数データは数値に
df['value'] = df['value'].astype(float)

# 日付は index として設定し、日付型 (DatetimeIndex) に変換
df = df.set_index('startDate')

# この時点で index は文字列型になっている
df.index 
# Index([u'20150511210900+0900', u'20150512000900+0900', u'20150512060900+0900',
#        u'20150512070900+0900', u'20150512080900+0900', u'20150512090900+0900',
#        ...
#        u'20150518165000+0900', u'20150518165900+0900', u'20150518171600+0900',
#        u'20150518181200+0900'],
#       dtype='object', name=u'startDate', length=965)

# to_datetime で日時型に変換 / タイムゾーンを表すオフセットを適宜設定
pd.to_datetime(df.index, utc=True).tz_convert('Asia/Tokyo')
# DatetimeIndex(['2015-05-11 21:09:00+09:00', '2015-05-12 00:09:00+09:00',
#                '2015-05-12 06:09:00+09:00', '2015-05-12 07:09:00+09:00',
#                ...
#                '2015-05-18 16:50:00+09:00', '2015-05-18 16:59:00+09:00',
#                '2015-05-18 17:16:00+09:00', '2015-05-18 18:12:00+09:00'],
#               dtype='datetime64[ns]', length=965, freq=None, tz='Asia/Tokyo')

# index を上書き
df.index = pd.to_datetime(df.index, utc=True).tz_convert('Asia/Tokyo')

# タイムゾーンは使わないので削除
df.index = df.index.tz_localize(None)

# 日付の昇順にソート
df = df.sort_index()

df.head()
#                                                   type   unit     value
# 2014-11-25 21:09:00  HKQuantityTypeIdentifierStepCount  count   1396.00
# 2014-11-26 21:09:00  HKQuantityTypeIdentifierStepCount  count   7020.37
# 2014-11-27 21:09:00  HKQuantityTypeIdentifierStepCount  count   6413.63
# 2014-11-28 21:09:00  HKQuantityTypeIdentifierStepCount  count   8396.00
# 2014-11-29 21:09:00  HKQuantityTypeIdentifierStepCount  count  12411.00

今の機種に買い替えたのが昨年11末なので、買い替え以降すべてのデータが入っているようだ。

部分文字列によるスライシング

pandas での一般的なデータ選択については以下の記事にまとめた。

加えて、データが 日付型の Index ( DatetimeIndex ) を持つとき、日付-like な文字列で __getitem__ すると、その期間にあてはまるデータを抽出してくれる。例えば 2015-05-17 日分のデータを抽出したければ以下のようにする。この方法を使うと 好きな期間のデータを簡単に確認することができる。詳細は公式ドキュメントを参照。

df['2015-05-17']
#                                                   type   unit     value
# 2015-05-17 08:09:00  HKQuantityTypeIdentifierStepCount  count   18.0000
# 2015-05-17 11:09:00  HKQuantityTypeIdentifierStepCount  count  180.0000
# 2015-05-17 12:09:00  HKQuantityTypeIdentifierStepCount  count  934.0000
# 2015-05-17 13:09:00  HKQuantityTypeIdentifierStepCount  count  469.0000
# ...                                                ...    ...       ...
# 2015-05-17 21:16:00  HKQuantityTypeIdentifierStepCount  count   88.5087
# 2015-05-17 21:17:00  HKQuantityTypeIdentifierStepCount  count   23.1214
# 2015-05-17 21:18:00  HKQuantityTypeIdentifierStepCount  count   99.3283
# 2015-05-17 21:19:00  HKQuantityTypeIdentifierStepCount  count   41.6284
# 
# [14 rows x 3 columns]

リサンプリング

データは日時で処理したいので、まずは 1日ごとの合計を算出したい。こういうときは DataFrame.resample。リサンプリングする期間や集約関数は引数として指定できる。

s = df.resample('D', how='sum')
#                    value
# 2014-11-25   1396.000000
# 2014-11-26   7020.370000
# 2014-11-27   6413.630000
# 2014-11-28   8396.000000
# ...                  ...
# 2015-05-15   7561.998000
# 2015-05-16  10615.000000
# 2015-05-17   9208.000000
# 2015-05-18   8085.994898
# 
# [175 rows x 1 columns]

日次に集約した結果をプロットしてみる。

s.plot()

f:id:sinhrks:20150518222638p:plain

あまり意識はしていないのだが そこそこ歩いているようだ。5月初旬はちょっと出かけていたため 歩数に異常値が出ている。

日付の標準化

日時データの適当な期間での集約は DataFrame.resample でできた。が、時には日時の補正のみを行い、集約はしたくない場合がある。

DatetimeIndex を日付ごとにまとめるのに一番簡単なのは .normalize

pd.Timestamp('2015-05-01 23:59').normalize()
# Timestamp('2015-05-01 00:00:00')
pd.Timestamp('2015-05-02 00:00').normalize()
# Timestamp('2015-05-02 00:00:00')

他の日時関連のメソッドと同じく、Timestamp, DatetimeIndex 両方で使える。

df.tail().index
# DatetimeIndex(['2015-05-18 18:55:00', '2015-05-18 18:56:00',
#                '2015-05-18 18:57:00', '2015-05-18 18:58:00',
#                '2015-05-18 18:59:00'],
#               dtype='datetime64[ns]', freq=None, tz=None)

df.tail().index.normalize()
# DatetimeIndex(['2015-05-18', '2015-05-18',
#                '2015-05-18', '2015-05-18',
#                '2015-05-18'],
#               dtype='datetime64[ns]', freq=None, tz=None)

当然、集約すれば 日付での .resample と同じ結果になる。

s = df.groupby(df.index.normalize())[['value']].sum()
s 
# 略

オフセット

また、より柔軟に日時補正を行うためにオフセットが提供されている ( 前回記事 )。オフセットを利用してデータを補正/集約する例を書きたい。

オフセットを使わずに .resample で月次平均をとると以下のような結果になる。

s.resample('M', how='mean')
#                    value
# 2014-11-30   7012.500000
# 2014-12-31   8435.741935
# 2015-01-31   9134.032258
# 2015-02-28   9323.821429
# 2015-03-31   9326.356129
# 2015-04-30   9938.533333
# 2015-05-31  10973.055439

オフセットを使って同じ処理をするには、日時を月末に補正するオフセット MonthEnd を使う。オフセットの一覧は 公式ドキュメント を参照。

m = offsets.MonthEnd()
m
# <MonthEnd>

オフセットにはいくつか共通のメソッドがあるため、順に記載する。

まず、ある日付が 当該のオフセット上に存在するか調べるには .onOffsetMonthEnd.onOffset の場合は、引数が月末の日付であるとき True になる。

m.onOffset(pd.Timestamp('2015-04-29'))
# False
m.onOffset(pd.Timestamp('2015-04-30'))
# True
m.onOffset(pd.Timestamp('2015-05-01'))
# False

オフセットを加減算することにより、日時を次の/前のオフセットへと移動できる。また、オフセットの加算と同一の処理として .apply がある。

# 日付を常に 次のオフセットへ移動させる。
pd.Timestamp('2015-04-29') + m
# Timestamp('2015-04-30 00:00:00')
pd.Timestamp('2015-04-30') + m
# Timestamp('2015-05-31 00:00:00')
pd.Timestamp('2015-05-01') + m
# Timestamp('2015-05-31 00:00:00')

# 日付を常に 前のオフセットへ移動させる。
pd.Timestamp('2015-04-29') - m
# Timestamp('2015-03-31 00:00:00')
pd.Timestamp('2015-04-30') - m
# Timestamp('2015-03-31 00:00:00')
pd.Timestamp('2015-05-01') - m
# Timestamp('2015-04-30 00:00:00')

# 日付を常に 次のオフセットへ移動させる。
m.apply(pd.Timestamp('2015-04-29'))
# Timestamp('2015-04-30 00:00:00')
m.apply(pd.Timestamp('2015-04-30'))
# Timestamp('2015-05-31 00:00:00')
m.apply(pd.Timestamp('2015-05-01'))
# Timestamp('2015-05-31 00:00:00')

オフセットに乗っている日時は動かしたくなければ .rollforward / .rollback というメソッドを使う。

# 日付がオフセットに乗っていない場合 次のオフセットへ移動させる。
m.rollforward(pd.Timestamp('2015-04-29'))
# Timestamp('2015-04-30 00:00:00')
m.rollforward(pd.Timestamp('2015-04-30'))
# Timestamp('2015-04-30 00:00:00')
m.rollforward(pd.Timestamp('2015-05-01'))
# Timestamp('2015-05-31 00:00:00')

# 日付がオフセットに乗っていない場合 前のオフセットへ移動させる。
m.rollback(pd.Timestamp('2015-04-29'))
# Timestamp('2015-03-31 00:00:00')
m.rollback(pd.Timestamp('2015-04-30'))
# Timestamp('2015-04-30 00:00:00')
m.rollback(pd.Timestamp('2015-05-01'))
# Timestamp('2015-04-30 00:00:00')

よって、月次平均の算出をオフセットのメソッドを使って行う場合は以下のようになる。

# 例示のため一部データをスライス
s['2014-11']
#                value
# 2014-11-25   1396.00
# 2014-11-26   7020.37
# 2014-11-27   6413.63
# 2014-11-28   8396.00
# 2014-11-29  12411.00
# 2014-11-30   6438.00

# 日時を月末に補正
s['2014-11'].index.map(m.rollforward)
# array([Timestamp('2014-11-30 00:00:00'), Timestamp('2014-11-30 00:00:00'),
#        Timestamp('2014-11-30 00:00:00'), Timestamp('2014-11-30 00:00:00'),
#        Timestamp('2014-11-30 00:00:00'), Timestamp('2014-11-30 00:00:00')], dtype=object)

s.groupby(s.index.map(m.rollforward)).mean()
#                    value
# 2014-11-30   7012.500000
# 2014-12-31   8435.741935
# 2015-01-31   9134.032258
# 2015-02-28   9323.821429
# 2015-03-31   9326.356129
# 2015-04-30   9938.533333
# 2015-05-31  10973.055439

補足 オフセットは .resample の引数として渡すこともでき、同じ結果になる。

s.resample(m, how='mean')
# 略

オフセットを活用した集計

つづけて、カレンダー上の平日 / 休日での差異をみたい。アクセサ ( こちらの記事参照 ) を使って曜日で集計してから処理してもよいが、 BusinessDay オフセットを使えば簡単。BusinessDay.onOffset を使えば平日が True / 休日を False として集計できる。以下の結果をみると、休日の方が歩いている傾向があるようだ。

bday = offsets.BusinessDay()
s.groupby(bday.onOffset).mean()
#               value
# False  10860.216800
# True    8716.657583

BusinessDay オフセットは 土日のみを休日として扱うが、任意の祝日も含めて休日として扱いたい場合は CustomBusinessDay オフセットが使える。拙作のパッケージ で 日本の祝日のカレンダーを定義しているので、それを使って、

import japandas as jpd
calendar = jpd.JapaneseHolidayCalendar()
cday = pd.offsets.CDay(calendar=calendar)

s.groupby(cday.onOffset).mean()
#               value
# False  10799.033448
# True    8600.419640

これまでの内容を使って、月ごとに 平日 / 土日 / 祝日の歩数平均をクロス集計したい。クロス集計を行うには pd.pivot_table

avg = pd.pivot_table(s, index=s.index.map(m.rollforward),
                     columns=[s.index.map(bday.onOffset), s.index.map(cday.onOffset)],
                     values='value', aggfunc='mean')
avg
#                    False    True              
#                    False    False        True 
# 2014-11-30   9424.500000      NaN  5806.500000
# 2014-12-31  10382.275000  11704.0  7579.354545
# 2015-01-31   9534.193333  10162.5  8851.113000
# 2015-02-28  10943.125000   9446.0  8635.578947
# 2015-03-31  11148.877778      NaN  8580.779091
# 2015-04-30  11349.625000   7110.0  9535.666667
# 2015-05-31  12769.000000  11582.7  9572.544211

左から、

  • 土日: BusinessDay CustomBusinessDay ともに False
  • 祝日: BusinessDayTrue, CustomBusinessDayFalse
  • 平日: BusinessDay CustomBusinessDay ともに True

となる。ということで上の結果からも 5月の休日は結構歩いたなってことがわかった。

まとめ

日付を index として持つデータに対する リサンプリング/オフセット処理をまとめた。これらは以下のような使い分けをすればよいと思う。

  • 比較的単純な集約はリサンプリング
  • リサンプリングではできない より細かい補正やクロス集計をしたい場合はオフセット

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

pandas 0.16.0/0.16.1 の主要な新機能

先日 5/11 に pandas 0.16.1 がリリースされた。前バージョンである 0.16.0 とあわせて、主要な変更点である以下3点の概要をまとめたい。各見出しの括弧内には対応したバージョンを記載した。

  • 簡単な列追加 / DataFrame.assign (0.16.0)
  • 文字列処理の強化 (0.16.0/0.16.1)
  • ランダムサンプリング DataFrame.sample (0.16.1)

変更点全体はリリースノートを参照。上記3点以外にも便利な変更はあるのだが、 Categorical や Frequency など 元機能の説明がないとわからない箇所なので別途、、。

簡単な列追加 / DataFrame.assign (0.16.0)

DataFrame への列追加をより簡潔に行うためのメソッドとして、DataFrame.assign が追加された。R {dplyr}mutate 関数に似た書式で複数列の追加ができる。

これまで列追加の際には カラムを指定して代入し、元データを破壊的に変更する必要があった。

import pandas as pd
df = pd.DataFrame({'A':[1, 2, 3], 'B':[4, 5, 6]})
df
#    A  B
# 0  1  4
# 1  2  5
# 2  3  6

# これまでの列追加
df['C'] = [7, 8, 9]
df
#    A  B  C
# 0  1  4  7
# 1  2  5  8
# 2  3  6  9

assign を使うと、作成したい列名をキーワード引数として以下のように書ける。また、戻り値は元データに列追加したコピーとなり、元データ自体は変更されない。

# D列, E列を追加
df.assign(D=[10, 11, 12], E=[13, 14, 15])
#    A  B  C   D   E
# 0  1  4  7  10  13
# 1  2  5  8  11  14
# 2  3  6  9  12  15

# 元データはそのまま
df
#    A  B  C
# 0  1  4  7
# 1  2  5  8
# 2  3  6  9

キーワード引数の値には通常の列作成時と同じ型が渡せる。各列の値を組み合わせた列 + ダミー列を同時に作りたければ、

df.assign(AB=df['A'] * df['B'], AC=df['A'] + df['C'], dummy=1)
#    A  B  C  AB  AC  dummy
# 0  1  4  7   4   8      1
# 1  2  5  8  10  10      1
# 2  3  6  9  18  12      1

キーワード引数として渡せない列名、例えば記号や日本語を含む列名は、一度 辞書にしてから渡せばよい。

keys = {'A*B': df['A'] * df['B'], 'A+B':df['A'] + df['B']}
df.assign(**keys)
#    A  B  A*B  A+B
# 0  1  4    4    5
# 1  2  5   10    7
# 2  3  6   18    9

補足: メソッド自体は 0.16.0 で追加され、0.16.1 でキーワード引数の処理順序がアルファベット順に固定された。

文字列処理の強化 (0.16.0/0.16.1)

pandas での文字列処理について過去に以下の記事を書いたことがあるのだが、当時はアクセサから利用できるメソッドが限定されていて、少し手間がかかるところがあった。

0.16.0, 0.16.1 で、.str アクセサに以下のメソッド群が追加された。それぞれ、 Python 標準の文字列メソッドと同一の処理を Series 内の値に対して適用するもの。

Methods
isalnum isalpha isdigit isspace islower
isupper istitle isnumeric isdecimal find
rfind ljust rjust zfill capitalize
swapcase normalize partition rpartition index
rindex translate

補足 str.normalizeユニコード正規化 ( unicodedata.normalize ) を値に対して適用するもの。

補足 .str アクセサから利用可能なメソッド全体はこちら

df = pd.DataFrame({'A': ['xxx', '3', 'yyy'], 'B': [1, 2, 3]})
df
#      A  B
# 0  xxx  1
# 1    3  2
# 2  yyy  3

# 先頭を大文字に
df['A'].str.capitalize()
# 0    Xxx
# 1      3
# 2    Yyy
# Name: A, dtype: object

# 文字列が数値がどうかを調べる
df['A'].str.isdigit()
# 0    False
# 1     True
# 2    False
# Name: A, dtype: bool

# 5桁分を 0 パディング
df['A'].str.zfill(5)
# 0    00xxx
# 1    00003
# 2    00yyy
# Name: A, dtype: object

やりたいことに応じて適当に組み合わせると、かなり柔軟な処理がかける。例えば 数値の文字列のみを 0 パディングしたければ、

df.loc[df['A'].str.isdigit(), 'A'] = df['A'].str.zfill(5)
df
#        A  B
# 0    xxx  1
# 1  00003  2
# 2    yyy  3

補足 DataFrame.loc の意味はこちらの記事参照。

また、0.16.1 から .str アクセサを Index からも呼び出せるようになった。これまでは 列名/行名に対する文字列処理は .map を使って関数適用する必要があったが、.str を使えば以下のように書ける。

df
#        A  B
# 0    xxx  1
# 1  00003  2
# 2    yyy  3

df.columns.str.lower()
# Index([u'a', u'b'], dtype='object')

# 列名を小文字に変更
df.columns = df.columns.str.lower()
df
#        a  b
# 0    xxx  1
# 1  00003  2
# 2    yyy  3

str.split のように複数の値を返しうる処理については、expand オプションを利用して 返り値の型を制御できる。互換性維持のため、既定値はメソッドにより異なる。APIドキュメントを参照。

  • expand=False: 返り値の次元を増やさない = 返り値は Series もしくは Index
  • expand=True: 返り値の次元を増やす = 返り値は DataFrame もしくは MultiIndex

補足 これまで同様の制御をおこなっていた return_type オプションは deprecate されており、将来のバージョンで削除される。

s = pd.Series(['a,b', 'a,c', 'b,c'])

# 返り値は Series
s.str.split(',')
# 0    [a, b]
# 1    [a, c]
# 2    [b, c]
# dtype: object

# 返り値は DataFrame
s.str.split(',', expand=True)
#    0  1
# 0  a  b
# 1  a  c
# 2  b  c

idx = pd.Index(['a,b', 'a,c', 'b,c'])

# 返り値は 1 レベルの Index
idx.str.split(',')
# Index([[u'a', u'b'], [u'a', u'c'], [u'b', u'c']], dtype='object')

# 返り値は 2 レベルの MultiIndex
idx.str.split(',', expand=True)
# MultiIndex(levels=[[u'a', u'b'], [u'b', u'c']],
#            labels=[[0, 0, 1], [0, 1, 1]])

ランダムサンプリング DataFrame.sample (0.16.1)

Series, DataFrame, Panel から適当なデータをサンプリングするためのメソッドとして、.sample が追加された。

ここでは DataFrame.sample を例としてその処理を記載する。まず以下のようなデータを用意した。

df = pd.DataFrame({'A': [1, 2 ,3, 4, 5], 'B': ['a', 'b', 'c', 'd', 'e']})
df
#    A  B
# 0  1  a
# 1  2  b
# 2  3  c
# 3  4  d
# 4  5  e

DataFrame.sample は 既定では 1 行をランダムにサンプリングする。

  • n: サンプルサイズを指定する。既定は 1。
  • axis: サンプリングするレコードの方向を指定する。0 (既定) で行のサンプリング、1 で列のサンプリング。
# 1 行の抽出
df.sample()
#    A  B
# 3  4  d

# 3 行の抽出
df.sample(3)
#    A  B
# 0  1  a
# 4  5  e
# 2  3  c

# 1 列の抽出
df.sample(axis=1)
#    A
# 0  1
# 1  2
# 2  3
# 3  4
# 4  5

また、サンプルサイズでなく抽出比を指定する場合は frac を指定する。

df.sample(frac=0.4)
#    A  B
# 3  4  d
# 1  2  b

最後に、replace オプションを利用して 抽出方法を変更することができる。

  • replace=False (既定): 非復元抽出。サンプリングされた各要素の重複を許さない。
  • replace=True: 復元抽出。各要素の重複を許す。
# 復元抽出。2行目、4行目が重複
df.sample(n=4, replace=True)
#    A  B
# 1  2  b
# 2  3  c
# 0  1  a
# 2  3  c

# 非復元抽出では 元のデータ数以上のサンプルサイズは取れない
df.sample(n=6)
# ValueError: Cannot take a larger sample than population when 'replace=False'

df.sample(n=6, replace=True)
#    A  B
# 1  2  b
# 4  5  e
# 0  1  a
# 1  2  b
# 2  3  c
# 4  5  e

補足 これまでは index の要素をサンプリングして選択する必要があった。以下の記事参照。

まとめ

0.16.0 / 0.16.1 の変更点のうち、以下 3 点の概要をまとめた。

  • 簡単な列追加 / DataFrame.assign (0.16.0)
  • 文字列処理の強化 (0.16.0/0.16.1)
  • ランダムサンプリング DataFrame.sample (0.16.1)

他にも、バグ修正 / 挙動・仕様の統一など 様々な改善が入っているので使ってみてください。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

簡単な集約/変換処理を PySpark & pandas の DataFrame で行う

こちらの続き。

sinhrks.hatenablog.com

準備

サンプルデータは iris 。今回は HDFScsv を置き、そこから読み取って DataFrame を作成する。

# HDFS にディレクトリを作成しファイルを置く
$ hadoop fs -mkdir /data/
$ hadoop fs -put iris.csv /data/
$ hadoop fs -ls /
Found 1 items
drwxr-xr-x   - ec2-user supergroup          0 2015-04-28 20:01 /data

# Spark のパスに移動
$ echo $SPARK_HOME
/usr/local/spark
$ cd $SPARK_HOME
$ pwd
/usr/local/spark
$ bin/pyspark

補足 前回同様に pandas から直接 PySparkDataFrame を作成した場合、groupBy 時に java.lang.OutOfMemoryError: Java heap space エラーが発生してシェルごと落ちる。

CSV ファイルの読み込み

pandas では前回同様 read_csv

import numpy as np
import pandas as pd
# 表示する行数を設定
pd.options.display.max_rows=10

names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species']

# pandas
pdf = pd.read_csv('~/iris.csv', header=None, names=names)
pdf 
# 略

PySpark は標準では csv から直接 DataFrame を作成できないため、一度 Row のリストを作成して DataFrame に変換する。

from pyspark.sql import Row
lines = sc.textFile("hdfs://127.0.0.1:9000/data/iris.csv")
cells = lines.map(lambda l: l.split(","))
rows = cells.map(lambda x: Row(SepalLength=float(x[0]), SepalWidth=float(x[1]),
                               PetalLength=float(x[2]), PetalWidth=float(x[3]),
                               Species=x[4]))
sdf = sqlContext.createDataFrame(rows)
sdf.show()
# 略

グルーピング/集約

ある列の値ごとに集計

pandas, PySpark で多少 文法は異なる。

列の値でグループ分けし、一列の合計を取得する場合:
# pandas 
pdf.groupby('Species')['SepalLength'].sum()
# Species
# setosa        250.3
# versicolor    296.8
# virginica     329.4
# Name: SepalLength, dtype: float64

# PySpark
sdf.groupBy('Species').sum('SepalLength').show()
# Species    SUM(SepalLength)  
# virginica  329.3999999999999 
# versicolor 296.8             
# setosa     250.29999999999998
指定した複数列の合計を取得する場合:
# pandas
pdf.groupby('Species')[['PetalWidth', 'PetalLength']].sum()
#             PetalWidth  PetalLength
# Species                            
# setosa            12.2         73.2
# versicolor        66.3        213.0
# virginica        101.3        277.6

# PySpark
sdf.groupBy('Species').sum('PetalWidth', 'PetalLength').show()
# Species    SUM(PetalWidth)    SUM(PetalLength)  
# virginica  101.29999999999998 277.59999999999997
# versicolor 66.30000000000001  213.0             
# setosa     12.199999999999996 73.2  
全列の合計を取得する場合:
# pandas
pdf.groupby('Species').sum()
#             SepalLength  SepalWidth  PetalLength  PetalWidth
# Species                                                     
# setosa            250.3       170.9         73.2        12.2
# versicolor        296.8       138.5        213.0        66.3
# virginica         329.4       148.7        277.6       101.3

# PySpark
sdf.groupBy('Species').sum().show()
# Species    SUM(PetalLength)   SUM(PetalWidth)    SUM(SepalLength)   SUM(SepalWidth)   
# virginica  277.59999999999997 101.29999999999998 329.3999999999999  148.7             
# versicolor 213.0              66.30000000000001  296.8              138.5             
# setosa     73.2               12.199999999999996 250.29999999999998 170.90000000000003

補足 pandas では グループ化したデータも DataFrame と同じようにスライシングできたりする。 一方、PySparkGroupedData は集約系のAPI しか持っていない。

# pandas
pdf.groupby('Species')['PetalWidth']
# <pandas.core.groupby.SeriesGroupBy object at 0x7f62f4218d50>

# PySpark (NG!)
sdf.groupBy('Species')[['Species']]
# TypeError: 'GroupedData' object has no attribute '__getitem__'
sdf.groupBy('Species').select('PetalWidth')
# AttributeError: 'GroupedData' object has no attribute 'select'

また、pandas では apply で自作の集約関数 (UDAF) を利用することができるが、PySpark 1.3.1 時点 では非対応らしい。PySparkudf を利用して定義した自作関数を集約時に使うと以下のエラーになる。

# pandas
pdf.groupby('Species')[['PetalWidth', 'PetalLength']].apply(np.sum)
#             PetalWidth  PetalLength
# Species                            
# setosa            12.2         73.2
# versicolor        66.3        213.0
# virginica        101.3        277.6

# PySpark (NG!)
import pyspark.sql.functions
np_sum = pyspark.sql.functions.udf(np.sum, pyspark.sql.types.FloatType())
sdf.groupBy('Species').agg(np_sum(sdf.PetalWidth))
# py4j.protocol.Py4JJavaError: An error occurred while calling o334.agg.
# : org.apache.spark.sql.AnalysisException: expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;

行持ち / 列持ち変換

複数列持ちの値を行持ちに展開 (unpivot / melt)

pandas では pd.meltDataFrame.melt ではないので注意。

# pandas
pmelted = pd.melt(pdf, id_vars=['Species'], var_name='variable', value_name='value')
pmelted
#        Species     variable  value
# 0       setosa  SepalLength    5.1
# 1       setosa  SepalLength    4.9
# 2       setosa  SepalLength    4.7
# 3       setosa  SepalLength    4.6
# 4       setosa  SepalLength    5.0
# ..         ...          ...    ...
# 595  virginica   PetalWidth    2.3
# 596  virginica   PetalWidth    1.9
# 597  virginica   PetalWidth    2.0
# 598  virginica   PetalWidth    2.3
# 599  virginica   PetalWidth    1.8
# 
# [600 rows x 3 columns]

同様の処理を PySpark でやるには、DataFrame.flatMap。1行の入力に対して複数行 (この例では4行) のデータを返すことができる。fratMap の返り値は RDD インスタンスになるため、必要なら再度 DataFrame 化する。

# PySpark

def mapper(row):
    return [Row(Species=row[4], variable='PetalLength', value=row[0]),
            Row(Species=row[4], variable='PetalWidth', value=row[1]),
            Row(Species=row[4], variable='SepalLength', value=row[2]),
            Row(Species=row[4], variable='SepalWidth', value=row[3])] 

smelted = sqlContext.createDataFrame(sdf.flatMap(mapper))
smelted.show()
# Species value variable   
# setosa  1.4   PetalLength
# setosa  0.2   PetalWidth 
# setosa  5.1   SepalLength
# setosa  3.5   SepalWidth 
# ...     ..    ...
# setosa  1.4   PetalLength
# setosa  0.2   PetalWidth 
# setosa  5.0   SepalLength
# setosa  3.6   SepalWidth 
smelted.count()
# 600L

複数行持ちの値を列持ちに変換 (pivot)

pandas では DataFrame.pivot。pivotするデータは列にする値 (以下では Species ) と行にする値 (以下では variable ) の組がユニークになっている必要がある。そのため、まず pivot 用データを作成 -> その後 pivot する。

# pandas

# pivot 用データを作成
punpivot = pmelted.groupby(['Species', 'variable']).sum()
punpivot = punpivot.reset_index()
punpivot
#        Species     variable  value
# 0       setosa  PetalLength   73.2
# 1       setosa   PetalWidth   12.2
# 2       setosa  SepalLength  250.3
# 3       setosa   SepalWidth  170.9
# 4   versicolor  PetalLength  213.0
# ..         ...          ...    ...
# 7   versicolor   SepalWidth  138.5
# 8    virginica  PetalLength  277.6
# 9    virginica   PetalWidth  101.3
# 10   virginica  SepalLength  329.4
# 11   virginica   SepalWidth  148.7
# 
# [12 rows x 3 columns]

# pivot
punpivot.pivot(index='variable', columns='Species', values='value')
# Species      setosa  versicolor  virginica
# variable                                  
# PetalLength    73.2       213.0      277.6
# PetalWidth     12.2        66.3      101.3
# SepalLength   250.3       296.8      329.4
# SepalWidth    170.9       138.5      148.7

PySparkDataFrame のままでは同じ処理はできないようなので、一度 RDD に変換してから、 groupBy -> map

# PySpark

# pivot 用データを作成 
sunpivot = smelted.groupBy('Species', 'variable').sum()
sunpivot.show()
# Species    variable    SUM(value)        
# versicolor SepalWidth  138.5             
# versicolor SepalLength 296.8             
# setosa     PetalLength 73.2              
# virginica  PetalWidth  101.29999999999998
# versicolor PetalWidth  66.30000000000001 
# setosa     SepalWidth  170.90000000000003
# virginica  PetalLength 277.59999999999997
# setosa     SepalLength 250.29999999999998
# versicolor PetalLength 213.0             
# setosa     PetalWidth  12.199999999999996
# virginica  SepalWidth  148.7             
# virginica  SepalLength 329.3999999999999 
 
def reducer(obj):
    # variable : value の辞書を作成
    result = {o[1]:o[2] for o in obj[1]}
    return Row(Species=obj[0], **result)

# pivot
spivot = sunpivot.rdd.groupBy(lambda x: x[0]).map(reducer)
spivot.collect()
# [Row(PetalLength=277.59999999999997, PetalWidth=101.29999999999998, SepalLength=329.3999999999999, SepalWidth=148.7, Species=u'virginica'),
#  Row(PetalLength=73.2, PetalWidth=12.199999999999996, SepalLength=250.29999999999998, SepalWidth=170.90000000000003, Species=u'setosa'),
#  Row(PetalLength=213.0, PetalWidth=66.30000000000001, SepalLength=296.8, SepalWidth=138.5, Species=u'versicolor')]

sqlContext.createDataFrame(spivot).show()
# PetalLength        PetalWidth         SepalLength        SepalWidth         Species   
# 277.59999999999997 101.29999999999998 329.3999999999999  148.7              virginica 
# 73.2               12.199999999999996 250.29999999999998 170.90000000000003 setosa    
# 213.0              66.30000000000001  296.8              138.5              versicolor

列の分割 / 結合

列の値を複数列に分割

ある列の値を適当に文字列処理して、新しい列を作成したい。pandas には 文字列処理用のアクセサがあるため、 assign と組み合わせて以下のように書ける。

# pandas
psplitted = pmelted.assign(Parts=pmelted['variable'].str.slice(0, 5),
                           Scale=pmelted['variable'].str.slice(5))
psplitted
#        Species     variable  value  Parts   Scale
# 0       setosa  SepalLength    5.1  Sepal  Length
# 1       setosa  SepalLength    4.9  Sepal  Length
# 2       setosa  SepalLength    4.7  Sepal  Length
# 3       setosa  SepalLength    4.6  Sepal  Length
# 4       setosa  SepalLength    5.0  Sepal  Length
# ..         ...          ...    ...    ...     ...
# 595  virginica   PetalWidth    2.3  Petal   Width
# 596  virginica   PetalWidth    1.9  Petal   Width
# 597  virginica   PetalWidth    2.0  Petal   Width
# 598  virginica   PetalWidth    2.3  Petal   Width
# 599  virginica   PetalWidth    1.8  Petal   Width
# 
# [600 rows x 5 columns]

PySparkには上記のようなメソッドはないので map で処理する。

# PySpark
def splitter(row):
    parts = row[2][:5]
    scale = row[2][5:]
    return Row(Species=row[0], value=row[1], Parts=parts, Scale=scale)

ssplitted = sqlContext.createDataFrame(smelted.map(splitter))
ssplitted.show()
# Parts Scale  Species value
# Petal Length setosa  1.4  
# Petal Width  setosa  0.2  
# Sepal Length setosa  5.1  
# Sepal Width  setosa  3.5  
# Petal Length setosa  1.4  
# ..    ..     ...     ..
# Petal Length setosa  1.4  
# Petal Width  setosa  0.2  
# Sepal Length setosa  5.0  
# Sepal Width  setosa  3.6  

複数列の値を一列に結合

pandas では普通に文字列結合すればよい。

# pandas
psplitted['variable2'] = psplitted['Parts'] + psplitted['Scale']
psplitted
#        Species     variable  value  Parts   Scale    variable2
# 0       setosa  SepalLength    5.1  Sepal  Length  SepalLength
# 1       setosa  SepalLength    4.9  Sepal  Length  SepalLength
# 2       setosa  SepalLength    4.7  Sepal  Length  SepalLength
# 3       setosa  SepalLength    4.6  Sepal  Length  SepalLength
# 4       setosa  SepalLength    5.0  Sepal  Length  SepalLength
# ..         ...          ...    ...    ...     ...          ...
# 595  virginica   PetalWidth    2.3  Petal   Width   PetalWidth
# 596  virginica   PetalWidth    1.9  Petal   Width   PetalWidth
# 597  virginica   PetalWidth    2.0  Petal   Width   PetalWidth
# 598  virginica   PetalWidth    2.3  Petal   Width   PetalWidth
# 599  virginica   PetalWidth    1.8  Petal   Width   PetalWidth
# 
# [600 rows x 6 columns]

PySpark では map

# PySpark
def unite(row):
    return Row(Species=row[2], value=row[3], variable=row[0] + row[1])

sqlContext.createDataFrame(splitted.map(unite)).show()

# Species value variable   
# setosa  1.4   PetalLength
# setosa  0.2   PetalWidth 
# setosa  5.1   SepalLength
# setosa  3.5   SepalWidth 
# ..      ..    ..
# setosa  1.4   PetalLength
# setosa  0.2   PetalWidth 
# setosa  5.0   SepalLength
# setosa  3.6   SepalWidth 

補足 withColumn の場合、オペレータは 数値の演算として扱われてしまうようなのでここでは使えない。

# PySpark (NG!)
ssplitted.withColumn('variable', splitted.Parts + splitted.Scale).show()
# Parts Scale  Species value variable
# Petal Length setosa  1.4   null    
# Petal Width  setosa  0.2   null    
# ..    ..     ..      ..    ..

まとめ

PySparkpandas のデータ集約/変形処理を整理した。

データ分析用途で利用したい場合、(ごく当たり前だが) データ量が少なく手元でさっといろいろ試したい場合は pandas、データ量が比較的多く 単純な処理を全体にかけたい場合は Spark がよい。

Spark は map 系の API が充実するとさらに使いやすくなりそうだ。が、小回りの効く文法/機能が充実していくことは考えにくいので 完全に Spark だけでデータ分析をする、、という状態には将来もならないのではないかと思う。小さいデータは pandas 使いましょう。

Learning Spark: Lightning-Fast Big Data Analysis

Learning Spark: Lightning-Fast Big Data Analysis

Python pandas 関連エントリの目次

このブログ中の pandas 関連のエントリをまとめた目次です。

最近 pandas 開発チーム と PyData グループ の末席に加えていただき、パッケージ自体の改善にもより力を入れたいと思います。使い方についてご質問などありましたら Twitter で @ ください。

目次につけた絵文字は以下のような意味です。

  • 🔰: 最初に知っておけば一通りの操作ができそうな感じのもの。
  • 🚧: v0.16.0 時点で少し情報が古く、機能の改善を反映する必要があるもの。
  • 🚫: 当該の機能が deprecate 扱いとなり、将来的に 代替の方法が必要になるもの。

基本

また、上記に対応した比較エントリ:

機能別

データ選択

グルーピング/集約/集計/データ変形

そのほかの前処理

データ操作

文字列、日付など、各データ型に固有の操作。

入出力

可視化

その他

変更点

自作パッケージ

発表資料

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

簡単なデータ操作を PySpark & pandas の DataFrame で行う

Spark v1.3.0 で追加された DataFrame 、結構いいらしいという話は聞いていたのだが 自分で試すことなく時間が過ぎてしまっていた。ようやく PySpark を少し触れたので pandas との比較をまとめておきたい。内容に誤りや よりよい方法があればご指摘 下さい。

過去に基本的なデータ操作について 以下 ふたつの記事を書いたことがあるので、同じ処理のPySpark 版を加えたい。今回は ひとつめの "簡単なデータ操作〜" に相当する内容。

pandas 版

準備

環境は EC2 に作る。Spark のインストールについてはそのへんに情報あるので省略。サンプルデータは iriscsv でダウンロードしてホームディレクトリにおいた。以降の操作は PySpark のコンソールで行う。

# Spark のパスに移動
$ echo $SPARK_HOME
/usr/local/spark
$ cd $SPARK_HOME
$ pwd
/usr/local/spark

# 既定だとログの出力が邪魔なので抑制
$ cp conf/log4j.properties.template conf/log4j.properties
$ vi conf/log4j.properties
# INFO をすべて WARN に変える

$ bin/pyspark
# Welcome to
#       ____              __
#      / __/__  ___ _____/ /__
#     _\ \/ _ \/ _ `/ __/  '_/
#    /__ / .__/\_,_/_/ /_/\_\   version 1.3.1
#       /_/
# 
# Using Python version 2.7.9 (default, Apr  1 2015 19:28:03)
# SparkContext available as sc, HiveContext available as sqlContext.

ここでは HDFS は使わず、pandasread_csvpandas.DataFrame を作成する。列名に "." が入っていると PySpark でうまく動かないようだったため以下のカラム名にした。

import pandas as pd
# 表示する行数を設定
pd.options.display.max_rows=10

names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species']

# pandas
pdf = pd.read_csv('~/iris.csv', header=None, names=names)
pdf
#      SepalLength  SepalWidth  PetalLength  PetalWidth    Species
# 0            5.1         3.5          1.4         0.2     setosa
# 1            4.9         3.0          1.4         0.2     setosa
# 2            4.7         3.2          1.3         0.2     setosa
# 3            4.6         3.1          1.5         0.2     setosa
# 4            5.0         3.6          1.4         0.2     setosa
# ..           ...         ...          ...         ...        ...
# 145          6.7         3.0          5.2         2.3  virginica
# 146          6.3         2.5          5.0         1.9  virginica
# 147          6.5         3.0          5.2         2.0  virginica
# 148          6.2         3.4          5.4         2.3  virginica
# 149          5.9         3.0          5.1         1.8  virginica
# 
# [150 rows x 5 columns]

これを PySparkDataFrame に変換する。pandasPySpark の基本的な違いとして、

  • PySpark には pandas.Index にあたるものがない
  • PySpark には pandas.Series にあたるものがない ( 代わり? に Row があるが、どの程度 柔軟な操作ができるかは未知 )
# PySpark
sdf = sqlContext.createDataFrame(pdf)

sdf
# DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double,
#           PetalWidth: double, Species: string]

# データの中身を表示するには DataFrame.show()
sdf.show()
# SepalLength SepalWidth PetalLength PetalWidth Species
# 5.1         3.5        1.4         0.2        setosa 
# 4.9         3.0        1.4         0.2        setosa 
# 4.7         3.2        1.3         0.2        setosa 
# 4.6         3.1        1.5         0.2        setosa 
# ..          ...        ...         ...        ...
# 5.4         3.9        1.3         0.4        setosa 
# 5.1         3.5        1.4         0.3        setosa 
# 5.7         3.8        1.7         0.3        setosa 
# 5.1         3.8        1.5         0.3        setosa 

以降、pdfpandassdfPySparkDataFrame をあらわす。

type(pdf)
# <class 'pandas.core.frame.DataFrame'>
type(sdf)
# <class 'pyspark.sql.dataframe.DataFrame'> 

基本

まず基本的な操作を。先頭いくつかのデータを確認するには headPySpark での返り値は Row インスタンスのリストになる。

# pandas
pdf.head(5)
#    SepalLength  SepalWidth  PetalLength  PetalWidth Species
# 0          5.1         3.5          1.4         0.2  setosa
# 1          4.9         3.0          1.4         0.2  setosa
# 2          4.7         3.2          1.3         0.2  setosa
# 3          4.6         3.1          1.5         0.2  setosa
# 4          5.0         3.6          1.4         0.2  setosa

# PySpark
sdf.head(5)
# [Row(SepalLength=5.1, SepalWidth=3.5, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=4.9, SepalWidth=3.0, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa')]

# pandas
type(pdf.head(5))
# <class 'pandas.core.frame.DataFrame'>

# PySpark
type(sdf.head(5))
# <type 'list'>

各カラムのデータ型の確認には dtypes

# pandas
pdf.dtypes
# SepalLength    float64
# SepalWidth     float64
# PetalLength    float64
# PetalWidth     float64
# Species         object
# dtype: object

# PySpark
sdf.dtypes
# [('SepalLength', 'double'), ('SepalWidth', 'double'),
#  ('PetalLength', 'double'), ('PetalWidth', 'double'),
#  ('Species', 'string')]

要約統計量の確認は describe

# pandas
pdf.describe()
#        SepalLength  SepalWidth  PetalLength  PetalWidth
# count   150.000000  150.000000   150.000000  150.000000
# mean      5.843333    3.054000     3.758667    1.198667
# std       0.828066    0.433594     1.764420    0.763161
# min       4.300000    2.000000     1.000000    0.100000
# 25%       5.100000    2.800000     1.600000    0.300000
# 50%       5.800000    3.000000     4.350000    1.300000
# 75%       6.400000    3.300000     5.100000    1.800000
# max       7.900000    4.400000     6.900000    2.500000

# PySpark
sdf.describe().show()
# summary SepalLength        SepalWidth         PetalLength        PetalWidth        
# count   150                150                150                150               
# mean    5.843333333333334  3.0540000000000003 3.758666666666666  1.1986666666666668
# stddev  0.8253012917851317 0.4321465800705415 1.7585291834055206 0.760612618588172 
# min     4.3                2.0                1.0                0.1               
# max     7.9                4.4                6.9                2.5     

列操作

列名操作

参照と変更。PySpark ではプロパティへの代入は不可。

# pandas
pdf.columns
# Index([u'SepalLength', u'SepalWidth', u'PetalLength', u'PetalWidth', u'Species'], dtype='object')

# PySpark
sdf.columns
# [u'SepalLength', u'SepalWidth', u'PetalLength', u'PetalWidth', u'Species']

# pandas
pdf.columns = [1, 2, 3, 4, 5]
pdf.columns
# Int64Index([1, 2, 3, 4, 5], dtype='int64')

# PySpark
sdf.columns = [1, 2, 3, 4, 5]
# AttributeError: can't set attribute

マッピングによって列名を変更するには、pandas では DataFrame.rename

# pandas
pdf.columns
# Int64Index([1, 2, 3, 4, 5], dtype='int64')

pdf = pdf.rename(columns={1:'SepalLength', 2:'SepalWidth', 3:'PetalLength', 4:'PetalWidth', 5:'Species'})
pdf
#      SepalLength  SepalWidth  PetalLength  PetalWidth    Species
# 0            5.1         3.5          1.4         0.2     setosa
# 1            4.9         3.0          1.4         0.2     setosa
# 2            4.7         3.2          1.3         0.2     setosa
# 3            4.6         3.1          1.5         0.2     setosa
# 4            5.0         3.6          1.4         0.2     setosa
# ..           ...         ...          ...         ...        ...
# 145          6.7         3.0          5.2         2.3  virginica
# 146          6.3         2.5          5.0         1.9  virginica
# 147          6.5         3.0          5.2         2.0  virginica
# 148          6.2         3.4          5.4         2.3  virginica
# 149          5.9         3.0          5.1         1.8  virginica
# 
# [150 rows x 5 columns]

PySpark では DataFrame.withColumnRenamed

# PySpark
sdf.withColumnRenamed('Species', 'xxx')
# DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double,
#           PetalWidth: double, xxx: string]

補足 非破壊的な処理のため、反映には代入が必要。ここでは列名変更したくないので代入しない。

列名による列選択

pandas.DataFrame の列選択では 当該の列のデータを含む Series が返ってくる。

# pandas
pdf.Species
# 0         setosa
# 1         setosa
# 2         setosa
# 3         setosa
# 4         setosa
#          ...    
# 145    virginica
# 146    virginica
# 147    virginica
# 148    virginica
# 149    virginica
# Name: Species, dtype: object

pdf['Species']
# 略

PySpark では、列の属性を表現する Column インスタンスが返ってくる。

# PySpark
sdf.Species
# Column<Species>
sdf['Species']
# Column<Species>

pandasPySpark いずれも、文字列ではなくリストを渡せば その列を DataFrame としてスライシングする。

# pandas
pdf[['PetalWidth']]
#      PetalWidth
# 0           0.2
# 1           0.2
# 2           0.2
# 3           0.2
# 4           0.2
# ..          ...
# 145         2.3
# 146         1.9
# 147         2.0
# 148         2.3
# 149         1.8
# 
# [150 rows x 1 columns]

# PySpark
sdf[['PetalWidth']]
# DataFrame[PetalWidth: double]

# pandas
pdf[['PetalWidth', 'PetalLength']]
#      PetalWidth  PetalLength
# 0           0.2          1.4
# 1           0.2          1.4
# 2           0.2          1.3
# 3           0.2          1.5
# 4           0.2          1.4
# ..          ...          ...
# 145         2.3          5.2
# 146         1.9          5.0
# 147         2.0          5.2
# 148         2.3          5.4
# 149         1.8          5.1
# 
# [150 rows x 2 columns]

# PySpark
sdf[['PetalWidth', 'PetalLength']]
# DataFrame[PetalWidth: double, PetalLength: double]

PySpark では DataFrame.select でもよい。

# PySpark
sdf.select(sdf['PetalWidth'], sdf['PetalLength'])
# DataFrame[PetalWidth: double, PetalLength: double]
真偽値リストによる列選択

自分はあまり使わないのだが、R {dplyr} との比較という観点で。

indexer = [False, False, True, True, False]

# pandas
pdf.loc[:, indexer]
#      PetalLength  PetalWidth
# 0            1.4         0.2
# 1            1.4         0.2
# 2            1.3         0.2
# 3            1.5         0.2
# 4            1.4         0.2
# ..           ...         ...
# 145          5.2         2.3
# 146          5.0         1.9
# 147          5.2         2.0
# 148          5.4         2.3
# 149          5.1         1.8
# 
# [150 rows x 2 columns]

# PySpark
sdf[[c for c, i in zip(sdf.columns, indexer) if i is True]]
# DataFrame[PetalLength: double, PetalWidth: double]
列の属性による列選択

列の型が pandas の場合は floatPySpark の場合は double の列のみ取り出す。

# pandas
pdf.loc[:, pdf.dtypes == float]
#      SepalLength  SepalWidth  PetalLength  PetalWidth
# 0            5.1         3.5          1.4         0.2
# 1            4.9         3.0          1.4         0.2
# 2            4.7         3.2          1.3         0.2
# 3            4.6         3.1          1.5         0.2
# 4            5.0         3.6          1.4         0.2
# ..           ...         ...          ...         ...
# 145          6.7         3.0          5.2         2.3
# 146          6.3         2.5          5.0         1.9
# 147          6.5         3.0          5.2         2.0
# 148          6.2         3.4          5.4         2.3
# 149          5.9         3.0          5.1         1.8
# 
# [150 rows x 4 columns]

# PySpark
sdf[[c for c, type in sdf.dtypes if type == 'double']]
# DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double,
#           PetalWidth: double]

行操作

値の条件による行選択
# pandas
pdf[pdf['Species'] == 'virginica']
#      SepalLength  SepalWidth  PetalLength  PetalWidth    Species
# 100          6.3         3.3          6.0         2.5  virginica
# 101          5.8         2.7          5.1         1.9  virginica
# 102          7.1         3.0          5.9         2.1  virginica
# 103          6.3         2.9          5.6         1.8  virginica
# 104          6.5         3.0          5.8         2.2  virginica
# ..           ...         ...          ...         ...        ...
# 145          6.7         3.0          5.2         2.3  virginica
# 146          6.3         2.5          5.0         1.9  virginica
# 147          6.5         3.0          5.2         2.0  virginica
# 148          6.2         3.4          5.4         2.3  virginica
# 149          5.9         3.0          5.1         1.8  virginica
# 
# [50 rows x 5 columns]

# PySpark
sdf[sdf['Species'] == 'virginica'].show()
# SepalLength SepalWidth PetalLength PetalWidth Species  
# 6.3         3.3        6.0         2.5        virginica
# 5.8         2.7        5.1         1.9        virginica
# 7.1         3.0        5.9         2.1        virginica
# 6.3         2.9        5.6         1.8        virginica
# ..          ...        ...         ...        ...
# 6.5         3.0        5.5         1.8        virginica
# 7.7         3.8        6.7         2.2        virginica
# 7.7         2.6        6.9         2.3        virginica
# 6.0         2.2        5.0         1.5        virginica

PySpark では DataFrame.filter でもよい。

# PySpark
sdf.filter(sdf['Species'] == 'virginica').show() 
# 略
行番号による行選択

PySpark では 冒頭の n 行を抽出したりはできるが、適当な行選択ではできない、、、と思う。PySpark でどうしてもやりたければ index にあたる列を追加 -> 列の値で選択。

4/27追記 DataFrame.collectRow のリストに変換 -> 再度 DataFrame 化すればスキーマ変更せずにできる、、、が、マスタでデータを処理することになるのでよい方法ではなさそう。

# pandas
pdf.loc[[2, 3, 4]]
#    SepalLength  SepalWidth  PetalLength  PetalWidth Species
# 2          4.7         3.2          1.3         0.2  setosa
# 3          4.6         3.1          1.5         0.2  setosa
# 4          5.0         3.6          1.4         0.2  setosa

# PySpark
[c for i, c in enumerate(sdf.collect()) if i in (2, 3 ,4)]
# [Row(SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species=u'setosa'),
#  Row(SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa')]
sqlContext.createDataFrame([c for i, c in enumerate(sdf.collect()) if i in (2, 3 ,4)]).show()
# SepalLength SepalWidth PetalLength PetalWidth Species
# 4.7         3.2        1.3         0.2        setosa 
# 4.6         3.1        1.5         0.2        setosa 
# 5.0         3.6        1.4         0.2        setosa 
ランダムサンプリング

pandas ではindex をサンプリングしてスライス。

import random

# pandas
pdf.loc[random.sample(pdf.index, 5)]
#      Sepal.Length  Sepal.Width  Petal.Length  Petal.Width     Species
# 64            6.1          2.9           4.7          1.4  versicolor
# 17            5.4          3.9           1.3          0.4      setosa
# 14            4.3          3.0           1.1          0.1      setosa
# 4             4.6          3.1           1.5          0.2      setosa
# 146           6.7          3.0           5.2          2.3   virginica

PySpark では DataFrame.sample。この関数、fraction の確率で行ごとに選択しているようで、サンプリングするたびに抽出される行数が変わる ( 行数指定でランダムサンプリング、という処理はできなさそう )。

# PySpark
sdf.sample(False, 5.0 / sdf.count()).show()
# SepalLength SepalWidth PetalLength PetalWidth Species   
# 5.4         3.4        1.7         0.2        setosa    
# 5.7         2.8        4.5         1.3        versicolor
# 6.2         2.8        4.8         1.8        virginica 
# 6.1         3.0        4.9         1.8        virginica 

補足 今後、pandas にもサンプリングメソッド DataFrame.sample が実装予定 #9666

列の追加

pandas では DataFrame に対して 直接 新しい列を追加できるが、PySpark ではできない。

# 破壊的な処理になるのでコピー
pdf_temp = pdf.copy()

# pandas
pdf_temp['PetalMult'] = pdf_temp['PetalWidth'] * pdf_temp['PetalLength']
pdf_temp
#      SepalLength  SepalWidth  PetalLength  PetalWidth    Species  PetalMult
# 0            5.1         3.5          1.4         0.2     setosa       0.28
# 1            4.9         3.0          1.4         0.2     setosa       0.28
# 2            4.7         3.2          1.3         0.2     setosa       0.26
# 3            4.6         3.1          1.5         0.2     setosa       0.30
# 4            5.0         3.6          1.4         0.2     setosa       0.28
# ..           ...         ...          ...         ...        ...        ...
# 145          6.7         3.0          5.2         2.3  virginica      11.96
# 146          6.3         2.5          5.0         1.9  virginica       9.50
# 147          6.5         3.0          5.2         2.0  virginica      10.40
# 148          6.2         3.4          5.4         2.3  virginica      12.42
# 149          5.9         3.0          5.1         1.8  virginica       9.18
# 
# [150 rows x 6 columns]

# PySpark
sdf['PetalMult'] = sdf['PetalWidth'] * sdf['PetalLength']
# TypeError: 'DataFrame' object does not support item assignment

PySpark での列追加は DataFrame.withColumn

# PySpark
sdf.withColumn('PetalMult', sdf.PetalWidth * sdf.PetalLength).show()
# SepalLength SepalWidth PetalLength PetalWidth Species PetalMult          
# 5.1         3.5        1.4         0.2        setosa  0.27999999999999997
# 4.9         3.0        1.4         0.2        setosa  0.27999999999999997
# 4.7         3.2        1.3         0.2        setosa  0.26               
# 4.6         3.1        1.5         0.2        setosa  0.30000000000000004
# ..          ..         ...         ...        ...     ...  
# 5.4         3.9        1.3         0.4        setosa  0.52               
# 5.1         3.5        1.4         0.3        setosa  0.42               
# 5.7         3.8        1.7         0.3        setosa  0.51               
# 5.1         3.8        1.5         0.3        setosa  0.44999999999999996

補足 pandas v0.16.0 以降では 類似のメソッド DataFrame.assign が追加。

# pandas
pdf.assign(PetalMult=pdf['PetalWidth'] * pdf['PetalLength'])
# 略

まとめ

PySparkpandas のデータ操作を整理した。

PySpark、上のような基本的な処理は pandas と似たやり方で直感的に使える感じだ。 大規模処理は PySpark、細かい取り回しが必要なものは pandas でうまく併用できるとよさそう。

4/29追記 続きはこちら。

sinhrks.hatenablog.com

Learning Spark: Lightning-Fast Big Data Analysis

Learning Spark: Lightning-Fast Big Data Analysis