当前位置: 首页 > 知识库问答 >
问题:

Kafka 0.10 Java Client TimeoutException:包含1条记录的批处理过期

桓信鸥
2023-03-14

我有一个单节点,多(3)代理Zookeeper/Kafka设置。我使用的是Kafka0.10 Java客户端。

我写了以下simple remote(在不同于Kafka的服务器上)Producer(在代码中,我用MYIP替换了我的公共IP地址):

Properties config = new Properties();
try {
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    producer = new KafkaProducer<String, byte[]>(config);
    Schema.Parser parser = new Schema.Parser();
    schema = parser.parse(GATEWAY_SCHEMA);
    recordInjection = GenericAvroCodecs.toBinary(schema);
    GenericData.Record avroRecord = new GenericData.Record(schema);
    //Filling in avroRecord (code not here)
    byte[] bytes = recordInjection.apply(avroRecord);

    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes));
    RecordMetadata data = future.get();
} catch (Exception e) {
    e.printStackTrace();
}

这3个代理的服务器属性如下所示(在3个不同的服务器属性文件中,Broker.ID为0、1、2、listeners为plaintext://:9092、plaintext://:9093、plaintext://:9094、host.name为10.2.0.4、10.2.0.5和10.2.0.6)。这是第一个服务器属性文件:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka1-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

当我执行代码时,我得到以下异常:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
    at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212)
    at com.nr.roles.gateway.gw.service(gw.java:126)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119)
    at org.eclipse.jetty.server.Server.handle(Server.java:517)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
    at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0

有人知道我错过了什么吗?如有任何帮助,不胜感激。多谢

暂时还没有答案

 类似资料:
  • 我使用的是spring批处理,和通常使用的一样,我有读取器、处理器和写入器。 我有两个问题 1>Reader查询所有200条记录(表中记录总大小为200,我给出了pageSize=200),因此它得到所有200条记录,在处理器中,我们需要所有这些记录的列表,因为我们必须将每个记录与其他199条记录进行比较,以便将它们分组在不同的层中。因此我在想,如果我们能在处理步骤中得到那个列表,我就可以操纵它们

  • 我正在开发spring-mvc应用程序。 我需要处理超过10万条数据记录。我不能让它依赖于数据库,所以我必须用java实现所有逻辑。 目前,我正在创建多个线程,并将1000条记录分配给每个要处理的线程。 我正在使用org。springframework。行程安排。同时发生的ThreadPoolTaskExecutor(线程池任务执行器)。 列表项 问题: 建议使用的线程数 我应该在线程之间平均分配

  • 所以我只使用Jooq来构建查询,而不是执行查询,如下所示: 对象可以执行查询conn.asyncExecute(org.jooq.query query)。所以我的问题是,如何创建类型为org.jooq的批插入查询。查询?具体来说,给定一个列表 请注意,我知道其他问题询问如何使用Jooq进行批处理插入,但他们使用Jooq执行查询就像下面Jose Martinez的回答一样,而这里我只使用Jooq构

  • 我正在使用spring批处理使用RepositoryItemReader从postgresql DB读取记录,然后将其写入主题。我看到大约有100万条记录需要处理,但它并没有处理所有的记录。我已经将reader的pageSize设置为10,000并且与提交间隔(块大小)相同

  • 下面是我用来推送主题的方法: 使用命令shell脚本 ./kafka-console-producer.sh--broker-list 10.0.1.15:9092--主题DomainEntityCommandStream ./kafka-console-consumer.sh--boostrap-server 10.0.1.15:9092-topic DomainEntityCommandStr

  • 我的计划是 使用多线程步骤,以便每个线程读取一条记录-在处理器中生成多条记录-将生成的记录写入单独的excel文件。 使用同步读取器从进程表中读取。 在处理器中,使用读取器中返回的记录查询DB(涉及多个联接)并形成一个复合对象。 用自定义编写器将复合对象写入文件 就内存管理而言,上面的方法听起来不太好。 因为要写入的记录是在处理器中生成的(而不是从读取器那里获得的,读取器只是给出记录ID),所以只