当前位置: 首页 > 知识库问答 >
问题:

使用kafka流的Topic1到Topic2

江俊能
2023-03-14

我是一个新的Kafka流,我想阅读一个主题,并在一个新的主题中使用Kafka流API写它的一部分。我的键是string,值是Avro有文档/示例可以使用吗?

编辑:

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
    final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
    newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
    streams.start();

在分题中,我有:

如何在新主题中添加Body中的其他字段?示例:

{“id”:“145”,“timestamp”:1552585938545,“week”:“\u0000”,“source”:{“string”:“tmp”},“body”:{“string”:“{\”operation_type\“:\”insert\“,\”old\“:{\”row_id\“:null,\”last_upd\“:null},\”new\“:{\”row_id\“:\”170309-*********\“,\”last_upd\“:\”2019-03-14t17:52:18\“}}”},“

共有1个答案

司徒焕
2023-03-14

您可以简单地将主题作为流使用,并使用.map()/.mapvalues()函数修改Value/KeyValues。

例如:假设您想从avro记录中选择一个列并发布到新的输出主题。

// If you are using Schema registry, make sure to add the schema registry url 
// in streamConfiguration. Also specify the AvroSerde for VALUE_SERDE

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("inputTopic");
final KStream<String, String> newStream = userProfiles.mapValues(value -> value.get("fieldName").toString());
subStream.to("outputTopic",Produced.with(Serdes.String(),Serdes.String());
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

此外,您还可以查看github上的示例:
https://github.com/confluentinc/kafka-streams-examples/blob/5.1.2-post/src/main/java/io/confluent/examples/streams/wikipedaFeedavroexample.java

 类似资料:
  • 我试着把我的头缠绕在Kafka的溪流和一些根本的问题,我似乎无法解决,我自己。我理解和Kafka状态存储的概念,但我很难决定如何实现它。我还在使用Spring Cloud Streams,这在此基础上增加了另一个层次的复杂性。 我的用例: 一些有状态规则如下所示: 我当前的实现将所有这些信息存储在内存中,以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会坚持到Kafka的州立商店。

  • 我有一个架构,我们有两个独立的应用程序。原始源是一个sql数据库。App1监听CDC表以跟踪对该数据库中表的更改,对这些更改进行规范化和序列化。它将这些序列化的消息发送到Kafka主题。App2监听该主题,将消息调整为不同的格式,并通过HTTP将调整后的消息发送到各自的目的地。 所以我们的流媒体架构看起来像: SQL(CDC事件)- 我们希望在失败的情况下添加错误处理,并且不能容忍重复事件、丢失事

  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 我尝试使用Kafka流将一个带有String/JSON消息的主题转换为另一个作为Avro消息的主题。 并得到如下所示的异常: 这是正确的做法吗?我对Kafka溪流和阿夫罗是新来的

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 在文件中,我将作为默认值Serde,然后使用使用字符串值。 当我将以下流的配置作为值的默认值时,我看到Avro流(第一个)运行良好,并使用我在该主题上发布的内容。但是当我使用相同的配置发布到字符串值流时,会出现异常。 以下是发布topicTwo和TopicTrey的例外: