在Springboot中使用kafka通常有两种式,一种是直接使用kafka-client, 自己做简单封装;另一种是使用spring-kafka的封装。本文介绍的前一种,这样我们只需要更多的了解 kafka本身就可以了。
目录结构
我们先看示例工程的目录结构:
1 2 3 4 5 6 7 8 9 10 11
| pom.xml +---src \---main +---java \---cn.genlei Application.java KafkaConfig.java KafkaService.java TestController.java \---resources application.yml
|
pom.xml
我们在pom.xml中引入了spring boot 项目通常都包含的 spring-boot-starter-web 和 lombok. 然后引入了 kafka-clients。
1 2 3 4 5 6 7 8 9 10 11 12 13
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.0</version> </dependency>
|
Application.java
Application就是启动Springboot的启动入口,没有额外的配置
1 2 3 4 5 6 7 8 9
| import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class Application { public static void main(String[] args){ SpringApplication.run(Application.class); } }
|
KafkaConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.List; import java.util.Properties;
@Configuration @Slf4j public class KafkaConfig {
//kafka 服务器IP地址和端口 @Value("${kafka.host}") private String kafkaHost; // 消费组 @Value("${kafka.groupid}") private String groupid; // 监听的主题列表,多个主题用,分隔 @Value("#{'${kafka.topics}'.split(',')}") List<String> topics;
/** * 初始化一个 KafkaProducer 对象,并作为 Bean 注入到 Spring 管理的对象中 * @return KafkaProducer 实例 */ @Bean KafkaProducer kafkaProducer() { Properties props = new Properties(); props.put("bootstrap.servers", kafkaHost); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); log.info("KafkaProducer init finished."); return kafkaProducer; }
/** * 初始化一个 kafkaConsumer 对象,并作为 Bean 注入到 Spring 管理的对象中 * @return kafkaConsumer 实例 */ @Bean KafkaConsumer kafkaConsumer() { Properties props = new Properties(); props.put("bootstrap.servers", kafkaHost); props.put("group.id", groupid);
//配置自动提交offset props.put("enable.auto.commit", "true"); //配置自动提交offset的时间间隔为1秒 props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(topics); log.info("KafkaConsumer init finished. topics:{}",topics); return consumer; } }
|
这里的示例创建了两个对象实例,一个 kafkaConsumer, 一个 KafkaProducer。如果你的工程只需要生产者,那么把 kafkaConsumer 相关的部分删除掉就可以了。如果你的工程只需要消费者,那么把 KafkaProducer 相关的部分删除掉就可以了。
KafkaService.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Service; import java.time.Duration;
@Service @Slf4j public class KafkaService implements CommandLineRunner {
@Autowired KafkaConsumer kafkaConsumer;
@Override public void run(String... args) throws Exception { Thread thread = new Thread(new Runnable() { @Override public void run() { while (true){ ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { log.info("received message:{},{}",record.topic(),record.value()); } } } }); thread.setName("kafkaConsumer"); thread.start(); } }
|
KafkaService 这里是一个 消费者的示例,通过实现 CommandLineRunner 接口,让这部分代码在springboot 初始化后,执行。在这里启动一独立的线程去消费是为了 避免主线程阻塞在这里。
TestController.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;
@RestController public class TestController {
@Autowired KafkaProducer kafkaProducer;
@RequestMapping("/test") public String send(@RequestParam String msg){ ProducerRecord<String, String> record = new ProducerRecord<>("TEST1", msg); kafkaProducer.send(record); return "send ok. msg:" + msg; } }
|
TestController 是一个简单的发送示例。把用户通过浏览器发送的消息写入 kafka 的 TEST1 这个主题中。各位测试的时候要注意看自己的kafka服务器中,是否有 TEST1 这个主题,如果没有的话,可能需要手动创建一下。
application.yml
这里配置文件,在 KafkaConfig 中引用
1 2 3 4
| kafka: host: 127.0.0.1:9092 groupid: test-group topics: TEST1
|
启动测试
经过前面的准备,现在就可以启动测试了。测试前需要确认:
- kafka的ip地址和端口是否已经改成了你本地可用的地址和端口
- kafka 服务器中是否已经有了 TEST1 主题,如果没有可以手动创建。
然后就可以运行 Application 开始你的测试。服务起来后,可以通过浏览器访问
1
| http://localhost:8080/test?msg=hello123
|
如果你看到控制台里输出
1
| received message:TEST1,hello123
|
这样的log,就说明集成成功了。
总的来说 KafkaConfig.java这个类是关键的集成部分,其它的其实都是辅助测试用的类,在实际项目中需要根据实际情况来修改。