Skip to content

Commit

Permalink
Fixed decoding of snappy-java compression framing (issue #55)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Jan 8, 2014
1 parent 7a01ba5 commit b99e357
Showing 1 changed file with 168 additions and 42 deletions.
210 changes: 168 additions & 42 deletions rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2249,9 +2249,125 @@ static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) {
}


/**
 * Decompress a Snappy message that uses snappy-java framing.
 *
 * The input is treated as a sequence of chunks, each consisting of a
 * 4 byte big-endian compressed length followed by that many bytes of
 * Snappy-compressed data. The chunks must cover `inbuf` exactly.
 *
 * rkb and Offset are only used for debug logging.
 * On success `*outlenp` is set to the total uncompressed length;
 * on failure it is left untouched.
 *
 * Returns a malloced buffer with the uncompressed data (to be freed by the
 * caller), or NULL on failure.
 */
static char *rd_kafka_snappy_java_decompress (rd_kafka_broker_t *rkb,
					      int64_t Offset,
					      const char *inbuf,
					      size_t inlen,
					      size_t *outlenp) {
	int pass;
	char *outbuf = NULL;

	/**
	 * Traverse all chunks in two passes:
	 *  pass 1: calculate total uncompressed length
	 *  pass 2: uncompress
	 *
	 * Each chunk is prefixed with 4: length */

	for (pass = 1 ; pass <= 2 ; pass++) {
		size_t of  = 0; /* inbuf offset */
		size_t uof = 0; /* outbuf offset */

		/* Invariant: of <= inlen, so the subtraction cannot wrap. */
		while (inlen - of >= 4) {
			/* Compressed length: decoded byte-wise (big-endian)
			 * since inbuf+of has no alignment guarantee and
			 * must not be accessed through a uint32_t pointer. */
			const unsigned char *p =
				(const unsigned char *)inbuf + of;
			uint32_t clen = ((uint32_t)p[0] << 24) |
					((uint32_t)p[1] << 16) |
					((uint32_t)p[2] << 8) |
					(uint32_t)p[3];
			/* uncompressed length */
			size_t ulen;
			int r;

			of += 4;

			if (unlikely(clen > inlen - of)) {
				rd_rkb_dbg(rkb, MSG, "SNAPPY",
					   "Invalid snappy-java chunk length for "
					   "message at offset %"PRId64" "
					   "(%"PRIu32">%zu): ignoring message",
					   Offset, clen, inlen - of);
				goto fail;
			}

			/* Acquire uncompressed length */
			if (unlikely(!snappy_uncompressed_length(inbuf+of,
								 clen, &ulen))) {
				rd_rkb_dbg(rkb, MSG, "SNAPPY",
					   "Failed to get length of "
					   "(snappy-java framed) Snappy "
					   "compressed payload for message at "
					   "offset %"PRId64" (%"PRIu32" bytes): "
					   "ignoring message",
					   Offset, clen);
				goto fail;
			}

			/* The chunk lengths come from the wire: make sure
			 * the running total cannot wrap around, which would
			 * lead to an undersized outbuf. */
			if (unlikely(ulen > SIZE_MAX - uof)) {
				rd_rkb_dbg(rkb, MSG, "SNAPPY",
					   "Snappy-java framed data at offset "
					   "%"PRId64" is too large: "
					   "ignoring message",
					   Offset);
				goto fail;
			}

			if (pass == 1) {
				/* pass 1: calculate total length */
				of  += clen;
				uof += ulen;
				continue;
			}

			/* pass 2: Uncompress to outbuf */
			if (unlikely((r = snappy_uncompress(inbuf+of, clen,
							    outbuf+uof)))) {
				rd_rkb_dbg(rkb, MSG, "SNAPPY",
					   "Failed to decompress Snappy-java framed "
					   "payload for message at offset %"PRId64
					   " (%"PRIu32" bytes): %s: ignoring message",
					   Offset, clen,
					   strerror(-r/*negative errno*/));
				goto fail;
			}

			of  += clen;
			uof += ulen;
		}

		if (unlikely(of != inlen)) {
			rd_rkb_dbg(rkb, MSG, "SNAPPY",
				   "%zu trailing bytes in Snappy-java framed compressed "
				   "data at offset %"PRId64": ignoring message",
				   inlen - of, Offset);
			goto fail;
		}

		if (pass == 1) {
			if (uof == 0) {
				rd_rkb_dbg(rkb, MSG, "SNAPPY",
					   "Empty Snappy-java framed data "
					   "at offset %"PRId64" (%zu bytes): "
					   "ignoring message",
					   Offset, uof);
				goto fail;
			}

			/* Allocate memory for uncompressed data */
			outbuf = malloc(uof);
			if (unlikely(!outbuf)) {
				rd_rkb_dbg(rkb, MSG, "SNAPPY",
					   "Failed to allocate memory for uncompressed "
					   "Snappy data at offset %"PRId64
					   " (%zu bytes): %s",
					   Offset, uof, strerror(errno));
				goto fail;
			}

		} else {
			/* pass 2 */
			*outlenp = uof;
		}
	}

	return outbuf;

fail:
	/* Single exit for all error paths: outbuf is NULL during pass 1
	 * (free(NULL) is a no-op) and must be released during pass 2. */
	free(outbuf);
	return NULL;
}


