当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka:ApplicationContext中不同对象的多个侦听器

万楷
2023-03-14

我能否向社区咨询一下,听多个主题的最佳方式是什么,每个主题都包含一个不同类别的信息?

在过去的几天里,我一直在玩Spring Kafka。到目前为止我的思考过程:

共有1个答案

陆翰藻
2023-03-14

这里有一个非常简单的例子。

// -----------------------------------------------
// Sender
// -----------------------------------------------

@Configuration
public class SenderConfig {
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        ......
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Class1> producerFactory1() {
        return new DefaultKafkaProducerFactory<String, Class1>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Class1> kafkaTemplate1() {
        return new KafkaTemplate<>(producerFactory1());
    }

    @Bean
    public Sender1 sender1() {
        return new Sender1();
    }

    //-------- send the second class --------

    @Bean
    public ProducerFactory<String, Class2> producerFactory2() {
        return new DefaultKafkaProducerFactory<String, Class2>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Class2> kafkaTemplate2() {
        return new KafkaTemplate<>(producerFactory2());
    }

    @Bean
    public Sender2 sender2() {
        return new Sender2();
    }
}

public class Sender1 {
    @Autowired
    private KafkaTemplate<String, Class1> kafkaTemplate1;

    public void send(String topic, Class1 c1) {
        kafkaTemplate1.send(topic, c1);
   }
}

public class Sender2 {
    @Autowired
    private KafkaTemplate<String, Class2> kafkaTemplate2;

    public void send(String topic, Class2 c2) {
        kafkaTemplate2.send(topic, c2);
    }
}

// -----------------------------------------------
// Receiver
// -----------------------------------------------

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        ......
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, Class1> consumerFactory1() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Class1.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class1> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, Class1> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory1());
        return factory;
    }

    @Bean
    public Receiver1 receiver1() {
        return new Receiver1();
    }

    //-------- add the second listener

    @Bean
    public ConsumerFactory<String, Class2> consumerFactory2() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Class2.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class2> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, Class2> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory2());
        return factory;
    }

    @Bean
    public Receiver2 receiver2() {
        return new Receiver2();
    }
}

public class Receiver1 {
    @KafkaListener(id="listener1", topics = "topic1", containerFactory = "kafkaListenerContainerFactory1")
    public void receive(Class1 c1) {
        LOGGER.info("Received c1");
    }
}

public class Receiver2 {
    @KafkaListener(id="listener2", topics = "topic2", containerFactory = "kafkaListenerContainerFactory2")
    public void receive(Class2 c2) {
        LOGGER.info("Received c2");
    }
}
 类似资料:
  • 服务器部件: 客户部分:io.js 消息组件 信息形式——发布过程的开始

  • 我想知道如何在一个键事件中按下所有的键。例如,我想为Ctrl+F编写一个监听器,它可以切换全屏。如何检查在一个事件中是否同时按下了Ctrl和F?

  • 我是JComboBox的新手 我有4个JComboBox:专用、etudiant、annee和semestre。 每次更改所选项目并将结果添加到滚动窗格(groupe des matieres ouvertes)时,我都需要从其中的4个项目中获取所选项目

  • 问题内容: 我需要使用同一端口收听2个不同的多播组。会从和那里听。两个多播组都使用相同的文件,但我无法控制它。 运行程序时,我在每个程序中都接收到两个多播流,即和上广播的数据包。我怀疑问题是由于通用端口引起的。这是我用来订阅多播的代码: 如何在每个程序中过滤特定的多播组? 问题答案: 如果你改变 至 您可能会获得更大的成功。 (如果您更改程序以使用,则可以使其适应未来。)

  • 我有一个监听两个不同端口的TCP服务器。我创建了两个不同的套接字,一个在端口8888上,一个在端口6634上。我监听这些端口,然后我在FD_SET中添加两个套接字,并将它们传递给Select()函数...当套接字准备好读取时,我检查FD_ISSET,看看哪个端口上有消息要读取。 不管怎样,当我连接到8888端口时,构思是成功的,我可以向服务器发送和接收。。。当我在客户端ctrl c时,选择函数再次

  • 问题内容: 我可能会尝试以困难的方式执行此操作,所以请让我知道是否有更好的解决方案。 我正在用Java开发一个简单的文字游戏,您可以通过GUI选择动作。我有几个班级正在尝试序列化一个是播放器,另一个是NPC。是否有一种简单的方法可以将一个以上的对象(播放器和NPC)序列化到同一文件中?我可以序列化一个对象并将其加载回游戏中。 我会以错误的方式处理吗?有没有更简单的方法来尝试保存游戏状态? 如果我有