Skip to content

Commit

Permalink
Merge pull request #57 from stromnet/master
Browse files Browse the repository at this point in the history
RHEL (CentOS) 5 compilation fixes
  • Loading branch information
edenhill committed Jan 9, 2014
2 parents 730b155 + b5b0b72 commit 80a9d7a
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 37 deletions.
20 changes: 10 additions & 10 deletions rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ void rd_kafka_q_purge (rd_kafka_q_t *rkq) {
}

TAILQ_INIT(&rkq->rkq_q);
rd_atomic_set(&rkq->rkq_qlen, 0);
(void)rd_atomic_set(&rkq->rkq_qlen, 0);

pthread_mutex_unlock(&rkq->rkq_lock);
}
Expand All @@ -241,14 +241,14 @@ size_t rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
mcnt = srcq->rkq_qlen;
TAILQ_CONCAT(&dstq->rkq_q, &srcq->rkq_q, rko_link);
TAILQ_INIT(&srcq->rkq_q);
rd_atomic_set(&srcq->rkq_qlen, 0);
rd_atomic_add(&dstq->rkq_qlen, mcnt);
(void)rd_atomic_set(&srcq->rkq_qlen, 0);
(void)rd_atomic_add(&dstq->rkq_qlen, mcnt);
} else {
while (mcnt < cnt && (rko = TAILQ_FIRST(&srcq->rkq_q))) {
TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
TAILQ_INSERT_TAIL(&dstq->rkq_q, rko, rko_link);
rd_atomic_sub(&dstq->rkq_qlen, 1);
rd_atomic_add(&dstq->rkq_qlen, 1);
(void)rd_atomic_sub(&dstq->rkq_qlen, 1);
(void)rd_atomic_add(&dstq->rkq_qlen, 1);
mcnt++;
}
}
Expand Down Expand Up @@ -347,7 +347,7 @@ int rd_kafka_q_serve (rd_kafka_t *rk,

/* Reset real queue */
TAILQ_INIT(&rkq->rkq_q);
rd_atomic_set(&rkq->rkq_qlen, 0);
(void)rd_atomic_set(&rkq->rkq_qlen, 0);
pthread_mutex_unlock(&rkq->rkq_lock);

rd_kafka_dbg(rk, QUEUE, "QSERVE", "Serving %i ops", localq.rkq_qlen);
Expand Down Expand Up @@ -514,7 +514,7 @@ void rd_kafka_destroy (rd_kafka_t *rk) {
rd_kafka_topic_t *rkt, *rkt_tmp;

rd_kafka_dbg(rk, GENERIC, "DESTROY", "Terminating instance");
rd_atomic_add(&rk->rk_terminate, 1);
(void)rd_atomic_add(&rk->rk_terminate, 1);

/* Decommission all topics */
rd_kafka_lock(rk);
Expand Down Expand Up @@ -742,7 +742,7 @@ static void *rd_kafka_thread_main (void *arg) {
rd_ts_t last_topic_scan = rd_clock();
rd_ts_t last_stats_emit = last_topic_scan;

rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
(void)rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);

while (likely(rk->rk_terminate == 0)) {
rd_ts_t now = rd_clock();
Expand All @@ -764,7 +764,7 @@ static void *rd_kafka_thread_main (void *arg) {

rd_kafka_destroy0(rk); /* destroy handler thread's refcnt */

rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
(void)rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);

return NULL;
}
Expand Down Expand Up @@ -1025,7 +1025,7 @@ ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *rkt, int32_t partition,
}

TAILQ_REMOVE(&rktp->rktp_fetchq.rkq_q, rko, rko_link);
rd_atomic_sub(&rktp->rktp_fetchq.rkq_qlen, 1);
(void)rd_atomic_sub(&rktp->rktp_fetchq.rkq_qlen, 1);

pthread_mutex_unlock(&rktp->rktp_fetchq.rkq_lock);

Expand Down
24 changes: 12 additions & 12 deletions rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ static rd_kafka_buf_t *rd_kafka_buf_new_shadow (void *ptr, size_t size) {

static void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
rd_atomic_add(&rkbufq->rkbq_cnt, 1);
(void)rd_atomic_add(&rkbufq->rkbq_cnt, 1);
}

static void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
assert(rkbufq->rkbq_cnt > 0);
rd_atomic_sub(&rkbufq->rkbq_cnt, 1);
(void)rd_atomic_sub(&rkbufq->rkbq_cnt, 1);
}

static void rd_kafka_bufq_init (rd_kafka_bufq_t *rkbufq) {
Expand All @@ -228,7 +228,7 @@ static void rd_kafka_bufq_init (rd_kafka_bufq_t *rkbufq) {
*/
static void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
rd_atomic_add(&dst->rkbq_cnt, src->rkbq_cnt);
(void)rd_atomic_add(&dst->rkbq_cnt, src->rkbq_cnt);
rd_kafka_bufq_init(src);
}

Expand Down Expand Up @@ -408,8 +408,8 @@ static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb,
return -1;
}

