当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams API使用与Streams配置不同的消费者引导服务器

公孙辰龙
2023-03-14
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "otherhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    // 1 - stream from Kafka

    KStream<String, String> textLines = builder.stream("word-count-input");
    KTable<String, Long> wordCounts = textLines
            // 2 - map values to lowercase
            .mapValues(textLine -> textLine.toLowerCase())
            // can be alternatively written as:
            // .mapValues(String::toLowerCase)
            // 3 - flatmap values split by space
            .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
            // 4 - select key to apply a key (we discard the old key)
            .selectKey((key, word) -> word)
            // 5 - group by key before aggregation
            .groupByKey()
            // 6 - count occurrences
            .count("Counts");

    // 7 - to in order to write the results back to kafka
    wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
WARN [Consumer clientId=xxxxxxxxx-678dee93-a403-4635-9cfb-ccde35489acc-StreamThread-1-consumer, groupId=xxxxxxxxxx] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:748) 

共有1个答案

卞浩漫
2023-03-14

返回给客户机的地址由代理上的adveredsed.listeners地址设置。

代码中给出的地址只是初始的引导连接。

您需要编辑代理设置,以确保返回外部可解析的“广告”地址,然后重新启动它

 类似资料:
  • 我正在尝试使用SSL实现Kafka消费者,在应用程序中提供所有必需的配置。 当我启动Spring启动Kafka消费者应用程序;消费者试图连接localhost:9092而不是提到Kafka经纪人。 KafkaConfig.java公司 正在加载所有与SSL和引导服务器相关的属性。值,我可以在调试模式中看到它。 应用程序.yml 在应用程序日志中,我得到了下面的日志 我找不到它,为什么它连接到本地主

  • 我使用的是spring-kafka 2.1.10.release。我有一个拥有next属性的消费者(复制了几乎所有的属性): 我生产的Apache Kafka版本是2.11-1.0.0-0pan4。有一个集群内部有3个kafka的节点: 甚至无法在本地复制。事情是这样的: 4)最神秘:在应用程序中,一切都可以工作。Spring-consumer读取新消息并将它们发送给kafka。我看到了这样的日志

  • 我试图找到一种在以下场景中使用ThreadPoolExecutor的方法: 我有一个单独的线程在线程池中生成和提交任务 为了提供更多的上下文,我目前只需一次提交所有任务,并取消ExecutorService返回的所有未来。在最长生成时间到期后提交。我忽略所有产生的取消异常,因为它们是预期的。问题是未来的行为。cancel(false)很奇怪,不适合我的用例: 它可以防止任何未启动的任务运行(良好)

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 你们中有人知道用SOAP web服务实现消费者驱动的契约的方法或工具吗?我有一个发布SOAP web服务的遗留Java应用程序,用Apache CXF实现,由一群Spring Boot Java微服务使用。我已经在使用Pact和Spring Cloud Contract来测试我在微服务之间的REST调用,但是找不到一种方法来使用这些相同的工具或任何其他工具来实现SOAP web服务。

  • Kafka消费者是否一直在检查代理(Kafka服务器)的运行状况,反之亦然 让我们说,无论如何,消费者和经纪人都知道彼此的健康状况,那么消费者将如何准确地从分区中读取 假设一个主题有48个分区,该主题有两个使用者组,那么有多少线程将使用所有分区中的数据