Apache Kafka是一個分布式消息系統(tǒng),能夠處理高吞吐量的流式數(shù)據(jù)。而在C語言中,使用librdkafka庫可輕松地向Kafka發(fā)送JSON格式的數(shù)據(jù)。
首先需要安裝librdkafka庫。下面是如何在Linux環(huán)境下安裝:
git clone https://github.com/edenhill/librdkafka.git ./configure make sudo make install
下一步是在C代碼中引用librdkafka庫:
#include <librdkafka/rdkafka.h>
接下來是設置Kafka的配置:
rd_kafka_t *rk; rd_kafka_conf_t *conf; char errstr[512]; conf = rd_kafka_conf_new(); /* 設置Kafka的broker地址 */ if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "Failed to set broker address: %s", errstr); exit(1); } /* 創(chuàng)建Kafka Producer實例 */ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "Failed to create Producer: %s", errstr); exit(1); } rd_kafka_set_logger(rk, NULL); //關閉日志輸出 /* 將消息發(fā)送到名為“test”的主題 */ char* topic = "test"; int partition = RD_KAFKA_PARTITION_UA; int msgflags = 0; char* payload = "{\"name\": \"John\", \"age\": 30, \"isStudent\": true}"; size_t len = strlen(payload); rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL); rd_kafka_produce(rkt, partition, msgflags, payload, len, NULL, 0, NULL); rd_kafka_topic_destroy(rkt); rd_kafka_flush(rk, 5000);
上述代碼中的payload就是要發(fā)送的JSON數(shù)據(jù)。需要注意的是,rd_kafka_flush函數(shù)調用后,才能確保消息被發(fā)送到Kafka。
這樣,就可以輕松地使用C語言向Kafka發(fā)送JSON數(shù)據(jù)了。