给定以下代码:
KStream<String, Custom> stream =
builder.stream(Serdes.String(), customSerde, "test_in");
stream
.groupByKey(Serdes.String(), customSerde)
.reduce(new CustomReducer(), "reduction_state")
.print(Serdes.String(), customSerde);
在reduce语句之后我需要什么才能看到reduce的结果?如果一个值被推到输入,我不希望看到任何东西。如果推送具有相同键的第二个值,我希望还原器应用(它确实应用了),并且我还希望还原的结果继续到处理管道中的下一个步骤。正如所描述的,我没有在管道的后续步骤中看到任何东西,我不明白为什么。
从Kafka0.10.1.0
起,所有聚合运算符都使用内部去重复缓存来减少结果KTable changelog流的负载。例如,如果您对两个具有相同键的记录进行计数和处理,则完整的changelog流将为
。
使用新的缓存功能,缓存将接收
并存储它,但不会立即向下游发送它。计算
时,它将替换缓存的第一个条目。根据缓存大小、不同键数、吞吐量和提交间隔,缓存向下游发送条目。这种情况要么发生在单个键项的缓存驱逐时,要么发生在缓存的完全刷新时(向下游发送所有项)。因此,KTable变更日志可能只显示
(因为
已删除重复)。
您可以通过流配置参数streamconfig.cache_max_bytes_buffering_config
控制缓存的大小。如果将该值设置为零,则完全禁用缓存,KTable更改日志将包含所有更新(有效地提供pre0.10.1.0
行为)。
Confluent文档中有一节更详细地解释了缓存:
问题内容: 我有以下代码: 以及其他各种方法,例如@ Before,@ After,@ Test或@AfterClass方法。 测试在启动时不会像看起来的那样失败。有谁可以帮助我吗? 我有JUnit 4.5 该方法无法立即调用注释为@before的setUp()。类def是: 问题答案: 不要扩展TestCase并同时使用注释! 如果需要使用批注创建测试套件,请使用RunWith批注,例如: (按
问题内容: 为什么?将迭代器项指针移到第一位置的最佳方法是什么? 问题答案: 为什么? 因为如果强制迭代器具有重置方法,则 每个 迭代器都必须具有重置方法。这给每个迭代器编写者额外的工作。再加上一些迭代器确实很难重置(或者真的很昂贵),并且您不希望用户对其调用reset。文件或流上的迭代器就是很好的例子。 将迭代器项指针移到第一位置的最佳方法是什么? 创建一个新的迭代器。它很少比重置昂贵。
问题内容: 我有一个可加载的内核模块,其初始化如下 我还启用了正在使用的内核版本上启用的动态调试-ie 。 在模块的Makefile中,我在其中添加了一行,即文件名。 现在,我在执行此模块的insmod后检查了一下,在其中发现了以下几行 即使做了所有这些,令我失望的是,在dmesg的输出中找不到上述两个pr_debug语句。那我想念什么或做错什么呢? 问题答案: 假设是模块源文件,请将以下内容添加
我的中的片段: 当我运行时,我看不到这两个命令的任何输出,即使它们没有被缓存。留档说默认情况下是冗长的。为什么我看不到命令的输出?我以前见过它们。 构建时的输出: 建筑完工后我看到的输出: 是从基于Debian 9的node: 12.18.0创建的。 Docker版本19.03.13,内部版本4484c46d9d。
问题内容: 我知道静态方法在类级别。因此,我知道我不需要创建实例来调用静态方法。但我也知道我可以将静态方法(如LIKE)称为实例方法。这是我感到困惑的地方,因为我期望从null对象调用静态方法(就像在调用实例方法中一样)。我真的很感谢一些解释,为什么我错了一个期望。 这是示例代码: 问题答案: 通过实例调用静态方法不需要实例存在。只要编译器能够确定变量的类型,它就可以在评估表达式并丢弃结果后静态进
问题内容: 我有一个发出异步请求的函数。我如何从中返回响应/结果? 我尝试从回调中返回值,以及将结果分配给函数内部的局部变量并返回该局部变量,但这些方法均未真正返回响应(它们都返回或变量的初始值为任意值) 。 使用jQuery函数的示例: 使用node.js的示例: 使用承诺块的示例: 问题答案: →有关使用不同示例的异步行为的更一般说明,请参见 在函数内部修改变量后 , 为什么变量未更改?-异步