本周早些时候,我在这里获得了一些关于Stackoverflow的帮助,这导致了一个生产者/消费者模式的发展,用于加载处理并将大型数据集导入RavenDb。CPU受限任务的并行化与IO受限任务的并行化
我现在希望限制生产商提前准备的工作单元的数量,以管理内存消耗。我已经使用一个基本信号量实现了节流,但在某个点上实现死锁时遇到了问题。
我无法找出导致死锁的原因。以下是代码摘录:
private static void LoadData<TParsedData, TData>(IDataLoader<TParsedData> dataLoader, int batchSize, Action<IndexedBatch<TData>> importProceedure, Func<IEnumerable<TParsedData>, List<TData>> processProceedure)
where TParsedData : class
where TData : class
{
Console.WriteLine(@"Loading {0}...", typeof(TData).ToString());
var batchCounter = 0;
var ist = Stopwatch.StartNew();
var throttler = new SemaphoreSlim(10);
var bc = new BlockingCollection<IndexedBatch<TData>>();
var importTask = Task.Run(() =>
{
bc.GetConsumingEnumerable()
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
//or
//.WithDegreeOfParallelism(1)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(data =>
{
var st = Stopwatch.StartNew();
importProceedure(data);
Console.WriteLine(@"Batch imported {0} in {1} ms", data.Index, st.ElapsedMilliseconds);
throttler.Release();
});
});
var processTask = Task.Run(() =>
{
dataLoader.GetParsedItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
//or
//.WithDegreeOfParallelism(1)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(batch =>
{
throttler.Wait(); //.WaitAsync()
var batchno = ++batchCounter;
var st = Stopwatch.StartNew();
bc.Add(new IndexedBatch<TData>(batchno, processProceedure(batch)));
Console.WriteLine(@"Batch processed {0} in {1} ms", batchno, st.ElapsedMilliseconds);
});
});
processTask.Wait();
bc.CompleteAdding();
importTask.Wait();
Console.WriteLine(nl(1) + @"Loading {0} completed in {1} ms", typeof(TData).ToString(), ist.ElapsedMilliseconds);
}
public class IndexedBatch<TBatch>
where TBatch : class
{
public IndexedBatch(int index, List<TBatch> batch)
{
Index = index;
Batch = batch ?? new List<TBatch>();
}
public int Index { get; set; }
public List<TBatch> Batch { get; set; }
}
这是对LoadData的调用:
LoadData<DataBase, Data>(
DataLoaderFactory.Create<DataBase>(datafilePath),
1024,
(data) =>
{
using (var session = Store.OpenSession())
{
foreach (var i in data.Batch)
{
session.Store(i);
d.TryAdd(i.LongId.GetHashCode(), int.Parse(i.Id.Substring(i.Id.LastIndexOf('/') + 1)));
}
session.SaveChanges();
}
},
(batch) =>
{
return batch.Select(i => new Data()
{
...
}).ToList();
}
);
商店是RavenDb IDocumentStore。DataLoaderFactory为给定数据集构造一个自定义解析器。
你能检查一下你有多少线程吗?可能由于阻塞而耗尽了线程池。如果TPL认为没有线程,您的代码就会死锁,那么它注入的线程比ProcessorCount多。但它只能做到一定限度。
无论如何,在TPL任务中阻塞通常是一个坏主意,因为内置启发式算法最适合非阻塞的东西。
如果没有表示“在这里阻塞!”的大箭头,很难调试死锁。避免在没有调试器的情况下调试代码:BlockingCollection已经可以限制。使用采用int bounded容量
参数的构造函数并消除信号量。解决死锁的可能性非常高。
一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统