当前位置: 首页 > 面试题库 >

Golang并发,处理一批项目

龙欣德
2023-03-14
问题内容

我正在编写一个程序来处理文本文件中的数百万行,500k耗时5秒来验证文件,我想加快速度。

我想遍历所有项目并异步处理它们中的x,然后等待响应以查看是否应该继续。

我已经写了一些伪代码,我不确定我写的内容是否有意义,这看起来似乎很复杂,是否有更简单,更优雅的方法来做到这一点。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // Need an object to loop over
    // need a loop to read the response
    items := 100000
    concurrency := 20
    sem := make(chan bool, concurrency)
    returnChan := make(chan error)
    finChan := make(chan bool)

    var wg sync.WaitGroup

    go func() {
        for x := 0; x < items; x++ {
            // loop over all items
            // only do maxitems at a time
            wg.Add(1)
            sem <- true
            go delayFunc(x, sem, returnChan, &wg)
        }
        wg.Wait()
        finChan <- true
    }()

    var err error
    finished := false
    for {
        select {
        case err = <-returnChan:
            if err != nil {
                break
            }
        case _ = <-finChan:
            finished = true
            break
        default:
            continue
        }

        if err != nil || finished == true {
            break
        }
    }
    fmt.Println(err)
}

func delayFunc(x int, sem chan bool, returnChan chan error, wg *sync.WaitGroup) {
    //fmt.Printf("PROCESSING (%v)\n", x)
    time.Sleep(10 * time.Millisecond)
    <-sem // release the lock
    wg.Done()
    if x == 95000 {
        returnChan <- fmt.Errorf("Something not right")
    } else {
        returnChan <- nil
    }
}

问题答案:

您的代码看起来不错,您实现了Go模式中常用的代码。缺点是-
您为每个项目生成工作程序goroutine。廉价地生成goroutine不是免费的。另一种方法是生成N个工人并通过渠道向他们提供物品。像这样

package main
import (
    "fmt"
    "time"
)

func main() {
    items := 100
    concurrency := 10
    in := make(chan int)
    ret := make(chan error)

    for x := 0; x < concurrency; x++ {
        go worker(in, ret)
    }
    go func() {
        for x := 0; x < items; x++ {
            // loop over all items
            in <- x
        }
        close(in)
    }()
    for err := range ret {
        if err != nil {
            fmt.Println(err.Error())
            break
        }
    }
}
func worker(in chan int, returnChan chan error) {
    //fmt.Printf("PROCESSING (%v)\n", x)
    for x := range in {
        if x == 95 {
            returnChan <- fmt.Errorf("Something not right")
        } else {
            returnChan <- nil
        }
        time.Sleep(10 * time.Millisecond)
    }
    returnChan <- fmt.Errorf("The End")
}

操场



 类似资料:
  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?

  • 我们使用Spring Batch进行一些处理,通过Reader读取一些ID,我们希望通过处理器将它们处理为“块”,然后写入多个文件。但是处理器接口一次只允许处理一个项目,我们需要进行批量处理,因为处理器依赖于第三方,不能为每个项目调用服务。 我看到我们可以为“块”中涉及的所有读取器-处理器-写入器创建包装器,以处理列表<>并委托给一些具体的读取器/处理器/写入器。但这对我来说并不是件好事。像这样:

  • 问题内容: 给出以下代码: 我可以假设’dowork’函数将并行执行吗? 这是实现并行性的正确方法,还是对每个goroutine使用通道并将单独的“ workwork”工人分开更好? 问题答案: 关于GOMAXPROCS,您可以在Go 1.5的发行文档中找到: 默认情况下,Go程序在将GOMAXPROCS设置为可用内核数的情况下运行;在以前的版本中,它默认为1。 关于防止main功能立即退出,您可

  • 我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(

  • 我正在使用JpaPagingItemReaderBuilder查询一个DB,结果被插入到另一个DB中。 查询返回的结果没有任何问题,但我得到了一个错误与读取器的返回,在处理器中,您可以检查我的编码和错误下面。 有谁能给我一点启示吗?为什么我不能处理结果?

  • 我看到公司中的一个应用程序使用 JMS 进行并发调用。该应用程序是在Spring编写的Web服务。这将进行许多外部调用。当收到操作请求时,它会将许多消息发布到JMS队列,MDB处理这些消息。然后,应用程序整理所有响应,将其映射回其数据模型并返回响应。我过去曾使用JMS进行火灾和遗忘调用,但我无法弄清楚应用程序如何等待来自JMS队列的响应。请问您能解释一下是否有办法吗?