当前位置: 首页 > 知识库问答 >
问题:

如何在Kafka流中获取当前的Kafka话题?

卫英悟
2023-03-14

我的场景是我使用make很多共享前缀(例如house.door,house.room)的Kafka主题,并使用Kafka stream regex主题模式API消费所有主题。一切看起来都很好,我得到了数据的密钥和信息。

为了处理数据,我需要主题名,这样我就可以根据主题名进行连接,但我不知道如何在Kafka stream DSL中获得主题名。

共有1个答案

巫马嘉祯
2023-03-14

为了添加到Matthias J.Sax point中,我附上了示例代码,以展示如何实现它。

public static void main(final String[] args) {
    try {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamProcessor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> textLines = streamsBuilder.stream(inputTopicList);
        final KStream<String, String> textLines = builder.stream(inputTopiclist);
        textLines.transform(getTopicDetailsTransformer::new)
        .foreach(new ForeachAction<String, String>() {
            public void apply(String key, String value) {
                System.out.println(key + ": " + value);
            }
        });
        textLines.to(outputTopic);
    } catch (Exception e) {
        System.out.println(e);
    }
}
 private static class getTopicDetailsTransformer implements Transformer<String, String, KeyValue<String, String>> {

        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
             this.context = context;
        }

        public KeyValue<String, String> transform(final String recordKey, final String recordValue) {

          //here i am returning key as topic name.
          return KeyValue.pair(context.topic(), recordValue);
        }

        @Override
        public void close() {
          // Not needed.
        }

      }
 类似资料:
  • 我是apache kafka的新手。现在,有几天想和动物园管理员一起了解Kafka。我想获取与动物园管理员相关的主题。所以我尝试了以下几点 a:)首先我创建了一个动物园管理员客户端,如下所示: 但是使用Java代码执行时主题是空白集。我不明白这里有什么问题。我的动物园管理员道具如下:字符串zkConnect="127.0.0.1:2181";动物园管理员运行得非常好。 请帮助伙计们。

  • 我在Scala中设置了Spark Kafka Consumer,它接收来自多个主题的消息: 我需要为每个主题的消息(将采用JSON格式)开发相应的操作代码。 我提到了以下问题,但其中的答案对我没有帮助: 从spark中的Kafka消息获取主题 那么,在接收到的DStream上是否有任何方法可用于获取主题名称以及消息以确定应该采取什么行动? 对此任何帮助都将不胜感激。谢谢你。

  • 我正在自己开发一个Kafka接收器连接器。我的反序列化程序是JSONConverter。然而,当有人将错误的JSON数据发送到我的连接器主题时,我希望省略此记录,并将此记录发送到我公司的特定主题。 我的困惑是:我找不到任何API让我得到我的连接的bootstrap.servers.(我知道它在融合的etc目录,但它不是一个好主意,编写硬代码的目录connect-distributed.proper

  • 我有下面的Kafka流代码 现在我们的一个客户端正在发送关于kafka标头的版本信息,如下所示。 基于这个标题,我需要为我的消息选择解析器,如何使用KStream操作符读取这个标题?我看过流的所有API,但没有方法给出头 我不能改成普通的kakfa消费者,因为我的应用程序已经依赖于少数KStream API。。

  • 我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题

  • 假设有Kafka主题顺序。数据以JSON格式存储: 定义订单的状态(待定-1,已完成-2)。 完成后如何在“已完成”上进行更改? 正如我所知,Kafka主题是不可变的,我不能更改消息JSON,只需创建一个带有更改值的新消息,对吗?