我试图在Kafka(大约15mb)中写入一条大消息,但没有写入,程序结束时好像一切正常,但主题中没有任何消息。
小消息确实会被写入。
以下是代码:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Main {
private final static String TOPIC = "rpdc_21596_in2";
private final static String BOOTSTRAP_SERVERS = "host:port";
private static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
props.put("test.whatever", "fdsfdsf");
return new KafkaProducer<>(props);
}
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
ProducerRecord<String, String> record =
new ProducerRecord<String, String>(TOPIC,
0,
123L,
"fdsfdsdsdssss",
new String(Files.readAllBytes(Paths.get("/Users/user/Desktop/value1.json")))
);
KafkaProducer<String, String> producer = createProducer();
RecordMetadata recordMetadata = producer.send(record).get();
producer.flush();
producer.close();
System.out.println(recordMetadata);
}
}
这个主题已经被配置为接受大消息,我已经能够用python写进去了。下面是python代码:
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['host:port'], max_request_size=20971520, request_timeout_ms=100000)
with open('/Users/user/Desktop/value1.json', 'rb') as f:
lines = f.read()
print(type(lines))
# produce keyed messages to enable hashed partitioning
future = producer.send('rpdc_21596_in2', key=b'foo', value=lines)
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=50)
except KafkaError:
# Decide what to do if produce request failed...
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
producer.flush()
但java版本不起作用。
创建主题时,需要适当配置主题:https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes
$ kafka-topics.sh --create --bootstrap-servers ... --config max.message.bytes=20971520
更新:
可能会添加更多的属性,我一直在用这个来推动大基64 Blob:
// Only one in-flight messages per Kafka broker connection
// - max.in.flight.requests.per.connection (default 5)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// Set the number of retries - retries
props.put(ProducerConfig.RETRIES_CONFIG, "3");
// Request timeout - request.timeout.ms
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
// Only retry after one second.
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
// set max block to one minute by default
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
// set transaction timeout to one minute
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
// set delivery timeout to two minutes
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
//time to wait before sending messages out to Kafka, should not be too high
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
// maximum amount of data to be collected before sending the batch, you will always hit that
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
//those ones are not neccessary but useful for your usecase
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "myClient");
我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢
我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了
我正在尝试加载一个简单的文本文件,而不是Kafka中的标准输入。下载Kafka后,我执行了以下步骤: 开始动物园管理员: zookeeper-server-start.sh配置zookeeper.properties 已启动服务器 kafka-server-start.sh配置server.properties 创建了一个名为“test”的主题: <code>bin/kafka主题。sh--创建-
Kafka客户:0.11.0.0-cp1Kafka经纪人: 在Kafka broker滚动重启时,我们的应用程序在发送到broker时丢失了一些消息。我相信滚动重启不应该丢失任何信息。以下是我们正在使用的生产者(将生产者与异步发送()一起使用,而不使用回调/未来等)设置: 我在日志中看到了这些例外 但日志显示重试尝试离开了,我很好奇为什么它没有重试呢?如果有人有任何想法,请告诉我?
我有一个多线程应用程序,它使用producer类生成消息,之前我使用下面的代码为每个请求创建producer。其中KafkaProducer是新建的,每个请求如下: 然后我阅读了关于生产者的Kafka文档,并了解到我们应该使用单个生产者实例来获得良好的性能。 然后我在一个singleton类中创建了KafkaProducer的单个实例。 现在什么时候 或者我们如何在关闭后重新连接到生产者。问题是如