# vision_subsystem.py
# Import the required modules
import threading

import cv2
import dlib
import imutils
import numpy as np
from imutils.object_detection import non_max_suppression

# Use the Raspberry Pi camera stack when picamera is available;
# otherwise fall back to a regular OpenCV VideoCapture source.
rpi = True
try:
    from picamera import PiCamera
    from picamera.array import PiRGBArray
except ImportError:
    rpi = False
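# Vision_Subsystem runs as a worker thread: it finds a person with OpenCV's
# HOG people detector, follows them with a dlib correlation tracker, and
# publishes the person's normalized lateral offset to lat_dist_queue so the
# control system can steer toward them.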
class Vision_Subsystem(threading.Thread):
    def __init__(self, threadID, lat_dist_queue, logger, debug_mode=False):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.lat_dist_queue = lat_dist_queue
        self.logger = logger
        self.debug_mode = debug_mode
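    # getInitialRoiPts returns at most one ROI as a normalized
    # (x1, y1, x2, y2) tuple with every coordinate in [0, 1], so the caller
    # can scale it to whatever resolution the camera is actually running at.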
    def getInitialRoiPts(self, frame):
        print("Getting ROI points")
        # Initialize the HOG descriptor with OpenCV's default people detector
        hog = cv2.HOGDescriptor()
        hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
        # Normalized ROI points to return
        points = []
        # Resize the frame to (1) reduce detection time and
        # (2) improve detection accuracy
        frameWidth = min(250, frame.shape[1])
        frameHeight = (float(frameWidth) / frame.shape[1]) * frame.shape[0]
        frame = imutils.resize(frame, width=frameWidth)
        orig = frame.copy()
        # Detect people in the resized frame
        (rects, weights) = hog.detectMultiScale(frame, winStride=(4, 4),
                                                padding=(8, 8), scale=1.02)
        # Draw the raw bounding boxes
        for (x, y, w, h) in rects:
            cv2.rectangle(orig, (x, y), (x + w, y + h), (0, 0, 255), 2)
        # Apply non-maxima suppression to the bounding boxes using a
        # fairly large overlap threshold to try to keep overlapping
        # boxes that are still people
        rects = np.array([[x, y, x + w, y + h] for (x, y, w, h) in rects])
        pick = non_max_suppression(rects, probs=None, overlapThresh=0.65)
        # Take the first surviving box, shrink it by 20% of its half-size
        # on each side to focus on the person's torso, and normalize it
        for (xA, yA, xB, yB) in pick:
            cv2.rectangle(frame, (xA, yA), (xB, yB), (0, 255, 0), 2)
            subFactorX = 0.2 * ((xB - xA) / 2)
            subFactorY = 0.2 * ((yB - yA) / 2)
            points.append((float(xA + subFactorX) / frameWidth,
                           float(yA + subFactorY) / frameHeight,
                           float(xB - subFactorX) / frameWidth,
                           float(yB - subFactorY) / frameHeight))
            break
        cv2.imshow("After NMS", frame)
        # A short waitKey lets the window actually render
        cv2.waitKey(1)
        return points
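    # Worked example of the normalization round trip between
    # getInitialRoiPts and run(): with a 640x480 camera frame resized to
    # frameWidth=250, a detection edge at resized pixel x=50 normalizes to
    # 50/250 = 0.2, which run() scales back up to 0.2*640 = 128 px in the
    # full frame.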
    def run(self, source=0, dispLoc=False):
        if rpi:
            # Raspberry Pi camera setup
            cam = PiCamera()
            cam.resolution = (640, 480)
            cam.framerate = 8
            raw_capture = PiRGBArray(cam, size=(640, 480))
            camWidth = 640
            camHeight = 480
        else:
            # Create the VideoCapture object
            cam = cv2.VideoCapture(source)
            # Get the camera frame width and height
            camWidth = cam.get(cv2.CAP_PROP_FRAME_WIDTH)
            camHeight = cam.get(cv2.CAP_PROP_FRAME_HEIGHT)
        # Determine the frame center coordinate
        frameCenterX = camWidth / 2
        frameCenterY = camHeight / 2
        # On the Raspberry Pi, grab frames through a continuous-capture iterator
        if rpi:
            frame_iter = cam.capture_continuous(raw_capture, format="bgr",
                                                use_video_port=True)
        print("Camera width: %s, height: %s" % (camWidth, camHeight))
        # If the camera device could not be opened, exit the program
        if not rpi and not cam.isOpened():
            print("Video device or file couldn't be opened")
            exit()
        # Coordinates of the human ROI to be tracked
        roiPts = []
        roiImg = 0
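        # Acquisition phase: keep grabbing frames and running the HOG
        # detector until a person is found; that first detection seeds the
        # correlation tracker below.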
        while True:
            if rpi:
                retval = True
                frame = next(frame_iter)
                img = frame.array
                img.setflags(write=1)
                # Clear the stream buffer for the next frame
                raw_capture.truncate(0)
            else:
                # Read from camera (get a frame)
                retval, img = cam.read()
            # Check for a successful frame capture
            if not retval:
                print("Cannot capture frame from device")
            else:
                pts = self.getInitialRoiPts(img)
                if len(pts) != 0:
                    # Scale the normalized points back up to the full frame
                    roiPts.append((int(pts[0][0] * camWidth), int(pts[0][1] * camHeight),
                                   int(pts[0][2] * camWidth), int(pts[0][3] * camHeight)))
                    roiImg = img
                    break
            cv2.namedWindow("Image", cv2.WINDOW_NORMAL)
            cv2.imshow("Image", img)
            cv2.waitKey(1)
        cv2.destroyWindow("Image")
        # If no person was found, there is nothing to track
        if not roiPts:
            print("ERROR: No object to be tracked.")
            exit()
        # Draw the scaled-up ROI on a copy of the full frame
        scaledImageRoi = img.copy()
        cv2.rectangle(scaledImageRoi, (roiPts[0][0], roiPts[0][1]),
                      (roiPts[0][2], roiPts[0][3]), (0, 255, 0), 2)
        cv2.namedWindow("ScaledImageRoi", cv2.WINDOW_NORMAL)
        cv2.imshow("ScaledImageRoi", scaledImageRoi)
        # Create the tracker object and give it the initial
        # position of the object
        tracker = dlib.correlation_tracker()
        tracker.start_track(img, dlib.rectangle(*roiPts[0]))
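        # Tracking phase: dlib's correlation tracker estimates the object's
        # new translation and scale each frame; update() also returns a
        # confidence score (peak-to-sidelobe ratio), which this code ignores.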
        while True:
            if rpi:
                retval = True
                frame = next(frame_iter)
                img = frame.array
                raw_capture.truncate(0)
            else:
                # Read from camera (get a frame)
                retval, img = cam.read()
            if not retval:
                print("Cannot capture frame from device | CODE TERMINATING :(")
                exit()
            # Track on the grayscale image to cut the per-frame cost
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            # Update the tracker with the new frame
            tracker.update(img)
            # Get the position of the object, draw a bounding
            # box around it and display it
            rect = tracker.get_position()
            pt1 = (int(rect.left()), int(rect.top()))
            pt2 = (int(rect.right()), int(rect.bottom()))
            cv2.rectangle(img, pt1, pt2, (255, 255, 255), 3)
            # Calculate the lateral offset of the person:
            # - positive means the ROI center is right of the frame center
            # - negative means the ROI center is left of the frame center
            roiCenterX = (rect.right() + rect.left()) / 2
            roiCenterY = (rect.top() + rect.bottom()) / 2
            lateralOffset = roiCenterX - frameCenterX
            normalizedLatOffset = lateralOffset / camWidth
            # Hand the offset to the control system queue
            if not self.debug_mode:
                self.lat_dist_queue.put(normalizedLatOffset)
            # Mark the ROI center on the image
            cv2.rectangle(img, (int(roiCenterX - 10), int(roiCenterY + 10)),
                          (int(roiCenterX + 10), int(roiCenterY - 10)), (155, 155, 155), 5)
            if dispLoc:
                loc = (int(rect.left()), int(rect.top() - 20))
                txt = "Object tracked at [{}, {}]".format(pt1, pt2)
                cv2.putText(img, txt, loc, cv2.FONT_HERSHEY_SIMPLEX,
                            0.5, (255, 255, 255), 1)
            cv2.namedWindow("Image", cv2.WINDOW_NORMAL)
            cv2.imshow("Image", img)
            # Continue until the user presses the ESC key
            if cv2.waitKey(1) == 27:
                break
        # Release the camera
        if rpi:
            cam.close()
        else:
            cam.release()
if __name__ == "__main__":
# Run the vision system
vision = Vision_Subsystem(1, None, None, True)
vision.run(0, False)
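# Example wiring with a control system (a minimal sketch; the queue consumer
# below is hypothetical and not part of this module):
#
#   import queue
#   lat_queue = queue.Queue()
#   vision = Vision_Subsystem(1, lat_queue, None, debug_mode=False)
#   vision.start()                # Thread.start() invokes run() with defaults
#   offset = lat_queue.get()      # normalized lateral offset in [-0.5, 0.5]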