单个 kafka服务器足以满足本地开发或 POC要求,使用集群的最大好处是可以跨服务器进行负载均衡,再则就是可以使用复制功能来避免因单点故障造成的数据丢失。在维护 Kafka 或底层系统时,使用集群可以确保为客户端提供高可用性。

Kafka集群

# 一、需要多少个 Broker

一个 kafka 需要多少个 broker取决于以下几个因素:
【1】需要多少磁盘空间来保留数据,以及单个broker 有多少空间可用。如果整个集群需要保留 10TB(每天2千万的数据,够用2年)的数据,每个 broker可以存储 2TB,那么至少需要 5个broker。如果启用了复制,那么至少还需要至少一倍的空间,不过还要取决于复制系数是多少。也就是说如果启用了数据复制,那么这个集群至少需要 10个 broker。
【2】集群处理请求的能力。这通常与网络接口处理客户端流量的能力有关,特别是当有多个消费者存在或者在数据保留期间流量发生波动(比如高峰时段流量爆发)时。如果单个 broker 的网络接口在高峰时段可以达到80%的使用量,并且有两个消费者,那么消费者就无法保持峰值。除非有两个 broker。如果集群启动了复制功能,则需要把额外的消费者考虑在内。因磁盘吞吐量低和系统内存不足造成的性能问题,也可以通过扩展多个 broker来解决。

# 二、broker 配置

要把一个 broker 加入到集群里,只需要修改两个配置参数。首先,所有 broker都必须配置相同的 zookeeper.connect 该参数指定了用于保存元数据的 Zookeeper群组和路径。其次,每个 broker 都必须为 broker.id 参数设置唯一的值。如果两个 broker使用相同的 broker.id,那么第二个 broker就无法启动。在运行集群时,还可以配置其他一些参数,特别是那些用于控制数据复制的参数。

# 三、操作系统调优

大部分 Linux发行版的内核调优参数配置已经能够满足大多数应用程序的运行需求,不过还是可以通过调整一些参数来进一步提升 kafka的性能。这些参数主要与虚拟内存、网络子系统和用来存储日志片段的磁盘挂载点有关。这些参数一般配置在 /etc/sysctl.conf 文件里,不过在对内核参数进行调整时,最好参考操作系统的文档。
【1】虚拟内存: 一般来说,Linux 的虚拟内存会根据系统的工作负荷进行自动调整。我们可以对交换分区的处理方式和内存脏页进行调整,从而让 kafka更好地处理工作负载。对于大多数依赖吞吐量的应用程序来说,要尽量避免内存交换。内存页和磁盘之间的交换对 kafka 各方面的性能都有重大影响。kafka大量使用系统页面缓存,如果虚拟内存被交换到磁盘,说明已经没有多余内存可以分配给页面缓存了。
一种避免内存交换的方法是不设置任何交换分区。内存交换不是必须的,不过它确实能够在系统发生灾难性错误时提供一些帮助。进行内存交换可以防止操作系统由于内存不足而突然终止进行。基于此原因建议把 vm.swappiness 参数设置为 1表示“除非发生内存溢出,否则不进行内存交换”。指定了虚拟机子系统将如何使用交换分区,而不是只把内存页从页面缓存里移除。要优先考虑减少页面缓存,而不是进行内存交换。
【2】磁盘: 选择合适的磁盘硬件设备和使用 RAID外,文件系统是影响性能的另一个重要因素。有很多种文件系统可供选择,不过对于本地文件系统来说,EXT4(第四代可扩展文件系统)和 XFS 最为常见。XFS 成为很多 Linux 发行版默认的文件系统,因为它只需要做少量调优就可以承担大部分的工作负荷。
【3】网络: 默认情况下,系统内核没有针对快速的大流量网络传输进行优化,所以对于应用程序来说,一般需要对网络栈进行调优,以实现对大流量的支持。实际上,调整 Kafka的网络配置与调整其它大部分 Web服务器和网络应用程序的网络配置是一样的。首先可以对分配给 socket读写缓冲区的内存大小做出调整,这样可以显著提升网络的传输性能。socket 读写缓冲区对应的参数分别是 net.core.vmen_default 和 net.core.rmem_default,合理的值是 131072(128KB)读写缓冲区最大值对应的参数分别是 net.core.vmen_max 和 net.core.rmem_max,合理的值是 2097152(2MB)要注意最大值并不意味着每个 socket一定要有这么大的缓冲区,只是说在必要的情况下才会达到这个值。

# 四、生产环境的注意事项

