Kafka基础知识
Kafka权威指南.pdf (opens new window)
Kafka 是一款基于发布与订阅的消息系统。它一般被称为“分布式提交日志”或者“分布式流平台”。
Kafka 的数据是按照一定顺序持久化保存的,可以按需读取。此外,Kafka 的数据分布在整个系统里,具备数据故障保护和性能伸缩能力。
# 为什么选择Kafka
**多个生产者:**Kafka 可以无缝地支持多个生产者,不管客户端在使用单个主题还是多个主题。所以它很适合用来从多个前端系统收集数据,并以统一的格式对外提供数据。例如,一个包含了多个微服务的网站,可以为页面视图创建一个单独的主题,所有服务都以相同的消息格式向该主题写入数据。消费者应用程序会获得统一的页面视图,而无需协调来自不同生产者的数据流。
**多个消费者:**除了支持多个生产者外,Kafka 也支持多个消费者从一个单独的消息流上读取数据,而且消费者之间互不影响。这与其他队列系统不同,其他队列系统的消息一旦被一个客户端读取,其他客户端就无法再读取它。另外,多个消费者可以组成一个群组,它们共享一个消息流,并保证整个群组对每个给定的消息只处理一次。
基于磁盘的数据存储:Kafka 不仅支持多个消费者,还允许消费者非实时地读取消息,这要归功于Kafka 的数据保留特性。消息被提交到磁盘,根据设置的保留规则进行保存。每个主题可以设置单独的保留规则,以便满足不同消费者的需求,各个主题可以保留不同数量的消息。消费者可能会因为处理速度慢或突发的流量高峰导致无法及时读取消息,而持久化数据可以保证数据不会丢失。消费者可以在进行应用程序维护时离线一小段时间,而无需担心消息丢失或堵塞在生产者端。消费者可以被关闭,但消息会继续保留在Kafka 里。消费者可以从上次中断的地方继续处理消息。
伸缩性:为了能够轻松处理大量数据,Kafka 从一开始就被设计成一个具有灵活伸缩性的系统。用户在开发阶段可以先使用单个broker,再扩展到包含3 个broker 的小型开发集群,然后随着数据量不断增长,部署到生产环境的集群可能包含上百个broker。对在线集群进行扩展丝毫不影响整体系统的可用性。也就是说,一个包含多个broker 的集群,即使个别broker失效,仍然可以持续地为客户提供服务。要提高集群的容错能力,需要配置较高的复制系数。
高性能:上面提到的所有特性,让Kafka 成为了一个高性能的发布与订阅消息系统。通过横向扩展生产者、消费者和broker,Kafka 可以轻松处理巨大的消息流。在处理大量数据的同时,它还能保证亚秒级的消息延迟。
# Kafka的分区和副本
# 分区的原因
可以让leader均匀分散到多台机器中,让读写负载均衡,保证数据安全。
方便在集群中扩展 ,每个 Partition可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition组成,因此整个集群就可以适应任意大小的数据了。
可以提高并发 ,因为可以以 Partition为单位读写了。
# 消费者组和分区
消费者组可以有多个,不同消费者消费的数据相互没有影响,同一个消费者组消费的数据不重复。
同一个消费者组内,一个消费者可以消费多个分区的数据,但是多个消费者不能同时消费一个分区的数据。
消费的数据没有被删除,只是标记被哪个消费者组消费到了什么位置。
# 多集群
# 使用多集群的原因
随着Kafka 部署数量的增加,基于以下几点原因,最好使用多个集群。
• 数据类型分离
• 安全需求隔离
• 多数据中心(灾难恢复)
# 多集群的架构
使用多集群的架构,会面临一些问题。比如:如果使用多个数据中心,就需要在它们之间复制消息。这样,在线应用程序才可以访问到多个站点的用户活动信息。例如,如果一个用户修改了他们的资料信息,不管从哪个数据中心都应该能看到这些改动。或者多个站点的监控数据可以被聚集到一个部署了分析程序和告警系统的中心位置。
不过,Kafka 的消息复制机制只能在单个集群里进行,不能在多个集群之间进行。
Kafka 提供了一个叫作MirrorMaker 的工具, 可以用它来实现集群间的消息复制。MirrorMaker 的核心组件包含了一个生产者和一个消费者,两者之间通过一个队列相连。消费者从一个集群读取消息,生产者把消息发送到另一个集群上。图1-8 展示了一个使用MirrorMaker 的例子,两个“本地”集群的消息被聚集到一个“聚合”集群上,然后将该集群复制到其他数据中心。不过,这种方式在创建复杂的数据管道方面显得有点力不从心。
# Kafka生产者
我们从创建一个ProducerRecord 对象开始,ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
接下来,数据被传给分区器。如果之前在ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据ProducerRecord 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker 上。
服务器在收到这些消息时会返回一个响应。如果消息成功写入Kafka,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
# 顺序保证
Kafka 可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照一定的顺序发送消息,broker 就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。例如,往一个账户存入100 元再取出来,这个与先取钱再存钱是截然不同的!不过,有些场景对顺序不是很敏感。
如果把retries 设为非零整数,同时把max.in.flight.requests.per.connection设为比1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retries 设为0。可以把max.in.flight.requests.per.connection 设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
# 序列化
# 为什么不使用自定义序列化器
如果我们有多种类型的消费者,可能需要把customerID 字段变成长整型,或者为Customer 添加startDate 字段,这样就会出现新旧消息的兼容性问题。在不同版本的序列化器和反序列化器之间调试兼容性问题着实是个挑战——你需要比较原始的字节数组。
更糟糕的是,如果同一个公司的不同团队都需要往Kafka 写入Customer 数据,那么他们就需要使用相同的序列化器,如果序列化器发生改动,他们几乎要在同一时间修改代码。
# Avro序列化
Avro 数据通过与语言无关的schema 来定义。schema 通过JSON 来描述,数据被序列化成二进制文件或JSON 文件,不过一般会使用二进制文件。Avro 在读写文件时需要用到schema,schema 一般会被内嵌在数据文件里。
# 分区
ProducerRecord 对象包含了目标主题、键和值。Kafka 的消息是一个个键值对,ProducerRecord 对象可以只包含目标主题和值,键可以设置为默认的null,不过大多数应用程序会用到键。
键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。
如果键值为null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。
如果键不为空,并且使用了默认的分区器,那么Kafka 会对键进行散列(使用Kafka 自己的散列算法,即使升级Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。
# 为什么需要自定义分区
业务需要,根据功能进行分区。
假设你是一个B2B 供应商,你有一个大客户,它是手持设备Banana 的制造商。Banana 占据了你整体业务10% 的份额。如果使用默认的散列分区算法,Banana 的账号记录将和其他账号记录一起被分配给相同的分区,导致这个分区比其他分区要大一些。**服务器可能因此出现存储空间不足、处理缓慢等问题。**我们需要给Banana 分配单独的分区,然后使用散列分区算法处理其他账号。
# 消费者
# 消费者与消费者组
Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。
# 分区再均衡-消费者加入或退出
一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向被指派为群组协调器的broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
# 消费者分区分配策略
每个消费者都会负责若干个分区,负责消费分区中的数据。分配分区的工作由PartitionAssignor角色来完成,其决定了将哪些分区分配给哪个消费者。
Kafka有两个默认的分区分配策略:
Range(连续分配策略):该策略会把主题的若干个连续的分区分配给消费者。假设消费者C1 和消费者C2 同时订阅了主题T1 和主题T2,并且每个主题有3 个分区。那么消费者C1 有可能分配到这两个主题的分区0 和分区1,而消费者C2 分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。
RoundRobin(轮询策略):该策略把主题的所有分区逐个分配给消费者。如果使用RoundRobin 策略来给消费者C1和消费者C2 分配分区,那么消费者C1 将分到主题T1 的分区0 和分区2 以及主题T2的分区1,消费者C2 将分配到主题T1 的分区1 以及主题T2 的分区0 和分区2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。
# 提交偏移量
每次调用poll() 方法,它总是返回由生产者写入Kafka 但还没有被消费者读取过的记录,我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。
消费者可以使用Kafka 来追踪消息在分区里的位置(偏移量)。我们把更新分区当前位置的操作叫作提交。
消费者往一个叫作**_consumer_offset** 的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
# 自动提交偏移量
最简单的提交方式是让消费者自动提交偏移量。如果enable.auto.commit 被设为true,那么每过5s,消费者会自动把从poll() 方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms 控制,默认值是5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。
然而,这种配置将带来一些严重的后果,比如重复消费数据的问题:
假设我们仍然使用默认的5s 提交时间间隔,在最近一次提交之后的3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。
# 手动提交当前偏移量
消费者API 提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。
把auto.commit.offset 设为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量最简单也最可靠。这个API 会提交由poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。
要记住,commitSync() 将会提交由poll() 返回的最新偏移量,所以在处理完所有记录后要确保调用了commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。
# 集群
Kafka 使用Zookeeper 来维护集群成员的信息。每个broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在broker 启动的时候,它通过创建临时节点把自己的ID 注册到Zookeeper。Kafka 组件订阅Zookeeper 的/brokers/ids 路径(broker 在Zookeeper 上的注册路径),当有broker 加入集群或退出集群时,这些组件就可以获得通知。
在broker 停机、出现网络分区或长时间垃圾回收停顿时,broker 会从Zookeeper 上断开连接,此时broker 在启动时创建的临时节点会自动从Zookeeper 上移除。监听broker 列表的Kafka 组件会被告知该broker 已移除。
在关闭broker 时,它对应的节点也会消失,不过它的ID 会继续存在于其他数据结构中。例如,主题的副本列表里就可能包含这些ID。在完全关闭一个broker 之后,如果使用相同的ID 启动另一个全新的broker,它会立即加入集群,并拥有与旧broker相同的分区和主题。
# 控制器
控制器也是一个broker,除了一般的功能外,它还负责分区首领的选举。(所谓的分区首领指的是同一个分区的所有副本的领导者)
集群里第一个启动的broker 通过在Zookeeper 里创建一个临时节点/controller 让自己成为控制器。其他broker 在启动时也会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制器节点已存在,也就是说集群里已经有一个控制器了。其他broker 在控制器节点上创建Zookeeper watch 对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群里一次只有一个控制器存在。
防止脑裂,使用epoch机制:如果控制器被关闭或者与Zookeeper 断开连接,Zookeeper 上的临时节点就会消失。集群里的其他broker 通过watch 对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。第一个在Zookeeper 里成功创建控制器节点的broker 就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建watch 对象。每个新选出的控制器通过Zookeeper 的条件****递增操作获得一个全新的、数值更大的controller epoch。其他broker 在知道当前controller epoch 后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们。
分区首领的选举:当控制器发现一个broker 已经离开集群(通过观察相关的Zookeeper 路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个broker 上)。控制器遍历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本),然后向所有包含新首领或现有跟随者的broker 发送请求。该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者开始从新首领那里复制消息。
# 复制机制
复制功能是Kafka 架构的核心。在Kafka 的文档里,Kafka 把自己描述成“一个分布式的、可分区的、可复制的提交日志服务”。
Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在broker 上,每个broker 可以保存成百上千个属于不同主题和分区的副本。
副本的类型:
首领副本:每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。
跟随者副本:首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新首领。
为了与首领保持同步,跟随者向首领发送获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。首领将响应消息发给跟随者。请求消息里包含了跟随者想要获取消息的偏移量,而且这些偏移量总是有序的。
如果跟随者在10s 内没有请求任何消息,或者虽然在请求消息,但在10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无法与首领保持一致,在首领发生失效时,它就不可能成为新首领——毕竟它没有包含全部的消息。
# Kafka集群处理请求
broker 会在它所监听的每一个端口上运行一个Acceptor 线程,这个线程会创建一个连接,并把它交给Processor 线程去处理。Processor 线程(也被叫作“网络线程”)的数量是可配置的。网络线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。
生产请求和获取请求都必须发送给分区的首领副本。如果broker 收到一个针对特定分区的请求,而该分区的首领在另一个broker 上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的broker上,也会出现同样的错误。Kafka 客户端要自己负责把生产请求和获取请求发送到正确的broker 上。
那么客户端怎么知道该往哪里发送请求呢?客户端使用了另一种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主题列表。服务器端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任意一个broker,因为所有broker 都缓存了这些信息。
一般情况下,客户端会把这些信息缓存起来,并直接往目标broker 上发送生产请求和获取请求。
# 生产请求
包含首领副本的broker 在收到生产请求时,会对请求做一些验证。
之后,消息被写入本地磁盘。在Linux 系统上,消息会被写到文件系统缓存里,并不保证它们何时会被刷新到磁盘上。Kafka 不会一直等待数据被写到磁盘上——它依赖复制功能来保证消息的持久性。
在消息被写入分区的首领之后,broker 开始检查acks 配置参数——如果acks 被设为0 或1,那么broker 立即返回响应;如果acks 被设为all,那么请求会被保存在一个叫作**炼狱(purgatory)**的缓冲区里,直到首领发现所有跟随者副本都复制了消息,响应才会被返回给客户端。
# 获取请求
客户端发送请求,向broker 请求主题分区里具有特定偏移量的消息,好像在说:“请把主题Test 分区0 偏移量从53 开始的消息以及主题Test 分区3 偏移量从64 开始的消息发给我。”客户端还可以指定broker 最多可以从一个分区里返回多少数据。这个限制是非常重要的,因为客户端需要为broker 返回的数据分配足够的内存。如果没有这个限制,broker 返回的大量数据有可能耗尽客户端的内存。
请求需要先到达指定的分区首领上,然后客户端通过查询元数据来确保请求的路由是正确的。首领在收到请求时,它会先检查请求是否有效——比如,指定的偏移量在分区上是否存在?如果客户端请求的是已经被删除的数据,或者请求的偏移量不存在,那么broker 将返回一个错误。
如果请求的偏移量存在,broker 将按照客户端指定的数量上限从分区里读取消息,再把消息返回给客户端。Kafka 使用零拷贝技术向客户端发送消息——也就是说,Kafka 直接把消息从文件(或者更确切地说是Linux 文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这是Kafka 与其他大部分数据库系统不一样的地方,其他数据库在将数据发送给客户端之前会先把它们保存在本地缓存里。这项技术避免了字节复制,也不需要管理内存缓冲区,从而获得更好的性能。
# 物理存储
# 分区分配
在创建主题时,Kafka 首先会决定如何在broker 间分配分区。假设你有6 个broker,打算创建一个包含10 个分区的主题,并且复制系数为3。那么Kafka 就会有30 个分区副本,它们可以被分配给6 个broker。在进行分区分配时,我们要达到如下的目标。
在broker 间平均地分布分区副本。对于我们的例子来说,就是要保证每broker 可以分到5 个副本。
确保每个分区的每个副本分布在不同的broker 上。假设分区0 的首领副本在broker 2 上,那么可以把跟随者副本放在broker 3 和broker 4 上,但不能放在broker 2 上,也不能两个都放在broker 3 上。
如果为broker 指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的broker上。这样做是为了保证一个机架的不可用不会导致整体的分区不可用。
为了实现这个目标,我们先随机选择一个broker(假设是4),然后使用轮询的方式给每个broker 分配分区来确定首领分区的位置。于是,首领分区0 会在broker 4 上,首领分区1 会在broker 5 上,首领分区2 会在broker 0 上(只有6 个broker),并以此类推。然后,我们从分区首领开始,依次分配跟随者副本。如果分区0 的首领在broker 4 上,那么它的第一个跟随者副本会在broker 5 上,第二个跟随者副本会在broker 0 上。分区1 的首领在broker 5 上,那么它的第一个跟随者副本在broker 0 上,第二个跟随者副本在broker 1 上。
如果配置了机架信息,那么就不是按照数字顺序来选择broker 了,而是按照交替机架的方式来选择broker。假设broker 0、broker 1 和broker 2 放置在同一个机架上,broker 3、broker 4和broker 5 分别放置在其他不同的机架上。我们不是按照从0 到5 的顺序来选择broker,而是按照0,3,1,4,2,5 的顺序来选择,这样每个相邻的broker 都在不同的机架上。于是,如果分区0 的首领在broker 4 上,那么第一个跟随者副本会在broker 2 上,这两个broker 在不同的机架上。如果第一个机架下线,还有其他副本仍然活跃着,所以分区仍然可用。这对所有副本来说都是一样的,因此在机架下线时仍然能够保证可用性。
# Kafka可靠性保证
保证这个词,它是指确保系统在各种不同的环境下能够发生一致的行为。
Kafka 可以在哪些方面作出保证呢?
Kafka 可以保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且消息B 在消息A 之后写入,那么Kafka 可以保证消息B 的偏移量比消息A 的偏移量大,而且消费者会先读取消息A 再读取消息B。
只有当消息被写入分区的所有****同步副本(并不是每个副本都属于同步副本,一定要与领导者同步的副本才能叫同步副本)时(但不一定要写入磁盘),它才被认为是“已提交”的。生产者可以选择接收不同类型的确认,比如在消息被完全提交时的确认,或者在消息被写入首领副本时的确认,或者在消息被发送到网络时的确认。
只要还有一个副本是活跃的,那么已经提交的消息就不会丢失。
消费者只能读取已经提交的消息。
分区首领是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步的。
与Zookeeper 之间有一个活跃的会话,也就是说,它在过去的6s(可配置)内向Zookeeper 发送过心跳。
在过去的10s 内(可配置)从首领那里获取过消息。
在过去的10s 内从首领那里获取过最新的消息。光从首领那里获取消息是不够的,它还必须是几乎零延迟的。
构建一个可靠的系统需要作出一些权衡,Kafka 管理员和开发者可以在配置参数上作出权衡,从而得到他们想要达到的可靠性。这种权衡一般是指消息存储的可靠性和一致性的重要程度与可用性、高吞吐量、低延迟和硬件成本的重要程度之间的权衡。
# 为什么将默认复制系数设置为3
如果复制系数为N,那么在N-1 个broker 失效的情况下,仍然能够从主题读取数据或向主题写入数据。所以,更高的复制系数会带来更高的可用性、可靠性和更少的故障。另一方面,复制系数N 需要至少N 个broker,而且会有N 个数据副本,也就是说它们会占用N倍的磁盘空间。我们一般会在可用性和存储硬件之间作出权衡。
那么该如何确定一个主题需要几个副本呢?这要看主题的重要程度,以及你愿意付出多少成本来换取可用性。
如果因broker 重启导致的主题不可用是可接受的(这在集群里是很正常的行为),那么把复制系数设为1 就可以了。在作出这个权衡的时候,要确保这样不会对你的组织和用户造成影响,因为你在节省了硬件成本的同时也降低了可用性。
复制系数为2 意味着可以容忍1 个broker 发生失效,看起来已经足够了。不过要记住,有时候1 个broker 发生失效会导致集群不稳定(通常是旧版的Kafka),迫使你重启另一个broker——集群控制器。也就是说,如果将复制系数设为2,就有可能因为重启等问题导致集群不可用。所以这是一个两难的选择。
基于以上几点原因,我们建议在要求可用性的场景里把复制系数设为3。
# 最小同步副本
我们知道,尽管为一个主题配置了3 个副本,还是会出现只有一个同步副本的情况。如果这个同步副本变为不可用,我们必须在可用性和一致性之间作出选择——这是一个两难的选择。根据Kafka 对可靠性保证的定义,消息只有在被写入到所有同步副本之后才被认为是已提交的。但如果这里的“所有副本”只包含一个同步副本,那么在这个副本变为不可用时,数据就会丢失。
如果要确保已提交的数据被写入不止一个副本,就需要把最少同步副本数量设置为大一点的值。对于一个包含3 个副本的主题,如果min.insync.replicas
被设为2,那么至少要存在两个同步副本才能向分区写入数据。
如果3 个副本都是同步的,或者其中一个副本变为不可用,都不会有什么问题。不过,如果有两个副本变为不可用,那么broker 就会停止接受生产者的请求。尝试发送数据的生产者会收到NotEnoughReplicasException 异常。消费者仍然可以继续读取已有的数据。实际上,如果使用这样的配置,那么当只剩下一个同步副本时,它就变成只读了,这是为了避免在发生不完全选举时数据的写入和读取出现非预期的行为。为了从只读状态中恢复,必须让两个不可用分区中的一个重新变为可用的(比如重启broker),并等待它变为同步的。
# exactly-once语义
有些应用程序不仅仅需要“至少一次”(at-least-once)语义(意味着没有数据丢失),还需要“仅一次”(exactly-once)语义。尽管Kafka 现在还不能完全支持仅一次语义,消费者还是有一些办法可以保证Kafka 里的每个消息只被写到外部系统一次(但不会处理向Kafka 写入数据时可能出现的重复数据)。
实现仅一次处理最简单且最常用的办法是把结果写到一个支持唯一键的系统里,比如键值存储引擎、关系型数据库、ElasticSearch 或其他数据存储引擎。在这种情况下,要么消息本身包含一个唯一键(通常都是这样),要么使用主题、分区和偏移量的组合来创建唯一键——它们的组合可以唯一标识一个Kafka 记录。如果你把消息和一个唯一键写入系统,然后碰巧又读到一个相同的消息,只要把原先的键值覆盖掉即可。数据存储引擎会覆盖已经存在的键值对,就像没有出现过重复数据一样。这个模式被叫作幂等性写入,它是一种很常见也很有用的模式。
如果写入消息的系统支持事务,那么就可以使用另一种方法。最简单的是使用关系型数据库,不过HDFS 里有一些被重新定义过的原子操作也经常用来达到相同的目的。我们把消息和偏移量放在同一个事务里,这样它们就能保持同步。在消费者启动时,它会获取最近处理过的消息偏移量,然后调用seek() 方法从该偏移量位置继续读取数据。
# 多集群架构
因为前面已经使用过“复制”这个词来描述在同一个集群的节点间移动数据,所以我们把集群间的数据复制叫作镜像(mirroring)。Kafka 内置的跨集群复制工具叫作MirrorMaker。
# 架构原则
每个数据中心至少需要一个集群。
每两个数据中心之间的数据复制要做到每个事件仅复制一次(除非出现错误需要重试)。
如果有可能,尽量从远程数据中心读取数据,而不是向远程数据中心写入数据。
# 双活架构
当有两个或多个数据中心需要共享数据并且每个数据中心都可以生产和读取数据时,可以使用双活(Active-Active)架构。
这种架构的主要好处在于,它可以为就近的用户提供服务,具有性能上的优势,而且不会因为数据的可用性问题在功能方面作出牺牲。第二个好处是冗余和弹性。因为每个数据中心具备完整的功能,一旦一个数据中心发生失效,就可以把用户重定向到另一个数据中心。这种重定向完全是网络的重定向,因此是一种最简单、最透明的失效备援方案。
这种架构的主要问题在于,如何在进行多个位置的数据异步读取和异步更新时避免冲突。比如镜像技术方面的问题——如何确保同一个数据不会被无止境地来回镜像?而数据一致性方面的问题则更为关键。
如果能够很好地处理在从多个位置异步读取数据和异步更新数据时发生的冲突问题,那么我们强烈建议使用这种架构。这种架构是我们所知道的最具伸缩性、弹性、灵活性和成本优势的解决方案。所以,它值得我们投入精力去寻找一些办法,用于避免循环复制、把相同用户的请求粘在同一个数据中心,以及在发生冲突时解决冲突。
双活镜像(特别是当数据中心的数量超过两个)的挑战之处在于,每两个数据中心之间都需要进行镜像,而且是双向的。如果有5 个数据中心,那么就需要维护至少20 个镜像进程,还有可能达到40 个,因为为了高可用,每个进程都需要冗余。
# 主备架构
有时候,使用多个集群只是为了达到灾备的目的。你可能在同一个数据中心安装了两个集群,它们包含相同的数据,平常只使用其中的一个。当提供服务的集群完全不可用时,就可以使用第二个集群。
又或者你可能希望它们具备地理位置弹性,比如整体业务运行在加利福尼亚州的数据中心上,但需要在德克萨斯州有第二个数据中心,第二个数据中心平常不怎么用,但是一旦第一个数据中心发生地震,第二个数据中心就能派上用场。德克萨斯州的数据中心可能拥有所有应用程序和数据的非活跃(“冷”)复制,在紧急情况下,管理员可以启动它们,让第二个集群发挥作用。这种需求一般是合规性的,业务不一定会将其纳入规划范畴,但还是要做好充分的准备。
这种架构的好处是易于实现,而且可以被用于任何一种场景。你可以安装第二个集群,然后使用镜像进程将第一个集群的数据完整镜像到第二个集群上,不需要担心数据的访问和冲突问题,也不需要担心它会带来像其他架构那样的复杂性。
这种架构的不足在于,它浪费了一个集群。Kafka 集群间的失效备援比我们想象的要难得多。从目前的情况来看,要实现不丢失数据或无重复数据的Kafka 集群失效备援是不可能的。我们只能尽量减少这些问题的发生,但无法完全避免。
# MirrorMaker
Kafka 提供了一个简单的工具, 用于在两个数据中心之间镜像数据。这个工具叫MirrorMaker,它包含了一组消费者(因为历史原因,它们在MirrorMaker 文档里被称为流),这些消费者属于同一个群组,并从主题上读取数据。每个MirrorMaker 进程都有一个单独的生产者。
镜像过程很简单:MirrorMaker 为每个消费者分配一个线程,消费者从源集群的主题和分区上读取数据,然后通过公共生产者将数据发送到目标集群上。默认情况下,消费者每60 秒通知生产者发送所有的数据到Kafka,并等待Kafka 的确认。然后消费者再通知源集群提交这些事件相应的偏移量。这样可以保证不丢失数据(在源集群提交偏移量之前,Kafka 对消息进行了确认),而且如果MirrorMaker 进程发生崩溃,最多只会出现60 秒的重复数据。