原文来自linkedin的一篇PPTproducer-performance-tuning-for-apache-kafka。
给定一个要发送的数据集,在满足持久性、有序性的前提下优化以下两点:
优化专注于优化平均性能,这样对所有的producer都有效。
PS: 更大的批次,意味着更好的压缩率、更高的吞吐量。但是负面影响,就是延迟会高些。
这个之前在kafka生产者原理详解一文中做了一些分析。现在来看看kafka的 committer如何来分析的发送者原理的。其分析相对更加简明扼要。
发送者发送消息的过程简单概括为:
序列化
假设现在Record Accumulator中已经包含了如下的数据:
当一个batch准备完毕后,用户线程就可以去执行具体的发送操作了。当满足以下条件之一时,我们认为一个batch是已经“准备完毕的”:
用户线程获取batch的过程如下:
PS: 接下来的说明,都假设max.in.flight.requests.per.connection=1
生产者调优,主要可以利用kafka-producer-perf-test.sh(org.apache.kafka.tools.ProducerPerformance)。通过测试不同的配置来对比发送效率。
使用方法例子:
./kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic becket_test_3_replicas_1_partition --throughput 1000000 --producer-props bootstrap.servers=192.168.1.22:9092 max.in.flight.requests.per.connection=1 batch.size=100000 compression.type=lz4
PS: kafka 0.8的版本还支持thread-num等选项,现在0.10.1中还没有,不过已经有issue在解决了。相信马上会有了。详情见:
关于第三点,是以前没有的特性。这个对生产者调优十分重要。使用ProducerPerformance的时候,打印的度量信息有:
PS:以上度量信息,需要至少1分钟运行时间才能保证稳定。
使用例子:
./kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic becket_test_3_replicas_4_partition --throughput 100000 --num-threads 1 --value-bound 50000 --producer-props bootstrap.servers=localhost:9092 compression.type=gzip max.in.flight.requests.per.connection=1
吞吐量可以用以下公式估算:
throughput_Avg(平均吞吐量) ~= Request_Rate_Avg (平均请求速率)* Request_Size_Avg(平均请求大小) / Compression_Rate_Avg (压缩率)
估算的实际值会比实际值大一些,因为会有一些request overhead没有考虑进去。
平均请求大小的计算公式为:
Request_Size_Avg(平均请求大小) = Records_Per_Request_Avg (每个请求的消息数)Record_Size (消息大小) Compression_Rate_Avg(压缩率) +Request_Overhead
request overhead取决于:
假设我们使用以下的生产者来测试:
根据得到的结果,我们发现吞吐量为9.96MB/s,远远小于我们实际的网络带宽1Gbps。
request_rate_avg和理论上限差距不大,而压缩率又是固定的。所以我们的目标为增大request_size_avg来增加吞吐量。增加吞吐量的方式主要有:
linger.ms与batch size、压缩率以及吞吐量和延迟之间的关系:
上图看出来,batching增大之后,吞吐量反而变差了,而且压缩率也只有少量增长。这种原因主要是:增大batch会显著增加压缩的耗时。
相关测试:
总结: 一般我们说增大批次,都有利于增加吞吐量(减少了网络IO次数)。但是这里之所以行不通是因为增大批次带来的好处无法抵消压缩时间的增长。从上图的实验结果可以看到,采用16KB或者索性采用较大的256KB都是可以的。避免采用处在中间的batch size
可见:发送者的线程数,不是越多越好,因为线程数过多会影响延迟,而且有时候会产生负面效果。但是一般线程数小于topic分区数都是没啥问题的。
通过增加分区数、线程数、batch size,使得吞吐量得到改善:
acks=all的时候,瓶颈很有肯能发生在replication time。
高水位线的值变更需要等待下一次fetch过来之后才变更。所有ProduceRequest里面的高水位线全部抵达当前offset了,才会返回ProduceResponse。
第二个fetch过来的时候,partition0的高水位线移动到当前offset
假设broker1只有1个replication线程,则replicaiton time为
显而易见的是增加num.replica.fetchers,从而使得并发的fetch来做复制。这样的Replication time则为:
设置多少的replica fetchers合理?一般按照官方的生产建议设置成4就好了。
有个跨洋的pipeline
现有情况的计算,发现确实吞吐量比较低。
解决办法是增加send和 recieve buffer。下图可以看到增大吞吐量之后,最多能达到20MB/s的吞吐量。