1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<script>
window.$docsify = {
name: 'My Document',
repo: '',
loadSidebar: true,
markdown: {
renderer: {
code: function(code, lang) {
if (lang === "nutuml") {
return (
'<div class="nutuml">' + nutuml.render(code) + "</div>"
);
}
return this.origin.code.apply(this, arguments);
}
}
}
}
</script>
<!-- Docsify v4 -->
<script src="http://dev.genlei.cn/js/nutuml.js"></script>

最近遇到一个需求,一个表里有50个字段左右。我需要写一个详情接口,把这些字段都查出来。我想这很简单呀,我直接select * 不就好了嘛。于是我就这样干了

1
select * from foo;

嗯,似乎很完美,但是突然发现,这个表里有个敏感字段的内容是不能输出的,也就是我得从其中去掉一个字段。那就麻烦了,我可不想select column1,column2,column3,column4, 这样一个个写,要是这样一个个的写完,这么多的字段,我手都得残废了。我就想能不能写个SQL来自动拼接字段名称。答案当然是可以的,我们来看看这个SQL

1
2
select GROUP_CONCAT(concat('`', COLUMN_NAME,'`')) from information_schema.`COLUMNS`
where TABLE_SCHEMA='dbname' and TABLE_NAME='tablename';

对的,就这样就可以了。这下舒服了。 当然SQL语句前面的 select 后面 from 和where 就根据自己的实际需要来补充了。

简单说明一下,使用上面SQL的时候要把dbname换成你的数据库的名称,tablename换成表名。比如我们查wordpress的wp_user表。

1
2
3
4
5
6
7
mysql> select GROUP_CONCAT(concat('`', COLUMN_NAME,'`')) from information_schema.`COLUMNS` where TABLE_SCHEMA='blog' and TABLE_NAME='wp_users';
+--------------------------------------------------------------------------------------------------------------------------------------------+
GROUP_CONCAT(concat('`', COLUMN_NAME,'`'))
+--------------------------------------------------------------------------------------------------------------------------------------------+
`ID`,`user_login`,`user_pass`,`user_nicename`,`user_email`,`user_url`,`user_registered`,`user_activation_key`,`user_status`,`display_name`
+--------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.06 sec)

