控制按钮列中的某一行不显示按钮。

屏蔽的方法:把按钮改成普通的单元格。参考代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//定义绘画表格前的事件,在绘画前把按钮转换成普通单元格。
dataGrid.RowPrePaint += dataGrid_RowPrePaint;
//控制第一行和最后一行的按钮不显示。
private void dataGrid_RowPrePaint(object sender, DataGridViewRowPrePaintEventArgs e)
{
if (e.RowIndex == 0 e.RowIndex == dataGrid.RowCount - 1)
{
if (dataGrid[7, e.RowIndex] is DataGridViewButtonCell)
{
dataGrid[7, e.RowIndex] = new DataGridViewTextBoxCell();
dataGrid[7, e.RowIndex].Value = string.Empty;
}
}
}

控制某单元格是否允许修改。

由于没有单独控制某个单元格的ReadOnly属性,因此,只能通过技巧来控制了。原理其实是判断进入的单元格时判断做逻辑处理,对于允许修改的,就把整个表格的ReadOnly设为false,否则就设为 true

1
2
3
4
5
6
7
8
9
10
private void dataGrid_RowEnter(object sender, DataGridViewCellEventArgs e)
{
if (e.RowIndex == dataGrid.RowCount - 1)
dataGrid.ReadOnly = true;
else
dataGrid.ReadOnly = false;

dataGrid.Columns[0].ReadOnly = true;
dataGrid.Columns[3].ReadOnly = true;
}

参考来源:https://blog.csdn.net/kfarvid/article/details/7280427

下面这段SQL用于生成数据库对应的实体对象的代码。 执行效果

1
2
3
Integer deleteFlag;
String note;
String parentId;

SQL语句,执行的时候 注意修改 数据库名 和表名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
set @db_name = 'test';
set @table_name = 'table';

CREATE TEMPORARY TABLE if not exists tally(n INT NOT NULL AUTO_INCREMENT PRIMARY KEY);

truncate table tally;

INSERT INTO tally (n)
SELECT NULL
FROM
(SELECT 0 AS N UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) a
,(SELECT 0 AS N UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) b
;
with cols as (
select COLUMN_NAME,DATA_TYPE,ordinal_position from information_schema.`COLUMNS`
where TABLE_SCHEMA=@db_name and TABLE_NAME=@table_name
)
, c as (
select SUBSTRING_INDEX(SUBSTRING_INDEX(COLUMN_NAME, '_', n.n), '_', -1) word, COLUMN_NAME,n from cols
CROSS JOIN tally n
where n.n <= 1 + (CHAR_LENGTH(COLUMN_NAME) - CHAR_LENGTH(REPLACE(COLUMN_NAME, '_', '')))
),
cd as (
select if(n>1, concat(UPPER(left(word,1)),substring(word,2,(length(word)-1))), word ) word,COLUMN_NAME,n from c
),
cn as (
select group_concat(word order by n separator '') varName,COLUMN_NAME from cd group by COLUMN_NAME
),
vct as (
select varName,cn.COLUMN_NAME,DATA_TYPE from cn left join cols on cn.COLUMN_NAME=cols.COLUMN_NAME
),
map as (
select 'varchar' as dbType,'String' as javaType union
select 'timestamp' as dbType,'Date' as javaType union
select 'datetime' as dbType,'Date' as javaType union
select 'date' as dbType,'Date' as javaType union
select 'tinyint' as dbType,'Integer' as javaType union
select 'int' as dbType,'Integer' as javaType union
select 'smallint' as dbType,'Integer' as javaType union
select 'year' as dbType,'Integer' as javaType union
select 'bigint' as dbType,'Long' as javaType union
select 'mediumint' as dbType,'Long' as javaType union
select 'float' as dbType,'Float' as javaType union
select 'double' as dbType,'Double' as javaType union
select 'bit' as dbType,'Boolean' as javaType union
select 'decimal' as dbType,'BigDecimal' as javaType union
select 'char' as dbType,'String' as javaType union
select 'tinytext' as dbType,'String' as javaType union
select 'mediumtext' as dbType,'String' as javaType union
select 'longtext' as dbType,'String' as javaType union
select 'json' as dbType,'String' as javaType union
select 'enum' as dbType,'String' as javaType union
select 'text' as dbType,'String' as javaType
),
vctm as (
select varName,COLUMN_NAME,DATA_TYPE,javaType from vct left join map on vct.DATA_TYPE=map.dbType
),
cl as (
select concat(javaType,' ', varName, ';') from vctm inner join cols on vctm.COLUMN_NAME = cols.COLUMN_NAME ORDER BY cols.ordinal_position
)
select * from cl;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupTest {

public static void main(String[] args){
List<GroupTest> list = new ArrayList<>();
list.add(new GroupTest(1,"AA"));
list.add(new GroupTest(1,"BB"));
list.add(new GroupTest(2,"CC"));
list.add(new GroupTest(3,"33"));
list.add(new GroupTest(3,"44"));
list.add(new GroupTest(2,"55"));
list.add(new GroupTest(2,"66"));

Map<Integer,List<GroupTest>> m1 = list.stream().collect(
Collectors.groupingBy(GroupTest::getAge)
);
Map<Integer,List<String>> m2 = list.stream().collect(
Collectors.groupingBy(GroupTest::getAge,
Collectors.mapping(GroupTest::getTitle,Collectors.toList())
)
);
System.out.println(m1);
System.out.println(m2);
}
Integer age;
String title;

@Override
public String toString() {
return "GroupTest{" +
"age=" + age +
", title='" + title + '\'' +
'}';
}

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}

