当前位置: 首页 > 知识库问答 >
问题:

如何设置Kafka中的消息大小?

干浩然
2023-03-14

我目前使用的是Kafka0.9.0.1。根据我找到的一些来源,设置消息大小的方法是修改server.properties中的以下键值。

  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

我的server.properties文件实际上有这些设置。

message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760

其他可能相关的设置如下。

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

但是,当我试图发送具有4到6 MB大小的有效负载的消息时,使用者永远不会得到任何消息。生产者似乎在发送消息时没有抛出任何异常。如果我确实发送了较小的有效负载(例如<1 MB),那么使用者确实接收到了消息。

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) {
  File f = new File(dir, s);
  byte[] data = Files.readAllBytes(f.toPath());
  Payload payload = new Payload(data); //a simple pojo to store payload
  String key = String.valueOf(System.currentTimeMillis());
  byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
  producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();

下面是接收消息的示例代码。

KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) {
  ConsumerRecord<String, byte[]> records = consumer.poll(100);
  for(ConsumerRecord<String, byte[]> record : records) {
    long offset = record.offset();
    String key = record.key();
    byte[] val = record.value();
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
    System.out.println(
      System.format("offset=%d, key=%s", offset, key));
  }
}

下面是为生产者和消费者填充属性文件的方法。

public static Properties getProducerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("compression.type", "snappy");
  props.put("max.request.size", 10485760); //need this
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  return props;
}

public static Properties getConsumerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("max.partition.fetch.bytes", 10485760); //need this too
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  return props;
}

共有1个答案

楚丰羽
2023-03-14

Jane,首先不要使用fetch.message.max.bytes,因为它是来自使用者的属性,不在server.properties文件中;其次,因为它是用于旧版本的使用者,所以当您将使用者创建为用于实例化它的属性的一部分时,请使用max.partition.fetch.bytes

 类似资料:
  • 我正在使用Kafka(与雅虎Kafka经理) 我想为重置消息设置一个规则,或者他们如何称呼它:“分区偏移量的总和” 在server.properties上是否有滚动kafka偏移量的参数? (即:我想重置或删除所有影响邮件保留的参数) 谢谢。

  • 使用Kafka Streams,我们无法确定在处理写入接收器主题的消息后压缩这些消息所需的配置。 另一方面,使用经典的Kafka Producer,可以通过在KafkaProducer属性上设置配置“compression.type”轻松实现压缩 然而,似乎没有任何记录在案的Kafka Streams压缩处理过的消息的例子。 至于这次(2019年初),有没有一种方法可以使用Kafka流进行压缩?

  • 我没有找到任何关于Debezium如何设置Kafka消息时间戳的文档,以及是否设置了时间戳。 通过比较这些值,Kafka消息的时间戳总是在数据库表(source.ts_ms)更改的时间戳之后,也在Debezium(ts_ms)处理更改的时间之后。这表明Kafka消息时间戳只是Kafka代理设置为摄入时间的时间戳。 有人知道关于Debezium是否以及如何在其填充的接收器主题中设置Kafka消息时间

  • 我对Kafka2.6.0中的消息大小配置有点困惑。但让我们讲一个故事: 我们正在使用由3个节点组成的Kafka集群。到目前为止,消息的标准配置。“zstd压缩”被激活。 相关的代理配置很简单: 此时,生产者配置也很简单: 现在我们想把一个8Mbyte的消息放到一个特定的主题中。这些数据的压缩大小只有200 KB。 如果我将这些数据放入主题中,会出现以下错误: 所以我改变了生产者配置如下: 现在制作

  • (异常出现在生产者中,我在这个应用程序中没有消费者。) 我该怎么做才能摆脱这个例外呢?

  • 设置节点允许的消息大小上限。大于此限制的接收和发送消息都将被拒绝。 Whisper中的消息大小不能超过底层P2P协议的10Mb上限。 调用: web3.shh.setMaxMessageSize(size, [callback]) 参数: size:Number - 字节为单位的消息大小 callback:Function - 可选的回调函数,其第一个参数为错误对象,第二个参数为返回结果 返回值