下面的代码继续创建线程,即使队列是空的...直到最终发生OutOfMemory异常。如果我用一个常规的foreach替换并行。Foreach,这不会发生。有人知道为什么会发生这种情况吗?
public delegate void DataChangedDelegate(DataItem obj);
public class Consumer
{
public DataChangedDelegate OnCustomerChanged;
public DataChangedDelegate OnOrdersChanged;
private CancellationTokenSource cts;
private CancellationToken ct;
private BlockingCollection<DataItem> queue;
public Consumer(BlockingCollection<DataItem> queue) {
this.queue = queue;
Start();
}
private void Start() {
cts = new CancellationTokenSource();
ct = cts.Token;
Task.Factory.StartNew(() => DoWork(), ct);
}
private void DoWork() {
Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
if (item.DataType == DataTypes.Customer) {
OnCustomerChanged(item);
} else if(item.DataType == DataTypes.Order) {
OnOrdersChanged(item);
}
});
}
}
在任务并行库内部,并行。For和Parallel。Foreach遵循爬山算法来确定操作应使用多少并行性。
或多或少,他们从一个任务开始运行身体,移动到两个,等等,直到达到一个断点,他们需要减少任务数量。
这对于快速完成的方法体非常有效,但如果方法体需要很长时间才能运行,则可能需要很长时间才能意识到需要减少并行量。在此之前,它会继续添加任务,并可能导致计算机崩溃。
我在任务并行库的一位开发人员的演讲中学习了上述内容。
指定MaxDegreeOfParallelism可能是最简单的方法。
生产者/消费者模式主要用于只有一个生产者和一个消费者的情况。
但是,您试图实现的(多个消费者)更适合Worklist模式。以下代码摘自unit2幻灯片“2c-共享内存模式”的幻灯片,来自犹他大学教授的并行编程课程,可在http://ppcp.codeplex.com/下载
BlockingCollection<Item> workList;
CancellationTokenSource cts;
int itemcount
public void Run()
{
int num_workers = 4;
//create worklist, filled with initial work
worklist = new BlockingCollection<Item>(
new ConcurrentQueue<Item>(GetInitialWork()));
cts = new CancellationTokenSource();
itemcount = worklist.Count();
for( int i = 0; i < num_workers; i++)
Task.Factory.StartNew( RunWorker );
}
IEnumberable<Item> GetInitialWork() { ... }
public void RunWorker() {
try {
do {
Item i = worklist.Take( cts.Token );
//blocks until item available or cancelled
Process(i);
//exit loop if no more items left
} while (Interlocked.Decrement( ref itemcount) > 0);
} finally {
if( ! cts.IsCancellationRequested )
cts.Cancel();
}
}
}
public void AddWork( Item item) {
Interlocked.Increment( ref itemcount );
worklist.Add(item);
}
public void Process( Item i )
{
//Do what you want to the work item here.
}
前面的代码允许您将工作列表项添加到队列中,并允许您设置任意数量的worker(在本例中为四个)以将项目从队列中拉出并处理它们。
关于.Net 4.0上的并行性的另一个重要资源是《使用Microsoft.Net进行并行编程》一书,该书可在以下网站免费获得:http://msdn.microsoft.com/en-us/library/ff963553
我认为并行。Foreach()
主要是为了处理有界集合而制作的。它不期望像Get消耗分区器()
返回的集合那样,其中MoveNext()
长时间阻塞。
问题是并行。ForEach()试图找到最佳并行度,因此它启动的任务数量与任务调度程序运行的数量相同。但是任务调度器(TaskScheduler)发现有许多任务需要很长时间才能完成,而且它们什么都没有做(它们会阻塞),因此它会继续启动新的任务。
我认为最好的解决方案是设置MaxDegreeOfParallelism
。
作为替代方案,您可以使用TPL数据流的ActionBlock。这种情况下的主要区别是,当没有要处理的项目时,ActionBlock
不会阻止任何线程,因此线程的数量不会接近极限。
我试图通过拆分列表来检查元素是否在字符串中: `公共类ParallelSearchComment扩展了RecursiveTask{private static final long serialVersionUID=1L; `
我编写了代码示例: 每100毫秒提交一个新任务(总任务量-20)。每个任务持续时间-0.5秒。因此,可以并行执行5个任务,最佳执行时间为:20*100 500=2.5秒,池应创建5个线程 但我的实验显示为9.6秒。我打开jsvisualvm查看池创建了多少线程,我看到只创建了一个线程: 请更正我的线程池配置不正确的地方。
问题内容: 创建线程很昂贵。但是为什么价格昂贵呢?当创建Java线程使创建过程变得昂贵时,究竟发生了什么?我认为该说法是正确的,但是我只是对JVM中的线程创建机制感兴趣。 线程生命周期开销。线程创建和拆除不是免费的。实际开销因平台而异,但是线程创建会花费时间,从而在请求处理中引入延迟,并且需要JVM和OS进行某些处理活动。如果请求是频繁且轻量的(如在大多数服务器应用程序中一样),则为每个请求创建一
问题内容: SQLAlchemy正在生成但未启用Postgresql中列的序列。我怀疑我在引擎设置中可能做错了什么。 使用SQLAlchemy教程(http://docs.sqlalchemy.org/en/rel_0_9/orm/tutorial.html)中的示例: 使用此脚本,将生成下表: 然而,序列 被 创建: SQLAlchemy 0.9.1,Python 2.7.5 +,Postgre
问题内容: 我了解JSON,但不了解JSONP。Wikipedia上有关JSON的文档是JSONP的最高搜索结果。它说: JSONP或“带填充的JSON”是JSON扩展,其中将前缀指定为调用本身的输入参数。 ??什么电话 这对我来说毫无意义。JSON是一种数据格式。没有电话 在第二个搜索结果是由某些人叫雷米,谁写的这个约JSONP: JSONP是脚本标记注入,它将响应从服务器传递到用户指定的函数。
我正在构建一个软件平台,作为概念验证(并确定它需要什么),我正在使用此处列出的我的平台存储库构建一个名为Telegram RP的即时通讯程序:https://github.com/BlueHuskyStudios/Blue-Husky-Software-Platform 现在,我已经决定将特定于JVM的代码与通用代码分开,但在我这样做的第一步,它将无法编译。 我被难住了。我已经三次检查了语言级别的