当前位置: 首页 > 知识库问答 >
问题:

当连接使用Avro模式定义的两个Kafka流时,如何编写ValueJoiner?

曾弘扬
2023-03-14

我正在构建一个电子商务应用程序,我目前正在处理两个数据馈送:订单执行和销售中断。由于各种原因,销售失败将是无效的执行。失败的销售将具有与订单相同的订单编号,因此连接位于订单编号和行项目编号上。

目前,我有两个主题-订单,和破损。两者都是使用Avro模式定义的,并使用SpecificRecord构建的。键是ordereferencenumber

订单的字段:订单编号,时间戳,订单行,项目编号,数量

坏的的字段:Order参考号,OrderLine,时间戳

通过运行

mvn clean package

我需要将订单断开的左连接,并在输出中包括以下字段:订单参考编号、时间戳、断开的完整性标记、订单行、项目编号、数量

这是我的密码:

public static void main(String[] args) {
    // Declare variables
    final Map<String, String> avroSerdeConfig = Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

    // Add Kafka Streams Properties
    Properties streamsProperties = new Properties();
    streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "orderProcessor");
    streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    streamsProperties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost:8081");

    // Specify Kafka Topic Names
    String orderTopic = "com.ecomapp.input.OrderExecuted";
    String brokenTopic = "com.ecomapp.input.BrokenSale";

    // Specify Serializer-Deserializer or Serdes for each Message Type
    Serdes.StringSerde stringSerde = new Serdes.StringSerde();
    Serdes.LongSerde longSerde = new Serdes.LongSerde();
    // For the Order Executed Message
    SpecificAvroSerde<OrderExecuted> ordersSpecificAvroSerde = new SpecificAvroSerde<OrderExecuted>();
    ordersSpecificAvroSerde.configure(avroSerdeConfig, false);
    // For the Broken Sale Message
    SpecificAvroSerde<BrokenSale> brokenSpecificAvroSerde = new SpecificAvroSerde<BrokenSale>();
    brokenSpecificAvroSerde.configure(avroSerdeConfig, false);


    StreamsBuilder streamBuilder = new StreamsBuilder();

    KStream<String, OrderExecuted> orders = streamBuilder
            .stream(orderTopic, Consumed.with(stringSerde, ordersSpecificAvroSerde))
            .selectKey((key, orderExec) -> orderExec.getMatchNumber().toString());
    KStream<String, BrokenSale> broken = streamBuilder
            .stream(brokenTopic, Consumed.with(stringSerde, brokenSpecificAvroSerde))
            .selectKey((key, brokenS) -> brokenS.getMatchNumber().toString());

    KStream<String, JoinOrdersExecutedNonBroken> joinOrdersNonBroken = orders
        .leftJoin(broken,
                (orderExec, brokenS) -> JoinOrdersExecutedNonBroken.newBuilder()
                        .setOrderReferenceNumber((Long) orderExec.get("OrderReferenceNumber"))
                        .setTimestamp((Long) orderExec.get("Timestamp"))
                        .setBrokenSaleTimestamp((Long) brokenS.get("Timestamp"))
                        .setOrderLine((Long) orderExec.get("OrderLine"))
                        .setItemNumber((String) orderExec.get("ItemNumber"))
                        .setQuantity((Long) orderExec.get("Quantity"))
                        .build(),
                JoinWindows.of(TimeUnit.MILLISECONDS.toMillis(1))
                Joined.with(stringSerde, ordersSpecificAvroSerde, brokenSpecificAvroSerde))
        .peek((key, value) -> System.out.println("key = " + key + ", value = " + value));


    KafkaStreams orderStreams = new KafkaStreams(streamBuilder.build(), streamsProperties);
    orderStreams.start();

    // print the topology
    System.out.println(orderStreams.localThreadsMetadata());

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(orderStreams::close));

}

当我运行此命令时,会出现以下maven编译错误:

[ERROR] /Tech/Projects/jCom/src/main/java/com/ecomapp/kafka/orderProcessor.java:[96,26] incompatible types: cannot infer type-variable(s) VO,VR,K,V,VO
    (argument mismatch; org.apache.kafka.streams.kstream.Joined<K,V,com.ecomapp.input.BrokenSale> cannot be converted to org.apache.kafka.streams.kstream.Joined<java.lang.String,com.ecomapp.OrderExecuted,com.ecomapp.input.BrokenSale>)

问题实际上在于定义我的ValueJoiner。当涉及Avro模式时,Confluent文档并不十分清楚如何做到这一点(我也找不到示例)。正确的定义是什么?

共有1个答案

车子平
2023-03-14

不确定为什么Java不能解析类型。

尝试:

Joined.<String,OrderExecuted,BrokenSale>with(stringSerde, ordersSpecificAvroSerde, brokenSpecificAvroSerde))

显式指定类型。

 类似资料:
  • 我正在使用从oracle db获取数据,并按下(两个键 我有一个Kafka流收听这个主题,并有avro Genericrecord。当我启动流时,我开始得到<code>ClassCastException:java.lang.Long不能强制转换为org.apache.avro.generic。GenericRecordconnect生成的架构具有数据类型为“long”的字段 有人对如何解决这个问

  • 我希望使用提供的Avro模式而不是Spark自动生成的模式来编写Avro格式的DataFrame。如何告诉Spark在写时使用自定义模式?

  • 我正在学习kafka connect的教程,我想知道是否有可能接收一些类的类型的消息。 教程:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1// 基于avro格式,我用Maven生成了一个类。 然后我用我的类型定义了消费者工厂: 和KafkaList

  • 下面的代码片段是从JoinedStreams的javadoc复制的 这两个流仅基于一个键(通过< code>t =计算)进行连接 我会问我如何基于多个键进行连接,例如,one.a = two.a和

  • 我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。 我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。 这是我迄今为止所尝试的: 其中avro_schema是avsc文件中指定的架构名称。 我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord(

  • 我们正在使用Flink表API在Flink应用程序中使用一个Kafka主题。 当我们第一次提交应用程序时,我们首先从我们的自定义注册表中读取最新的模式。然后使用Avro模式创建一个Kafka数据流和表。我的数据序列化器的实现与汇合模式注册表的工作方式类似,方法是检查模式ID,然后使用注册表。因此我们可以在运行时应用正确的模式。