当前位置: 首页 > 知识库问答 >
问题:

正在过期的xxxxx的1条记录:自批处理创建后已过30030毫秒加上停留时间

阳枫涟
2023-03-14

下面是我用来推送主题的方法:

public DomainEntity push(DomainEntity pDomainEntity) throws Exception {
    logger.log(Level.INFO, "streaming...");
    wKafkaProperties.put("bootstrap.servers", "localhost:9092");
    wKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    wKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer wKafkaProducer = new KafkaProducer(wKafkaProperties);
    ProducerRecord wProducerRecord = new ProducerRecord("DomainEntityCommandStream", getJSON(pDomainEntity));
    wKafkaProducer.send(wProducerRecord, (RecordMetadata r, Exception e) -> {
        if (e != null) {
            logger.log(Level.SEVERE, e.getMessage());
        }
    }).get();
    return pDomainEntity;
}

使用命令shell脚本

./kafka-console-producer.sh--broker-list 10.0.1.15:9092--主题DomainEntityCommandStream

./kafka-console-consumer.sh--boostrap-server 10.0.1.15:9092-topic DomainEntityCommandStream--从头开始

工作得很好。

通过Stackoverflow的一些相关问题,我试图清除这个主题:

有效负载小得离谱,为什么我要更改batch.size?

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                  xmlns:gs="http://soap.problem.com">
   <soapenv:Header/>
   <soapenv:Body>
      <gs:streamDomainEntityRequest>
         <gs:domainEntity>
                <gs:name>12345</gs:name>
                <gs:value>Quebec</gs:value>
                <gs:version>666</gs:version>
            </gs:domainEntity>
      </gs:streamDomainEntityRequest>
   </soapenv:Body>
</soapenv:Envelope>

共有1个答案

龙永思
2023-03-14

使用Docker和Kafka 0.11.0.1图像,您需要向容器中添加以下环境参数:

KAFKA_ZOOKEEPER_CONNECT=x.x.x.x:xxxx(您的zookeeper IP或域:端口默认2181)

KAFKA_ADVERTISED_HOST_NAME=x.x.x.x(您的kafka IP或域)

可选:

KAFKA_BROKER_ID=999(某个值)

kafka_create_topics=test:1:1(开始时要创建一些主题名称)

 类似资料:
  • 我有一个单节点,多(3)代理Zookeeper/Kafka设置。我使用的是Kafka0.10 Java客户端。 我写了以下simple remote(在不同于Kafka的服务器上)Producer(在代码中,我用MYIP替换了我的公共IP地址): 这3个代理的服务器属性如下所示(在3个不同的服务器属性文件中,Broker.ID为0、1、2、listeners为plaintext://:9092、p

  • 2018-04-19 15:12:57[kafka-producer-network-thread producer-1]错误O.s.K.s.LoggingProducerListener-向主题xxxxx-v1:org.apache.kafka.common.errors.TimeoutException:自批处理创建后已超过xxxxx-v1-3:60043毫秒的过期1条记录,有效负载=“{79

  • 我们在生产方面面临以下问题: 是因为无效的配置,如批量大小、请求超时或其他原因吗?

  • 我正在尝试使用Java开发2D游戏。到目前为止,我已经设法将游戏设置为使用全屏独占模式,并在自定义线程中进行活动渲染。我决定使用的游戏循环是固定时间步长变量渲染类型。这种类型的游戏循环应该尽可能快地渲染设备可以处理,我对此并不完全满意。所以我试图使用来限制帧速率。 如果我关闭所有渲染,并简单地在游戏循环中更新游戏,< code>Thread.sleep(1)大约在< code>1 ms内成功Hib

  • 我正在开发spring-mvc应用程序。 我需要处理超过10万条数据记录。我不能让它依赖于数据库,所以我必须用java实现所有逻辑。 目前,我正在创建多个线程,并将1000条记录分配给每个要处理的线程。 我正在使用org。springframework。行程安排。同时发生的ThreadPoolTaskExecutor(线程池任务执行器)。 列表项 问题: 建议使用的线程数 我应该在线程之间平均分配