/**
* Parsers a MessageSet and enqueues internal ops on the local
* Parses a MessageSet and enqueues internal ops on the local
* application queue for each Message.
*/
static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
Expand Down Expand Up @@ -2367,56 +2483,66 @@ static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
int r;
static const char snappy_java_magic[] =
{ 0x82, 'S','N','A','P','P','Y', 0 };
static const int snappy_java_hdrlen = 8+4+4+4;
static const int snappy_java_hdrlen = 8+4+4;

/* snappy-java adds its own header (SnappyCodec)
* which is not compatible with the official Snappy
* implementation. Skip it.
* 8: magic, 4: version, 4: compatible, 4: unknown? */
if (likely(Value_len > snappy_java_hdrlen &&
* implementation.
* 8: magic, 4: version, 4: compatible
* followed by any number of chunks:
* 4: length
* ...: snappy-compressed data. */
if (likely(Value_len > snappy_java_hdrlen + 4 &&
!memcmp(inbuf, snappy_java_magic, 8))) {
/* snappy-java framing */

inbuf = inbuf + snappy_java_hdrlen;
Value_len -= snappy_java_hdrlen;
}

outbuf = rd_kafka_snappy_java_decompress(rkb,
hdr->Offset,
inbuf,
Value_len,
&outlen);
if (unlikely(!outbuf))
continue;

/* Acquire uncompressed length */
if (unlikely(!snappy_uncompressed_length(inbuf,
Value_len,
&outlen))) {
rd_rkb_dbg(rkb, MSG, "SNAPPY",
"Failed to get length of Snappy "
"compressed payload "
"for message at offset %"PRId64
" (%"PRId32" bytes): "
"ignoring message",
hdr->Offset, Value_len);
continue;
}

rd_rkb_dbg(rkb, MSG, "SNAPPY",
"Uncompressed length %zd for %"PRId32
" compressed bytes at offset %"PRId64,
outlen, Value_len, hdr->Offset);

/* Allocate output buffer for uncompressed data */
outbuf = malloc(outlen);
} else {
/* no framing */

/* Acquire uncompressed length */
if (unlikely(!snappy_uncompressed_length(inbuf,
Value_len,
&outlen))) {
rd_rkb_dbg(rkb, MSG, "SNAPPY",
"Failed to get length of Snappy "
"compressed payload "
"for message at offset %"PRId64
" (%"PRId32" bytes): "
"ignoring message",
hdr->Offset, Value_len);
continue;
}

/* Uncompress to outbuf */
if (unlikely((r = snappy_uncompress(inbuf,
Value_len,
outbuf)))) {
rd_rkb_dbg(rkb, MSG, "SNAPPY",
"Failed to decompress Snappy "
"payload for message at offset "
"%"PRId64
" (%"PRId32" bytes): %s: "
"ignoring message",
hdr->Offset, Value_len,
strerror(-r/*negative errno*/));
free(outbuf);
continue;
/* Allocate output buffer for uncompressed data */
outbuf = malloc(outlen);

/* Uncompress to outbuf */
if (unlikely((r = snappy_uncompress(inbuf,
Value_len,
outbuf)))) {
rd_rkb_dbg(rkb, MSG, "SNAPPY",
"Failed to decompress Snappy "
"payload for message at offset "
"%"PRId64
" (%"PRId32" bytes): %s: "
"ignoring message",
hdr->Offset, Value_len,
strerror(-r/*negative errno*/));
free(outbuf);
continue;
}
}

}
break;
}
Expand Down

0 comments on commit b99e357

Please sign in to comment.