有人说,要不要解析一下这个SQL。好的,这就解析一下

  1. 从information_schema.COLUMNS 这个表里把所有的字段名称查出来,就是 COLUMN_NAME 这个字段。
  2. 通过concat函数,给字段名前后加上`这个字符
  3. 通过 group_concat这个函数,拼接成一行。

简介

之前在一个项目里,遇到一个问题:当时是消费者有两个节点,其中一个节点能消费到消息,另一个节点就一直消费不到消息。当时也是对kafka不太了解,也正因为这个问题,使我对kafka的分区有了更深入的理解。我们先来看几个问题:

  1. “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据” 这句话是否正确? 如果正确,那么有没有什么hack的手段?
  2. topic的分区数是否可以增加?如果可以怎么增加?如果不可以,那又是为什么?
  3. topic的分区数是否可以减少?如果可以怎么减少?如果不可以,那又是为什么?
  4. 创建topic时如何选择合适的分区数?
  5. Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理。 6.生产者写入消息里,是如何确定一个消息对应的分区的?

希望通过这篇文章的介绍,能让大家对kafka的分区有一个清楚的理解,也能很好的回答上面的问题。

概念

写入消息和消费消息时都需要指定主题(Topic),消息是按主题来分类的。每个主题可以有多个分区(Partition),每个分区同一时间只能被一个消费者消费。通过把多个分区分散到多个服务器上,可以提高系统的服务能力。消息以追加的方式写入分区,然后以先进先出的顺序读取。下图是主题内有多个分区的消息写入示意图: 主题分区示意图

由于一个主题内包含了多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。那么我如果希望实现主题内消息的顺序要怎么办呢?一个简单的方法就是只使用一个分区。

分区消费

每个分区,对于每一个消息者群组,同一时间只能有一个消费者。有以下三种对应的情况:

  1. 分区数量和消费者数量刚好相等

分区消费示意图1

  1. 分区数量大于消费者数量刚

分区消费示意图2

分区数量大于消费者数量时,一个消费者会消费来自多个分区的数据。

  1. 分区数量小于消费者数量

分区消费示意图3

分区数量小于消费者数量时,会有消费者处于无分区消费的情况,这个无分区消费的消费者就处于空闲状态了。这种情况下增加消费者并不能提高消费的处理速度。

那么问题来了,如果一个消息的消费是比较耗时的,那要怎么办呢?比如说:有一个影片转码系统,一个长视频的转码可能需要一个小时,甚至更长的时间,我们把待处理的视频消息放进了kafka。这个时候我们就希望能充分的使用多个消费者来提高整个系统的转码能力。这种情况我们需要做一些特殊处理,每个消费者,消费到一个消息后,在开始转码的任务前先停止消费。这样有新的消息到达时,就会分配给其它消费者。直到转码任务结束,可以开始下一个转码任务后,再开始消费。通过这样的方式实现消费者的资源充分利用。值得注意的是,这样的处理方法,只适用于需要长时间处理消息的情况,如果处理一个消息所花的时间是很短的,则不应该这样处理。下面的时序图可以帮助理解:

长耗时消息费示意图

分区管理

  1. 创建主题时,可以通过 –partitions 参数指定分区数量
1
2
$ ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test-topic \
--partitions 1 --replication-factor 1
  1. 修改主题时,可以通过 –partitions 参数增加分区数量
1
2
$ ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic test-topic \
--partitions 4

修改分区数量时,不能比原来的分区数要小,因为减少分区,kafka不知道要如何分区里的数据。如果一定要不顾一切代价,减少某个主题分区的数量,那就只能把这个主题先删除掉,再重新创建。这样一来,主题里原有的数据就全部丢失了。

分区分配

除了主题创建和主题修改时,我们能控制分区数量外,在生产者和消费者的API使用过程中,也会存在分区的分配过程。

生产者

既然有分区,那么当生产者写入一条新的消息时,分区是如何确定的呢?我们直接看 ProducerRecord.java 的源代码中 ProducerRecord 构造函数的定义。下面是从 kafka-clients 2.7.0的代码中提取的片段:

1
2
3
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)

在这里我截取其中三个构造函数,从上面的第一个构造函数可以看到,我们可以直接指定一个消息的 partition, 它的类型是Integer。除了直接指定partition,还可以在创建Producer的时候,通过 partitioner.class 配置,指定自定义的分区器。分区器需要实现 Partitioner 接口。

在默认的情况下,如果指定了 key, 那么Kafka会通过key,取它的哈希值,再对分区总数取模,分配到对应的分区。因此 对于同样的key, 在分区数量不变的情况下,总是会被分配到同一个分区。

特别注意,不要给所有的消息都指定同样的非空key, 这样一来所有的消息都会被分配到同一个分区,这样分区也就没有意义了。

如果没有指定key, 即 key==null 的情况下,kafka 会使用随机算法,给每一批消息分配一个随机的分区。如果你想使用 RoundRobin 轮询的方式来指定分区,可以通过修改 partitioner.class 的配置修改分区器的配置,参考代码:

1
2
3
4
5
6
7
8
9
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// ** 重点就是下面这一行 **
props.put("partitioner.class","org.apache.kafka.clients.producer.RoundRobinPartitioner");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

从上面的分析我们知道,所有的指定消息对应的分区的逻辑都是在 kafka client 端实现的。

消费者

正如前文所说:每个分区同一时间只能被一个消费者消费。那么问题来了,在一个消费者群组内有多个消费者的时候,kafka是如何给消费者分配分区的呢?在消费的配置项里,有一项是:partition.assignment.strategy 就是用来指定分区分配策略的。默认的情况下使用的是 org.apache.kafka.clients.consumer.RangeAssignor 。简单的说就是按范围来分配。我从日志中看到如下的信息:

1
Notifying assignor about the new Assignment(partitions=[TEST1-4, TEST1-5, TEST1-6, TEST1-7])

我们可以看到kafka分配4个连续的分区给这个消费者。

需要注意的是,当有新的消费者加入,或者有消费者失效,kafka就会触发分区再均衡,也就是把一个分区的消费权从一个消费者转移给另一个消费者。在再均衡期间,消费者无法读取消息,整个群组会出现一小段时间的不可用。

问答

我们回过头来看看前面的问题,答案已经在上面的正文里给出来,这里简要的答复一下:

  1. “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据” 这句话是否正确? 如果正确,那么有没有什么hack的手段?

回答:正确,每个分区同一时间只能被一个消费者消费。hack的手段就是消费者消费到消息后,在处理消息前,先停止消费,处理完了再进行下一次消费,只适用于并发小,消息处理时间长的情况。

  1. topic的分区数是否可以增加?如果可以怎么增加?如果不可以,那又是为什么?

回答:可以, 通过 kafka-topics.sh 脚本修改分区数量

  1. topic的分区数是否可以减少?如果可以怎么减少?如果不可以,那又是为什么?

回答:不可以,因为分区数量减少后,kafka无法处理已经存在的数据。如果一定要不顾一切代价,减少某个主题分区的数量,那就只能把这个主题先删除掉,再重新创建。这样一来,主题里原有的数据就全部丢失了。

  1. 创建 topic 时如何选择合适的分区数?

回答:这个问题特别开放,回答起来还是蛮模糊的,说几点供大家参考。1. 如果你不知道怎么选择,那么1也许是不错的选择。 2. 如果你发现多个消费者中有人空闲了,那么分区数量应该大于等于一个消费者群组下的消费者的数量。3. 如果你想充分发挥多个broker的性能,那么分区数量应该大于等于broker的数量

  1. Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理。

回答:1. 生产者发送消息时,需要给消息分配一个指定的分区;在默认的情况下,如果指定了 key, 那么Kafka会通过key,取它的哈希值,再对分区总数取模,分配到对应的分区。如果没有指定key, 即 key==null 的情况下,kafka 会使用随机算法,给每一批消息分配一个随机的分区。也可以自定义分区分配的方案。2. 消费者消费消息时,会给每个消费者分配对应的分区,默认按分区范围来分配,尽量分配连续的分区编号给同一个消费者。当有新的消费者加入,或者有消费者失效,kafka就会触发分区再均衡,也就是把一个分区的消费权从一个消费者转移给另一个消费者

  1. 生产者写入消息里,是如何确定一个消息对应的分区的?

回答:在默认的情况下,如果指定了 key, 那么Kafka会通过key,取它的哈希值,再对分区总数取模,分配到对应的分区。如果没有指定key, 即 key==null 的情况下,kafka 会使用随机算法,给每一批消息分配一个随机的分区。也可以自定义分区分配的方案。

总结

分区是kafka的重要特性,了解其工作模式及原理还是对于问题排查还是很有必要的。合适的分区数量对于提高消息吞吐量,同时消费的能力至关重要。

参考资料

  1. kafka-clients 2.7.0 源代码
  2. 《kafka权威指南》

在Springboot中使用kafka通常有两种式,一种是直接使用kafka-client, 自己做简单封装;另一种是使用spring-kafka的封装。本文介绍的前一种,这样我们只需要更多的了解 kafka本身就可以了。

目录结构

我们先看示例工程的目录结构:

1
2
3
4
5
6
7
8
9
10
11
   pom.xml
+---src
\---main
+---java
\---cn.genlei
Application.java
KafkaConfig.java
KafkaService.java
TestController.java
\---resources
application.yml

pom.xml

我们在pom.xml中引入了spring boot 项目通常都包含的 spring-boot-starter-web 和 lombok. 然后引入了 kafka-clients。

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>

Application.java

Application就是启动Springboot的启动入口,没有额外的配置

1
2
3
4
5
6
7
8
9
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
public static void main(String[] args){
SpringApplication.run(Application.class);
}
}

KafkaConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.Properties;

@Configuration
@Slf4j
public class KafkaConfig {

//kafka 服务器IP地址和端口
@Value("${kafka.host}")
private String kafkaHost;
// 消费组
@Value("${kafka.groupid}")
private String groupid;
// 监听的主题列表,多个主题用,分隔
@Value("#{'${kafka.topics}'.split(',')}")
List<String> topics;

/**
* 初始化一个 KafkaProducer 对象,并作为 Bean 注入到 Spring 管理的对象中
* @return KafkaProducer 实例
*/
@Bean
KafkaProducer kafkaProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaHost);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
log.info("KafkaProducer init finished.");
return kafkaProducer;
}

/**
* 初始化一个 kafkaConsumer 对象,并作为 Bean 注入到 Spring 管理的对象中
* @return kafkaConsumer 实例
*/
@Bean
KafkaConsumer kafkaConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaHost);
props.put("group.id", groupid);

//配置自动提交offset
props.put("enable.auto.commit", "true");
//配置自动提交offset的时间间隔为1秒
props.put("auto.commit.interval.ms", "1000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topics);
log.info("KafkaConsumer init finished. topics:{}",topics);
return consumer;
}
}

这里的示例创建了两个对象实例,一个 kafkaConsumer, 一个 KafkaProducer。如果你的工程只需要生产者,那么把 kafkaConsumer 相关的部分删除掉就可以了。如果你的工程只需要消费者,那么把 KafkaProducer 相关的部分删除掉就可以了。

KafkaService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import java.time.Duration;

@Service
@Slf4j
public class KafkaService implements CommandLineRunner {

@Autowired
KafkaConsumer kafkaConsumer;

@Override
public void run(String... args) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("received message:{},{}",record.topic(),record.value());
}
}
}
});
thread.setName("kafkaConsumer");
thread.start();
}
}

KafkaService 这里是一个 消费者的示例,通过实现 CommandLineRunner 接口,让这部分代码在springboot 初始化后,执行。在这里启动一独立的线程去消费是为了 避免主线程阻塞在这里。

TestController.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

@Autowired
KafkaProducer kafkaProducer;

@RequestMapping("/test")
public String send(@RequestParam String msg){
ProducerRecord<String, String> record = new ProducerRecord<>("TEST1", msg);
kafkaProducer.send(record);
return "send ok. msg:" + msg;
}
}

TestController 是一个简单的发送示例。把用户通过浏览器发送的消息写入 kafka 的 TEST1 这个主题中。各位测试的时候要注意看自己的kafka服务器中,是否有 TEST1 这个主题,如果没有的话,可能需要手动创建一下。

application.yml

这里配置文件,在 KafkaConfig 中引用

1
2
3
4
kafka:
host: 127.0.0.1:9092
groupid: test-group
topics: TEST1

启动测试

经过前面的准备,现在就可以启动测试了。测试前需要确认:

  1. kafka的ip地址和端口是否已经改成了你本地可用的地址和端口
  2. kafka 服务器中是否已经有了 TEST1 主题,如果没有可以手动创建。

然后就可以运行 Application 开始你的测试。服务起来后,可以通过浏览器访问

1
http://localhost:8080/test?msg=hello123

如果你看到控制台里输出

1
received message:TEST1,hello123

这样的log,就说明集成成功了。

总的来说 KafkaConfig.java这个类是关键的集成部分,其它的其实都是辅助测试用的类,在实际项目中需要根据实际情况来修改。

本文档旨在用最简单的方法,把kafka的生产者和消费者的示例代码,运行起来,并做相应的讲解。读者需要有一定的java基础,会用maven,会使用一款java开发工具。

代码清单包括:

  1. 依赖项说明 pom.xml
  2. 日志配置 src/main/resources/logback.xml
  3. 生产者类 src/main/cn/genlei/Producer.java
  4. 消费者类 src/main/cn/genlei/Consummer.java

依赖说明

在pom.xml中添加依赖

1
2
3
4
5
6
7
8
9
10
 <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.7</version>
</dependency>

kafka-clients是kafka的依赖包,logback-classic是为了看kafka-client日志的,虽然logback-classic不是必须的,但是为了能看到相关的日志显示,建议还是加上。如果你的项目里已经有 log4j了,可以不用logback.

日志配置

创建一个日志的配置文件src/main/resources/logback.xml,内容如下:

1
2
3
4
5
6
7
8
9
10
11
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>

这里把root的 log level 设成了 info, 实现在 kafka的 debug 日志太多了。

生产者类

创建一个java类,我这里把文件放到了 src/main/cn/genlei/Producer.java, 你可以根据自己的需要,修改java文件的包名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.200.166:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

String message = "message " + System.currentTimeMillis();
ProducerRecord<String, String> record = new ProducerRecord<>("TEST1", message);
producer.send(record);

System.out.println(message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
  1. 前面的三行import就是导入相关用到的类
  2. 第5,6行是 声明 类和默认的main方法,便于测试
  3. 7-11行是创建KafkaProducer对象,这个是生产者的核心对象。配置项中的 bootstrap.servers 需要根据你自己的实际情况修改,通常这个kafka服务器地址应该写到配置文件里
  4. 13-15行是 发送消息
  5. 17行是打印一下消息内容,便于调试
  6. 18-22行只是让主线程等待一下,好让kafka的发送线程能顺利把内容发送到kafka服务器,在正式的项目里,这个sleep是没有必要的。因为真实项目一般都是web项目类的,一直运行的。这里是因为发送消息后,进程直接结束了。因为kafka producer是异步发送的。如果不等待一点时间,会导致消息发不出去。

消费者类

创建一个java类,我这里把文件放到了 src/main/cn/genlei/Consummer.java, 你可以根据自己的需要,修改java文件的包名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Consummer {
static Logger logger = LoggerFactory.getLogger(Consummer.class);

public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.200.166:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

List<String> list = new ArrayList<>();
list.add("TEST1");
consumer.subscribe(list);

while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String topicName = record.topic();
String val = record.value();
logger.info("{},{}",topicName,val);
}
}
}
}
  1. 1-9行是引入相关的类
  2. 12 行是获取一个logger对象,用于打印日志
  3. 15-20行,是创建一个KafkaConsumer对象,这里的group.id是一个关键配置,group.id表一个消费组。一个消息只会被一个消费组消费一次。 key.deserializer 需要和生产者的 key.serializer相匹配,这样数据才能正确的被解释。同样的value.deserializer也需要和生产者的 value.serializer相匹配。
  4. 22-24行是订阅TEST1 这个topic。为后面的poll做准备
  5. 26-33 就是循环消费消息。

调试与运行

代码写好后就可以运行查看效果了。需要说明的是如果你的kafka服务器没有开启topic的自动创建,就需要手动先创建一个topic. 可以参考下面的命令:

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic TEST1 --partitions 1 --replication-factor 1

可以先运行 Consummer , 然后再运行 Producer . 每运行一次 Producer 后 ,Consummer都应该能收到一条消息。

kafka简介

我们先问两个问题:

  1. kafka是什么?
  2. kafka能做什么?

按kafka官网的说法,Apache Kafka 是一个开源分布式事件流平台,它被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

按个人理解,kafka是一个开源的,流行的消息队列。Kafka是Apache基金会基于宽松的Apache License v2 开源协议开源的软件,可以免费用于商业应用。 Kafka是流行的,广泛在业界使用的。我们看国内几个大的云服务供应商提供的消息队列产品:

云服务

Kafka

RabbitMQ

RocketMQ

Pulsar

阿里云

支持

支持

支持

N

华为云

支持

支持

支持

N

腾讯云

支持

支持

支持

支持

Ucloud优刻得

支持

N

N

N

青云QingCloud

支持

支持

N

N

从上面的表格就可以看出,kafka已经足够流行了。

Kafka属于消息队列,消息队列的作用主要就是 解耦,削峰。

kafka结构

Kafka的逻辑结构请看下图: Kafka逻辑结构

生产者 和 消费者

生产者是指产生消息的实体,通常是业务的上游模块产生消息。消费者是指消费消息的实体,通常是业务的下游模块消费消息。生产者把消息写入kafka服务器集群,消费者从kafka服务器集群中获取消息,生产者和消费者之间没有直接的联系,生产者不知道消息被哪些消费者消费了,这样一来通过kafka就实现了生产者与消费者之间的解耦。

消息

Kafka的数据单元被称为消息。消息由字节组成,若干个字节表示一条消息。一条消息在kafka里就是一条记录。通常在实际项目中用一个Json对象表达一条消息。消息由生产者产生,生产者产生消息后,把消息发送到kafka服务器,消费者从kafka服务器中消费消息。

集群 和 Broker

一个Kafka集群由多台服务器组成,最少3台。每台服务器上运行一个Borker实例,Broker就是一个独立的Kafka服务器。

主题 和 分区

写入消息和消费消息时都需要指定主题,消息是按主题来分类的。每个主题可以有多个分区,每个分区同一时间只能被一个消费者消费。通过把多个分区分散到多个服务器上,可以提高系统的服务能力。

应用举例

假设有一个外卖系统,系统由订单微服务,商家微服务,配送员微服务成。那么一次交易过程的数据流如下图所示。

应用举例

  1. 顾客支付订单时,由订单微服务处理,订单微服务作为生产者把一个新的订单信息这个消息写入kafka服务器。
  2. 商家微服务从kafka服务器中消费订单信息,并通知商家备餐。
  3. 配送员微服务从kafka服务器中消费订单信息,并通知配送员送餐。

在这样一个示例中,我们就实现了 订单微服务,商家微服务,配送员微服 的解耦。

准备一台linux机器,我自己创建了一个kafka用户,各位可以根据自己的需求,创建不期望的linux用户。

第一步:下载安装包

1
[kafka@localhost ~]$ wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz --no-check-certificate

你可以直接使用上面的命令下载安装包,也可以到官网下载安装包。我自己安装时是在 /home/kafka目录下安装的,你可以根据自己的需要选择一个合适的目录。然后就是解压

1
[kafka@localhost ~]$ tar -zxvf kafka_2.13-3.0.0.tgz 

第二步:运行 zookeeper.

1
2
[kafka@localhost ~]$ cd kafka_2.13-3.0.0
[kafka@localhost kafka_2.13-3.0.0]$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

先通过 cd kafka_2.13-3.0.0 进入kafka所在目录,然后执行 ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 命令启动zookeeper。为了方便调试,一开始可以先不加 -daemon 这个后台运行的参数。不加 -daemon就可以在命令行直接看到启动日志。 加了 -daemon 就表示直接后台运行。

第三步:运行 kafka-server

1
[kafka@localhost kafka_2.13-3.0.0]$ ./bin/kafka-server-start.sh -daemon config/server.properties 

到此为止,kafka就正常运行起来了,可以使用kafka客户端测试一下。

测试验证

我们先看看端口是否正常监听了,通过netstat -ntulp 命令

1
2
3
4
[kafka@localhost kafka_2.13-3.0.0]$ netstat -ntulp
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp6 0 0 :::9092 :::* LISTEN 18815/java
tcp6 0 0 :::2181 :::* LISTEN 17837/java

如上面的结果,如果你看到 9092端口(kafka默认端口)和2181端口(zookeeper默认端口)都是正常监听状态,就说明kafka已经正常运行了。如果你看不到 可以把上面第二步 和第三步的 -daemon 参数去掉,看看是否有报错。可能会有一些文件目录权限的错误。

接下来我们创建一个topic.

1
[kafka@localhost kafka_2.13-3.0.0]$ ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test-topic --partitions 1 --replication-factor 1

控制台打印出

1
Created topic test-topic.

这就说明 topic 创建成功了。

当然这个安装只是单机的安装,如果需要集群部署,还需要修改一些配置。这个就不在这篇文章里讨论了。

通常使用kafka都是安装在linux系统的,但有时候为了方便学习和本地开发测试,我们手上又没有linux机器,就可以考虑直接在windows下安装一个kafka. 请看下面的安装步骤

第一步:下载安装包

我们到官网 https://kafka.apache.org/downloads 下载安装包,我写这篇文章的时候,最新的版本是3.0.0。我下载的是 kafka_2.13-2.8.1.tgz 这个安装包。 下载后解压,我把文件解压到了 c:\soft目录下,大家可以根据自己的需要存放

第二步:运行 zookeeper.

  1. 打开 windows 命令行,进入 c:\soft\kafka_2.13-2.8.1 目录

  2. 执行 bin\windows\zookeeper-server-start.bat config\zookeeper.properties 命令启动zookeeper

    1
    C:\soft\kafka_2.13-2.8.1>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

如果有错误,注意检查一下自己当前所在的目录。

第三步:运行 kafka-server

  1. 打开 另一个 windows 命令行,进入 c:\soft\kafka_2.13-2.8.1 目录

  2. 执行 bin\windows\kafka-server-start.bat config\server.properties 命令启动kafka-server

    1
    C:\soft\kafka_2.13-2.8.1>bin\windows\kafka-server-start.bat config\server.properties

到此为止,在windows下安装和启动kafka就算结束了,如果要验证安装是否正确,可以使用kafka客户端测试一下。

测试验证

接下来我们创建一个topic.

1
C:\soft\kafka_2.13-2.8.1>bin\windows\kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic test-topic

控制台打印出

1
Created topic test-topic.

这就说明 topic 创建成功了。

小插曲

一开始我用的是kafka3.0.0的版本,我在执行这个命令时,报了一个错:

1
2
3
4
5
6
7
8
9
[2021-12-02 09:02:37,380] ERROR Failed to create or validate data directory C:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\tmp
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
...
at kafka.server.KafkaServer.startup(KafkaServer.scala:254)
at kafka.Kafka$.main(Kafka.scala:109)
at kafka.Kafka.main(Kafka.scala)

按错误信息看,是一个目录没有权限。后来给目录加上了权限,还是报这个错。在网上查有人说是数据有问题,我就把 c:\tmp下的目录 全删了,再重启zookeeper 和 kafka-server。也还是不行,再查了一下发现有人说是版本有问题。3.0.0的版本会报错,用2.8.1就好了。我试了一下,果然如此。

最后说明一下,windows下使用kafka建议还是仅供本地开发测试。生产还是使用linux系统安装kafka。

kafka从0.2.11版本开始支持事务,本文档对kafka事务作一个简单的说明,同时给出java代码示例,并对代码做一些简单的说明,同时说明相关的注意事项。希望能对需要使用kafka事务的朋友有帮助。

2017年6月28日,Kafka官方发布了0.11.0.0的版本,从这个版本开始,kafka支持了事务。那么,什么是kafka中的事务呢?

kafka事务支持生产者能够将一组消息作为单个事务发送,该事务要么原子地成功要么失败。举个例子,用户支付了某个订单,订单支付后,需要通知库存模块去减少库存,同时需要通知优惠券模型去扣减优惠券,这两个消息你需要让它们要么都成功,要么都失败。这个时候就可以使用kafka事务。

我们先看代码。 第一步:引入依赖,在pom.xml中增加kafka client的依赖

1
2
3
4
5
 <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>

第二步:发送消息相关代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.200.166:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

producer.initTransactions();

int c =0;
while (true) {
if(c++>=3){
break;
}
try {
producer.beginTransaction();

ProducerRecord<String, String> record = new ProducerRecord<>("TEST1", "message a " + c);
producer.send(record);
System.out.println("send TEST1 " + c);

ProducerRecord<String, String> record2 = new ProducerRecord<>("TEST2", "message b " + c);
producer.send(record2);
System.out.println("send TEST2 " + c);

producer.commitTransaction();
}catch (RuntimeException e){
System.out.println(e.getMessage());
producer.abortTransaction();
}
}
  1. 前面6行,是为了构建一个 KafkaProducer 实例,在spring boot的工程里,这个总分通常会封装成一个方法,然后通过@Bean 完成对象的注入管理。其中 bootstrap.servers 对应你的kafka服务器的地址,需要按你本地的实际情况修改,这一类地址通常也应该放到配置文件里。
  2. 第8行 producer.initTransactions(); 表示对kafka事务进行初始化,这个方法对于 每个producer 调用一次就可以了。
  3. 接下来是一个循环控制,这里是想表达 可以重复使用 producer对象,发送多次事务消息。对于每一次的 事务 以 producer.beginTransaction() 表示一次事务的开始,producer.send(record) 表示这一次事务里,需要发送的消息,每个事务里,send方法可以多次调用,具休取决于业务需求,然后通过 producer.commitTransaction() 提交事务,如果发生了异常,则通过 producer.abortTransaction() 来取消事务。

注意事项:

  1. transactional.id 的取值是不能重复的,如果你的环境里,只有单一节点,那这个值直接用一个固定的字符串就可以了。但是如果你的程序需要支持横向扩展,比如:同时有两个或者更多服务器同时运行你的代码,这个时候就会出问题。对于同样的transactional.id, 在一个新的KafkaProducer 调用 initTransactions 后,原来的进程就会报错。只有最新的进程能正常工作。所以这个时候,你需要保证不同的节点运行时,取到的transactional.id的值是不一样的。你可以使用 UUID.randomUUID().toString() 来生成一个保证不重复的随机ID,或者 直接在不同的实例的服务器里配置不同的transactional.id。 而对于同一个节点的运行,多次事务,是可以使用同一个KafkaProducer的,也就可以使用一样的 transactional.id。

参考文档: https://www.confluent.io/blog/transactions-apache-kafka/

标签是一个很多业务系统里都常用的功能,我们需要统计每个标签对应的文章数量。我们的数据如下:

文章表(article):

id

tags

1

aaa,bbb

2

bbb,ccc

3

ddd

我们需要统计每个标签对应的文章数量,期望输出以下内容

tag

cnt

aaa

1

bbb

2

ccc

1

ddd

1

那么这个SQL要怎么写呢?先看最终SQL:

1
2
3
4
5
6
7
8
9
SELECT tag, COUNT(*) cnt
FROM
(
SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(c.tags, ',', n.n), ',', -1) tag
FROM article c CROSS JOIN tally n
WHERE n.n <= 1 + (CHAR_LENGTH(c.tags) - CHAR_LENGTH(REPLACE(c.tags, ',', '')))
) q
WHERE CHAR_LENGTH(tag) > 0
GROUP BY tag

如果你直接运行上面的SQL,很可能会遇到SQL错误 Table ‘tally’ doesn’t exist 。 是的,这个SQL需要有一个辅助表tally. 你可以使用以下SQL创建这个辅助表:

1
2
3
4
5
6
7
8
CREATE TABLE tally(n INT NOT NULL AUTO_INCREMENT PRIMARY KEY);

INSERT INTO tally (n)
SELECT NULL
FROM
(SELECT 0 AS N UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) a
,(SELECT 0 AS N UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) b
;

article 表的SQL:

1
2
3
4
5
6
7
8
9
CREATE TABLE `article`  (
`id` int(0) NOT NULL,
`tags` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

INSERT INTO `article` VALUES (1, 'aaa,bbb,');
INSERT INTO `article` VALUES (2, 'bbb,ccc,');
INSERT INTO `article` VALUES (3, 'ddd,');