public String getTitle() {
return title;
}

public void setTitle(String title) {
this.title = title;
}

public GroupTest(Integer age, String title) {
this.age = age;
this.title = title;
}

}

输出:

1
2
3
{1=[GroupTest{age=1, title='AA'}, GroupTest{age=1, title='BB'}], 2=[GroupTest{age=2, title='CC'}, GroupTest{age=2, title='55'}, GroupTest{age=2, title='66'}], 3=[GroupTest{age=3, title='33'}, GroupTest{age=3, title='44'}]}

{1=[AA, BB], 2=[CC, 55, 66], 3=[33, 44]}

前一段时间,有一个需求,需要做一个比较复杂的权限判断。问题是这样,已知用户拥有权限[a,b,c].某个资源需要[c,d,e]权限之一即可访问。那么如何用Mysql判断两个json数组是否存交集。

答案就是使用MySQL提供的 JSON_OVERLAPS() 函数。

注意,这个JSON_OVERLAPS() 是MySQL 8.0.17版本以上才支持的,如果你的MySQL版本低于这个版本,就可以考虑一下升级了

先看两个简单示例:

1
2
3
4
5
6
7
mysql> select JSON_OVERLAPS('[1,2]','[2,3]');
+--------------------------------+
JSON_OVERLAPS('[1,2]','[2,3]')
+--------------------------------+
1
+--------------------------------+
1 row in set (0.00 sec)

因为数组[1,2]和数组[2,3]存在交集,所以返回值是1

1
2
3
4
5
6
7
mysql> select JSON_OVERLAPS('[1,2]','[3,4]');
+--------------------------------+
JSON_OVERLAPS('[1,2]','[3,4]')
+--------------------------------+
0
+--------------------------------+
1 row in set (0.00 sec)

因为数组[1,2]和数组[3,4]不存在交集,所以返回值是0。

上面两个示例都是数值型的数组,那么字符串类的数组支持吗?当然是支持的,请看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> select JSON_OVERLAPS('["a","b"]','["b","c"]');
+----------------------------------------+
JSON_OVERLAPS('["a","b"]','["b","c"]')
+----------------------------------------+
1
+----------------------------------------+
1 row in set (0.00 sec)

mysql> select JSON_OVERLAPS('["a","b"]','["c","d"]');
+----------------------------------------+
JSON_OVERLAPS('["a","b"]','["c","d"]')
+----------------------------------------+
0
+----------------------------------------+
1 row in set (0.00 sec)

那么我们要如何把这个函数应用到select 查询当中呢,请看下面的示例。 我们先创建表,准备基础数据

