Skip to content

Commit

Permalink
expect GTFS_RT_2 stream & consumer to be created by the user 💥📝
Browse files Browse the repository at this point in the history
  • Loading branch information
derhuerst committed Dec 4, 2024
1 parent 1d21a2a commit b9b7f32
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 40 deletions.
7 changes: 7 additions & 0 deletions cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const {
'nats-client-name': {
type: 'string',
},
'nats-consumer-name': {
type: 'string',
},
't0': {
type: 'string',
},
Expand All @@ -54,6 +57,10 @@ Options:
Default: $NATS_USER
--nats-client-name Name identifying the NATS client among others.
Default: ${NATS_CLIENT_NAME_PREFIX}\${randomHex(4)}
--nats-consumer-name Name of the NATS JetStream consumer on the
GTFS_RT_2 stream.
Default: \$GTFS_RT_CONSUMER_NAME, otherwise \`nats-

Check failure on line 62 in cli.js

View workflow job for this annotation

GitHub Actions / test (20)

Unnecessary escape character: \$

Check failure on line 62 in cli.js

View workflow job for this annotation

GitHub Actions / test (22)

Unnecessary escape character: \$
consuming-gtfs-rt-server\`
--diff-entities-ttl Time to keep DIFFERENTIAL-mode GTFS-RT FeedEntities
in the combined FULL_DATASET-mode feed for, in seconds.
Default: 10 minutes
Expand Down
63 changes: 23 additions & 40 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,11 @@ import {
import {MAJOR_VERSION} from './lib/major-version.js'
import {createLogger} from './lib/logger.js'
import {createMetricsServer, register as metricsRegister} from './lib/metrics.js'
import {
connectToNats,
AckPolicy as NatsAckPolicy,
DeliverPolicy as NatsDeliverPolicy,
} from './lib/nats.js'
import {connectToNats} from './lib/nats.js'

// todo: DRY with OpenDataVBB/gtfs-rt-feed
const NATS_JETSTREAM_GTFSRT_STREAM_NAME = `GTFS_RT_${MAJOR_VERSION}`

// todo: DRY with OpenDataVBB/gtfs-rt-feed
// https://github.com/OpenDataVBB/gtfs-rt-feed/blob/9bcc8e46945107e1a96d65f612df72c1404d2818/lib/gtfs-rt-mqtt-topics.js#L11
const GTFS_RT_TOPIC_PREFIX = 'gtfsrt.'

// > enum Incrementality {
// > FULL_DATASET = 0;
// > DIFFERENTIAL = 1;
Expand Down Expand Up @@ -59,25 +51,22 @@ const serveGtfsRtDataFromNats = async (cfg, opt = {}) => {

const {
natsOpts,
natsConsumerDurableName,
natsConsumerTtl,
natsConsumerName,
// shiftTimesToEnsureGaps: shouldShiftTimesToEnsureGaps,
differentialEntitiesTtl,
t0,
} = {
natsOpts: {},
natsConsumerDurableName: process.env.MATCHING_CONSUMER_DURABLE_NAME
? process.env.MATCHING_CONSUMER_DURABLE_NAME
: NATS_JETSTREAM_GTFSRT_STREAM_NAME + '_' + Math.random().toString(16).slice(2, 6),
natsConsumerTtl: 10 * 60 * 1000, // 10 minutes
natsConsumerName: process.env.GTFS_RT_CONSUMER_NAME
? process.env.GTFS_RT_CONSUMER_NAME
: 'nats-consuming-gtfs-rt-server',
// shiftTimesToEnsureGaps: false,
differentialEntitiesTtl: process.env.GTFS_RT_DIFFERENTIAL_ENTITIES_TTL
? process.env.GTFS_RT_DIFFERENTIAL_ENTITIES_TTL
: 10 * 60 * 1000, // 10m
t0: Date.now(),
...opt,
}
ok(Number.isInteger(natsConsumerTtl), 'opt.natsConsumerTtl must be an integer')
ok(Number.isInteger(differentialEntitiesTtl), 'opt.differentialEntitiesTtl must be an integer')
ok(Number.isInteger(t0), 'opt.t0 must be an integer')

Expand Down Expand Up @@ -291,39 +280,33 @@ const serveGtfsRtDataFromNats = async (cfg, opt = {}) => {
}

{
const natsJetstreamManager = await natsClient.jetstreamManager()
const natsJetstreamClient = await natsClient.jetstream()

{
// create/update NATS JetStream stream for GTFS-RT data
const streamInfo = await natsJetstreamManager.streams.add({
name: NATS_JETSTREAM_GTFSRT_STREAM_NAME,
subjects: [
GTFS_RT_TOPIC_PREFIX + '>',
],
// todo: limits?
})
// query details of the (externally created) NATS JetStream stream for AUS IstFahrts
const stream = await natsJetstreamClient.streams.get(NATS_JETSTREAM_GTFSRT_STREAM_NAME)
const streamInfo = await stream.info()
logger.debug({
streamInfo,
}, 'created/re-used NATS JetStream stream')
}, 'using NATS JetStream stream for GTFS-RT feedEntities')
}

// create durable NATS JetStream consumer for GTFS-RT stream
const consumerInfo = await natsJetstreamManager.consumers.add(NATS_JETSTREAM_GTFSRT_STREAM_NAME, {
ack_policy: NatsAckPolicy.Explicit,
durable_name: natsConsumerDurableName,
deliver_policy: NatsDeliverPolicy.New,
// todo: this makes the consumer stop receiving new messages after a short time 🤔
// inactive_threshold: natsConsumerTtl,
})
logger.debug({
consumerInfo,
}, 'created/re-used NATS JetStream consumer')
const gtfsRtConsumer = await natsJetstreamClient.consumers.get(
NATS_JETSTREAM_GTFSRT_STREAM_NAME,
natsConsumerName,
)

{
// query details of the (externally created) NATS JetStream consumer
const consumerInfo = await gtfsRtConsumer.info()
logger.debug({
consumerInfo,
}, 'using NATS JetStream consumer')
}

const tripUpdatesConsumer = await natsJetstreamClient.consumers.get(NATS_JETSTREAM_GTFSRT_STREAM_NAME, consumerInfo.name)
const tripUpdatesSub = await tripUpdatesConsumer.consume()
const gtfsRtSub = await gtfsRtConsumer.consume()
execPipe(
tripUpdatesSub,
gtfsRtSub,
asyncMap(onNatsMsg),
asyncConsume,
).catch(abortWithError)
Expand Down
51 changes: 51 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ Options:
Default: $NATS_USER
--nats-client-name Name identifying the NATS client among others.
Default: vdv453-1-${randomHex(4)}
--nats-consumer-name Name of the NATS JetStream consumer on the
GTFS_RT_2 stream.
Default: $GTFS_RT_CONSUMER_NAME, otherwise `nats-
consuming-gtfs-rt-server`
--diff-entities-ttl Time to keep DIFFERENTIAL-mode GTFS-RT FeedEntities
in the combined FULL_DATASET-mode feed for, in seconds.
Default: 10 minutes
Expand All @@ -46,6 +50,53 @@ Examples:
serve-gtfs-rt-from-nats --port 1234 --nats-user foo
```

### create NATS stream & consumer

Before running `nats-consuming-gtfs-rt-server`, you must create a [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) [stream](https://docs.nats.io/nats-concepts/jetstream/streams) called `GTFS_RT_2` holding the `FeedEntity` GTFS-RT messages. This can be done using the [NATS CLI](https://github.com/nats-io/natscli):


```shell
nats stream add \
# omit this if you want to configure more details
--defaults \
# collect all messages published to these subjects
--subjects='gtfsrt.>' \
# acknowledge publishes
--ack \
# with limited storage, discard the oldest limits first
--retention=limits --discard=old \
--description='GTFS-Realtime FeedEntity messages' \
# name of the stream
GTFS_RT_2
```

On the `GTFS_RT_2` stream, you must also create a durable [consumer](https://docs.nats.io/nats-concepts/jetstream/consumers):

```shell
nats consumer add \
# omit this if you want to configure more details
--defaults \
# create a pull-based consumer (refer to the NATS JetStream docs)
--pull \
# let gtfs-rt-feed explicitly acknowledge all received messages
--ack=explicit \
# let the newly created consumer start with the latest messages in GTFS_RT_2 (not all)
--deliver=new \
# send gtfs-rt-feed at most 200 messages at once
--max-pending=500 \
# when & how often to re-deliver a message that hasn't been acknowledged (usually because it couldn't be processed)
--max-deliver=3 \
--backoff=linear \
--backoff-steps=2 \
--backoff-min=15s \
--backoff-max=1m \
--description 'OpenDataVBB/nats-consuming-gtfs-rt-server' \
# name of the stream
GTFS_RT_2 \
# name of the consumer
nats-consuming-gtfs-rt-server
```


## Related

Expand Down

0 comments on commit b9b7f32

Please sign in to comment.