Python spyre によるデータ分析結果のWebアプリ化
R を使っている方はご存知だと思うが、R には {Shiny}
というパッケージがあり、データ分析の結果を インタラクティブな Web アプリとして共有することができる。{Shiny}
って何?という方には こちらの説明がわかりやすい。
Python でも {Shiny}
のようなお手軽可視化フレームワークがあるといいよね、とたびたび言われていたのだが、spyre
という なんかそれっぽいパッケージがあったので触ってみたい。
インストール
pip で。
pip install dataspyre
使い方
現時点で ドキュメンテーションはない ので、README と examples ディレクトリを見る。サンプルとして株価を取得してプロットするWebアプリを作ってみたい。spyre
で Webアプリを作る手順は以下の3つ。
spyre.server.App
を継承したクラスを作る。- 描画をコントロール/指示するクラス変数を指定する。
- 描画を行うメソッド
getData
,getPlot
を書く。メソッド名は描画内容 (output_type) ごとに固定。
最初は examples のものを書き換えながら作るのが楽。各クラス変数/メソッドは相互に関連するため、プログラム全体を示した上で必要と思われる箇所にコメントを入れた。
補足 株価の取得には以下のパッケージを使う。
#!/usr/bin/env python # -*- coding: utf-8 -*- from spyre import server import pandas as pd pd.options.display.mpl_style = 'default' # あらかじめデータを取得しておく # 終値のみを取得し、一つのDataFrameに結合 import japandas as jpd toyota = jpd.DataReader(7203, 'yahoojp', start='2015-01-01')[[u'終値']] toyota.columns = [u'トヨタ'] honda = jpd.DataReader(7267, 'yahoojp', start='2015-01-01')[[u'終値']] honda.columns = [u'ホンダ'] df = toyota.join(honda) # spyre.server.App を継承したクラスを作る class StockExample(server.App): title = u"株価のプロット" # 左側のペインに表示する UI 要素を辞書のリストで指定 # ここではドロップダウン一つだけを表示 inputs = [{"input_type":'dropdown', # ドロップダウン自体の表示ラベル "label": 'Frequency', # ドロップダウンの選択項目を指定 # label はドロップダウン項目の表示ラベル # value は各項目が選択された時にプログラム中で利用される値 "options" : [ {"label": "月次", "value":"M"}, {"label": "週次", "value":"W"}, {"label": "日次", "value":"B"}], # 各 UI 要素の入力は各描画メソッド (getData, getPlot) に # 辞書型の引数 params として渡される # その辞書から値を取り出す際のキー名 "variable_name": 'freq', "action_id": "update_data" }] # 画面を更新する設定 controls = [{"control_type" : "hidden", "label" : "update", "control_id" : "update_data"}] # 描画するタブの表示ラベルを文字列のリストで指定 tabs = [u"トヨタ", u"ホンダ", u"データ"] # tabs で指定したそれぞれのタブに描画する内容を辞書のリストで指定 outputs = [{"output_type" : "plot", # matplotlib のプロットを描画する "output_id" : "toyota", # 描画するタブに固有の id "control_id" : "update_data", "tab" : u"トヨタ", # 描画するタブの表示ラベル (tabs に含まれるもの) "on_page_load" : True }, {"output_type" : "plot", "output_id" : "honda", "control_id" : "update_data", "tab" : u"ホンダ", "on_page_load" : True }, {"output_type" : "table", # DataFrameを描画する "output_id" : "table_id", "control_id" : "update_data", "tab" : u"データ", "on_page_load" : True }] def getData(self, params): """ output_type="table" を指定したタブを描画する際に呼ばれるメソッド DataFrameを返すこと params は UI 要素の入力 + いくつかのメタデータを含む辞書 UI 要素の入力は inputs で指定した variable_name をキーとして行う """ # ドロップダウンの値を取得 # 値にはユーザの選択によって、options -> value で指定された M, W, B いずれかが入る freq = params['freq'] # freq でグループ化し平均をとる tmp = df.groupby(pd.TimeGrouper(freq)).mean() return tmp def getPlot(self, params): """ output_type="plot" を指定したタブを描画する際に呼ばれるメソッド matplotlib.Figureを返すこと """ tmp = self.getData(params) # 同じ output_type で複数のタブを描画したい場合は、 params に含まれる # output_id で分岐させる # output_id は タブの表示ラベルではなく、outputs 中で指定した output_id if params['output_id'] == 'toyota': ax = tmp[[u'トヨタ']].plot(legend=False) return ax.get_figure() elif params['output_id'] == 'honda': ax = tmp[[u'ホンダ']].plot(legend=False) return ax.get_figure() else: raise ValueError app = StockExample() # port 9093 で Webサーバ + アプリを起動 app.launch(port=9093)
実行後、ブラウザで http://127.0.0.1:9093 を開くと以下のような画面が表示される。inputs
で指定した項目が左側のメニューに、 tabs
で指定した項目がタブとして表示されている。
ドロップダウンやタブの選択を切り替えると、動的にグラフやデータが更新されることがわかる。
DataFrame
の描画はちょっとおかしく、index
である日時自体が描画されずにその名前 ("日付") だけが表示されている。これは後日 issue あげて直そうかと思う。
06/13追記 index
を描画しないのは spyre
の仕様で、そのとき名前が残ってしまうのは pandas
のバグです。v0.17.0で修正予定。
まとめ
お手軽可視化フレームワーク spyre
を試してみた。現行の {Shiny}
と比べると 機能/UI とも十分ではないが、いくつかのデータ / プロットをインタラクティブに共有する用途であれば十分使えそうだ。
使いごこちの印象として、はじめて {Shiny}
を触ったあの日の気持ちを思い出したような気がする、、。
R で 状態空間モデル: {dlm} の最尤推定を可視化する
{dlm}
において、状態空間モデルが最尤推定される過程がみたい。以下内容の補足的なエントリ。
「状態空間時系列分析入門」では 引き続き 第8章 に相当。
- 作者: J.J.F.コマンダー,S.J.クープマン,Jacques J.F. Commandeur,Sime Jan Koopman,和合肇
- 出版社/メーカー: シーエーピー出版
- 発売日: 2008/09
- メディア: 単行本
- 購入: 2人 クリック: 4回
- この商品を含むブログを見る
データの準備
データは著者のサポートサイト、An Introduction to State Space Time Series Analysis から入手できる。
library(dlm) # install_github('sinhrks/ggfortify') library(ggfortify) ukdrivers <- read.table('UKdriversKSI.txt', skip = 1) ukdrivers <- ts(ukdrivers[[1]], start = c(1969, 1), frequency = 12) ukdrivers <- log(ukdrivers) y <- ukdrivers
モデルの作成と状態推定値のプロット
サンプルとして、第3章3節 確率的レベルと確定的傾きをもつローカル線形トレンドモデル を作成する。
parm <- log(c(var(y), 0.001)) buildFun <- function(parm){ dlmModPoly(order = 2, dV = exp(parm[1]), dW = c(exp(parm[2]), 0)) } dlmFit <- dlmMLE(y, parm = parm, build = buildFun) # カルマンフィルタ dlmFiltered <- dlmFilter(y, buildFun(dlmFit$par)) # 平滑化 dlmSmoothed <- dlmSmooth(y, buildFun(dlmFit$par))
第8章ではモデルの推定値として 3 つの値が出てくるため、その差異がわかるようにプロットする。用語は状態空間時系列分析入門のものを利用。
p <- autoplot(y) p <- autoplot(dlmFiltered$a[, 1], colour = 'red', linetype = 'dashed', p = p) p <- autoplot(dlmFiltered$m[, 1], colour = 'red', p = p) p <- autoplot(dlmSmoothed, colour = 'blue', p = p) p + ylim(c(6.8, 8))
- 黒実線 (
y
): 観測時系列 - 赤破線 (
dlmFiltered$a
): カルマンフィルタの1期先予測 (予測ステップでの予測値) - 赤実線 (
dlmFiltered$m
): カルマンフィルタのフィルタ化状態 (更新ステップでの予測値) - 青実線 (
dlmSmoothed
): 平滑化状態
一部期間を拡大するとそれぞれの差異がわかる。これらのうち、最尤推定時の尤度計算式中で直接 利用されるのは "1期先予測" 時の予測誤差 (赤破線) とその標準偏差。
p + xlim(as.Date('1975', '%Y'), as.Date('1977', '%Y')) + ylim(c(6.8, 8))
補足 上のモデルでは FF = [1, 0]
のため、観測値の推定値 == 状態推定値。
dlmFiltered$mod$FF # [,1] [,2] # [1,] 1 0 all((dlmFiltered$a %*% t(dlmFiltered$mod$FF)) == dlmFiltered$a[, 1]) # [1] TRUE
補足 カルマンフィルタの動きについては以前こんなエントリを書いた。2つ目のエントリにカルマンフィルタの予測 / フィルタ化状態がどういった感じで動いているか記載がある (平滑化は記載ない)。
予測誤差と予測誤差分散
最尤推定の尤度計算式は先のエントリに記載。計算に必要な (1期先) 予測誤差と予測誤差標準偏差は dlm:::residuals.dlmFiltered
で求めることができる。
# 既定では標準化されるため、 type = 'raw' を指定して実値をとる resid <- residuals(dlmFiltered, type = 'raw') # 予測誤差 resid$res[1:12] # [1] 7.430707083 -3.827523556 0.111504168 -0.021616238 0.225665269 -0.044983932 # [7] 0.045295381 0.061233329 -0.021088564 0.049603994 0.270853650 0.007138991 # 予測誤差標準偏差 resid$sd[1:12] # [1] 4472.1359566 2236.0679824 0.1922632 0.1596384 0.1484746 0.1429560 # [7] 0.1396797 0.1375109 0.1359691 0.1348166 0.1339224 0.1332084
ふつうに計算して求めるなら、dlm::dlmFiltered
インスタンスの各プロパティを利用して、
# 予測誤差 (y - (matrix(as.numeric(dlmFiltered$a), 192, 2) %*% t(dlmFiltered$mod$FF)))[1:12] # [1] 7.430707083 -3.827523556 0.111504168 -0.021616238 0.225665269 -0.044983932 # [7] 0.045295381 0.061233329 -0.021088564 0.049603994 0.270853650 0.007138991 # もしくは (y - dlmFiltered$f)[1:12] # [1] 7.430707083 -3.827523556 0.111504168 -0.021616238 0.225665269 -0.044983932 # [7] 0.045295381 0.061233329 -0.021088564 0.049603994 0.270853650 0.007138991 # 予測誤差標準偏差 f <- function(i) { FF <- dlmFiltered$mod$FF V <- dlmFiltered$mod$V diag(crossprod(dlmFiltered$D.R[i, ] * t(FF %*% dlmFiltered$U.R[[i]])) + V) } drop(t(sqrt(sapply(seq(along = dlmFiltered$U.R), f))))[1:12] # [1] 4472.1359566 2236.0679824 0.1922632 0.1596384 0.1484746 0.1429560 # [7] 0.1396797 0.1375109 0.1359691 0.1348166 0.1339224 0.1332084
最尤推定の可視化
最尤推定時のパラメータの変化を記録するため、dlm::dlmMLE
を書き換えた関数を定義する。LL_ckbookcompat
はテキスト記載の対数尤度 (に近いもの) を計算するための関数 ( 詳細はこちら )。
# optim 1 ステップごとに作成されたモデルを保存 models <<- list() dlmMLE_wrapper <- function (y, parm, build, method = "L-BFGS-B", ..., debug = FALSE) { logLik <- function(parm, ...) { mod <- build(parm, ...) # グローバル変数に追加 models[[length(models) + 1]] <<- mod return(dlmLL(y = y, mod = mod, debug = debug)) } return(optim(parm, logLik, method = method, ...)) } # 推定するパラメータ数が 2 のため、総和をとる範囲を変更 LL_ckbookcompat <- function(y, mod) { n <- length(y) filtered <- dlmFilter(y, mod) resid <- residuals_dlmcompat(filtered) F <- (as.numeric(resid$sd)^2)[3:n] v <- as.numeric(resid$res)[3:n] return(-0.5 * (n * log(2 * pi) + sum(log(F) + (v^2/F)))) }
続けて、アニメーション用の処理を定義して実行。
library(animation) models <<- list() dlmMLE_wrapper(y, parm = parm, build = buildFun) saveGIF({ for (mod in models[1:65]) { filtered <- dlmFilter(y, mod) p <- autoplot(filtered, fitted.colour = 'red', fitted.linetype = 'dashed') p <- autoplot(dlmSmooth(y, mod), colour = 'blue', p = p) label <- paste('CKbook LL:', round(LL_ckbookcompat(y, mod), 2), 'dlmLL:', round(dlmLL(y, mod), 2)) p <- p + annotate('text', label = label, x = as.Date('1980', '%Y'), y = 7.8, size = 6) print(p) Sys.sleep(0.01) } }, interval = 0.3, movie.name = "dlm.gif", ani.width = 500, ani.height = 300)
最尤推定時に各状態推定値がどのように変化していくかをみる。左側 "CKbookLL" はテキスト記載の対数尤度計算式 LL_ckbookcompat
の値。右側 "dlmLL" は dlm::dlmLL
の値 ( optim
で最適化される値 )。
テキストに記載された数値より、収束時には左側の値が 0.6247935 x 観測時系列の長さ (192) = 119.9604 程度になるはずだ。
補足 {dlm}
は既定では optim
の際に method = 'L-BFGS-B'
を利用する。可視化目的であれば Nelder-Mead 法なんかを使ったほうが 素人目には "最適化らしい" 動きになる気がする。
models <<- list() dlmMLE_wrapper(y, parm = parm, build = buildFun, method = 'Nelder-Mead') # 以降略
R で 状態空間モデル: 状態空間時系列分析入門を {rstan} で再現したい
前の記事でもリンクさせていただいているが、サイト 「状態空間時系列分析入門」をRで再現する では以下のテキストを {dlm}
, {KFAS}
で再現されており非常にありがたい。これらのパッケージの使い方については リンク先を読めば困らない感じだ。
自分も勉強のために似たことやりたい、、でも同じことやるのもなあ、、と考えた結果 同テキストの内容 {rstan}
を使ってやってみた。
補足 Stan
には状態空間表現用の関数 gaussian_dlm_obs
( 利用例 ) があるのだが、自分は使ったことがない。7章までのモデルは全て漸化式で表現されているため、それらを Stan
のモデルとして記述した。
- 作者: J.J.F.コマンダー,S.J.クープマン,Jacques J.F. Commandeur,Sime Jan Koopman,和合肇
- 出版社/メーカー: シーエーピー出版
- 発売日: 2008/09
- メディア: 単行本
- 購入: 2人 クリック: 4回
- この商品を含むブログを見る
github リポジトリ
各スクリプト / モデルは GitHub にあげた。現在 7 章までのモデルと kick する R スクリプト, markdown ファイルを置いている。
モデルの一覧
各章のモデルについて、その実行結果を RPubs にあげている。各モデルの概要とリンクを以下に記載する。自分のモデリング能力不足のため いくつか課題があると考えており、特に怪しいものには * をつけた。
- 第1章 はじめに
- 第2章 ローカル・レベル・モデル
- 第3章 ローカル線形トレンド・モデル
- 第4章 季節要素のあるローカル・レベル・モデル
- 第5章 説明変数のあるローカル・レベル・モデル
- 第6章 干渉変数のあるローカル・レベル・モデル
- 第7章 英国シートベルト法とインフレーション・モデル
かんたんな説明
必要パッケージ
まず、実行には以下のCRAN非公開パッケージが必要である。後者はどうでもよいが {pforeach}
はいろいろ捗るのでインストールしていない方はただちに入れるべきだ。
hoxo-m/pforeach
:Stan
並列化に利用。sinhrks/ggfortify
: 結果のプロットに利用。
データの読み込み
データは著者のサポートサイト、An Introduction to State Space Time Series Analysis から入手できる。各データは適宜 ts
インスタンス化しておく。
Stan
の実行
Stan
の呼び出しは全て以下のように並列化して行う。
model_file <- 'モデルを記述したファイルのパス' # モデルのコンパイルを実行 stan_fit <- stan(file = model_file, chains = 0) # Stan を並列実行する。実行結果に sflist2stanfit を適用し、stanfit インスタンスを得る fit <- pforeach(i = 1:4, .final = sflist2stanfit)({ stan(fit = stan_fit, data = standata, chains = 1, seed = i) })
また、モデルが収束したかどうかは 全ての Rhat
が 1.1 未満となっているかどうかで確認する。そのままでは収束しない / しにくいモデルには、初期値 / 事前分布 / 制約を入れて収束させた。
is.converged <- function(stanfit) { summarized <- summary(stanfit) all(summarized$summary[, 'Rhat'] < 1.1) } stopifnot(is.converged(fit))
また、テキストに最尤推定の結果が記載されている場合は、以下のようにして近い値になっているかを確認した。値がずれる場合は stopifnot
を外し、差異を出力する。
# is.almost.fitted の定義は各リンク先を参照 stopifnot(is.almost.fitted(mu[[1]], 7.4157))
結果のプロット
テキストのグラフを再現する形で行う。使っているパッケージの説明はこちら。
- RPubs - Plotting Time Series with ggplot2 and ggfortify
- RPubs - Plotting State Space Time Series with ggplot2 and ggfortify
課題
以下の2つについてはもう少し Stan
に習熟できたら直したい。
複数の確率的状態をもつモデルをうまく収束させたい
このようなモデルは 収束しにくく、また 最尤推定した状態推定値 ( テキスト、{dlm}
、{KFAS}
の結果 ) とずれやすい。特に 現状 制約を入れて収束させているモデルについて、事前分布 / 初期値 / 尤度関数の範囲でなんとかしたい。
モデルの一覧で * をつけたものがこれにあたる。
テキストの尤度関数を再現したい
テキストに記載の式 (概要は以下記事参照) に揃えようとすると、自分の記述では収束しにくくなってしまう。
R で 状態空間モデル: {dlm} で単変量モデルの状態空間表現を利用する
こちらの続きで、状態空間時系列分析入門の第8章の内容。
- 作者: J.J.F.コマンダー,S.J.クープマン,Jacques J.F. Commandeur,Sime Jan Koopman,和合肇
- 出版社/メーカー: シーエーピー出版
- 発売日: 2008/09
- メディア: 単行本
- 購入: 2人 クリック: 4回
- この商品を含むブログを見る
状態空間モデルでは、トレンド、季節効果、説明変数効果などさまざまな要因を考慮することができる。また これらの要因を単一の状態空間表現で統一的に扱える。時刻 における観測値 と状態 は以下の式で書ける (前回と同じ)。
この式であらわされたモデルを {dlm}
で利用する際にどうすればよいのかをまとめたい。
vignette にあるとおり、{dlm}
ではモデルを dlm::dlmModPoly
などのモデル記述関数の組み合わせで定義できる。これらの関数は便利だが、上の式との関係は外側からはよくわからない。
上式から {dlm}
のモデルを作成する際には dlm::dlm
を直接使うのがよさそうだ。dlm::dlm
の引数は上式にほぼ対応しているため、式表現されたモデルをそのまま実装できる。また、モデル記述関数も内部的には dlm::dlm
を使っているため、これら関数の理解にも役立つ。dlm::dlm
の引数と上式の対応は前回書いたとおり。
FF
: に対応。V
: に対応。GG
: に対応。W
: に対応。m0
: の事前分布の平均。C0
: の事前分布の分散。J...
: 各パラメータが時間変化するとき利用。
参考 サイト 「状態空間時系列分析入門」をRで再現する はこのテキストを {dlm}
, {KFAS}
で再現している。こちらは dlm::dlm
ではなく dlm::dlmModPoly
などの関数を使ってモデル作成している。
準備
まずデータを読み込む。データは著者のサポートサイト、An Introduction to State Space Time Series Analysis から入手できる。
library(dlm) # install_github('sinhrks/ggfortify') library(ggfortify) ukdrivers <- read.table('UKdriversKSI.txt', skip = 1) ukdrivers <- ts(ukdrivers[[1]], start = c(1969, 1), frequency = 12) ukdrivers <- log(ukdrivers)
以降、テキストに記載のモデル順に dlm::dlm
から利用する場合のプログラムを列挙する。値 / 推定するパラメータを変更したい場合は buildObj
内の対応する要素を適当に修正すればよい。
1. ローカルレベルモデル (確定的レベル)
それぞれのモデルについて 状態 として何が考慮されているかを書いておく。
- : は確定的レベル (経時変化しない)。
補足 それぞれのモデルの式表現を書こうとしたのだが、はてなマークダウン記法だと TeX 記法中 の "&" がうまく処理できないようなので諦めた。詳細はテキストと見比べてください。
y <- ukdrivers parm<- log(var(y)) buildObj <- function(parm) { dlm(FF = matrix(1, 1, 1), V = matrix(exp(parm), 1, 1), GG = matrix(1, 1, 1), W = matrix(0, 1, 1), m0 = 0, C0 = matrix(1e+07, 1, 1)) } dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj) (dlmModObj <- buildObj(dlmFitObj$par)) # $FF # [,1] # [1,] 1 # # $V # [,1] # [1,] 0.02935256 # # $GG # [,1] # [1,] 1 # # $W # [,1] # [1,] 0 # # $m0 # [1] 0 # # $C0 # [,1] # [1,] 1e+07 p <- autoplot(dlmFilter(y, dlmModObj), fitted.colour = 'red', fitted.linetype = 'dashed') autoplot(dlmSmooth(y, dlmModObj), colour = 'blue', p = p)
赤破線がカルマンフィルタ、青破線が平滑化した値。以降もプロットするが、確率的状態を使うと 実データにほぼフィットするため、グラフからでは差がよくわからない。
2. ローカル線形トレンドモデル (確率的レベルと確率的傾き)
- : は確率的レベル (経時変化する)、 は確率的傾き(経時変化する)。
parm <- log(c(var(y), 0.001, 0.001)) buildObj <- function(parm) { dlm(FF = matrix(c(1, 0), 1, 2), V = matrix(exp(parm[1]), 1, 1), GG = matrix(c(1, 1, 0, 1), 2, 2, byrow = TRUE), W = matrix(c(exp(parm[2]), 0, 0, exp(parm[3])), 2, 2, byrow = TRUE), m0 = c(0, 0), C0 = matrix(c(1e+07, 0, 0, 1e+07), 2, 2, byrow = TRUE)) } dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj) (dlmModObj <- buildObj(dlmFitObj$par)) # $FF # [,1] [,2] # [1,] 1 0 # # $V # [,1] # [1,] 0.002118549 # # $GG # [,1] [,2] # [1,] 1 1 # [2,] 0 1 # # $W # [,1] [,2] # [1,] 0.01212741 0.00000e+00 # [2,] 0.00000000 1.92431e-10 # # $m0 # [1] 0 0 # # $C0 # [,1] [,2] # [1,] 1e+07 0e+00 # [2,] 0e+00 1e+07 p <- autoplot(dlmFilter(y, dlmModObj), fitted.colour = 'red', fitted.linetype = 'dashed') autoplot(dlmSmooth(y, dlmModObj), colour = 'blue', p = p)
3. 季節要素のあるローカルレベルモデル (確率的レベルと確率的季節)
- : は確率的レベル、 は 四半期単位の確率的季節のダミー項。
ukinflation <- read.table('UKinflation.txt', skip = 1) ukinflation <- ts(ukinflation[[1]], start = c(1950, 1), frequency = 4) y <- ukinflation parm <- log(c(var(y), 0.00001, 0.00001)) buildObj <- function(parm) { dlm(FF = matrix(c(1, 1, 0, 0), 1, 4), V = matrix(c(exp(parm[1])), 1, 1), GG = matrix(c(1, 0, 0, 0, 0, -1, -1, -1, 0, 1, 0, 0, 0, 0, 1, 0), 4, 4, byrow = TRUE), W = matrix(c(exp(parm[2]), 0, 0, 0, 0, exp(parm[3]), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), 4, 4, byrow = TRUE), m0 = c(0, 0, 0, 0), C0 = matrix(c(1e+07, 0, 0, 0, 0, 1e+07, 0, 0, 0, 0, 1e+07, 0, 0, 0, 0, 1e+07), 4, 4, byrow = TRUE)) } dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj) dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj) (dlmModObj <- buildObj(dlmFitObj$par)) # $FF # [,1] [,2] [,3] [,4] # [1,] 1 1 0 0 # # $V # [,1] # [1,] 3.37127e-05 # # $GG # [,1] [,2] [,3] [,4] # [1,] 1 0 0 0 # [2,] 0 -1 -1 -1 # [3,] 0 1 0 0 # [4,] 0 0 1 0 # # $W # [,1] [,2] [,3] [,4] # [1,] 2.124158e-05 0.000000e+00 0 0 # [2,] 0.000000e+00 4.345176e-07 0 0 # [3,] 0.000000e+00 0.000000e+00 0 0 # [4,] 0.000000e+00 0.000000e+00 0 0 # # $m0 # [1] 0 0 0 0 # # $C0 # [,1] [,2] [,3] [,4] # [1,] 1e+07 0e+00 0e+00 0e+00 # [2,] 0e+00 1e+07 0e+00 0e+00 # [3,] 0e+00 0e+00 1e+07 0e+00 # [4,] 0e+00 0e+00 0e+00 1e+07 p <- autoplot(dlmFilter(y, dlmModObj), fitted.colour = 'red', fitted.linetype = 'dashed') autoplot(dlmSmooth(y, dlmModObj), colour = 'blue', p = p)
4. 説明変数のあるローカルレベルモデル (確率的レベル)
- : は確率的レベル、 は説明変数の係数。
説明変数は dlm::dlm
へ引数 X
として渡す。
ukpetrol <- read.table('logUKpetrolprice.txt', skip = 1) ukpetrol <- ts(ukpetrol, start = start(ukdrivers), frequency = frequency(ukdrivers)) y <- ukdrivers x <- ukpetrol parm <- c(var(y), 0.001) buildObj <- function(parm) { dlm(FF = matrix(c(1, 0), 1, 2), V = matrix(exp(parm[1]), 1, 1), GG = matrix(c(1, 0, 0, 1), 2, 2, byrow = TRUE), W = matrix(c(exp(parm[2]), 0, 0, 0), 2, 2, byrow = TRUE), m0 = c(0, 0), C0 = matrix(c(1e+07, 0, 0, 1e+07), 2, 2, byrow = TRUE), JFF = matrix(c(0, 1), 1, 2), X = as.matrix(x)) } dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj) (dlmModObj <- buildObj(dlmFitObj$par)) # $FF # [,1] [,2] # [1,] 1 1 # # $V # [,1] # [1,] 0.002347965 # # $GG # [,1] [,2] # [1,] 1 0 # [2,] 0 1 # # $W # [,1] [,2] # [1,] 0.01166743 0 # [2,] 0.00000000 0 # # $JFF # [,1] [,2] # [1,] 0 1 # # $X # V1 # [1,] -2.273 # [2,] -2.279 # [3,] ... # # $m0 # [1] 0 0 # # $C0 # [,1] [,2] # [1,] 1e+07 0e+00 # [2,] 0e+00 1e+07 filtered <- dlmFilter(y, dlmModObj)$m smoothed <- dlmSmooth(y, dlmModObj)$s p <- autoplot(y) p <- autoplot(filtered[, 1] + filtered[, 2] * x, colour = 'red', linetype = 'dashed', p = p) autoplot(smoothed[, 1] + smoothed[, 2] * x, colour = 'blue', p = p)
5. 干渉変数のあるローカルレベルモデル (確率的レベル)
- : は確率的レベル、 は干渉変数の係数。
変数名は異なるが、式は説明変数のあるローカルレベルモデルと同一。
ukseats <- c(rep(0, (1982 - 1968) * 12 + 1), rep(1, (1984 - 1982) * 12 - 1)) ukseats <- ts(ukseats, start = start(ukdrivers), frequency = frequency(ukdrivers)) x <- ukseats parm <- c(var(y), 0.001) buildObj <- function(parm) { dlm(FF = matrix(c(1, 0), 1, 2), V = matrix(exp(parm[1]), 1, 1), GG = matrix(c(1, 0, 0, 1), 2, 2, byrow = TRUE), W = matrix(c(exp(parm[2]), 0, 0, 0), 2, 2, byrow = TRUE), m0 = c(0, 0), C0 = matrix(c(1e+07, 0, 0, 1e+07), 2, 2, byrow = TRUE), JFF = matrix(c(0, 1), 1, 2), X = as.matrix(x)) } dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj) (dlmModObj <- buildObj(dlmFitObj$par)) # $FF # [,1] [,2] # [1,] 1 0 # # $V # [,1] # [1,] 0.002692686 # # $GG # [,1] [,2] # [1,] 1 0 # [2,] 0 1 # # $W # [,1] [,2] # [1,] 0.01041175 0 # [2,] 0.00000000 0 # # $JFF # [,1] [,2] # [1,] 0 1 # # $X # [,1] # [1,] 0 # [2,] 0 # [3,] ... # # $m0 # [1] 0 0 # # $C0 # [,1] [,2] # [1,] 1e+07 0e+00 # [2,] 0e+00 1e+07 filtered <- dlmFilter(y, dlmModObj)$m smoothed <- dlmSmooth(y, dlmModObj)$s p <- autoplot(y) p <- autoplot(filtered[, 1] + filtered[, 2] * x, colour = 'red', linetype = 'dashed', p = p) autoplot(smoothed[, 1] + smoothed[, 2] * x, colour = 'blue', p = p)
6. 季節要素と干渉変数のあるローカルレベルモデル (確率的レベルと確率的季節)
- : は確率的レベル、 は確率的季節、 は干渉変数の係数。
補足 テキスト記載の数式は3項の季節要素、説明変数、干渉変数を含む 6 状態が記載されている。季節要素が3項のため英国インフレーションモデルについて記載していると思われるが、このモデルには説明変数がないため状態数があわない。そのため、テキスト中の式を英国インフレーションモデルに合うよう書き換えている。
y <- ukinflation ukpulse <- rep(0, length.out = length(ukinflation)) ukpulse[4*(1975-1950)+2] <- 1 ukpulse[4*(1979-1950)+3] <- 1 ukpulse <- ts(ukpulse, start = start(ukinflation), frequency = frequency(ukinflation)) x <- ukpulse parm <- log(c(var(y), 0.00001, 0.00001)) buildObj <- function(parm) { dlm(FF = matrix(c(1, 1, 0, 0, 1), 1, 5), V = matrix(c(exp(parm[1])), 1, 1), GG = matrix(c(1, 0, 0, 0, 0, 0, -1, -1, -1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1), 5, 5, byrow = TRUE), W = matrix(c(exp(parm[2]), 0, 0, 0, 0, 0, exp(parm[3]), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), 5, 5, byrow = TRUE), m0 = c(0, 0, 0, 0, 0), C0 = matrix(c(1e+07, 0, 0, 0, 0, 0, 1e+07, 0, 0, 0, 0, 0, 1e+07, 0, 0, 0, 0, 0, 1e+07, 0, 0, 0, 0, 0, 1e+07), 5, 5, byrow = TRUE), JFF = matrix(c(0, 0, 0, 0, 1), 1, 5), X = as.matrix(x)) } dlmFitObj <- dlmMLE(y, parm = parm, build = buildObj) (dlmModObj <- buildObj(dlmFitObj$par)) # $FF # [,1] [,2] [,3] [,4] [,5] # [1,] 1 1 0 0 1 # # $V # [,1] # [1,] 2.27569e-05 # # $GG # [,1] [,2] [,3] [,4] [,5] # [1,] 1 0 0 0 0 # [2,] 0 -1 -1 -1 0 # [3,] 0 1 0 0 0 # [4,] 0 0 1 0 0 # [5,] 0 0 0 0 1 # # $W # [,1] [,2] [,3] [,4] [,5] # [1,] 1.783797e-05 0.000000e+00 0 0 0 # [2,] 0.000000e+00 4.310206e-07 0 0 0 # [3,] 0.000000e+00 0.000000e+00 0 0 0 # [4,] 0.000000e+00 0.000000e+00 0 0 0 # [5,] 0.000000e+00 0.000000e+00 0 0 0 # # $JFF # [,1] [,2] [,3] [,4] [,5] # [1,] 0 0 0 0 1 # # $X # [,1] # [1,] 0 # [2,] 0 # [3,] ... # # $m0 # [1] 0 0 0 0 0 # # $C0 # [,1] [,2] [,3] [,4] [,5] # [1,] 1e+07 0e+00 0e+00 0e+00 0e+00 # [2,] 0e+00 1e+07 0e+00 0e+00 0e+00 # [3,] 0e+00 0e+00 1e+07 0e+00 0e+00 # [4,] 0e+00 0e+00 0e+00 1e+07 0e+00 # [5,] 0e+00 0e+00 0e+00 0e+00 1e+07 p <- autoplot(dlmFilter(y, dlmModObj), fitted.colour = 'red', fitted.linetype = 'dashed') autoplot(dlmSmooth(y, dlmModObj), colour = 'blue', p = p)
モデルの作り方
とはいえ、最初から上式のような形でモデルが定義できていることはないと思う。まずは簡単なモデルを dlm::dlmModPoly
などで作って、それをベースに書き換えていくのが良さそう。作ったモデルに適当なパラメータを与えて初期化すれば、モデルの次元 / 中身を確認できる。
y <- ukdrivers parm<- log(c(var(y), 0.001)) buildFun <- function(parm){ dlmModPoly(order = 2, dV = exp(parm[1]), dW = c(exp(parm[2]), 0)) } buildFun(parm) # $FF # [,1] [,2] # [1,] 1 0 # # $V # [,1] # [1,] 0.02935256 # # $GG # [,1] [,2] # [1,] 1 1 # [2,] 0 1 # # $W # [,1] [,2] # [1,] 0.001 0 # [2,] 0.000 0 # # $m0 # [1] 0 0 # # $C0 # [,1] [,2] # [1,] 1e+07 0e+00 # [2,] 0e+00 1e+07
R で 状態空間モデル: {dlm} の対数尤度計算について
状態空間モデルを扱う R パッケージである {dlm}
では、モデルのパラメータ推定を最尤法で行なう。その際に使われる対数尤度の計算式はパッケージによって違いがあるようで、推定結果が自分の参照しているテキストとずれる。その違いを確かめたい。
当該のテキストはこちら。今回はこれを持っている方向けの内容。
- 作者: J.J.F.コマンダー,S.J.クープマン,Jacques J.F. Commandeur,Sime Jan Koopman,和合肇
- 出版社/メーカー: シーエーピー出版
- 発売日: 2008/09
- メディア: 単行本
- 購入: 2人 クリック: 4回
- この商品を含むブログを見る
参考 サイト 「状態空間時系列分析入門」をRで再現する はこのテキストを {dlm}
, {KFAS}
で再現しており、対数尤度の差異についても記載がある。
テキストでは単変量状態空間モデルの 時刻 における観測値 と状態 を以下の式であらわしている。
1期先の状態を更新するカルマンフィルタは、
- : カルマンゲイン
- : 状態推定誤差分散
- : カルマンフィルタでの1期先予測誤差分散
これらを用いて、対数尤度関数は式 (8.7) で以下のように定義されている。
- : 観測時系列の長さ
- : 推定する状態数
- : カルマンフィルタの1期先予測誤差
この式で計算される対数尤度は {dlm}
のものと異なる。差異がある理由として記載する 2 点がわかったが、これらを解消してもまだ結果はあわない。
1. テキストの記載による差異
まず、テキストの各モデルに対数尤度として記載されている数値は 上式の計算結果そのものでない。P162 に添付されているプログラムに 対数尤度を系列の長さ で割る処理がしれっと入っており、処理後の値が対数尤度として記載されている。そのため、対数尤度に戻すためには各モデルに記載されている数値に 系列の長さ をかける必要がある。
例えば 2.1 ローカルレベルモデル (確定的レベルモデル) に対数尤度として記載されている数値 0.3297597 に、系列の長さ 192 をかけた 63.31386 が 式 (8.7) の計算結果にあたる。
2. {dlm}
の対数尤度計算式との差異
テキストで利用しているデータ利用して、{dlm}
との差異を確かめたい。まずデータを読み込む。データは著者のサポートサイト、An Introduction to State Space Time Series Analysis から入手できる。
ukdrivers <- read.table('UKdriversKSI.txt', skip = 1) ukdrivers <- ts(ukdrivers[[1]], start = c(1969, 1), frequency = 12) ukdrivers <- log(ukdrivers) y <- ukdrivers
パラメータを受け取ってローカルレベルモデルを返す関数 buildFun
を定義し、dlm::dlmMLE
で最尤推定する。結果は リストになり、推定されたパラメータ par
や 対数尤度 (を代替するもの/後述) value
がプロパティとして含まれている。
library(dlm) parm <- c(var(y)) buildFun <- function(x) { dlmModPoly(order = 1, dV = parm, dW = 0) } dlmFit <- dlmMLE(y, parm = parm, build = buildFun) class(dlmFit) # [1] "list" names(dlmFit) # [1] "par" "value" "counts" "convergence" "message" dlmFit$par # [1] 0.02935256
続けて、最尤推定したパラメータ dlmFit$par
を使ってモデルを作成する。プロパティの意味は vignette の記載がわかりやすい。テキストと式/添字をあわせると
FF
: に対応。V
: に対応。GG
: に対応。W
: に対応。m0
: の事前分布の平均。C0
: の事前分布の分散。J...
: 各パラメータが時間変化するとき利用。
dlmMod <- buildFun(dlmFit$par) class(dlmMod) # [1] "dlm" names(dlmMod) # [1] "m0" "C0" "FF" "V" "GG" "W" "JFF" "JV" "JGG" "JW" V(dlmMod) # [,1] # [1,] 0.02935256
ここで、{dlm}
の内部で最尤推定がどのように行なわれているのか確かめたい。
dlm::dlmMLE
の定義をみると、最尤推定は dlm::dlmLL
を optim
で最適化することで行っている。
dlmMLE # function (y, parm, build, method = "L-BFGS-B", ..., debug = FALSE) # { # logLik <- function(parm, ...) { # mod <- build(parm, ...) # return(dlmLL(y = y, mod = mod, debug = debug)) # } # out <- optim(parm, logLik, method = method, ...) # return(out) # } # <environment: namespace:dlm>
dlm::dlmLL
はその名前から対数尤度のようにも見えるが、対数尤度とは符号が逆 ( optim
で最適化するため、尤もらしいほど値が小さくなる )。
dlmLL(y = y, mod = buildFun(dlmFit$par)) # [1] -230.7721
補足 パラメータ推定時の dlm::dlmLL
の値は dlm::dlmMLE
の結果にも含まれている。
dlmFit$value # [1] -230.7721
dlm::dlmLL
の処理は少し条件分岐が多いため省略するが、上のモデルについて必要な処理は以下のようなものであることがわかる。カルマンフィルタ dlm::dlmFilter
を適用した結果から予測誤差、予測誤差標準偏差を求めて 対数尤度の代替となる値を算出している。
# 予測誤差、予測誤差標準偏差を計算 residuals_dlmcompat <- function (object) { mod <- object$mod # yは観測値 (入力), a は状態の予測値 res <- drop(object$y - object$a %*% t(mod$FF)) f <- function(i) { # 予測誤差標準偏差 diag(crossprod(object$D.R[i, ] * t(mod$FF %*% object$U.R[[i]])) + mod$V) } SD <- drop(t(sqrt(sapply(seq(along = object$U.R), f)))) return(list(res = res, sd = SD)) } # dlmLLと同じ値を計算 LL_dlmcompat <- function(y, mod) { filtered <- dlmFilter(y, mod) resid <- residuals_dlmcompat(filtered) # 予測誤差分散 F <- as.numeric(resid$sd)^2 v <- as.numeric(resid$res) return(0.5 * sum(log(F) + v^2/F)) } LL_dlmcompat(y = y, mod = buildFun(dlmFit$par)) # [1] -230.7721
これを上式 (8.7) と比べると以下の差異がある。
- 符号が逆 (
optim
での処理のため ) - 第1項目がない ( パラメータ非依存のため
optim
には影響ない ) - 第2項目の総和の範囲が異なる
これをテキストに準じて書き換えると、
LL_ckbookcompat <- function(y, mod) { n <- length(y) filtered <- dlmFilter(y, mod) resid <- residuals_dlmcompat(filtered) F <- (as.numeric(resid$sd)^2)[2:n] v <- as.numeric(resid$res)[2:n] return(-0.5 * (n * log(2 * pi) + sum(log(F) + v^2/F))) } LL_ckbookcompat(y = y, mod = buildFun(dlmFit$par)) # [1] 62.39492
となり、テキストの対数尤度 63.31386 に結構 近づいた (が、あわない、、)。対数尤度の計算結果はパッケージによって異なる、ということは覚えておいたほうがよさそうだ。
補足: このモデルにおいては、 {KFAS}
の結果も上記修正後の結果と一致している。
library(KFAS) kfasMod <- SSModel(y ~ SSMtrend(degree=1, Q=list(matrix(0))), H = matrix(NA)) kfasFit <- fitSSM(kfasMod, inits = parm, method = "BFGS") kfasFit$optim.out$value # [1] -62.39492 KFS(kfasFit$model)$logLik # [1] 62.39492
pandas 日時まわりのリサンプリング/オフセット処理
こちらの続き。
今回のサンプルデータには自分の歩数のデータを使いたい。インスパイヤ元は以下のサイトだ。
データの読み込み
歩数データは iPhone の Health アプリから Export できる。形式は XML なので、そのままでは pandas
で読み込めない。一度 XML から必要な属性を辞書のリストとして取り出した後、pandas
に読み込ませる。
import pandas as pd from xml.etree import ElementTree tree = ElementTree.parse('export.xml') root = tree.getroot() # 属性の辞書のリストを作る data = [e.attrib for e in root.findall('.//Record')] data[0] # {'endDate': '20150511220900+0900', # 'recordCount': '104', # 'source': u'xxx', # 'startDate': '20150511210900+0900', # 'type': 'HKQuantityTypeIdentifierStepCount', # 'unit': 'count', # 'value': '531'} df = pd.DataFrame(data) # 必要な列のみにフィルタ df = df[['startDate', 'type', 'unit', 'value']] df.head() # startDate type unit value # 0 20150511210900+0900 HKQuantityTypeIdentifierStepCount count 531 # 1 20150512000900+0900 HKQuantityTypeIdentifierStepCount count 6 # 2 20150512060900+0900 HKQuantityTypeIdentifierStepCount count 629 # 3 20150512070900+0900 HKQuantityTypeIdentifierStepCount count 312 # 4 20150512080900+0900 HKQuantityTypeIdentifierStepCount count 1483
Series.value_counts()
を使うと、ある列で出現する要素の個数を集計できる。"type" 列を集計すると、Export したデータには歩数以外のデータも含まれていることがわかる。歩数以外は不要なので削除する。
df['type'].value_counts() # HKQuantityTypeIdentifierStepCount 375 # HKQuantityTypeIdentifierDistanceWalkingRunning 374 # HKQuantityTypeIdentifierFlightsClimbed 216 # dtype: int64 # 歩数のみにフィルタ df = df[df['type'] == 'HKQuantityTypeIdentifierStepCount']
また、XML で読み込んだデータはすべて文字列型になっているため、適宜型変換を行う。
# 歩数データは数値に df['value'] = df['value'].astype(float) # 日付は index として設定し、日付型 (DatetimeIndex) に変換 df = df.set_index('startDate') # この時点で index は文字列型になっている df.index # Index([u'20150511210900+0900', u'20150512000900+0900', u'20150512060900+0900', # u'20150512070900+0900', u'20150512080900+0900', u'20150512090900+0900', # ... # u'20150518165000+0900', u'20150518165900+0900', u'20150518171600+0900', # u'20150518181200+0900'], # dtype='object', name=u'startDate', length=965) # to_datetime で日時型に変換 / タイムゾーンを表すオフセットを適宜設定 pd.to_datetime(df.index, utc=True).tz_convert('Asia/Tokyo') # DatetimeIndex(['2015-05-11 21:09:00+09:00', '2015-05-12 00:09:00+09:00', # '2015-05-12 06:09:00+09:00', '2015-05-12 07:09:00+09:00', # ... # '2015-05-18 16:50:00+09:00', '2015-05-18 16:59:00+09:00', # '2015-05-18 17:16:00+09:00', '2015-05-18 18:12:00+09:00'], # dtype='datetime64[ns]', length=965, freq=None, tz='Asia/Tokyo') # index を上書き df.index = pd.to_datetime(df.index, utc=True).tz_convert('Asia/Tokyo') # タイムゾーンは使わないので削除 df.index = df.index.tz_localize(None) # 日付の昇順にソート df = df.sort_index() df.head() # type unit value # 2014-11-25 21:09:00 HKQuantityTypeIdentifierStepCount count 1396.00 # 2014-11-26 21:09:00 HKQuantityTypeIdentifierStepCount count 7020.37 # 2014-11-27 21:09:00 HKQuantityTypeIdentifierStepCount count 6413.63 # 2014-11-28 21:09:00 HKQuantityTypeIdentifierStepCount count 8396.00 # 2014-11-29 21:09:00 HKQuantityTypeIdentifierStepCount count 12411.00
今の機種に買い替えたのが昨年11末なので、買い替え以降すべてのデータが入っているようだ。
部分文字列によるスライシング
pandas
での一般的なデータ選択については以下の記事にまとめた。
加えて、データが 日付型の Index ( DatetimeIndex
) を持つとき、日付-like な文字列で __getitem__
すると、その期間にあてはまるデータを抽出してくれる。例えば 2015-05-17 日分のデータを抽出したければ以下のようにする。この方法を使うと 好きな期間のデータを簡単に確認することができる。詳細は公式ドキュメントを参照。
df['2015-05-17'] # type unit value # 2015-05-17 08:09:00 HKQuantityTypeIdentifierStepCount count 18.0000 # 2015-05-17 11:09:00 HKQuantityTypeIdentifierStepCount count 180.0000 # 2015-05-17 12:09:00 HKQuantityTypeIdentifierStepCount count 934.0000 # 2015-05-17 13:09:00 HKQuantityTypeIdentifierStepCount count 469.0000 # ... ... ... ... # 2015-05-17 21:16:00 HKQuantityTypeIdentifierStepCount count 88.5087 # 2015-05-17 21:17:00 HKQuantityTypeIdentifierStepCount count 23.1214 # 2015-05-17 21:18:00 HKQuantityTypeIdentifierStepCount count 99.3283 # 2015-05-17 21:19:00 HKQuantityTypeIdentifierStepCount count 41.6284 # # [14 rows x 3 columns]
リサンプリング
データは日時で処理したいので、まずは 1日ごとの合計を算出したい。こういうときは DataFrame.resample
。リサンプリングする期間や集約関数は引数として指定できる。
s = df.resample('D', how='sum') # value # 2014-11-25 1396.000000 # 2014-11-26 7020.370000 # 2014-11-27 6413.630000 # 2014-11-28 8396.000000 # ... ... # 2015-05-15 7561.998000 # 2015-05-16 10615.000000 # 2015-05-17 9208.000000 # 2015-05-18 8085.994898 # # [175 rows x 1 columns]
日次に集約した結果をプロットしてみる。
s.plot()
あまり意識はしていないのだが そこそこ歩いているようだ。5月初旬はちょっと出かけていたため 歩数に異常値が出ている。
日付の標準化
日時データの適当な期間での集約は DataFrame.resample
でできた。が、時には日時の補正のみを行い、集約はしたくない場合がある。
DatetimeIndex
を日付ごとにまとめるのに一番簡単なのは .normalize
。
pd.Timestamp('2015-05-01 23:59').normalize() # Timestamp('2015-05-01 00:00:00') pd.Timestamp('2015-05-02 00:00').normalize() # Timestamp('2015-05-02 00:00:00')
他の日時関連のメソッドと同じく、Timestamp
, DatetimeIndex
両方で使える。
df.tail().index # DatetimeIndex(['2015-05-18 18:55:00', '2015-05-18 18:56:00', # '2015-05-18 18:57:00', '2015-05-18 18:58:00', # '2015-05-18 18:59:00'], # dtype='datetime64[ns]', freq=None, tz=None) df.tail().index.normalize() # DatetimeIndex(['2015-05-18', '2015-05-18', # '2015-05-18', '2015-05-18', # '2015-05-18'], # dtype='datetime64[ns]', freq=None, tz=None)
当然、集約すれば 日付での .resample
と同じ結果になる。
s = df.groupby(df.index.normalize())[['value']].sum() s # 略
オフセット
また、より柔軟に日時補正を行うためにオフセットが提供されている ( 前回記事 )。オフセットを利用してデータを補正/集約する例を書きたい。
オフセットを使わずに .resample
で月次平均をとると以下のような結果になる。
s.resample('M', how='mean') # value # 2014-11-30 7012.500000 # 2014-12-31 8435.741935 # 2015-01-31 9134.032258 # 2015-02-28 9323.821429 # 2015-03-31 9326.356129 # 2015-04-30 9938.533333 # 2015-05-31 10973.055439
オフセットを使って同じ処理をするには、日時を月末に補正するオフセット MonthEnd
を使う。オフセットの一覧は 公式ドキュメント を参照。
m = offsets.MonthEnd()
m
# <MonthEnd>
オフセットにはいくつか共通のメソッドがあるため、順に記載する。
まず、ある日付が 当該のオフセット上に存在するか調べるには .onOffset
。MonthEnd.onOffset
の場合は、引数が月末の日付であるとき True
になる。
m.onOffset(pd.Timestamp('2015-04-29')) # False m.onOffset(pd.Timestamp('2015-04-30')) # True m.onOffset(pd.Timestamp('2015-05-01')) # False
オフセットを加減算することにより、日時を次の/前のオフセットへと移動できる。また、オフセットの加算と同一の処理として .apply
がある。
# 日付を常に 次のオフセットへ移動させる。 pd.Timestamp('2015-04-29') + m # Timestamp('2015-04-30 00:00:00') pd.Timestamp('2015-04-30') + m # Timestamp('2015-05-31 00:00:00') pd.Timestamp('2015-05-01') + m # Timestamp('2015-05-31 00:00:00') # 日付を常に 前のオフセットへ移動させる。 pd.Timestamp('2015-04-29') - m # Timestamp('2015-03-31 00:00:00') pd.Timestamp('2015-04-30') - m # Timestamp('2015-03-31 00:00:00') pd.Timestamp('2015-05-01') - m # Timestamp('2015-04-30 00:00:00') # 日付を常に 次のオフセットへ移動させる。 m.apply(pd.Timestamp('2015-04-29')) # Timestamp('2015-04-30 00:00:00') m.apply(pd.Timestamp('2015-04-30')) # Timestamp('2015-05-31 00:00:00') m.apply(pd.Timestamp('2015-05-01')) # Timestamp('2015-05-31 00:00:00')
オフセットに乗っている日時は動かしたくなければ .rollforward
/ .rollback
というメソッドを使う。
# 日付がオフセットに乗っていない場合 次のオフセットへ移動させる。 m.rollforward(pd.Timestamp('2015-04-29')) # Timestamp('2015-04-30 00:00:00') m.rollforward(pd.Timestamp('2015-04-30')) # Timestamp('2015-04-30 00:00:00') m.rollforward(pd.Timestamp('2015-05-01')) # Timestamp('2015-05-31 00:00:00') # 日付がオフセットに乗っていない場合 前のオフセットへ移動させる。 m.rollback(pd.Timestamp('2015-04-29')) # Timestamp('2015-03-31 00:00:00') m.rollback(pd.Timestamp('2015-04-30')) # Timestamp('2015-04-30 00:00:00') m.rollback(pd.Timestamp('2015-05-01')) # Timestamp('2015-04-30 00:00:00')
よって、月次平均の算出をオフセットのメソッドを使って行う場合は以下のようになる。
# 例示のため一部データをスライス s['2014-11'] # value # 2014-11-25 1396.00 # 2014-11-26 7020.37 # 2014-11-27 6413.63 # 2014-11-28 8396.00 # 2014-11-29 12411.00 # 2014-11-30 6438.00 # 日時を月末に補正 s['2014-11'].index.map(m.rollforward) # array([Timestamp('2014-11-30 00:00:00'), Timestamp('2014-11-30 00:00:00'), # Timestamp('2014-11-30 00:00:00'), Timestamp('2014-11-30 00:00:00'), # Timestamp('2014-11-30 00:00:00'), Timestamp('2014-11-30 00:00:00')], dtype=object) s.groupby(s.index.map(m.rollforward)).mean() # value # 2014-11-30 7012.500000 # 2014-12-31 8435.741935 # 2015-01-31 9134.032258 # 2015-02-28 9323.821429 # 2015-03-31 9326.356129 # 2015-04-30 9938.533333 # 2015-05-31 10973.055439
補足 オフセットは .resample
の引数として渡すこともでき、同じ結果になる。
s.resample(m, how='mean') # 略
オフセットを活用した集計
つづけて、カレンダー上の平日 / 休日での差異をみたい。アクセサ ( こちらの記事参照 ) を使って曜日で集計してから処理してもよいが、 BusinessDay
オフセットを使えば簡単。BusinessDay.onOffset
を使えば平日が True
/ 休日を False
として集計できる。以下の結果をみると、休日の方が歩いている傾向があるようだ。
bday = offsets.BusinessDay() s.groupby(bday.onOffset).mean() # value # False 10860.216800 # True 8716.657583
BusinessDay
オフセットは 土日のみを休日として扱うが、任意の祝日も含めて休日として扱いたい場合は CustomBusinessDay
オフセットが使える。拙作のパッケージ で 日本の祝日のカレンダーを定義しているので、それを使って、
import japandas as jpd calendar = jpd.JapaneseHolidayCalendar() cday = pd.offsets.CDay(calendar=calendar) s.groupby(cday.onOffset).mean() # value # False 10799.033448 # True 8600.419640
これまでの内容を使って、月ごとに 平日 / 土日 / 祝日の歩数平均をクロス集計したい。クロス集計を行うには pd.pivot_table
。
avg = pd.pivot_table(s, index=s.index.map(m.rollforward), columns=[s.index.map(bday.onOffset), s.index.map(cday.onOffset)], values='value', aggfunc='mean') avg # False True # False False True # 2014-11-30 9424.500000 NaN 5806.500000 # 2014-12-31 10382.275000 11704.0 7579.354545 # 2015-01-31 9534.193333 10162.5 8851.113000 # 2015-02-28 10943.125000 9446.0 8635.578947 # 2015-03-31 11148.877778 NaN 8580.779091 # 2015-04-30 11349.625000 7110.0 9535.666667 # 2015-05-31 12769.000000 11582.7 9572.544211
左から、
- 土日:
BusinessDay
CustomBusinessDay
ともにFalse
- 祝日:
BusinessDay
はTrue
,CustomBusinessDay
はFalse
- 平日:
BusinessDay
CustomBusinessDay
ともにTrue
となる。ということで上の結果からも 5月の休日は結構歩いたなってことがわかった。
まとめ
日付を index として持つデータに対する リサンプリング/オフセット処理をまとめた。これらは以下のような使い分けをすればよいと思う。
- 比較的単純な集約はリサンプリング
- リサンプリングではできない より細かい補正やクロス集計をしたい場合はオフセット
Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理
- 作者: Wes McKinney,小林儀匡,鈴木宏尚,瀬戸山雅人,滝口開資,野上大介
- 出版社/メーカー: オライリージャパン
- 発売日: 2013/12/26
- メディア: 大型本
- この商品を含むブログ (9件) を見る
pandas 0.16.0/0.16.1 の主要な新機能
先日 5/11 に pandas 0.16.1 がリリースされた。前バージョンである 0.16.0 とあわせて、主要な変更点である以下3点の概要をまとめたい。各見出しの括弧内には対応したバージョンを記載した。
- 簡単な列追加 /
DataFrame.assign
(0.16.0) - 文字列処理の強化 (0.16.0/0.16.1)
- ランダムサンプリング
DataFrame.sample
(0.16.1)
変更点全体はリリースノートを参照。上記3点以外にも便利な変更はあるのだが、 Categorical や Frequency など 元機能の説明がないとわからない箇所なので別途、、。
簡単な列追加 / DataFrame.assign
(0.16.0)
DataFrame
への列追加をより簡潔に行うためのメソッドとして、DataFrame.assign
が追加された。R {dplyr}
の mutate
関数に似た書式で複数列の追加ができる。
これまで列追加の際には カラムを指定して代入し、元データを破壊的に変更する必要があった。
import pandas as pd df = pd.DataFrame({'A':[1, 2, 3], 'B':[4, 5, 6]}) df # A B # 0 1 4 # 1 2 5 # 2 3 6 # これまでの列追加 df['C'] = [7, 8, 9] df # A B C # 0 1 4 7 # 1 2 5 8 # 2 3 6 9
assign
を使うと、作成したい列名をキーワード引数として以下のように書ける。また、戻り値は元データに列追加したコピーとなり、元データ自体は変更されない。
# D列, E列を追加 df.assign(D=[10, 11, 12], E=[13, 14, 15]) # A B C D E # 0 1 4 7 10 13 # 1 2 5 8 11 14 # 2 3 6 9 12 15 # 元データはそのまま df # A B C # 0 1 4 7 # 1 2 5 8 # 2 3 6 9
キーワード引数の値には通常の列作成時と同じ型が渡せる。各列の値を組み合わせた列 + ダミー列を同時に作りたければ、
df.assign(AB=df['A'] * df['B'], AC=df['A'] + df['C'], dummy=1) # A B C AB AC dummy # 0 1 4 7 4 8 1 # 1 2 5 8 10 10 1 # 2 3 6 9 18 12 1
キーワード引数として渡せない列名、例えば記号や日本語を含む列名は、一度 辞書にしてから渡せばよい。
keys = {'A*B': df['A'] * df['B'], 'A+B':df['A'] + df['B']} df.assign(**keys) # A B A*B A+B # 0 1 4 4 5 # 1 2 5 10 7 # 2 3 6 18 9
補足: メソッド自体は 0.16.0 で追加され、0.16.1 でキーワード引数の処理順序がアルファベット順に固定された。
文字列処理の強化 (0.16.0/0.16.1)
pandas
での文字列処理について過去に以下の記事を書いたことがあるのだが、当時はアクセサから利用できるメソッドが限定されていて、少し手間がかかるところがあった。
0.16.0, 0.16.1 で、.str
アクセサに以下のメソッド群が追加された。それぞれ、 Python 標準の文字列メソッドと同一の処理を Series
内の値に対して適用するもの。
Methods | ||||
---|---|---|---|---|
isalnum |
isalpha |
isdigit |
isspace |
islower |
isupper |
istitle |
isnumeric |
isdecimal |
find |
rfind |
ljust |
rjust |
zfill |
capitalize |
swapcase |
normalize |
partition |
rpartition |
index |
rindex |
translate |
補足 str.normalize
は ユニコード正規化 ( unicodedata.normalize
) を値に対して適用するもの。
補足 .str
アクセサから利用可能なメソッド全体はこちら。
df = pd.DataFrame({'A': ['xxx', '3', 'yyy'], 'B': [1, 2, 3]}) df # A B # 0 xxx 1 # 1 3 2 # 2 yyy 3 # 先頭を大文字に df['A'].str.capitalize() # 0 Xxx # 1 3 # 2 Yyy # Name: A, dtype: object # 文字列が数値がどうかを調べる df['A'].str.isdigit() # 0 False # 1 True # 2 False # Name: A, dtype: bool # 5桁分を 0 パディング df['A'].str.zfill(5) # 0 00xxx # 1 00003 # 2 00yyy # Name: A, dtype: object
やりたいことに応じて適当に組み合わせると、かなり柔軟な処理がかける。例えば 数値の文字列のみを 0 パディングしたければ、
df.loc[df['A'].str.isdigit(), 'A'] = df['A'].str.zfill(5) df # A B # 0 xxx 1 # 1 00003 2 # 2 yyy 3
補足 DataFrame.loc
の意味はこちらの記事参照。
また、0.16.1 から .str
アクセサを Index
からも呼び出せるようになった。これまでは 列名/行名に対する文字列処理は .map
を使って関数適用する必要があったが、.str
を使えば以下のように書ける。
df # A B # 0 xxx 1 # 1 00003 2 # 2 yyy 3 df.columns.str.lower() # Index([u'a', u'b'], dtype='object') # 列名を小文字に変更 df.columns = df.columns.str.lower() df # a b # 0 xxx 1 # 1 00003 2 # 2 yyy 3
str.split
のように複数の値を返しうる処理については、expand
オプションを利用して 返り値の型を制御できる。互換性維持のため、既定値はメソッドにより異なる。APIドキュメントを参照。
expand=False
: 返り値の次元を増やさない = 返り値はSeries
もしくはIndex
expand=True
: 返り値の次元を増やす = 返り値はDataFrame
もしくはMultiIndex
補足 これまで同様の制御をおこなっていた return_type
オプションは deprecate されており、将来のバージョンで削除される。
s = pd.Series(['a,b', 'a,c', 'b,c']) # 返り値は Series s.str.split(',') # 0 [a, b] # 1 [a, c] # 2 [b, c] # dtype: object # 返り値は DataFrame s.str.split(',', expand=True) # 0 1 # 0 a b # 1 a c # 2 b c idx = pd.Index(['a,b', 'a,c', 'b,c']) # 返り値は 1 レベルの Index idx.str.split(',') # Index([[u'a', u'b'], [u'a', u'c'], [u'b', u'c']], dtype='object') # 返り値は 2 レベルの MultiIndex idx.str.split(',', expand=True) # MultiIndex(levels=[[u'a', u'b'], [u'b', u'c']], # labels=[[0, 0, 1], [0, 1, 1]])
ランダムサンプリング DataFrame.sample
(0.16.1)
Series
, DataFrame
, Panel
から適当なデータをサンプリングするためのメソッドとして、.sample
が追加された。
ここでは DataFrame.sample
を例としてその処理を記載する。まず以下のようなデータを用意した。
df = pd.DataFrame({'A': [1, 2 ,3, 4, 5], 'B': ['a', 'b', 'c', 'd', 'e']}) df # A B # 0 1 a # 1 2 b # 2 3 c # 3 4 d # 4 5 e
DataFrame.sample
は 既定では 1 行をランダムにサンプリングする。
n
: サンプルサイズを指定する。既定は 1。axis
: サンプリングするレコードの方向を指定する。0
(既定) で行のサンプリング、1
で列のサンプリング。
# 1 行の抽出 df.sample() # A B # 3 4 d # 3 行の抽出 df.sample(3) # A B # 0 1 a # 4 5 e # 2 3 c # 1 列の抽出 df.sample(axis=1) # A # 0 1 # 1 2 # 2 3 # 3 4 # 4 5
また、サンプルサイズでなく抽出比を指定する場合は frac
を指定する。
df.sample(frac=0.4) # A B # 3 4 d # 1 2 b
最後に、replace
オプションを利用して 抽出方法を変更することができる。
replace=False
(既定): 非復元抽出。サンプリングされた各要素の重複を許さない。replace=True
: 復元抽出。各要素の重複を許す。
# 復元抽出。2行目、4行目が重複 df.sample(n=4, replace=True) # A B # 1 2 b # 2 3 c # 0 1 a # 2 3 c # 非復元抽出では 元のデータ数以上のサンプルサイズは取れない df.sample(n=6) # ValueError: Cannot take a larger sample than population when 'replace=False' df.sample(n=6, replace=True) # A B # 1 2 b # 4 5 e # 0 1 a # 1 2 b # 2 3 c # 4 5 e
補足 これまでは index の要素をサンプリングして選択する必要があった。以下の記事参照。
まとめ
0.16.0 / 0.16.1 の変更点のうち、以下 3 点の概要をまとめた。
- 簡単な列追加 /
DataFrame.assign
(0.16.0) - 文字列処理の強化 (0.16.0/0.16.1)
- ランダムサンプリング
DataFrame.sample
(0.16.1)
他にも、バグ修正 / 挙動・仕様の統一など 様々な改善が入っているので使ってみてください。
Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理
- 作者: Wes McKinney,小林儀匡,鈴木宏尚,瀬戸山雅人,滝口開資,野上大介
- 出版社/メーカー: オライリージャパン
- 発売日: 2013/12/26
- メディア: 大型本
- この商品を含むブログ (9件) を見る
簡単な集約/変換処理を PySpark & pandas の DataFrame で行う
こちらの続き。
準備
サンプルデータは iris 。今回は HDFS に csv を置き、そこから読み取って DataFrame
を作成する。
# HDFS にディレクトリを作成しファイルを置く $ hadoop fs -mkdir /data/ $ hadoop fs -put iris.csv /data/ $ hadoop fs -ls / Found 1 items drwxr-xr-x - ec2-user supergroup 0 2015-04-28 20:01 /data # Spark のパスに移動 $ echo $SPARK_HOME /usr/local/spark $ cd $SPARK_HOME $ pwd /usr/local/spark $ bin/pyspark
補足 前回同様に pandas
から直接 PySpark
の DataFrame
を作成した場合、groupBy
時に java.lang.OutOfMemoryError: Java heap space
エラーが発生してシェルごと落ちる。
CSV ファイルの読み込み
pandas
では前回同様 read_csv
。
import numpy as np import pandas as pd # 表示する行数を設定 pd.options.display.max_rows=10 names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species'] # pandas pdf = pd.read_csv('~/iris.csv', header=None, names=names) pdf # 略
PySpark
は標準では csv から直接 DataFrame
を作成できないため、一度 Row
のリストを作成して DataFrame
に変換する。
from pyspark.sql import Row lines = sc.textFile("hdfs://127.0.0.1:9000/data/iris.csv") cells = lines.map(lambda l: l.split(",")) rows = cells.map(lambda x: Row(SepalLength=float(x[0]), SepalWidth=float(x[1]), PetalLength=float(x[2]), PetalWidth=float(x[3]), Species=x[4])) sdf = sqlContext.createDataFrame(rows) sdf.show() # 略
グルーピング/集約
ある列の値ごとに集計
pandas
, PySpark
で多少 文法は異なる。
列の値でグループ分けし、一列の合計を取得する場合:
# pandas pdf.groupby('Species')['SepalLength'].sum() # Species # setosa 250.3 # versicolor 296.8 # virginica 329.4 # Name: SepalLength, dtype: float64 # PySpark sdf.groupBy('Species').sum('SepalLength').show() # Species SUM(SepalLength) # virginica 329.3999999999999 # versicolor 296.8 # setosa 250.29999999999998
指定した複数列の合計を取得する場合:
# pandas pdf.groupby('Species')[['PetalWidth', 'PetalLength']].sum() # PetalWidth PetalLength # Species # setosa 12.2 73.2 # versicolor 66.3 213.0 # virginica 101.3 277.6 # PySpark sdf.groupBy('Species').sum('PetalWidth', 'PetalLength').show() # Species SUM(PetalWidth) SUM(PetalLength) # virginica 101.29999999999998 277.59999999999997 # versicolor 66.30000000000001 213.0 # setosa 12.199999999999996 73.2
全列の合計を取得する場合:
# pandas pdf.groupby('Species').sum() # SepalLength SepalWidth PetalLength PetalWidth # Species # setosa 250.3 170.9 73.2 12.2 # versicolor 296.8 138.5 213.0 66.3 # virginica 329.4 148.7 277.6 101.3 # PySpark sdf.groupBy('Species').sum().show() # Species SUM(PetalLength) SUM(PetalWidth) SUM(SepalLength) SUM(SepalWidth) # virginica 277.59999999999997 101.29999999999998 329.3999999999999 148.7 # versicolor 213.0 66.30000000000001 296.8 138.5 # setosa 73.2 12.199999999999996 250.29999999999998 170.90000000000003
補足 pandas
では グループ化したデータも DataFrame
と同じようにスライシングできたりする。
一方、PySpark
の GroupedData
は集約系のAPI しか持っていない。
# pandas pdf.groupby('Species')['PetalWidth'] # <pandas.core.groupby.SeriesGroupBy object at 0x7f62f4218d50> # PySpark (NG!) sdf.groupBy('Species')[['Species']] # TypeError: 'GroupedData' object has no attribute '__getitem__' sdf.groupBy('Species').select('PetalWidth') # AttributeError: 'GroupedData' object has no attribute 'select'
また、pandas
では apply
で自作の集約関数 (UDAF) を利用することができるが、PySpark
1.3.1 時点 では非対応らしい。PySpark
の udf
を利用して定義した自作関数を集約時に使うと以下のエラーになる。
# pandas pdf.groupby('Species')[['PetalWidth', 'PetalLength']].apply(np.sum) # PetalWidth PetalLength # Species # setosa 12.2 73.2 # versicolor 66.3 213.0 # virginica 101.3 277.6 # PySpark (NG!) import pyspark.sql.functions np_sum = pyspark.sql.functions.udf(np.sum, pyspark.sql.types.FloatType()) sdf.groupBy('Species').agg(np_sum(sdf.PetalWidth)) # py4j.protocol.Py4JJavaError: An error occurred while calling o334.agg. # : org.apache.spark.sql.AnalysisException: expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;
行持ち / 列持ち変換
複数列持ちの値を行持ちに展開 (unpivot / melt)
pandas
では pd.melt
。 DataFrame.melt
ではないので注意。
# pandas pmelted = pd.melt(pdf, id_vars=['Species'], var_name='variable', value_name='value') pmelted # Species variable value # 0 setosa SepalLength 5.1 # 1 setosa SepalLength 4.9 # 2 setosa SepalLength 4.7 # 3 setosa SepalLength 4.6 # 4 setosa SepalLength 5.0 # .. ... ... ... # 595 virginica PetalWidth 2.3 # 596 virginica PetalWidth 1.9 # 597 virginica PetalWidth 2.0 # 598 virginica PetalWidth 2.3 # 599 virginica PetalWidth 1.8 # # [600 rows x 3 columns]
同様の処理を PySpark
でやるには、DataFrame.flatMap
。1行の入力に対して複数行 (この例では4行) のデータを返すことができる。fratMap
の返り値は RDD
インスタンスになるため、必要なら再度 DataFrame
化する。
# PySpark def mapper(row): return [Row(Species=row[4], variable='PetalLength', value=row[0]), Row(Species=row[4], variable='PetalWidth', value=row[1]), Row(Species=row[4], variable='SepalLength', value=row[2]), Row(Species=row[4], variable='SepalWidth', value=row[3])] smelted = sqlContext.createDataFrame(sdf.flatMap(mapper)) smelted.show() # Species value variable # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.1 SepalLength # setosa 3.5 SepalWidth # ... .. ... # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.0 SepalLength # setosa 3.6 SepalWidth smelted.count() # 600L
複数行持ちの値を列持ちに変換 (pivot)
pandas
では DataFrame.pivot
。pivotするデータは列にする値 (以下では Species ) と行にする値 (以下では variable ) の組がユニークになっている必要がある。そのため、まず pivot 用データを作成 -> その後 pivot する。
# pandas # pivot 用データを作成 punpivot = pmelted.groupby(['Species', 'variable']).sum() punpivot = punpivot.reset_index() punpivot # Species variable value # 0 setosa PetalLength 73.2 # 1 setosa PetalWidth 12.2 # 2 setosa SepalLength 250.3 # 3 setosa SepalWidth 170.9 # 4 versicolor PetalLength 213.0 # .. ... ... ... # 7 versicolor SepalWidth 138.5 # 8 virginica PetalLength 277.6 # 9 virginica PetalWidth 101.3 # 10 virginica SepalLength 329.4 # 11 virginica SepalWidth 148.7 # # [12 rows x 3 columns] # pivot punpivot.pivot(index='variable', columns='Species', values='value') # Species setosa versicolor virginica # variable # PetalLength 73.2 213.0 277.6 # PetalWidth 12.2 66.3 101.3 # SepalLength 250.3 296.8 329.4 # SepalWidth 170.9 138.5 148.7
PySpark
の DataFrame
のままでは同じ処理はできないようなので、一度 RDD
に変換してから、 groupBy
-> map
# PySpark # pivot 用データを作成 sunpivot = smelted.groupBy('Species', 'variable').sum() sunpivot.show() # Species variable SUM(value) # versicolor SepalWidth 138.5 # versicolor SepalLength 296.8 # setosa PetalLength 73.2 # virginica PetalWidth 101.29999999999998 # versicolor PetalWidth 66.30000000000001 # setosa SepalWidth 170.90000000000003 # virginica PetalLength 277.59999999999997 # setosa SepalLength 250.29999999999998 # versicolor PetalLength 213.0 # setosa PetalWidth 12.199999999999996 # virginica SepalWidth 148.7 # virginica SepalLength 329.3999999999999 def reducer(obj): # variable : value の辞書を作成 result = {o[1]:o[2] for o in obj[1]} return Row(Species=obj[0], **result) # pivot spivot = sunpivot.rdd.groupBy(lambda x: x[0]).map(reducer) spivot.collect() # [Row(PetalLength=277.59999999999997, PetalWidth=101.29999999999998, SepalLength=329.3999999999999, SepalWidth=148.7, Species=u'virginica'), # Row(PetalLength=73.2, PetalWidth=12.199999999999996, SepalLength=250.29999999999998, SepalWidth=170.90000000000003, Species=u'setosa'), # Row(PetalLength=213.0, PetalWidth=66.30000000000001, SepalLength=296.8, SepalWidth=138.5, Species=u'versicolor')] sqlContext.createDataFrame(spivot).show() # PetalLength PetalWidth SepalLength SepalWidth Species # 277.59999999999997 101.29999999999998 329.3999999999999 148.7 virginica # 73.2 12.199999999999996 250.29999999999998 170.90000000000003 setosa # 213.0 66.30000000000001 296.8 138.5 versicolor
列の分割 / 結合
列の値を複数列に分割
ある列の値を適当に文字列処理して、新しい列を作成したい。pandas
には 文字列処理用のアクセサがあるため、 assign
と組み合わせて以下のように書ける。
# pandas psplitted = pmelted.assign(Parts=pmelted['variable'].str.slice(0, 5), Scale=pmelted['variable'].str.slice(5)) psplitted # Species variable value Parts Scale # 0 setosa SepalLength 5.1 Sepal Length # 1 setosa SepalLength 4.9 Sepal Length # 2 setosa SepalLength 4.7 Sepal Length # 3 setosa SepalLength 4.6 Sepal Length # 4 setosa SepalLength 5.0 Sepal Length # .. ... ... ... ... ... # 595 virginica PetalWidth 2.3 Petal Width # 596 virginica PetalWidth 1.9 Petal Width # 597 virginica PetalWidth 2.0 Petal Width # 598 virginica PetalWidth 2.3 Petal Width # 599 virginica PetalWidth 1.8 Petal Width # # [600 rows x 5 columns]
PySpark
には上記のようなメソッドはないので map
で処理する。
# PySpark def splitter(row): parts = row[2][:5] scale = row[2][5:] return Row(Species=row[0], value=row[1], Parts=parts, Scale=scale) ssplitted = sqlContext.createDataFrame(smelted.map(splitter)) ssplitted.show() # Parts Scale Species value # Petal Length setosa 1.4 # Petal Width setosa 0.2 # Sepal Length setosa 5.1 # Sepal Width setosa 3.5 # Petal Length setosa 1.4 # .. .. ... .. # Petal Length setosa 1.4 # Petal Width setosa 0.2 # Sepal Length setosa 5.0 # Sepal Width setosa 3.6
複数列の値を一列に結合
pandas
では普通に文字列結合すればよい。
# pandas psplitted['variable2'] = psplitted['Parts'] + psplitted['Scale'] psplitted # Species variable value Parts Scale variable2 # 0 setosa SepalLength 5.1 Sepal Length SepalLength # 1 setosa SepalLength 4.9 Sepal Length SepalLength # 2 setosa SepalLength 4.7 Sepal Length SepalLength # 3 setosa SepalLength 4.6 Sepal Length SepalLength # 4 setosa SepalLength 5.0 Sepal Length SepalLength # .. ... ... ... ... ... ... # 595 virginica PetalWidth 2.3 Petal Width PetalWidth # 596 virginica PetalWidth 1.9 Petal Width PetalWidth # 597 virginica PetalWidth 2.0 Petal Width PetalWidth # 598 virginica PetalWidth 2.3 Petal Width PetalWidth # 599 virginica PetalWidth 1.8 Petal Width PetalWidth # # [600 rows x 6 columns]
PySpark
では map
。
# PySpark def unite(row): return Row(Species=row[2], value=row[3], variable=row[0] + row[1]) sqlContext.createDataFrame(splitted.map(unite)).show() # Species value variable # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.1 SepalLength # setosa 3.5 SepalWidth # .. .. .. # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.0 SepalLength # setosa 3.6 SepalWidth
補足 withColumn
の場合、オペレータは 数値の演算として扱われてしまうようなのでここでは使えない。
# PySpark (NG!) ssplitted.withColumn('variable', splitted.Parts + splitted.Scale).show() # Parts Scale Species value variable # Petal Length setosa 1.4 null # Petal Width setosa 0.2 null # .. .. .. .. ..
まとめ
PySpark
と pandas
のデータ集約/変形処理を整理した。
データ分析用途で利用したい場合、(ごく当たり前だが) データ量が少なく手元でさっといろいろ試したい場合は pandas
、データ量が比較的多く 単純な処理を全体にかけたい場合は Spark
がよい。
Spark
は map 系の API が充実するとさらに使いやすくなりそうだ。が、小回りの効く文法/機能が充実していくことは考えにくいので 完全に Spark
だけでデータ分析をする、、という状態には将来もならないのではないかと思う。小さいデータは pandas
使いましょう。
Learning Spark: Lightning-Fast Big Data Analysis
- 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia
- 出版社/メーカー: O'Reilly Media
- 発売日: 2015/01/28
- メディア: Kindle版
- この商品を含むブログを見る
Python pandas 関連エントリの目次
このブログ中の pandas
関連のエントリをまとめた目次です。
最近 pandas
開発チーム と PyData グループ の末席に加えていただき、パッケージ自体の改善にもより力を入れたいと思います。使い方についてご質問などありましたら Twitter で @ ください。
目次につけた絵文字は以下のような意味です。
- 🔰: 最初に知っておけば一通りの操作ができそうな感じのもの。
- 🚧: v0.16.0 時点で少し情報が古く、機能の改善を反映する必要があるもの。
- 🚫: 当該の機能が deprecate 扱いとなり、将来的に 代替の方法が必要になるもの。
基本
また、上記に対応した比較エントリ:
R {dplyr}, {tidyr}
PySpark
Julia
機能別
データ選択
- Python pandas データ選択処理をちょっと詳しく <前編>
- Python pandas データ選択処理をちょっと詳しく <中編>
- Python pandas データ選択処理をちょっと詳しく <後編>
グルーピング/集約/集計/データ変形
- Python pandas の算術演算 / 集約関数 / 統計関数まとめ
- Python pandas アクセサ / Grouperで少し高度なグルーピング/集計
- Python pandas データのイテレーションと関数適用、pipe
- Python pandas 図でみる データ連結 / 結合処理
- Python pandas 欠損値/外れ値/離散化の処理
そのほかの前処理
データ操作
文字列、日付など、各データ型に固有の操作。
入出力
可視化
- Python pandas プロット機能を使いこなす
- Python Jupyter + pandas で DataFrame 表示をカスタマイズする
- Python spyre によるデータ分析結果のWebアプリ化
地理情報の可視化
その他
- Python pandas のデータを scikit-learn でうまいこと処理したい
- Python pandas で日本の株価情報取得とローソク足チャート描画
- Python pandas パフォーマンス維持のための 3 つの TIPS
- Dask で 並列 DataFrame 処理
変更点
自作パッケージ
- Python pandas / scikit-learn 向けのちょっとしたパッケージ作った <前編>
- Python pandas / scikit-learn 向けのちょっとしたパッケージ作った <後編>
- Python pandas 日本語環境向けのちょっとしたパッケージ作った
発表資料
Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理
- 作者: Wes McKinney,小林儀匡,鈴木宏尚,瀬戸山雅人,滝口開資,野上大介
- 出版社/メーカー: オライリージャパン
- 発売日: 2013/12/26
- メディア: 大型本
- この商品を含むブログ (13件) を見る
簡単なデータ操作を PySpark & pandas の DataFrame で行う
Spark v1.3.0 で追加された DataFrame
、結構いいらしいという話は聞いていたのだが 自分で試すことなく時間が過ぎてしまっていた。ようやく PySpark
を少し触れたので pandas
との比較をまとめておきたい。内容に誤りや よりよい方法があればご指摘 下さい。
過去に基本的なデータ操作について 以下 ふたつの記事を書いたことがあるので、同じ処理のPySpark
版を加えたい。今回は ひとつめの "簡単なデータ操作〜" に相当する内容。
pandas 版
準備
環境は EC2 に作る。Spark のインストールについてはそのへんに情報あるので省略。サンプルデータは iris を csv でダウンロードしてホームディレクトリにおいた。以降の操作は PySpark
のコンソールで行う。
# Spark のパスに移動 $ echo $SPARK_HOME /usr/local/spark $ cd $SPARK_HOME $ pwd /usr/local/spark # 既定だとログの出力が邪魔なので抑制 $ cp conf/log4j.properties.template conf/log4j.properties $ vi conf/log4j.properties # INFO をすべて WARN に変える $ bin/pyspark # Welcome to # ____ __ # / __/__ ___ _____/ /__ # _\ \/ _ \/ _ `/ __/ '_/ # /__ / .__/\_,_/_/ /_/\_\ version 1.3.1 # /_/ # # Using Python version 2.7.9 (default, Apr 1 2015 19:28:03) # SparkContext available as sc, HiveContext available as sqlContext.
ここでは HDFS は使わず、pandas
の read_csv
で pandas.DataFrame
を作成する。列名に "." が入っていると PySpark
でうまく動かないようだったため以下のカラム名にした。
import pandas as pd # 表示する行数を設定 pd.options.display.max_rows=10 names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species'] # pandas pdf = pd.read_csv('~/iris.csv', header=None, names=names) pdf # SepalLength SepalWidth PetalLength PetalWidth Species # 0 5.1 3.5 1.4 0.2 setosa # 1 4.9 3.0 1.4 0.2 setosa # 2 4.7 3.2 1.3 0.2 setosa # 3 4.6 3.1 1.5 0.2 setosa # 4 5.0 3.6 1.4 0.2 setosa # .. ... ... ... ... ... # 145 6.7 3.0 5.2 2.3 virginica # 146 6.3 2.5 5.0 1.9 virginica # 147 6.5 3.0 5.2 2.0 virginica # 148 6.2 3.4 5.4 2.3 virginica # 149 5.9 3.0 5.1 1.8 virginica # # [150 rows x 5 columns]
これを PySpark
の DataFrame
に変換する。pandas
と PySpark
の基本的な違いとして、
PySpark
にはpandas.Index
にあたるものがないPySpark
にはpandas.Series
にあたるものがない ( 代わり? にRow
があるが、どの程度 柔軟な操作ができるかは未知 )
# PySpark sdf = sqlContext.createDataFrame(pdf) sdf # DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double, # PetalWidth: double, Species: string] # データの中身を表示するには DataFrame.show() sdf.show() # SepalLength SepalWidth PetalLength PetalWidth Species # 5.1 3.5 1.4 0.2 setosa # 4.9 3.0 1.4 0.2 setosa # 4.7 3.2 1.3 0.2 setosa # 4.6 3.1 1.5 0.2 setosa # .. ... ... ... ... # 5.4 3.9 1.3 0.4 setosa # 5.1 3.5 1.4 0.3 setosa # 5.7 3.8 1.7 0.3 setosa # 5.1 3.8 1.5 0.3 setosa
以降、pdf
が pandas
、sdf
が PySpark
の DataFrame
をあらわす。
type(pdf) # <class 'pandas.core.frame.DataFrame'> type(sdf) # <class 'pyspark.sql.dataframe.DataFrame'>
基本
まず基本的な操作を。先頭いくつかのデータを確認するには head
。
PySpark
での返り値は Row
インスタンスのリストになる。
# pandas pdf.head(5) # SepalLength SepalWidth PetalLength PetalWidth Species # 0 5.1 3.5 1.4 0.2 setosa # 1 4.9 3.0 1.4 0.2 setosa # 2 4.7 3.2 1.3 0.2 setosa # 3 4.6 3.1 1.5 0.2 setosa # 4 5.0 3.6 1.4 0.2 setosa # PySpark sdf.head(5) # [Row(SepalLength=5.1, SepalWidth=3.5, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=4.9, SepalWidth=3.0, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa')] # pandas type(pdf.head(5)) # <class 'pandas.core.frame.DataFrame'> # PySpark type(sdf.head(5)) # <type 'list'>
各カラムのデータ型の確認には dtypes
。
# pandas pdf.dtypes # SepalLength float64 # SepalWidth float64 # PetalLength float64 # PetalWidth float64 # Species object # dtype: object # PySpark sdf.dtypes # [('SepalLength', 'double'), ('SepalWidth', 'double'), # ('PetalLength', 'double'), ('PetalWidth', 'double'), # ('Species', 'string')]
要約統計量の確認は describe
。
# pandas pdf.describe() # SepalLength SepalWidth PetalLength PetalWidth # count 150.000000 150.000000 150.000000 150.000000 # mean 5.843333 3.054000 3.758667 1.198667 # std 0.828066 0.433594 1.764420 0.763161 # min 4.300000 2.000000 1.000000 0.100000 # 25% 5.100000 2.800000 1.600000 0.300000 # 50% 5.800000 3.000000 4.350000 1.300000 # 75% 6.400000 3.300000 5.100000 1.800000 # max 7.900000 4.400000 6.900000 2.500000 # PySpark sdf.describe().show() # summary SepalLength SepalWidth PetalLength PetalWidth # count 150 150 150 150 # mean 5.843333333333334 3.0540000000000003 3.758666666666666 1.1986666666666668 # stddev 0.8253012917851317 0.4321465800705415 1.7585291834055206 0.760612618588172 # min 4.3 2.0 1.0 0.1 # max 7.9 4.4 6.9 2.5
列操作
列名操作
参照と変更。PySpark
ではプロパティへの代入は不可。
# pandas pdf.columns # Index([u'SepalLength', u'SepalWidth', u'PetalLength', u'PetalWidth', u'Species'], dtype='object') # PySpark sdf.columns # [u'SepalLength', u'SepalWidth', u'PetalLength', u'PetalWidth', u'Species'] # pandas pdf.columns = [1, 2, 3, 4, 5] pdf.columns # Int64Index([1, 2, 3, 4, 5], dtype='int64') # PySpark sdf.columns = [1, 2, 3, 4, 5] # AttributeError: can't set attribute
マッピングによって列名を変更するには、pandas
では DataFrame.rename
。
# pandas pdf.columns # Int64Index([1, 2, 3, 4, 5], dtype='int64') pdf = pdf.rename(columns={1:'SepalLength', 2:'SepalWidth', 3:'PetalLength', 4:'PetalWidth', 5:'Species'}) pdf # SepalLength SepalWidth PetalLength PetalWidth Species # 0 5.1 3.5 1.4 0.2 setosa # 1 4.9 3.0 1.4 0.2 setosa # 2 4.7 3.2 1.3 0.2 setosa # 3 4.6 3.1 1.5 0.2 setosa # 4 5.0 3.6 1.4 0.2 setosa # .. ... ... ... ... ... # 145 6.7 3.0 5.2 2.3 virginica # 146 6.3 2.5 5.0 1.9 virginica # 147 6.5 3.0 5.2 2.0 virginica # 148 6.2 3.4 5.4 2.3 virginica # 149 5.9 3.0 5.1 1.8 virginica # # [150 rows x 5 columns]
PySpark
では DataFrame.withColumnRenamed
。
# PySpark sdf.withColumnRenamed('Species', 'xxx') # DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double, # PetalWidth: double, xxx: string]
補足 非破壊的な処理のため、反映には代入が必要。ここでは列名変更したくないので代入しない。
列名による列選択
pandas.DataFrame
の列選択では 当該の列のデータを含む Series
が返ってくる。
# pandas pdf.Species # 0 setosa # 1 setosa # 2 setosa # 3 setosa # 4 setosa # ... # 145 virginica # 146 virginica # 147 virginica # 148 virginica # 149 virginica # Name: Species, dtype: object pdf['Species'] # 略
PySpark
では、列の属性を表現する Column
インスタンスが返ってくる。
# PySpark sdf.Species # Column<Species> sdf['Species'] # Column<Species>
pandas
、PySpark
いずれも、文字列ではなくリストを渡せば その列を DataFrame
としてスライシングする。
# pandas pdf[['PetalWidth']] # PetalWidth # 0 0.2 # 1 0.2 # 2 0.2 # 3 0.2 # 4 0.2 # .. ... # 145 2.3 # 146 1.9 # 147 2.0 # 148 2.3 # 149 1.8 # # [150 rows x 1 columns] # PySpark sdf[['PetalWidth']] # DataFrame[PetalWidth: double] # pandas pdf[['PetalWidth', 'PetalLength']] # PetalWidth PetalLength # 0 0.2 1.4 # 1 0.2 1.4 # 2 0.2 1.3 # 3 0.2 1.5 # 4 0.2 1.4 # .. ... ... # 145 2.3 5.2 # 146 1.9 5.0 # 147 2.0 5.2 # 148 2.3 5.4 # 149 1.8 5.1 # # [150 rows x 2 columns] # PySpark sdf[['PetalWidth', 'PetalLength']] # DataFrame[PetalWidth: double, PetalLength: double]
PySpark
では DataFrame.select
でもよい。
# PySpark sdf.select(sdf['PetalWidth'], sdf['PetalLength']) # DataFrame[PetalWidth: double, PetalLength: double]
真偽値リストによる列選択
自分はあまり使わないのだが、R {dplyr} との比較という観点で。
indexer = [False, False, True, True, False] # pandas pdf.loc[:, indexer] # PetalLength PetalWidth # 0 1.4 0.2 # 1 1.4 0.2 # 2 1.3 0.2 # 3 1.5 0.2 # 4 1.4 0.2 # .. ... ... # 145 5.2 2.3 # 146 5.0 1.9 # 147 5.2 2.0 # 148 5.4 2.3 # 149 5.1 1.8 # # [150 rows x 2 columns] # PySpark sdf[[c for c, i in zip(sdf.columns, indexer) if i is True]] # DataFrame[PetalLength: double, PetalWidth: double]
列の属性による列選択
列の型が pandas
の場合は float
、PySpark
の場合は double
の列のみ取り出す。
# pandas pdf.loc[:, pdf.dtypes == float] # SepalLength SepalWidth PetalLength PetalWidth # 0 5.1 3.5 1.4 0.2 # 1 4.9 3.0 1.4 0.2 # 2 4.7 3.2 1.3 0.2 # 3 4.6 3.1 1.5 0.2 # 4 5.0 3.6 1.4 0.2 # .. ... ... ... ... # 145 6.7 3.0 5.2 2.3 # 146 6.3 2.5 5.0 1.9 # 147 6.5 3.0 5.2 2.0 # 148 6.2 3.4 5.4 2.3 # 149 5.9 3.0 5.1 1.8 # # [150 rows x 4 columns] # PySpark sdf[[c for c, type in sdf.dtypes if type == 'double']] # DataFrame[SepalLength: double, SepalWidth: double, PetalLength: double, # PetalWidth: double]
行操作
値の条件による行選択
# pandas pdf[pdf['Species'] == 'virginica'] # SepalLength SepalWidth PetalLength PetalWidth Species # 100 6.3 3.3 6.0 2.5 virginica # 101 5.8 2.7 5.1 1.9 virginica # 102 7.1 3.0 5.9 2.1 virginica # 103 6.3 2.9 5.6 1.8 virginica # 104 6.5 3.0 5.8 2.2 virginica # .. ... ... ... ... ... # 145 6.7 3.0 5.2 2.3 virginica # 146 6.3 2.5 5.0 1.9 virginica # 147 6.5 3.0 5.2 2.0 virginica # 148 6.2 3.4 5.4 2.3 virginica # 149 5.9 3.0 5.1 1.8 virginica # # [50 rows x 5 columns] # PySpark sdf[sdf['Species'] == 'virginica'].show() # SepalLength SepalWidth PetalLength PetalWidth Species # 6.3 3.3 6.0 2.5 virginica # 5.8 2.7 5.1 1.9 virginica # 7.1 3.0 5.9 2.1 virginica # 6.3 2.9 5.6 1.8 virginica # .. ... ... ... ... # 6.5 3.0 5.5 1.8 virginica # 7.7 3.8 6.7 2.2 virginica # 7.7 2.6 6.9 2.3 virginica # 6.0 2.2 5.0 1.5 virginica
PySpark
では DataFrame.filter
でもよい。
# PySpark sdf.filter(sdf['Species'] == 'virginica').show() # 略
行番号による行選択
PySpark
では 冒頭の n 行を抽出したりはできるが、適当な行選択ではできない、、、と思う。PySpark
でどうしてもやりたければ index にあたる列を追加 -> 列の値で選択。
4/27追記 DataFrame.collect
で Row
のリストに変換 -> 再度 DataFrame
化すればスキーマ変更せずにできる、、、が、マスタでデータを処理することになるのでよい方法ではなさそう。
# pandas pdf.loc[[2, 3, 4]] # SepalLength SepalWidth PetalLength PetalWidth Species # 2 4.7 3.2 1.3 0.2 setosa # 3 4.6 3.1 1.5 0.2 setosa # 4 5.0 3.6 1.4 0.2 setosa # PySpark [c for i, c in enumerate(sdf.collect()) if i in (2, 3 ,4)] # [Row(SepalLength=4.7, SepalWidth=3.2, PetalLength=1.3, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=4.6, SepalWidth=3.1, PetalLength=1.5, PetalWidth=0.2, Species=u'setosa'), # Row(SepalLength=5.0, SepalWidth=3.6, PetalLength=1.4, PetalWidth=0.2, Species=u'setosa')] sqlContext.createDataFrame([c for i, c in enumerate(sdf.collect()) if i in (2, 3 ,4)]).show() # SepalLength SepalWidth PetalLength PetalWidth Species # 4.7 3.2 1.3 0.2 setosa # 4.6 3.1 1.5 0.2 setosa # 5.0 3.6 1.4 0.2 setosa
ランダムサンプリング
pandas
ではindex
をサンプリングしてスライス。
import random # pandas pdf.loc[random.sample(pdf.index, 5)] # Sepal.Length Sepal.Width Petal.Length Petal.Width Species # 64 6.1 2.9 4.7 1.4 versicolor # 17 5.4 3.9 1.3 0.4 setosa # 14 4.3 3.0 1.1 0.1 setosa # 4 4.6 3.1 1.5 0.2 setosa # 146 6.7 3.0 5.2 2.3 virginica
PySpark
では DataFrame.sample
。この関数、fraction
の確率で行ごとに選択しているようで、サンプリングするたびに抽出される行数が変わる ( 行数指定でランダムサンプリング、という処理はできなさそう )。
# PySpark sdf.sample(False, 5.0 / sdf.count()).show() # SepalLength SepalWidth PetalLength PetalWidth Species # 5.4 3.4 1.7 0.2 setosa # 5.7 2.8 4.5 1.3 versicolor # 6.2 2.8 4.8 1.8 virginica # 6.1 3.0 4.9 1.8 virginica
補足 今後、pandas
にもサンプリングメソッド DataFrame.sample
が実装予定 #9666。
列の追加
pandas
では DataFrame
に対して 直接 新しい列を追加できるが、PySpark
ではできない。
# 破壊的な処理になるのでコピー pdf_temp = pdf.copy() # pandas pdf_temp['PetalMult'] = pdf_temp['PetalWidth'] * pdf_temp['PetalLength'] pdf_temp # SepalLength SepalWidth PetalLength PetalWidth Species PetalMult # 0 5.1 3.5 1.4 0.2 setosa 0.28 # 1 4.9 3.0 1.4 0.2 setosa 0.28 # 2 4.7 3.2 1.3 0.2 setosa 0.26 # 3 4.6 3.1 1.5 0.2 setosa 0.30 # 4 5.0 3.6 1.4 0.2 setosa 0.28 # .. ... ... ... ... ... ... # 145 6.7 3.0 5.2 2.3 virginica 11.96 # 146 6.3 2.5 5.0 1.9 virginica 9.50 # 147 6.5 3.0 5.2 2.0 virginica 10.40 # 148 6.2 3.4 5.4 2.3 virginica 12.42 # 149 5.9 3.0 5.1 1.8 virginica 9.18 # # [150 rows x 6 columns] # PySpark sdf['PetalMult'] = sdf['PetalWidth'] * sdf['PetalLength'] # TypeError: 'DataFrame' object does not support item assignment
PySpark
での列追加は DataFrame.withColumn
。
# PySpark sdf.withColumn('PetalMult', sdf.PetalWidth * sdf.PetalLength).show() # SepalLength SepalWidth PetalLength PetalWidth Species PetalMult # 5.1 3.5 1.4 0.2 setosa 0.27999999999999997 # 4.9 3.0 1.4 0.2 setosa 0.27999999999999997 # 4.7 3.2 1.3 0.2 setosa 0.26 # 4.6 3.1 1.5 0.2 setosa 0.30000000000000004 # .. .. ... ... ... ... # 5.4 3.9 1.3 0.4 setosa 0.52 # 5.1 3.5 1.4 0.3 setosa 0.42 # 5.7 3.8 1.7 0.3 setosa 0.51 # 5.1 3.8 1.5 0.3 setosa 0.44999999999999996
補足 pandas
v0.16.0 以降では 類似のメソッド DataFrame.assign
が追加。
# pandas pdf.assign(PetalMult=pdf['PetalWidth'] * pdf['PetalLength']) # 略
まとめ
PySpark
と pandas
のデータ操作を整理した。
PySpark
、上のような基本的な処理は pandas
と似たやり方で直感的に使える感じだ。
大規模処理は PySpark
、細かい取り回しが必要なものは pandas
でうまく併用できるとよさそう。
4/29追記 続きはこちら。
Learning Spark: Lightning-Fast Big Data Analysis
- 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia
- 出版社/メーカー: O'Reilly Media
- 発売日: 2015/01/28
- メディア: Kindle版
- この商品を含むブログを見る