我在一个Kafka主题“原始数据”中获取CSV,目标是通过在另一个主题“数据”中发送具有正确时间戳(每行不同)的每行来转换它们。
我想通过直接设置时间戳来删除这个“内部”主题的使用,但我找不到一个方法(时间戳提取器只在消耗时间使用)。
我在文档中偶然发现了这一行:
请注意,通过调用#forward()时显式地为输出记录分配时间戳,可以在处理器API中更改description默认行为。
2018-01-01,hello 2018-01-02,world
(这是一条消息,而不是两条消息)
我想在另一个主题中获得两条消息,其中Kafka记录的时间戳设置为他们的事件时间(2018-01-01和2018-01-02),而不需要中间主题。
为输出设置时间戳需要Kafka Streams2.0并且仅在处理器API中支持。如果使用DSL,则可以使用transform()
来使用这些API。
正如您指出的,您将使用context.forward()
。这一呼吁将是:
stream.transform(new TransformerSupplier() {
public Transformer get() {
return new Transformer() {
// omit other methods for brevity
// you need to get the `context` from `init()`
public KeyValue transform(K key, V value) {
// some business logic
// you can call #forward() as often as you want
context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));
return null; // only return data via context#forward()
}
}
}
});
因此,我将替换为,就像在这个文档示例中一样(具有更高的maxOutOfOrderness延迟),以便处理乱序事件,但我仍然无法获得任何输出。这是为什么?
我正在窗户上运行Kafka。我正在尝试设置 SASL/可控性代码身份验证。这是我在设置 SASL/超线程管理时所遵循的链接。在运行 我收到错误 未知动态配置:设置('SCRAM-SHA-256) 有人知道可能是什么问题吗?
, 我想知道如何控制聚合时间窗口,这样它就不会为每个传入的消息吐出一条消息,而是等待并聚合其中的一些消息。想象一下流应用程序使用这些消息: null 我的代码很简单 目前,它有时会批量聚合,但不确定如何调整它:
我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问
问题内容: 我有一个合并语句,该语句应始终更新或插入单个记录。我想记住该语句在变量中的ID。看起来像这样: 现在这不起作用,因为您不能以这种方式在输出子句中设置@int。我知道我可以创建一个模板并在输出中使用 INTO @temptable 。但是由于我 知道 它始终是一条记录,因此我希望将ID包含在INT变量中。这有可能吗?还是我被迫使用表变量。我希望我只是缺少一些语法。 问题答案: 不,您必须
我试图用SSL (TLS)在节点间以及节点和客户端之间配置Kafka节点,但是遇到了配置问题。Kafka版本是2.3.0。我的相关设置是: 仅供参考,为了简单起见,我从实例化Kafka容器的docker-compose文件中复制了设置。env vars将1:1映射到server.properties.中的属性。在容器启动期间,这些设置将应用于server.properties文件。 当我开始使用此