我正在尝试用auto连接aws上的我的经纪人。创造话题。在我的服务器中enable=true。属性文件。当我试图连接Java代理时,出现了以下错误。
1197[Kafka制作人网络线程|制作人-1]错误组织。阿帕奇。Kafka。客户。制作人内部。发送方-Kafka制作人I/O线程中的未捕获错误:org。阿帕奇。Kafka。常见的协议类型。SchemaException:读取字段“topic_metadata”时出错:读取大小为619631的数组时出错,org上只有37字节可用。阿帕奇。Kafka。常见的协议类型。模式。在org上阅读(Schema.java:73)。阿帕奇。Kafka。客户。网络客户端。org上的parseResponse(NetworkClient.java:380)。阿帕奇。Kafka。客户。网络客户端。handleCompletedReceives(NetworkClient.java:449)位于org。阿帕奇。Kafka。客户。网络客户端。在org上投票(NetworkClient.java:269)。阿帕奇。Kafka。客户。制作人内部。发件人。在org上运行(Sender.java:229)。阿帕奇。Kafka。客户。制作人内部。发件人。在java上运行(Sender.java:134)。朗。丝线。运行(未知源)
以下是我的客户生产商代码。
public static void main(String[] argv){
Properties props = new Properties();
props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("block.on.buffer.full",true);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
try{ for(int i = 0; i < 10; i++)
{ producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
System.out.println("Tried sending:"+i);}
}
catch (Exception e){
e.printStackTrace();
}
producer.close();
}
有人能帮我解决这个问题吗?
确保使用正确的版本。假设您使用以下maven dependecy:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
所以神器等于: flink连接器-kafka-0.8_2.10
现在检查您是否使用正确的Kafka版本:
cd /KAFKA_HOME/libs
现在找到Kafka你的版本来源。罐子
就我而言,我有Kafka2.10-0.8.2.1-sources。罐子所以它很好用!:)如果使用不同的版本,只需更改maven依赖项或下载正确的Kafka版本。
看起来我在客户端设置了错误的属性,我的server.properties文件也有不适合我使用的客户端的属性。所以我决定使用maven将java客户端更改为0.9.0版本。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
我的server.properties文件如下。
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=9000
delete.topic.enable=true
advertised.host.name=<aws public Ip>
advertised.port=9092
我的制作人代码看起来像
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class HelloKafkaProducer
{
public static void main(String args[]) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
boolean sync = false;
String topic="loader1";
String key = "mykey";
for(int i=0;i<1000;i++)
{
String value = "myvaluehasbeensent"+i+i;
ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value);
if (sync) {
producer.send(producerRecord).get();
} else {
producer.send(producerRecord);
}
}
producer.close();
}
}
我也面临过类似的问题。这里的问题是,当pom文件中的kafka客户端版本与kafka服务器版本不匹配时,情况就不同了。我使用的是Kafka客户端0.10.0.01,但Kafka服务器仍然是0.9.0.0。所以我把Kafka服务器的版本升级到了10,问题就解决了。
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.10.0.0_1</version>
</dependency>
所有消息的尾部都有2个字节的crc。11 b5表示消息开始。消息长度应为36字节。72 b5是另一个消息开始标记。112字节长度。73 b5也是消息标记。36字节。请找到蓝色下划线:这是一条好消息。小红胖胖的红是不好的。它是37字节长度。我有一个额外的字节和crc不匹配。下一个好的(绿色)。下一个坏消息。它是114字节而不是112字节,当然crc不匹配。 这是我的代码:
我是一名铁锈新手,尝试阅读两个数字并计算它们的商: 但是当我试图编译它时,我得到了以下错误重复了几次: src/safe_div.rs:12: 12:12:21错误:不匹配类型:预期
嗨,我必须读取一个xml文件,在代码的一部分中有一些字段标签,名称如示例中所述 我只想读取名为“link\u pdf”的字段 如果我用这个密码 我可以读取值,但在某些情况下这不是一个好的工作,如何通过字段名访问值? 不是工作
我正在尝试使用Spring Data Redis绝地组合连接到AWS ElastiCache Redis。[Redis Cluster enabled,因此它有Cluster Config endpoint,有3个碎片-每个碎片有1个主节点和2个副本节点] 读取超时错误。 AWS Redis服务器版本:5.0.3/群集模式:已启用/SSL:已启用/Auth:已启用(通过密码) 库——Spring数
我还在努力把我的头缠在共享的记忆上。我试图完成的是拥有一个豆荚数组。每个pod还将包含一个KeyValue数组。 因此,在这一点上,我在一个pod中有一个keyValue,如果我从同一个文件中读取共享内存,我就没有问题了。 当我试图从另一个进程中读取时,问题就出现了。我有以下文件 printf导致seg错误,我认为它试图访问没有分配任何内容的部分内存,而printf实际上会在我的第一个文件中打印。
我收到错误: 错误类型错误: 无法读取在评估 (webpack-internal:///./node_modules/@angular/common/esm5/http.js:163) 处未定义的属性 “长度”在 Array.forEach () 在 httpHeaders.lazyInit (webpack-internal:///./node_modules/@angular/common/e