当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams如何获取kafka标头

徐知
2023-03-14

我有下面的Kafka流代码

    public class KafkaStreamHandler implements  Processor<String, String>{

    private ProcessorContext context;


        @Override
    public void init(ProcessorContext context) {
        // TODO Auto-generated method stub
        this.context = context;
    }

    public KeyValue<String, KafkaStatusRecordWrapper> process(String key, String value) {

        Headers contexts = context.headers();

        contexts.forEach(header -> System.out.println(header));
     }

public void StartFailstreamHandler() {
       StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "failed-streams-userstream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "500");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //consumer_timeout_ms
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);

        props.put("state.dir","/tmp/kafka/stat));

     userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));

     /* take few descsion based on Header */
     /* How to get the Header */ 

       userStream.map(this::process);
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);


kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {

                logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
            }
        });


        kafkaStreams.cleanUp();
        kafkaStreams.start();
    }

    }

现在我们的一个客户端正在发送关于kafka标头的版本信息,如下所示。

ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic", 1, "message");
record.headers().add(new RecordHeader("version", "v1".getBytes()));
producer.send(record);

基于这个标题,我需要为我的消息选择解析器,如何使用KStream操作符读取这个标题?我看过流的所有API,但没有方法给出头

我不能改成普通的kakfa消费者,因为我的应用程序已经依赖于少数KStream API。。

共有2个答案

潘英豪
2023-03-14

我们可以从上下文中获取标题

userStream.to { key, value, recordContext ->
            recordContext.headers()
            destinationTopic
}
司徒胤
2023-03-14

处理器不允许您在下游DSL中链接新的运营商,您应该使用transformValues,以便use可以继续使用流DSL:

  1. 首先从ValueTransformerSusKey内部提取标头
public class ExtractHeaderThenDoSomethingTransformer implements ValueTransformerWithKey<String, String, String> {

    ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public String transform(String readOnlyKey, String value) {
        Headers headers = context.headers();
        /* take few descsion based on Header: if you want to filter base on then just return null then chaining another filter operator after transformValues*/
        /* How to get the Header */
        return value;
    }

    @Override
    public void close() {

    }
}
userStream
        .transformValues(ExtractHeaderThenDoSomethingTransformer::new)
        .map(this::processs);
 类似资料:
  • 问题内容: 这是要求 我想得到这个。 我试过了 它记录。我如何获得价值,为什么会有价值? 问题答案: “ [userId:null]”通常是DOM元素的“ toString”打印输出。如果您执行类似操作,则很有可能 您将看到它是somesort的DOM元素子类。因此,类似: 可能会打印文本节点。

  • 我是Vert.x的新手,对Kafka来说相对较新。 如何设置我的Vert.x KafkaProducer来导出Prometheus指标? 目前,我可以启用Prometheus度量,如<code>vertx_http_server_request_bytes_max</code>并通过Web服务器查看它们: 在使用Vert. x之前,我可以使用Apache KafkaProducer并绑定Kafka

  • 问题内容: 我使用以下代码将单元格渲染更改为显示图像而不是文本: 现在,我希望能够获取中的每一行的图像,以便将其保存在数据库中。我该怎么办? 问题答案: 我无法抗拒这样的例子

  • 我想得到openweathermap图标。https://openweathermap.org/weather-conditions和http://samples.openweathermap.org/data/2.5/weather?q=London,英国 非常感谢你。你的建议对我很重要

  • 我不知道为什么我不能从头AUTHORIZATION中获得值,因为我在Postman(从服务器返回)中看到。 http://img110.xooimage.com/files/1/6/9/postman-567005e.png 我尝试了很多方法,但不知道为什么仍然得到空值。 http://img110.xooimage.com/files/b/c/f/debug-5670075.png 这是我的代码

  • 我的场景是我使用make很多共享前缀(例如house.door,house.room)的Kafka主题,并使用Kafka stream regex主题模式API消费所有主题。一切看起来都很好,我得到了数据的密钥和信息。 为了处理数据,我需要主题名,这样我就可以根据主题名进行连接,但我不知道如何在Kafka stream DSL中获得主题名。