rd_atomic_add(&rkb->rkb_c.tx_bytes, r);
rd_atomic_add(&rkb->rkb_c.tx, 1);
(void)rd_atomic_add(&rkb->rkb_c.tx_bytes, r);
(void)rd_atomic_add(&rkb->rkb_c.tx, 1);
return r;
}

Expand Down Expand Up @@ -482,7 +482,7 @@ static void rd_kafka_broker_buf_enq0 (rd_kafka_broker_t *rkb,
rkbuf, rkbuf_link);
}

rd_atomic_add(&rkb->rkb_outbufs.rkbq_cnt, 1);
(void)rd_atomic_add(&rkb->rkb_outbufs.rkbq_cnt, 1);
}


Expand Down Expand Up @@ -1270,8 +1270,8 @@ static int rd_kafka_recv (rd_kafka_broker_t *rkb) {
if (rkbuf->rkbuf_of == rkbuf->rkbuf_len + sizeof(rkbuf->rkbuf_reshdr)) {
/* Message is complete, pass it on to the original requester. */
rkb->rkb_recv_buf = NULL;
rd_atomic_add(&rkb->rkb_c.rx, 1);
rd_atomic_add(&rkb->rkb_c.rx_bytes, rkbuf->rkbuf_of);
(void)rd_atomic_add(&rkb->rkb_c.rx, 1);
(void)rd_atomic_add(&rkb->rkb_c.rx_bytes, rkbuf->rkbuf_of);
rd_kafka_req_response(rkb, rkbuf);
}

Expand Down Expand Up @@ -2031,8 +2031,8 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,

do_send:

rd_atomic_add(&rktp->rktp_c.tx_msgs, rkbuf->rkbuf_msgq.rkmq_msg_cnt);
rd_atomic_add(&rktp->rktp_c.tx_bytes, prodhdr->part2.MessageSetSize);
(void)rd_atomic_add(&rktp->rktp_c.tx_msgs, rkbuf->rkbuf_msgq.rkmq_msg_cnt);
(void)rd_atomic_add(&rktp->rktp_c.tx_bytes, prodhdr->part2.MessageSetSize);

prodhdr->part2.MessageSetSize =
htonl(prodhdr->part2.MessageSetSize);
Expand Down Expand Up @@ -3203,7 +3203,7 @@ static void *rd_kafka_broker_thread_main (void *arg) {
rd_kafka_broker_t *rkb = arg;
rd_kafka_t *rk = rkb->rkb_rk;

rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
(void)rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);

rd_thread_sigmask(SIG_BLOCK,
SIGHUP, SIGINT, SIGTERM, SIGUSR1, SIGUSR2,
Expand Down Expand Up @@ -3249,7 +3249,7 @@ static void *rd_kafka_broker_thread_main (void *arg) {
rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__DESTROY, NULL);
rd_kafka_broker_destroy(rkb);

rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
(void)rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);

return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

extern const char *rd_kafka_broker_state_names[];

#define rd_kafka_buf_keep(rkbuf) rd_atomic_add(&(rkbuf)->rkbuf_refcnt, 1)
#define rd_kafka_buf_keep(rkbuf) (void)rd_atomic_add(&(rkbuf)->rkbuf_refcnt, 1)
void rd_kafka_buf_destroy (rd_kafka_buf_t *rkbuf);

rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid (rd_kafka_t *rk,
Expand Down
8 changes: 5 additions & 3 deletions rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include "rdlog.h"


#include "rdsysqueue.h"

#define RD_POLL_INFINITE -1
#define RD_POLL_NOWAIT 0

Expand Down Expand Up @@ -453,7 +455,7 @@ typedef struct rd_kafka_broker_s {

} rd_kafka_broker_t;

#define rd_kafka_broker_keep(rkb) rd_atomic_add(&(rkb)->rkb_refcnt, 1)
#define rd_kafka_broker_keep(rkb) (void)rd_atomic_add(&(rkb)->rkb_refcnt, 1)
#define rd_kafka_broker_lock(rkb) pthread_mutex_lock(&(rkb)->rkb_lock)
#define rd_kafka_broker_unlock(rkb) pthread_mutex_unlock(&(rkb)->rkb_lock)

Expand Down Expand Up @@ -534,7 +536,7 @@ typedef struct rd_kafka_toppar_s {
} rktp_c;
} rd_kafka_toppar_t;

#define rd_kafka_toppar_keep(rktp) rd_atomic_add(&(rktp)->rktp_refcnt, 1)
#define rd_kafka_toppar_keep(rktp) (void)rd_atomic_add(&(rktp)->rktp_refcnt, 1)
#define rd_kafka_toppar_destroy(rktp) do { \
if (rd_atomic_sub(&(rktp)->rktp_refcnt, 1) == 0) \
rd_kafka_toppar_destroy0(rktp); \
Expand Down Expand Up @@ -706,7 +708,7 @@ void rd_kafka_op_reply (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
void *payload, int len);

#define rd_kafka_keep(rk) rd_atomic_add(&(rk)->rk_refcnt, 1)
#define rd_kafka_keep(rk) (void)rd_atomic_add(&(rk)->rk_refcnt, 1)
void rd_kafka_destroy0 (rd_kafka_t *rk);

void rd_kafka_conf_destroy (rd_kafka_conf_t *conf);
Expand Down
4 changes: 2 additions & 2 deletions rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) {

assert(rk->rk_producer.msg_cnt > 0);
rd_atomic_sub(&rk->rk_producer.msg_cnt, 1);
(void)rd_atomic_sub(&rk->rk_producer.msg_cnt, 1);

if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE)
free(rkm->rkm_payload);
Expand Down Expand Up @@ -73,7 +73,7 @@ int rd_kafka_msg_new (rd_kafka_topic_t *rkt, int32_t force_partition,

if (unlikely(rd_atomic_add(&rkt->rkt_rk->rk_producer.msg_cnt, 1) >
rkt->rkt_rk->rk_conf.queue_buffering_max_msgs)) {
rd_atomic_sub(&rkt->rkt_rk->rk_producer.msg_cnt, 1);
(void)rd_atomic_sub(&rkt->rkt_rk->rk_producer.msg_cnt, 1);
errno = ENOBUFS;
return -1;
}
Expand Down
16 changes: 8 additions & 8 deletions rdkafka_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ static inline RD_UNUSED void rd_kafka_msgq_init (rd_kafka_msgq_t *rkmq) {
static inline RD_UNUSED void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
rd_kafka_msgq_t *src) {
TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
rd_atomic_add(&dst->rkmq_msg_cnt, src->rkmq_msg_cnt);
rd_atomic_add(&dst->rkmq_msg_bytes, src->rkmq_msg_bytes);
(void)rd_atomic_add(&dst->rkmq_msg_cnt, src->rkmq_msg_cnt);
(void)rd_atomic_add(&dst->rkmq_msg_bytes, src->rkmq_msg_bytes);
rd_kafka_msgq_init(src);
}

Expand Down Expand Up @@ -99,8 +99,8 @@ rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
if (likely(do_count)) {
assert(rkmq->rkmq_msg_cnt > 0);
assert(rkmq->rkmq_msg_bytes - rkm->rkm_len >= 0);
rd_atomic_sub(&rkmq->rkmq_msg_cnt, 1);
rd_atomic_sub(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
(void)rd_atomic_sub(&rkmq->rkmq_msg_cnt, 1);
(void)rd_atomic_sub(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
}

TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link);
Expand All @@ -124,8 +124,8 @@ rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
static inline RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm) {
TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link);
rd_atomic_add(&rkmq->rkmq_msg_cnt, 1);
rd_atomic_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
(void)rd_atomic_add(&rkmq->rkmq_msg_cnt, 1);
(void)rd_atomic_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
}

