《Kafka权威指南》读书笔记

第一章 初识Kafka

broker,发布消息的中间点

1.2

消息和批次

Kafka的数据单元被称为消息。为键生成一个一致性散列值,而后用散列值对主题分区数进行取模,为消息选取分区。这样保证相同键的消息总是被写到相同的分区。

为了提高消息,消息被分批次写入Kafka。批次是一组消息,这些消息同属同一个主题和分区。要在时间延迟和吞吐量之间作出权衡。批次数据会被压缩。

模式

JSON、XML易用、可读性好,但缺乏强类型的处理能力。不同版本间的兼容性也不好。

主题和分区

主题好比数据库的表,主题可以分为若干个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,以先进先出的顺序读取。 一个主题可以包含多个分区,无法在整个主题范围保证消息的顺序,但消息在单个分区是有序。 Kafka通过分区实现数据冗余和伸缩性。

生产者和消费者

生产者在默认情况下把消息均衡地分布到主题的所有分区上。生产者也可以自定义分区器,将消息映射到指定分区。

消费者订阅一个或多个主题,按消息生成的顺序读取。检查消息的偏移量来区分已经读过的消息。 偏移量是一种元数据,不断递增的整数,在创建消息时,Kafka会把它添加到消息里。

消费者群组保证每个分区只能被一个消息者使用。如果一个消费者失效,群组里的其他消费者可以接管。

broker和集群

一个独立的Kafka称为broker。broker接收生产者消息,为消息设置偏移量,提交至硬盘。broker为消费者提供服务,响应读取,返回磁盘上的消息。

每个集群都有一个broker作为集群控制器。负责管理,把分区分配给broker,监控broker。 在集群中,一个分区属于一个broker,该broker是分区首领。把一个分区分配给多个broker,则会发生分区复制。

消息保留。一段时间(7天)或达到一定的字节数(1GB) 可以配置紧凑型日志,只有最后一个带有特定键的消息会被保留下来。

多集群好处

  • 数据类型分离
  • 安全需要
  • 多数据中心,灾难恢复。

为什么选择Kafka

  • 推送和拉取模型。
  • 多个生产者
  • 多个消费者。 与其他队列系统不同,其他队列系统的消息一旦被一个客户端读取,其他客户端就无法再读取。
  • 基于磁盘的数据储存。
  • 伸缩性,ActiveMQ无法满足横向扩展。
  • 高性能

1.4 数据生态系统

Kafka为客户端提供一致的接口。生产者和消费者之间不再有紧密的耦合。

第二章 安装Kafka

使用Zookeeper保存集群的元数据和消费者信息。 不建议一个群组包含超过7个节点,因为Zookeeper的一致性协议,节点过多会降低整个群组的性能。(3,5,7)

2.3 broker配置

常规配置

  • broker.id 唯一标识
  • log.dirs 如果指定了多个路径,会根据最少使用的原则,把同一个分区的日志保存到同一路径。非最少磁盘空间。
  • auto.create.topics.enable 自动创建主题的情况
    • 写入消息
    • 读取消息
    • 客户端向主题发送元数据请求

主题配置

  • num.partitions 主题将包含多少个分区。如果要让一个主题的分区数少于num.partitions,需要手动创建主题。如果每秒钟要从主题上写入和读取1GB的数据,并且每个消费者每秒能处理50MB的数据,则需要20个分区。
  • log.retention.ms 默认保留168小时,即1周。检查磁盘上日志文件片段的最后修改时间。
  • log.segment.bytes 默认1GB。当消息到达broker,被追加到分区的当前日志片段。当日志片段达到log.segment.bytes,则当前日志片段关闭,创建新的日志片段。
  • log.segment.ms 多长时间后日志片段被关闭。 对于那些数据量小的分区来说,日志片段的关闭总是同时发生。
  • message.max.bytes 单个消息的大小。默认1MB

2.4 硬件的选择

  • 使用多个硬盘,设置多个数据目录。或者组成磁盘阵列。
  • JVM不需要太大的内存。剩余的内存可用作页面缓存,或正在运行的日志片段。
  • 多CPU要求低。批量解压,设置偏移量,然后重新批量压缩。

2.6 集群

