日々精進

新しく学んだことを書き留めていきます

DataFrameのgroupbyして集計する処理を並列化すると10倍速くなった

コア数が20あるCPUでシングルプロセスで処理していたのをマルチプロセスにしたところ10倍速くなってビックリした。 やり方も簡単で、以下のようにgroup毎に集計する処理を並列化するだけでいい。これは便利。

def aggregate(sales: DataFrame, window: int):
    def applyParallel(dfGrouped, func):
        retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
        return pd.concat(retLst, axis=0)

    def _agg(df: DataFrame):
        return df[SALES_COUNT_LAG_1].rolling(window) \
            .agg(["mean", "max", "min", "std", lambda x: x.mode()[0]]) \
            .reset_index().set_index("seq_idx").astype("float32")

    return applyParallel(sales.groupby([STORE_ID, ITEM_ID]), _agg)

参考:

stackoverflow.com