From 9504037aa798e9adb12a2c7e769f42a57198b96a Mon Sep 17 00:00:00 2001 From: Vehicle Researcher Date: Wed, 15 Jan 2020 14:04:38 -0800 Subject: [PATCH] Squashed 'cereal/' changes from b8382bbb..01942b89 01942b89 add TODO b74a456a don't hardcode the lists ed5a4bf5 add face stds 396a2bb5 add can error counter to controlsState c6b5c73b Switch default to msgq (#21) a457ffa0 Fix indentation in readme.md a1fc8c75 explicitly mention Python for syntax colouring (#20) 19e23931 Fix expected for cameraOdometry and liveCalibration e7d2f978 Add radar comm issue error db64cd43 Reserve safety #21 for VAG PQ35/PQ46/NMS (#19) 79d638d5 separate honda safety models between Bosch Giraffe and Bosch Nidec 2614a650 better name b6b84cda add longitudinal 78f5934a Add canRxErrs to health 67588993 qlog liveCalibration df80b870 add more stuff to fw log in CarParams a87805ad fix doxs 4746b208 got doxed 21cf3f55 build on mac 31ac47c2 Add carUnrecognized event git-subtree-dir: cereal git-subtree-split: 01942b890d7acf19aecc09432fe5048ba21c0fc9 --- .gitignore | 2 +- README.md | 42 ++++++++++++++++++++++++++++++++++++++++++ SConscript | 4 ++-- car.capnp | 20 ++++++++++++++++---- log.capnp | 11 +++++++++++ messaging/__init__.py | 12 +++++++----- messaging/bridge.cc | 2 ++ messaging/impl_msgq.cc | 17 +++++++++++------ messaging/messaging.cc | 24 ++++++++++++------------ messaging/msgq.cc | 24 ++++++++++++++++-------- service_list.yaml | 4 ++-- 11 files changed, 122 insertions(+), 40 deletions(-) create mode 100644 README.md diff --git a/.gitignore b/.gitignore index 5053090f144df5..e90f8c750f9b26 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,5 @@ libmessaging.* libmessaging_shared.* services.h .sconsign.dblite -libcereal_shared.so +libcereal_shared.* diff --git a/README.md b/README.md new file mode 100644 index 00000000000000..ec0efc67621992 --- /dev/null +++ b/README.md @@ -0,0 +1,42 @@ +What is cereal? +---- + +cereal is both a messaging spec for robotics systems as well as generic high performance IPC pub sub messaging with a single publisher and multiple subscribers. + +Imagine this use case: +* A sensor process reads gyro measurements directly from an IMU and publishes a sensorEvents packet +* A calibration process subscribes to the sensorEvents packet to use the IMU +* A localization process subscribes to the sensorEvents packet to use the IMU also + + +Messaging Spec +---- + +You'll find the message types in [log.capnp](log.capnp). It uses [Cap'n proto](https://capnproto.org/capnp-tool.html) and defines one struct called Event. + +All Events have a logMonoTime and a valid. Then a big union defines the packet type. + + +Pub Sub Backends +---- + +cereal supports two backends, one based on [zmq](https://zeromq.org/), the other called msgq, a custom pub sub based on shared memory that doesn't require the bytes to pass through the kernel. + +Example +--- +```python +import cereal.messaging as messaging + +# in subscriber +sm = messaging.SubMaster(['sensorEvents']) +while 1: + sm.update() + print(sm['sensorEvents']) + +# in publisher +pm = messaging.PubMaster(['sensorEvents']) +dat = messaging.new_message() +dat.init('sensorEvents', 1) +dat.sensorEvents[0] = {"gyro": {"v": [0.1, -0.1, 0.1]}} +pm.send('sensorEvents', dat) +``` diff --git a/SConscript b/SConscript index 789e83023a078c..e0c1b2b52c16a4 100644 --- a/SConscript +++ b/SConscript @@ -29,7 +29,7 @@ cereal_objects = env.SharedObject([ ]) env.Library('cereal', cereal_objects) -env.SharedLibrary('cereal_shared', cereal_objects) +env.SharedLibrary('cereal_shared', cereal_objects, LIBS=["capnp_c"]) cereal_dir = Dir('.') services_h = env.Command( @@ -49,7 +49,7 @@ Depends('messaging/impl_zmq.cc', services_h) # note, this rebuilds the deps shared, zmq is statically linked to make APK happy # TODO: get APK to load system zmq to remove the static link -shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [] +shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [zmq] env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib) env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq']) diff --git a/car.capnp b/car.capnp index 68a723206a6d4b..48618a2780b1a9 100644 --- a/car.capnp +++ b/car.capnp @@ -88,6 +88,8 @@ struct CarEvent @0x9b1657f34caf3ad3 { lowMemory @63; stockAeb @64; ldw @65; + carUnrecognized @66; + radarCommIssue @67; } } @@ -410,11 +412,11 @@ struct CarParams { enum SafetyModel { silent @0; - honda @1; + hondaNidec @1; toyota @2; elm327 @3; gm @4; - hondaBosch @5; + hondaBoschGiraffe @5; ford @6; cadillac @7; hyundai @8; @@ -428,7 +430,9 @@ struct CarParams { toyotaIpas @16; allOutput @17; gmAscm @18; - noOutput @19; # like silent but with silent CAN TXs + noOutput @19; # like silent but without silent CAN TXs + hondaBoschHarness @20; + volkswagenPq @21; } enum SteerControlType { @@ -444,7 +448,9 @@ struct CarParams { struct CarFw { ecu @0 :Ecu; - fwVersion @1 :Text; + fwVersion @1 :Data; + address @2: UInt32; + subAddress @3: UInt8; } enum Ecu { @@ -452,5 +458,11 @@ struct CarParams { esp @1; fwdRadar @2; fwdCamera @3; + engine @4; + unknown @5; + + # Toyota only + dsu @6; + apgs @7; } } diff --git a/log.capnp b/log.capnp index b059218feba89c..bdd7ed1bb18e61 100644 --- a/log.capnp +++ b/log.capnp @@ -310,6 +310,7 @@ struct HealthData { hasGps @6 :Bool; canSendErrs @7 :UInt32; canFwdErrs @8 :UInt32; + canRxErrs @19 :UInt32; gmlanSendErrs @9 :UInt32; hwType @10 :HwType; fanSpeedRpm @11 :UInt16; @@ -484,6 +485,7 @@ struct ControlsState @0x97ff69c53601abf1 { decelForTurn @47 :Bool; decelForModel @54 :Bool; + canErrorCounter @57 :UInt32; lateralControlState :union { indiState @52 :LateralINDIState; @@ -575,6 +577,7 @@ struct ModelData { leadFuture @7 :LeadData; speed @8 :List(Float32); meta @10 :MetaData; + longitudinal @11 :LongitudinalData; struct PathData { points @0 :List(Float32); @@ -605,6 +608,7 @@ struct ModelData { yuvCorrection @5 :List(Float32); inputTransform @6 :List(Float32); } + struct MetaData { engagedProb @0 :Float32; desirePrediction @1 :List(Float32); @@ -612,6 +616,11 @@ struct ModelData { gasDisengageProb @3 :Float32; steerOverrideProb @4 :Float32; } + + struct LongitudinalData { + speeds @0 :List(Float32); + accelerations @1 :List(Float32); + } } struct CalibrationFeatures { @@ -1757,6 +1766,8 @@ struct DriverMonitoring { leftBlinkProb @8 :Float32; rightBlinkProb @9 :Float32; irPwrDEPRECATED @10 :Float32; + faceOrientationStd @11 :List(Float32); + facePositionStd @12 :List(Float32); } struct Boot { diff --git a/messaging/__init__.py b/messaging/__init__.py index 7a0d4936b3e653..e5a004740dfaaa 100644 --- a/messaging/__init__.py +++ b/messaging/__init__.py @@ -1,6 +1,7 @@ # must be build with scons from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error +import capnp assert MultiplePublishersError assert MessagingError @@ -116,6 +117,7 @@ def recv_one_retry(sock): if dat is not None: return log.Event.from_bytes(dat) +# TODO: This does not belong in messaging def get_one_can(logcan): while True: can = recv_one_retry(logcan) @@ -147,12 +149,12 @@ def __init__(self, services, ignore_alive=None, addr="127.0.0.1"): self.freq[s] = service_list[s].frequency data = new_message() - if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan', - 'ethernetData', 'cellInfo', 'wifiScan', - 'trafficEvents', 'orbObservation', 'carEvents']: - data.init(s, 0) - else: + try: data.init(s) + except capnp.lib.capnp.KjException: + # lists + data.init(s, 0) + self.data[s] = getattr(data, s) self.logMonoTime[s] = 0 self.valid[s] = data.valid diff --git a/messaging/bridge.cc b/messaging/bridge.cc index 7abcdd09dae6e0..8e29566ca26c64 100644 --- a/messaging/bridge.cc +++ b/messaging/bridge.cc @@ -4,6 +4,8 @@ #include #include +typedef void (*sighandler_t)(int sig); + #include "services.h" #include "impl_msgq.hpp" diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index bf017d900b6e3f..d37b8c986d7237 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -85,7 +85,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){ msgq_msg_t msg; MSGQMessage *r = NULL; - r = NULL; int rc = msgq_msg_recv(&msg, q); @@ -109,17 +108,23 @@ Message * MSGQSubSocket::receive(bool non_blocking){ } } - if (rc > 0){ - r = new MSGQMessage; - r->takeOwnership(msg.data, msg.size); - } - errno = msgq_do_exit ? EINTR : 0; if (!non_blocking){ std::signal(SIGINT, prev_handler_sigint); std::signal(SIGTERM, prev_handler_sigterm); } + errno = msgq_do_exit ? EINTR : 0; + + if (rc > 0){ + if (msgq_do_exit){ + msgq_msg_close(&msg); // Free unused message on exit + } else { + r = new MSGQMessage; + r->takeOwnership(msg.data, msg.size); + } + } + return (Message*)r; } diff --git a/messaging/messaging.cc b/messaging/messaging.cc index 9b12b298495112..1a9f860cd35666 100644 --- a/messaging/messaging.cc +++ b/messaging/messaging.cc @@ -4,20 +4,20 @@ Context * Context::create(){ Context * c; - if (std::getenv("MSGQ")){ - c = new MSGQContext(); - } else { + if (std::getenv("ZMQ")){ c = new ZMQContext(); + } else { + c = new MSGQContext(); } return c; } SubSocket * SubSocket::create(){ SubSocket * s; - if (std::getenv("MSGQ")){ - s = new MSGQSubSocket(); - } else { + if (std::getenv("ZMQ")){ s = new ZMQSubSocket(); + } else { + s = new MSGQSubSocket(); } return s; } @@ -60,10 +60,10 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri PubSocket * PubSocket::create(){ PubSocket * s; - if (std::getenv("MSGQ")){ - s = new MSGQPubSocket(); - } else { + if (std::getenv("ZMQ")){ s = new ZMQPubSocket(); + } else { + s = new MSGQPubSocket(); } return s; } @@ -82,10 +82,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){ Poller * Poller::create(){ Poller * p; - if (std::getenv("MSGQ")){ - p = new MSGQPoller(); - } else { + if (std::getenv("ZMQ")){ p = new ZMQPoller(); + } else { + p = new MSGQPoller(); } return p; } diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 380b9c7fe6aee2..4ccd13df44527a 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -23,8 +23,8 @@ #include "msgq.hpp" -void sigusr1_handler(int signal) { - assert(signal == SIGUSR1); +void sigusr2_handler(int signal) { + assert(signal == SIGUSR2); } uint64_t msgq_get_uid(void){ @@ -80,7 +80,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes - std::signal(SIGUSR1, sigusr1_handler); + std::signal(SIGUSR2, sigusr2_handler); const char * prefix = "/dev/shm/"; char * full_path = new char[strlen(path) + strlen(prefix) + 1]; @@ -136,7 +136,7 @@ void msgq_close_queue(msgq_queue_t *q){ void msgq_init_publisher(msgq_queue_t * q) { - std::cout << "Starting publisher" << std::endl; + //std::cout << "Starting publisher" << std::endl; uint64_t uid = msgq_get_uid(); *q->write_uid = uid; @@ -150,6 +150,15 @@ void msgq_init_publisher(msgq_queue_t * q) { q->write_uid_local = uid; } +static void thread_signal(uint32_t tid) { + #ifndef SYS_tkill + // TODO: this won't work for multithreaded programs + kill(tid, SIGUSR2); + #else + syscall(SYS_tkill, tid, SIGUSR2); + #endif +} + void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL); @@ -173,7 +182,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { *q->read_uids[i] = 0; // Wake up reader in case they are in a poll - syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1); + thread_signal(old_uid & 0xFFFFFFFF); } continue; @@ -196,7 +205,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { } } - std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; + //std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; msgq_reset_reader(q); } @@ -278,8 +287,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ // Notify readers for (uint64_t i = 0; i < num_readers; i++){ uint64_t reader_uid = *q->read_uids[i]; - - syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1); + thread_signal(reader_uid & 0xFFFFFFFF); } return msg->size; diff --git a/service_list.yaml b/service_list.yaml index 884bfe9a5bf991..e551dac59f9508 100644 --- a/service_list.yaml +++ b/service_list.yaml @@ -25,7 +25,7 @@ encodeIdx: [8015, true, 20.] liveTracks: [8016, true, 20.] sendcan: [8017, true, 100.] logMessage: [8018, true, 0.] -liveCalibration: [8019, true, 5.] +liveCalibration: [8019, true, 4., 4] androidLog: [8020, true, 0.] carState: [8021, true, 100., 10] # 8022 is reserved for sshd @@ -68,7 +68,7 @@ orbFeaturesSummary: [8062, true, 0.] driverMonitoring: [8063, true, 5., 1] liveParameters: [8064, true, 10.] liveMapData: [8065, true, 0.] -cameraOdometry: [8066, true, 5.] +cameraOdometry: [8066, true, 20.] pathPlan: [8067, true, 20.] kalmanOdometry: [8068, true, 0.] thumbnail: [8069, true, 0.2, 1]