ArrayBuffer errors with large amounts of messages #55

Closed
fbushman opened this issue Jan 18, 2023 · 7 comments · Fixed by #68
Labels
bug Something isn't working

Comments

@fbushman

fbushman commented Jan 18, 2023

Hey,

Great library so far, but I encountered an issue that I can't explain.
A bit of background: I'm sending simulated air traffic to a browser that displays it in Leaflet. I'm sending info about 6000 planes, with an update for each plane every 10 seconds, so about 600 messages per second. After 'some time' (seemingly random, but sooner with higher volumes) I get a pair of errors out of amqp-websocket-client.js:

ERROR TypeError: First argument to DataView constructor must be an ArrayBuffer
    at new DataView (<anonymous>)
    at AMQPWebSocketClient.handleMessage (amqp-websocket-client.js:59:38)
    at WebSocket.wrapFn (zone.js:769:39)
    at _ZoneDelegate.invokeTask (zone.js:409:31)
    at core.mjs:25299:55
    at AsyncStackTaggingZoneSpec.onInvokeTask (core.mjs:25299:36)
    at _ZoneDelegate.invokeTask (zone.js:408:60)
    at Object.onInvokeTask (core.mjs:25607:33)
    at _ZoneDelegate.invokeTask (zone.js:408:60)
    at Zone.runTask (zone.js:178:47)
ERROR RangeError: offset is out of bounds
    at Uint8Array.set (<anonymous>)
    at AMQPWebSocketClient.handleMessage (amqp-websocket-client.js:58:38)
    at WebSocket.wrapFn (zone.js:769:39)
    at _ZoneDelegate.invokeTask (zone.js:409:31)
    at core.mjs:25299:55
    at AsyncStackTaggingZoneSpec.onInvokeTask (core.mjs:25299:36)
    at _ZoneDelegate.invokeTask (zone.js:408:60)
    at Object.onInvokeTask (core.mjs:25607:33)
    at _ZoneDelegate.invokeTask (zone.js:408:60)
    at Zone.runTask (zone.js:178:47)

As far as I can tell the data is intact. I have another listener on the same exchange (this one in Java) that parses the messages just fine.

Here is the code for the consumer running in the browser:

private queue_name = uuidv4()
private queue_options = { passive: false, durable: false, autoDelete: true, exclusive: true }
private exchange_name = "exchange_name"
private exchange_type = "fanout"
private exchange_options = { passive: false, durable: true, autoDelete: false, internal: false }
private routing_key = "gps"
private subscribe_options = { noAck: false, exclusive: false }

private connectRabbitMQ() {
    const amqp = new AMQPWebSocketClient("ws://localhost:15670", "/", "rabbituser", "rabbitpassword")
    amqp.connect().then(conn => {
      conn.channel().then(channel => {
        this.rabbitmq_channel = channel
        channel.exchangeDeclare(this.exchange_name, this.exchange_type, this.exchange_options)
        channel.queue(this.queue_name, this.queue_options).then(async queue => {
          await queue.bind(this.exchange_name, this.routing_key)
          await queue.subscribe(this.subscribe_options, msg => {
            if (msg.routingKey === "track") {
              let newTrack: TrackInfo = JSON.parse(msg.bodyToString() as string) as TrackInfo;

              // Some handling...

              msg.ack();
            } else if (msg.routingKey === "plot") {
              let newPlot: PlotInfo = JSON.parse(msg.bodyToString() as string) as PlotInfo;

              // Some handling...

              msg.ack();
            } else {
              console.log(msg);
              console.log(msg.bodyToString());
            }
          })
        })
      })
    })
}

I am using your latest version:

"@cloudamqp/amqp-client": "^2.1.1"

Am I missing a step, or is something actually going wrong?

@fbushman
Author

The track-routed messages are on average 63 bytes (uint8 values) long.
The plot-routed messages are on average 102 bytes long.

@ngbrown
Contributor

ngbrown commented Feb 16, 2023

I'm running amqp-client in the browser over websockets, and I got the first of these exceptions. I wouldn't say I was receiving a lot of data at once unless the browser decided to batch a bunch of messages. The error did appear to repeat for every message until the connection was closed by a heartbeat timeout.

Screenshot - 2023-02-16 , 8_40_30 AM_ver001
Screenshot - 2023-02-16 , 8_40_39 AM_ver001

The native .NET clients had no problems with the same traffic.

I'm not very familiar with byte and buffer manipulation in JavaScript, so I'm just spit-balling:

  • How often is this.frameSize === 0 and this.framePos !== 0 in test conditions? i.e., is this code exercised much?
  • Should new DataView(this.frameBuffer) instead pass the underlying ArrayBuffer like new DataView(this.frameBuffer.buffer)? This is how it is done later in the handler, with new AMQPView(this.frameBuffer.buffer, 0, this.frameSize).
  • This fails in the browser with the same error: new DataView(new Uint8Array(32)), so if the type is wrong, I'm surprised that TypeScript isn't erroring on it.
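
The last two points can be checked in isolation. The following is a minimal sketch, not the library's code: `tryDataViewOnTypedArray` and `viewOf` are hypothetical helper names, and the cast exists only to force the call past the compiler regardless of TypeScript version. It also shows why passing `byteOffset` and `byteLength` along with `.buffer` matters when the `Uint8Array` covers only part of its backing buffer.

```typescript
// A Uint8Array that covers bytes 4..11 of a larger 16-byte buffer.
const backing = new ArrayBuffer(16);
const bytes = new Uint8Array(backing, 4, 8);
bytes[0] = 0x01;
bytes[1] = 0x02;

// Passing the typed array itself: the constructor rejects it at runtime.
function tryDataViewOnTypedArray(u8: Uint8Array): string {
  try {
    // Cast only to get the call past the type checker for this demo.
    new DataView(u8 as unknown as ArrayBuffer);
    return "ok";
  } catch (e) {
    return (e as Error).name;
  }
}

// Passing the underlying buffer, restricted to the range the array covers.
function viewOf(u8: Uint8Array): DataView {
  return new DataView(u8.buffer, u8.byteOffset, u8.byteLength);
}

const outcome = tryDataViewOnTypedArray(bytes);
const firstWord = viewOf(bytes).getUint16(0); // big-endian read of 0x01, 0x02
```

Without the offset and length arguments, `new DataView(u8.buffer)` would start at byte 0 of the backing buffer rather than at the first byte of the array.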

@carlhoerberg
Member

carlhoerberg commented Feb 17, 2023

Thank you for reporting. Your assessment is right. I'm also surprised that TypeScript doesn't react to it.

@carlhoerberg
Member

Will try to reproduce the "offset is out of bounds" error too.

@ngbrown
Contributor

ngbrown commented Feb 17, 2023

There's no test coverage for amqp-websocket-client.ts 🤔.

As for TypeScript, the DataView spec clearly only allows ArrayBuffer | SharedArrayBuffer but because TypeScript is "structurally typed" it can't catch this sort of error. Both ArrayBuffer and Uint8Array implement byteLength and slice(), so it's a match. More at microsoft/TypeScript#52815...
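
A minimal sketch of that structural match, assuming a hypothetical `MinimalBuffer` interface that stands in for the members the two types share (it is not the real standard-library declaration). Both values are accepted by the type checker, so only a runtime check such as the hypothetical `requireArrayBuffer` guard can tell them apart.

```typescript
// Hypothetical stand-in for the members ArrayBuffer and Uint8Array have in common.
interface MinimalBuffer {
  readonly byteLength: number;
  slice(begin: number, end?: number): unknown;
}

// Accepts anything with the right shape; the nominal type is not checked.
function classify(b: MinimalBuffer): string {
  return b instanceof ArrayBuffer ? "ArrayBuffer" : "not an ArrayBuffer";
}

// Runtime guard: the only reliable way to reject a typed array here.
function requireArrayBuffer(value: unknown): ArrayBuffer {
  if (!(value instanceof ArrayBuffer)) {
    throw new TypeError("expected an ArrayBuffer");
  }
  return value;
}

// Both calls compile, because both arguments match MinimalBuffer structurally.
const fromBuffer = classify(new ArrayBuffer(4));
const fromView = classify(new Uint8Array(4));
```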

@ngbrown
Contributor

ngbrown commented Feb 17, 2023

Also, recent versions of node implement these same buffer and data view types, so maybe move back to a common set of code for most of this logic?

@ngbrown
Contributor

ngbrown commented Jul 18, 2023

I'm still having errors related to this issue and am getting "ERROR RangeError: offset is out of bounds" exceptions.

Somehow, leftOfFrame is ending up negative, which makes the copyBytes variable, passed as the length parameter of the Uint8Array constructor, negative as well:

Screenshot - 2023-07-18 , 8_39_48 AM_ver001

This happens after some indeterminate number of messages. In this case it was 41,360 WebSocket messages. The inspector shows a number of messages in the last twenty seconds, so it's hard to tell exactly which message it crashed on, but it might have been the first message after 3 long messages that were part of a single message (the first one starts with I< <amq.ctag). The sequence of large received messages was 4.10 kB + 4.10 kB + 2.40 kB. This is likely multiple individual AMQP messages packed together, because there are multiple amq.ctag strings within those three WebSocket messages, and then there are the 9 sent messages that look like acknowledgements. The last large WebSocket message ends with Î (0xc38e?) like every other smaller message, so it appears to be complete?

Then the next received message is a shorter 644 byte message, which might be the message being handled during this exception within handleMessage:

Screenshot - 2023-07-18 , 9_36_52 AM

In this case, I am using the default 4096 for frameMax. Elsewhere, it is mentioned that frameMax is for large headers. I'm not clear on what a header is vs. a message. I doubt any AMQP header itself is larger than 4096 bytes. Is it possible to detect this directly and throw a clear Error when this parameter needs to be larger?
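
For reference, an AMQP 0-9-1 frame is a 7-byte header (1-byte type, 2-byte channel, 4-byte payload size), the payload, and a single frame-end byte 0xCE, which is Î in Latin-1 (0xC3 0x8E is the UTF-8 encoding of the same character). A WebSocket message can carry several frames or only part of one. The sketch below is one way to split such a stream and fail loudly on oversized frames. It is not the library's `handleMessage`; `FrameSplitter` and `encodeFrame` are hypothetical names, and treating `frameMax` as a limit on total frame size is an assumption of this sketch.

```typescript
const FRAME_HEADER = 7; // type (1) + channel (2) + payload size (4)
const FRAME_END = 0xce;

class FrameSplitter {
  private pending = new Uint8Array(0); // bytes of an incomplete frame
  private readonly frameMax: number;

  constructor(frameMax = 4096) {
    this.frameMax = frameMax;
  }

  // Feed one WebSocket message; returns every frame completed by it.
  push(chunk: Uint8Array): Uint8Array[] {
    const data = new Uint8Array(this.pending.length + chunk.length);
    data.set(this.pending, 0);
    data.set(chunk, this.pending.length);

    const frames: Uint8Array[] = [];
    let pos = 0;
    while (data.length - pos >= FRAME_HEADER) {
      const header = new DataView(data.buffer, data.byteOffset + pos, FRAME_HEADER);
      const total = FRAME_HEADER + header.getUint32(3) + 1;
      if (total > this.frameMax) {
        throw new RangeError(`frame of ${total} bytes exceeds frameMax ${this.frameMax}`);
      }
      if (data.length - pos < total) break; // wait for the rest of the frame
      if (data[pos + total - 1] !== FRAME_END) {
        throw new Error("missing frame-end byte, stream is out of sync");
      }
      frames.push(data.subarray(pos, pos + total));
      pos += total;
    }
    this.pending = data.slice(pos); // keep any leftover partial frame
    return frames;
  }
}

// Helper for trying the splitter: builds one frame around a payload.
function encodeFrame(type: number, channel: number, payload: Uint8Array): Uint8Array {
  const frame = new Uint8Array(FRAME_HEADER + payload.length + 1);
  const view = new DataView(frame.buffer);
  view.setUint8(0, type);
  view.setUint16(1, channel);
  view.setUint32(3, payload.length);
  frame.set(payload, FRAME_HEADER);
  frame[frame.length - 1] = FRAME_END;
  return frame;
}
```

Because the remaining length is always computed from bytes actually buffered, it cannot go negative; a frame that does not fit is either deferred until more data arrives or rejected with an explicit error.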

4 participants