/**
Expand All @@ -134,8 +134,8 @@ static inline RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,
static inline RD_UNUSED void rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm) {
TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
rd_atomic_add(&rkmq->rkmq_msg_cnt, 1);
rd_atomic_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
(void)rd_atomic_add(&rkmq->rkmq_msg_cnt, 1);
(void)rd_atomic_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len);
}


Expand Down
2 changes: 1 addition & 1 deletion rdkafka_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq);

#define rd_kafka_topic_keep(rkt) rd_atomic_add(&(rkt->rkt_refcnt), 1)
#define rd_kafka_topic_keep(rkt) (void)rd_atomic_add(&(rkt->rkt_refcnt), 1)
void rd_kafka_topic_destroy0 (rd_kafka_topic_t *rkt);

rd_kafka_toppar_t *rd_kafka_toppar_get (const rd_kafka_topic_t *rkt,
Expand Down
10 changes: 10 additions & 0 deletions snappy_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@
#define UNALIGNED64_REALLYS_SLOW 1
#endif

#ifndef htole16
# if __BYTE_ORDER == __LITTLE_ENDIAN
# define htole16(x) (x)
# define le32toh(x) (x)
# else
# define htole16(x) __bswap_16 (x)
# define le32toh(x) __bswap_32 (x)
#endif
#endif

typedef unsigned char u8;
typedef unsigned short u16;
typedef unsigned u32;
Expand Down

0 comments on commit 80a9d7a

Please sign in to comment.