diff --git a/rdkafka.c b/rdkafka.c
index d1f13a42c4..e23398ccb4 100644
--- a/rdkafka.c
+++ b/rdkafka.c
@@ -217,7 +217,7 @@ void rd_kafka_q_purge (rd_kafka_q_t *rkq) {
 	}
 
 	TAILQ_INIT(&rkq->rkq_q);
-	rd_atomic_set(&rkq->rkq_qlen, 0);
+	(void)rd_atomic_set(&rkq->rkq_qlen, 0);
 
 	pthread_mutex_unlock(&rkq->rkq_lock);
 }
@@ -241,14 +241,14 @@ size_t rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
 		mcnt = srcq->rkq_qlen;
 		TAILQ_CONCAT(&dstq->rkq_q, &srcq->rkq_q, rko_link);
 		TAILQ_INIT(&srcq->rkq_q);
-		rd_atomic_set(&srcq->rkq_qlen, 0);
-		rd_atomic_add(&dstq->rkq_qlen, mcnt);
+		(void)rd_atomic_set(&srcq->rkq_qlen, 0);
+		(void)rd_atomic_add(&dstq->rkq_qlen, mcnt);
 	} else {
 		while (mcnt < cnt && (rko = TAILQ_FIRST(&srcq->rkq_q))) {
 			TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
 			TAILQ_INSERT_TAIL(&dstq->rkq_q, rko, rko_link);
-			rd_atomic_sub(&dstq->rkq_qlen, 1);
-			rd_atomic_add(&dstq->rkq_qlen, 1);
+			(void)rd_atomic_sub(&srcq->rkq_qlen, 1);
+			(void)rd_atomic_add(&dstq->rkq_qlen, 1);
 			mcnt++;
 		}
 	}
@@ -347,7 +347,7 @@ int rd_kafka_q_serve (rd_kafka_t *rk,
 
 	/* Reset real queue */
 	TAILQ_INIT(&rkq->rkq_q);
-	rd_atomic_set(&rkq->rkq_qlen, 0);
+	(void)rd_atomic_set(&rkq->rkq_qlen, 0);
 	pthread_mutex_unlock(&rkq->rkq_lock);
 
 	rd_kafka_dbg(rk, QUEUE, "QSERVE", "Serving %i ops", localq.rkq_qlen);
@@ -514,7 +514,7 @@ void rd_kafka_destroy (rd_kafka_t *rk) {
 	rd_kafka_topic_t *rkt, *rkt_tmp;
 
 	rd_kafka_dbg(rk, GENERIC, "DESTROY", "Terminating instance");
-	rd_atomic_add(&rk->rk_terminate, 1);
+	(void)rd_atomic_add(&rk->rk_terminate, 1);
 
 	/* Decommission all topics */
 	rd_kafka_lock(rk);
@@ -742,7 +742,7 @@ static void *rd_kafka_thread_main (void *arg) {
 	rd_ts_t last_topic_scan = rd_clock();
 	rd_ts_t last_stats_emit = last_topic_scan;
 
-	rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
+	(void)rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
 
 	while (likely(rk->rk_terminate == 0)) {
 		rd_ts_t now = rd_clock();
@@ -764,7 +764,7 @@ static void *rd_kafka_thread_main (void *arg) {
 
 	rd_kafka_destroy0(rk); /* destroy handler thread's refcnt */
 
-	rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
+	(void)rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
 
 	return NULL;
 }
@@ -1025,7 +1025,7 @@ ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *rkt, int32_t partition,
 		}
 
 		TAILQ_REMOVE(&rktp->rktp_fetchq.rkq_q, rko, rko_link);
-		rd_atomic_sub(&rktp->rktp_fetchq.rkq_qlen, 1);
+		(void)rd_atomic_sub(&rktp->rktp_fetchq.rkq_qlen, 1);
 
 		pthread_mutex_unlock(&rktp->rktp_fetchq.rkq_lock);
diff --git a/rdkafka_broker.c b/rdkafka_broker.c
index d8ac729e81..5278cdfa22 100644
--- a/rdkafka_broker.c
+++ b/rdkafka_broker.c
@@ -209,13 +209,13 @@ static rd_kafka_buf_t *rd_kafka_buf_new_shadow (void *ptr, size_t size) {
 
 static void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
 	TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
-	rd_atomic_add(&rkbufq->rkbq_cnt, 1);
+	(void)rd_atomic_add(&rkbufq->rkbq_cnt, 1);
 }
 
 static void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
 	TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
 	assert(rkbufq->rkbq_cnt > 0);
-	rd_atomic_sub(&rkbufq->rkbq_cnt, 1);
+	(void)rd_atomic_sub(&rkbufq->rkbq_cnt, 1);
 }
 
 static void rd_kafka_bufq_init (rd_kafka_bufq_t *rkbufq) {
@@ -228,7 +228,7 @@ static void rd_kafka_bufq_init (rd_kafka_bufq_t *rkbufq) {
  */
 static void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
 	TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
-	rd_atomic_add(&dst->rkbq_cnt, src->rkbq_cnt);
+	(void)rd_atomic_add(&dst->rkbq_cnt, src->rkbq_cnt);
 	rd_kafka_bufq_init(src);
 }
@@ -408,8 +408,8 @@ static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb,
 		return -1;
 	}
 
-	rd_atomic_add(&rkb->rkb_c.tx_bytes, r);
-	rd_atomic_add(&rkb->rkb_c.tx, 1);
+	(void)rd_atomic_add(&rkb->rkb_c.tx_bytes, r);
+	(void)rd_atomic_add(&rkb->rkb_c.tx, 1);
 
 	return r;
 }
@@ -482,7 +482,7 @@ static void rd_kafka_broker_buf_enq0 (rd_kafka_broker_t *rkb,
 				  rkbuf, rkbuf_link);
 	}
 
