当kafka分区不能再增加的情况下,使用多线程提升kafka消费能力(附源码)

前两天csdn提醒我又多了一个粉丝,又激发了写作的动力,不要点开看我的粉丝数,哈哈!      

正常情况下,kafka的消费线程数据是分区(patition)一对一,单个patition是kafka并行操作的最小单元,kafka只允许单个partition的数据被一个consumer线程消费,例如我们做20个分区,实际上就对应着20个消费线程,当我们做一些活动的时候,就会有发生消息量猛增,而我们的消费线程有限,处理消息的能力有可能跟不上,导致大量的消息堆积处理不完。

这时我们可能就需求要优化,加大处理能力,多数人可能会想到增加分区,分区是可以增加,但是不可能一直无限向上增加,我们这里参用多线程的方案。

package com.imcbb;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.*;

/**
* @author kevin
* Date 2020-09-24
* Time 09:43
*/
@Service
public class KafkaConsumer {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 10, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("KThread-%d").build(),
(r, executor) -> {
logger.warn("Ops,Rejected!");
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
// new ThreadPoolExecutor.CallerRunsPolicy()

);

@KafkaListener(topics = "myTopic")
public void listen(ConsumerRecord<?, ?> cr) {

executor.execute(() -> {
logger.info("---------" + cr.toString());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

}
}

在上面的代码中,我们创建了一个线程池来消费一个分区的消息,这里有两个需要特别注意的点:

1.线程池里存放task的队列这里我们使用了SynchronousQueue阻塞队列,这个队列很有趣,是一个无容量队列,具体有兴趣的大家再上网查这里不做过多的解释,使用SynchronousQueue是因为,如果系统发生异常宕机或者应用发版重启时防止队列堆积的消息丢失(当然这也无法完全避免在线程工作中,系统发生异常导致线程挂掉,如果要求可用性更高,可考虑先存入数据库,redis中,再做补偿处理机制)。

2.当消息过多,线程池也忙不过时,我们这里有两种处理办法

2.1 让监听消费线程阻塞,使用new ThreadPoolExecutor.CallerRunsPolicy(),此时消息将不会再消费

2.1 自定义拒绝策略,把任务再放回去

(r, executor) -> {
logger.warn("Ops,Rejected!");
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

以上两种方案都可以解决问题,选一种使用即可,两种方案都无法保证处理消息的顺序。

使用第二种有一个好处,就是可以通过日志查看消费端的一个消费能力,看看有没有进到拒绝里,来适当的调整线程数。

以上就是多线程消费的办法,这个在我们生产环境经过考验的,哈哈!

文章简单帖了些代码,完整写了一个demo给大家,可以自行下载参考:https://github.com/kevinmails/kafka-consumer-demo

前提要在本机上安装一下kafka,官方有安装手册(最小可用版本,本地测试够用了),如果大家看官方留言搞不定,留言给我,我再写一篇安装文章。官方有安装手册:https://kafka.apache.org/quickstart

希望对大家有帮助!

参考:https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/

 

 

 

原创:https://www.panoramacn.com
源码网提供WordPress源码,帝国CMS源码discuz源码,微信小程序,小说源码,杰奇源码,thinkphp源码,ecshop模板源码,微擎模板源码,dede源码,织梦源码等。

专业搭建小说网站,小说程序,杰奇系列,微信小说系列,app系列小说

当kafka分区不能再增加的情况下,使用多线程提升kafka消费能力(附源码)

免责声明,若由于商用引起版权纠纷,一切责任均由使用者承担。

您必须遵守我们的协议,如果您下载了该资源行为将被视为对《免责声明》全部内容的认可-> 联系客服 投诉资源
www.panoramacn.com资源全部来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。 敬请谅解! 侵权删帖/违法举报/投稿等事物联系邮箱:2640602276@qq.com
未经允许不得转载:书荒源码源码网每日更新网站源码模板! » 当kafka分区不能再增加的情况下,使用多线程提升kafka消费能力(附源码)
关注我们小说电影免费看
关注我们,获取更多的全网素材资源,有趣有料!
120000+人已关注
分享到:
赞(0) 打赏

评论抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

您的打赏就是我分享的动力!

支付宝扫一扫打赏

微信扫一扫打赏