在Java尝试使用一个简单的计数器来实践生产者和消费者。不知道为什么我会在这段代码上得到一个非法的监视器状态异常。
我有计数器rest和计数器Consumer方法,它们在自己的线程中运行。计数器本身是一个静态的int volatile字段。counter类还为您提供了一个锁
如果将wait naotify更改为:
Counter.lock.notify();
Counter.lock.wait();
代码起作用了。dosen't wait()和notify()自动获得锁synchronize is on的引用?
生产者阶层
package multithreading;
public class CounterProducer implements Runnable {
public void run() {
try { incrCounter(); } catch (InterruptedException e) { e.printStackTrace(); }
}
public void incrCounter() throws InterruptedException {
while (true) {
synchronized (Counter.lock) {
if (Counter.counter < 1) {
System.out.println("Counter Reset");
Counter.counter = 10;
notify();
wait();
}
}
}
}
}
消费阶层
package multithreading;
public class CounterConsumer implements Runnable {
public void run() {
try { consumeCounter(); } catch (InterruptedException e) { e.printStackTrace(); }
}
public void consumeCounter() throws InterruptedException {
while (true) {
synchronized (Counter.lock) {
if (Counter.counter > 0) {
System.out.println("Consumed");
Counter.counter--;
notify();
wait();
}
}
}
}
}
The Counter
public class Counter {
public static volatile int counter;
public static final Object lock = new Object();
}
柜台
public class CounterRunner {
public static void main(String[] args) {
Thread con = new Thread(new CounterConsumer());
Thread prod = new Thread(new CounterProducer());
con.start();
prod.start();
}
}
跑者
public class CounterRunner {
public static void main(String[] args) {
Thread con = new Thread(new CounterConsumer());
Thread prod = new Thread(new CounterProducer());
con.start();
prod.start();
}
}
如果将wait naotify更改为以下内容,则代码可以工作:
Counter.lock.notify();
Counter.lock.wait();
每个Java方法要么是某个类的静态方法,要么是某个对象的实例方法。如果您看到一个不包含显式类名或对象引用的方法调用,那么它是对属于this
对象的方法的隐式调用。
也就是说,notify()
的意思与this.notify()
相同,wait()
的意思是this.wait()
。
this
在CounterProducer.incrCounter()
方法中出现时引用CounterConsumer
实例,在CounterConsumer.ConsumeCounter()
方法中出现时引用CounterConsumer
实例。
我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职
我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