QThreadPoolによるPyQtアプリケーションのマルチスレッド化

Image

UIに影響を与えずにバックグラウンドタスクを同時に実行する

GUIアプリケーションを構築する際の一般的な問題は、長時間実行されるバックグラウンドタスクを実行しようとすると、インターフェイスが「ロックアップ」することです。このチュートリアルでは、PyQtで同時実行を実現する最も簡単な方法の1つを取り上げます。

バックグラウンド

Qtベースのアプリケーション(ほとんどのGUIアプリケーションと同様)は、イベントベースです。つまり、ユーザーの操作、シグナル、タイマーに応じて実行が駆動されます。イベント駆動型のアプリケーションでは、ボタンをクリックするとイベントが作成され、アプリケーションはそれを処理して予想される出力を生成します。イベントはイベントキューにプッシュされ、イベントキューから取り出され、順次処理されます。

app = QApplication([])

window = MainWindow()

app.exec_()

イベントループは.exec_()、QApplicationオブジェクトを呼び出すことによって開始され、Pythonコードと同じスレッド内で実行されます。このイベントループを実行するスレッド(一般にGUIスレッドと呼ばれます)は、ホストオペレーティングシステムとのすべてのウィンドウ通信も処理します。

デフォルトでは、イベントループによってトリガーされた実行も、このスレッド内で同期的に実行されます。実際には、これは、PyQtアプリケーションがコードで何かを行うのに費やす時間はいつでも、ウィンドウ通信とGUI対話が凍結されることを意味します。

あなたがやっていることが単純で、制御をGUIループにすばやく返す場合、このフリーズはユーザーには気付かれません。ただし、大きなファイルを開く/書き込む、データをダウンロードする、複雑な画像をレンダリングするなど、実行時間の長いタスクを実行する必要がある場合は、問題が発生します。ユーザーには、アプリケーションが応答していないように見えます(応答しているため)。アプリはOSと通信しなくなったため、MacOS Xではアプリをクリックすると死の回転が表示されます。そして、誰もそれを望んでいません。

解決策は簡単です。作業をGUIスレッドから(そして別のスレッドに)入れます。PyQt(Qt経由)は、まさにそれを行うための簡単なインターフェースを提供します。

準備

次のコードは、Python 2.7とPython 3の両方で動作します。

マルチスレッド実行をデモンストレーションするには、使用するアプリケーションが必要です。以下はPyQtの最小限のスタブアプリケーションです。マルチスレッディングを示し、実際の結果を確認できます。これをコピーして新しいファイルに貼り付け、のような適切なファイル名で保存しますmultithread.py。コードの残りの部分はこのファイルに追加されます(焦った場合は、下部に完全な実用例もあります)。

from PySide2.QtGui import *

from PySide2.QtWidgets import *

from PySide2.QtCore import *

import time

class MainWindow(QMainWindow):

def __init__(self, *args, **kwargs):

super(MainWindow, self).__init__(*args, **kwargs)

self.counter = 0

layout = QVBoxLayout()

self.l = QLabel(“Start”)

b = QPushButton(“DANGER!”)

b.pressed.connect(self.oh_no)

layout.addWidget(self.l)

layout.addWidget(b)

w = QWidget()

w.setLayout(layout)

self.setCentralWidget(w)

self.show()

self.timer = QTimer()

self.timer.setInterval(1000)

self.timer.timeout.connect(self.recurring_timer)

self.timer.start()

def oh_no(self):

time.sleep(5)

def recurring_timer(self):

self.counter +=1

self.l.setText(“Counter: %d” % self.counter)

app = QApplication([])

window = MainWindow()

app.exec_()

:::sh

python3 multithread.py

数字が上に数えるデモウィンドウが表示されます。これは、毎秒1回だけ発生する単純な繰り返し時間によって生成されます。これをイベントループインジケーターと考えてください。これは、アプリケーションが正常に動作していることを知らせる簡単な方法です。「DANGER!」と書かれたボタンもあります

そのボタンを押すと…

ボタンを押すたびに、カウンターの動作が止まり、アプリケーションが完全にフリーズすることがわかります。Windowsではウィンドウが青くなって応答がないことを示し、Macでは回転の死の輪を手に入れます。

