-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcore.py
134 lines (107 loc) · 4.01 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Multithreaded helper module for Qt based QApplications
"""
import time
from PyQt4 import uic
from PyQt4.QtCore import *
from PyQt4.QtGui import *
#client-side thread interface
class ThreadClient(QObject):
"""Client side thread interface
"""
sgnTick = pyqtSignal(dict)
sgnError = pyqtSignal(dict)
sgnFinished = pyqtSignal()
def __init__(self, controller):
"""Constructor
:param ThreadController controller: Instance of user defined ThreadController
Instance of ThreadClient will fire sgnTick on every iteration of the work loop, sgnError on error
and sgnFinished after receiving a stop flag and finishing the last loop iteration
"""
QObject.__init__(self)
self._controller = controller
self._running = False
@property
def is_running(self):
return self._running
def start(self):
"""Starts the thread and connects all signals
"""
self._thread = QThread()
self._controller.moveToThread(self._thread)
self._controller.sgnError.connect(self._on_error)
self._controller.sgnTick.connect(self._on_tick, type = Qt.QueuedConnection)
self._controller.sgnFinished.connect(self._thread_finished)
self._thread.started.connect(self._controller.work)
self._thread.start()
self._running = not self._running
def stop(self):
"""Sends a stop signal to ThreadController
Note: Thread will not finish immediately, it will finish the current iteration first
"""
self._controller.stop()
def _on_tick(self, result):
self.sgnTick.emit(result)
def _thread_finished(self):
self._thread.quit()
self._thread.wait()
self._thread = None
self._running = not self._running
self.sgnFinished.emit()
def _on_error(self, errdict):
self.sgnError.emit(errdict)
#thread-side controller object
class ThreadController(QObject):
sgnTick = pyqtSignal(dict)
sgnError = pyqtSignal(dict)
sgnFinished = pyqtSignal()
def __init__(self, interval, wait_chunk=0.05):
"""Constructor
:param float interval: Pause duration between iterations of the loop in seconds
:param float wait_period: Check period in seconds for thread stop signal, default 0.05 seconds
"""
QObject.__init__(self, None)
self._mutex = QMutex()
self._interval = interval
self._running = True
self._wait_period = wait_chunk
def stop(self):
"""Sets the running flag to False, thread will stop in at most wait_period time, then emit sgnFinished
"""
self._mutex.lock()
self._running = False
self._mutex.unlock()
@pyqtSlot()
def work(self):
"""Performs the actual work.
Before the work loop is started _prepare() is called and _cleanup() after the stop signal is received.
Default implementations of _prepare() and _cleanup() do nothing.
During the loop execution the _process() method is called in each iteration. This method must be overriden and it must return a result
"""
try:
self._prepare()
except Exception as e:
self.sgnError.emit({'error': e})
while self._running:
try:
result = self._process()
self.sgnTick.emit({'result': result})
except Exception as e:
self.sgnError.emit({'error': e})
#wait out the interval period but check for stop signal all the time
wait = 0.0
while wait < self._interval:
if not self._running:
break
time.sleep(self._wait_period)
wait += self._wait_period
try:
self._cleanup()
except Exception as e:
self.sgnError.emit({'error': e})
self.sgnFinished.emit()
def _prepare(self):
pass
def _process(self):
raise NotImplementedError()
def _cleanup(self):
pass