Skip to content

Commit

Permalink
Consumer header support for >0.11.4 api. Ref KxSystems#48
Browse files Browse the repository at this point in the history
  • Loading branch information
sshanks-kx committed Jun 23, 2020
1 parent e13c115 commit 93e5b77
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -590,16 +590,44 @@ EXP K1(kfkSubscription){
static J pu(J u){return 1000000LL*(u-10957LL*86400000LL);}
// `mtype`topic`partition`data`key`offset`opaque
K decodeMsg(const rd_kafka_t* rk, const rd_kafka_message_t *msg) {

#if (RD_KAFKA_VERSION >= 0x000b04ff)
rd_kafka_headers_t* hdrs = NULL;
rd_kafka_message_headers(msg,&hdrs);
K kHdrs = NULL;
if (hdrs==NULL)
kHdrs = xD(ktn(KS,0),ktn(KS,0));
else
{
K keys = ktn(KS,(int)rd_kafka_header_cnt(hdrs));
K vals = knk(0);
size_t idx = 0;
const char *name;
const void *value;
size_t size;
while (!rd_kafka_header_get_all(hdrs, idx++,&name, &value, &size))
{
kS(keys)[idx-1]=ss((char*)name);
K val = ktn(KG,(int)size);
memcpy(kG(val),value,(int)size);
jk(&vals,val);
}
kHdrs = xD(keys,vals);
}
#else
K kHdrs = xD(ktn(KS,0),ktn(KS,0));
#endif

K x= ktn(KG, msg->len), y=ktn(KG, msg->key_len), z;
J ts= rd_kafka_message_timestamp(msg, NULL);
memmove(kG(x), msg->payload, msg->len);
memmove(kG(y), msg->key, msg->key_len);
z= ktj(-KP, ts > 0 ? pu(ts) : nj);
return xd0(8,
return xd0(9,
"mtype", msg->err ? ks((S) rd_kafka_err2name(msg->err)) : r1(S0),
"topic", msg->rkt ? ks((S) rd_kafka_topic_name(msg->rkt)) : r1(S0),
"client", ki(indexClient(rk)), "partition", ki(msg->partition), "offset", kj(msg->offset),
"msgtime", z, "data", x, "key", y, (S) 0);
"msgtime", z, "data", x, "key", y, "headers", kHdrs, (S) 0);
}

J pollClient(rd_kafka_t *rk, J timeout, J maxcnt) {
Expand Down

0 comments on commit 93e5b77

Please sign in to comment.