当前位置: 首页 > 知识库问答 >
问题:

并发、非阻塞、固定大小的列表?

巫朝明
2023-03-14

我希望实现具有以下特征的数据结构:

  • 推送:将元素添加到列表的前面。
  • 读取
  • :读取列表中的所有元素
    < li >固定大小:列表不应超过指定的阈值,如果超过该阈值,它应自动从末尾(最早的项目)截断。这不需要严格执行,但是一旦列表超过阈值,它最终会被截断。 < li >并发安全:该结构应该安全地容纳多个并行推送器和读取器 < li >非阻塞:这是真正的问题。我想使用不带锁的实现。如果可能的话,许多线程应该能够同时推送/读取。一个不太理想但可以接受的选择是实现加锁,但是最小化多个推送器/读取器之间的争用。我熟悉读写锁,但这些锁假设不经常写入,这不是我的用例。
  • 写-读一致性:如果单个线程推送到结构,则紧随其后的读应该包含写入的项。这很好,但我想知道排除这个要求是否会使上述要求更容易实现

我主要是并发数据结构的新手。是否存在此类数据结构的示例?环形缓冲区很有趣,但我认为它们不能是非阻塞的。链表很有前途,但并发安全、非阻塞的要求使实现变得相当复杂。

我找到了一些关于使用原子 CAS(比较和交换)操作实现非阻塞链表的好论文,但固定大小的要求给这些带来了一些麻烦。也许这个想法可以适应固定大小的列表?

不管怎样,我对在Ruby中使用它很感兴趣。我知道MRI有全局解释器锁,这使得它对MRI有点没用,但其他Ruby运行时可以利用这一点,我认为这是一个学习练习,以提高我的并发编程技能。

共有3个答案

晁开宇
2023-03-14

我想我想出了一个满足要求的有趣解决方案。

我们使用链表作为基础,但在顶部添加线程安全和截断。

在推送过程中,我们通过使用比较和设置操作来实现线程安全。只有当另一个线程尚未推进到最后一个已知的列表头时,推进才会成功。如果推送失败,我们只需重试,直到成功。

当第一个节点被推送时,我们将其指定为“修剪节点”。当项目被推到列表中时,该节点被进一步向下推,但是我们维护对它的引用。当列表达到最大容量时,我们设置断开“修剪节点”上的链接,以允许对后面的节点进行垃圾收集。然后,我们将最新的节点设置为“剪枝节点”。这样,列表大小永远不会超过“容量* 2”。

因为它是一个没有任意插入的链表,所以我们得到的读取基本一致,因为列表节点永远不会重新排列。当我们开始阅读列表时,我们取消了对头部的引用。我们从未读取过超过配置容量的元素。如果列表在读取过程中被截断,我们可能无法读取足够的节点(这可以通过在开始枚举时保存修剪节点来缓解,以便在枚举器处于活动状态时仍可以读取修剪的节点)。

我对截断机制感到非常满意,但似乎基于互斥的解决方案的性能可能与 CAS 解决方案一样好,甚至更好。这可能取决于推送操作的争议程度,并且需要进行基准测试。

require 'concurrent-ruby'

class SizedList
  attr_reader :capacity

  class Node
    attr_reader :value
    attr_reader :nxt

    def initialize(value, nxt = nil)
      @value = value
      @nxt = Concurrent::AtomicReference.new(nxt)
      @count = Concurrent::AtomicFixnum.new(0)
    end

    def increment
      @count.increment
    end
  end

  def initialize(capacity)
    @capacity = capacity
    @head = Node.new(nil)
    @prune_node = Concurrent::AtomicReference.new
  end

  def push(element)
    succeeded = false
    node = nil

    # Maybe should just use a mutex for this write instead of CAS
    # It needs to be benchmarked
    until succeeded
      first = @head.nxt.get
      node = Node.new(element, first)
      succeeded = @head.nxt.compare_and_set(first, node)
    end

    # Every N nodes where N=@capacity is designated as the "prune node"
    # Once we push N times, we drop all the nodes after the prune node by setting
    # it's nxt value to nil.
    # Then we set the first node as the new prune node
    @prune_node.compare_and_set(nil, node) if @prune_node.get.nil?
    prune_node = @prune_node.get
    count = prune_node.increment
    if count >= @capacity
      if @prune_node.compare_and_set(prune_node, node)
        prune_node.nxt.set(nil)
      end
    end

    nil
  end

  def each(&block)
    enum = Enumerator.new do |yielder|
      current = @head
      # Here we just iterate through the list, but limit the results to @capacity
      @capacity.times do
        current = current.nxt.get
        break if current == nil
        yielder.yield(current.value)
      end
    end

    block ? enum.each(&block) : enum
  end
end
姜景焕
2023-03-14

您可以考虑创建如下所示的类。我认为这还不完整。此外,我还没有考虑非阻塞问题,这是一个广泛的话题,并不特定于此类。

