rcmdnk's blog
Last update

Chunk (Kindle Single) (English Edition)

PythonのGeneratorを使う際、 生成される順序を保証しつつ中の処理はマルチプロセス化する方法について。

Pythonの並列処理

Pythonには multiprocessing という標準ライブラリがあり、これを用いてマルチプロセス化することが出来ます。

これ以外に Joblib というライブラリがあり、 multiprocessingをより簡単に使えるようにしたヘルパークラスを持っています。

今回はinputを配列で与えて配列のそれぞれの値に対して同じ処理をして 結果を返すようなものを考え、その結果を順に返していくようなGenerator関数を考えます。

Joblibでは現状このような用途のための機能を開発中のようです。

追記: 2023/04/19

#588の方はマージされたので後はリリースを待つのみ。

追記ここまで

従って今回はmultiprocessingを使った方法を考えます。

generator/multiprocessing_generator というGenerator関数を直接multiprocess化してくれるライブラリとかもあり 便利そうですが、今回はmultiprocessingを直接使ってちょっと色々調べてみました。

ちなみにthreadingというスレッド化のための標準モジュールもありますが、 PythonにはGlobal Interpreter Lock (GIL)の元で走っているため、 シングルプロセスだと複数のスレッドを作って複数のCPU上で走らせても 排他ロックがかかって1つのスレッドのみがバイトコードを実行できる状態になります。 なのでファイルの読み込みなどの処理で時間がかかる処理の並列処理は速くなりますが、 CPUを使うだけの処理ならマルチスレッド化しても速くなりません。

multirpocessingを使ったマルチプロセス化

以下のようなスクリプトでマルチプロセス化出来ます。

mp_imap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python3

from time import sleep
from multiprocessing import Pool


def demo_func(n):
    print("demo_func", n, "start")
    sleep(n)
    print("demo_func", n, "end")
    return n



def gen():
    with Pool(4) as pool:
        returns = pool.imap(demo_func, [10, 9, 8, 7, 6, 5, 4, 3, 2, 1])
        for ret in returns:
            yield ret


def main():
    for x in gen():
        print(x)


if __name__ == "__main__":
    main()
  • demo_func: 受け取った数字の分だけsleepしてその数字を返すだけの処理
  • gen: Generator関数
  • main: genから受け取った順にprint

をしているだけです。

Pool(4)は4つのプロセスを使う宣言で、引数(第一引数のprocesses)を与えない、もしくはNoneにすると システムのCPU分だけプロセスを作ります。

imapは第一引数に関数を取って、iterableな第二引数を順に第一引数の関数に渡して処理を実行します。

この際、結果が遅延評価されるため、すべての引数に対する処理が終わる前に最初の方の結果が返さえるようになります。

実際にやってみると

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ python mp_imap.py tten
demo_func 10 start
demo_func 9 start
demo_func 8 start
demo_func 7 start
demo_func 7 end
demo_func 6 start
demo_func 8 end
demo_func 5 start
demo_func 9 end
demo_func 4 start
demo_func 10 end
demo_func 3 start
10
9
8
7
demo_func 3 end
demo_func 2 start
demo_func 4 end
demo_func 1 start
demo_func 5 end
demo_func 6 end
6
5
4
3
demo_func 1 end
demo_func 2 end
2
1

みたいな出力が得られます。

このスクリプトでは各処理はn秒だけsleepするわけですが、最初の方から10~1となっているため 後ろの方に行くに従い速く終わります。

従って同時に処理を始めると後ろの処理の方が先に終わるわけですが、 imapでは出力順序が保証されるため、まず10, 次に9、と順に出てきます。

処理自体はdemo_func 9 endの方がdemo_func 10 endより先に出ていて先に終わっていることが分かります。

また、10, 9と数字がmain関数の中で出力されたあとにdemo_func 6 endなど genの処理がまだ行われていることが分かります。 これが遅延評価です。

map, imap_unordered

Poolクラスのimpapと似たような関数にmap, imap_unorderedがあります。

mapは遅延評価をしないので一旦すべての処理を行ってから次に行くような形になります。

imap_unorderedは順序の保証をせず、終わった順に出力されます。

上のスクリプトでpool.imapの部分をpool.mapに書き換えてみると出力は

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ python mp_map.py tten
demo_func 10 start
demo_func 9 start
demo_func 8 start
demo_func 7 start
demo_func 7 end
demo_func 6 start
demo_func 8 end
demo_func 5 start
demo_func 9 end
demo_func 4 start
demo_func 10 end
demo_func 3 start
demo_func 3 end
demo_func 2 start
demo_func 4 end
demo_func 5 end
demo_func 1 start
demo_func 6 end
demo_func 1 end
demo_func 2 end
10
9
8
7
6
5
4
3
2
1

こんな感じで全部の処理が終わったあとで順に出力されています。

