当前位置: 首页 > 面试题库 >

如果一次执行中发生错误,则关闭多个goroutine

谈灵均
2023-03-14
问题内容

考虑一下这个功能:

func doAllWork() error {
    var wg sync.WaitGroup
    wg.Add(3)
    for i := 0; i < 2; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                result, err := work(j)
                if err != nil {
                    // can't use `return err` here
                    // what sould I put instead ? 
                    os.Exit(0)
                }
            }
        }()
    }
    wg.Wait()
    return nil
}

在每个goroutine中,该函数work()被调用10次。如果一个调用work()在任何正在运行的goroutine中返回错误,我希望所有goroutine立即停止,并退出程序。可以os.Exit()在这里使用吗?我该如何处理?

编辑 :此问题与如何停止goroutine不同,因为如果一个错误发生,在这里我需要关闭所有goroutine


问题答案:

您可以使用context为此类事情创建的软件包(
“带有截止日期,取消信号…” )。

您创建了一个能够发布带有的取消信号的context.WithCancel()上下文(父上下文可能是所返回的上下文context.Background())。这将为您返回一个cancel()函数,该函数可用于取消(或更准确地
发出 取消意图)给辅助goroutines。
并且在worker
goroutine中,您必须通过检查返回的通道Context.Done()是否关闭来检查是否已经启动了该意图,这是通过尝试从其接收消息(如果关闭该通道将立即进行)最简单的方法。并执行非阻塞检查(因此如果未关闭,则可以继续执行),将select语句与default分支一起使用。

我将使用以下work()实现,该实现模拟10%的失败机会,并模拟1秒的工作:

func work(i int) (int, error) {
    if rand.Intn(100) < 10 { // 10% of failure
        return 0, errors.New("random error")
    }
    time.Sleep(time.Second)
    return 100 + i, nil
}

doAllWork()可能看起来像这样:

func doAllWork() error {
    var wg sync.WaitGroup

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Make sure it's called to release resources even if no errors

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()

            for j := 0; j < 10; j++ {
                // Check if any error occurred in any other gorouties:
                select {
                case <-ctx.Done():
                    return // Error somewhere, terminate
                default: // Default is must to avoid blocking
                }
                result, err := work(j)
                if err != nil {
                    fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
                    cancel()
                    return
                }
                fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
            }
        }(i)
    }
    wg.Wait()

    return ctx.Err()
}

这是可以如何测试的方法:

func main() {
    rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
    fmt.Printf("doAllWork: %v\n", doAllWork())
}

输出(在Go Playground上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled

如果没有错误,例如使用以下work()功能时:

func work(i int) (int, error) {
    time.Sleep(time.Second)
    return 100 + i, nil
}

输出就像(在Go Playground上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>

笔记:

基本上,我们只是使用了Done()上下文的通道,因此似乎可以轻松地(如果不是更简单)使用done通道而不是Context,关闭通道即可完成cancel()上述解决方案中的工作。

这不是真的。 仅当只有一个goroutine可以关闭通道时才可以使用此方法,但是在我们的情况下,任何工人都可以这样做。
并尝试关闭已经关闭的通道恐慌。因此,您必须确保围绕进行某种同步/排除close(done),这会使它的可读性降低,甚至变得更加复杂。实际上,这恰恰是cancel()函数在后台执行的功能cancel(),使您的代码隐藏/抽象,因此可以多次调用它,以使您的代码/使用起来更简单。

如何从工人那里获取和返还错误?

为此,您可以使用错误通道:

errs := make(chan error, 2) // Buffer for 2 errors

并且在工作人员内部遇到错误时,请在通道上发送该错误,而不是打印该错误:

result, err := work(j)
if err != nil {
    errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
    cancel()
    return
}

在循环之后,如果有错误,请返回该错误(nil否则):

// Return (first) error, if any:
if ctx.Err() != nil {
    return <-errs
}
return nil

这次输出(在Go Playground上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error

请注意,我使用的缓冲区通道的缓冲区大小等于工作线程数,以确保在该通道上进行发送始终是非阻塞的。这也使您可以接收和处理所有错误,而不仅仅是一个错误(例如第一个错误)。另一种选择是使用缓冲通道仅保留1,并在其上进行非阻塞发送,如下所示:

errs := make(chan error, 1) // Buffered only for the first error

// ...and inside the worker:

result, err := work(j)
if err != nil {
    // Non-blocking send:
    select {
    case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
    default:
    }
    cancel()
    return
}


 类似资料:
  • 到目前为止,我们主要通过.await来使用 Futures,它将阻塞当前任务,直到特定的Future完成。但是,真正的异步应用程序,通常需要同时执行几个不同的操作。 Executing Multiple Futures at a Time 在本章中,我们将介绍几种,同时执行多个异步操作的方法: join!:等待全部 Futures 完成 select!:等待几种 Futures 之一,完成 Spa

  • 问题内容: 储存程序 在此存储过程中,我有两个update语句。如果第一次更新成功执行,然后第二次执行。需要进行哪些更改? 问题答案: 在第一个之后,您可以检查受影响的行数。 如果返回的结果是所需的更新数量(可能> 0),则仅触发第二个。 可以通过检查受影响的行数来包围第二个。

  • 如果你在一个for循环内部处理一系列文件,你需要使用defer确保文件在处理完毕后被关闭,例如: for _, file := range files { if f, err = os.Open(file); err != nil { return } // 这是错误的方式,当循环结束时文件没有关闭 defer f.Close() // 对文

  • 如何在一次查询中执行此操作? 表2 ID C1 C2 1 BRAVO Onetwo 2 BRAVO Trenetwur 3 CHARLIE FIVESIX 4 ALPHA Seveneigh 从表1中选择*,其中B1=(从表2中选择C1,其中concat(C2)类似“%netw%”) 这正是我正在努力做的。但是上面的查询仍然不起作用。因此,我更喜欢表1中的行。

  • 当我试图使用命令: 错误:发生了JNI错误,请检查您的安装并在线程“main”java中重试。lang.NoClassDefFoundError:org/apache/http/client/ClientProtocolException 它在eclipse中运行良好。请帮忙。

  • 问题内容: Java:GC在JVM中执行的频率是多少?每一秒?每一分钟?还是随机的,取决于内存大小?我只想有个主意。 谢谢。 问题答案: 这取决于内存使用情况和存储对象的堆。看这个 http://javarevisited.blogspot.com/2011/04/garbage-collection-in- java.html 它不依赖于时间。它仅取决于新的内存要求和可用内存。