当前位置: 首页 > 知识库问答 >
问题:

复杂对象KStream GlobalKTable左联接

上官思博
2023-03-14

我对Kafka的溪流很陌生。我想执行以下KStream-GlobalKTable纯基于DSL的左联接操作,而不使用map操作。

{
    "b_obj": {
        "b_value": "xyz",
        "c_list": [
            {
                "d_obj": {
                    "d_id1": "value1",
                    "d_id2": "value2",
                    "d_value": "some value"
                },
                "c_value": "jkl"
            },
            {
                "d_obj": {
                    "d_id1": "value3",
                    "d_id2": "value4",
                    "d_value": "some value 2"
                },
                "c_value": "pqr"
            }
        ]
    },
    "a_value": "abcd"
}

和另一个输入主题e.topic,它是 ,其中value:

{
    "e_id1": "value1",
    "e_id2": "value2",
    "e_value": "some value"
}

我要执行左联接操作.topic是一个流,主数据.topic是一个全局表,以实现结果值为

{
    "b_obj": {
        "b_value": "xyz",
        "c_list": [
            {
                "d_obj": {
                    "d_id1": "value1",
                    "d_id2": "value2",
                    "d_value": "some value"
                },
                "e_obj": {
                    "e_id1": "value1",
                    "e_id2": "value2",
                    "e_value": "some value a"
                },
                "c_value": "jkl"
            },
            {
                "d_obj": {
                    "d_id1": "value3",
                    "d_id2": "value4",
                    "d_value": "some value 2"
                },
                "e_obj": {
                    "e_id1": "value3",
                    "e_id2": "value4",
                    "e_value": "some value b"
                },
                "c_value": "pqr"
            }
        ]
    },
    "a_value": "abcd"
}

连接条件为a.b.c[i].d.d_id1==e.e_id1&&a.b.c[i].d.d_id2==e.e_id2

代码

public class ComplexBeanStream {

    public static void main(String[] args) {

        Serde<A> aSerde = new JsonSerde<>(A.class);

        Serde<E> eSerde = new JsonSerde<>(E.class);

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "complex-bean-app");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "complex-bean-client");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        final GlobalKTable<String, E> eGlobalTable =
                builder.globalTable(
                        "e.topic",
                        Materialized.<String, E, KeyValueStore<Bytes, byte[]>>
                                as("E-STORE")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(eSerde)
                );

        final KStream<String, A> aStream =
                builder.stream(
                        "a.topic",
                        Consumed.with(Serdes.String(), aSerde));

        // perform left-join here

        Topology topology = builder.build();

        System.out.println("\n\nComplexBeanStream Topology: \n" + topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);

        streams.cleanUp();

        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}

class A {

    private B b_obj;

    private String a_value;
}

class B {

    private List<C> c_list;

    private String b_value;
}

class C {

    private D d_obj;

    private E e_obj;

    private String c_value;
}

class D {

    private String d_id1;

    private String d_id2;

    private String d_value;
}

class E {

    private String e_id1;

    private String e_id2;

    private String e_value;
}

共有1个答案

焦学海
2023-03-14

在您的情况下,如果连接查找是基于值而不是键,则不可能在没有映射的情况下进行连接。

Kafka流只能基于两边的相同键来连接。这意味着,您应该为连接的两侧映射并选择新的键(重新键控),以实现a.b.c[i].d.d_id1==e.e_id1&&a.b.c[i].d.d_id2==e.e_id2

在这种情况下,一方可以使用[a.b.c[i].d.d_id1,a.b.c[i].d.d_id2]作为其密钥,另一方则使用[e.e_id1,e.e_id2]。如果有匹配项,则可以将这些值连接到一个新对象。您可能应该在重新键控之前对c_list进行平面映射。

阅读Kafaka的加入也很有帮助。

 类似资料:
  • 我试图从一个相当复杂的Java对象生成一个CSV文件。该对象是一个会话,具有一些属性,字符串和消息的列表,这些字符串和消息又具有一些属性,还有一个注释的列表,这些注释具有一些属性。 session类如下所示; 消息类如下所示; 事实上,开始认为(单一的)CSV可能不是解决这个问题的最佳方法。

  • 我正在尝试用多个过滤器获取帖子,我的数据库如下所示, 我尝试了上面的SQL,它可以工作,但问题是,如果我嵌套了两个以上的或条件和不同数量的过滤器,我会得到错误的结果。

  • 问题内容: 这个问题已经在这里有了答案 : SQL中左右联结与左右联结之间的区别[重复] (4个答案) 6年前关闭。 我看到过称为LEFT OUTER JOIN或RIGHT OUTER JOIN的联接。在某些地方,我见过LEFT JOIN或RIGHT JOIN。我对此感到困惑。 我两天前发布了一个问题,但我无法理解解决方案提供的链接。 这些连接类型是否相同,或者两者之间有区别? 问题答案: 两者之

  • 问题内容: 我有以下情况: 在我的Query.xml中,我以这种方式编写了插入内容: 阅读mybatis结果地图指南后,我尝试在mybatis-config.xml文件中添加以下行: 但我一直收到以下错误: 谁能告诉我该如何设置? 问题答案: 中的属性需要引用结果映射的名称,而不是Java类型: 但是,如果作为单独的表存储在数据库中,则不支持嵌套插入。您将需要在Java中调用两个插入。Result

  • 问题内容: 我有一个应用程序,该应用程序使用Jackson将我的复杂对象编组为JSON,从而在DynamoDB中存储一些数据。 例如,我要编组的对象可能如下所示: SomeObject可能看起来像这样: 和SomeOtherObject可能看起来像这样: 可以很好地将对象整理成问题并将其作为JSON字符串存储在数据库中。 当需要从DynamoDB检索数据时,Jackson会自动检索JSON并将其转