フリーズされたインターフェースとして表示されるのは、メインのQtイベントループがウィンドウイベントの処理(および応答)からブロックされていることです。ホストOSによって登録され、アプリケーションに送信されたままのウィンドウのクリック。ただし、ウィンドウはコードの大きな塊(time.sleep)に収まっているため、それらを受け入れたり、それに反応したりすることはできません。彼らはあなたのコードが制御をQtに戻すまで待つ必要があります。

これを回避する最も簡単で、おそらく最も論理的な方法は、コード内からイベントを受け入れることです。これにより、QtはホストOSに応答し続けることができ、アプリケーションは応答性を維持します。クラスで静的.processEvents()関数を使用すると、これを簡単に行うことができますQApplication。実行時間の長いコードブロックのどこかに、次のような行を追加するだけです。

QApplication.processEvents()

たとえば、長時間実行されるコードtime.sleepでは、それを5x 1秒のスリープに分解して.processEvents、その間に挿入することができます。このためのコードは次のとおりです。

def oh_no(self):

for n in range(5):

QApplication.processEvents()

time.sleep(1)

ボタンを押すと、コードは以前と同じように入力されます。ただし、現在はQApplication.processEvents() 断続的に制御をQtに戻し、通常どおりイベントに応答できるようにしています。Qtはイベントを受け入れ、それらを処理してから、残りのコードの実行に戻ります。

これは機能しますが、いくつかの理由で恐ろしいことです。

まず、Qtに制御を戻すと、コードは実行されなくなります。あなたがやろうとしているものは何でも実行時間の長いものがかかるとこれが意味長いです。それは間違いなくあなたが望むものではありません。

次に、メインイベントループ(app.exec_())の外側でイベントを処理すると、アプリケーションはループ内で(トリガーされたスロットやイベントなどの)コードの処理に分岐します。コードが外部状態に依存/応答する場合、これは未定義の動作を引き起こす可能性があります。以下のコードは、これを実際に示しています。

from PySide2.QtGui import *

from PySide2.QtWidgets import *

from PySide2.QtCore import *

import time

class MainWindow(QMainWindow):

def __init__(self, *args, **kwargs):

super(MainWindow, self).__init__(*args, **kwargs)

self.counter = 0

layout = QVBoxLayout()

self.l = QLabel(“Start”)

b = QPushButton(“DANGER!”)

b.pressed.connect(self.oh_no)

c = QPushButton(“?”)

c.pressed.connect(self.change_message)

layout.addWidget(self.l)

layout.addWidget(b)

layout.addWidget(c)

w = QWidget()

w.setLayout(layout)

self.setCentralWidget(w)

self.show()

def change_message(self):

self.message = “OH NO”

def oh_no(self):

self.message = “Pressed”

for n in range(100):

time.sleep(0.1)

self.l.setText(self.message)

QApplication.processEvents()

app = QApplication([])

window = MainWindow()

app.exec_()

このコードを実行すると、以前と同じようにカウンターが表示されます。「DANGER!」を押す oh_no関数のエントリポイントで定義されているように、表示されるテキストを「Pressed」に変更します。ただし、「?」を押すと oh_no実行中のボタンをクリックすると、メッセージが変化することがわかります。ループの外側から状態が変更されています。

これはおもちゃの例です。ただし、アプリケーション内に実行時間の長いプロセスが複数あり、各呼び出しが動作QApplication.processEvents()を続けている場合、アプリケーションの動作は予測できません。

スレッドとプロセス

一歩下がって、アプリケーションで何をしたいのかを考える場合、それはおそらく「他のことが起こると同時に起こること」と要約することができます。

PyQtアプリケーション内で独立したタスクを実行するには、スレッドとプロセスの 2つの主要なアプローチがあります。

スレッドは同じメモリ空間を共有するため、すばやく起動し、最小限のリソースを消費します。共有メモリにより、スレッド間でのデータの受け渡しは簡単になりますが、異なるスレッドからのメモリの読み取り/書き込みにより、競合状態やセグメンテーション違反が発生する可能性があります。Pythonでは、複数のスレッドが同じグローバルインタープリターロック(GIL)によってバインドされているという追加の問題があります。つまり、GILをリリースしないPythonコードは、一度に1つのスレッドでしか実行できません。ただし、これはほとんどの時間がPythonの外で費やされるPyQtの大きな問題ではありません。

プロセスは個別のメモリ空間(および完全に個別のPythonインタプリタ)を使用します。これにより、GILの潜在的な問題が回避されますが、起動時間が遅くなり、メモリのオーバーヘッドが大きくなり、データの送受信が複雑になるという犠牲が伴います。

