Skip to content

Commit

Permalink
read FeedMessages instead of FeedEntitys πŸ’₯βœ…πŸ“
Browse files Browse the repository at this point in the history
fixes #1
  • Loading branch information
derhuerst committed Sep 15, 2024
1 parent 7232101 commit a90e229
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 28 deletions.
28 changes: 23 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict'

const {FeedHeader} = require('gtfs-rt-bindings')
const {Writable} = require('stream')
const createEntitiesStore = require('./lib/entities-store')

const {DIFFERENTIAL} = FeedHeader.Incrementality

const tripSignature = (u) => {
if (u.trip.trip_id) return u.trip.trip_id
if (u.trip.route_id && u.vehicle.id) {
Expand Down Expand Up @@ -36,7 +39,7 @@ const gtfsRtAsDump = (opt = {}) => {

const entitiesStore = createEntitiesStore(ttl, timestamp)

const write = (entity) => {
const processFeedEntity = (entity) => {
// If the entity is not being deleted, exactly one of 'trip_update', 'vehicle' and 'alert' fields should be populated.
// https://developers.google.com/transit/gtfs-realtime/reference#message-feedentity
let sig = null
Expand All @@ -45,7 +48,7 @@ const gtfsRtAsDump = (opt = {}) => {
} else if (entity.vehicle) {
sig = vehiclePositionSignature(entity.vehicle)
}
// todo: alert
// todo: alert, see #1

if (sig !== null) {
entitiesStore.put(sig, entity)
Expand All @@ -55,6 +58,21 @@ const gtfsRtAsDump = (opt = {}) => {
err.feedEntity = entity
throw err
}
const processFeedMessage = (msg) => {
if (msg.header.gtfs_realtime_version !== '2.0') {
const err = new Error('FeedMessage GTFS-RT 2.0')
err.feedMessage = msg
throw err
}
if (msg.header.incrementality !== DIFFERENTIAL) {
const err = new Error('FeedMessage must be DIFFERENTIAL')
err.feedMessage = msg
throw err
}
for (const entity of msg.entity) {
processFeedEntity(entity)
}
}

let feedMessage = null
const asFeedMessage = () => {
Expand All @@ -63,13 +81,13 @@ const gtfsRtAsDump = (opt = {}) => {

const out = new Writable({
objectMode: true,
write: (entity, _, cb) => {
write(entity)
write: (feedMsg, _, cb) => {
processFeedMessage(feedMsg)
out.emit('change')
cb(null)
},
writev: (chunks, cb) => {
for (const {chunk: entity} of chunks) write(entity)
for (const {chunk: feedMsg} of chunks) processFeedMessage(feedMsg)
out.emit('change')
cb(null)
},
Expand Down
6 changes: 3 additions & 3 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ const toFull = toFullDataset({
})
toFull.on('error')

differentialFeedEntities.pipe(toFull)
differentialFeedMessages.pipe(toFull)
setInterval(() => {
console.log(toFull.asFeedMessage())
}, 5000)
```

`toFull` will be a [writable stream](https://nodejs.org/api/stream.html#stream_class_stream_writable) in [object mode](https://nodejs.org/api/stream.html#stream_object_mode) that expects JS objects in the [`FeedEntity`](https://gtfs.org/documentation/realtime/reference/#message-feedentity) structure/format.
`toFull` will be a [writable stream](https://nodejs.org/api/stream.html#stream_class_stream_writable) in [object mode](https://nodejs.org/api/stream.html#stream_object_mode) that expects JS objects in the [`FeedMessage`](https://gtfs.org/documentation/realtime/reference/#message-feedmessage) structure/format.

`toFull.asFeedMessage()` returns a [protocol-buffer-encoded](https://protobuf.dev) [`FeedMessage`](https://gtfs.org/documentation/realtime/reference/#message-feedmessage) with all relevant `FeedEntity`s that have been written into `toFull` so far.
`toFull.asFeedMessage()` returns a [protocol-buffer-encoded](https://protobuf.dev) `FULL_DATASET`-mode [`FeedMessage`](https://gtfs.org/documentation/realtime/reference/#message-feedmessage) with all `FeedEntity`s from the `DIFFERENTIAL`-mode `FeedMessage`s that have been written into `toFull` so far, as long as they're still relevant.

`toFull.nrOfEntities()` returns the number of `FeedEntity`s that are currently part of the `FeedMessage`.

Expand Down
8 changes: 3 additions & 5 deletions test/data.ndjson
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{"id":"1","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.5,"longitude":13.3},"stop_id":"900000005205","current_status":2}}
{"id":"2","vehicle":{"trip":{"trip_id":"1|64466|1|86|12032020","route_id":"n6"},"vehicle":{"id":null,"label":"U Alt-Mariendorf"},"position":{"latitude":52.1,"longitude":13.1},"stop_id":"900000012106","current_status":1}}
{"id":"3","trip_update":{"trip":{"trip_id":"1|25445|2|86|12032020","route_id":"n3"},"vehicle":{"id":"40813","label":"U Wittenbergplatz"},"stop_time_update":[{"stop_id":"900000050301","arrival":{"time":null,"delay":null},"departure":{"time":1584060600,"delay":0},"schedule_relationship":0},{"stop_id":"900000023354","arrival":{"time":1584062400,"delay":-60},"departure":{"time":1584062400,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023203","arrival":{"time":1584062460,"delay":-60},"departure":{"time":1584062460,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023204","arrival":{"time":1584062580,"delay":-60},"departure":{"time":1584062580,"delay":-60},"schedule_relationship":0},{"stop_id":"900000056101","arrival":{"time":1584062640,"delay":-60},"departure":{"time":null,"delay":null},"schedule_relationship":0}]}}
{"id":"4","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.6,"longitude":13.4},"stop_id":"900000005205","current_status":2}}
{"id":"5","vehicle":{"trip":{"trip_id":"t1","route_id":"A"},"vehicle":{"id":"bus-12","label":"Bus 12"},"position":{"latitude":1.23,"longitude":2.34}}}
{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":1},"entity":[{"id":"1","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.5,"longitude":13.3},"stop_id":"900000005205","current_status":2}},{"id":"2","trip_update":{"trip":{"trip_id":"1|25445|2|86|12032020","route_id":"n3"},"vehicle":{"id":"40813","label":"U Wittenbergplatz"},"stop_time_update":[{"stop_id":"900000050301","arrival":{"time":null,"delay":null},"departure":{"time":1584060600,"delay":0},"schedule_relationship":0},{"stop_id":"900000023354","arrival":{"time":1584062400,"delay":-60},"departure":{"time":1584062400,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023203","arrival":{"time":1584062460,"delay":-60},"departure":{"time":1584062460,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023204","arrival":{"time":1584062580,"delay":-60},"departure":{"time":1584062580,"delay":-60},"schedule_relationship":0},{"stop_id":"900000056101","arrival":{"time":1584062640,"delay":-60},"departure":{"time":null,"delay":null},"schedule_relationship":0}]}}]}
{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":2},"entity":[{"id":"3","vehicle":{"trip":{"trip_id":"1|64466|1|86|12032020","route_id":"n6"},"vehicle":{"id":null,"label":"U Alt-Mariendorf"},"position":{"latitude":52.1,"longitude":13.1},"stop_id":"900000012106","current_status":1}}]}
{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":3},"entity":[{"id":"4","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.6,"longitude":13.4},"stop_id":"900000005205","current_status":2}}]}
28 changes: 13 additions & 15 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,20 @@ pump(
strictEqual(changeEmitted, true, 'no `change` event emitted')
bufEqual(full.asFeedMessage(), Buffer.from(
`\
0a090a03322e301000180112520a0132224d0a1b0a15317c36343436367c317c38367\
c31323033323032302a026e36120a0d66665042159a99514120013a0c393030303030\
303132313036421212105520416c742d4d617269656e646f726612ac020a01331aa60\
20a1b0a15317c32353434357c327c38367c31323033323032302a026e33121c12001a\
08080010b8b1abf305220c39303030303030353033303128001236121108c4fffffff\
fffffffff0110c0bfabf3051a1108c4ffffffffffffffff0110c0bfabf305220c3930\
3030303030323333353428001236121108c4ffffffffffffffff0110fcbfabf3051a1\
108c4ffffffffffffffff0110fcbfabf305220c393030303030303233323033280012\
36121108c4ffffffffffffffff0110f4c0abf3051a1108c4ffffffffffffffff0110f\
4c0abf305220c39303030303030323332303428001225121108c4ffffffffffffffff\
0110b0c1abf3051a00220c39303030303030353631303128001a1a0a0534303831331\
211552057697474656e62657267706c61747a124f0a0134224a0a1c0a15317c363435\
0a090a03322e301000180112ac020a01321aa6020a1b0a15317c32353434357c327c3\
8367c31323033323032302a026e33121c12001a08080010b8b1abf305220c39303030\
303030353033303128001236121108c4ffffffffffffffff0110c0bfabf3051a1108c\
4ffffffffffffffff0110c0bfabf305220c3930303030303032333335342800123612\
1108c4ffffffffffffffff0110fcbfabf3051a1108c4ffffffffffffffff0110fcbfa\
bf305220c39303030303030323332303328001236121108c4ffffffffffffffff0110\
f4c0abf3051a1108c4ffffffffffffffff0110f4c0abf305220c39303030303030323\
332303428001225121108c4ffffffffffffffff0110b0c1abf3051a00220c39303030\
303030353631303128001a1a0a0534303831331211552057697474656e62657267706\
c61747a12520a0133224d0a1b0a15317c36343436367c317c38367c31323033323032\
302a026e36120a0d66665042159a99514120013a0c393030303030303132313036421\
212105520416c742d4d617269656e646f7266124f0a0134224a0a1c0a15317c363435\
31327c317c38367c31323033323032302a036e3138120a0d666652421566665641200\
23a0c393030303030303035323035420e120c55204d6f6872656e7374722e122c0a01\
3522270a070a0274312a0141120a0da4709d3f158fc2154042100a066275732d31321\
206427573203132`,
23a0c393030303030303035323035420e120c55204d6f6872656e7374722e`,
'hex'
))
strictEqual(full.timeModified(), timestamp(), 'invalid full.timeModified()')
Expand Down

0 comments on commit a90e229

Please sign in to comment.