本文概览:介绍如何使用 Kafka 的 Java Client API 发送消息和消费消息。
1 Maven配置
1 2 3 4 5 6 7 |
<!-- Kafka Java client library used by the producer and consumer examples -->
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>
2 生产者
2.1 代码
生产者创建和生产消息代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { public void produceMessage(String topic,String message) { // 创建Producer Producer producer = buildProducer(); // 发送消息 producer.send(new ProducerRecord<String, String>(topic, message)); } private Producer buildProducer(){ // create instance for properties to access producer configs Properties props = new Properties(); // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开 props.put("bootstrap.servers", "localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); // Kafka消息是以键值对的形式发送,需要设置key和value类型序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); return producer; } } |
3 消费者
3.1 代码
创建消费者代码和获取消息的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerDemo { public void consumeMessage(String topic) { // 1.构建KafkaCustomer Consumer consumer = buildCustomer(); // 2.设置主题 consumer.subscribe(Arrays.asList(topic)); // 3.接受消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(500); System.out.println("customer Message---"); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } public Consumer buildCustomer() { Properties props = new Properties(); // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开 props.put("bootstrap.servers", "localhost:9092"); // 消费者群组 props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); return consumer; } } |
4 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class main { public static void main(String[] args) { // 1.生产者 final ProducerDemo producerDemo = new ProducerDemo(); Thread producerThread = new Thread(new Runnable() { public void run() { for (int i = 1; i < 10; i++) { producerDemo.produceMessage("test", "message-" + i); } } }); producerThread.start(); // 2.消费者 final ConsumerDemo consumerDemo = new ConsumerDemo(); Thread consumerThread = new Thread(new Runnable() { public void run() { consumerDemo.customerMessage("test"); } }); consumerThread.start(); } } |
执行结果为:
1 2 3 4 5 6 7 |
customer Message--- offset = 26, key = null, value = message-1 offset = 27, key = null, value = message-2 customer Message--- offset = 28, key = null, value = message-3 customer Message--- offset = 29, key = null, value = message-4 |
5 参考资料
1 https://www.tutorialspoint.com/apache_kafka/apache_kafka_simple_producer_example.htm