当前位置: 首页 > 知识库问答 >
问题:

使用kafka streams创建包含多个聚合的新KStream

西门建安
2023-03-14

我正在发送JSON消息,其中包含有关web服务请求和对Kafka主题的响应的详细信息。我想在每条消息到达Kafka时使用Kafka流进行处理,并将结果作为持续更新的摘要(JSON消息)发送到客户端连接的websocket。

然后,客户端将解析JSON,并在网页上显示各种计数/摘要。

示例输入消息如下所示

{
  "reqrespid":"048df165-71c2-429c-9466-365ad057eacd",
  "reqDate":"30-Aug-2017",
  "dId":"B198693",
  "resp_UID":"N",
  "resp_errorcode":"T0001",
  "resp_errormsg":"Unable to retrieve id details. DB Procedure error",
  "timeTaken":11,
  "timeTakenStr":"[0 minutes], [0 seconds], [11 milli-seconds]",
  "invocation_result":"T"
}

{
  "reqrespid":"f449af2d-1f8e-46bd-bfda-1fe0feea7140",
  "reqDate":"30-Aug-2017",
  "dId":"G335887",
  "resp_UID":"Y",
  "resp_errorcode":"N/A",
  "resp_errormsg":"N/A",
  "timeTaken":23,
  "timeTakenStr":"[0 minutes], [0 seconds], [23 milli-seconds]",
  "invocation_result":"S"
}

{
  "reqrespid":"e71b802d-e78b-4dcd-b100-fb5f542ea2e2",
  "reqDate":"30-Aug-2017",
  "dId":"X205014",
  "resp_UID":"Y",
  "resp_errorcode":"N/A",
  "resp_errormsg":"N/A",
  "timeTaken":18,
  "timeTakenStr":"[0 minutes], [0 seconds], [18 milli-seconds]",
  "invocation_result":"S"
}

随着信息流进入Kafka,我希望能够实时计算

**

  • 请求总数,即所有请求的计数
  • invocation_result等于'S'的请求总数
  • invocation_result不等于'S'的请求总数
  • invocation_result等于S和UID等于Y的请求总数
  • invocation_result等于S和UID等于Y的请求总数
  • 最小花费时间,即min(timeTake)
  • 花费的最大时间,即max(time)
  • 平均花费时间,即平均花费时间

**

并将它们写入一个KStream,其中new key设置为reqdate值,new value设置为一个JSON消息,该消息包含如下所示的计算值,使用前面显示的3条消息

{
  "total_cnt":3, "num_succ":2, "num_fail":1, "num_succ_data":2, 
  "num_succ_nodata":0, "num_fail_biz":0, "num_fail_tech":1,
  "min_timeTaken":11, "max_timeTaken":23, "avg_timeTaken":17.3
}

我是Kafka溪的新手。我如何在一个或一系列不同的步骤中通过不同的列进行多次计数?Apache flink或方解石更合适吗?因为我对KTable的理解表明,您只能有一个键,例如2017年8月30日,然后是一个单列值,例如计数,比如3。我需要一个带有一个键和多个计数值的结果表结构。

非常感谢所有的帮助。

共有1个答案

燕建中
2023-03-14

你可以做一个复杂的聚合步骤,一次计算所有这些。我只是概述一下这个想法:

class AggResult {
    long total_cnt = 0;
    long num_succ = 0;
    // and many more
}

stream.groupBy(...).aggregate(
    new Initializer<AggResult>() {
        public AggResult apply() {
            return new AggResult();
        }
    },
    new Aggregator<KeyType, JSON, AggResult> {
        AggResult apply(KeyType key, JSON value, AggResult aggregate) {
            ++aggregate.total_cnt;
            if (value.get("success").equals("true")) {
                ++aggregate.num_succ;
            }
            // add more conditions to get all the other aggregate results
            return aggregate;
        }
    },
    // other parameters omitted for brevity
)
.to("result-topic");
 类似资料:
  • 我需要创建一个fiRecovery文档,它也有一个集合,理想情况下是在单个写入操作中。 我在文档中没有看到类似的内容,因此,如果失败了,那么关于获取创建的文档id然后将多个文档添加到集合中有什么提示吗? 编辑:我正在用typescript/js开发

  • 我有一个充满json对象的表。只有一列“数据”。 我想将这个表转换成一个包含json对象中所有键的多列的表。 例如, 现在我的桌子是这样的: 但我希望它是这样的: 我不想以这种方式查询它,我想以我上面展示的格式完全创建一个新表。 我可以试试跑步: 但由于我的json对象有数百列,这可能效率低下。有没有更有效的方法?感谢您的帮助或建议。

  • 问题内容: 我希望有人可以在这里帮助我的语法。我有两个桌子, 该表具有与中的列匹配的列。 但是,有时可以在不同的服务日期(第列)中显示两次。 我正在尝试将表中的列更新为其中in table = in table中的最大值 它仍然失败: 消息157,级别15,状态1,行1聚合可能不会出现在UPDATE语句的设置列表中。 我尝试做其他一些事情,例如声明ID,但无法正常工作。 问题答案: 使用相关子查询

  • 本文向大家介绍dart 创建一个新的集合,包括了dart 创建一个新的集合的使用技巧和注意事项,需要的朋友参考一下 示例 可以通过构造函数创建集合:            

  • 我有索引,其中每个文档都有这样的结构: 我需要计算每个演员对应的电影数量(演员可以在actor_1_name、actor_2_name或actor_3_name字段中) 这3个字段的映射是: 有没有一种方法,我可以聚合的结果,可以结合所有3个演员领域的条款,并给出一个单一的聚合。 目前,我正在为每个actor字段创建单独的聚合,并通过我的JAVA代码将这些不同的聚合合并成一个。 通过创建不同的聚合

  • 我有以下代码: 在Bouml中,我从这段代码中生成了类图: 图表 我认为 A 和 F 之间的 (https://softwareengineering.stackexchange.com/questions/255973/c-association-aggregation-and-composition) 关系必须产生聚合关系,但它产生了关联关系。如何建立聚合关系?