有什么方法可以让我的Kafka Stream应用程序自动从新创建的主题中读取?
即使主题是在流应用程序已经运行时创建的?
类似于在主题名称中使用通配符,如下所示:
KStream<String, String> rawText = builder.stream("topic-input-*");
现在,我有多个客户端将数据(都使用相同的模式)发送到它们自己的主题,我的流应用程序从这些主题中读取数据。然后,我的应用程序进行一些转换,并将结果写入单个主题。
虽然所有的客户都可以写同一个主题,但一个没有偏见的客户也可以代表其他人写。所以我为每个客户创建了单独的主题。问题是,每当有新客户机出现时,我都会创建新主题并用脚本为其设置ACL,但这还不够。我还必须停止我的流应用程序,编辑代码,添加新主题,编译,打包,放到服务器上,然后再次运行!
Kafka Streams支持模式订阅:
builder.stream(Pattern.compile("topic-input-*"));
(我希望语法是正确的;从我的头顶上看不太确定……但重点是,您可以使用采用模式的方法的重载,而不是传入字符串。)
我想从话题的一开始就开始消费。我已经将属性“AUTO\u OFFSET\u RESET\u CONFIG”设置为最早,但不知何故它仍然没有从一开始就读取。 如果我错过了什么,有什么想法吗?我每次都在创造一个新的消费群体。
下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc
我的spring boot项目有一个演示Kafka Streams API的应用程序。我可以使用以下命令使用主题中的所有消息 Kafka Streams API中使用KStream或ktable使用消息的类似命令是什么?我试过了 两者都不起作用。我确实创建了一个测试用例,用而不是流来使用,但它不起作用。代码上传到Github以供参考。任何帮助都会很好。
我的目标是使用Flink KafkaSource阅读来自Kafka主题的所有消息。我尝试用批处理和流模式执行。问题如下:当我将env.setParallelism设置为高于2时,我必须使用包含bug的接收器。于是,我设置了例如:< code > streamexecutionenvironment . setparallelism(1); 我想使用的Kafka主题包含3个分区。这是我的代码片段:
嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题 然后我得到以下错误 线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行; 然后,我对代码进行了如下编辑,以从Kafka中读取并写
我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中获取消息。这是我的pom。xml 这只是我使用的与骆驼Kafka相关的依赖项。下面是骆驼Kafka消费者代码。 我正在使用文档中指定的KafkaURIhttps://camel.apache.org/components/latest/kafka-component.ht