当前位置: 首页 > 知识库问答 >
问题:

仅一次两个Kafka集群

松骏俊
2023-03-14

我有 2 个Kafka集群。群集 A 和群集 B。这些集群是完全独立的。我有一个Spring启动应用程序,它侦听集群 A 上的主题,转换事件,然后将其生成到集群 B 上。我只需要一次,因为这些是金融事件。我注意到,对于我当前的应用程序,我有时会遇到重复的情况,也会错过一些事件。我试图尽我所能只实现一次。其中一篇帖子说,与Spring启动相比,flink将是一个更好的选择。我应该搬到闪光灯吗?请参阅下面的Spring代码。

消费者配置

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.server.consumer}")
    String server;

    @Value("${kafka.kerberos.service.name:}")
    String kerberosServiceName;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        // skip kerberos if no value is provided
        if (kerberosServiceName.length() > 0) {
          config.put("security.protocol", "SASL_PLAINTEXT");
          config.put("sasl.kerberos.service.name", kerberosServiceName);
        }

        return config;
    }

    @Bean
    public ConsumerFactory<String, AccrualSchema> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new AvroDeserializer<>(AccrualSchema.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, AccrualSchema> kafkaListenerContainerFactory(ConsumerErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, AccrualSchema> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setAutoStartup(true);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        
        factory.setErrorHandler(errorHandler);
        
        return factory;
    }

    @Bean
    public KafkaConsumerAccrual receiver() {
        return new KafkaConsumerAccrual();
    }
}

生产者配置

    @Configuration
public class KafkaProducerConfig {

    @Value("${kafka.server.producer}")
    String server;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1");
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "10");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Kafka生产者

    @Service
public class KafkaTopicProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
        
    public void topicProducer(String payload, String topic) {
        kafkaTemplate.executeInTransaction(kt->kt.send(topic, payload));            
    }
}

Kafka消费者

public class KafkaConsumerAccrual {

    @Autowired
    KafkaTopicProducer kafkaTopicProducer;

    @Autowired
    Gson gson;
 
    @KafkaListener(topics = "topicname", groupId = "groupid", id = "listenerid")
    public void consume(AccrualSchema accrual,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Long offset,
            @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {       
        
        
        AccrualEntity accrualEntity = convertBusinessObjectToAccrual(accrual,partition,offset);

        kafkaTopicProducer.topicProducer(gson.toJson(accrualEntity, AccrualEntity.class), accrualTopic);

    }

    public AccrualEntity convertBusinessObjectToAccrual(AccrualSchema ao, Integer partition,
            Long offset) {
        //Transform code goes here
        return ae;
    }
}

共有2个答案

单于庆
2023-03-14

根据KIP草案:https://cwiki.apache.org/confluence/display/KAFKA/KIP-656:MirrorMaker2精确一次语义

你可以期待的更好的事情是两个集群之间至少有一个语义。这意味着您必须在目标代理的消费者端防止重复。

例如,您可以确定一组属性,这些属性对于特定的时间窗口必须是唯一的。但我将真正依赖于您的用例。

苏健柏
2023-03-14

确切地说,一旦跨集群不支持语义;关键是生成记录并在一个原子事务中提交消耗的偏移量。

这在您的环境中是不可能的,因为一个事务不能跨越两个集群。

 类似资料:
  • 我试图使用kafka流库只使用一次kafka的功能。我只将proessing.guarantee配置为exactly_once。与此同时,需要将事务状态存储在内部主题(__transaction_state)中。 我的问题是,如何定制主题的名称?如果kafka集群由多个消费者共享,那么每个消费者是否需要不同的事务管理主题? 谢谢你,墨蒂

  • 我在一些关于堆栈溢出的答案中看到,通常在web中也看到,Kafka不支持消费确认,或者消费一次就很难实现。 在以下作为示例的条目中,有没有理由使用RabbitMQ而不是Kafka?,我可以读到以下语句: RabbitMQ将保留已消耗/已确认/未确认消息的所有状态,而Kafka则不保留 或 有人能解释一下为什么Kafka的“一次消费保证”很难实现吗?这与Kafka和RabbitMQ等其他更传统的消息

  • 问题内容: 说我有一个数组。如何一次迭代两个? 问题答案: 您可以使用称为stride(to :, by :)的进度循环,每n个元素对元素进行一次迭代: Xcode 8.3.2•Swift 3.1

  • 问题内容: 我在集群环境中将Quartz Scheduler用作Spring bean。 我有一些用@NotConcurrent注释的作业,它们每个集群运行一次(即,仅在一个节点中,仅在一个线程中)。 现在,我需要在集群的每个节点上运行一项作业。我删除了@NotConcurrent批注,但是它仅在一台计算机上的每个线程上运行。它不会在其他节点上触发。 我应该用什么来注释作业? 示例:带注释的Job

  • 我试图连接两个Ktable流,似乎作为连接操作的一个输出,我两次得到与输出相同的消息。似乎在此操作过程中调用了两次值Joiner。 让我知道如何解决这个问题,以便只有一条消息作为加入操作的输出发出。 由于两个ktable(msg1和msg2)之间的连接,我收到两条相同的消息。

  • 问题内容: 我有两个表需要相同的值以实现非规范化。 这是查询。 第一张桌子 第二张桌子 如您所见,两个表之间的唯一区别是它们的名称和两个表没有该字段 无论如何将两个更新合并为一个? 问题答案: 如文档中所述,应该可以进行多表更新。 http://dev.mysql.com/doc/refman/5.5/en/update.html 注意:多表不支持LIMIT,因此根据具体情况,这可能会引起更多麻烦