当前位置: 首页 > 知识库问答 >
问题:

Java并发:并发添加

易炳
2023-03-14

考虑以下方法:

public void add(final List<ReportingSTG> message) {
        if(stopRequested.get()) {
            synchronized (this) {
                if(stopRequested.get()) {
                    retryQueue.put(message);
                }
            }
        }
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
            synchronized (this) {
                if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                    final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                    messages.clear();
                    if(processors.size()>=numOfProcessors) {
                        waitingThreads.incrementAndGet();
                        waitForProcessor();
                        waitingThreads.decrementAndGet();
                    }                   
                    startProcessor(clone);
                }
            }

        }
    }

尤其是这两条线:

 1:   final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
 2:   messages.clear();

如果线程A进入synchronized块并获得当前对象的锁,这是否意味着该对象的实例属性的状态不能被synchronized块之外的其他线程改变(当线程A在synchronized块中时)?

例如,线程A执行了第1行-

消息是非静态同步列表

UPD:更新的方法,可能的解决方案:

public void add(final List<ReportingSTG> message) {
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
            }
        }
    }
    while (addLock.get()){
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {}
    }

    messages.add(message);

    if(messages.size() >= batchSize && waitingThreads.get() == 0) {
        synchronized (this) {
            if(messages.size() >= batchSize && waitingThreads.get() == 0) {

                addLock.set(true);
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                addLock.set(false);

                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }                   
                startProcessor(clone);
            }
        }

    }
}

addLock - AtomicBoolean,默认为false

共有2个答案

经福
2023-03-14

我最近整理了一个 DoubleBufferedList 类。也许使用它可以完全避免您的问题。顾名思义,它实现了双缓冲算法,但用于列表。

这个类允许你有许多生产者线程和许多消费者线程。每个生产者线程可以添加到当前列表。每个消费者线程获取整个当前列表进行处理。

这也不使用锁,只使用原子,因此应该高效运行。

请注意,这大部分是测试代码。您可以删除< code>// TESTING注释之后的所有内容,但是您可能会发现测试的严格性令人欣慰。

public class DoubleBufferedList<T> {
  // Atomic reference so I can atomically swap it through.
  // Mark = true means I am adding to it so unavailable for iteration.
  private AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

  // Factory method to create a new list - may be best to abstract this.
  protected List<T> newList() {
    return new ArrayList<>();
  }