1
2
3
4
5
6
7
8
CREATE TABLE `user`  (
`username` varchar(45) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
`role` varchar(45) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
UNIQUE INDEX `username_UNIQUE`(`username`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 35 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `user` VALUES ('u1', '[\"a\",\"b\"]');
INSERT INTO `user` VALUES ('u2', '[\"b\",\"c\"]');
INSERT INTO `user` VALUES ('u3', '[\"c\",\"d\"]');

所以表里数据是这样的。 username

role

u1

[“a”,”b”]

u2

[“b”,”c”]

u3

[“c”,”d”]

如果我需要查询拥有 角色a 或者 角色d 的用户,就可以这样查询:

1
2
3
4
5
6
7
8
mysql> select * from `user` where JSON_OVERLAPS(role,'["a","d"]')=1;
+----------+-----------+
username role
+----------+-----------+
u1 ["a","b"]
u3 ["c","d"]
+----------+-----------+
2 rows in set (0.00 sec)

使用还是蛮简单的,你学会了吗?

本文记录并介绍kafka集群的安装过程。

准备工作

  1. 准备3台Linux机器,可以是虚拟机,我用的是 CentOS 7

服务器名

IP

说明

broker-1

192.168.1.85

Kafka节点1

broker-2

192.168.1.86

Kafka节点2

broker-3

192.168.1.87

Kafka节点3

  1. 三台机器都安装好JDK,我用的是JDK8

安装Zookeeper

1. 下载zookeeper

1
2
cd /usr/local
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

解压

1
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz

2. 编辑配置文件

1
2
3
cd apache-zookeeper-3.6.3-bin
cd conf
cp zoo_sample.cfg zoo.cfg

然后使用 vi zoo.cfg 命令编辑配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# ...
PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

server.1=192.168.1.85:2888:3888
server.2=192.168.1.86:2888:3888
server.3=192.168.1.87:2888:3888

在文件后面加上 server.1, server.2, server.3 这三行 。 IP地址根据你的实际情况修改

3. 编辑myid.

根据上面配置的 dataDir 配置项。到对应的目录里写入myid的值。比如上面的server.1 那么这台机器的 myid时就写1.

确保dataDir对应的目录已经存在了,我本地 /tmp/zookeeper还不存在。所以我先创建了这个目录

1
mkdir /tmp/zookeeper
  1. 在server.1 也就是192.168.1.85上输入 echo ‘1’>/tmp/zookeeper/myid
  2. 在server.2 也就是192.168.1.86上输入 echo ‘2’>/tmp/zookeeper/myid
  3. 在server.3 也就是192.168.1.87上输入 echo ‘3’>/tmp/zookeeper/myid

4.启动zookeeper

启动

1
# ./bin/zkServer.sh start

然后控制台会输出以下结果:

1
2
3
4
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

查看状态

1
# ./bin/zkServer.sh status

然后控制台会输出以下结果:

1
2
3
4
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

这里输出的Mode会有一个节点是 leader,其它两个是 follower。

要停止zookeeper就运行

1
# ./bin/zkServer.sh stop

安装kafka

1. 下载kafka

创建目录,及下载安装包。可以到官网 https://kafka.apache.org/ 查看最新的下载地址。我这里用的是3.1.0版本。

1
2
cd /usr/local
wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz

下载成功后运行命令解压:

1
tar -zxvf kafka_2.13-3.1.0.tgz 

进入 kafka_2.13-3.1.0 目录

1
cd kafka_2.13-3.1.0

2. 编辑配置文件

1
vi config/server.properties

broker1

1
2
3
broker.id=1
listeners=PLAINTEXT://192.168.1.85:9092
zookeeper.connect=192.168.1.85:2181,192.168.1.86:2181,192.168.1.87:2181

broker2

1
2
3
broker.id=2
listeners=PLAINTEXT://192.168.1.86:9092
zookeeper.connect=192.168.1.85:2181,192.168.1.86:2181,192.168.1.87:2181

broker3

1
2
3
broker.id=2
listeners=PLAINTEXT://192.168.1.87:9092
zookeeper.connect=192.168.1.85:2181,192.168.1.86:2181,192.168.1.87:2181

注意,上面的说明是要找到对应的配置项去修改。配置文件里其它的配置项不动就可以了。配置文件还有其它的配置。

3. 启动kafka

在三台机器上依次输入启动命令

1
./bin/kafka-server-start.sh -daemon config/server.properties 

如果不是很有把握可以先把上面启动命令里的 -daemon去掉。这样控制台会有日志输出。能正常启动后再加上-daemon启动。

如果需要停止kafka, 可以运行

1
./bin/kafka-server-stop.sh

测试Kafka

创建topic

我们在broker1上创建一个 test-topic 主题。我们指定3个副本,一个分区

1
./bin/kafka-topics.sh --create --bootstrap-server 192.168.1.85:9092 --replication-factor 3 --partitions 1 --topic test-topic

查看Topic

我们可以通过命令列出指定Broker的topic

1
./bin/kafka-topics.sh --list --bootstrap-server 192.168.1.85:9092

总结

一开始我想用 kafka自带的zookeeper来搭建集群。但是发现少了 ./bin/zkServer.sh status 这个查看状态对应的脚本。所以稳妥起见还是下载了独立的zookeeper。

走过路过的朋友,别忘了点个赞再走,您的支持是我持续创作的动力!谢谢!

在kafka中要如何实现延时队列呢?很遗憾kafka官方到目前为止并没有给出延时队列的直接实现。也就是说我们需要自己来实现这一部分的功能。

延时队列

实现的原理描述如下:

  1. 根据业务需要,自定义几个特殊主题。比如:topic-delay-1s,topic-delay-5s,topic-delay-5m,topic-delay-30m。分别表示延迟1秒,延迟5秒,延迟5分钟,延迟30分钟。当然具体延迟多少需要根据你自己的业务来设定。但有一点就是延迟的时间间隔是需要固定的。

  2. 自己实现一个delay service。可以是java或者其它语言实现。delay service 同时消费上面定义的主题,然后等待延时时间的到达。时间到达后,把消息转发至业务主题。

  3. 业务处理正常消费业务主题,处理消息

举个例子,比如火车票订单,如果30分钟内未支付,则自动取消订单。那么用时序图描述延时处理过程就是正面这样的。

autonumber
business -> kafka : 发送订单消息至 topic-delay-5m 

participant ds as "delay service"
kafka -> ds : 从 topic-delay-5m 消费订单消息
ds -> ds : 等待5分钟
ds -> kafka : 把订单消息发送至 topic-business 
kafka -> business : 从 topic-business 消费订单消息,并处理

注意事项

实现delay service 里需要注意:

  1. 因为consumer是单线程的,所以理论上有多少个延迟主题,就需要创建多少个线程。像上面topic-delay-1s,topic-delay-5s,topic-delay-5m,topic-delay-30m 这4种主题,就需要4个线程来处理。延迟主题少问题不大,但延迟主题如果比较多的话,还是比较难受的。

  2. 需要手动提交偏移量,因为delay service 可能会因为升级或者故障,导致重启,这个时候消息是不能漏的,所以一定要消息已经转发到业务主题后,再提交偏移量,防止漏消息。

  3. 重复消息的处理,因为delay service 需要先把消息发送到业务 主题,再提交偏移量,就有可能出现 消息发送到业务主题后,还没来的及发送偏移量,delay service 就因电力故障无法正常服务了,下次重启后,就可能继续发送已经发送过的消息。所以业务上建议是要做到幂等,以实现容错。

重试队列 与 死信队列

我们在 延时队列的基础上实现重试队列就比较简单了,当某个消息处理失败时,就把这个消息发送到 延时队列。等时间到了就会重试处理。如果处理成功,则结束。如果处理失败则重试次数加1,再一次进入延时队列。而如果超过了重试次数,则写入死信队列,作为记录。

这里说的重试队列,死信队列都是概念上的东西,kafka本身并不提供。我们是可以在使用层实现这一类概念。下面的时序图,是一个示例。我们假设有一个订单支付成功了,有积分逻辑需要处理,但重试三次后依然失败了。时序过程描述如下:

autonumber
participant os as "订单微服务"
participant ds as "延时微服务"
participant is as "积分微服务"

os -> kafka : 发送支付成功消息至 topic-integral

kafka -> is : 从 topic-integral 消费订单消息
is -> is : 积分处理第1次失败
is -> kafka : 发送重试消息至 topic-delay-1m 
kafka -> ds : 从 topic-delay-1m 消费订单消息
ds -> ds : 等待1分钟
ds -> kafka : 把订单消息发送至 topic-integral

kafka -> is : 从 topic-integral 消费订单消息
is -> is : 积分处理第2次失败
is -> kafka : 发送重试消息至 topic-delay-1m 
kafka -> ds : 从 topic-delay-1m 消费订单消息
ds -> ds : 等待1分钟
ds -> kafka : 把订单消息发送至 topic-integral

kafka -> is : 从 topic-integral 消费订单消息
is -> is : 积分处理第3次失败
is -> kafka : 发送消息至 topic-dead,记录死信消息

实现示例

我在github上面提交了,我的实现示例,这里简单说明一下。github地址: https://github.com/junfengliang/kafka-delay-queue

项目中的kafka-delay-queue是核心代码,kafka-delay-service是springboot集成的示例。

这里简单讲一下集成的过程: 文件目录:

1
2
3
4
5
6
7
8
9
10
├── pom.xml
└── src
└── main
├── java
│   └── cn
│   └── genlei
│   └── service
│   └── Application.java
└── resources
└── application.yml

一共就三个文件。

pom.xml

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>cn.genlei</groupId>
<artifactId>kafka-delay-queue</artifactId>
<version>1.0.0</version>
</dependency>

关键就是添加依赖,说明一下,因为 kafka-delay-queue 这个包还没有传到 maven的中心仓库中。所以需要先本地到kafka-delay-queue目录下通过 mvn install编译这个jar包。

application.yml

1
2
3
4
kafka:
servers: 127.0.0.1:9092
group-id: kafka-delay-group
topics: 5000,topic-delay-5s;60000,topic-delay-1m

注意修改 kafka服务器地址,对应你自己的kafka服务器地址。topics的格式是 延时 + 逗号 + 延时主题。多个配置用 分号 分隔。相关的主题需要先手动在kafka上创建。

Application.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cn.genlei.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import cn.genlei.DelayQueue;
import java.util.Properties;

@SpringBootApplication
public class Application implements CommandLineRunner {

@Value("${kafka.servers}")
String servers;

@Value("${kafka.group-id}")
String groupId;

@Value("${kafka.topics}")
String topics;

public static void main(String[] args){
SpringApplication.run(Application.class);
}

@Override
public void run(String... args) throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers",servers);
properties.put("group.id", groupId);
properties.put("topics", topics);
DelayQueue delayQueue = new DelayQueue(properties);
delayQueue.start();
}
}

这里关键的部分就是run方法内的内容。我们创建一个DelayQueue实例,然后调用它的start方法,就会开始延时队列的处理了。

注间:上述代码并没有在生产环境长时间验证过,如果你需要用于生产,请谨慎使用。

最近遇到一个技术问题,需要根据分类过滤数据,这个分类是有层级关系的,也就是当输入一个分类ID时,需要返回分类本身的数据,还有子分类,孙分类的数据。我就是想到了用JSON数组来实现。因为数据库里有记录某篇文章的所有分类,包含所有父级分类。

在网上查一下怎么在MYSQL里查找JSON数据组是否包含某个值,查了半天,也没找到有用的信息。后来还是到MySQL官网找到的答案。就是 MEMBER OF(json_array).

1
2
3
4
5
6
mysql> select 1 member of ('[1]');
+---------------------+
1 member of ('[1]')
+---------------------+
1
+---------------------+

member of 匹配则返回1,不匹配则返回0。接下来,我们创建一个测试表,及一些测试数据来试一下这个功能。

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `article` (
`id` int NOT NULL,
`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
`catalogs` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
INSERT INTO `test`.`article`(`id`, `title`, `catalogs`) VALUES (1, 't1', '[1]');
INSERT INTO `test`.`article`(`id`, `title`, `catalogs`) VALUES (2, 't2', '[1,2]');
INSERT INTO `test`.`article`(`id`, `title`, `catalogs`) VALUES (3, 't3', '[11]');
INSERT INTO `test`.`article`(`id`, `title`, `catalogs`) VALUES (4, 't4', NULL);

有的分类,是有层级的。比如一个地址:广东省,深圳市,宝安区。有时候我们希望查询的是广东省的所有数据。所以会希望能过一个ID能查询出直接属于这个分类的,以及它的子分类的内容。像上面的第2条记录[1,2]表示的就是这一类的情况。

查询分类1对应的记录:

1
2
3
4
5
6
7
8
mysql> select * from article where 1 member of(catalogs);
+----+-------+----------+
id title catalogs
+----+-------+----------+
1 t1 [1]
2 t2 [1,2]
+----+-------+----------+
2 rows in set (0.00 sec)

我们看到 [1] 和 [1,2] 都是能正确返回的。[11]则能正确的被过滤掉。

值的注意的是,如果你的内容是字符串类的表达的,要加上’’去匹配。注意看下面的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> select 1 member of ('["1"]');
+-----------------------+
1 member of ('["1"]')
+-----------------------+
0
+-----------------------+
1 row in set (0.00 sec)

mysql> select "1" member of ('["1"]');
+-------------------------+
"1" member of ('["1"]')
+-------------------------+
1
+-------------------------+
1 row in set (0.00 sec)

所以不能用数字1去匹配字符串”1”。

注:MEMBER OF()是MySQL 8.0.17.版本才加入的功能,所以如果上面的SQL报错,注意查看mysql的版本。

如果是在mysql 5.7 版本执行上面的SQL,就会报错

1
2
3
4
5
6
7
8
mysql> select 1 member of ('[1]');
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'of ('[1]')' at line 1
mysql> select version();
+-----------+
version()
+-----------+
5.7.30
+-----------+

参考: https://dev.mysql.com/doc/refman/8.0/en/json-search-functions.html#operator_member-of

我们先来看几个问题:

  1. 简述消费者与消费组之间的关系
  2. 有哪些情形会造成重复消费?
  3. 那些情景下会造成消息漏消费?
  4. KafkaConsumer 是非线程安全的,那么怎么样实现多线程消费?
  5. 我明明发了消息,但是为什么没收到?
  6. 怎么知道现在消息是不是有堆积,消费不及时?
  7. 怎么查看某个主题的某个分区分配给哪个消费者了?

上面的这些问题,或者是在面试时会被问到,或者是在实际项目中排查问题会遇到。希望能通过这篇文章为大家解答这些问题。

概念

消费者就是通过订阅一个或多个主题,消费消息的实体。消费者从属于消费者群组,一个群组内的多个消费者通过分区分配来接收不同分区的消息。下面是要点:

  1. 对于同一个消费者群组,一条消息只会被其中一个消费者消费。
  2. 一个消费者可以从多个分区中消费消息,但对于每一个消费者群组,每个分区在同一时刻只能被一个消费者消费。
  3. 不同的消费者群组之间没有影响

示例一:假设有这样一个微服务,需要把一些配置信息加载到内存中,这些配置信息是在数据库里存储于一张配置表里的,同时提供了一个web界面去修改。要求在web界面修改后,内存中的缓存也要即时更新。这个时候如果我们只一个web节点,那是很简单的,就是当配置信息发生变更的时候,直接重新加载一下配置信息就可以了。但是在上规模的系统里,往往是要求水平扩展的,我们会有2个或者更多的web节点,这个时候,当操作人员在界面上操作配置变更时,只有其中一个web节点,知道配置变更了,其它节点并不知道。

为了解决这个问题,我们就会先把变更的消息发送到kafka, 然后每个web节点都来消费这个消息,这种情况需要特别注意不能使用相同的消费者群组,不然还是只有一个web节点能收到这个消息。

示例二:假设有一个电商系统,包括:订单微服务务,配送微服务。订单支持成功后,订单消息写入kafka, 通知配送微服务,后来由于业务扩展,需要新增积分微服务,这个时候积分微服务只需要使用一个新的消费者群组,就可以完成业务扩展,不影响原来的消费者群组。所以对于生产者来说,它并不知道有多少个消息者群组会来消费消息。也是这个特性使用kafka后,系统的可扩展性,会变得更好。完全可以实现,在现有微服务不变的前提下,增加新的微服务,完成业务的扩展。

消费过程及偏移量控制

我们先来看一段简单的消费代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.102:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

List<String> list = new ArrayList<>();
list.add("TEST1");
consumer.subscribe(list);

while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("{},{}",record.topic(),record.value());
}
}

上面的这段代码主要是三部分:

  1. 创建一个 KafkaConsumer 对象
  2. 订阅目标主题
  3. 循环消费消息

如果只看上面的代码,我们找不到任何关于偏移量的相关代码。那么什么是偏移量?它有什么作用?

回答这个问题前,我们先来看几个业务场景:

  • 场景一:不在乎过去,只关心从消费者开始消费后产生的消息。
  • 场景二:尽可能往早去追溯,越早越好
  • 场景三:从某一特定时间点开始追溯,比如:从今天0点开始

偏移量(offset) 是一个不断递增的整数值,在创建消息时,kafka会把它添加到消息里。在给定的分区里,每个消息的偏移量是唯一的。Kafka对于每一个消费者群组,记录其对应的每个分区最后消费的偏移量。因此偏移量是用于记录和控制消费进度的状态值。通过控制偏移量,可以消费指定偏移量后的消息,也可以实现消费从上次中断后产生的新消息。

auto.offset.reset 是一个关键的消费者配置项,它的值是字符串类型,有两个值 earliest 和 latest,默认值是latest。它表示消费者的偏移量控制策略。当kafka服务器找不到该消费者群组订阅的当前主题的当前分区的消费偏移量记录时,就会使用这个策略。比如一个新的消费者群组开始消费时,因为从来没有消费过,kafka服务器肯定是没有消费记录的。

  • earliest: 自动重置到最早的偏移量,这种情况能消费到目前依然记录在服务器上的所有消息。
  • latest: 自动重置到最新的偏移量,这种情况能消费到从消费者群组首次活跃后产生的消息。

我们回过头来再看看上面的代码,没有对 auto.offset.reset 进行任何配置,也没有指定offset,因此默认值 latest 会生效,上面的代码对应了前面讲的场景一,会消费到从这个消费者群组首次启动后产生的消息。

那么如果要实现场景二,也很简单,加一行代码,修改 auto.offset.reset 的配置就好了。

1
props.put("auto.offset.reset", "earliest");

如果要实现场景三,那么代码会多一些,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.102:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

List<String> list = new ArrayList<>();
list.add("TEST1");
consumer.subscribe(list);

long start = getStart(); // 按实际需要设置开始的时间戳
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for(TopicPartition topicPartition:consumer.assignment()){
timestampsToSearch.put(topicPartition,start);
}

Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = consumer.offsetsForTimes(timestampsToSearch);
for(Map.Entry<TopicPartition, OffsetAndTimestamp> entry:offsetAndTimestampMap.entrySet()){
if(entry.getValue()!=null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}else{
// make some log
}
}

while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("{},{}",record.topic(),record.value());
}
}

从上面的代码可以看出,我们先通过 KafkaConsumer 的 offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,要检索指定主题指定分区对应特定时间的偏移量。再通过 KafkaConsumer 的 seek 方法,来设置偏移量。

自动提交偏移量

enable.auto.commit 是一个消费者配置项,它的值是布尔类型,默认是true. 当它的值为true时,消息者会自动在后台周期性的提交偏移量。所以默认的情况下,kafka consumer就是会自动的提交偏移量。

auto.commit.interval.ms 是跟 enable.auto.commit 配套的配置,也是消费者的配置项,它表示 自动提交偏移量的时间间隔,单位是毫秒,默认值是5000,也就是5秒自动提交一次。

手动提交偏移量

如果你期望手动控制偏移量的提交,可以把 enable.auto.commit 设为 false. 这样你可以自己用代码手动控制偏移量的提交。 代码片断参考:

1
2
3
4
5
6
7
8
9
10
11
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("{},{}",record.topic(),record.value());
}
try {
consumer.commitSync();
}catch (RuntimeException e){
logger.error("commit failed:{}",e);
}
}

上述代码表示,消费到一批数据,进行处理后,手动提交偏移量。commitSync()可能会抛出异常,如果提交失败,打印日志记录,便于排查问题。commitSync()表示同步提交,这样会影响吞吐量,如果对性能要求较高,可以使用 commitAsync()进行异步提交。

重复消费

什么是重复消费?一条消息被再两次或多次消费,就是重复消费。按前面讲的偏移量的说明,如果由于某种原因导致消息消费了,但偏移量没有及时提交,那么下一次再消费消息时,就会导致重复消费。下面是一个按时间线的举例:

1
2
3
4
5
6
7
10:00:00 consumer拉取了新的2条消息,M1,M2
10:00:01 consumer完成M1的处理
10:00:02 consumer所在的主机停电。自动提交偏移量的时间窗口未到,偏移量未提交
...
10:01:00 consumer所在的主机电力系统恢复,重新消费消息
10:01:01 consumer再次拉取了消息,M1,M2。
10:01:02 M1被重复消费,由于前面的故障导致原来消费的偏移量没有提交,所以造成重复消费

对于重要的业务系统,幂等的设计是很有必要的,比如一个电商系统中有一个积分模块,定义每消费1元即可获得1个积分。如果出现了重复消费,就可能导致一个订单加了两次积分。为了避免这种情况的发生,我们需要有一张积分处理记录表,记录某个的订单ID对应的订单是否已经处理过积分了。这样重复消费时,发现该订单对应的积分已经处理过了,就不再对积分进行增加的操作。

漏消费

漏消费正好跟重复消费相反,漏消费就是某条消息没有被正常处理,被漏掉了。当consumer已经拉取了某个消息,且偏移量正常提交了,但业务代码并没有正常处理这个消息。下面的时间线给出了一个示例:

1
2
3
4
5
6
7
8
11:00:00 consumer拉取了新的2条消息,M3,M4
11:00:01 consumer开始处理M3
...
11:00:05 consumer自动提交偏移量,此时kafka认为M3,M4均已被消费
11:00:06 consumer所在的主机停电。M3处理到一半,M4未处理
...
11:01:01 consumer所在的主机电力恢复。重新消费
11:01:02 consumer拉取了新的M5。 因为它以为M3,M4已处理

从上面的示例可以看出,当一个消息的处理时间比较长的时候,就容易导致漏消费。这种情况建议

  1. 每次尽量拉取少量的消息,比如一次拉取1条,需要设置消费者配置 max.poll.records 为 1
  2. 手动提交偏移量
  3. 业务设计上,允许人工补录。比如允许客服对某个指定的订单进行积分手动触发结算

线程问题

上面的代码示例里,都是单线程来处理消息的,在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。一个消费者使用一个线程。当然在做消息处理时,可以把业务处理的部分用多个线程来处理。

查看分配信息与消费延迟

1
2
3
4
5
# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.102:9092 --describe --group my-group

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group TEST 0 563 563 0 consumer-1-215d937e-205e-4965-be82-fad8142d3830 /192.168.1.85 consumer-1
my-group TEST 1 527 527 0 consumer-1-215d937e-205e-4965-be82-fad8142d3830 /192.168.1.85 consumer-1

如上面的示例,我们可能使用 kafka-consumer-groups.sh 脚本来查看某个消费者群组的分配情况及延迟情况。如上面的命令执行结果,LAG 列对应的就是延迟的情况,如果延迟比较多,就说明消费者已经处理不过来了。HOST列则是表示这个分区当前正分配给了哪个IP的消费者来处理。

如果在开发调试的过程中,特别是有团队内多个成员协作的情况下,发现发了消息但是没收到,或者说有的消息能收到,有点消息确又收不到,就可以通过上面的命令来查看。看看是不是所有分区都分配在你当前调试的机器上。有时候如果配置错误,会出现开发环境,测试环境错乱的情况,需要根据当前的分区实际分配情况,来排查问题。

问答

我们回过头来看看前面的问题,答案已经在上面的正文里给出来,这里简要的答复一下:

  1. 简述消费者与消费组之间的关系

    消费者就是通过订阅一个或多个主题,消费消息的实体。消费者从属于消费者群组,一个群组内的多个消费者通过分区分配来接收不同分区的消息。 对于同一个消费者群组,一条消息只会被其中一个消费者消费。

  2. 有哪些情形会造成重复消费?

    如果由于某种原因导致消息消费了,但偏移量没有及时提交,那么下一次再消费消息时,就会导致重复消费

  3. 那些情景下会造成消息漏消费?

    当consumer已经拉取了某个消息,且偏移量正常提交了,但业务代码并没有正常处理这个消息就崩溃了,就会导致漏消费

  4. KafkaConsumer 是非线程安全的,那么怎么样实现多线程消费?

    在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。一个消费者使用一个线程。在做消息处理时,可以把业务处理的部分用多个线程来处理。

  5. 我明明发了消息,但是为什么没收到?

    通过 kafka-consumer-groups.sh 检查消费组的分区分配情况,检查是否有配置错误

  6. 怎么知道现在消息是不是有堆积,消费不及时?

    通过 kafka-consumer-groups.sh 检查消费组的情况,查看LAG的数值,如果LAG数值较大,说明有堆积

  7. 怎么查看某个主题的某个分区分配给哪个消费者了?

    通过 kafka-consumer-groups.sh 检查消费组的分区分配情况,查看HOST列

总结

消费是kafka的重要环节,了解其工作模式对于问题排查还是很有必要的。

参考资料

  1. kafka-clients 2.7.0 源代码
  2. 《kafka权威指南》
  3. 官方文档