当前位置: 首页 > 知识库问答 >
问题:

Scala Futures:非确定性输出

单于淇
2023-03-14

我是Scala的新手,我正在通过创建一些重试方案来练习Futures lib。这样做我得到了以下代码:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def fCalc(): Future[Int] = Future(calc())

  resetRetries()

  val ff = fCalc() // 0 - should fail
    .fallbackTo(fCalc()) // 1 - should fail
    .fallbackTo(fCalc()) // 2 - should fail
    .fallbackTo(fCalc()) // 3 - should fail
    .fallbackTo(fCalc()) // 4 - should be a success

  Await.ready(ff, 10.second)

  println(ff.isCompleted)
  println(ff.value)
}

每次运行这段代码,我都会得到不同的结果。我得到的结果示例如下

产出1

I am thread 12 This is going to fail. Retry count 1
I am thread 14 This is going to fail. Retry count 3
I am thread 13 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

产出2

I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 13 This is going to fail. Retry count 3
I am thread 14 This is going to fail. Retry count 4
true
Some(Success(10))

欧普特 3

I am thread 12 This is going to fail. Retry count 1
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 12 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

结果并不总是在成功和失败之间交替。在成功出现之前,可能会有不止几次失败的运行。

据我所知,应该只有4个“我是线程x”的日志这将失败。重试计数 x“,这些应如下所示:

I am thread a This is going to fail. Retry count 1
I am thread b This is going to fail. Retry count 2
I am thread c This is going to fail. Retry count 3
I am thread d This is going to fail. Retry count 4

不一定是按这个顺序——因为我不知道Scala线程模型是如何准确工作的——但是你明白我的意思。尽管如此,我还是得到了这个我无法掌握的非确定性输出with.So....我的问题是:这个非确定性输出来自哪里?

我想提及的是,以下重试机制始终产生相同的结果:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def retry[T](op: => T)(retries: Int): Future[T] = Future(op) recoverWith { case _ if retries > 0 => retry(op)(retries - 1) }

  resetRetries()
  val retriableFuture: Future[Future[Int]] = retry(calc())(5)
  Await.ready(retriableFuture, 10 second)

  println(retriableFuture.isCompleted)
  println(retriableFuture.value)
}

输出

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Success(10))

虽然如果我减少重试次数(重试(calc())(3)),结果是预期的失败的未来

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

共有3个答案

孟杰
2023-03-14

这最终对我起了作用:

(以下< code>calc()方法的代码充分解决了关于日志复制和未来不确定性结果的问题)

var time = 0
  var resetTries = time = 0

  def calc() = this.synchronized {
    if (time > 3) 10 else {
      time += 1
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $time") // For debugging purposes
      throw new IllegalStateException(("not yet"))
    }
  }

不需要< code > atomic integer ——在我看来,这让事情变得更加复杂。需要一个< code >同步的包装器。

我必须强调的事实是,这只是出于演示的目的,在生产代码中使用这样的设计可能不是最好的主意(阻止对< code>calc方法的调用)。应该改用< code>recoverWith实现。

感谢@SergGr、@Tim和@MichalPolitowksi的帮助

阎善
2023-03-14
匿名用户

这不是Scala问题,而是一个更一般的多线程问题,其值为重试。您有多个线程在没有任何同步的情况下读取和写入此值,因此您无法预测每个线程何时运行或将看到什么值。

看起来具体问题是您正在测试重试,然后稍后更新它。所有四个线程都可能在更新值之前对其进行测试。在这种情况下,他们都将看到0并抛出错误。

解决方案是将< code>retries转换为< code>AtomicInteger并使用< code>getAndIncrement。这将自动检索该值并递增,因此每个线程都将看到适当的值。

更新以下评论:另一个答案已经解释了为什么同时启动多个线程,所以我不会在这里重复。当多个线程并行运行时,日志记录的顺序始终是不确定的。

瞿文柏
2023-03-14

虽然从技术上讲@Tim是正确的,但我不认为他真的回答了这个问题。

我相信你困惑的真正根源是你对结构的误解:

f.fallbackTo(Future(calc()))

是的。以及它与

f.recoverWith({ case _ => Future(calc())})

有两个重要的区别:

