当前位置: 首页 > 知识库问答 >
问题:

Kafka与交易制作人只有一次

孟德曜
2023-03-14

我试图用事务性生产者/消费者来准确地理解Kafka。

我遇到了下面的例子。但是,我还是很难准确地理解一次。这个代码正确吗?

制作人sendOffsetsToTransaction-此代码的作用是什么?这是否应该针对同一个目标主题?

什么是消费者之前的系统崩溃。commitSync();//将再次读取相同的消息并生成重复消息

public class ExactlyOnceLowLevel {

    public void runConsumer() throws Exception {
        final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
        final Producer<Long, String> producer = createProducer();

        producer.initTransactions();

        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

            try {
                final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
                producer.beginTransaction();
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(),
                                record.offset(), record.key(), record.value());

                    final ProducerRecord<Long, String> producerRecord =
                                new ProducerRecord<>(TOPIC_1, new BigInteger(record.key()).longValue(), record.value().toString());
                    // send returns Future
                    final RecordMetadata metadata = producer.send(producerRecord).get();
                    currentOffsets.put(new TopicPartition(TOPIC_1, record.partition()), new OffsetAndMetadata(record.offset()));
                }
                producer.sendOffsetsToTransaction(currentOffsets, "my-transactional-consumer-group"); // a bit annoying here to reference group id rwice
                producer.commitTransaction();
                consumer.commitSync();
                currentOffsets.clear();
                // EXACTLY ONCE!
            }
            catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                e.printStackTrace();
                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            }
            catch (final KafkaException e) {
                e.printStackTrace();
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            finally {
                producer.flush();
                producer.close();
            }
        }
    }

    private static KafkaConsumer<byte[], byte[]> createConsumer() {
        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED); // this has to be

        return new KafkaConsumer<>(consumerConfig);
    }

    private static Producer<Long, String> createProducer() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // this is now safe !!!!
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // this has to be all
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // this has to be 1

        return new KafkaProducer<>(props);
    }

    public static void main(final String... args) throws Exception {

        final ExactlyOnceLowLevel example = new ExactlyOnceLowLevel();
        example.runConsumer();

    }
}

共有1个答案

岳浩宕
2023-03-14

在Kafka事务中使用读/处理/写模式时,不应尝试向使用者提交偏移量。正如你暗示的,这可能会导致问题。

在这个用例中,需要将偏移添加到事务中,您应该只使用sendOffsetsToTransaction()来实现这一点。该方法确保只有在完整事务成功时才提交这些偏移量。请参阅Javadoc:

将指定偏移的列表发送给消费者组协调员,并将这些偏移标记为当前事务的一部分。只有当事务成功提交时,这些偏移才会被视为已提交。提交的偏移量应该是应用程序将使用的下一条消息,即lastProcessedMessageOffset 1。

当您需要将已消费和已生成的消息一起批处理时,应该使用此方法,通常是在消费-转换-生成模式中。因此,指定的consumerGroupId应该与配置参数group相同。使用过的消费者的id。请注意,使用者应该具有enable。汽车commit=false,也不应手动提交偏移量(通过同步或异步提交)。

 类似资料:
  • 我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。 我使用的是Spring Boot KafkaTemplate。 我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于 https://docs.spring.io/spring-kafka/reference/html/#using-K

  • 我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点

  • 我用的是Kafka2,看起来只有一次 Kafka流 Kafka读取/转换/写入事务生产者 Kafka连接 在这里,上述所有工作在主题之间(源和目标是主题)。 有可能与其他目的地只进行一次航班吗?

  • 我正在探索反应性Kafka,只是想确认反应性Kafka是否等同于同步制作人。与同步生产者,我们得到消息传递保证与确认字符和生产者序列保持。但是,ASYNC不能保证交付和测序。反应式生产者等同于SYNC还是ASYNC?

  • 我想使用spring cloud stream framework创建一个kafkaendpoint,它将有一个http post api到。如何动态更改属性 我可以使用实现来实现上述功能,但不知道是否有可能在Spring中开发此功能。

  • 我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了