cloudwatchのような監視ツールがあればメモリ使用量もグラフで出してくれるんだろうけど、 そこまでしなくていいやという場合。以下をコンソールで実行すると1秒に一度特定のプロセスのメモリ使用量などを取ってきて表示してくれる。
top -d 1 -b |grep <process>
参考:
cloudwatchのような監視ツールがあればメモリ使用量もグラフで出してくれるんだろうけど、 そこまでしなくていいやという場合。以下をコンソールで実行すると1秒に一度特定のプロセスのメモリ使用量などを取ってきて表示してくれる。
top -d 1 -b |grep <process>
参考:
コア数が20あるCPUでシングルプロセスで処理していたのをマルチプロセスにしたところ10倍速くなってビックリした。 やり方も簡単で、以下のようにgroup毎に集計する処理を並列化するだけでいい。これは便利。
def aggregate(sales: DataFrame, window: int): def applyParallel(dfGrouped, func): retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped) return pd.concat(retLst, axis=0) def _agg(df: DataFrame): return df[SALES_COUNT_LAG_1].rolling(window) \ .agg(["mean", "max", "min", "std", lambda x: x.mode()[0]]) \ .reset_index().set_index("seq_idx").astype("float32") return applyParallel(sales.groupby([STORE_ID, ITEM_ID]), _agg)
参考:
売上の移動平均・最大・最小・標準偏差・最頻値を計算したい場合のコードは以下。以外と手こずった。。
return sales.groupby([STORE_ID, ITEM_ID])[SALES_COUNT_LAG_1].rolling(window)\ .agg(["mean", "max", "min", "std", lambda x: x.mode()[0]]) \ .reset_index().set_index("seq_idx").drop(columns=[STORE_ID, ITEM_ID]).astype("float32")
rollingメソッドにoffset引数があることを期待したけど、なかったので例えば1日前~15日前までの移動平均を計算したい場合は
1日前の売上
列をまず作り、それの移動平均を計算する。
ホントは以下のようにshiftを挟むだけでいいのかも知れないが、今回は別の特徴量にした。
sales.groupby([STORE_ID, ITEM_ID])[SALES_COUNT].shift(1).rolling...
参考:
single indexの場合は df.set_index("seq_idx")
でいい。
multi indexの場合は一度reset_indexしないといけない。
知らなかった。。S3にはバージョニング機能があるのでoldフォルダにファイルを取っておいたりしないで良かったのか。 バケット単位でのON/OFFしかできないのでその点はちょっと使いづらいが、便利そう。
永続化フォーマットとして今までparquetを使っていたが、float16に対応していなかったりして困ったのでフォーマットを変えることにした。
以下によるとpickleが良さそう。こういう検証は有り難い。。