好处,可以跨服务器进行负载均衡,可以使用复制功能避免单点故障。

需要多少broker

  • 需要多少磁盘空间。 整个集群10TB,单个broker可以存2TB,则至少需要5个。
  • 处理请求的能力。

broker配置

  • broker必须配置相同的zookeeper.connect,指定用于保存元数据的Zookeeper群组和路径
  • 每个broker的id必须唯一。

操作系统调优

虚拟内存

对于依赖吞吐量的应用,避免内存交换。

vm.dirty_ratio参数可以增加被内存进程刷新到磁盘之前的脏页数据。如果设置了较高的值,建议启动Kafka的复制功能,避免因系统崩溃造成数据丢失。

网络

  • 可以对分配给socket的读写缓冲区的内存大小作调整
  • 设置TCP socket的读写缓冲区

2.7 生产环境注意事项

垃圾回收

G1会自动根据负载进行自我调节,并且停顿时间是恒定的。轻松处理大块的堆内存,把堆内存分为若干个小块。

数据中心布局

把集群broker安装在不同机架

新版的Kafka让消费者把偏移量提交到Kafka服务器。不建议把Zookeeper共享给其他应用。

第三章 生产者

Kafka作为消息队列、消息总线或数据存储平台。

生产者概览

创建一个ProducerRecord对象,包含目标主题、发送内容,还可以指定键和分区。 如果消息成功写入Kafka,就返回一个RecordMetaData对象,包含主题、分区信息,以及记录在分区的偏移量

必须参数

  • bootstrap.servers broker的地址清单,建议至少两个。生产者能够从给定的broker查找到其他broker信息。
  • key.serializer
  • value.serializer
1
2
3
4
5
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:0902,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String,String>(kafkaProps);

发送消息的3种方式 * 发送并忘记

1
2
ProducerRecord<String,String> record = new ProducerRecord<>("topic1", "key1", "val1");
producer.send(record)
* 同步发送
1
producer.send(record).get();
* 异步发送
1
2
3
4
5
6
7
private Class DemoProducerCallback implements Callback{
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e){
    // do something
  }
}
producer.send(record, new DemoProducerCallback());

3.4 生产者配置

  • acks 指定了必须要多少个分区副本收到消息,生产者才会认为消息写入成功
    • 0, 不会等待服务器的响应
    • 1, 只要集群的首领节点收到,生产者就会收到服务器的成功消息。
    • all,只有所有参与复制的节点收到消息。
  • compression.type 默认不会压缩
  • retries 可重发次数
  • batch.size 当有多个消息需要发送到同一分区,把它们放到一个批次。
  • linger.ms 生产者在发送批次前的等待时间。就算批次只有一条消息。
  • max.in.flight.requests.per.connection 生产者在接受到服务器响应之前最多可以发送多少个消息。设置为1可以保证消息有序。即使重试。
  • max.block.ms 获取元数据时生产者的阻塞时间
  • max.request.size 单个请求的最大值。1MB

序列化

Avro、Thrift、Protobuf

Avro

schema通过JSON来描述,数据被序列化为二进制文件或JSON文件。

3.6 分区

1
new ProducerRecord<>("topic1", "value1");

这里键为null,分区器使用轮询。

当键不为null,对键进行散列。

第四章 消费者

Kafka消息者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。

只要保证每个应用程序有自己的消费者群组,就可以获取到主题的所有的消息。不同于传统的消息系统,横向伸缩Kafka消费者和消费者群并不会造成负面影响。

|主题 |消费者群组1 |消费者群组2| |———|—————–|———| |分区0 |消费者1 |消费者1| |分区1 |消费者2 |消费者2| |分区2 |消费者3 | | |分区3 |消费者4 | |

再均衡

分区的所有权从一个消费者转移到另一个消费者。 再均衡期间,消费者无法读取到消息,造成整个群组的不可用。另外,消费者的当前读取状态会丢失,还可能刷新缓存。

消费者向群组协调器的broker发送心跳。来维持从属关系和对分区的所有权。

当消费者加入群组时,向群组协调器发送一个joinGroup请求,第一个加入群组的消费者成为群主。群主从协调器获取成员列表。并负责给每一个消费者分配分区。