単純にするために、プロセスを使用する十分な理由がない限り、通常はスレッドを使用するのが理にかなっています(後述の警告を参照)。Qtのサブプロセスは、外部プログラムの実行と通信に適しています。

PyQtアプリケーション内での純粋なPythonスレッド化またはプロセスベースのアプローチの使用を妨げるものは何もありません。

QRunnableとQThreadPool

Qtは他のスレッドでジョブを実行するための非常にシンプルなインターフェースを提供します。これはPyQtでうまく公開されています。これはQRunnable、およびの2つのクラスを中心に構築されていQThreadPoolます。前者は実行する作業のコンテナであり、後者はその作業を代替スレッドに渡す方法です。

使用の優れた点QThreadPoolは、ワーカーのキューイングと実行を処理することです。ジョブをキューに入れて結果を取得する以外に、行うことはほとんどありません。

カスタムを定義するにはQRunnable、基本QRunnableクラスをサブクラス化し、実行したいコードをrun()メソッド内に配置します。以下は、長期実行time.sleepジョブのとしての実装ですQRunnable。次のコードをmultithread.py、MainWindowクラス定義の上に追加します。

class Worker(QRunnable):

”’

Worker thread

”’

@Slot() # QtCore.Slot

def run(self):

”’

Your code goes in this function

”’

print(“Thread start”)

time.sleep(5)

print(“Thread complete”)

別のスレッドで関数を実行することは、単にのインスタンスを作成し、WorkerそれをQThreadPoolインスタンスに渡すだけで、自動的に実行されます。

次に、__init__ブロック内に次のコードを追加して、スレッドプールを設定します。

self.threadpool = QThreadPool()

print(“Multithreading with maximum %d threads” % self.threadpool.maxThreadCount())

最後に、次の行をoh_no関数に追加します。

def oh_no(self):

worker = Worker()

self.threadpool.start(worker)

ここで、ボタンをクリックすると、ワーカーを作成して(長時間実行)プロセスを処理し、スレッドプールを介して別のスレッドにスピンオフします。受信ワーカーを処理するのに十分なスレッドがない場合、それらはキューに入れられ、後で順番に実行されます。

試してみると、アプリケーションがボタンをたたく操作を問題なく処理していることがわかります。

ボタンを複数回押すとどうなるか確認してください。によって報告された数まで、即座に実行されたスレッドが表示され.maxThreadCountます。この数のアクティブなワーカーが既に存在する場合にボタンをもう一度押すと、スレッドが使用可能になるまで後続のワーカーがキューに入れられます。

QRunnablesの改善

カスタムデータを実行関数に渡したい場合は、initを介して行うことができself.、runスロット内からを介してデータにアクセスできます。

class Worker(QRunnable):

”’

Worker thread

:param args: Arguments to make available to the run code

:param kwargs: Keywords arguments to make available to the run code

”’

def __init__(self, *args, **kwargs):

super(Worker, self).__init__()

self.args = args

self.kwargs = kwargs

@Slot() # QtCore.Slot

def run(self):

”’

Initialise the runner function with passed self.args, self.kwargs.

”’

print(args, kwargs)

実際、Pythonでは関数はオブジェクトであり、毎回サブクラス化するのではなく、関数を渡して実行するという事実を利用できます。次の構成では、Workerすべての実行ジョブを処理するために単一のクラスのみが必要です。

class Worker(QRunnable):

”’

Worker thread

Inherits from QRunnable to handler worker thread setup, signals and wrap-up.

:param callback: The function callback to run on this worker thread. Supplied args and

kwargs will be passed through to the runner.

:type callback: function

:param args: Arguments to pass to the callback function

:param kwargs: Keywords to pass to the callback function

”’

def __init__(self, fn, *args, **kwargs):

super(Worker, self).__init__()

# Store constructor arguments (re-used for processing)

self.fn = fn

self.args = args

self.kwargs = kwargs

@Slot() # QtCore.Slot

def run(self):

”’

Initialise the runner function with passed args, kwargs.

”’

self.fn(*self.args, **self.kwargs)

これで、任意のPython関数を渡し、別のスレッドで実行させることができます。

def execute_this_fn(self):

print(“Hello!”)

def oh_no(self):

# Pass the function to execute

worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function

# Execute

threadpool.start(worker)

スレッドIO

