nutuml docsify
1 | <script> |
1 | <script> |
最近遇到一个需求,一个表里有50个字段左右。我需要写一个详情接口,把这些字段都查出来。我想这很简单呀,我直接select * 不就好了嘛。于是我就这样干了
1 | select * from foo; |
嗯,似乎很完美,但是突然发现,这个表里有个敏感字段的内容是不能输出的,也就是我得从其中去掉一个字段。那就麻烦了,我可不想select column1,column2,column3,column4, 这样一个个写,要是这样一个个的写完,这么多的字段,我手都得残废了。我就想能不能写个SQL来自动拼接字段名称。答案当然是可以的,我们来看看这个SQL
1 | select GROUP_CONCAT(concat('`', COLUMN_NAME,'`')) from information_schema.`COLUMNS` |
对的,就这样就可以了。这下舒服了。 当然SQL语句前面的 select 后面 from 和where 就根据自己的实际需要来补充了。
简单说明一下,使用上面SQL的时候要把dbname换成你的数据库的名称,tablename换成表名。比如我们查wordpress的wp_user表。
1 | mysql> select GROUP_CONCAT(concat('`', COLUMN_NAME,'`')) from information_schema.`COLUMNS` where TABLE_SCHEMA='blog' and TABLE_NAME='wp_users'; |
有人说,要不要解析一下这个SQL。好的,这就解析一下
COLUMNS
这个表里把所有的字段名称查出来,就是 COLUMN_NAME 这个字段。
之前在一个项目里,遇到一个问题:当时是消费者有两个节点,其中一个节点能消费到消息,另一个节点就一直消费不到消息。当时也是对kafka不太了解,也正因为这个问题,使我对kafka的分区有了更深入的理解。我们先来看几个问题:
希望通过这篇文章的介绍,能让大家对kafka的分区有一个清楚的理解,也能很好的回答上面的问题。
写入消息和消费消息时都需要指定主题(Topic),消息是按主题来分类的。每个主题可以有多个分区(Partition),每个分区同一时间只能被一个消费者消费。通过把多个分区分散到多个服务器上,可以提高系统的服务能力。消息以追加的方式写入分区,然后以先进先出的顺序读取。下图是主题内有多个分区的消息写入示意图:
由于一个主题内包含了多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。那么我如果希望实现主题内消息的顺序要怎么办呢?一个简单的方法就是只使用一个分区。
每个分区,对于每一个消息者群组,同一时间只能有一个消费者。有以下三种对应的情况:
分区数量大于消费者数量时,一个消费者会消费来自多个分区的数据。
分区数量小于消费者数量时,会有消费者处于无分区消费的情况,这个无分区消费的消费者就处于空闲状态了。这种情况下增加消费者并不能提高消费的处理速度。
那么问题来了,如果一个消息的消费是比较耗时的,那要怎么办呢?比如说:有一个影片转码系统,一个长视频的转码可能需要一个小时,甚至更长的时间,我们把待处理的视频消息放进了kafka。这个时候我们就希望能充分的使用多个消费者来提高整个系统的转码能力。这种情况我们需要做一些特殊处理,每个消费者,消费到一个消息后,在开始转码的任务前先停止消费。这样有新的消息到达时,就会分配给其它消费者。直到转码任务结束,可以开始下一个转码任务后,再开始消费。通过这样的方式实现消费者的资源充分利用。值得注意的是,这样的处理方法,只适用于需要长时间处理消息的情况,如果处理一个消息所花的时间是很短的,则不应该这样处理。下面的时序图可以帮助理解:
1 | $ ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test-topic \ |
1 | $ ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic test-topic \ |
修改分区数量时,不能比原来的分区数要小,因为减少分区,kafka不知道要如何分区里的数据。如果一定要不顾一切代价,减少某个主题分区的数量,那就只能把这个主题先删除掉,再重新创建。这样一来,主题里原有的数据就全部丢失了。
除了主题创建和主题修改时,我们能控制分区数量外,在生产者和消费者的API使用过程中,也会存在分区的分配过程。
既然有分区,那么当生产者写入一条新的消息时,分区是如何确定的呢?我们直接看 ProducerRecord.java 的源代码中 ProducerRecord 构造函数的定义。下面是从 kafka-clients 2.7.0的代码中提取的片段:
1 | public ProducerRecord(String topic, Integer partition, K key, V value) |
在这里我截取其中三个构造函数,从上面的第一个构造函数可以看到,我们可以直接指定一个消息的 partition, 它的类型是Integer。除了直接指定partition,还可以在创建Producer的时候,通过 partitioner.class 配置,指定自定义的分区器。分区器需要实现 Partitioner 接口。
在默认的情况下,如果指定了 key, 那么Kafka会通过key,取它的哈希值,再对分区总数取模,分配到对应的分区。因此 对于同样的key, 在分区数量不变的情况下,总是会被分配到同一个分区。
特别注意,不要给所有的消息都指定同样的非空key, 这样一来所有的消息都会被分配到同一个分区,这样分区也就没有意义了。
如果没有指定key, 即 key==null 的情况下,kafka 会使用随机算法,给每一批消息分配一个随机的分区。如果你想使用 RoundRobin 轮询的方式来指定分区,可以通过修改 partitioner.class 的配置修改分区器的配置,参考代码:
1 | Properties props = new Properties(); |
从上面的分析我们知道,所有的指定消息对应的分区的逻辑都是在 kafka client 端实现的。
正如前文所说:每个分区同一时间只能被一个消费者消费。那么问题来了,在一个消费者群组内有多个消费者的时候,kafka是如何给消费者分配分区的呢?在消费的配置项里,有一项是:partition.assignment.strategy 就是用来指定分区分配策略的。默认的情况下使用的是 org.apache.kafka.clients.consumer.RangeAssignor 。简单的说就是按范围来分配。我从日志中看到如下的信息:
1 | Notifying assignor about the new Assignment(partitions=[TEST1-4, TEST1-5, TEST1-6, TEST1-7]) |
我们可以看到kafka分配4个连续的分区给这个消费者。
需要注意的是,当有新的消费者加入,或者有消费者失效,kafka就会触发分区再均衡,也就是把一个分区的消费权从一个消费者转移给另一个消费者。在再均衡期间,消费者无法读取消息,整个群组会出现一小段时间的不可用。
我们回过头来看看前面的问题,答案已经在上面的正文里给出来,这里简要的答复一下:
回答:正确,每个分区同一时间只能被一个消费者消费。hack的手段就是消费者消费到消息后,在处理消息前,先停止消费,处理完了再进行下一次消费,只适用于并发小,消息处理时间长的情况。
回答:可以, 通过 kafka-topics.sh 脚本修改分区数量
回答:不可以,因为分区数量减少后,kafka无法处理已经存在的数据。如果一定要不顾一切代价,减少某个主题分区的数量,那就只能把这个主题先删除掉,再重新创建。这样一来,主题里原有的数据就全部丢失了。
回答:这个问题特别开放,回答起来还是蛮模糊的,说几点供大家参考。1. 如果你不知道怎么选择,那么1也许是不错的选择。 2. 如果你发现多个消费者中有人空闲了,那么分区数量应该大于等于一个消费者群组下的消费者的数量。3. 如果你想充分发挥多个broker的性能,那么分区数量应该大于等于broker的数量
回答:1. 生产者发送消息时,需要给消息分配一个指定的分区;在默认的情况下,如果指定了 key, 那么Kafka会通过key,取它的哈希值,再对分区总数取模,分配到对应的分区。如果没有指定key, 即 key==null 的情况下,kafka 会使用随机算法,给每一批消息分配一个随机的分区。也可以自定义分区分配的方案。2. 消费者消费消息时,会给每个消费者分配对应的分区,默认按分区范围来分配,尽量分配连续的分区编号给同一个消费者。当有新的消费者加入,或者有消费者失效,kafka就会触发分区再均衡,也就是把一个分区的消费权从一个消费者转移给另一个消费者
回答:在默认的情况下,如果指定了 key, 那么Kafka会通过key,取它的哈希值,再对分区总数取模,分配到对应的分区。如果没有指定key, 即 key==null 的情况下,kafka 会使用随机算法,给每一批消息分配一个随机的分区。也可以自定义分区分配的方案。
分区是kafka的重要特性,了解其工作模式及原理还是对于问题排查还是很有必要的。合适的分区数量对于提高消息吞吐量,同时消费的能力至关重要。
在Springboot中使用kafka通常有两种式,一种是直接使用kafka-client, 自己做简单封装;另一种是使用spring-kafka的封装。本文介绍的前一种,这样我们只需要更多的了解 kafka本身就可以了。
我们先看示例工程的目录结构:
1 | pom.xml |
我们在pom.xml中引入了spring boot 项目通常都包含的 spring-boot-starter-web 和 lombok. 然后引入了 kafka-clients。
1 | <dependency> |
Application就是启动Springboot的启动入口,没有额外的配置
1 | import org.springframework.boot.SpringApplication; |
1 | import lombok.extern.slf4j.Slf4j; |
这里的示例创建了两个对象实例,一个 kafkaConsumer, 一个 KafkaProducer。如果你的工程只需要生产者,那么把 kafkaConsumer 相关的部分删除掉就可以了。如果你的工程只需要消费者,那么把 KafkaProducer 相关的部分删除掉就可以了。
1 | import lombok.extern.slf4j.Slf4j; |
KafkaService 这里是一个 消费者的示例,通过实现 CommandLineRunner 接口,让这部分代码在springboot 初始化后,执行。在这里启动一独立的线程去消费是为了 避免主线程阻塞在这里。
1 | import org.apache.kafka.clients.producer.KafkaProducer; |
TestController 是一个简单的发送示例。把用户通过浏览器发送的消息写入 kafka 的 TEST1 这个主题中。各位测试的时候要注意看自己的kafka服务器中,是否有 TEST1 这个主题,如果没有的话,可能需要手动创建一下。
这里配置文件,在 KafkaConfig 中引用
1 | kafka: |
经过前面的准备,现在就可以启动测试了。测试前需要确认:
然后就可以运行 Application 开始你的测试。服务起来后,可以通过浏览器访问
1 | http://localhost:8080/test?msg=hello123 |
如果你看到控制台里输出
1 | received message:TEST1,hello123 |
这样的log,就说明集成成功了。
总的来说 KafkaConfig.java这个类是关键的集成部分,其它的其实都是辅助测试用的类,在实际项目中需要根据实际情况来修改。
本文档旨在用最简单的方法,把kafka的生产者和消费者的示例代码,运行起来,并做相应的讲解。读者需要有一定的java基础,会用maven,会使用一款java开发工具。
代码清单包括:
在pom.xml中添加依赖
1 | <dependency> |
kafka-clients是kafka的依赖包,logback-classic是为了看kafka-client日志的,虽然logback-classic不是必须的,但是为了能看到相关的日志显示,建议还是加上。如果你的项目里已经有 log4j了,可以不用logback.
创建一个日志的配置文件src/main/resources/logback.xml,内容如下:
1 | <configuration> |
这里把root的 log level 设成了 info, 实现在 kafka的 debug 日志太多了。
创建一个java类,我这里把文件放到了 src/main/cn/genlei/Producer.java, 你可以根据自己的需要,修改java文件的包名。
1 | import org.apache.kafka.clients.producer.KafkaProducer; |
创建一个java类,我这里把文件放到了 src/main/cn/genlei/Consummer.java, 你可以根据自己的需要,修改java文件的包名。
1 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
代码写好后就可以运行查看效果了。需要说明的是如果你的kafka服务器没有开启topic的自动创建,就需要手动先创建一个topic. 可以参考下面的命令:
1 | ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic TEST1 --partitions 1 --replication-factor 1 |
可以先运行 Consummer , 然后再运行 Producer . 每运行一次 Producer 后 ,Consummer都应该能收到一条消息。
我们先问两个问题:
按kafka官网的说法,Apache Kafka 是一个开源分布式事件流平台,它被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
按个人理解,kafka是一个开源的,流行的消息队列。Kafka是Apache基金会基于宽松的Apache License v2 开源协议开源的软件,可以免费用于商业应用。 Kafka是流行的,广泛在业界使用的。我们看国内几个大的云服务供应商提供的消息队列产品:
云服务
Kafka
RabbitMQ
RocketMQ
Pulsar
阿里云
支持
支持
支持
N
华为云
支持
支持
支持
N
腾讯云
支持
支持
支持
支持
Ucloud优刻得
支持
N
N
N
青云QingCloud
支持
支持
N
N
从上面的表格就可以看出,kafka已经足够流行了。
Kafka属于消息队列,消息队列的作用主要就是 解耦,削峰。
Kafka的逻辑结构请看下图:
生产者是指产生消息的实体,通常是业务的上游模块产生消息。消费者是指消费消息的实体,通常是业务的下游模块消费消息。生产者把消息写入kafka服务器集群,消费者从kafka服务器集群中获取消息,生产者和消费者之间没有直接的联系,生产者不知道消息被哪些消费者消费了,这样一来通过kafka就实现了生产者与消费者之间的解耦。
Kafka的数据单元被称为消息。消息由字节组成,若干个字节表示一条消息。一条消息在kafka里就是一条记录。通常在实际项目中用一个Json对象表达一条消息。消息由生产者产生,生产者产生消息后,把消息发送到kafka服务器,消费者从kafka服务器中消费消息。
一个Kafka集群由多台服务器组成,最少3台。每台服务器上运行一个Borker实例,Broker就是一个独立的Kafka服务器。
写入消息和消费消息时都需要指定主题,消息是按主题来分类的。每个主题可以有多个分区,每个分区同一时间只能被一个消费者消费。通过把多个分区分散到多个服务器上,可以提高系统的服务能力。
假设有一个外卖系统,系统由订单微服务,商家微服务,配送员微服务成。那么一次交易过程的数据流如下图所示。
在这样一个示例中,我们就实现了 订单微服务,商家微服务,配送员微服 的解耦。
准备一台linux机器,我自己创建了一个kafka用户,各位可以根据自己的需求,创建不期望的linux用户。
1 | [kafka@localhost ~]$ wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz --no-check-certificate |
你可以直接使用上面的命令下载安装包,也可以到官网下载安装包。我自己安装时是在 /home/kafka目录下安装的,你可以根据自己的需要选择一个合适的目录。然后就是解压
1 | [kafka@localhost ~]$ tar -zxvf kafka_2.13-3.0.0.tgz |
1 | [kafka@localhost ~]$ cd kafka_2.13-3.0.0 |
先通过 cd kafka_2.13-3.0.0 进入kafka所在目录,然后执行 ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 命令启动zookeeper。为了方便调试,一开始可以先不加 -daemon 这个后台运行的参数。不加 -daemon就可以在命令行直接看到启动日志。 加了 -daemon 就表示直接后台运行。
1 | [kafka@localhost kafka_2.13-3.0.0]$ ./bin/kafka-server-start.sh -daemon config/server.properties |
到此为止,kafka就正常运行起来了,可以使用kafka客户端测试一下。
我们先看看端口是否正常监听了,通过netstat -ntulp 命令
1 | [kafka@localhost kafka_2.13-3.0.0]$ netstat -ntulp |
如上面的结果,如果你看到 9092端口(kafka默认端口)和2181端口(zookeeper默认端口)都是正常监听状态,就说明kafka已经正常运行了。如果你看不到 可以把上面第二步 和第三步的 -daemon 参数去掉,看看是否有报错。可能会有一些文件目录权限的错误。
接下来我们创建一个topic.
1 | [kafka@localhost kafka_2.13-3.0.0]$ ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test-topic --partitions 1 --replication-factor 1 |
控制台打印出
1 | Created topic test-topic. |
这就说明 topic 创建成功了。
当然这个安装只是单机的安装,如果需要集群部署,还需要修改一些配置。这个就不在这篇文章里讨论了。
通常使用kafka都是安装在linux系统的,但有时候为了方便学习和本地开发测试,我们手上又没有linux机器,就可以考虑直接在windows下安装一个kafka. 请看下面的安装步骤
我们到官网 https://kafka.apache.org/downloads 下载安装包,我写这篇文章的时候,最新的版本是3.0.0。我下载的是 kafka_2.13-2.8.1.tgz 这个安装包。 下载后解压,我把文件解压到了 c:\soft目录下,大家可以根据自己的需要存放
打开 windows 命令行,进入 c:\soft\kafka_2.13-2.8.1 目录
执行 bin\windows\zookeeper-server-start.bat config\zookeeper.properties 命令启动zookeeper
1 | C:\soft\kafka_2.13-2.8.1>bin\windows\zookeeper-server-start.bat config\zookeeper.properties |
如果有错误,注意检查一下自己当前所在的目录。
打开 另一个 windows 命令行,进入 c:\soft\kafka_2.13-2.8.1 目录
执行 bin\windows\kafka-server-start.bat config\server.properties 命令启动kafka-server
1 | C:\soft\kafka_2.13-2.8.1>bin\windows\kafka-server-start.bat config\server.properties |
到此为止,在windows下安装和启动kafka就算结束了,如果要验证安装是否正确,可以使用kafka客户端测试一下。
接下来我们创建一个topic.
1 | C:\soft\kafka_2.13-2.8.1>bin\windows\kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic test-topic |
控制台打印出
1 | Created topic test-topic. |
这就说明 topic 创建成功了。
一开始我用的是kafka3.0.0的版本,我在执行这个命令时,报了一个错:
1 | [2021-12-02 09:02:37,380] ERROR Failed to create or validate data directory C:\tmp\kafka-logs (kafka.server.LogDirFailureChannel) |
按错误信息看,是一个目录没有权限。后来给目录加上了权限,还是报这个错。在网上查有人说是数据有问题,我就把 c:\tmp下的目录 全删了,再重启zookeeper 和 kafka-server。也还是不行,再查了一下发现有人说是版本有问题。3.0.0的版本会报错,用2.8.1就好了。我试了一下,果然如此。
最后说明一下,windows下使用kafka建议还是仅供本地开发测试。生产还是使用linux系统安装kafka。
kafka从0.2.11版本开始支持事务,本文档对kafka事务作一个简单的说明,同时给出java代码示例,并对代码做一些简单的说明,同时说明相关的注意事项。希望能对需要使用kafka事务的朋友有帮助。
2017年6月28日,Kafka官方发布了0.11.0.0的版本,从这个版本开始,kafka支持了事务。那么,什么是kafka中的事务呢?
kafka事务支持生产者能够将一组消息作为单个事务发送,该事务要么原子地成功要么失败。举个例子,用户支付了某个订单,订单支付后,需要通知库存模块去减少库存,同时需要通知优惠券模型去扣减优惠券,这两个消息你需要让它们要么都成功,要么都失败。这个时候就可以使用kafka事务。
我们先看代码。 第一步:引入依赖,在pom.xml中增加kafka client的依赖
1 | <dependency> |
第二步:发送消息相关代码
1 | Properties props = new Properties(); |
注意事项:
参考文档: https://www.confluent.io/blog/transactions-apache-kafka/
标签是一个很多业务系统里都常用的功能,我们需要统计每个标签对应的文章数量。我们的数据如下:
文章表(article):
id
tags
…
1
aaa,bbb
…
2
bbb,ccc
…
3
ddd
…
我们需要统计每个标签对应的文章数量,期望输出以下内容
tag
cnt
aaa
1
bbb
2
ccc
1
ddd
1
那么这个SQL要怎么写呢?先看最终SQL:
1 | SELECT tag, COUNT(*) cnt |
如果你直接运行上面的SQL,很可能会遇到SQL错误 Table ‘tally’ doesn’t exist 。 是的,这个SQL需要有一个辅助表tally. 你可以使用以下SQL创建这个辅助表:
1 | CREATE TABLE tally(n INT NOT NULL AUTO_INCREMENT PRIMARY KEY); |
article 表的SQL:
1 | CREATE TABLE `article` ( |