1
2
3
4
5
6
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:0902,broker2:9092");
kafkaProps.put("group.id", "group1");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);

4.3 订阅主题

1
2
consumer.subscribe(Collections.singletonList("topic1"))
consumer.subscribe(Collections.singletonList("test.*"))

如果有人创建新主题,并且主题的名字与正则匹配,会立即触发一次再均衡。

在Kafka和其他系统复制数据时,使用正则表达式的方法订阅多个主题很常见。

4.4 轮询

消费者API的核心。

1
2
3
4
5
6
7
try {
  while(true){
    ConsumerRecords<String,String> records = consumer.poll(100);
  }
} finally {
  consumer.close();
}
* 第一次轮询会加入群组,接收分配的分区。 * 如果发生再均衡,也是在轮询间进行 * 心跳

消费者配置

  • fetch.min.bytes
  • fetch.max.wait.ms
  • max.partition.fetch.bytes
  • session.timeout.ms
  • auto.offset.reset 在偏移量无效的情况下,从最新的记录开始读(latest),造成丢数据。也可以在起始位置读(earliest),需要重复处理。
  • enable.auto.commit 默认true
  • partition.assignment.strategy
    • Range 把主题的连续分区分配给消费者
    • RoundRobin 把主题的分区逐个分配给消费者
  • max.poll.records
  • receive.buffer.bytes, send.buffer.bytes

提交和偏移量

提交:把更新分区当前位置的操作。

如果消费者一直运行,那么偏移量就没有用处。

自动提交

默认5s。 在最近一次提交后的3s发生再均衡,则这3s内到达的消息会被重复处理。

提交偏移量

把auto.commit.offset设为false。在处理所有记录后调用commitSync()。会阻塞。

1
2
3
4
5
6
7
8
9
while(true){
  ConsumerRecords<String,String> records = consumer.poll(100);
  for(...){
    ...
  }
  try{
    consumer.commitSync()
  }
}

异步提交

1
2
3
4
5
6
7
while(true){
  ConsumerRecords<String,String> records = consumer.poll(100);
  for(...){
    ...
  }
  consumer.commitAsync()
}

重试异步提交。使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或回调里提交偏移量时递增序列号。重试前检查 * 序列号和即将提交的偏移量相等,可以安全重试。 * 如果序列号较大,说明一个新的提交已发送。应停止重试。

提交特定偏移量

在批次中间提交。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
int count = 0;

while(true){
  ConsumerRecords<String,String> records = consumer.poll(100);
  for(ConsumerRecord<String,String> record: records){
    ...
    currentOffset.put(new TopicPartition(record.topic(), record.partition()),
      new OffsetAndMetadata(record.offset()+1, "no metadata"))
    if(count % 1000 == 0){
      consumer.commitAsync(currentOffsets, null)
    }
    count++;
  }
}

再均衡监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();

private class HandleRebalance implements Consumer RebalanceListener {
  // 再均衡开始之前,消费者停止读取消息之后
  public void onPartitionsAssigned(Collection<TopicPartition> partitions){
  }

  // 重新分配分区之后
  public void onPartitionsRevoked(Collection<TopicPartition> partions){
    consumer.commitSynce(currentOffsets);
  }
}

consuemr.subscribe(topics, new HandleRebalance())

在记录被保存到数据库之后以及偏移量被提交之前,应用程序仍有可能发生崩溃,导致数据重复。 在同一个事务里把记录和偏移量都写到数据库里。在消费者启动时或分配到新分区时,使用seek方法查找保存在数据库里的偏移量。

1
2
3
4
5
6
7
8
9
10
11
private class HandleRebalance implements Consumer RebalanceListener {
  public void onPartitionsAssigned(Collection<TopicPartition> partitions){
    commitDBTransation();
  }

  public void onPartitionsRevoked(Collection<TopicPartition> partions){
    for(TopicPartition partition: partitions){
      consumer.seek(partition, getOffsetFromDB(partition));
    }
  }
}

如何退出

wakeup, 消费者唯一一个可以从其他线程安全调用的方法,退出poll()

1
consumer.wakeup()

反序列化

AvroSerializer保证写入主题的数据与主题的schema兼容。出现与兼容性相关的错误会被捕捉到,并带有消息描述。

第五章 深入Kafka

