当前位置: 首页 > 知识库问答 >
问题:

带有单个分区的Kafka Streams在出错时暂停

柯翔
2023-03-14

我有一个带有单个分区的Kafka代理。要求执行以下操作:

  1. 从此分区读取
  2. 通过调用REST API转换消息
  3. 将转换后的消息发布到另一个REST API
  4. 将响应消息推送到另一个主题

我使用Kafka Streams通过以下代码实现这一点

StreamsBuilder builder = new StreamsBuilder();`
KStream<Object, Object> consumerStream = builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic(), Produced.with(lStringKeySerde, lAvroValueSerde));
return builder.build();

以下是我的配置:

        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
        if (schemaRegistry != null && schemaRegistry.length > 0) {
            streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));          
        }
        streamsConfig.put(this.keySerializerKeyName, keyStringSerializerClassName);
        streamsConfig.put(this.valueSerialzerKeyName, valueAVROSerializerClassName);
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
        streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
        streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
        streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
        streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
        streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

我在我的KeyValueMapper中寻找一种机制来执行以下操作:

  1. 如果任何REST API关闭,则会捕获异常
  2. 我希望在系统备份之前保持相同的偏移量循环,或者暂停消耗直到系统备份

我已经检查了以下链接,但它们似乎没有帮助。

如何使用单个应用实例和单个主题分区有效地运行kafka流?

以下链接讨论了Kafka交易管理器,但这将不起作用,我猜KStream的初始化方式在上面

Kafka事务失败,但无论如何都会偏移提交

任何这方面的帮助/指点将不胜感激。

共有1个答案

秦飞航
2023-03-14

你想做的事情实际上并不受支持。在Kafka Streams中无法暂停消费者。

只有在< code>KeyValueMapper内html" target="_blank">循环时,才能“暂停”处理,但是,在这种情况下,使用者可能会退出使用者组。对于您的情况,使用单个输入主题分区,并且无论如何在单个< code>KafkaStreams实例中只能有一个线程,因此,它不会影响组中的任何其他成员(因为没有任何成员)。然而,问题是,在线程退出组之后,提交偏移量将会失败。因此,在线程重新加入组后,它将获取一个旧的偏移量并重新处理一些数据(也就是说,您会得到重复的数据处理)。为了避免脱离使用者组,可以将< code > max . poll . interval . ms config设置为一个较高的值(甚至可以是< code >整数。MAX _ VALUE )——假设您在消费者组中只有一个成员,设置一个较高的值应该没问题。

另一种选择可能是将transform()与状态存储一起使用。如果无法进行REST调用,请将数据放入存储区,然后重试。这样消费者就不会退出该组。然而,读取新数据永远不会停止,您需要缓冲存储区中的所有数据,直到可以再次调用RESTAPI。您应该能够通过在Transformer中“Hibernate”来减慢读取新数据的速度(以减少需要缓冲的数据量)——您只需要确保不违反max.poll.interval。msconfig(默认值为30秒)。

 类似资料:
  • 问题内容: 我正在尝试使用包含CSV文件的AJAX提交表单。因此,想法是使用ajax发送表单,通过生成表在不同文件中对其进行处理,然后将处理后的表回调回页面。 我所拥有的是 JavaScript是 我在萤火虫中遇到这种错误, 我在这里做错了什么?请帮我 问题答案: 不要将文件传递到构造函数中,而应使用append,例如:

  • 问题内容: 我有以下格式的日期: 2010-03-01T00:00:00-08:00 我向它抛出了以下SimpleDateFormats来对其进行解析: 我有一个使用如下格式的便捷方法: 它似乎已击中该模式,yyyyMMddHHmm但将日期返回为Thu Dec 03 00:01:00 PST 2009。 解析此日期的正确模式是什么? 更新:我不需要时区解析。我不希望在区域之间移动对时间敏感的问题,

  • 我正在使用Apache Kafka 0.8.2.1,计划升级应用程序以使用Apache Kafka 1.0.0。当我考察Kafka流的时候,我得到了一些关于Kafka流和Kafka流的区别的问题。 我知道KafkaConsumer基本上用于字面上,从broker和KafkaStreams可以做各种事情,如或与数据库交互,甚至重新生成到其他kafka或任何其他系统。 所以,这是我的问题。KafkaC

  • 问题内容: 在Python中使用模块时,如何暂时禁用单个单元测试? 问题答案: 单个的测试方法或类都可以使用装饰器禁用。 有关其他选项,请参阅文档“跳过测试和预期的失败”。

  • 下面的spring批处理作业带有一个分区步骤,它为一个分区步骤创建3600个分区。我使用的ThreadPoolTaskExecutor的最大池大小为100,队列容量为100(尽管这似乎对速度没有什么影响)。Im使用Visual VM监视线程,我注意到taskExecutor线程在启动作业后超过5分钟才启动。 奇怪的是,如果我将分区的数量限制为100,那么线程启动得相当快,大约在一分钟内完成。 我注