当前位置: 首页 > 知识库问答 >
问题:

Kafka流聚合删除和/或密钥更改

孙洋
2023-03-14
        ...
        KStream<String, Employee> stream = kStreamBuilder.stream("EMPLOYEE"); // Stream from raw EMPLOYEE
        stream.map((k, v) -> new KeyValue<>(k, transformEmployee(v))) // <-- some stateless enrichment of the employee
                .groupBy((k, emp) -> emp.getDepartmentId(), jsonSerialisedWith(Employee.class))

                // dummy reduce to a get a ktable for agg:
                .reduce((aggValue, newEmp) -> newEmp) 
                .groupBy((k, emp2) -> new KeyValue<>(emp2.getDepartmentId(), emp2), jsonSerialisedWith(Employee.class))

                .aggregate(Department::new, this::addEmployee, this::removeEmployee,
                           jsonValueMaterializedAs("DEPARTMENT-AGG", Department.class))
                .toStream()
                .to("DEPARTMENT", jsonProducedWith(Department.class));
        ...

    private Department addEmployee(String deptId, Employee employee, Department department) {
        department.addEmployee(employee);
        if (department.getId() == null) {
            department.setId(employee.getDepartmentId());
            department.setName(employee.getDepartmentName());
        }
        return department;
    }

这适用于添加或更新。但是,随着时间的推移,员工可能会被删除或重新分配到另一个部门。我认为删除应该是发送到EMPLOYEE主题的tombstone记录(k:empid,v:null)。但是,我不再拥有departmentId,我必须进行空检查(并为departmentId返回空),因此删除员工时不会发生removeEmployee。DepartmentID的更改也有类似问题。

那么,Kafka的方法是什么呢?

共有1个答案

朱兴安
2023-03-14

我认为使用您的代码就足够了,但是稍微改变一下删除该员工的语义。

您应该添加某种mock部门(将在用户从部门中删除时使用)。

如果删除了employee,则将department设置为null,则应将其分配给mockdepartment。

 类似资料:
  • 我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我有两个对象 当我与以下对象合并时: 我得到了完整的目标2,但它没有error_to_update_profile钥匙。 如何维护第一个对象的键,而第二个对象没有?

  • 我有一个KStream,其中包含从主题到1的数据,如下所示: 和KTable,构造如下: 稍后,主题To2中出现以下消息: 现在,我希望我的KTable能够反映这些变化,并且看起来像这样: 但看起来是这样的: 我想我缩小了范围:显然聚合的只在第一次调用--之后聚合总是接收作为最后一个参数,例如。 其中,在第一次调用(通过初始值设定项创建)时为,但在第二次调用时为。 有什么想法吗? 编辑2 编辑3

  • 我有一个密钥库,名为keystore。jks和意外添加了两个键。我必须使用键2制作一个签名的apk。因为我已经使用key2上传了一个apk,我想从keystore中删除Key1。jks。 签名的apk是使用Key1生成的,但我需要使用key2生成。 请帮帮我。提前谢谢。

  • 是否可以将列作为分区和聚类键?例如, 创建表citylist2(城市varchar,loc list,pop int,zip varchar,state varchar,primary key(city,city,zip)),使用集群顺序BY(城市ASC,zip DESC);