当前位置: 首页 > 知识库问答 >
问题:

我如何用Kafka制作人API向Kafka写一条重要的信息?[重复]

史商震
2023-03-14

我试图在Kafka(大约15mb)中写入一条大消息,但没有写入,程序结束时好像一切正常,但主题中没有任何消息。

小消息确实会被写入。

以下是代码:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    private final static String TOPIC = "rpdc_21596_in2";
    private final static String BOOTSTRAP_SERVERS = "host:port";

    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        props.put("test.whatever", "fdsfdsf");

        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(TOPIC,
                        0,
                        123L,
                        "fdsfdsdsdssss",
                        new String(Files.readAllBytes(Paths.get("/Users/user/Desktop/value1.json")))
                );
        KafkaProducer<String, String> producer = createProducer();
        RecordMetadata recordMetadata = producer.send(record).get();
        producer.flush();
        producer.close();

        System.out.println(recordMetadata);
    }
}

这个主题已经被配置为接受大消息,我已经能够用python写进去了。下面是python代码:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['host:port'], max_request_size=20971520, request_timeout_ms=100000)

with open('/Users/user/Desktop/value1.json', 'rb') as f:
    lines = f.read()
    print(type(lines))

    # produce keyed messages to enable hashed partitioning
    future = producer.send('rpdc_21596_in2', key=b'foo', value=lines)

    # Block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=50)
    except KafkaError:
        # Decide what to do if produce request failed...
        pass

    # Successful result returns assigned partition and offset
    print (record_metadata.topic)
    print (record_metadata.partition)
    print (record_metadata.offset)


    producer.flush()

但java版本不起作用。

共有1个答案

利思源
2023-03-14

创建主题时,需要适当配置主题:https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes

$ kafka-topics.sh --create --bootstrap-servers ... --config max.message.bytes=20971520

更新:

可能会添加更多的属性,我一直在用这个来推动大基64 Blob:

    // Only one in-flight messages per Kafka broker connection
    // - max.in.flight.requests.per.connection (default 5)
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    // Set the number of retries - retries
    props.put(ProducerConfig.RETRIES_CONFIG, "3");

    // Request timeout - request.timeout.ms
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");

    // Only retry after one second.
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
   
    // set max block to one minute by default
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
   
    // set transaction timeout to one minute
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
   
    // set delivery timeout to two minutes
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

    //time to wait before sending messages out to Kafka, should not be too high
    props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
    // maximum amount of data to be collected before sending the batch, you will always hit that
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");

    //those ones are not neccessary but useful for your usecase
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "myClient");
 类似资料:
  • 我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢

  • 我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了

  • 我正在尝试加载一个简单的文本文件,而不是Kafka中的标准输入。下载Kafka后,我执行了以下步骤: 开始动物园管理员: zookeeper-server-start.sh配置zookeeper.properties 已启动服务器 kafka-server-start.sh配置server.properties 创建了一个名为“test”的主题: <code>bin/kafka主题。sh--创建-

  • Kafka客户:0.11.0.0-cp1Kafka经纪人: 在Kafka broker滚动重启时,我们的应用程序在发送到broker时丢失了一些消息。我相信滚动重启不应该丢失任何信息。以下是我们正在使用的生产者(将生产者与异步发送()一起使用,而不使用回调/未来等)设置: 我在日志中看到了这些例外 但日志显示重试尝试离开了,我很好奇为什么它没有重试呢?如果有人有任何想法,请告诉我?

  • 我有一个多线程应用程序,它使用producer类生成消息,之前我使用下面的代码为每个请求创建producer。其中KafkaProducer是新建的,每个请求如下: 然后我阅读了关于生产者的Kafka文档,并了解到我们应该使用单个生产者实例来获得良好的性能。 然后我在一个singleton类中创建了KafkaProducer的单个实例。 现在什么时候 或者我们如何在关闭后重新连接到生产者。问题是如