Skip to content

Commit

Permalink
Small carcontroller refactor and tesla linting for 0.6.2 (commaai#76)
Browse files Browse the repository at this point in the history
* Update radar_interface.py to clear up linting warnings

* Refactor carcontroller for readability / maintainability

* Linting

* PR Feedback

* Refactor for speedlimit computation
  • Loading branch information
rafcabezas authored Aug 6, 2019
1 parent 2a6304f commit 09fcb9b
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 236 deletions.
361 changes: 191 additions & 170 deletions selfdrive/car/tesla/carcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def __init__(self, dbc_name):

self.radarVin_idx = 0

self.isMetric = (self.params.get("IsMetric") == "1")

def reset_traffic_events(self):
self.stopSign_visible = False
self.stopSign_distance = 1000.
Expand Down Expand Up @@ -228,9 +230,6 @@ def checkWhichSign(self):
if (self.stopLight_distance < self.roadSignDistanceWarning ) and (self.roadSignColor == 1):
self.stopLightWarning = 1




def update(self, enabled, CS, frame, actuators, \
pcm_speed, pcm_override, pcm_cancel_cmd, pcm_accel, \
hud_v_cruise, hud_show_lanes, hud_show_car, hud_alert, \
Expand Down Expand Up @@ -332,7 +331,7 @@ def update(self, enabled, CS, frame, actuators, \
self.speed_limit_offset = float(self.params.get("SpeedLimitOffset"))
else:
self.speed_limit_offset = 0.
if (self.params.get("IsMetric") == "0"):
if not self.isMetric:
self.speed_limit_offset = self.speed_limit_offset * CV.MPH_TO_MS
if CS.useTeslaGPS:
if self.gpsLocationExternal is None:
Expand Down Expand Up @@ -432,174 +431,30 @@ def update(self, enabled, CS, frame, actuators, \
self.speedlimit_valid = True
if self.speedlimit_ms == 0:
self.speedlimit_valid = False
if (self.params.get("IsMetric") == "1"):
self.speedlimit_units = self.speedlimit_ms * CV.MS_TO_KPH + 0.5
else:
self.speedlimit_units = self.speedlimit_ms * CV.MS_TO_MPH + 0.5
self.speedlimit_units = self.speedUnits(fromMetersPerSecond = self.speedlimit_ms)
if frame % 10 == 0:
#get speed limit
for socket, _ in self.poller.poll(1):
if not CS.useTeslaMapData:
if socket is self.speedlimit:
lmd = messaging.recv_one(socket).liveMapData
self.speedlimit_ms = lmd.speedLimit
self.speedlimit_valid = lmd.speedLimitValid
if (self.params.get("IsMetric") == "1"):
self.speedlimit_units = self.speedlimit_ms * CV.MS_TO_KPH + 0.5
else:
self.speedlimit_units = self.speedlimit_ms * CV.MS_TO_MPH + 0.5
self.speed_limit_for_cc = self.speedlimit_ms * CV.MS_TO_KPH
#to show lead car on IC
if socket is self.radarState:
leads = messaging.recv_one(socket).radarState
if leads is not None:
lead_1 = leads.leadOne
lead_2 = leads.leadTwo
if (lead_1 is not None) and lead_1.status:
self.leadDx = lead_1.dRel
self.leadDy = self.curv0-lead_1.yRel
self.leadId = lead_1.trackId
self.leadClass = lead_1.oClass
self.leadVx = lead_1.vRel
if (self.leadId <= 0) or (self.leadId == 63):
self.leadId = 61
else:
self.leadDx = 0.
self.leadDy = 0.
self.leadClass = 0
self.leadId = 0
self.leadVx = 0xF
if (lead_2 is not None) and lead_2.status:
self.lead2Dx = lead_2.dRel
self.lead2Dy = self.curv0-lead_2.yRel
self.lead2Id = lead_2.trackId
self.lead2Class = lead_2.oClass
self.lead2Vx = lead_2.vRel
if (self.lead2Id <= 0) or (self.lead2Id == 63):
self.leadId = 62
else:
self.lead2Dx = 0.
self.lead2Dy = 0.
self.lead2Class = 0
self.lead2Id = 0
self.lead2Vx = 0xF
can_sends.append(teslacan.create_DAS_LR_object_msg(0,self.leadClass, self.leadId,
self.leadDx,self.leadDy,self.leadVx,self.lead2Class,
self.lead2Id,self.lead2Dx,self.lead2Dy,self.lead2Vx))
#to show curvature and lanes on IC
if socket is self.pathPlan:
pp = messaging.recv_one(socket).pathPlan
if pp.paramsValid:
if pp.lProb > 0.75:
self.lLine = 3
elif pp.lProb > 0.5:
self.lLine = 2
elif pp.lProb > 0.25:
self.lLine = 1
else:
self.lLine = 0
if pp.rProb > 0.75:
self.rLine = 3
elif pp.rProb > 0.5:
self.rLine = 2
elif pp.rProb > 0.25:
self.rLine = 1
else:
self.rLine = 0
#first we clip to the AP limits of the coefficients
self.curv0 = -clip(pp.cPoly[3],-3.5,3.5) #self.curv0Matrix.add(-clip(pp.cPoly[3],-3.5,3.5))
self.curv1 = -clip(pp.cPoly[2],-0.2,0.2) #self.curv1Matrix.add(-clip(pp.cPoly[2],-0.2,0.2))
self.curv2 = -clip(pp.cPoly[1],-0.0025,0.0025) #self.curv2Matrix.add(-clip(pp.cPoly[1],-0.0025,0.0025))
self.curv3 = -clip(pp.cPoly[0],-0.00003,0.00003) #self.curv3Matrix.add(-clip(pp.cPoly[0],-0.00003,0.00003))
self.laneWidth = pp.laneWidth
self.laneRange = 50 # it is fixed in OP at 50m pp.viewRange
self.visionCurvC0 = self.curv0
self.prev_ldwStatus = self.ldwStatus
self.ldwStatus = 0
if (self.ALCA.laneChange_direction != 0) and pp.alcaError:
self.ALCA.stop_ALCA()
if self.alca_enabled:
#exagerate position a little during ALCA to make lane change look smoother on IC
if self.ALCA.laneChange_over_the_line:
self.curv0 = self.ALCA.laneChange_direction * self.laneWidth - self.curv0
self.curv0 = clip(self.curv0, -3.5, 3.5)
else:
if CS.enableLdw and (not CS.blinker_on) and (CS.v_ego > 15.6) and (not CS.steer_override):
if pp.lProb > LDW_LANE_PROBAB:
lLaneC0 = -pp.lPoly[3]
if abs(lLaneC0) < LDW_WARNING_2:
self.ldwStatus = 3
elif abs(lLaneC0) < LDW_WARNING_1:
self.ldwStatus = 1
if pp.rProb > LDW_LANE_PROBAB:
rLaneC0 = -pp.rPoly[3]
if abs(rLaneC0) < LDW_WARNING_2:
self.ldwStatus = 3
elif abs(rLaneC0) < LDW_WARNING_1:
self.ldwStatus = 1
if not(self.prev_ldwStatus == self.ldwStatus):
self.warningNeeded = 1
if self.ldwStatus > 0:
self.warningCounter = 50
else:
self.lLine = 0
self.rLine = 0
self.curv0 = self.curv0Matrix.add(0.)
self.curv1 = self.curv1Matrix.add(0.)
self.curv2 = self.curv2Matrix.add(0.)
self.curv3 = self.curv3Matrix.add(0.)
if socket is self.icCarLR:
icCarLR_msg = ui.ICCarsLR.from_bytes(socket.recv())
if icCarLR_msg is not None:
#for icCarLR_msg in icCarLR_list:
can_sends.append(teslacan.create_DAS_LR_object_msg(1,icCarLR_msg.v1Type,icCarLR_msg.v1Id,
icCarLR_msg.v1Dx,icCarLR_msg.v1Dy,icCarLR_msg.v1Vrel,icCarLR_msg.v2Type,
icCarLR_msg.v2Id,icCarLR_msg.v2Dx,icCarLR_msg.v2Dy,icCarLR_msg.v2Vrel))
can_sends.append(teslacan.create_DAS_LR_object_msg(2,icCarLR_msg.v3Type,icCarLR_msg.v3Id,
icCarLR_msg.v3Dx,icCarLR_msg.v3Dy,icCarLR_msg.v3Vrel,icCarLR_msg.v4Type,
icCarLR_msg.v4Id,icCarLR_msg.v4Dx,icCarLR_msg.v4Dy,icCarLR_msg.v4Vrel))
if socket is self.trafficevents:
self.reset_traffic_events()
tr_ev_list = messaging.recv_sock(socket)
if tr_ev_list is not None:
for tr_ev in tr_ev_list.trafficEvents:
if tr_ev.type == 0x00:
if (tr_ev.distance < self.stopSign_distance):
self.stopSign_visible = True
self.stopSign_distance = tr_ev.distance
self.stopSign_action = tr_ev.action
self.stopSign_resume = tr_ev.resuming
if tr_ev.type == 0x04:
if (tr_ev.distance < self.stopLight_distance):
self.stopLight_visible = True
self.stopLight_distance = tr_ev.distance
self.stopLight_action = tr_ev.action
self.stopLight_resume = tr_ev.resuming
self.stopLight_color = 1. #0-unknown, 1-red, 2-yellow, 3-green
if tr_ev.type == 0x01:
if (tr_ev.distance < self.stopLight_distance):
self.stopLight_visible = True
self.stopLight_distance = tr_ev.distance
self.stopLight_action = tr_ev.action
self.stopLight_resume = tr_ev.resuming
self.stopLight_color = 1. #0-unknown, 1-red, 2-yellow, 3-green
if tr_ev.type == 0x02:
if (tr_ev.distance < self.stopLight_distance):
self.stopLight_visible = True
self.stopLight_distance = tr_ev.distance
self.stopLight_action = tr_ev.action
self.stopLight_resume = tr_ev.resuming
self.stopLight_color = 2. #0-unknown, 1-red, 2-yellow, 3-green
if tr_ev.type == 0x03:
if (tr_ev.distance < self.stopLight_distance):
self.stopLight_visible = True
self.stopLight_distance = tr_ev.distance
self.stopLight_action = tr_ev.action
self.stopLight_resume = tr_ev.resuming
self.stopLight_color = 3. #0-unknown, 1-red, 2-yellow, 3-green
self.checkWhichSign()
if not ((self.roadSignType_last == self.roadSignType) and (self.roadSignType == 0xFF)):
can_sends.append(teslacan.create_fake_DAS_sign_msg(self.roadSignType,self.roadSignStopDist,self.roadSignColor,self.roadSignControlActive))
if socket is self.speedlimit and not CS.useTeslaMapData:
#get speed limit
lmd = messaging.recv_one(socket).liveMapData
self.speedlimit_ms = lmd.speedLimit
self.speedlimit_valid = lmd.speedLimitValid
self.speedlimit_units = self.speedUnits(fromMetersPerSecond = self.speedlimit_ms)
self.speed_limit_for_cc = self.speedlimit_ms * CV.MS_TO_KPH
elif socket is self.radarState:
#to show lead car on IC
can_messages = self.showLeadCarOnICCanMessage(radarSocket = socket)
can_sends.extend(can_messages)
elif socket is self.pathPlan:
#to show curvature and lanes on IC
self.handlePathPlanSocketForCurvatureOnIC(pathPlanSocket = socket, CS = CS)
elif socket is self.icCarLR:
can_messages = self.showLeftAndRightCarsOnICCanMessages(icCarLRSocket = socket)
can_sends.extend(can_messages)
elif socket is self.trafficevents:
can_messages = self.handleTrafficEvents(trafficEventsSocket = socket)
can_sends.extend(can_messages)

if (CS.roadCurvRange > 20) and self.useMap:
if self.useZeroC0:
self.curv0 = 0.
Expand Down Expand Up @@ -785,3 +640,169 @@ def update(self, enabled, CS, frame, actuators, \
self.last_accel = apply_accel

return pedal_can_sends + can_sends

#to show lead car on IC
def showLeadCarOnICCanMessage(self, radarSocket):
messages = []
leads = messaging.recv_one(radarSocket).radarState
if leads is None:
return messages
lead_1 = leads.leadOne
lead_2 = leads.leadTwo
if (lead_1 is not None) and lead_1.status:
self.leadDx = lead_1.dRel
self.leadDy = self.curv0-lead_1.yRel
self.leadId = lead_1.trackId
self.leadClass = lead_1.oClass
self.leadVx = lead_1.vRel
if (self.leadId <= 0) or (self.leadId == 63):
self.leadId = 61
else:
self.leadDx = 0.
self.leadDy = 0.
self.leadClass = 0
self.leadId = 0
self.leadVx = 0xF
if (lead_2 is not None) and lead_2.status:
self.lead2Dx = lead_2.dRel
self.lead2Dy = self.curv0-lead_2.yRel
self.lead2Id = lead_2.trackId
self.lead2Class = lead_2.oClass
self.lead2Vx = lead_2.vRel
if (self.lead2Id <= 0) or (self.lead2Id == 63):
self.leadId = 62
else:
self.lead2Dx = 0.
self.lead2Dy = 0.
self.lead2Class = 0
self.lead2Id = 0
self.lead2Vx = 0xF
messages.append(teslacan.create_DAS_LR_object_msg(0,self.leadClass, self.leadId,
self.leadDx,self.leadDy,self.leadVx,self.lead2Class,
self.lead2Id,self.lead2Dx,self.lead2Dy,self.lead2Vx))
return messages

def handlePathPlanSocketForCurvatureOnIC(self, pathPlanSocket, CS):
pp = messaging.recv_one(pathPlanSocket).pathPlan
if pp.paramsValid:
if pp.lProb > 0.75:
self.lLine = 3
elif pp.lProb > 0.5:
self.lLine = 2
elif pp.lProb > 0.25:
self.lLine = 1
else:
self.lLine = 0
if pp.rProb > 0.75:
self.rLine = 3
elif pp.rProb > 0.5:
self.rLine = 2
elif pp.rProb > 0.25:
self.rLine = 1
else:
self.rLine = 0
#first we clip to the AP limits of the coefficients
self.curv0 = -clip(pp.cPoly[3],-3.5,3.5) #self.curv0Matrix.add(-clip(pp.cPoly[3],-3.5,3.5))
self.curv1 = -clip(pp.cPoly[2],-0.2,0.2) #self.curv1Matrix.add(-clip(pp.cPoly[2],-0.2,0.2))
self.curv2 = -clip(pp.cPoly[1],-0.0025,0.0025) #self.curv2Matrix.add(-clip(pp.cPoly[1],-0.0025,0.0025))
self.curv3 = -clip(pp.cPoly[0],-0.00003,0.00003) #self.curv3Matrix.add(-clip(pp.cPoly[0],-0.00003,0.00003))
self.laneWidth = pp.laneWidth
self.laneRange = 50 # it is fixed in OP at 50m pp.viewRange
self.visionCurvC0 = self.curv0
self.prev_ldwStatus = self.ldwStatus
self.ldwStatus = 0
if (self.ALCA.laneChange_direction != 0) and pp.alcaError:
self.ALCA.stop_ALCA()
if self.alca_enabled:
#exagerate position a little during ALCA to make lane change look smoother on IC
if self.ALCA.laneChange_over_the_line:
self.curv0 = self.ALCA.laneChange_direction * self.laneWidth - self.curv0
self.curv0 = clip(self.curv0, -3.5, 3.5)
else:
if CS.enableLdw and (not CS.blinker_on) and (CS.v_ego > 15.6) and (not CS.steer_override):
if pp.lProb > LDW_LANE_PROBAB:
lLaneC0 = -pp.lPoly[3]
if abs(lLaneC0) < LDW_WARNING_2:
self.ldwStatus = 3
elif abs(lLaneC0) < LDW_WARNING_1:
self.ldwStatus = 1
if pp.rProb > LDW_LANE_PROBAB:
rLaneC0 = -pp.rPoly[3]
if abs(rLaneC0) < LDW_WARNING_2:
self.ldwStatus = 3
elif abs(rLaneC0) < LDW_WARNING_1:
self.ldwStatus = 1
if not(self.prev_ldwStatus == self.ldwStatus):
self.warningNeeded = 1
if self.ldwStatus > 0:
self.warningCounter = 50
else:
self.lLine = 0
self.rLine = 0
self.curv0 = self.curv0Matrix.add(0.)
self.curv1 = self.curv1Matrix.add(0.)
self.curv2 = self.curv2Matrix.add(0.)
self.curv3 = self.curv3Matrix.add(0.)

# Generates IC messages for the Left and Right radar identified cars from radard
def showLeftAndRightCarsOnICCanMessages(self, icCarLRSocket):
messages = []
icCarLR_msg = ui.ICCarsLR.from_bytes(icCarLRSocket.recv())
if icCarLR_msg is not None:
#for icCarLR_msg in icCarLR_list:
messages.append(teslacan.create_DAS_LR_object_msg(1,icCarLR_msg.v1Type,icCarLR_msg.v1Id,
icCarLR_msg.v1Dx,icCarLR_msg.v1Dy,icCarLR_msg.v1Vrel,icCarLR_msg.v2Type,
icCarLR_msg.v2Id,icCarLR_msg.v2Dx,icCarLR_msg.v2Dy,icCarLR_msg.v2Vrel))
messages.append(teslacan.create_DAS_LR_object_msg(2,icCarLR_msg.v3Type,icCarLR_msg.v3Id,
icCarLR_msg.v3Dx,icCarLR_msg.v3Dy,icCarLR_msg.v3Vrel,icCarLR_msg.v4Type,
icCarLR_msg.v4Id,icCarLR_msg.v4Dx,icCarLR_msg.v4Dy,icCarLR_msg.v4Vrel))
return messages

def handleTrafficEvents(self, trafficEventsSocket):
messages = []
self.reset_traffic_events()
tr_ev_list = messaging.recv_sock(trafficEventsSocket)
if tr_ev_list is not None:
for tr_ev in tr_ev_list.trafficEvents:
if tr_ev.type == 0x00:
if (tr_ev.distance < self.stopSign_distance):
self.stopSign_visible = True
self.stopSign_distance = tr_ev.distance
self.stopSign_action = tr_ev.action
self.stopSign_resume = tr_ev.resuming
if tr_ev.type == 0x04:
if (tr_ev.distance < self.stopLight_distance):
self.stopLight_visible = True
self.stopLight_distance = tr_ev.distance
self.stopLight_action = tr_ev.action
self.stopLight_resume = tr_ev.resuming
self.stopLight_color = 1. #0-unknown, 1-red, 2-yellow, 3-green
if tr_ev.type == 0x01:
if (tr_ev.distance < self.stopLight_distance):
self.stopLight_visible = True
self.stopLight_distance = tr_ev.distance
self.stopLight_action = tr_ev.action
self.stopLight_resume = tr_ev.resuming
self.stopLight_color = 1. #0-unknown, 1-red, 2-yellow, 3-green
if tr_ev.type == 0x02:
if (tr_ev.distance < self.stopLight_distance):
self.stopLight_visible = True
self.stopLight_distance = tr_ev.distance
self.stopLight_action = tr_ev.action
self.stopLight_resume = tr_ev.resuming
self.stopLight_color = 2. #0-unknown, 1-red, 2-yellow, 3-green
if tr_ev.type == 0x03:
if (tr_ev.distance < self.stopLight_distance):
self.stopLight_visible = True
self.stopLight_distance = tr_ev.distance
self.stopLight_action = tr_ev.action
self.stopLight_resume = tr_ev.resuming
self.stopLight_color = 3. #0-unknown, 1-red, 2-yellow, 3-green
self.checkWhichSign()
if not ((self.roadSignType_last == self.roadSignType) and (self.roadSignType == 0xFF)):
messages.append(teslacan.create_fake_DAS_sign_msg(self.roadSignType,self.roadSignStopDist,self.roadSignColor,self.roadSignControlActive))
return messages

# Returns speed as it needs to be displayed on the IC
def speedUnits(self, fromMetersPerSecond):
return fromMetersPerSecond * (CV.MS_TO_KPH if self.isMetric else CV.MS_TO_MPH) + 0.5
Loading

0 comments on commit 09fcb9b

Please sign in to comment.