Skip to content

Commit b85f4f9

Browse files
authored
fix(websocket sink): send encoded message as binary frame (#18060)
* fix(websocket sink): send encoded message as binary frame * fix: add an extra flag to decide frame format * fix: address comments * fix: binary frames for avro and native codecs * fix: list all codes instead of using default * fix: use use statement * fix: extract the logic into a function * fix: make the func const * fix: fix format
1 parent c592cb1 commit b85f4f9

File tree

1 file changed

+18
-1
lines changed

1 file changed

+18
-1
lines changed

src/sinks/websocket/sink.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,17 @@ impl WebSocketSink {
233233
Ok(())
234234
}
235235

236+
const fn should_encode_as_binary(&self) -> bool {
237+
use codecs::encoding::Serializer::{
238+
Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, RawMessage, Text,
239+
};
240+
241+
match self.encoder.serializer() {
242+
RawMessage(_) | Avro(_) | Native(_) => true,
243+
Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) => false,
244+
}
245+
}
246+
236247
async fn handle_events<I, WS, O>(
237248
&mut self,
238249
input: &mut I,
@@ -258,6 +269,7 @@ impl WebSocketSink {
258269

259270
let bytes_sent = register!(BytesSent::from(Protocol("websocket".into())));
260271
let events_sent = register!(EventsSent::from(Output(None)));
272+
let encode_as_binary = self.should_encode_as_binary();
261273

262274
loop {
263275
let result = tokio::select! {
@@ -298,7 +310,12 @@ impl WebSocketSink {
298310
Ok(()) => {
299311
finalizers.update_status(EventStatus::Delivered);
300312

301-
let message = Message::text(String::from_utf8_lossy(&bytes));
313+
let message = if encode_as_binary {
314+
Message::binary(bytes)
315+
}
316+
else {
317+
Message::text(String::from_utf8_lossy(&bytes))
318+
};
302319
let message_len = message.len();
303320

304321
ws_sink.send(message).await.map(|_| {

0 commit comments

Comments
 (0)