実行中のワーカーから状態とデータを戻すことができると便利な場合があります。これには、計算の結果、発生した例外、または進行中の進捗(進捗バーを考える)が含まれる場合があります。Qtは、シグナルとスロットのフレームワークを提供します。これにより、スレッドセーフであり、実行中のスレッドからGUIフロントエンドに直接安全に通信できます。シグナルを使用すると.emit値を取得できます。この値は、にリンクされているスロット関数によってコードの他の場所で取得されます.connect。

以下は、WorkerSignals多数のサンプル信号を含むように定義された単純なクラスです。

カスタム信号は「QObject」から派生したオブジェクトでのみ定義できます。`QRunnable`は` QObject`から派生していないので、そこで直接信号を定義することはできません。信号を保持するカスタムQObjectが最も簡単なソリューションです。

import traceback, sys

class WorkerSignals(QObject):

”’

Defines the signals available from a running worker thread.

Supported signals are:

finished

No data

error

`tuple` (exctype, value, traceback.format_exc() )

result

`object` data returned from processing, anything

”’

finished = Signal() # QtCore.Signal

error = Signal(tuple)

result = Signal(object)

この例では、5つのカスタム信号を定義しています。

  1. タスクが完了したことを示すデータがない終了信号。
  2. エラー受信信号tupleのExceptionタイプ、Exception値、フォーマットトレースバック。
  3. object実行された関数から任意のタイプを受け取る結果信号。

これらの信号すべてが必要なわけではありませんが、何が可能かを示すために含まれています。次のコードでは、これらの信号を利用してユーザーに役立つ情報を提供する、長時間実行されるタスクを実装します。

class Worker(QRunnable):

”’

Worker thread

Inherits from QRunnable to handler worker thread setup, signals and wrap-up.

:param callback: The function callback to run on this worker thread. Supplied args and

kwargs will be passed through to the runner.

:type callback: function

:param args: Arguments to pass to the callback function

:param kwargs: Keywords to pass to the callback function

”’

def __init__(self, fn, *args, **kwargs):

super(Worker, self).__init__()

# Store constructor arguments (re-used for processing)

self.fn = fn

self.args = args

self.kwargs = kwargs

self.signals = WorkerSignals()

@Slot() # QtCore.Slot

def run(self):

”’

Initialise the runner function with passed args, kwargs.

”’

# Retrieve args/kwargs here; and fire processing using them

try:

result = self.fn(

*self.args, **self.kwargs

status=self.signals.status

progress=self.signals.progress

)

except:

traceback.print_exc()

exctype, value = sys.exc_info()[:2]

self.signals.error.emit((exctype, value, traceback.format_exc()))

else:

self.signals.result.emit(result) # Return the result of the processing

finally:

self.signals.finished.emit() # Done

独自のハンドラー関数をこれらのシグナルに接続して、スレッドの完了(または結果)の通知を受け取ることができます。

def execute_this_fn(self):

for n in range(0, 5):

time.sleep(1)

return “Done.”

def print_output(self, s):

print(s)

def thread_complete(self):

print(“THREAD COMPLETE!”)

def oh_no(self):

# Pass the function to execute

worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function

worker.signals.result.connect(self.print_output)

worker.signals.finished.connect(self.thread_complete)

# Execute

self.threadpool.start(worker)

また、実行時間の長いスレッドからステータス情報を受け取りたい場合もあります。これは、実行中のコードが情報を送信できるコールバックを渡すことで実行できます。ここには2つのオプションがあります。新しいシグナルを定義する(イベントループを使用して処理を実行できるようにする)か、標準のPython関数を使用します。

どちらの場合も、これらのコールバックを使用できるようにするには、これらのコールバックをターゲット関数に渡す必要があります。シグナルベースのアプローチは、以下の完成したコードで使用されます。ここでintは、スレッドの進行状況を示すインジケーターとしてバックを渡します。

全てのコード

完全な動作例を以下に示し、カスタムQRunnableワーカーとワーカーおよび進行状況シグナルを紹介します。開発したマルチスレッドアプリケーションにこのコードを簡単に適合させることができるはずです。

from PySide2.QtGui import *

from PySide2.QtWidgets import *

from PySide2.QtCore import *

import time

import traceback, sys

class WorkerSignals(QObject):

”’

Defines the signals available from a running worker thread.

Supported signals are:

finished

No data

error

`tuple` (exctype, value, traceback.format_exc() )

result

