当前位置: 首页 > 面试题库 >

线程锁内的多线程

吴峰
2023-03-14
问题内容

我正在努力加快某些过程的执行速度,这些过程将大量记录(大多数是几百万个)发布到Elasticsearch。在我的C#代码中,我已经使用Dataflow实现了一个多线程解决方案,如下所示:

var fetchRecords = new TransformBlock<?, ?>(() => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));

fetchRecords.LinkTo(sendRecordsToElastic, { PropogateCompletion = true });

fetchRecords.Post("Start");

然后我要实现的发送批量请求调用:

public IBulkResponse sendBulkRequest(List<?> records)
{
    lock(SomeStaticObject)
    {
       // Execute several new threads to send records in bulk
    }
}

我的问题 ,你 是对的实用性存在的数据流管道的一部分的锁内执行额外的线程。

这个可以吗?我可以在性能,执行,缓存/内存丢失等方面看到任何潜在的问题吗?

任何见识都会很高兴地被接受。


问题答案:

您可能要在BulkAll这里使用,它实现了可观察的模式,以向Elasticsearch发出并发批量请求。这是一个例子

void Main()
{   
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(pool);

    var client = new ElasticClient(connectionSettings);
    var indexName = "bulk-index";

    if (client.IndexExists(indexName).Exists)
        client.DeleteIndex(indexName);

    client.CreateIndex(indexName, c => c
        .Settings(s => s
            .NumberOfShards(3)
            .NumberOfReplicas(0)
        )
        .Mappings(m => m
            .Map<DeviceStatus>(p => p.AutoMap())
        )
    );

    var size = 500;

    // set up the observable
    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    );

    var countdownEvent = new CountdownEvent(1);

    Exception exception = null;

    // set up an observer. Delegates passed are:
    // 1. onNext
    // 2. onError
    // 3. onCompleted
    var bulkAllObserver = new BulkAllObserver(
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
        ex => 
        {
            // capture exception for throwing outside Observer.
            // You may decide to do something different here
            exception = ex;
            countdownEvent.Signal();
        },
        () => 
        {
            Console.WriteLine("Finished");
            countdownEvent.Signal();
        });

    // subscribe to the observable          
    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait indefinitely for it to finish. May want to put a
    // max timeout on this  
    countdownEvent.Wait();

    if (exception != null) 
    {
        throw exception;
    }
}

// lazily enumerated collection
private static IEnumerable<DeviceStatus> GetDeviceStatus()
{
    for (var i = 0; i < DocumentCount; i++)
        yield return new DeviceStatus(i); 
}

private const int DocumentCount = 20000;

public class DeviceStatus
{
    public DeviceStatus(int id) => Id = id;
    public int Id {get;set;}
}

如果您不需要在观察者中做任何特别的事情,可以.Wait()在可观察对象上使用

var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
    .Index(indexName)
    .MaxDegreeOfParallelism(4)
    .RefreshOnCompleted()
    .Size(size)
)
.Wait(
    TimeSpan.FromHours(1), 
    response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
);

有可观察的方法BulkAllScrollAllReindex(虽然有ReindexOnServer内Elasticsearch其重新索引和映射到所述重新索引API -的Reindex方法早此)



 类似资料:
  • 笔记摘要 这里介绍了java5中的线程锁技术:Lock和Condition,实现线程间的通信,其中的读锁和写锁的使用通过一个缓存系统进行了演示,对于Condition的应用通过一个阻塞队列进行演示。 线程锁技术:Lock & Condition 实现线程同步通信所属包:java.util.concurrent.locks 线程锁 说明 Synchronized 同步方法,锁对象是this;同步静态

  • 我是vert的新手。x、 我在尝试垂直。x“NetServer”功能。http://vertx.io/core_manual_java.html#writing-tcp服务器和客户端,它的工作方式很有魅力。 然而,我也读到“verticle实例严格来说是单线程的。 如果您创建一个简单的TCP服务器并部署它的单个实例,那么该服务器的所有处理程序始终在同一个事件循环(线程)上执行。” 目前,对于我的实

  • 问题内容: 我使用此类来管理与基础SQLiteDatabase的连接 它包含两个锁,一个用于读取,第二个用于写入。但是我仍然偶尔会遇到这种例外情况: 这意味着,在尝试获取getWritableDatabase中的锁定时,数据库以某种方式被锁定。 我的SQLiteOpenHelper是单例模式,并且DataSources仅使用BasicDataSource作为父类。 我可以做些什么来避免在显示的代码

  • 我有一个类似的问题,但是我知道当我要求阅读一行时,发件人应该发送一个行尾。 让我困惑的是,在调试中,它是有效的。可能是因为我在调试时跳过的顺序(直到现在我都不知道这会有什么不同),但我想更好地理解它。 我已经使用线程,但不是很多。 这是我的服务器类: 线程(基于此) 和客户: 它似乎在某个地方进入了死锁,出于某种原因,除非在调试中运行,否则永远不要在向客户端发送数据的服务器类上输入该死锁 (顺便说

  • 我的问题是。。。为什么选择completionLock。run方法中的lock()未锁定资源。当我在系统中运行程序时。出来println(Thread.currentThread())。getName()) 我得到以下输出:Thread-1 Thread-0 Thread-0 Thread-1 NoLock ATM:130 Locked ATM:160应该是:160程序终止。 还有什么是等到完成才

  • 我试图理解java中的公平锁,并从中执行了一个实现 http://tutorials.jenkov.com/java-concurrency/starvation-and-fairness.html 哪个很好 代码如下所示 队列对象的代码 我了解其中的大部分,但我有两个疑问 1)在这一行代码中 这个零件是做什么用的? 它有什么作用?因为我删除了这部分代码,得到了相同的正确结果。 2) 因为我相信我