我正在编写一个程序来处理文本文件中的数百万行,500k耗时5秒来验证文件,我想加快速度。
我想遍历所有项目并异步处理它们中的x,然后等待响应以查看是否应该继续。
我已经写了一些伪代码,我不确定我写的内容是否有意义,这看起来似乎很复杂,是否有更简单,更优雅的方法来做到这一点。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// Need an object to loop over
// need a loop to read the response
items := 100000
concurrency := 20
sem := make(chan bool, concurrency)
returnChan := make(chan error)
finChan := make(chan bool)
var wg sync.WaitGroup
go func() {
for x := 0; x < items; x++ {
// loop over all items
// only do maxitems at a time
wg.Add(1)
sem <- true
go delayFunc(x, sem, returnChan, &wg)
}
wg.Wait()
finChan <- true
}()
var err error
finished := false
for {
select {
case err = <-returnChan:
if err != nil {
break
}
case _ = <-finChan:
finished = true
break
default:
continue
}
if err != nil || finished == true {
break
}
}
fmt.Println(err)
}
func delayFunc(x int, sem chan bool, returnChan chan error, wg *sync.WaitGroup) {
//fmt.Printf("PROCESSING (%v)\n", x)
time.Sleep(10 * time.Millisecond)
<-sem // release the lock
wg.Done()
if x == 95000 {
returnChan <- fmt.Errorf("Something not right")
} else {
returnChan <- nil
}
}
您的代码看起来不错,您实现了Go模式中常用的代码。缺点是-
您为每个项目生成工作程序goroutine。廉价地生成goroutine不是免费的。另一种方法是生成N个工人并通过渠道向他们提供物品。像这样
package main
import (
"fmt"
"time"
)
func main() {
items := 100
concurrency := 10
in := make(chan int)
ret := make(chan error)
for x := 0; x < concurrency; x++ {
go worker(in, ret)
}
go func() {
for x := 0; x < items; x++ {
// loop over all items
in <- x
}
close(in)
}()
for err := range ret {
if err != nil {
fmt.Println(err.Error())
break
}
}
}
func worker(in chan int, returnChan chan error) {
//fmt.Printf("PROCESSING (%v)\n", x)
for x := range in {
if x == 95 {
returnChan <- fmt.Errorf("Something not right")
} else {
returnChan <- nil
}
time.Sleep(10 * time.Millisecond)
}
returnChan <- fmt.Errorf("The End")
}
操场
当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?
我们使用Spring Batch进行一些处理,通过Reader读取一些ID,我们希望通过处理器将它们处理为“块”,然后写入多个文件。但是处理器接口一次只允许处理一个项目,我们需要进行批量处理,因为处理器依赖于第三方,不能为每个项目调用服务。 我看到我们可以为“块”中涉及的所有读取器-处理器-写入器创建包装器,以处理列表<>并委托给一些具体的读取器/处理器/写入器。但这对我来说并不是件好事。像这样:
问题内容: 给出以下代码: 我可以假设’dowork’函数将并行执行吗? 这是实现并行性的正确方法,还是对每个goroutine使用通道并将单独的“ workwork”工人分开更好? 问题答案: 关于GOMAXPROCS,您可以在Go 1.5的发行文档中找到: 默认情况下,Go程序在将GOMAXPROCS设置为可用内核数的情况下运行;在以前的版本中,它默认为1。 关于防止main功能立即退出,您可
我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(
我正在使用JpaPagingItemReaderBuilder查询一个DB,结果被插入到另一个DB中。 查询返回的结果没有任何问题,但我得到了一个错误与读取器的返回,在处理器中,您可以检查我的编码和错误下面。 有谁能给我一点启示吗?为什么我不能处理结果?
我看到公司中的一个应用程序使用 JMS 进行并发调用。该应用程序是在Spring编写的Web服务。这将进行许多外部调用。当收到操作请求时,它会将许多消息发布到JMS队列,MDB处理这些消息。然后,应用程序整理所有响应,将其映射回其数据模型并返回响应。我过去曾使用JMS进行火灾和遗忘调用,但我无法弄清楚应用程序如何等待来自JMS队列的响应。请问您能解释一下是否有办法吗?