当前位置: 首页 > 知识库问答 >
问题:

Scala中线程执行器池的替代

卢鸿彩
2023-03-14
val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Map[String, Any]]]
  pathSuffixList.foreach(block => {
    ConsumptionExecutor.execute(new Consumption(webHdfsUri,block))
  })
class Consumption(webHdfsUri: String, block:Map[String,Any]) extends Runnable {

      override def run(): Unit = {
        val uriSplit = webHdfsUri.split("\\?")
        val fileOpenUri = uriSplit(0) + "/" + block.getOrElse("pathSuffix", "").toString + "?op=OPEN"
        val inputStream = new URL(fileOpenUri).openStream()
        val datumReader = new GenericDatumReader[Void]()
        val dataStreamReader = new DataFileStream(inputStream, datumReader)
        //        val schema = dataStreamReader.getSchema()
        val dataIterator = dataStreamReader.iterator()
        while (dataIterator.hasNext) {
          println(" data : " + dataStreamReader.next())
        }
      }

    }

ConsumptionExecutor:

object ConsumptionExecutor{

  val counter: AtomicLong = new AtomicLong()

  val executionContext: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val thread: Thread = new Thread(r)
      thread.setName("ConsumptionExecutor-" + counter.incrementAndGet())
      thread
    }
  })
  executionContext.asInstanceOf[ThreadPoolExecutor].setMaximumPoolSize(200)

  def execute(trigger: Runnable) {
    executionContext.execute(trigger)
  }

}

然而,我想使用Akka流/Akka Actor,在这里我不需要给出固定的线程池大小,Akka负责所有的事情。我对Akka和流媒体和演员的概念很陌生。有人能给我任何线索,以示例代码的形式,以适合我的用例?提前道谢!

共有1个答案

韦睿
2023-03-14

一个想法是为您从中读取的每个HDFS节点创建ActorPublisher的(子类)实例,然后在流程图中将它们作为多个sourcemerge

类似这样的伪代码,其中省略了ActorPublisher源的详细信息:

val g = PartialFlowGraph { implicit b =>
  import FlowGraphImplicits._
  val in1 = actorSource1
  val in2 = actorSource2
  // etc.

  val out = UndefinedSink[T]
  val merge = Merge[T]

  in1 ~> merge ~> out
  in2 ~> merge
  // etc.
}

对于一个actor源集合来说,这可以改进,只需对它们进行迭代,并为每个actor源添加一个边,但这给出了一个思路。

 类似资料:
  • 我正在使用线程池执行器更改遗留设计。详情如下:- 遗留:-对于遗留设计,在应用程序启动时创建600个线程。和放置在各种池中,然后在需要时提取这些池,并将任务分配给相应的线程。 新:-在新设计中,我将线程池替换为执行器服务 我观察到的是,对于Executor,在启动时不会创建线程。它们是在从客户端激发请求时创建的。因此,与前一个线程相比,在内存中创建的线程要少得多。 但我的问题是,这样做是否正确,因

  • 我使用线程池执行器,将其替换为旧版线程。 我创建了如下执行器: 这里的核心大小是maxpoolsize/5。我已经在应用程序启动时预先启动了所有核心线程,大约160个线程。 在传统设计中,我们创建并启动了大约670个线程。 但关键是,即使在使用Executor并创建和替换遗留设计之后,我们也不会得到更好的结果。 对于结果内存管理,我们使用Top命令来查看内存使用情况。对于时间,我们将System.

  • 我正在编写一个定制的ThreadPoolExecutor,具有以下额外功能:- > 如果有理想的线程,并且随着任务的到来,将该任务分配到队列中,而不是将其添加到队列中。 如果所有线程(最大池大小)都忙,则在新任务到来时,使用RejectionHandler的reject方法将它们添加到队列中 我已经重写了线程池执行程序的java 1.5版本的执行方法。 新守则如下:- 遗留代码如下所示:- 现在正

  • Java SE6文档中的ThreadPoolExecutor类具有以下方法: 返回正在积极执行任务的线程的大致数目。 这里近似和积极执行是什么意思? 在调用之前、期间和之后,是否保证 null 我已经研究了线程池执行器监视需求,以及如何在java中判断线程池中是否有可用的线程,但它们没有回答我的查询。

  • 我需要帮助设计基于多线程的应用程序,包括动态url创建和线程处理。 我在我的应用程序中使用了一个Spring调度器,它每30秒调度一次。从这个调度方法中,我调用了一些基于服务的api,它在循环中,而且我需要每个API有一个线程池执行器,上面有一个线程处理。 由于这个过程是从计划方法开始的,所以每次创建新的线程池时,这就是问题所在。你可以在代码中看到。 我想要的是,如果对于任何一个应用编程接口,如果

  • 我们正在对使用SpringBoot 2.2.2和Spring执行器的应用程序进行性能测试。 我们希望监控: 正在使用多少tomcat线程 有多少tomcat请求正在排队 正在使用多少个ThreadPoolTaskExector线程(我们将@Async与线程池一起用于某些任务) 执行器中是否提供此信息?我看不到需要使用哪些指标。