使用Zookeeper维护集群成员的信息。broker启动时,创建临时节点把自己的id注册到Zookeeper。Kafka订阅Zookeeper的/brokers/ids路径,当有broker加入或退出,这些组件就可以获得通知。 broker从Zookeeper断开,在启动时创建的临时节点会自动从Zookeeper删除。

控制器

也是一个broker, 还负责分区首领的选举。 在Zookeeper创建一个临时节点/controller让自己成为controller。

当控制器断开连接, 其他broker会尝试让自己成为新的控制器。新选出的控制器通过Zookeeper的条件递增操作获得一个全新的、数值更大的controller epoch

复制

每个主题分为多个分区,每个分区可以有多个副本。

  • 首领副本。只有一个,为了一致性,生产者和消费志的请求都经过首领。
  • 跟随者副本。 首领以外的副本,复制消息。不处理请求。

同步副本:持续请求得到最新消息的副本。在首领失效时,只有同步副本才有资格称为新首领。 首选首领:创建主题时选定的首领就是分区的首选首领。默认情况,auto.leader.rebalance.enable=true。Kafka会检查首选首领是不是当前首领。如果不是,并且首选首领是同步的,则立即触发首领选举。

处理请求

客户端发送元数据请求,包含感兴趣的主题。 服务器的响应消息里有这些主题的分区,每个分区的副本,以及哪个副本是首领。元数据请求可以发送给任一broker,所有broker都会缓存这些信息。

5.5 物理储存

分区分配

  • 在broker间平均分配
  • 确保每个分区的每个副本分布在不同的broker
  • 如果指定了机架信息,尽可能把每个分区的副本分配到不同机架。

文件格式

保存在磁盘上的数据格式与从生产者发送来的或者发送给消费者的消息格式是一样的。可以使用零复制技术给消费者发消息,避免对生产者压缩过的消息进行解压和再压缩。

键、值、偏移量、消息大小、校验和、消息格式版本号、压缩算法、时间戳。

索引

为每个分区维护了一个索引。把偏移量映射到片段文件和偏移量在文件里的位置。

清理

早于保留时间的旧事件会被删除,为每个键保留最新的值

被删除的事件

为了彻底删除一个键,应用需发送一个包含该键且值为null的消息。

何时清理

脏记录的主题数量达到50%

第六章 可靠的数据传递

可靠性保证

  • 分区消息的顺序
  • 只有当消息被写入分区的所有同步副本,才被认为已提交
  • 只要还有一个副本是活跃的,消息不会丢。
  • 消费者只能读取已提交的消息。

复制

分区首领是同步副本,而跟随副本的同步条件: * 与zookeeper有一个活跃会话,6s内有心跳 * 过去10s内从首领那里获取过消息。 * 获取消息几乎要零延迟。

不恰当的垃圾回收配置会造成几秒的停顿,从而让broker与zookeeper之间断开连接。

broker配置

复制系数

replication.factor,假设复制系数是3,每个分区总共会被3个不同的broker复制3次。

不完全的首领选举

unclean.leader.election只能在broker级别配置,默认是true。

  • 如果不同步的首领不能被提升为新首领,那么分区在旧首领恢复之前不可用。
  • 如果不同步的首领可以提升为新首领,那么在这个副本变为不同步之后写入旧首领的消息会全部丢失,数据不一致。

  • 对数据质量和数据一致性要求高的,如银行,禁用不完全的首领选举。

  • 对可用性要求高的系统,如实时点击流分析,启动不完全的首领选举。

最少同步副本

尽管一个主题配置了3个副本,还会出现只有一个同步副本的情况。 Kafka可靠性定义:消息被写入所有同步副本后才被认为已提交。但这里的“所有副本”只有一个副本,这个副本不可用时,数据就会丢失。

设置min.insyc.replicas=2,那么至少要存两个同步副本才能向分区写入数据。当只有一个同步副本,broker会停止接受生产者的请求。消息者仍可读取。即变成只读。

使用生产者例子

  • 为broker配置了3个副本,并禁用了不完全首领选举。把acks设为1. 当首领接到消息,然后立即崩溃了,则消息丢失。但发消息的客户端认为已经写入成功。
  • 为broker配置了3个副本,并禁用了不完全首领选举。设acks为all。假设现在往Kafka发送消息,而刚好分区首领崩溃,新的首领还在选举中,会返回“首领不可用”。客户端需要正确处理该异常。

