当前位置: 首页 > 知识库问答 >
问题:

Kafka流不读取输入主题

左丘季
2023-03-14

我从教程中创建了示例Kafka Streams应用程序:

    public static void main(String[] args) throws Exception {
    Logger log = Logger.getLogger("Name");

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordprint");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream("onecon_postgres").print();

    final KafkaStreams streams = new KafkaStreams(builder, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        log.info("After Start");
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
    }

不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。

我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不知道为什么。

[main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
    application.id = streams-linesplit
    application.server = 
    **bootstrap.servers = [localhost:9092]**
    buffered.records.per.partition = 1000
    cache.max.bytes.buffering = 10485760
    client.id = 
    commit.interval.ms = 30000
    connections.max.idle.ms = 540000
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    key.serde = null
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    num.standby.replicas = 0
    num.stream.threads = 1
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    poll.ms = 100
    processing.guarantee = at_least_once
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    replication.factor = 1
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    rocksdb.config.setter = null
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    state.cleanup.delay.ms = 600000
    state.dir = /tmp/kafka-streams
    timestamp.extractor = null
    value.serde = null
    windowstore.changelog.additional.retention.ms = 86400000
    zookeeper.connect = 

为了克服这个问题,我使用netsh转发流量,但我看不到这个应用程序使用我的流。

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=127.0.0.1 connectport=9092 connectaddress=192.168.99.100

共有1个答案

史和泰
2023-03-14

不幸的是,这个应用程序不读取输入流。

您的Kafka Streams应用程序和Kafka代理之间似乎存在网络问题。“Kafka Streams不起作用”是不太可能的。

此外,如果你不提供更多信息,就很难帮助你:

  • 你的Kafka经纪人使用什么Kafka版本
  • 你的应用程序使用什么Kafka(Streams)版本
  • 哪个操作系统
  • 网络设置是什么?
    • 运行应用程序的计算机的IP地址
    • 您的Kafka代理(或多个代理)在哪个IP端口上侦听新连接?是192.168.99.100:9092吗

    我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不知道为什么。

    我不明白——为什么你认为将BOOTSTRAP\u SERVERS\u CONFIG更改为localhost:9092可以解决原来的问题?我知道Kafka经纪人实际上在监听192.168.99.100:9092?

    为了克服这个问题,我使用netsh转发流量,但我看不到这个应用程序使用我的流。

    端口转发很可能无济于事。如果不更新Kafka代理的配置,代理默认只会在其“真实”IP端口上进行通信。稍微简化一下:配置为监听192.168.99.100:9092的代理不会响应Kafka Streams应用程序发送的localhost:9092请求,即使您正在从localhost:9092-

    希望这有点帮助!

 类似资料:
  • 我目前正试图从服务器读取图像文件,但得到的数据不完整或 这与缓冲区大小有关吗?我已经尝试使用静态大小而不是ContentLength。敬请指教。

  • 问题内容: 我正在使用从运行约一分钟的进程中读取stdout。 我该如何stdout以流方式打印出该流程的每一行,以便可以看到生成的输出,但仍然阻止该流程终止,然后再继续? 似乎一次给出所有输出。 问题答案: 这是一个简单的示例(不检查错误): 如果ls结束太快,则while循环可能会在你读取所有数据之前结束。 你可以通过以下方式在中捕获其余部分:

  • 我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它

  • 我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?

  • 我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-

  • 在阅读Kafka主题时,我得到了奇怪的ArrayIndexOutOfBoundsException。花了很多时间却搞不清问题所在。有人能在这方面提供帮助/建议吗。这是我的日志。