class TruncatedList
  attr_reader :max_size
  alias to_s inspect
  
  def initialize(max_size=Float::INFINITY)
    @max_size = max_size
    @list = []
  end
  def pop(n=1)
    return nil if @list.empty?
    case n
    when 0
      nil
    when 1
      @list.pop
    else
      @list.pop([n, @list.size].min)
    end
  end

  def >>(obj)
    @list.pop if @list.size == @max_size
    @list.unshift(obj)
  end
   
  def unshift(*arr)
    arr.each do |obj|
      @list.pop if @list.size == @max_size
      @list >> obj
    end
  end
  def <<(obj)
    if @list.size < @max_size
      @list << obj
    else
      @list
    end
  end     

  def push(*arr)
    arr.each do |obj|
      break(@list) if @list.size == @max_size
      @list << obj
    end
  end

  def shift(n=1)
    return @list if @list.empty?
    case n
    when 0
      nil
    when 1
      @list.shift
    else
      @list.shift([n, @list.size].min)
    end
  end
  def pop(n=1)
    return nil if @list.empty?
    case n
    when 0
      nil
    when 1
      @list.pop
    else
      @list.pop([n, @list.size].min)
    end
  end

  def inspect
    @list.to_str
  end

  def to_a
    @list
  end

  def size
    @list.size 
  end
end

下面是一个如何使用此集合的示例。

t = TruncatedList.new(6)
  #=> #<TruncatedList:0x00007fe2db0512a0 @max_size=6, @list=[]>

t.inspect
  #=> "[]"      

t.to_a
  #=> []

t >> 1
  #=> 1

t.inspect
  #=> "[1]"

t.unshift(2,3)
  #=> [2, 3]

t.inspect
  #=> "[3, 2, 1]"   

t.unshift(4,5,6,7,8) 
  #=> [4, 5, 6, 7, 8]

t.inspect
  #=> "[8, 7, 6, 5, 4, 3]"

t.to_a
  #=> [8, 7, 6, 5, 4, 3]
司空均
2023-03-14

这个问题可能更适合软件工程,而不是堆栈溢出,因为它似乎更像是一个设计问题。也就是说,我建议使用线程安全数组,或者如果您无法重新设计应用程序以完全避免单一共享对象,则将资源争用委托给MVCC数据库。

您可以使用Concurrent::Array和#unshift和#pop方法实现线程安全列表或模拟循环缓冲区。您还可以选择将锁定外部化到数据库之类的东西,其中Ruby的GIL在很大程度上与底层队列或锁定机制无关。然而,据我所知,没有办法在Ruby中创建真正的无锁并发访问对象,尽管实现自己的多版本并发控制可能很接近。

唾手可得的成果可能是将您的读写操作外部化到一个支持MVCC的数据库,比如PostgreSQL。如果您不能或不愿意这样做,您可能需要接受应用程序和数据结构的ACID属性和性能特征中固有的权衡。特别是,使用单一共享数据结构是一个设计决策,如果可能的话,您也许应该重新评估。

在开始这条路之前,只需确保您有一个真正的性能问题需要解决。虽然确实存在锁会增加明显开销的情况,但即使混合使用 Ruby 的 GIL,许多实际应用程序也有足够的性能。您的里程肯定会有所不同。

 类似资料:
  • 具有offer和flush的非阻塞并发队列 我需要一个基本上只有2个操作的无界非阻塞并发队列: 提供:在此队列的尾部自动插入指定项; flush:获取队列中在该时刻出现的所有项,并开始按照插入顺序逐一处理它们。更具体地说,必须是原子的只是这个“TakeAll”操作,它将是flush的第一个操作。takeAll之后提供给队列的所有项都将被插入,然后仅由另一个后续刷新处理。 目标是使用者在takeAl

  • 9.7. 示例: 并发的非阻塞缓存 本节中我们会做一个无阻塞的缓存,这种工具可以帮助我们来解决现实世界中并发程序出现但没有现成的库可以解决的问题。这个问题叫作缓存(memoizing)函数(译注:Memoization的定义: memoization 一词是Donald Michie 根据拉丁语memorandum杜撰的一个词。相应的动词、过去分词、ing形式有memoiz、memoized、me

  • 问题内容: 我有一个经典的问题,线程将事件推送到第二个线程的传入队列。仅这次,我对性能非常感兴趣。我要实现的是: 我想要并发访问队列,生产者推送,接收者弹出。 当队列为空时,我希望消费者阻止队列,等待生产者。 我的第一个想法是使用,但是我很快意识到它不是并发的,并且会降低性能。另一方面,我现在使用,但仍要为每个出版物支付/ 的费用。由于使用者在找到空队列时不会阻塞,因此我必须进行同步并处于锁定状态

  • 非阻塞 IO 仅对在 Servlet 和 Filter(2.3.3.3节定义的,“异步处理”)中的异步请求处理和升级处理(2.3.3.5节定义的,“升级处理”)有效。否则,当调用 ServletInputStream.setReadListener 或ServletOutputStream.setWriteListener 方法时将抛出IllegalStateException。为了支持在 Ser

  • Web 容器中的非阻塞请求处理有助于提高对改善 Web 容器可扩展性不断增加的需求,增加 Web 容器可同时处理请求的连接数量。servlet 容器的非阻塞 IO 允许开发人员在数据可用时读取数据或在数据可写时写数据。非阻塞 IO 仅对在 Servlet 和 Filter(2.3.3.3节定义的,“异步处理”)中的异步请求处理和升级处理(2.3.3.5节定义的,“升级处理”)有效。否则,当调用 S

  • 我有一个应用程序,在其中按下开始按钮后,服务将开始轮询几个传感器,每当传感器值发生变化时,将传感器数据存储到某个对象中。每10毫秒,就会发生一次数据库插入,获取对象的当前值并将其存储到数据库中。这会发生30分钟 考虑到插入的速度和持续时间,我想在一个独立于UI线程的线程中运行它,这样导航就不会受到影响。因此,我的服务将通过将数据添加到队列中来为线程提供一些数据,然后另一个线程(消费者)将从队列中取