-	rd_atomic_add(&rkb->rkb_outbufs.rkbq_cnt, 1);
+	(void)rd_atomic_add(&rkb->rkb_outbufs.rkbq_cnt, 1);
 }
@@ -1270,8 +1270,8 @@ static int rd_kafka_recv (rd_kafka_broker_t *rkb) {
 	if (rkbuf->rkbuf_of == rkbuf->rkbuf_len + sizeof(rkbuf->rkbuf_reshdr)) {
 		/* Message is complete, pass it on to the original requester. */
 		rkb->rkb_recv_buf = NULL;
-		rd_atomic_add(&rkb->rkb_c.rx, 1);
-		rd_atomic_add(&rkb->rkb_c.rx_bytes, rkbuf->rkbuf_of);
+		(void)rd_atomic_add(&rkb->rkb_c.rx, 1);
+		(void)rd_atomic_add(&rkb->rkb_c.rx_bytes, rkbuf->rkbuf_of);
 		rd_kafka_req_response(rkb, rkbuf);
 	}
@@ -2031,8 +2031,8 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
 
 do_send:
 
-	rd_atomic_add(&rktp->rktp_c.tx_msgs, rkbuf->rkbuf_msgq.rkmq_msg_cnt);
-	rd_atomic_add(&rktp->rktp_c.tx_bytes, prodhdr->part2.MessageSetSize);
+	(void)rd_atomic_add(&rktp->rktp_c.tx_msgs, rkbuf->rkbuf_msgq.rkmq_msg_cnt);
+	(void)rd_atomic_add(&rktp->rktp_c.tx_bytes, prodhdr->part2.MessageSetSize);
 
 	prodhdr->part2.MessageSetSize = htonl(prodhdr->part2.MessageSetSize);
@@ -3203,7 +3203,7 @@ static void *rd_kafka_broker_thread_main (void *arg) {
 	rd_kafka_broker_t *rkb = arg;
 	rd_kafka_t *rk = rkb->rkb_rk;
 
-	rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
+	(void)rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
 
 	rd_thread_sigmask(SIG_BLOCK,
 			  SIGHUP, SIGINT, SIGTERM, SIGUSR1, SIGUSR2,
@@ -3249,7 +3249,7 @@ static void *rd_kafka_broker_thread_main (void *arg) {
 	rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__DESTROY, NULL);
 	rd_kafka_broker_destroy(rkb);
 
-	rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
+	(void)rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
 
 	return NULL;
 }
diff --git a/rdkafka_broker.h b/rdkafka_broker.h
index 8bf10a97a5..6b497ce45e 100644
--- a/rdkafka_broker.h
+++ b/rdkafka_broker.h
@@ -30,7 +30,7 @@
 
 extern const char *rd_kafka_broker_state_names[];
 
-#define rd_kafka_buf_keep(rkbuf) rd_atomic_add(&(rkbuf)->rkbuf_refcnt, 1)
+#define rd_kafka_buf_keep(rkbuf) (void)rd_atomic_add(&(rkbuf)->rkbuf_refcnt, 1)
 
 void rd_kafka_buf_destroy (rd_kafka_buf_t *rkbuf);
 
 rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid (rd_kafka_t *rk,
diff --git a/rdkafka_int.h b/rdkafka_int.h
index 45c74a2c37..178ad66309 100644
--- a/rdkafka_int.h
+++ b/rdkafka_int.h
@@ -41,6 +41,8 @@
 
 #include "rdlog.h"
 
+#include "rdsysqueue.h"
+
 #define RD_POLL_INFINITE -1
 #define RD_POLL_NOWAIT 0
@@ -453,7 +455,7 @@ typedef struct rd_kafka_broker_s {
 } rd_kafka_broker_t;
 
-#define rd_kafka_broker_keep(rkb) rd_atomic_add(&(rkb)->rkb_refcnt, 1)
+#define rd_kafka_broker_keep(rkb) (void)rd_atomic_add(&(rkb)->rkb_refcnt, 1)
 
 #define rd_kafka_broker_lock(rkb) pthread_mutex_lock(&(rkb)->rkb_lock)
 #define rd_kafka_broker_unlock(rkb) pthread_mutex_unlock(&(rkb)->rkb_lock)
@@ -534,7 +536,7 @@ typedef struct rd_kafka_toppar_s {
 	} rktp_c;
 } rd_kafka_toppar_t;
 
-#define rd_kafka_toppar_keep(rktp) rd_atomic_add(&(rktp)->rktp_refcnt, 1)
+#define rd_kafka_toppar_keep(rktp) (void)rd_atomic_add(&(rktp)->rktp_refcnt, 1)
 
 #define rd_kafka_toppar_destroy(rktp) do {				\
 	if (rd_atomic_sub(&(rktp)->rktp_refcnt, 1) == 0)		\
 		rd_kafka_toppar_destroy0(rktp);				\
@@ -706,7 +708,7 @@ void rd_kafka_op_reply (rd_kafka_t *rk,
 			rd_kafka_resp_err_t err,
 			void *payload, int len);
 
-#define rd_kafka_keep(rk) rd_atomic_add(&(rk)->rk_refcnt, 1)
+#define rd_kafka_keep(rk) (void)rd_atomic_add(&(rk)->rk_refcnt, 1)
 
 void rd_kafka_destroy0 (rd_kafka_t *rk);
 
 void rd_kafka_conf_destroy (rd_kafka_conf_t *conf);
diff --git a/rdkafka_msg.c b/rdkafka_msg.c
index 40bef95462..7f01313a97 100644
--- a/rdkafka_msg.c
+++ b/rdkafka_msg.c
@@ -38,7 +38,7 @@ void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
 
 	assert(rk->rk_producer.msg_cnt > 0);
-	rd_atomic_sub(&rk->rk_producer.msg_cnt, 1);
+	(void)rd_atomic_sub(&rk->rk_producer.msg_cnt, 1);
 
 	if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE)
 		free(rkm->rkm_payload);
@@ -73,7 +73,7 @@ int rd_kafka_msg_new (rd_kafka_topic_t *rkt, int32_t force_partition,
 	if (unlikely(rd_atomic_add(&rkt->rkt_rk->rk_producer.msg_cnt, 1) >
 		     rkt->rkt_rk->rk_conf.queue_buffering_max_msgs)) {
-		rd_atomic_sub(&rkt->rkt_rk->rk_producer.msg_cnt, 1);
+		(void)rd_atomic_sub(&rkt->rkt_rk->rk_producer.msg_cnt, 1);
 		errno = ENOBUFS;
 		return -1;
 	}
diff --git a/rdkafka_msg.h b/rdkafka_msg.h
index 325733e9d4..edcee6c654 100644
--- a/rdkafka_msg.h
+++ b/rdkafka_msg.h
@@ -52,8 +52,8 @@ static inline RD_UNUSED void rd_kafka_msgq_init (rd_kafka_msgq_t *rkmq) {
 static inline RD_UNUSED void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst, rd_kafka_msgq_t *src) {
 	TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
-	rd_atomic_add(&dst->rkmq_msg_cnt, src->rkmq_msg_cnt);
-	rd_atomic_add(&dst->rkmq_msg_bytes, src->rkmq_msg_bytes);
+	(void)rd_atomic_add(&dst->rkmq_msg_cnt, src->rkmq_msg_cnt);
+	(void)rd_atomic_add(&dst->rkmq_msg_bytes, src->rkmq_msg_bytes);
 	rd_kafka_msgq_init(src);
 }
@@ -99,8 +99,8 @@ rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
 	if (likely(do_count)) {
 		assert(rkmq->rkmq_msg_cnt > 0);
 		assert(rkmq->rkmq_msg_bytes - rkm->rkm_len >= 0);
-		rd_atomic_sub(&rkmq->rkmq_msg_cnt, 1);
-		rd_atomic_sub(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
+		(void)rd_atomic_sub(&rkmq->rkmq_msg_cnt, 1);
+		(void)rd_atomic_sub(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
 	}
 
 	TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link);
@@ -124,8 +124,8 @@ rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
 static inline RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm) {
 	TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link);
-	rd_atomic_add(&rkmq->rkmq_msg_cnt, 1);
-	rd_atomic_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
+	(void)rd_atomic_add(&rkmq->rkmq_msg_cnt, 1);
+	(void)rd_atomic_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
 }
 
 /**
@@ -134,8 +134,8 @@ static inline RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,
 static inline RD_UNUSED void rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm) {
 	TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
-	rd_atomic_add(&rkmq->rkmq_msg_cnt, 1);
-	rd_atomic_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
+	(void)rd_atomic_add(&rkmq->rkmq_msg_cnt, 1);
+	(void)rd_atomic_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
 }
diff --git a/rdkafka_topic.h b/rdkafka_topic.h
index 68892b62aa..e58f066ea4 100644
--- a/rdkafka_topic.h
+++ b/rdkafka_topic.h
@@ -37,7 +37,7 @@ void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
 void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq);
 
-#define rd_kafka_topic_keep(rkt) rd_atomic_add(&(rkt->rkt_refcnt), 1)
+#define rd_kafka_topic_keep(rkt) (void)rd_atomic_add(&(rkt->rkt_refcnt), 1)
 
 void rd_kafka_topic_destroy0 (rd_kafka_topic_t *rkt);
 
 rd_kafka_toppar_t *rd_kafka_toppar_get (const rd_kafka_topic_t *rkt,
diff --git a/snappy_compat.h b/snappy_compat.h
index 71f2275cd6..e3c511bb5b 100644
--- a/snappy_compat.h
+++ b/snappy_compat.h
@@ -24,6 +24,16 @@
 #define UNALIGNED64_REALLYS_SLOW 1
 #endif
 
+#ifndef htole16
+# if __BYTE_ORDER == __LITTLE_ENDIAN
+# define htole16(x) (x)
+# define le32toh(x) (x)
+# else
+# define htole16(x) __bswap_16 (x)
+# define le32toh(x) __bswap_32 (x)
+# endif
+#endif
+
 typedef unsigned char u8;
 typedef unsigned short u16;
 typedef unsigned u32;