当前位置: 首页 > 面试题库 >

Java BlockingQueue有批处理吗?

桑宇
2023-03-14
问题内容

我对与Java
BlockingQueue相同的数据结构感兴趣,不同之处在于它必须能够批处理队列中的对象。换句话说,我希望生产者能够将对象放入队列中,但是让使用者阻塞take()直到队列达到一定大小(批处理大小)。

然后,一旦队列达到批量大小,生产者就必须继续阻塞,put()直到消费者消耗完队列中的所有元素为止(在这种情况下,生产者将再次开始生产,而消费者则阻塞直到再次达到该批次)。

是否存在类似的数据结构?还是我应该写(我不介意),如果那里有东西,我只是不想浪费时间。

更新

也许可以澄清一下:

情况将始终如下。可以有多个生产者将项目添加到队列中,但是从队列中提取项目的消费者永远不会超过一个。

现在的问题是,并行和串行存在多个这些设置。换句话说,生产者为多个队列生产商品,而消费者本身也可以是生产者。可以更容易地将其视为生产者,消费者生产者以及最终消费者的有向图。

生产者应该阻塞直到队列为空(@Peter
Lawrey)的原因是,每个队列都将在线程中运行。如果您仅在空间可用时让它们简单地生产,那么您将遇到以下情况:您有太多线程试图一次处理太多东西。

也许将其与执行服务结合起来可以解决问题?


问题答案:

我建议您使用BlockingQueue.drainTo(Collection,int)。您可以将其与take()一起使用,以确保获得最少数量的元素。

使用此方法的优点是您的批量大小随工作负载而动态增长,并且生产者不必在消费者忙时阻塞。即,它针对延迟和吞吐量进行了自我优化。

要完全按照要求实现(我认为这是个坏主意),可以将SynchronousQueue与繁忙的消费线程一起使用。

即消费线程做一个

 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

生产者将在消费者忙时阻塞。



 类似资料:
  • 我正在使用spring批处理使用RepositoryItemReader从postgresql DB读取记录,然后将其写入主题。我看到大约有100万条记录需要处理,但它并没有处理所有的记录。我已经将reader的pageSize设置为10,000并且与提交间隔(块大小)相同

  • 批处理 本书展示的几个例子中,ElasticSearch提供了高效的批量索引数据的功能,用户只需按批量索引的格式组织数据即可。同时,ElasticSearch也为获取数据和搜索数据提供了批处理功能。值得一提的是,该功能使用方式与批量索引类似,只需把多个请求组合到一起,每个请求可以独立指定索引及索引类型。接下来了解这些功能。 MultiGetMultiGet操作允许用户通过_mget端点在单个请求命

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 问题内容: 我是hibernate的新手,我对hibernate批处理有疑问,我读了一些有关hibernate批处理的教程,他们说 Hibernate将所有持久化的对象缓存在会话级缓存中,最终您的应用程序将在第50,000行附近出现OutOfMemoryException崩溃。如果您将批处理与Hibernate一起使用,则可以解决此问题, 我的疑问是不是要在外部初始化会话,为什么我们不能将其初始化

  • 通过定义节流限制来解决:这里定义的相同:Spring batch Multithreading:节流限制影响 我注意到当我用20k条记录运行批处理时,一些线程已经开始处理,但在10个请求后就停止了。但是,其他线程正在正常处理。你能建议一下问题是什么吗?如果我保持corepoolsize=threadpoolsize=5,那么所有的线程都是正确分布的。

  • 我在表中总共有8条记录,其中6条在spring批处理调用read时可以使用jpareader。现在我将页面大小和块大小设置为1以进行测试。期望作业运行时,它应该进行6次读取调用,然后它应该逐个处理,逐个写入。但实际上发生的是,它只是调用read 4次(从日志中我可以看到这样读取页面0...1)并处理4个,其中一个由于不匹配写入标准而被过滤掉,然后它只是更新了3个记录,作业标记为成功完成。