`object` data returned from processing, anything

progress

`int` indicating % progress

”’

finished = Signal()

error = Signal(tuple)

result = Signal(object)

progress = Signal(int)

class Worker(QRunnable):

”’

Worker thread

Inherits from QRunnable to handler worker thread setup, signals and wrap-up.

:param callback: The function callback to run on this worker thread. Supplied args and

kwargs will be passed through to the runner.

:type callback: function

:param args: Arguments to pass to the callback function

:param kwargs: Keywords to pass to the callback function

”’

def __init__(self, fn, *args, **kwargs):

super(Worker, self).__init__()

# Store constructor arguments (re-used for processing)

self.fn = fn

self.args = args

self.kwargs = kwargs

self.signals = WorkerSignals()

# Add the callback to our kwargs

self.kwargs[‘progress_callback’] = self.signals.progress

@Slot()

def run(self):

”’

Initialise the runner function with passed args, kwargs.

”’

# Retrieve args/kwargs here; and fire processing using them

try:

result = self.fn(*self.args, **self.kwargs)

except:

traceback.print_exc()

exctype, value = sys.exc_info()[:2]

self.signals.error.emit((exctype, value, traceback.format_exc()))

else:

self.signals.result.emit(result) # Return the result of the processing

finally:

self.signals.finished.emit() # Done

class MainWindow(QMainWindow):

def __init__(self, *args, **kwargs):

super(MainWindow, self).__init__(*args, **kwargs)

self.counter = 0

layout = QVBoxLayout()

self.l = QLabel(“Start”)

b = QPushButton(“DANGER!”)

b.pressed.connect(self.oh_no)

layout.addWidget(self.l)

layout.addWidget(b)

w = QWidget()

w.setLayout(layout)

self.setCentralWidget(w)

self.show()

self.threadpool = QThreadPool()

print(“Multithreading with maximum %d threads” % self.threadpool.maxThreadCount())

self.timer = QTimer()

self.timer.setInterval(1000)

self.timer.timeout.connect(self.recurring_timer)

self.timer.start()

def progress_fn(self, n):

print(“%d%% done” % n)

def execute_this_fn(self, progress_callback):

for n in range(0, 5):

time.sleep(1)

progress_callback.emit(n*100/4)

return “Done.”

def print_output(self, s):

print(s)

def thread_complete(self):

print(“THREAD COMPLETE!”)

def oh_no(self):

# Pass the function to execute

worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function

worker.signals.result.connect(self.print_output)

worker.signals.finished.connect(self.thread_complete)

worker.signals.progress.connect(self.progress_fn)

# Execute

self.threadpool.start(worker)

def recurring_timer(self):

self.counter +=1

self.l.setText(“Counter: %d” % self.counter)

app = QApplication([])

window = MainWindow()

app.exec_()

注意事項

このマスタープランのわずかな欠陥に気付いた方もいらっしゃるかと思いますが、ワーカーの出力を処理するためにイベントループ(およびGUIスレッド)を引き続き使用しています。

進行状況、完了、またはメタデータを返すだけの場合は問題ありません。ただし、大量のデータを返すワーカーがある場合(たとえば、大きなファイルの読み込み、複雑な分析の実行と(大きな)結果の必要性、またはデータベースへのクエリ)は、このデータをGUIスレッドに戻すとパフォーマンスの問題が発生する可能性があるため、回避することをお勧めします。

同様に、アプリケーションが多数のスレッドとPython結果ハンドラーを使用する場合、GILの制限に直面する可能性があります。前述のように、スレッドを使用する場合、Pythonの実行は一度に1つのスレッドに制限されます。スレッドからの信号を処理するPythonコードは、ワーカーによってブロックされる可能性があり、その逆も同様です。スロット関数をブロックするとイベントループがブロックされるため、GUIの応答性に直接影響する可能性があります。

これらの場合、純粋なPythonスレッドプール(たとえば、コンカレントフューチャー)を使用して調査し、処理とスレッドイベント処理をGUIからさらに分離することをお勧めします。ただし、別のプロセスでない限り、 Python GUIコードは他のPythonコードをブロックできることに注意してください。

コメントを残す

メールアドレスが公開されることはありません。

Next Post

55%オフセール 神秘的な古代世界の終末を見届けた、シネマっぽい、空気中のプラットフォームゲーム

金 7月 3 , 2020
https://store.steampowered.com/app/1048600/Stela/? […]