>

  • 回退到的情况下,立即创建未来(calc()),因此(几乎)立即开始执行calc()。因此,原始未来和回退未来同时运行。在使用恢复的情况下,只有在原始未来失败后才创建回退未来。这种差异会影响日志记录顺序。此外,这意味着对var重试的访问是并发的,因此您可能会看到所有线程实际失败的情况,因为对retries的一些更新丢失。

    另一个棘手的一点是(突出显示是我的)

    创建一个新的未来,如果该未来已成功完成,则保留该未来的结果;如果没有,则保存该未来成功完成,则保存该未来的结果。如果两个期货都失败了,那么最终的未来将持有第一个未来的可抛出对象。

    这种差异不会真正影响您的示例,因为您在所有失败尝试中抛出的异常是相同的,但如果它们不同,它可能会影响结果。例如,如果您将代码修改为:

      def calc(attempt: Int) = if (retries > 3) 10 else {
        retries += 1
        println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
        throw new IllegalArgumentException(s"This failed $attempt")
      }
    
      def fCalc(attempt: Int): Future[Int] = Future(calc(attempt))
    
      val ff = fCalc(1) // 0 - should fail
          .fallbackTo(fCalc(2)) // 1 - should fail
          .fallbackTo(fCalc(3)) // 2 - should fail
          .fallbackTo(fCalc(4)) // 3 - should fail
          .fallbackTo(fCalc(5)) // 4 - should be a success
    

    那么您应该会得到这两个结果中的任何一个

    Some(Failure(java.lang.IllegalArgumentException: This failed 1))
    Some(Success(10))
    

    并且从来没有任何其他“失败”值。

    请注意,在这里,我显式地通过了在重试时不命中争用条件的尝试

    更多评论的回答 (1月 28)

    在我的前一个示例中,我显式传递<code>尝试</code>的原因是,这是确保逻辑上第一个<code>calc</code>创建的<code>IllegalArgumentException</code>在所有(甚至不是非常现实的)线程调度下都将<code>1</code>作为其值的最简单方法。

    如果您只是想让所有日志都有不同的值,有一个更简单的方法:使用局部变量!

      def calc() = {
        val retries = atomicRetries.getAndIncrement()
        if (retries > 3) 10 
        else {
          println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
          throw new IllegalArgumentException(s"This failed $retries")
        }
      }
    

    这样你就避免了经典的托克托问题。

  •  类似资料:
    • 介绍 非确定性是一种通过仅定义问题来解决问题的算法。非确定性程序自动选择符合条件的选项。这项技术很适合逻辑编程。 例如,以下代码返回一对数,其和是一个质数。其中一个数从'(4 6 7)选取,另一个从'(5 8 11)选取。 (let ((i (amb 4 6 7)) (j (amb 5 8 11))) (if (prime? (+ i j)) (list i j)

    • 在形式语言的乔姆斯基分类中,我需要一些非线性、无歧义和非确定性上下文自由语言(N-CFL)的例子? > 线性语言:线性语法是可能的(⊆ 例如 L1={anbn|n≥ 0 } 确定性上下文自由语言(D-CFG):可以使用确定性推下自动机(D-PDA),例如 L2={anbncm|n≥0,m≥0} L2是明确的。 非线性的CF语法是非线性的 Lnl={w:na(w)=nb(w)}也是一个非线性CFG。

    • 程序设计语言让我们得以从烦冗的细节中脱身而出。Lisp 是一门优秀的语言,其原因在于它本身就帮我们处理如此之多的细枝末节,同时程序员对复杂问题的容忍是有限度的,而 Lisp 让程序员能从他们有限的耐受度中发掘出最大的潜力。 本章将会解说宏是怎么样帮助 Lisp 解决另一类重要的细节问题的:即,将非确定性算法转换为确定性算法的问题。 本章共分为五个部分。 第一部分 阐述了什么是非确定性。 第二部分

    • 本文向大家介绍确定性和非确定性算法之间的差异,包括了确定性和非确定性算法之间的差异的使用技巧和注意事项,需要的朋友参考一下 在编程的上下文中,“算法”是依次执行一组明确定义的指令,以执行特定任务并获得所需的输出。在这里,我们说一组定义的指令,这意味着如果某个指令以预期的方式执行,那么某个地方的用户就会知道这些指令的结果。 根据有关指令结果的知识,有两种算法,即-确定性算法和非确定性算法。以下是两种

    • 或者也许有更好的框架专门为这类场景设计? 谢了。

    • 我们对特使的电路断路进行的实验表明,结果并不确定。我们尝试使用如下设置故意跳闸电路,证明了这一点: 该服务是一个简单的Web服务器,它返回具有2秒时间延迟的(该时间延迟确保服务器在异步请求之间保持忙碌)。我们特使Sidecar配置的快照显示,我们启用断路(超文本传输协议/1.1),最多1个连接和1个挂起的请求: 接下来,我们通过向服务发送单个请求来测试这一点,它会像预期的那样可靠地响应。 但是,如