Java是一種廣泛使用的編程語言,用于創建各種類型的應用程序。其中,Kafka是一個流行的開源消息隊列,它使用發布訂閱模式來傳遞消息。Json是一種用于數據交換的輕量級格式,廣泛運用于Web開發和API交互中。
使用Java與Kafka進行消息傳遞時,Json格式的數據常常被使用。下面的代碼示例展示如何使用Java向Kafka發送Json數據:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("id", 1); jsonObject.addProperty("name", "John"); jsonObject.addProperty("age", 25); String jsonString = jsonObject.toString(); ProducerRecord record = new ProducerRecord<>("test_topic", "key", jsonString); producer.send(record); producer.close();
首先,我們使用了KafkaProducer類創建了一個生產者實例。然后,我們使用JsonObject類創建了一個Json對象,并添加了id、name和age三個屬性。最后,我們通過ProducerRecord類創建了一個包含Key、Value和Topic信息的數據記錄,并將其發送到Kafka。
接收消息時,我們也可以使用Json格式的數據。以下示例演示如何在Java中解析從Kafka接收到的Json數據:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test_group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test_topic")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { String jsonString = record.value(); JsonObject jsonObject = new JsonParser().parse(jsonString).getAsJsonObject(); int id = jsonObject.get("id").getAsInt(); String name = jsonObject.get("name").getAsString(); int age = jsonObject.get("age").getAsInt(); System.out.println("id: " + id + ", name: " + name + ", age: " + age); } }
首先,我們創建了一個消費者實例,并訂閱了test_topic主題。在接收到消息時,我們可以使用ConsumerRecord類來解析數據,并使用JsonParser和JsonObject來將Json字符串轉換為Json對象。最后,我們可以通過JsonObject的get()方法來獲取Json對象中的屬性值,并將其用于后續的操作。
上一篇html環球圓形代碼