从零学习Kafka:生产者压缩

前面了解了生产端的分区机制后,我们继续来看生产端的另一个重要的机制——压缩。

压缩与解压

说到压缩,第一个问题一定是 Kafka 在哪里进行压缩,又是在哪里进行解压的?

我不卖关子,直接告诉你答案,Kafka 通常在 Producer 端进行压缩,在 Broker 端保持,到了 Consumer 端解压。除了 Producer 端压缩之外,Broker 端也是可以进行压缩的,但这种情况并不常见。具体情况我们在下一节再聊。

在 Producer 端开启压缩的方法也很简单,我们在设置 Producer 属性时,增加 compression.type 参数即可,例如我们在 Producer 端启用 gzip 压缩,就可以这样设置:

1
props.put("compression.type", "gzip");

在 Kafka 中,压缩并不是针对单条消息,而是针对一个批次,那么如何才能提高压缩的效率呢?答案是增大批次的大小。因为压缩的本质是通过找到数据之间的重复模式来减少体积,批次内积累的数据越多,重复的可能性就越大,压缩概率也就越好。因此我们在启用压缩之后,还需要合理的设置 batch.sizelinger.ms 这两个参数。

重压缩

前面我们提到了 Kafka Broker 端也可以进行压缩,但只有在少数几种场景下才会出现,因为 Kafka 的设计初衷是 Broker 端尽量只做简单的磁盘读写操作。

由于 Broker 端进行压缩需要先把数据解压,然后进行处理,再压缩。我们把这一过程称为重压缩。

重压缩主要出现在以下三种场景:

场景一

压缩算法不一致,例如 Producer 端使用了 gzip 算法,但 Broker 端配置了 compression.type=lz4,这种情况 Broker 必须进行重压缩。

场景二

消息格式版本转换,Kafka 在 0.11.0.0 版本进行了一次消息格式版本的升级,新版本的消息会统一存储消息批次的公共信息。如果你的 Producer 版本是旧版本,Broker 是新版本,那么 Broker 就需要进行消息格式转换。这一操作也要涉及到重压缩。

场景三

Broker 侧注入时间戳,Kafka 支持两种时间戳,一种是 CreateTime,在 Producer 端直接设置。另一种是 LogAppendTime,这种时间戳是记录的 Broker 在写入磁盘时的时间戳,Broker 需要解压整个批次,然后修改每条消息的时间戳,再压缩整个批次。时间戳的配置参数是 log.message.timestamp.type,我们保持默认的 CreateTime 就好。

最后再提一下,重压缩是非常消耗 Broker 端 CPU 资源的,我们要尽量避免 Broker 端的重压缩,让它只做磁盘的读写操作就好。如果有 Broker 端 CPU 资源飙升的问题,也可以按照以上三种场景排查一下是不是重压缩引起的。

压缩算法

Kafka 支持了多种压缩算法,选择合适的压缩算法其实是在压缩率与 CPU 开销之间做出平衡。我们通过一个表格来对比一下几种压缩算法。

算法 压缩率 CPU消耗 压缩速度 适用场景
GZIP 存储敏感,带宽受限的归档、离线处理
Snappy 中等 较低 速度与资源之间取得良好平衡
LZ4 中等偏高 极低 极快 对延迟极度敏感的实时计算、交易等
Zstd 高,与 GZIP 接近 中等 较快 适合大部分通用场景

总的来说,4 种算法在实际使用过程中各有千秋,Zstd 是 Kafka 2.1.0 版本引入的,作为现代均衡的算法,可以作为默认选择。如果想要追求吞吐量,可以选择 LZ4。想要追求极致资源节省,可以选择 GZIP 或者 Zstd。

总结

最后总结一下本文的内容,我们先介绍了怎么 Kafka 在哪里进行压缩的以及如何开启压缩。接着又介绍了 3 种重压缩的场景,重压缩大概率会导致 Broker CPU 飙升。最后对比了 Kafka 支持的 4 种压缩算法。

最后想要抛出一个问题:Consumer 在解压数据时,需要关系 Producer 使用了哪种压缩算法吗?感兴趣的同学欢迎一起交流。