当前位置: 首页 > 工具软件 > Scala Slick > 使用案例 >

akka mysql_面对使用Scala Slick MySQL Akka Stream的问题

漆雕正奇
2023-12-01

问题陈述:我们将

MySQL DB表中特定模块的用户的所有传入请求参数作为一行添加(这是一个巨大的数据).现在,我们想要设计一个进程,该进程将从该表中读取每条记录,并通过调用第三方API获得有关该用户请求的更多信息,之后它将把这个返回的元信息放在另一个表中.

目前的尝试:

我正在使用Scala Slick来做到这一点.由于要读取的数据很大,我想一次一行地读取该表并进行处理.我尝试使用光滑的akka​​流,但是我得到’java.util.concurrent.RejectedExecutionException’

以下是我尝试过的粗略逻辑,

implicit val system = ActorSystem("Example")

import system.dispatcher

implicit val materializer = ActorMaterializer()

val future = db.stream(SomeQuery.result)

Source.fromPublisher(future).map(row => {

id = dataEnrichmentAPI.process(row)

}).runForeach(id => println("Processed row : "+ id))

dataEnrichmentAPI.process:此函数进行第三方REST调用,并执行一些数据库查询以获取所需数据.这个数据库查询是使用’db.run’方法完成的,它也会一直等到它完成(使用Await)

例如.,

def process(row: RequestRecord): Int = {

// SomeQuery2 = Check if data is already there in DB

val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result), Duration.Inf)

if(retId.isEmpty){

val metaData = RestCall()

// SomeQuery3 = Store this metaData in DB

Await.result(db.run(SomeQuery3.result), Duration.Inf)

return metaData.id;

}else{

// SomeQuery4 = Get meta data id

return Await.result(db.run(SomeQuery4.result), Duration.Inf)

}

}

我正在使用阻止调用DB的这个异常.我不认为我是否可以摆脱它,因为后续流程需要返回值才能继续.

“阻止呼叫”是否是此异常背后的原因?

解决此类问题的最佳做法是什么?

谢谢.

 类似资料: