当前位置: 首页 > 知识库问答 >
问题:

错误读取字段'topic_metadata'在Kafka

居飞扬
2023-03-14

我正在尝试用auto连接aws上的我的经纪人。创造话题。在我的服务器中enable=true。属性文件。当我试图连接Java代理时,出现了以下错误。

1197[Kafka制作人网络线程|制作人-1]错误组织。阿帕奇。Kafka。客户。制作人内部。发送方-Kafka制作人I/O线程中的未捕获错误:org。阿帕奇。Kafka。常见的协议类型。SchemaException:读取字段“topic_metadata”时出错:读取大小为619631的数组时出错,org上只有37字节可用。阿帕奇。Kafka。常见的协议类型。模式。在org上阅读(Schema.java:73)。阿帕奇。Kafka。客户。网络客户端。org上的parseResponse(NetworkClient.java:380)。阿帕奇。Kafka。客户。网络客户端。handleCompletedReceives(NetworkClient.java:449)位于org。阿帕奇。Kafka。客户。网络客户端。在org上投票(NetworkClient.java:269)。阿帕奇。Kafka。客户。制作人内部。发件人。在org上运行(Sender.java:229)。阿帕奇。Kafka。客户。制作人内部。发件人。在java上运行(Sender.java:134)。朗。丝线。运行(未知源)

以下是我的客户生产商代码。

public static void main(String[] argv){
         Properties props = new Properties();
         props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 0);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("block.on.buffer.full",true);
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try{ for(int i = 0; i < 10; i++)
        { producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
             System.out.println("Tried sending:"+i);}
        }
        catch (Exception e){
            e.printStackTrace();
        }
         producer.close();
}

有人能帮我解决这个问题吗?

共有3个答案

陆涵畅
2023-03-14

确保使用正确的版本。假设您使用以下maven dependecy:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>

所以神器等于: flink连接器-kafka-0.8_2.10

现在检查您是否使用正确的Kafka版本:

cd /KAFKA_HOME/libs

现在找到Kafka你的版本来源。罐子

就我而言,我有Kafka2.10-0.8.2.1-sources。罐子所以它很好用!:)如果使用不同的版本,只需更改maven依赖项或下载正确的Kafka版本。

鱼意远
2023-03-14

看起来我在客户端设置了错误的属性,我的server.properties文件也有不适合我使用的客户端的属性。所以我决定使用maven将java客户端更改为0.9.0版本。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>

我的server.properties文件如下。

broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=9000
delete.topic.enable=true
advertised.host.name=<aws public Ip>
advertised.port=9092

我的制作人代码看起来像

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    public class HelloKafkaProducer 
     {


       public static void main(String args[]) throws InterruptedException,      ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);

        boolean sync = false;
        String topic="loader1";
        String key = "mykey";
        for(int i=0;i<1000;i++)
        {
        String value = "myvaluehasbeensent"+i+i;
        ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value);
        if (sync) {
            producer.send(producerRecord).get();
        } else {
            producer.send(producerRecord);
        }
        }
        producer.close();
    }
 }
宋伯寅
2023-03-14

我也面临过类似的问题。这里的问题是,当pom文件中的kafka客户端版本与kafka服务器版本不匹配时,情况就不同了。我使用的是Kafka客户端0.10.0.01,但Kafka服务器仍然是0.9.0.0。所以我把Kafka服务器的版本升级到了10,问题就解决了。

<dependency>
            <groupId>org.apache.servicemix.bundles</groupId>
            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
            <version>0.10.0.0_1</version>
        </dependency>            
 类似资料:
  • 所有消息的尾部都有2个字节的crc。11 b5表示消息开始。消息长度应为36字节。72 b5是另一个消息开始标记。112字节长度。73 b5也是消息标记。36字节。请找到蓝色下划线:这是一条好消息。小红胖胖的红是不好的。它是37字节长度。我有一个额外的字节和crc不匹配。下一个好的(绿色)。下一个坏消息。它是114字节而不是112字节,当然crc不匹配。 这是我的代码:

  • 我是一名铁锈新手,尝试阅读两个数字并计算它们的商: 但是当我试图编译它时,我得到了以下错误重复了几次: src/safe_div.rs:12: 12:12:21错误:不匹配类型:预期

  • 嗨,我必须读取一个xml文件,在代码的一部分中有一些字段标签,名称如示例中所述 我只想读取名为“link\u pdf”的字段 如果我用这个密码 我可以读取值,但在某些情况下这不是一个好的工作,如何通过字段名访问值? 不是工作

  • 我正在尝试使用Spring Data Redis绝地组合连接到AWS ElastiCache Redis。[Redis Cluster enabled,因此它有Cluster Config endpoint,有3个碎片-每个碎片有1个主节点和2个副本节点] 读取超时错误。 AWS Redis服务器版本:5.0.3/群集模式:已启用/SSL:已启用/Auth:已启用(通过密码) 库——Spring数

  • 我还在努力把我的头缠在共享的记忆上。我试图完成的是拥有一个豆荚数组。每个pod还将包含一个KeyValue数组。 因此,在这一点上,我在一个pod中有一个keyValue,如果我从同一个文件中读取共享内存,我就没有问题了。 当我试图从另一个进程中读取时,问题就出现了。我有以下文件 printf导致seg错误,我认为它试图访问没有分配任何内容的部分内存,而printf实际上会在我的第一个文件中打印。

  • 我收到错误: 错误类型错误: 无法读取在评估 (webpack-internal:///./node_modules/@angular/common/esm5/http.js:163) 处未定义的属性 “长度”在 Array.forEach () 在 httpHeaders.lazyInit (webpack-internal:///./node_modules/@angular/common/e