标题比较长,实在想不出什么好的描述。大概要解决的问题就是,同一个服务同时监听多个topic,且在每个topic中的group都不相同,具体看问题描述吧。

一、问题背景

 前几天部署了一套系统,每个服务都搭建了多个节点,而且是没有主从关系的节点。每个服务中有很多东西是放到缓存中的,配置多节点之后,相同服务的不同节点出现了缓存不一致的问题。

二、问题描述

 刚开始想出一种解决方案,监听同一个topic1,每个节点分到一个group中,这样每次生产者生产消息后,kafka会将消息分发到所有group中,消息中带一个消息类型字段(mq_type)。
各个节点由于处于不同group中都会消费此消息,然后根据mq_type判断是否该处理此消息。
 然而,pass。原因:由于此系统(系统B)中的服务1还与系统A有消费与生产消息的关系,都放到一个topic下数据不规范。而且如果多个服务1同时消费消息,会进行读表改表操作,还得做处理。

 emmm,又想出了一种解决方案,系统B中每个节点还是分到不同的group中,当某个服务1消费到系统A发送的消息,需要刷新缓存时,该节点对所有节点通过系统B内部的消息队列topic2进行广播,各个服务接收到消费消息后根据消息类型进行缓存的更新。
具体系统图如下:

图片备用链接
ps:以上区分两个topic是为了规范来自不同的渠道的数据走不同的topic,如果没有这种要求完全没有必要做如下这种操作,可以直接通过group和消息内容去做区分
 如上图,系统A通过topic1向系统B中的服务1发送消息,系统B中服务1和服务2以及他们的其他节点在系统B中通过topic2发送消息。
 可以看出,系统B中的服务1扮演了三个角色:系统A发送消息的消费者,系统B内部消息的生产者和消费者。可以得出如下问题:

对于服务1,需要将其配置为监听两个topic,分别监听topic1和topic2
系统A向系统B发送消息时,服务1以及他的其他节点处于topic1的同一个group下,即只有一个服务1节点会去消费系统A发来的消息
系统B内部之间发送消息时,每个服务和节点都处于topic2的不同group下

 说到这里,其实就清楚很多了。其实就是想让服务1-1和他的其他节点在topic1中都处于group-A2B中,服务1-1在topic2中处于group-service1-1中,服务1-2在topic2中处于group-service1-2中。

三、需求实现

3.1 代码基础

 kafka的基础代码请参照我的以下两篇博客,本次修改都是基于这些代码的基础上改造的
 Kafka及Spring&Kafka整合
 kafka动态配置topic

3.2 生产者

 kafka生产者发送消息时,会向该topic下的所有group发送消息,而每个group只会有一个消费者进行消费。所以生产者不用进行更改。

3.3 消费者

3.3.1 消费者的配置

 以下消费者以服务1-1为例,其他节点服务同理。
 由于同一个服务要扮演两个消费者,所以我们需要不同的配置文件用来生成不同的消费者

//首先是获取公共配置方法public Map<String, Object> getCommonPropertis(){Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put("auto.offset.reset", "latest");// 一般配置earliest 或者latest 值return props;}//然后不同的用来生成不同消费者的工厂//topic1的消费者public ConsumerFactory<String, String> consumerFactoryA2B() {Map<String, Object> properties = getCommonPropertis();//所在groupproperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-A2B");return new DefaultKafkaConsumerFactory<String, String>(properties);}//系统B内topic2的每个服务的group我这里用服务名+ip+端口命名String GROUP_NAME = "service1-1-"+serviceInfoUtil.getIpAddress()+"-"+serviceInfoUtil.getLocalPort();//topic2的消费者public ConsumerFactory<String, String> consumerFactoryB2B(){Map<String, Object> properties = getCommonPropertis();//所在groupproperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_NAME);return new DefaultKafkaConsumerFactory<String, String>(properties);}//再通过不同的配置工厂生成实例bean//topic1的消费者bean@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryA2B() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactoryA2B());//通过不同工厂获取实例factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}  //topic2的消费者bean@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryB2B() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactoryB2B());//通过不同工厂获取实例factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}  

3.3.2 消费者的使用

 以上消费者的配置就算完成了,接下来就可以直接使用了。

     /*** 监听B2B所有消息* @param record*/@KafkaListener(topics = "#{'${kafka.B2B.listener_topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactoryB2B")public void B2Bconsume(ConsumerRecord<?, ?> record){recordDeal(record);}/*** 监听A2B的所有消息* @param record*/@KafkaListener(topics = "#{'${kafka.A2B.listener_topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactoryA2B")public void A2Bconsume(ConsumerRecord<?, ?> record) {recordDeal(record);}//containerFactory = "kafkaListenerContainerFactoryA2B"  主要就是这个containerFactory参数,用它控制是哪个实例

3.3.3 获取服务启动的ip和端口类

@Configuration
public class ServiceInfoUtil {public static String getIpAddress() throws UnknownHostException {InetAddress address = InetAddress.getLocalHost();return address.getHostAddress();}public static String getLocalPort() throws MalformedObjectNameException {MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();Set<ObjectName> objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"),Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")));String port = objectNames.iterator().next().getKeyProperty("port");return port;}
}

