当前位置: 首页 > 知识库问答 >
问题:

无论如何,在kafka流应用程序中,对不同的输入主题使用不同的auto.offset.reset策略吗?

暴招
2023-03-14

用例是:我有一个kafka流应用程序,它使用一个输入主题,输出到一个中间主题,然后在同一个流中使用这个中间主题的另一个拓扑。

每当应用程序id被更新时,两个主题都从最早开始向消费者传递。我想将中间主题的auto.offset.reset改为latest,同时将输入主题的auto . offset . reset改为earliest。

共有1个答案

哈沛
2023-03-14

是的。您可以通过以下方式为每个主题设置重置策略:

// Processor API
topology.addSource(AutoOffsetReset offsetReset, String name, String... topics); 

// DSL
builder.stream(String topic, Consumed.with(AutoOffsetReset offsetReset));
builder.table(String topic, Consumed.with(AutoOffsetReset offsetReset));

所有这些方法都有一些重载,允许设置它。

 类似资料:
  • 上下文 我编写了几个小的Kafka Connect连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与模式注册表集成,因此数据使用Avro序列化。 我使用Landoop提供的fast data dev Docker映像将它们部署到本地Kafka环境中 基本设置工作,并每秒生成一条记录的消息 但是,我想更改主题名称策略。默认设置生成两个主题:

  • 我有一个Kafka的话题,我正在听。然后将消息内容写入websocket通道,在该通道中我有一个订阅了该通道的SockJS客户机。这很管用。然后我创建了一个新的主题,然后添加了第二个KafKalistener。但是,当调用secong侦听器时,我看到它正在尝试处理/读取与第一个KafkaListener和主题相对应的有效负载,由于它没有被配置为这样做,因此会引发一个MessageConversio

  • 我从教程中创建了示例Kafka Streams应用程序: 不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。 我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不

  • 我使用flink版本1.13.0 当我试图使用flink doc的Kafka水印策略时,这似乎不起作用,窗口处理功能将不会运行。 我想知道,在Kafka中,水印的时间戳将使用消费时间还是生产时间? 我的消费者代码如下: 并像这样使用窗口: 拓扑图是这样的:

  • 我设置我的类,以便使用Laravel授权和策略功能。但是,在为我的方法定义中间件时,我一直遇到这个错误(类App\Policies\StatusPolicy不存在)。这就是我所拥有的: AuthServiceProvider。php ontroller.php 状态策略。php(由php artisan生成):策略状态策略--model=Status

  • 我正在将一个应用程序迁移到SpringCloudStream的新的基于功能的编程模型中,但阻止了事件路由。 我必须路由来自两个不同kafka主题的事件,我不知道如何将functionRouter-in-0绑定到两个不同的目的地。 路由可以通过添加