import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@SuppressWarnings("unchecked")
public class RingBuffer<T> {
private T[] buffer;
// private volatile T[] buffer;
private int readIndex;
private int writeIndex;
private final int capacity;
private AtomicInteger size;
public RingBuffer(int k) {
this.buffer = (T[]) new Object[k];
this.capacity = k;
this.readIndex = 0;
this.writeIndex = 0;
this.size = new AtomicInteger(0);
}
public boolean offer(T value) {
if (isFull()) return false;
buffer[writeIndex] = value;
writeIndex++;
if (writeIndex == capacity) writeIndex -= capacity;
size.getAndIncrement();
return true;
}
public T poll() {
if (isEmpty()) return null;
int index = readIndex;
T x = buffer[index];
readIndex++;
if (readIndex == capacity) readIndex -= capacity;
size.getAndDecrement();
return x;
}
public boolean isEmpty() {
return size.get() == 0;
}
public boolean isFull() {
return size.get() == capacity;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
}
}
public static void test1() throws ExecutionException, InterruptedException {
RingBuffer<String> buffer = new RingBuffer<>(1000);
AtomicBoolean writeDone1 = new AtomicBoolean(false);
ExecutorService service = Executors.newFixedThreadPool(2);
ExecutorService service1 = Executors.newFixedThreadPool(2);
Callable<List<String>> cw1 = () -> {
List<String > x = new ArrayList<>();
int count = 0;
for (int i = 0; i < 10000000; i++) {
if (buffer.offer( i+"")) {
count++;
x.add(i+"");
}
}
writeDone1.set(true);
System.out.println("num write " + count);
return x;
};
Callable<List<String>> cr = () -> {
List<String> x = new ArrayList<>();
int count = 0;
while (!writeDone1.get()) {
String data = buffer.poll();
if (data != null) {
x.add(data);
count++;
}
}
while (true) {
String data = buffer.poll();
if (data != null) {
x.add(data);
count++;
} else {
break;
}
}
System.out.println("num read " + count);
return x;
};
Future<List<String >> fw = service.submit(cw1);
Future<List<String>> fr = service1.submit(cr);
List<String> sw = fw.get();
List<String> sr = fr.get();
System.out.println(sw.size());
System.out.println(sr.size());
boolean check = true;
for (int i =0 ; i< sw.size() ; i++){
if (!sw.get(i).equals( sr.get(i))){
check = false;
break;
}
}
System.out.println(check);
service.shutdown();
service1.shutdown();
}
在某个位置写入数组和从同一位置读取数组之间,在edge之前不会发生任何情况。因此,如果您没有任何顺序保证,您的代码将遭受数据竞争。
如果你也允许并发的报价和并发的投票,那么你也有竞争条件。
我已经玩了很长一段时间了。但通常你会使用尾部和头部序列(例如长)。如果你使ringbuffer是2的幂,你可以做一个廉价的mod将序列转换成索引。头部和尾部序列可能是相对昂贵的挥发物(我真的会从这个开始),然后你可以玩轻松的记忆顺序模式。头部和尾部会给你适当的发生在边缘之前,所以不需要对数组做任何特别的事情。通过这种方法,你也可以摆脱“大小”;你可以计算尾巴和头之间的差值来计算大小;size的问题是,它会导致线程读/写到RingBuffer之间的争用。另外,您需要正确地填充头部/尾部字段,以防止错误共享。
我有一个疑问相对于UI线程和其他线程之间的concurrency。 Ui主线程更新不同变量的值:-flol-long-boolean 我还有另一个线程,它读取同一个变量,并用它执行一些逻辑操作(不编辑它的值),然后用这个操作的结果发送本地广播消息。 是否存在并发问题???我必须使用同步方法和原子变量,否则就无关紧要了? 我反思这个问题,因为没有浮点基元的原子变量,也因为我害怕用错误的代码阻止Ui线
我找到了关于线程安全的代码,但它没有来自给出示例的人的任何解释。我想知道为什么如果我不在“count”之前设置“synchronized”变量,那么count值将是非原子的(总是=200是期望的结果)。谢谢
问题内容: 假设我们的应用程序中有一个CountryList对象,该对象应返回国家/地区列表。国家/地区的加载是一项繁重的操作,因此应将列表缓存。 其他要求: CountryList应该是线程安全的 CountryList应该延迟加载(仅按需加载) CountryList应该支持缓存无效 考虑到极少数情况下会使缓存无效,应该优化CountryList 我想出了以下解决方案: 你怎么看待这件事?你看
问题内容: 我从一个非常简单的多线程示例开始。我试图做一个线程安全的计数器。我想创建两个线程,使计数器间歇地增加到1000。以下代码: 据我所知,while循环现在意味着只有第一个线程才能访问计数器,直到达到1000。输出: 我该如何解决?如何获得共享计数器的线程? 问题答案: 两个线程都可以访问您的变量。 您看到的现象称为线程饥饿。输入代码的受保护部分后(很抱歉,我之前错过了它),其他线程将需要
我如何启动两个线程,其中thread1首先执行,thread2在thread1结束时启动,而主方法线程可以在不锁定其他两个线程的情况下继续工作? 我尝试了join(),但是它需要从线程调用,线程必须等待另一个线程,没有办法执行类似thread2.join(thread1)的操作;因此,如果我在main()中调用join,我将有效地停止主线程的执行,而不仅仅是Thread2的执行。 #编辑:为什么我
我看到了不同的PHP二进制文件,比如非线程或线程安全? 这是什么意思? 这些软件包之间有什么区别?