我用Flink来读写来自不同Kafka主题的数据。具体来说,我使用的是FlinkKafkaConsumer和FlinkkafKapProducer。
我想知道是否有可能根据我程序中的逻辑或记录本身的内容,将我正在阅读和写作的Kafka主题更改为“即时”。
例如,如果读取带有新字段的记录,我希望创建一个新主题,并开始将带有该字段的记录转移到新主题。
谢谢。
更多地考虑需求,
第一步是——你将从一个主题开始(为了简单起见),在运行时根据提供的数据生成更多主题,并将相应的消息定向到这些主题。这是完全可能的,不会是一个复杂的代码。使用ZkClient API检查主题名称是否存在,如果不存在,则使用新名称创建一个模型主题,并开始通过与此新主题相关的新生产者将消息推送到模型主题中。无需重新启动作业即可生成指向特定主题的消息。
你最初的消费者成为制作人(新主题)消费者(旧主题)
第二步是-您要使用新主题的消息。一种方法可能是完全创造一份新工作。您可以通过最初创建线程池并向它们提供参数来实现这一点。
同样要更加小心,如果出现循环错误,更多的自动化可能会导致集群过载。考虑一下,如果输入数据不受控制或只是不干净,一段时间后可能会创建太多主题。正如上面评论中提到的,可能会有更好的体系结构方法。
如果您的主题遵循通用命名模式,例如"topic-n*",您的Flink Kafka消费者可以在添加到Kafka时自动读取"topic-n1"、"topic-n2"、...等等。
Flink 1.5(FlinkKafkaConsumer09)增加了对动态分区发现的支持
接受subscriptionPattern:link的使用者构造函数。
我需要从配置文件动态创建kafka流,其中包含每个流的源主题名称和配置。应用程序需要有几十个Kafka流和流将是不同的每个环境(例如阶段,prod)。它可能做到这一点与库? 我们可以通过轻松做到这一点: 我们需要实现spring接口,这样所有流都将自动启动和关闭。 是否可以使用做同样的事情?正如我所看到的,我们需要在代码中创建每个Kafka流,我看不到如何使用创建Kafka流列表的可能性。 但是如
我正在使用JOOQ(JOOQ中的新手)在spring boot项目中使用Rest API在运行时创建数据库。在其中一种情况下,我需要创建一个具有复合主键的表,该主键可以是多个列的组合。我使用下面的代码创建约束- 我已经有
今天,我想讨论一个关于Flink的概念性话题,而不是一个技术性话题。 在我们的例子中,我们确实有两个Kafka主题A和B,需要连接。连接应该始终包括主题A中的所有元素,以及主题B中的所有新元素。实现这一点有两种可能:始终创建一个新的使用者并从一开始就开始使用主题A,或者在使用后将主题A中的所有元素保持在一个状态内。现在,技术方法是通过连接两个数据流,这很快就向我们展示了它在这个用例中的局限性,因为
我们正在使用多个Kafka主题,但希望优先考虑其中一些主题(~服务质量)。 根据我在网上找到的,共识是不要限制运算符,而是限制源,更具体地说是反序列化器[1]。 我们如何访问源中有关流媒体环境状态的信息(即主题落后于当前偏移量的程度)。 目前,我们计划将我们的整个设置转换为CoFlatMaps[2],并拥有一个控制流,该控制流为所有主题发出当前偏移滞后-低优先级流运算符,然后根据高优先级流的滞后H
我正在编程一个客户端工作与Kafka0.9。我想知道如何制造话题。这个答案:如何通过Java在Kafka中创建一个主题,与我所问的类似。除此之外,该解决方案仅适用于Kafka 0.8.2,这与Kafka 0.9的API有很大不同。