3.3.4 最后

 这样修改后启动时,通过配置文件中的kafka.A2B.listener_topics去判断这个消费者该监听哪个topic,通过containerFactory = "kafkaListenerContainerFactoryA2B"判断这个消费者在这个topic中属于哪个group。
然后发送消息测试,成了。

四、感谢大佬

这几个大佬的对于kafka的group的讲解比较好:
KAFKA 多个消费者同一个GROUPID,只有一个能收到消息的原因
Kafka消费组(consumer group)
springboot 集成kafka 实现多个customer不同group

kafka一个服务配置到不同topic的不同group相关推荐

  1. Kafka SSL服务配置及客户端使用(Linux+Pykafka)

    内容:1: Kafka及Zookeeper快速安装配置及测试2: Kafka SSL服务端配置3: Kafka 客户端3.1: Linux下配置及测试3.2: Pykafka配置及测试4: 脚本5: ...

  2. kafka java 查询信息_Kafka查看topic、consumer group状态命令

    最近工作中遇到需要使用kafka的场景,测试消费程序启动后,要莫名的过几十秒乃至几分钟才能成功获取到到topic的partition和offset,而后开始消费数据,于是学习了一下查看kafka br ...

  3. kafka消息服务的producer、broker、consumer的配置

    2019独角兽企业重金招聘Python工程师标准>>> server.properties配置: server.properties中所有配置参数说明(解释)如下列表: 参数 说明( ...

  4. 汽车位置服务之kafka集群配置注意事项

    1)  重复消费问题. 问题描述  采用kafka读取消息进行处理时,consumer会重复读取afka队列中的数据. 问题原因  kafka的consumer消费数据时首先会从broker里读取一批 ...

  5. kafka多分区只有一个在消费_Kafka的topic为什么要分区

    ​从这篇文章你将了解到什么?Kafka的topic为什么要分区. 消费者组的作用. Kafka的分区分配. "Kafka是一个分布式.支持分区的(partition).多副本的(replic ...

  6. Kafka监控工具KafkaOffsetMonitor配置及使用

    一.KafkaOffsetMonitor简述 KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的 ...

  7. 深度对比三种主流微服务配置中心

    在撰写这篇技术选型的文章之前,是比较犹豫的.因为,以其中一个开源项目开发者的身份,去写一篇三个开源项目的对比,即便很克制的去客观的比较,也很难有信服力.这就像,既是参赛选手,又想做裁判,观众肯定是不买 ...

  8. Kafka整体结构图、Consumer与topic关系、Kafka消息分发、Consumer的负载均衡、Kafka文件存储机制、Kafka partition segment等(来自学习资料)

    ##1. Kafka整体结构图 Kafka名词解释和工作方式  Producer : 消息生产者,就是向kafka broker发消息的客户端.  Consumer : 消息消费者,向kafka ...

  9. kafka 消息服务

    apache kafka参考 http://kafka.apache.org/documentation.html 消息队列方式: 点对点: 消息生产者生产消息发送到queue中,然后消息消费者从qu ...

最新文章

  1. 在Ubuntu 14.04 64bit上安装思维导图工具XMind
  2. 数论基础--洛谷P1072 Hankson 的趣味题
  3. python3入门书籍-学习python3入门书籍选哪些?
  4. maven-source 1.3 中不支持注释请使用 -source 5 或更高版本以启用注释
  5. 豆瓣图书的推荐与搜索、简易版知识引擎构建(neo4j)
  6. centos7 firewall指定IP与端口访问(常用)
  7. c 语言车牌识别系统课题设计,车牌识别系统的设计--课程设计报告.doc
  8. PTA-7-8 删除重复字符 (20分)(C语言)
  9. wamp phpcms部署网站问题
  10. Java-构建器模式(Buider模式)
  11. 为什么会有ResNet? Why ResNet?
  12. mysql 递归查找父节点_MySQL递归查询树状表的子节点、父节点具体实现
  13. Google Jump Consistent Hash 一致性哈希算法
  14. python做图片浏览器_保护隐私,用Python打造自己的照片浏览器
  15. pdf python 3.7编程快速入门 潘中强_无python基础,这些书籍可以帮您快速入门。
  16. 环境判断:区别h5打开还是weixin打开?
  17. C#串口操作实际应用开发详解
  18. selenium所有检测点和绕过方式[运行命令后被检测/打开就被检测/环境检测]
  19. 金蝶软件和用友软件部署在阿里云ECS具体方法及教程
  20. 工字型钢弹性截面模量计算公式_弹性截面模量与塑性截面模量的例析

热门文章

  1. 幽灵的威胁:针对石油运输船的黑客攻击活动
  2. 用ubuntu下载电影:磁力链接,torrent,迅雷链接
  3. IoT 物联网设备该如何选择固件升级 OTA ?
  4. Scratch3.0创意编程(基础篇):第6课 逃离细菌
  5. mysql子查询一般分为几种_子查询一般分为几种
  6. java下拉框查询_Java Swing应用程序 JComboBox下拉框联动查询
  7. html抢答器代码,单片机八路抢答器代码+Proteus仿真
  8. 怎么用matlab计算机械手运动,Matlab Robotics ToolBox 实战 -- 斯坦福机械手运动学建模及分析...
  9. 小米商城html代码_微服务架构实战:商城的用户登录与账户切换设计、订单查询设计...
  10. 可攻陷所有WiFi网络!KRACK 漏洞发现者回答纪实