Kafka学习笔记
基本概念
- Producer:生产者,负责创建消息,然后将其投递到Kafka中
- Consumer:消费者,连接到Kafka上并接收消息,进而进行相应的业务逻辑处理
- Broker:服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的Kafka服务节点或Kafka服务实例,一个或多个Broker组成了一个Kafka集群
- Topic:主题,消息归类(逻辑上)
- Partition:分区,消息归类(物理上)
- ISR:与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas)
- OSR:与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Replicas)
- AR=ISR+OSR
- HW:High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息
- LEO:Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offse,所有副本中最小LEO即为HW
Partition
- 一个主题对应多个分区,不同分区可能分布在不同的Broker上,增加分区可以增加kafka水平扩展能力,一个分区也只能被同一个消费者组的一个消费者消费
- 分区可以有多个副本,一主多从,副本默认不提供服务,做容灾使用
- 同一主题下不同分区的数据不同(hash散列)
- 单个分区内消息有序
- 分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)
常用配置项
Producer配置项
配置项 | 说明 |
---|---|
key.serializer | |
value.serializer | |
acks | |
bootstrap.servers | |
buffer.memory | RecordAccumulator 消息累加器缓存消息的大小,默认32M如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒 |
max.block.ms | 消息累加器满后发送消息最长阻塞时间 |
compression.type | |
batch.size | ProducerBatch 的大小,实际上是BufferPool来管理的 |
retries | |
ssl.key.password | |
ssl.keystore.certificate.chain | |
ssl.keystore.key | |
ssl.keystore.location | |
ssl.keystore.password | |
ssl.truststore.certificates | |
client.dns.lookup | |
client.id | |
connections.max.idle.ms | |
delivery.timeout.ms | |
linger.ms | |
max.request.size |
| partitioner.class | |
| request.timeout.ms | |
| sasl.client.callback.handler.class | |
| sasl.jaas.config | |
| sasl.kerberos.service.name | |
| sasl.login.callback.handler.class | |
| sasl.login.class | |
| sasl.mechanism | |
| security.protocol | |
| send.buffer.bytes | |
| receive.buffer.bytes | |
生产者客户端架构
https://img-blog.csdnimg.cn/20200503133843165.png
核心对象
KafkaProducer
ProducerRecord
分区器
Partitioner
Serialize
Interceptor
KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复。不可重试的异常,比如 1.4 节中提及的RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries参数的默认值为0,配置方式参考如下
默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区
如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key为null,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别
发送消息主要有三种模式
发后即忘(fire-and-forget)
同步(sync)
异步(async)
broker配置项
- auto.create.topics.enable:自动创建主题,默认true,生产环境不建议开启
- num.partitions:默认分区数量
- default.replication.factor:默认副本数量
listeners参数说明
表示broker监听客户端连接的地址列表,即为客户端要连接broker的入口地址列表,配置格式为 protocol1://hostname1:port1,protocol2://hostname2:port2
- protocol代表协议类型,Kafka当前支持的协议类型有PLAINTEXT、SSL、SASL_SSL等,如果未开启安全认证,则使用简单的PLAINTEXT即可
- hostname代表主机名,port代表服务端口,此参数的默认值为 null。比如此参数配置为 PLAINTEXT://198.162.0.2:9092,如果有多个地址,则中间以逗号隔开
advertised.listeners参数说明
作用和listeners类似,默认值也为 null。不过 advertised.listeners 主要用于 IaaS(Infrastructure as a Service)环境,比如公有云上的机器通常配备有多块网卡,即包含私网网卡和公网网卡,对于这种情况而言,可以设置advertised.listeners参数绑定公网IP供外部客户端使用,而配置listeners参数来绑定私网IP地址供broker间通信使用
常用命令
创建主题
1 | ./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 3 --topic test |
查看主题
1 | ./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181 |
查看主题详情
1 | ./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181 --describe --topic test |
删除主题
1 | ./bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test |
发送消息
1 | ./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test |
消费消息
1 | ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning |
查看消费者
1 | ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list |
查看正在消费的topic(消息积压)
1 | ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group console-consumer-63307 |
查看topic某分区偏移量最值
1 | ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic daShuJuPushDataTopic -time -1 |
创建主题约束
kafka-topics.sh脚本在创建主题时还会检测是否包含“.”或“_”字符。为什么要检测这两个字符呢?因为在Kafka的内部做埋点时会根据主题的名称来命名metrics的名称,并且会将点号“.”改成下画线“_”。假设遇到一个名称为“topic.1_2”的主题,还有一个名称为“topic_1.2”的主题,那么最后的metrics的名称都会为“topic_1_2”,这样就发生了名称冲突。举例如下,首先创建一个以“topic.1_2”为名称的主题,提示 WARNING 警告,之后再创建“topic.1_2”时发生InvalidTopicException异常
kafka机架感知(broker.rack)
Kafka从0.10.x版本开始支持指定broker的机架信息(机架的名称)。如果指定了机架信息,则在分区副本分配时会尽可能地让分区副本分配到不同的机架上。指定机架信息是通过broker端参数broker.rack来配置的,比如配置当前broker所在的机架为“RACK1
本地启动kafka
1 | D:\kafka_2.13-2.6.0\bin\windows\kafka-server-start.bat D:\kafka_2.13-2.6.0\config\server.properties |