当前位置: 首页 > 知识库问答 >
问题:

使用Flink动态创建和更改Kafka主题

严峰
2023-03-14

我用Flink来读写来自不同Kafka主题的数据。具体来说,我使用的是FlinkKafkaConsumer和FlinkkafKapProducer。

我想知道是否有可能根据我程序中的逻辑或记录本身的内容,将我正在阅读和写作的Kafka主题更改为“即时”。

例如,如果读取带有新字段的记录,我希望创建一个新主题,并开始将带有该字段的记录转移到新主题。

谢谢。

共有2个答案

谭安翔
2023-03-14

更多地考虑需求,

第一步是——你将从一个主题开始(为了简单起见),在运行时根据提供的数据生成更多主题,并将相应的消息定向到这些主题。这是完全可能的,不会是一个复杂的代码。使用ZkClient API检查主题名称是否存在,如果不存在,则使用新名称创建一个模型主题,并开始通过与此新主题相关的新生产者将消息推送到模型主题中。无需重新启动作业即可生成指向特定主题的消息。

你最初的消费者成为制作人(新主题)消费者(旧主题)

第二步是-您要使用新主题的消息。一种方法可能是完全创造一份新工作。您可以通过最初创建线程池并向它们提供参数来实现这一点。

同样要更加小心,如果出现循环错误,更多的自动化可能会导致集群过载。考虑一下,如果输入数据不受控制或只是不干净,一段时间后可能会创建太多主题。正如上面评论中提到的,可能会有更好的体系结构方法。

万俟鸿波
2023-03-14

如果您的主题遵循通用命名模式,例如"topic-n*",您的Flink Kafka消费者可以在添加到Kafka时自动读取"topic-n1"、"topic-n2"、...等等。

Flink 1.5(FlinkKafkaConsumer09)增加了对动态分区发现的支持

接受subscriptionPattern:link的使用者构造函数。

 类似资料:
  • 我需要从配置文件动态创建kafka流,其中包含每个流的源主题名称和配置。应用程序需要有几十个Kafka流和流将是不同的每个环境(例如阶段,prod)。它可能做到这一点与库? 我们可以通过轻松做到这一点: 我们需要实现spring接口,这样所有流都将自动启动和关闭。 是否可以使用做同样的事情?正如我所看到的,我们需要在代码中创建每个Kafka流,我看不到如何使用创建Kafka流列表的可能性。 但是如

  • 我正在使用JOOQ(JOOQ中的新手)在spring boot项目中使用Rest API在运行时创建数据库。在其中一种情况下,我需要创建一个具有复合主键的表,该主键可以是多个列的组合。我使用下面的代码创建约束- 我已经有

  • 今天,我想讨论一个关于Flink的概念性话题,而不是一个技术性话题。 在我们的例子中,我们确实有两个Kafka主题A和B,需要连接。连接应该始终包括主题A中的所有元素,以及主题B中的所有新元素。实现这一点有两种可能:始终创建一个新的使用者并从一开始就开始使用主题A,或者在使用后将主题A中的所有元素保持在一个状态内。现在,技术方法是通过连接两个数据流,这很快就向我们展示了它在这个用例中的局限性,因为

  • 我们正在使用多个Kafka主题,但希望优先考虑其中一些主题(~服务质量)。 根据我在网上找到的,共识是不要限制运算符,而是限制源,更具体地说是反序列化器[1]。 我们如何访问源中有关流媒体环境状态的信息(即主题落后于当前偏移量的程度)。 目前,我们计划将我们的整个设置转换为CoFlatMaps[2],并拥有一个控制流,该控制流为所有主题发出当前偏移滞后-低优先级流运算符,然后根据高优先级流的滞后H

  • 是否可以使用apache POI创建和更改Excel表? API中有一个表示Excel表的类(http://poi.apache.org/apidocs/org/apache/poi/xssf/usermodel/XSSFTable.html),有两个方法createTable和gettable(http://poi.apache.org/apidocs/org/apache/poi/xssf/u