Pythonの並列処理
Pythonには multiprocessing という標準ライブラリがあり、これを用いてマルチプロセス化することが出来ます。
これ以外に Joblib というライブラリがあり、 multiprocessingをより簡単に使えるようにしたヘルパークラスを持っています。
今回はinputを配列で与えて配列のそれぞれの値に対して同じ処理をして 結果を返すようなものを考え、その結果を順に返していくようなGenerator関数を考えます。
Joblibでは現状このような用途のための機能を開発中のようです。
- Add return_generator functionality by fcharras · Pull Request #588 · joblib/joblib
- How to yield result instead of getting the result list? · Issue #1242 · joblib/joblib
追記: 2023/04/19
#588の方はマージされたので後はリリースを待つのみ。
追記ここまで
従って今回はmultiprocessingを使った方法を考えます。
generator/multiprocessing_generator というGenerator関数を直接multiprocess化してくれるライブラリとかもあり 便利そうですが、今回はmultiprocessingを直接使ってちょっと色々調べてみました。
ちなみにthreadingというスレッド化のための標準モジュールもありますが、 PythonにはGlobal Interpreter Lock (GIL)の元で走っているため、 シングルプロセスだと複数のスレッドを作って複数のCPU上で走らせても 排他ロックがかかって1つのスレッドのみがバイトコードを実行できる状態になります。 なのでファイルの読み込みなどの処理で時間がかかる処理の並列処理は速くなりますが、 CPUを使うだけの処理ならマルチスレッド化しても速くなりません。
multirpocessingを使ったマルチプロセス化
以下のようなスクリプトでマルチプロセス化出来ます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | |
demo_func: 受け取った数字の分だけsleepしてその数字を返すだけの処理gen: Generator関数main:genから受け取った順にprint
をしているだけです。
Pool(4)は4つのプロセスを使う宣言で、引数(第一引数のprocesses)を与えない、もしくはNoneにすると
システムのCPU分だけプロセスを作ります。
imapは第一引数に関数を取って、iterableな第二引数を順に第一引数の関数に渡して処理を実行します。
この際、結果が遅延評価されるため、すべての引数に対する処理が終わる前に最初の方の結果が返さえるようになります。
実際にやってみると
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | |
みたいな出力が得られます。
このスクリプトでは各処理はn秒だけsleepするわけですが、最初の方から10~1となっているため 後ろの方に行くに従い速く終わります。
従って同時に処理を始めると後ろの処理の方が先に終わるわけですが、
imapでは出力順序が保証されるため、まず10, 次に9、と順に出てきます。
処理自体はdemo_func 9 endの方がdemo_func 10 endより先に出ていて先に終わっていることが分かります。
また、10, 9と数字がmain関数の中で出力されたあとにdemo_func 6 endなど
genの処理がまだ行われていることが分かります。
これが遅延評価です。
map, imap_unordered
Poolクラスのimpapと似たような関数にmap, imap_unorderedがあります。
mapは遅延評価をしないので一旦すべての処理を行ってから次に行くような形になります。
imap_unorderedは順序の保証をせず、終わった順に出力されます。
上のスクリプトでpool.imapの部分をpool.mapに書き換えてみると出力は
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | |
こんな感じで全部の処理が終わったあとで順に出力されています。
上のスクリプトでpool.imapの部分をpool.imap_unorderedに書き換えてみると出力は
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | |
こんな感じで終了したものから即座に出力されていることが分かります。
map, imap, imap_unorderedのどれを使うか
もし出力の順番に意味はなく処理が終わった順にどんどん次の処理に送りたい場合には
imapよりもimap_unorderedを使うべきです。
順序を保持したいけど処理した順にどんどん次へ送りたいときはimapです。
すべての結果を次にまとめて送りたいときはmapでもimapでも出来ますが、
mapはlistを返し、imapはgeneratorを返すのでその型による処理の対処は変えなくてはいけませんが、
基本的に全部待つのであれば処理速度は同じようなものになりそうです。
ただ、ちょっと違うのが、chunksizeという引数。
mapはデフォルトはNoneですが、Noneだと
1 2 3 | |
https://github.com/python/cpython/blob/3.10/Lib/multiprocessing/pool.py#L480
と計算されます。
len(iterable)が入力変数の数(つまりジョブの総数)でlen(self._pool)が指定したプロセス数です。
上の例だと、len(iterable)=10, len(self._pool)=4なのでchunksize=1になります。
ジョブの総数が大きくなるとchunksizeも大きくなることになります。
chunksizeはその数分を1chunkとして1プロセスに送り込む単位になります。
chunkszieの数字が小さい、つまりchunkの数が多くなるとその分だけプロセスのチェックの処理が入り
遅くなります。
一方で1つのchunkの中に入れられたジョブはすべて同じプロセス(CPU)に送られるので 処理速度に偏りがあると別のCPUが空いてても1つのCPUでいくつもジョブを処理続けることになったりします。
そこら辺は実際に処理するジョブの性質によるので、特にジョブの総数が大きくなるような場合に 処理速度を改善したいのであれば色々と調整する余地はある部分です。
で、このchunksizeがimapではデフォルトで1になっています。
例えば10個のジョブを5個ずつのchunkに分けた場合、
上の例だと10, 9, 8, 7, 6と5, 4, 3, 2, 1のchunkに分けられ、
実行されるのは10と5が最初になります。
さらに10の出力はchunkのジョブが全部終わるまで出ません。
やってみると
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | |
こんな感じになって10, 5から始まり、またdemofunc 10 endになってもdemo_func end 6が出るまで出力が出てません。
これだとmapの場合も同じようになってしまいますが、
例えば入力順を[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]にすると
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | |
`
こんな感じで前半のchunkが終われば後半のジョブが終わって無くても出力されることが分かります。
なので、imapのchunksizeの設定は
ジョブの数がプロセスの数に比べてそれほど大きくないときには
chunksize=1で処理していく方がオーバーヘッドを考慮しても効率よくすべてのCPUを使える上に
出力も順次出来るので速くなることが多いです。
桁が違うレベルになってくると ちょっと調整したほうが速くなるかもしれません。
その辺もジョブの性質次第です。
まとめ
やりたいこととしては最初のスクリプトのようにPool().imapを使う感じで。
通常はPoolの引数は無し(もしくはprocesses=None)にしてCPUの数だけプロセスを設定するのが良いかと思います。
もし繰り返しの数が非常に大きい場合には引数のchunksizeをデフォルトの1から変更して少し大きな数で試してみると
速くなるかもしれません。
試しにmapと同じレベル(上の計算式で100ジョブを4 CPUsな環境なら7、1000ジョブを8 CPUs環境なら32、とか)位から始めて大小試して見ると良いかもしれません。
imapの場合にはその後にくる処理次第なところもあるのでそちらとの一連の処理の速度を見てという感じで。