【1】垃圾回收器选项: Java7 带来的 G1垃圾回收器,G1会自动根据工作负载情况进行自我调节,而且它的停顿时间是恒定的。它可以轻松地处理大块的堆内存,把堆内存分为若干小块的区域,每次停顿时并不会对这个堆空间进行回收。G1只需要很少的配置就能完成这些工作。以下是 G1的两个调整参数:
MaxGCPauseMillis:该参数指定每次垃圾回收默认的停顿时间。该值不是固定的,G1可以根据需要使用更长的时间。它的默认值是200ms。也就是说,G1会决定垃圾回收的频率以及每一轮需要回收多少个区域,这样算下来,每一轮垃圾回收大概需要200ms的时间。 InitiatingHeapOccupancyPercent:该参数指定了在 G1启动新一轮垃圾回收之前可以使用的推内存百分比,默认值是45。也就是说,在堆内存的使用率达到45%之前,G1不会启动垃圾回收。这个百分比包括新生代和老年代的内存。
Kafka 对堆内存的使用率非常高,容易产生垃圾对象,所以可以把这些值设置的小一些。如果一台服务器有 64GB内存,并且使用 5GB来运行 Kafka,那么可以参考一下配置:MaxGCPauseMillis可以设置为20ms;InitiatingHeapOccupancyPercent可以设置为35,这样可以让垃圾回收比默认的要早一些启动。Kafka启动脚本并没有启用G1回收器,而是使用 Parallel New 和 CMS(Concurrent Mark-Sweep,并发标记和清除)垃圾回收器。不过它可以通过环境变量来修改。

export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
1

【2】数据中心布局: 生产环境服务器不可用意味着金钱的损失,使用 kafka集群的复制功能就显得尤为重要,而服务器在数据中心所处的物理位置也变得重要起来。在为 broker增加新分区的时候,broker并无法获知机架的信息。也就是说,两个 broker有可能是同一个机架上,或者在同一个可用区域(如果运行在像 AWS这样的云服务上),所以,在为分区添加副本的时候,这些副本很可能被分配给同一个机架的broker,他们使用相同的电源和网络连接。如果机架出了问题,这些分区就会离线,客户端就无法访问它们。更糟糕的是,如果发生不完整的主节点选举,那么在恢复时就可能丢失数据。
所以,最好把集群的 broke安装在不同的机架上,至少不要让它们共享可能出现单点故障的基本设施,比如电源和网络。也就是说,部署服务器需要至少两个电源连接(两个不同的回路)和两个网络交换机(保证可以进行无缝的故障切换)。除了这些以外,最好还要把 broker安放在不同的机架上。因为随着时间的推移,机架也需要进行维护,而这会导致机器离线(比如移动机器或者重新连接电源)

# 五、共享 Zookeeper

Kafka 使用 zk来保存 broker、主题和分区的元数据信息。对于一个包含多个节点的 zk群组来说,Kafka集群的这些流量并不算多,那些写操作只是用于构造消费者群组和集群本身。实际上,在很多部署环境里,会让多个 Kafka集群共享一个 zk群组(每个集群使用一个 chroot路径)Kafka0.9版本后引入新的消费者接口,允许 broker直接维护消费者群组的信息,主题信息,消费者分区的偏移量。

消费者可以选择将偏移量提交到zk或Kafka,还可以选择提交偏移量的时间间隔。如果消费者将偏移量提交到zk,那么到每个提交时间点,消费者将会为每一个消费的分区往zk写入一次偏移量。合理的提交间隔是1分钟,因为这刚好是消费者群组的某个消费者发生失效时能够读取到重复消息的时间。值得注意的是,这些提交对于 zk来说流量不算小,特别是当集群里有多个消费者的时候。如果 zk群组无法处理太大的流量,就有必要使用长一点的提交时间间隔。建议使用最新版的Kafka,将偏移量提交到broker上。

虽然多个 kafka集群可以共享一个 zk群组,但是如果可能的话,不建议把 zk共享给其它应用程序。Kafka对 zk的延迟和超时比较敏感,与 zk群组之间的一个通信就可能导致 Kafka服务器出现无法预测的行为。这样很容易让多个 broker同时离线,如果它们与zk 之间断开连接,也会导致分区离线。这也会给集群控制器带来压力,在服务器离线一段时间之后,当控制器尝试关闭一个服务器时,会表现出一些细小的错误。其他的应用程序因重度使用或进行不恰当的操作给zk群组带来压力,所以最好让它们使用自己的 zk群组。

(adsbygoogle = window.adsbygoogle || []).push({});