  // Get and replace the current list.
  public List<T> get() {
    // Atomically grab and replace the list with an empty one.
    List<T> empty = newList();
    List<T> it;
    // Replace an unmarked list with an empty one.
    if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
      // Failed to replace! 
      // It is probably marked as being appended to but may have been replaced by another thread.
      // Return empty and come back again soon.
      return Collections.EMPTY_LIST;
    }
    // Successfull replaced an unmarked list with an empty list!
    return it;
  }

  // Grab and lock the list in preparation for append.
  private List<T> grab() {
    List<T> it;
    // We cannot fail so spin on get and mark.
    while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
      // Spin on mark.
    }
    return it;
  }

  // Release the list.
  private void release(List<T> it) {
    // Unmark it. Should never fail because once marked it will not be replaced.
    if (!list.attemptMark(it, false)) {
      throw new IllegalMonitorStateException("it changed while we were adding to it!");
    }
  }

  // Add an entry to the list.
  public void add(T entry) {
    List<T> it = grab();
    try {
      // Successfully marked! Add my new entry.
      it.add(entry);
    } finally {
      // Always release after a grab.
      release(it);
    }
  }

  // Add many entries to the list.
  public void add(List<T> entries) {
    List<T> it = grab();
    try {
      // Successfully marked! Add my new entries.
      it.addAll(entries);
    } finally {
      // Always release after a grab.
      release(it);
    }
  }

  // Add a number of entries.
  public void add(T... entries) {
    // Make a list of them.
    add(Arrays.asList(entries));
  }
  // TESTING.
  // How many testers to run.
  static final int N = 10;
  // The next one we're waiting for.
  static final AtomicInteger[] seen = new AtomicInteger[N];
  // The ones that arrived out of order.
  static final Set<Widget>[] queued = new ConcurrentSkipListSet[N];

  static {
    // Populate the arrays.
    for (int i = 0; i < N; i++) {
      seen[i] = new AtomicInteger();
      queued[i] = new ConcurrentSkipListSet();
    }
  }

  // Thing that is produced and consumed.
  private static class Widget implements Comparable<Widget> {
    // Who produced it.
    public final int producer;
    // Its sequence number.
    public final int sequence;

    public Widget(int producer, int sequence) {
      this.producer = producer;
      this.sequence = sequence;
    }

    @Override
    public String toString() {
      return producer + "\t" + sequence;
    }

    @Override
    public int compareTo(Widget o) {
      // Sort on producer
      int diff = Integer.compare(producer, o.producer);
      if (diff == 0) {
        // And then sequence
        diff = Integer.compare(sequence, o.sequence);
      }
      return diff;
    }
  }

  // Produces Widgets and feeds them to the supplied DoubleBufferedList.
  private static class TestProducer implements Runnable {
    // The list to feed.
    final DoubleBufferedList<Widget> list;
    // My ID
    final int id;
    // The sequence we're at
    int sequence = 0;
    // Set this at true to stop me.
    public volatile boolean stop = false;

    public TestProducer(DoubleBufferedList<Widget> list, int id) {
      this.list = list;
      this.id = id;
    }

    @Override
    public void run() {
      // Just pump the list.
      while (!stop) {
        list.add(new Widget(id, sequence++));
      }
    }
  }

  // Consumes Widgets from the suplied DoubleBufferedList
  private static class TestConsumer implements Runnable {
    // The list to bleed.
    final DoubleBufferedList<Widget> list;
    // My ID
    final int id;
    // Set this at true to stop me.
    public volatile boolean stop = false;

    public TestConsumer(DoubleBufferedList<Widget> list, int id) {
      this.list = list;
      this.id = id;
    }

    @Override
    public void run() {
      // The list I am working on.
      List<Widget> l = list.get();
      // Stop when stop == true && list is empty
      while (!(stop && l.isEmpty())) {
        // Record all items in list as arrived.
        arrived(l);
        // Grab another list.
        l = list.get();
      }
    }

    private void arrived(List<Widget> l) {
      for (Widget w : l) {
        // Mark each one as arrived.
        arrived(w);
      }
    }

    // A Widget has arrived.
    private static void arrived(Widget w) {
      // Which one is it?
      AtomicInteger n = seen[w.producer];
      // Don't allow multi-access to the same producer data or we'll end up confused.
      synchronized (n) {
        // Is it the next to be seen?
        if (n.compareAndSet(w.sequence, w.sequence + 1)) {
          // It was the one we were waiting for! See if any of the ones in the queue can now be consumed.
          for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) {
            Widget it = i.next();
            // Is it in sequence?
            if (n.compareAndSet(it.sequence, it.sequence + 1)) {
              // Done with that one too now!
              i.remove();
            } else {
              // Found a gap! Stop now.
              break;
            }
          }
        } else {
          // Out of sequence - Queue it.
          queued[w.producer].add(w);
        }
      }
    }
  }

  // Main tester
  public static void main(String args[]) {
    try {
      System.out.println("DoubleBufferedList:Test");
      // Create my test buffer.
      DoubleBufferedList<Widget> list = new DoubleBufferedList<>();
      // All threads running - Producers then Consumers.
      List<Thread> running = new LinkedList<>();
      // Start some producer tests.
      List<TestProducer> producers = new ArrayList<>();
      for (int i = 0; i < N; i++) {
        TestProducer producer = new TestProducer(list, i);
        Thread t = new Thread(producer);
        t.setName("Producer " + i);
        t.start();
        producers.add(producer);
        running.add(t);
      }

      // Start the same number of consumers.
      List<TestConsumer> consumers = new ArrayList<>();
      for (int i = 0; i < N; i++) {
        TestConsumer consumer = new TestConsumer(list, i);
        Thread t = new Thread(consumer);
        t.setName("Consumer " + i);
        t.start();
        consumers.add(consumer);
        running.add(t);
      }
      // Wait for a while.
      Thread.sleep(5000);
      // Close down all.
      for (TestProducer p : producers) {
        p.stop = true;
      }
      for (TestConsumer c : consumers) {
        c.stop = true;
      }
      // Wait for all to stop.
      for (Thread t : running) {
        System.out.println("Joining " + t.getName());
        t.join();
      }
      // What results did we get?
      for (int i = 0; i < N; i++) {
        // How far did the producer get?
        int gotTo = producers.get(i).sequence;
        // The consumer's state
        int seenTo = seen[i].get();
        Set<Widget> queue = queued[i];
        if (seenTo == gotTo && queue.isEmpty()) {
          System.out.println("Producer " + i + " ok.");
        } else {
          // Different set consumed as produced!
          System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue);
        }
      }
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
  }
}
芮立果
2023-03-14

