public class Producer {
private final KafkaProducer<String, String> producer;
private final String topic;
public Producer(String topic, String[] args) {
//......
//......
producer = new KafkaProducer<>(props);
this.topic = topic;
}
public void producerMsg() throws InterruptedException {
String data = "Apache Storm is a free and open source distributed";
data = data.replaceAll("[\\pP‘’“”]", "");
String[] words = data.split(" ");
Random _rand = new Random();
Random rnd = new Random();
int events = 10;
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
int lastIPnum = rnd.nextInt(255);
String ip = "192.168.2." + lastIPnum;
String msg = words[_rand.nextInt(words.length)];
try {
producer.send(new ProducerRecord<>(topic, ip, msg));
System.out.println("Sent message: (" + ip + ", " + msg + ")");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Producer producer = new Producer(Constants.TOPIC, args);
producer.producerMsg();
//If I write Thread.sleep(1000),It will not work!!!!!!!!!!!!!!!!!!!!
Thread.sleep(2000);
}
}
我很感激
你能展示一下你用来配置制作人的道具吗?我只是猜测有可能...
在producerMsg()中,您使用的是异步方式来使用生产者,所以只有producer.send(),这意味着消息被放在一个内部缓冲区中,用于生成稍后发送的批处理。生产者有一个内部线程可以从缓冲区获取并发送批处理。也许只有1000 ms不足以达到这样的条件:生产者真正发送消息(请参见batch.size和linger.ms),主应用程序结束,生产者在没有发送消息的情况下死亡。给它更多的时间(2000毫秒),它可以工作。顺便说一句,我没试过密码。
所以原因似乎是你的:
我有一个两节点的Kafka集群(EC2实例),其中每个节点用作单独的代理。当我使用以下命令在leader实例上运行生成器时: 用列出主题表明主题存在。 主题的说明: 退货 谁能帮忙吗?
我正在使用java API实现apache kafka producer。Apache Kafka安装在localhost上。Zookeeper也在运行,但Producer.send()函数仍然卡在发送消息上,消息没有发布。 我已经创建了“快速消息”主题。
问题内容: 在我的应用程序中,我使用ScheduledExecutorService,但仅产生一个线程来处理计划的任务。这是因为ScheduledExecutorService不会生成线程来处理待处理的任务吗? 这是一个代码片段,将仅输出“ run()1”,而不是预期的“ run()1”,后跟“ run()2” …“ run()10”。 问题答案: 只有一个线程,因为您使用创建线程池,这意味着该线
环境:Spring Boot 2.2.6 启动程序:spring-boot-starter-data-jpa,*-thymeleaf,*-web,*-Tomcat,*-test 其他依赖项:mariadb-java-client 2.6.0,spring-boot-devtools(管理),lombok(管理) DB mariadb:10.4 Hibernate方言:mariadb103 使用内部
我有一个关于在Java中如何工作的基本问题。 很难看出简单创建并行执行某些任务和将每个任务分配给之间的区别。 看起来使用起来也非常简单和高效,所以我想知道为什么我们不一直使用它。 这只是一种方式比另一种方式执行工作更快的问题吗? 这里有两个非常简单的例子来说明这两种方式之间的区别: 使用executor服务:计数器(任务) 使用executor服务:(创建、提交)
我试图从这个地址: 服务器转移。co/api/v1 使用OKHttp库。运行以下代码后: 我收到以下警告,我无法解决。 JAVA网UnknownHostException:无法解析主机“server staging.co/api/v1”:没有与主机名关联的地址