Kafka学习笔记

基本概念

  • Producer:生产者,负责创建消息,然后将其投递到Kafka中
  • Consumer:消费者,连接到Kafka上并接收消息,进而进行相应的业务逻辑处理
  • Broker:服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的Kafka服务节点或Kafka服务实例,一个或多个Broker组成了一个Kafka集群
  • Topic:主题,消息归类(逻辑上)
  • Partition:分区,消息归类(物理上)
  • ISR:与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas)
  • OSR:与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Replicas)
  • AR=ISR+OSR
  • HW:High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息
  • LEO:Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offse,所有副本中最小LEO即为HW

Partition

  • 一个主题对应多个分区,不同分区可能分布在不同的Broker上,增加分区可以增加kafka水平扩展能力,一个分区也只能被同一个消费者组的一个消费者消费
  • 分区可以有多个副本,一主多从,副本默认不提供服务,做容灾使用
  • 同一主题下不同分区的数据不同(hash散列)
  • 单个分区内消息有序
  • 分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)

常用配置项

Producer配置项

官方文档

配置项 说明
key.serializer
value.serializer
acks
bootstrap.servers
buffer.memory RecordAccumulator 消息累加器缓存消息的大小,默认32M如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒
max.block.ms 消息累加器满后发送消息最长阻塞时间
compression.type
batch.size ProducerBatch 的大小,实际上是BufferPool来管理的
retries
ssl.key.password
ssl.keystore.certificate.chain
ssl.keystore.key
ssl.keystore.location
ssl.keystore.password
ssl.truststore.certificates
client.dns.lookup
client.id
connections.max.idle.ms
delivery.timeout.ms
linger.ms
max.request.size

| partitioner.class | |
| request.timeout.ms | |
| sasl.client.callback.handler.class | |
| sasl.jaas.config | |
| sasl.kerberos.service.name | |
| sasl.login.callback.handler.class | |
| sasl.login.class | |
| sasl.mechanism | |
| security.protocol | |
| send.buffer.bytes | |
| receive.buffer.bytes | |

生产者客户端架构

https://img-blog.csdnimg.cn/20200503133843165.png

核心对象

KafkaProducer

ProducerRecord

分区器
Partitioner
Serialize
Interceptor
KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复。不可重试的异常,比如 1.4 节中提及的RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries参数的默认值为0,配置方式参考如下

默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区

如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key为null,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别

发送消息主要有三种模式
发后即忘(fire-and-forget)
同步(sync)
异步(async)

broker配置项

  • auto.create.topics.enable:自动创建主题,默认true,生产环境不建议开启
  • num.partitions:默认分区数量
  • default.replication.factor:默认副本数量

listeners参数说明

表示broker监听客户端连接的地址列表,即为客户端要连接broker的入口地址列表,配置格式为 protocol1://hostname1:port1,protocol2://hostname2:port2

  • protocol代表协议类型,Kafka当前支持的协议类型有PLAINTEXT、SSL、SASL_SSL等,如果未开启安全认证,则使用简单的PLAINTEXT即可
  • hostname代表主机名,port代表服务端口,此参数的默认值为 null。比如此参数配置为 PLAINTEXT://198.162.0.2:9092,如果有多个地址,则中间以逗号隔开

advertised.listeners参数说明

作用和listeners类似,默认值也为 null。不过 advertised.listeners 主要用于 IaaS(Infrastructure as a Service)环境,比如公有云上的机器通常配备有多块网卡,即包含私网网卡和公网网卡,对于这种情况而言,可以设置advertised.listeners参数绑定公网IP供外部客户端使用,而配置listeners参数来绑定私网IP地址供broker间通信使用

常用命令

创建主题

1
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 3 --topic test

查看主题

1
./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

查看主题详情

1
./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181 --describe --topic test

删除主题

1
./bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test

发送消息

1
./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

消费消息

1
2
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

查看消费者

1
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

查看正在消费的topic(消息积压)

1
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group console-consumer-63307 

查看topic某分区偏移量最值

1
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic daShuJuPushDataTopic -time -1

创建主题约束

kafka-topics.sh脚本在创建主题时还会检测是否包含“.”或“_”字符。为什么要检测这两个字符呢?因为在Kafka的内部做埋点时会根据主题的名称来命名metrics的名称,并且会将点号“.”改成下画线“_”。假设遇到一个名称为“topic.1_2”的主题,还有一个名称为“topic_1.2”的主题,那么最后的metrics的名称都会为“topic_1_2”,这样就发生了名称冲突。举例如下,首先创建一个以“topic.1_2”为名称的主题,提示 WARNING 警告,之后再创建“topic.1_2”时发生InvalidTopicException异常

kafka机架感知(broker.rack)

Kafka从0.10.x版本开始支持指定broker的机架信息(机架的名称)。如果指定了机架信息,则在分区副本分配时会尽可能地让分区副本分配到不同的机架上。指定机架信息是通过broker端参数broker.rack来配置的,比如配置当前broker所在的机架为“RACK1

本地启动kafka

1
D:\kafka_2.13-2.6.0\bin\windows\kafka-server-start.bat D:\kafka_2.13-2.6.0\config\server.properties