Skip to content

Commit

Permalink
Merge pull request #20 from KxSystems/dev
Browse files Browse the repository at this point in the history
Subscriptions to multiple topics from a single client
  • Loading branch information
cmccarthy1 authored Dec 6, 2019
2 parents 6e6b55d + fea8f33 commit a005f01
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 50 deletions.
7 changes: 6 additions & 1 deletion — examples/test_consumer.q
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ kfk_cfg:(!) . flip(
(`statistics.interval.ms;`10000)
);
client:.kfk.Consumer[kfk_cfg];

data:();
.kfk.consumecb:{[msg]
msg[`data]:"c"$msg[`data];
msg[`rcvtime]:.z.p;
data,::enlist msg;}

.kfk.Sub[client;`test;enlist .kfk.PARTITION_UA];
// Topics to subscribe to
topic1:`test1; topic2:`test2;

// Subscribe to multiple topics from a single client
.kfk.Sub[client;;enlist .kfk.PARTITION_UA]each(topic1;topic2);

client_meta:.kfk.Metadata[client];
show client_meta`topics;
37 changes: 23 additions & 14 deletions — examples/test_offsetc.q
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,38 @@ kfk_cfg:(!) . flip(
(`enable.auto.offset.store;`false)
);
client:.kfk.Consumer[kfk_cfg];
TOPIC:`test

// Topics to be published to
topic1:`test1
topic2:`test2
data:();

// Callback function for managing of messages
.kfk.consumecb:{[msg]
msg[`data]:"c"$msg[`data];
msg[`rcvtime]:.z.p;
data,::enlist msg;}
// Define Offset callback functionality
.kfk.offsetcb:{[cid;err;offsets]show (cid;err;offsets);}


show .kfk.AssignOffsets[client;TOPIC;(1#0i)!1#.kfk.OFFSET.END] // start replaying from the end
.kfk.Sub[client;TOPIC;(1#0i)!1#.kfk.OFFSET.END];

// Assign partitions to consume from specified offsets
show .kfk.AssignOffsets[client;;(1#0i)!1#.kfk.OFFSET.END]each (topic1;topic2)
// Subscribe to relevant topics from a defined client
.kfk.Sub[client;;(1#0i)!1#.kfk.OFFSET.END]each (topic1;topic2)

strt:.z.t
// The following example has been augmented to display and commit offsets for each of
// the available topics every 10 seconds
\t 5000
.z.ts:{
.z.ts:{n+:1;topic:$[n mod 2;topic1;topic2];
if[(5000<"i"$.z.t-strt)&1<count data;
seen:exec last offset by partition from data;
show "Position:";
show .kfk.PositionOffsets[client;TOPIC;seen];
show "Before commited:";
show .kfk.CommittedOffsets[client;TOPIC;seen];
.kfk.CommitOffsets[client;TOPIC;seen;0b]; // commit whatever is storred
show "After commited:";
show .kfk.CommittedOffsets[client;TOPIC;seen];]
-1 "\nPublishing information from topic :",string topic;
show seen:exec last offset by partition from data where topic=topic;
show "Position:";
show .kfk.PositionOffsets[client;topic;seen];
show "Before commited:";
show .kfk.CommittedOffsets[client;topic;seen];
.kfk.CommitOffsets[client;topic;seen;0b];
show "After commited:";
show .kfk.CommittedOffsets[client;topic;seen];]
}
20 changes: 14 additions & 6 deletions — examples/test_offsetp.q
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ kfk_cfg:(!) . flip(
(`fetch.wait.max.ms;`10)
);
producer:.kfk.Producer[kfk_cfg]
test_topic:.kfk.Topic[producer;`test;()!()]

// Create two topics associated with the producer publishing on `test1`test2
topic1:.kfk.Topic[producer;`test1;()!()]
topic2:.kfk.Topic[producer;`test2;()!()]

// Define a delivery callback
.kfk.drcb:{[cid;msg]show "Delivered msg on ",string[cid],": ",.Q.s1 msg;}

// Publish messages at different rates
.z.ts:{n+:1;
.kfk.Pub[topic1;0i;string x;""];
if[n mod 2;.kfk.Pub[topic2;0i;string x;""]]}
-1 "Publishing on topics:", " "sv{string .kfk.TopicName x}each(topic1;topic2);
.kfk.Pub[topic1;0i;string .z.p;""];
.kfk.Pub[topic2;0i;string .z.p;""];

.z.ts:{.kfk.Pub[test_topic;0i;string x;""]}
show "Publishing on topic:",string .kfk.TopicName test_topic;
.kfk.Pub[test_topic;0i;string .z.p;""];
show "Published 1 message";
show "Set timer with \t 1000 to publish message every second";
-1 "Published one message to each topic.\n";
-1"Set timer with \\t 1000 to publish a message on `test1 every second and on `test2 every two seconds.";

17 changes: 11 additions & 6 deletions — examples/test_producer.q
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ kfk_cfg:(!) . flip(
);
producer:.kfk.Producer[kfk_cfg]

test_topic:.kfk.Topic[producer;`test;()!()]
topic1:.kfk.Topic[producer;`test1;()!()]
topic2:.kfk.Topic[producer;`test2;()!()]

.z.ts:{.kfk.Pub[test_topic;.kfk.PARTITION_UA;string x;""]}
show "Publishing on topic:",string .kfk.TopicName test_topic;
.kfk.Pub[test_topic;.kfk.PARTITION_UA;string .z.p;""];
show "Published 1 message";
.z.ts:{n+:1;topic:$[n mod 2;topic1;topic2];
.kfk.Pub[topic;.kfk.PARTITION_UA;string x;""]}



-1 "Publishing on topics:",string[.kfk.TopicName topic1],", ",string[.kfk.TopicName topic2];
.kfk.Pub[;.kfk.PARTITION_UA;string .z.p;""]each(topic1;topic2);
-1 "Published one message for each topic";
producer_meta:.kfk.Metadata[producer];
show producer_meta`topics;
show "Set timer with \t 1000 to publish message every second";
-1 "Set timer with \\t 500 to publish a message each second to each topic.";

46 changes: 23 additions & 23 deletions — kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,17 +335,15 @@ K decodeParList(rd_kafka_topic_partition_list_t *t){
return r;
}

rd_kafka_topic_partition_list_t* plistoffsetdict(S topic,K partitions){

static V plistoffsetdict(S topic,K partitions,rd_kafka_topic_partition_list_t *t_partition){
K dk=kK(partitions)[0],dv=kK(partitions)[1];
I*p;J*o,i;
p=kI(dk);o=kJ(dv);
rd_kafka_topic_partition_list_t *t_partition=
rd_kafka_topic_partition_list_new(dk->n);
for(i= 0; i < dk->n; ++i){
rd_kafka_topic_partition_list_add(t_partition, topic, p[i]);
rd_kafka_topic_partition_list_set_offset(t_partition, topic, p[i],o[i]);
}
return t_partition;
}

EXP K2(kfkFlush){
Expand Down Expand Up @@ -388,19 +386,19 @@ EXP K3(kfkSub){
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
if(KFK_OK != (err = rd_kafka_subscription(rk, &t_partition)))
return krr((S)rd_kafka_err2str(err));
if(z->t == XD){
t_partition = plistoffsetdict(y->s,z);
if(!checkType("IJ", kK(z)[0], kK(z)[1]))
return KNL;
plistoffsetdict(y->s,z,t_partition);
}
else{
t_partition=
rd_kafka_topic_partition_list_new(z->n);
for(i= 0; i < z->n; ++i){
p=kI(z);
else
for(p=kI(z), i= 0; i < z->n; ++i)
rd_kafka_topic_partition_list_add(t_partition, y->s, p[i]);
}
}
if(KFK_OK != (err= rd_kafka_subscribe(rk, t_partition)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(t_partition);
return knk(0);
}

Expand All @@ -420,19 +418,19 @@ EXP K1(kfkUnsub){
// https://github.com/edenhill/librdkafka/wiki/Manually-setting-the-consumer-start-offset
EXP K3(kfkAssignOffsets){
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *partitions;
rd_kafka_topic_partition_list_t *t_partition;
rd_kafka_resp_err_t err;
if(!checkType("is!", x,y,z))
return KNL;
if(!checkType("IJ",kK(z)[0],kK(z)[1]))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
partitions = plistoffsetdict(y->s,z);
err=rd_kafka_assign(rk, partitions);
if(KFK_OK != err)
t_partition = rd_kafka_topic_partition_list_new(z->n);
plistoffsetdict(y->s,z,t_partition);
if(KFK_OK != (err=rd_kafka_assign(rk,t_partition)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(partitions);
rd_kafka_topic_partition_list_destroy(t_partition);
return knk(0);
}

Expand All @@ -444,8 +442,9 @@ EXP K4(kfkCommitOffsets){
if(!checkType("IJ",kK(z)[0],kK(z)[1]))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
t_partition = plistoffsetdict(y->s,z);
return KNL;
t_partition = rd_kafka_topic_partition_list_new(z->n);
plistoffsetdict(y->s,z,t_partition);
if(KFK_OK != (err= rd_kafka_commit(rk, t_partition,r->g)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(t_partition);
Expand All @@ -462,7 +461,8 @@ EXP K3(kfkCommittedOffsets){
return KNL;
if(!checkType("IJ",kK(z)[0],kK(z)[1]))
return KNL;
t_partition = plistoffsetdict(y->s,z);
t_partition = rd_kafka_topic_partition_list_new(z->n);
plistoffsetdict(y->s,z,t_partition);
if(KFK_OK != (err= rd_kafka_committed(rk, t_partition,5000)))
return krr((S) rd_kafka_err2str(err));
r=decodeParList(t_partition);
Expand All @@ -480,7 +480,8 @@ EXP K3(kfkPositionOffsets){
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
t_partition = plistoffsetdict(y->s,z);
t_partition = rd_kafka_topic_partition_list_new(z->n);
plistoffsetdict(y->s,z,t_partition);
if(KFK_OK != (err= rd_kafka_position(rk, t_partition)))
return krr((S) rd_kafka_err2str(err));
r=decodeParList(t_partition);
Expand All @@ -497,8 +498,7 @@ EXP K1(kfkSubscription){
return KNL;
if (!(rk = clientIndex(x)))
return KNL;
err = rd_kafka_subscription(rk, &t);
if (KFK_OK != err)
if (KFK_OK != (err= rd_kafka_subscription(rk, &t)))
return krr((S)rd_kafka_err2str(err));
r = decodeParList(t);
rd_kafka_topic_partition_list_destroy(t);
Expand Down

0 comments on commit a005f01

Please sign in to comment.