配置生产者的重试

重试发送一个失败的消息的风险,如果两个消息都成功,会导致消息重复。例如生产者因为网络问题没有收到broker的确认,但消息实际上已经写入。

在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时对它们进行清理。

幂等。账号有100美元。 不是幂等,账号增加10美元。

使用消费者

显式提交

  • 总是处理完事件后提交偏移量
  • 提交频率时性能和重复数量之间的权衡。
  • 处理再均衡的情况
  • 消费者可能重试
  • 长时间处理,轮询时间不能超过几秒钟,使用另一个线程处理数据。
  • 仅一次传递。 消息包含唯一键。幂等。

生产环境可靠性

  • error-rate
  • retry-rate

对于消费者,最重要的是consumer-lag,消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。理想情况应为0.

确保生成的数据及时被处理。Kafka在消息里增加了时间戳。

第八章 跨集群数据镜像

复制:同一个集群的节点间移动数据。 镜像:集群间的数据复制。

使用场景 * 区域集群和中心集群 * 冗余 * 云迁移。

跨数据中心的客观情况 * 高延迟 * 有限的带宽。 * 高成本

Kafka假设服务器和客户端之间具有很低的延迟和很高的带宽。高延迟需要增加重试次数。

为每一个数据中心安装一个Kafka集群,并在这些集群间复制数据。

跨数据中心的架构原则: * 每个数据中心至少一个集群 * 每两个数据中心的数据复制做到每个事件仅复制一次。 * 尽量从远程数据中心读取数据,而不向远程数据中心写入数据。

Hub和Spoke

一个中心Kafka集群对应多个本地Kafka集群

只处理单个数据中心数据的应用程序可以部署在本地数据中心。需要处理多个数据中心的应用程序则部署在中央数据中心。 不足:一个数据中心的应用程序无法访问另一个数据中心的数据。

双活 Active-Active

每个数据中心都可以生产和读取数据。 为就近的用户提供服务,性能优势,不会有可用性的问题。冗余、弹性、每个数据中心都具备完整的功能。一旦发生失效,将用户重定向到另一个数据中心。

关键问题。多个位置的数据异步读取和更新避免冲突。将用户粘在同一个数据中心上。

避免循环镜像,相同的事件不能无止境地来回镜像。对于每个逻辑主题,在每个数据中心创建单独的主题。如逻辑主题users,SF中心则为SF.users,NY中心则为NY.users

主备架构

易于实现。

不足。浪费了一个集群。 把一些只读的工作负载定向到灾难备份集群。实际上是Hub和Spoke架构的简化版本,只有一个Spoke。

失效备援

  • 数据丢失和不一致。
  • 备援之后的起始偏移量
    • 重置偏移量。 从分区的起始位置,或者分区的末尾开始读取数据
    • 复制偏移量主题。 消费者会把偏移量提交到名为_consumeroffsets的主题。对这个主题进行镜像。
  • 基于事件的失效备援。每个消息里都包含一个时间戳,知名了消息发送给Kafka的时间。borker提供了一个索引和API,根据timestamp查找偏移量。
  • 偏移量外部映射。主集群和备集群的偏移量会发生偏差。 使用外部数据储存集群之间的偏移量映射。例如主集群的偏移量495被映射到备集群的偏移量500,则外部记录为(495,500)
  • 备援之后,清理旧的主集群,删掉所有的数据和偏移量,然后把新的主集群上的数据镜像回来。
  • 集群发现,不建议把主机的地址硬编码在生产者和消费者的配置文件。创建DNS别名,或服务发现工作。

MirrorMaker

消费者从源集群的主题和分区读取数据,通过公共生产者发送到目标集群。消费者每60秒通知生产者发送所有的数据,并等待确认。 如果MirrorMaker进程发生崩溃,最多只会出现60秒的重复数据。

|源集群 | Mirror Maker | 目标集群| |———–|——————|———| |主题A | 消费者 -> 生产者 | 主题A| |主题B | 消费者 | 主题B| |主题C | 消费者 | 主题C|