当前位置: 首页 > 知识库问答 >
问题:

Kafka流 API GroupBy 行为

戚逸清
2023-03-14

我是kafka流的新手,我正在尝试使用groupBy函数将一些流数据聚合到KTable中。问题如下:

生成的消息是json msg,格式如下:

{ "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid":"3.17.2493", 
             "csn":"64913009"},
  "op_type":"I",
  "after":  { "CODE":"AAAA41",
              "STATUS":"COMPLETED",
              "ID":24},
  "op_ts":"2019-12-24 13:16:40.316941",
  "table":"S_ORDER"} 

我想隔离json字段“after”,然后用“key”=“ID”创建一个KTable,并对整个json赋值“after”。

首先,我创建了一个KStream来隔离“after”JSON,它工作得很好。

KStream代码块:(不要注意if语句,因为“before”和“after”的格式相同。)

KStream<String, String> s_order_list = s_order
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                });

按预期,输出如下:

...
null {"CODE":"AAAA48","STATUS":"SUBMITTED","ID":6}
null {"CODE":"AAAA16","STATUS":"COMPLETED","ID":1}
null {"CODE":"AAAA3","STATUS":"SUBMITTED","ID":25}
null {"CODE":"AAAA29","STATUS":"SUBMITTED","ID":23}
...

之后,我实现了一个KTable to groupBy json的“ID”。

K表代码块:

  KTable<String, String> s_table = s_order_list
                .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return json.getString("ID");
                });

并且有一个错误,我想创建KTable

Required type: KTable<String,String>
Provided:KGroupedStream<Object,String>
no instance(s) of type variable(s) KR exist so that KGroupedStream<KR, String> conforms to KTable<String, String>

总之,问题是KGroupedStreams到底是什么,以及如何正确实现KTable?


共有1个答案

卢骏俊
2023-03-14

在groupBy处理器之后,您可以使用有状态处理器,如聚合或reduce(处理器返回KTable)。你可以这样做:

KGroupedStream<String, String> s_table = s_order_list
                     .groupBy((key, value) ->
                         new JSONObject(value).getString("ID"),
                         Grouped.with(
                                 Serdes.String(),
                                 Serdes.String())
                     );

KTable<String, StringAggregate> aggregateStrings = s_table.aggregate(
                     (StringAggregate::new),
                     (key, value, aggregate) -> aggregate.addElement(value));

StringAggregate类似于:

public class StringAggregate {

    private static List<String> elements = new ArrayList<>();

    public StringAggregate addElement(String element){
        elements.add(element);
        return this;
    }
    //other methods
}
 类似资料:
  • 我知道在你的流中的任何时间点都可能发生再平衡。当它发生时,由于没有提交给定偏移量的最新偏移量,可能会发生事件的重新处理。 Kafka流是否允许在重新平衡发生之前完成任何飞行中处理?我的意思是,你的应用程序正在消耗一个记录(在你的过程方法内部),发生一个再平衡事件。该处理是否立即中止或允许处理方法完成? 一个具体的例子是 最后一次计算是否会在状态存储中结束并转发到接收器主题?因此,这意味着当重新平衡

  • 我需要获取存储中的行数,存储在低级处理器API中维护。我看到,方法“近似数字条目()”可以在此存储中提供键值映射的近似计数。你能澄清一下准确度的%吗,这意味着如果商店里有100行,我们会得到95行作为近似计数吗?或者它有时会低于50行吗?只是想了解影响计数准确性的因素。 注意:假设流应用程序使用单个主题并在单个实例上运行。存储是通过低级处理器API访问的,不确定默认情况下是否应用了任何缓存。提交频

  • 我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。 使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等? 这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在

  • 我有一个关于kafka流应用程序中的控制流的基本问题。如果有两个源主题 我做了一个非常初步的测试,当记录被消费时,我偷看了一下,然后用一个简单的速溶软件打印了它们被处理的瞬间。现在 这些是主题中记录的开始和结束时间戳 主题B记录在主题A之前提取。Sysout显示主题B中的所有记录。有人能帮助理解这一点吗?我希望在编写具有多个输入源的流式应用程序时使用这种理解。 提前感谢

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,