diff --git a/.gitignore b/.gitignore index 5053090f144df5..e90f8c750f9b26 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,5 @@ libmessaging.* libmessaging_shared.* services.h .sconsign.dblite -libcereal_shared.so +libcereal_shared.* diff --git a/README.md b/README.md new file mode 100644 index 00000000000000..ec0efc67621992 --- /dev/null +++ b/README.md @@ -0,0 +1,42 @@ +What is cereal? +---- + +cereal is both a messaging spec for robotics systems as well as generic high performance IPC pub sub messaging with a single publisher and multiple subscribers. + +Imagine this use case: +* A sensor process reads gyro measurements directly from an IMU and publishes a sensorEvents packet +* A calibration process subscribes to the sensorEvents packet to use the IMU +* A localization process subscribes to the sensorEvents packet to use the IMU also + + +Messaging Spec +---- + +You'll find the message types in [log.capnp](log.capnp). It uses [Cap'n proto](https://capnproto.org/capnp-tool.html) and defines one struct called Event. + +All Events have a logMonoTime and a valid. Then a big union defines the packet type. + + +Pub Sub Backends +---- + +cereal supports two backends, one based on [zmq](https://zeromq.org/), the other called msgq, a custom pub sub based on shared memory that doesn't require the bytes to pass through the kernel. + +Example +--- +```python +import cereal.messaging as messaging + +# in subscriber +sm = messaging.SubMaster(['sensorEvents']) +while 1: + sm.update() + print(sm['sensorEvents']) + +# in publisher +pm = messaging.PubMaster(['sensorEvents']) +dat = messaging.new_message() +dat.init('sensorEvents', 1) +dat.sensorEvents[0] = {"gyro": {"v": [0.1, -0.1, 0.1]}} +pm.send('sensorEvents', dat) +``` diff --git a/SConscript b/SConscript index 789e83023a078c..e0c1b2b52c16a4 100644 --- a/SConscript +++ b/SConscript @@ -29,7 +29,7 @@ cereal_objects = env.SharedObject([ ]) env.Library('cereal', cereal_objects) -env.SharedLibrary('cereal_shared', cereal_objects) +env.SharedLibrary('cereal_shared', cereal_objects, LIBS=["capnp_c"]) cereal_dir = Dir('.') services_h = env.Command( @@ -49,7 +49,7 @@ Depends('messaging/impl_zmq.cc', services_h) # note, this rebuilds the deps shared, zmq is statically linked to make APK happy # TODO: get APK to load system zmq to remove the static link -shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [] +shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [zmq] env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib) env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq']) diff --git a/car.capnp b/car.capnp index 68a723206a6d4b..48618a2780b1a9 100644 --- a/car.capnp +++ b/car.capnp @@ -88,6 +88,8 @@ struct CarEvent @0x9b1657f34caf3ad3 { lowMemory @63; stockAeb @64; ldw @65; + carUnrecognized @66; + radarCommIssue @67; } } @@ -410,11 +412,11 @@ struct CarParams { enum SafetyModel { silent @0; - honda @1; + hondaNidec @1; toyota @2; elm327 @3; gm @4; - hondaBosch @5; + hondaBoschGiraffe @5; ford @6; cadillac @7; hyundai @8; @@ -428,7 +430,9 @@ struct CarParams { toyotaIpas @16; allOutput @17; gmAscm @18; - noOutput @19; # like silent but with silent CAN TXs + noOutput @19; # like silent but without silent CAN TXs + hondaBoschHarness @20; + volkswagenPq @21; } enum SteerControlType { @@ -444,7 +448,9 @@ struct CarParams { struct CarFw { ecu @0 :Ecu; - fwVersion @1 :Text; + fwVersion @1 :Data; + address @2: UInt32; + subAddress @3: UInt8; } enum Ecu { @@ -452,5 +458,11 @@ struct CarParams { esp @1; fwdRadar @2; fwdCamera @3; + engine @4; + unknown @5; + + # Toyota only + dsu @6; + apgs @7; } } diff --git a/log.capnp b/log.capnp index b059218feba89c..bdd7ed1bb18e61 100644 --- a/log.capnp +++ b/log.capnp @@ -310,6 +310,7 @@ struct HealthData { hasGps @6 :Bool; canSendErrs @7 :UInt32; canFwdErrs @8 :UInt32; + canRxErrs @19 :UInt32; gmlanSendErrs @9 :UInt32; hwType @10 :HwType; fanSpeedRpm @11 :UInt16; @@ -484,6 +485,7 @@ struct ControlsState @0x97ff69c53601abf1 { decelForTurn @47 :Bool; decelForModel @54 :Bool; + canErrorCounter @57 :UInt32; lateralControlState :union { indiState @52 :LateralINDIState; @@ -575,6 +577,7 @@ struct ModelData { leadFuture @7 :LeadData; speed @8 :List(Float32); meta @10 :MetaData; + longitudinal @11 :LongitudinalData; struct PathData { points @0 :List(Float32); @@ -605,6 +608,7 @@ struct ModelData { yuvCorrection @5 :List(Float32); inputTransform @6 :List(Float32); } + struct MetaData { engagedProb @0 :Float32; desirePrediction @1 :List(Float32); @@ -612,6 +616,11 @@ struct ModelData { gasDisengageProb @3 :Float32; steerOverrideProb @4 :Float32; } + + struct LongitudinalData { + speeds @0 :List(Float32); + accelerations @1 :List(Float32); + } } struct CalibrationFeatures { @@ -1757,6 +1766,8 @@ struct DriverMonitoring { leftBlinkProb @8 :Float32; rightBlinkProb @9 :Float32; irPwrDEPRECATED @10 :Float32; + faceOrientationStd @11 :List(Float32); + facePositionStd @12 :List(Float32); } struct Boot { diff --git a/messaging/__init__.py b/messaging/__init__.py index 7a0d4936b3e653..e5a004740dfaaa 100644 --- a/messaging/__init__.py +++ b/messaging/__init__.py @@ -1,6 +1,7 @@ # must be build with scons from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error +import capnp assert MultiplePublishersError assert MessagingError @@ -116,6 +117,7 @@ def recv_one_retry(sock): if dat is not None: return log.Event.from_bytes(dat) +# TODO: This does not belong in messaging def get_one_can(logcan): while True: can = recv_one_retry(logcan) @@ -147,12 +149,12 @@ def __init__(self, services, ignore_alive=None, addr="127.0.0.1"): self.freq[s] = service_list[s].frequency data = new_message() - if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan', - 'ethernetData', 'cellInfo', 'wifiScan', - 'trafficEvents', 'orbObservation', 'carEvents']: - data.init(s, 0) - else: + try: data.init(s) + except capnp.lib.capnp.KjException: + # lists + data.init(s, 0) + self.data[s] = getattr(data, s) self.logMonoTime[s] = 0 self.valid[s] = data.valid diff --git a/messaging/bridge.cc b/messaging/bridge.cc index 7abcdd09dae6e0..8e29566ca26c64 100644 --- a/messaging/bridge.cc +++ b/messaging/bridge.cc @@ -4,6 +4,8 @@ #include #include +typedef void (*sighandler_t)(int sig); + #include "services.h" #include "impl_msgq.hpp" diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index bf017d900b6e3f..d37b8c986d7237 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -85,7 +85,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){ msgq_msg_t msg; MSGQMessage *r = NULL; - r = NULL; int rc = msgq_msg_recv(&msg, q); @@ -109,17 +108,23 @@ Message * MSGQSubSocket::receive(bool non_blocking){ } } - if (rc > 0){ - r = new MSGQMessage; - r->takeOwnership(msg.data, msg.size); - } - errno = msgq_do_exit ? EINTR : 0; if (!non_blocking){ std::signal(SIGINT, prev_handler_sigint); std::signal(SIGTERM, prev_handler_sigterm); } + errno = msgq_do_exit ? EINTR : 0; + + if (rc > 0){ + if (msgq_do_exit){ + msgq_msg_close(&msg); // Free unused message on exit + } else { + r = new MSGQMessage; + r->takeOwnership(msg.data, msg.size); + } + } + return (Message*)r; } diff --git a/messaging/messaging.cc b/messaging/messaging.cc index 9b12b298495112..1a9f860cd35666 100644 --- a/messaging/messaging.cc +++ b/messaging/messaging.cc @@ -4,20 +4,20 @@ Context * Context::create(){ Context * c; - if (std::getenv("MSGQ")){ - c = new MSGQContext(); - } else { + if (std::getenv("ZMQ")){ c = new ZMQContext(); + } else { + c = new MSGQContext(); } return c; } SubSocket * SubSocket::create(){ SubSocket * s; - if (std::getenv("MSGQ")){ - s = new MSGQSubSocket(); - } else { + if (std::getenv("ZMQ")){ s = new ZMQSubSocket(); + } else { + s = new MSGQSubSocket(); } return s; } @@ -60,10 +60,10 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri PubSocket * PubSocket::create(){ PubSocket * s; - if (std::getenv("MSGQ")){ - s = new MSGQPubSocket(); - } else { + if (std::getenv("ZMQ")){ s = new ZMQPubSocket(); + } else { + s = new MSGQPubSocket(); } return s; } @@ -82,10 +82,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){ Poller * Poller::create(){ Poller * p; - if (std::getenv("MSGQ")){ - p = new MSGQPoller(); - } else { + if (std::getenv("ZMQ")){ p = new ZMQPoller(); + } else { + p = new MSGQPoller(); } return p; } diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 380b9c7fe6aee2..4ccd13df44527a 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -23,8 +23,8 @@ #include "msgq.hpp" -void sigusr1_handler(int signal) { - assert(signal == SIGUSR1); +void sigusr2_handler(int signal) { + assert(signal == SIGUSR2); } uint64_t msgq_get_uid(void){ @@ -80,7 +80,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes - std::signal(SIGUSR1, sigusr1_handler); + std::signal(SIGUSR2, sigusr2_handler); const char * prefix = "/dev/shm/"; char * full_path = new char[strlen(path) + strlen(prefix) + 1]; @@ -136,7 +136,7 @@ void msgq_close_queue(msgq_queue_t *q){ void msgq_init_publisher(msgq_queue_t * q) { - std::cout << "Starting publisher" << std::endl; + //std::cout << "Starting publisher" << std::endl; uint64_t uid = msgq_get_uid(); *q->write_uid = uid; @@ -150,6 +150,15 @@ void msgq_init_publisher(msgq_queue_t * q) { q->write_uid_local = uid; } +static void thread_signal(uint32_t tid) { + #ifndef SYS_tkill + // TODO: this won't work for multithreaded programs + kill(tid, SIGUSR2); + #else + syscall(SYS_tkill, tid, SIGUSR2); + #endif +} + void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL); @@ -173,7 +182,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { *q->read_uids[i] = 0; // Wake up reader in case they are in a poll - syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1); + thread_signal(old_uid & 0xFFFFFFFF); } continue; @@ -196,7 +205,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { } } - std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; + //std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; msgq_reset_reader(q); } @@ -278,8 +287,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ // Notify readers for (uint64_t i = 0; i < num_readers; i++){ uint64_t reader_uid = *q->read_uids[i]; - - syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1); + thread_signal(reader_uid & 0xFFFFFFFF); } return msg->size; diff --git a/service_list.yaml b/service_list.yaml index 884bfe9a5bf991..e551dac59f9508 100644 --- a/service_list.yaml +++ b/service_list.yaml @@ -25,7 +25,7 @@ encodeIdx: [8015, true, 20.] liveTracks: [8016, true, 20.] sendcan: [8017, true, 100.] logMessage: [8018, true, 0.] -liveCalibration: [8019, true, 5.] +liveCalibration: [8019, true, 4., 4] androidLog: [8020, true, 0.] carState: [8021, true, 100., 10] # 8022 is reserved for sshd @@ -68,7 +68,7 @@ orbFeaturesSummary: [8062, true, 0.] driverMonitoring: [8063, true, 5., 1] liveParameters: [8064, true, 10.] liveMapData: [8065, true, 0.] -cameraOdometry: [8066, true, 5.] +cameraOdometry: [8066, true, 20.] pathPlan: [8067, true, 20.] kalmanOdometry: [8068, true, 0.] thumbnail: [8069, true, 0.2, 1]