当前位置: 首页 > 知识库问答 >
问题:

使用源队列实现线程的akka-http中的连接池安全吗?

濮献
2023-03-14

关于中提到的以下实施:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

从多个线程提供队列http请求是线程安全的吗?如果不是,那么执行这种要求的最佳方法是什么?也许用一个有奉献精神的演员?

共有1个答案

傅奕
2023-03-14

不,它不是线程安全的,根据api文档:sourcequeue,当前源物化到的仅用于单线程。

一个专用的执行元可以很好地工作,但如果可以,使用source.actorref(文档链接)而不是source.queue会更容易。

通常,source.actorref的缺点是缺乏背压,但是当您使用overflowstrategy.dropnew时,很明显您并不期望有背压。因此,您可以使用source.actorref获得相同的行为。

 类似资料:
  • 我必须在一个系统中管理计划的文件复制。文件复制是由用户安排的,我需要限制复制期间使用的系统资源数量。没有定义每次复制可能需要的时间(即,可能计划每15分钟运行一次复制,并且在下一次运行到期时,上一次运行可能仍在运行)。 我有一个调度器,它定期检查到期的文件复制,对于每个文件复制,(1)如果它没有排队也没有运行,就将它添加到阻塞队列中;(2)否则就删除它。 我还有一个线程池,等待直到队列中有复制并执

  • 我要创建一个程序,给定N个线程,这些线程可以在队列中插入或删除一个元素,但是线程访问队列是有条件的: null 我用同步块做的,就像这样: run void很简单,它只是在插入或删除元素时永远循环。 我的问题是,在不使用synchronized的情况下,我如何遵循那个条件? 没有同步块,怎么可能保证互斥呢? 编辑:我不能使用类似于同步的东西(就像锁一样)

  • 问题内容: 如何使用HttpClient创建连接池? 我必须经常连接同一台服务器。值得建立这样一个游泳池吗? 是否可以保持实时连接并将其用于各种请求,如果可以,我该如何做? 我正在使用Apache HTTP Client在Java中进行开发。 问题答案: [假设Java和Apache的HttpClient] 使用ThreadSafeClientConnManager。将单个全局实例传递给每个Htt

  • 我使用Apache HTTP客户端在我的REST API调用到某些Web服务的连池。 奇怪的是,尽管我使用了HTTP连接池,但我的性能并没有任何提高。 我正在使用Apache HTTP客户端连接到我的web服务,其中的代码如下所示: 我使用Spring的来包装使用Spring的

  • 基础的FIFO队列 # queue_fifo.py import queue q = queue.Queue() for i in range(5): q.put(i) while not q.empty(): print(q.get(), end=' ') print() LIFO队列 # queue_lifo.py import queue q = queue.Lif

  • 问题内容: 我已经看到了线程池执行程序的实现及其所提供的拒绝执行策略。但是,我有一个自定义要求- 我想拥有一个回调机制,在该机制中,当达到队列大小限制时,我会收到通知,并说何时队列大小减少到最大允许队列大小的80%。 我觉得可以通过子类化线程池执行程序来实现,但是已经有一个实现的版本吗?我很乐意在需要时提供更多详细信息和我的工作,以便提供清晰的信息。 问题答案: 我希望有一个回调机制,当达到队列大