您好,我需要为kafka streams中创建的内部/中间主题指定一个自定义名称。 但是,只有“*重新分区”主题会被重命名,而“聚合”主题不会被重命名。 kafka streams 2.5.0似乎不支持在reduce上命名? 我配置了它,但它不采取它...分组的采取命名 我需要为了能够很好地演示Azkarra流时浏览状态存储查询。 更新:您好,kafka社区,我知道,我使用的命名参数用于配置用于在
我不熟悉反射Java。MyCustomExcpse类实现了DeseriazationExceptionHandler接口和在流配置,我知道类可以提供。但是,有没有办法提供(在配置方法中)与类?你能提供一个示例代码吗? .
我正在尝试使用Java运行我的Kafka Streams应用程序,并以K,V对的形式将合流凭证作为环境变量传递。 但是我面临着一个错误的问题- 下面是我如何设置Kafka属性的- 下面是我如何传递环境变量的- 但是如果我直接在properties对象中硬编码Creds字符串,比如-
例如,-从索引0迭代到索引10。 -但从10到0不起作用,如何使用流API实现?
正在尝试在scala 2.10.4中使用0.10.0 flink版本的流式api。尝试编译此第一个版本时: 我遇到编译时错误: 在数据流的反编译版本中。类,我已将其包括在项目中。有接受此类类型的函数(最后一个): 这里会出什么问题?如果你能提供一些见解,我将不胜感激。提前谢谢你。
我有课 给出一个Person类列表,我根据该类的不同属性进行聚合。对于(如)- 现在我需要得到一个结果,这样我就应该根据国家和城市的组合得到总的“totalcountrytoCityCount”,并且根据国家、城市和宠物的组合得到总的“petCount”。我可以使用groupingBy和summingint分别获得它们 它给出了结果 但我想要的实际结果是:- 令人惊讶地删除了计数
我正在尝试使用 Java 8 中的 Streams API 从集合中检索 n 个唯一的随机元素以进行进一步处理,但是,没有太多运气。 更准确地说,我想要这样的东西: 我想尽可能高效地做这件事。 这可以做到吗? 编辑:我的第二次尝试——虽然不是我的目标: 编辑:第三次尝试(受Holger启发),如果coll.size()很大而n很小的话会去掉很多shuffle的开销:
我正在做我的项目,然后我需要Json数据中的reach an元素看起来像: 因此,我可以用以下方法访问name元素: 那么,如何在java 8中使用stream()到达学校呢? 还有,我调用的< code>getName();
我正在使用SpringBoot和Kafka流,尽管我没有使用SpringCloud流。 我确实使用了并将设置为: 但当我启动应用程序时,我点击了以下NPE: 更新: 我使用以下依赖项: Spring靴2.5.5 千分尺1.7.4 apache kafka streams 3.0.0 感谢您的帮助!谢谢
我正在使用火花流和Kafka,我得到了这个错误。 线程“streaming-start”中的异常java.lang.NosuchMethoderror:scala.predef$.arrowassoc(ljava/lang/object;)ljava/lang/object;在org.apache.spark.streaming.kafka010.directkafkainputdstream$$
你能就解决下列问题的方法给我一个建议吗。我有两个主题,一个是静态内容,另一个是数据流。任务是连接数据,这在正常情况下很容易。我将静态内容理解为GlobalKTable,动态内容理解为KStream,然后简单地将它们连接起来。问题在于查找数据存在于同一主题的多个版本中。“版本”由“validFrom”字段标识。因此,流的数据需要根据其时间戳与相应版本的查找数据连接。有没有办法过滤GlobalKTab
我们有一些消息需要保持序列。我们已经决定将所有消息从一个特定的源发送到一个分区,这样就可以维护消息序列(多个源可以产生到同一个分区,但一个源不能产生到多个分区),并且我们将能够用它们的密钥标识每个源。 现在,我们需要使用这些消息并进行一些处理。我们对已消费的消息执行多个独立操作(例如,将它们存储在数据库中,转发它们等)。现在,我一直在考虑是使用Kafka Streams API还是消费者API来实
我已经编写了一个streams应用程序,用于在由5个代理和10个分区组成的集群上与主题对话。我在这里尝试了多种组合,比如10个应用程序实例(在10台不同的机器上),每个实例有1个流线程,5个实例每个实例有2个线程。但由于某种原因,当我签入kafka manager时,分区和流线程之间的1:1映射没有发生。一些线程正在拾取2个分区,而一些线程没有拾取任何分区。你能帮我做同样的事吗??所有线程都是同一
我有以下情况: 表A和表B使用FK连接 如何丢弃? 一个选项是执行,但在查询的情况下,这仍然是一个问题。 我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但时间戳的唯一性无法保证。 最终目标是能够识别最新的聚合,以便我们可以在查询时过滤出中间结果(在Athena/Presto或某些RDBMS中)。
当通过kafka steams应用程序推送批量数据时,我看到它多次记录以下消息。。。