当前位置: 首页 > 知识库问答 >
问题:

KafkaReactor:准确处理样品一次

乐正峰
2023-03-14

我读过许多文章,其中有许多不同的配置来实现一次处理。

下面是我的生产者配置:

final Map<String, Object> props = Maps.newConcurrentMap();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);


props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all"); 
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");

以下是我的使用者配置:

final Map<String, Object> props = Maps.newHashMap();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

我试图跟随,但我遇到了一些问题:

下面是我的生产者代码:

    @Override
public Mono<SenderResult<Void>> buy(Message msg) {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    return kafkaProducerTemplate.transactionManager().begin().then(kafkaProducerTemplate.send(mytopic, msg));

}

我的消费代码:

@Override
public void run(ApplicationArguments arg0) throws Exception {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = kafkaConfig.getKafkaConsumerTemplate(mytopic, Message.class);

    final Flux<ConsumerRecord<String, Message>> flux = kafkaConsumerTemplate.receiveExactlyOnce(kafkaProducerTemplate.transactionManager())
            .concatMap(receiverRecordFlux -> receiverRecordFlux );

    flux.subscribe(record -> {
        final Message message = record.value();

        System.out.printf("received message: timestamp=%s key=%d value=%s\n",
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                message);
 transactionService.processAndSendToNextTopic(message)
                .doOnSuccess(aVoid -> kafkaProducerTemplate.transactionManager().commit())
                .subscribe();

    });
}
Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

共有1个答案

公冶森
2023-03-14

参见ReceiveExactlyOnce的javadocs

/**
 * Returns a {@link Flux} of consumer record batches that may be used for exactly once
 * delivery semantics. A new transaction is started for each inner Flux and it is the
 * responsibility of the consuming application to commit or abort the transaction
 * using {@link TransactionManager#commit()} or {@link TransactionManager#abort()}
 * after processing the Flux. 

begin()已经被调用,所以不需要调用它。

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
    this.ackMode = AckMode.EXACTLY_ONCE;
    Flux<ConsumerRecords<K, V>> flux = withDoOnRequest(createConsumerFlux());
    return  flux.map(consumerRecords -> transactionManager.begin()
                             .then(Mono.fromCallable(() -> awaitingTransaction.getAndSet(true)))
                             .thenMany(transactionalRecords(transactionManager, consumerRecords)))
                             .publishOn(transactionManager.scheduler());
}
 类似资料:
  • 问题内容: 我一直在概念上为我的项目决定异常处理结构。 假设您有一个示例: 还有两个子类FileData和StaticData,它们从某些指定的文件中读取数据,StaticData仅返回一些预定义的常量数据。 现在,在读取文件时,可能会在FileData中引发IOException,但是StaticData将永远不会抛出。大多数样式指南建议在调用堆栈上传播Exception,直到有足够的上下文可以

  • 我试图从CodingBat做这个问题,但不明白为什么它不与输入字符串“你好!”工作。 这是我的代码,下面是我得到的结果。

  • 我已经设置了一个Flink 1.2独立集群,其中包含2个JobManager和3个TaskManager,我正在使用JMeter通过生成Kafka消息/事件对其进行负载测试,然后处理这些消息/事件。处理作业在TaskManager上运行,通常需要大约15K个事件/秒。 作业已设置EXACTLY_ONCE检查点,并将状态和检查点持久化到Amazon S3。如果我关闭运行作业的TaskManager需

  • 问题内容: 我需要一种非常准确的方式来计时程序的各个部分。我可以为此使用常规的高分辨率时钟,但这将返回挂钟时间,这不是我所需要的:我只需要花时间运行我的进程。 我清楚地记得看到过一个Linux内核补丁,该补丁可以使我将进程的时间定为纳秒级,但我忘了给它加上书签,也忘了补丁的名称了:(。 我记得它是如何工作的: 在每个上下文切换上,它将读取高分辨率时钟的值,并将最后两个值的增量添加到正在运行的进程的

  • 对于Spring Cloud Stream示例,请参阅GitHub上的spring-cloud-stream样本存储库。

  • 问题内容: 我现在正在做一些React,我想知道是否有一种“正确的”方式来进行条件样式设计。在本教程中,他们使用 我不想使用内联样式,因此我想使用一个类来控制条件样式。一个人将如何以React的思维方式来实现这一目标?还是应该只使用这种内联样式方式? 问题答案: 如果您更喜欢使用类名,请务必使用类名。 您可能还会发现类名称包很有用。有了它,您的代码将如下所示: 没有“正确”的方法来进行条件样式设计