所描述的场景是可能的。即您可能会丢失消息

synchronized关键字确保您永远不会有两个线程同时运行synchronized部分。它不会阻止另一个线程修改在<code>synchronized</code>块内操作的对象(只要其他线程有权访问它们)。

这是一个可能的解决方案,因为它同步添加和清除。

private Object lock = new Object();

public void add(final List<ReportingSTG> message) {
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
            }
        }
    }
    synchronized(lock){
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }                   
                startProcessor(clone);
            }
    }
}
 类似资料:
  • 一、使用线程 实现 Runnable 接口 实现 Callable 接口 继承 Thread 类 实现接口 VS 继承 Thread 二、基础线程机制 Executor Daemon sleep() yield() 三、中断 InterruptedException interrupted() Executor 的中断操作 四、互斥同步 synchronized ReentrantLock 比较

  • 一、前言 本部分内容是关于Java并发的一些知识总结,既是学习的难点,同时也是面试中几乎必问的知识点。 面试中可能会问的一些问题: 创建线程的方式 Synchronized/ReentrantLock 生产者/消费者模式 volatile关键字 乐观锁/悲观锁 死锁 了解的并发集合 因此针对以上问题,整理了相关内容。 二、目录 Java创建线程的三种方式 Java线程池 死锁 Synchroniz

  • 问题内容: 本文在这里建议使用“为了使与并发GC并行年轻一代的GC”。 我的困惑是,为了同时启用并行和并发GC,我应该 使用或 同时使用 和 ? 聚苯乙烯 我正在使用JVM 6。 问题答案: 由于链接的文档是针对1.4.2 VM的,因此我假设您正在使用(JVM 5和6的行为有所不同)。 从http://java.sun.com/docs/hotspot/gc1.4.2/ 如果在命令行上使用了-XX

  • 问题内容: 问题发生在 包含该行的代码位于 所有这一切都在里面,这里是一个 当我触摸时,它可能会激活,这将创建另一个具有不同属性的属性,这些属性会从屏幕上掉下来并在不到一秒钟的时间内销毁自己。这是我创建粒子效果的方式。我们可以将其称为“粒子” ,就像构造函数中的参数一样。 一切正常,直到我添加另一个main为止。现在,我同时在屏幕上有两个,如果我触摸最新的,它可以正常工作并启动粒子。 但是,如果我

  • 我需要一个线程安全的并发列表,同时最适合迭代,并且应该返回精确的大小。我想存储物品的拍卖出价。所以我想能够 检索项目的确切出价数量 为项目添加出价 检索给定项目的所有出价。 移除商品出价 我打算把它放在

  • 主要内容:实例是的子类,并且可以额外地调度在给定延迟之后运行的命令,或定期执行。 实例 以下程序显示了基于线程的环境中接口的使用。 这将产生以下结果 -