上のスクリプトでpool.imapの部分をpool.imap_unorderedに書き換えてみると出力は

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ python mp_imap_unordered.py tten
demo_func 10 start
demo_func 9 start
demo_func 8 start
demo_func 7 start
demo_func 7 end
demo_func 6 start
7
demo_func 8 end
demo_func 5 start
8
demo_func 9 end
demo_func 4 start
9
demo_func 10 end
demo_func 3 start
10
demo_func 3 end
demo_func 4 end
demo_func 2 start
demo_func 1 start
demo_func 5 end
demo_func 6 end
3
4
5
6
demo_func 1 end
1
demo_func 2 end
2

こんな感じで終了したものから即座に出力されていることが分かります。

map, imap, imap_unorderedのどれを使うか

もし出力の順番に意味はなく処理が終わった順にどんどん次の処理に送りたい場合には imapよりもimap_unorderedを使うべきです。

順序を保持したいけど処理した順にどんどん次へ送りたいときはimapです。

すべての結果を次にまとめて送りたいときはmapでもimapでも出来ますが、 maplistを返し、imapgeneratorを返すのでその型による処理の対処は変えなくてはいけませんが、 基本的に全部待つのであれば処理速度は同じようなものになりそうです。

ただ、ちょっと違うのが、chunksizeという引数。

mapはデフォルトはNoneですが、Noneだと

pool.py
1
2
3
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
    chunksize += 1

https://github.com/python/cpython/blob/3.10/Lib/multiprocessing/pool.py#L480

と計算されます。

len(iterable)が入力変数の数(つまりジョブの総数)でlen(self._pool)が指定したプロセス数です。

上の例だと、len(iterable)=10, len(self._pool)=4なのでchunksize=1になります。

ジョブの総数が大きくなるとchunksizeも大きくなることになります。

chunksizeはその数分を1chunkとして1プロセスに送り込む単位になります。

chunkszieの数字が小さい、つまりchunkの数が多くなるとその分だけプロセスのチェックの処理が入り 遅くなります。

一方で1つのchunkの中に入れられたジョブはすべて同じプロセス(CPU)に送られるので 処理速度に偏りがあると別のCPUが空いてても1つのCPUでいくつもジョブを処理続けることになったりします。

そこら辺は実際に処理するジョブの性質によるので、特にジョブの総数が大きくなるような場合に 処理速度を改善したいのであれば色々と調整する余地はある部分です。

で、このchunksizeimapではデフォルトで1になっています。

例えば10個のジョブを5個ずつのchunkに分けた場合、 上の例だと10, 9, 8, 7, 65, 4, 3, 2, 1のchunkに分けられ、 実行されるのは105が最初になります。

さらに10の出力はchunkのジョブが全部終わるまで出ません。

やってみると

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ python mp_imap_chunksize5.py
demo_func 10 start
demo_func 5 start
demo_func 5 end
demo_func 4 start
demo_func 4 end
demo_func 3 start
demo_func 10 end
demo_func 9 start
demo_func 3 end
demo_func 2 start
demo_func 2 end
demo_func 1 start
demo_func 1 end
demo_func 9 end
demo_func 8 start
demo_func 8 end
demo_func 7 start
demo_func 7 end
demo_func 6 start
demo_func 6 end
10
9
8
7
6
5
4
3
2
1

こんな感じになって10, 5から始まり、またdemofunc 10 endになってもdemo_func end 6が出るまで出力が出てません。

これだとmapの場合も同じようになってしまいますが、 例えば入力順を[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]にすると

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ python mp_imap_chunksize5.py
demo_func 1 start
demo_func 6 start
demo_func 1 end
demo_func 2 start
demo_func 2 end
demo_func 3 start
demo_func 6 end
demo_func 7 start
demo_func 3 end
demo_func 4 start
demo_func 4 end
demo_func 5 start
demo_func 7 end
demo_func 8 start
demo_func 5 end
1
2
3
4
5
demo_func 8 end
demo_func 9 start
demo_func 9 end
demo_func 10 start
demo_func 10 end
6
7
8
9
1

`

こんな感じで前半のchunkが終われば後半のジョブが終わって無くても出力されることが分かります。

なので、imapのchunksizeの設定は ジョブの数がプロセスの数に比べてそれほど大きくないときには chunksize=1で処理していく方がオーバーヘッドを考慮しても効率よくすべてのCPUを使える上に 出力も順次出来るので速くなることが多いです。

桁が違うレベルになってくると ちょっと調整したほうが速くなるかもしれません。

その辺もジョブの性質次第です。

まとめ

やりたいこととしては最初のスクリプトのようにPool().imapを使う感じで。

通常はPoolの引数は無し(もしくはprocesses=None)にしてCPUの数だけプロセスを設定するのが良いかと思います。

もし繰り返しの数が非常に大きい場合には引数のchunksizeをデフォルトの1から変更して少し大きな数で試してみると 速くなるかもしれません。

試しにmapと同じレベル(上の計算式で100ジョブを4 CPUsな環境なら7、1000ジョブを8 CPUs環境なら32、とか)位から始めて大小試して見ると良いかもしれません。

imapの場合にはその後にくる処理次第なところもあるのでそちらとの一連の処理の速度を見てという感じで。

Sponsored Links
Sponsored Links

« Windowsで余計な入力言語を削除する方法 2 pytestで並列処理したいときの色々 »

}