当前位置: 首页 > 面试题库 >

如何实现基于行的文件内容的并行处理

巫马盛
2023-03-14
问题内容

我正在编写一个POC来处理大约10亿行以上的超大文本文件,并为此进行了尝试。

package main

import (
        "bufio"
        "fmt"
        "log"
        "os"
        "time"
)

func main() {
        start := time.Now()
        file, err := os.Open("dump10.txt")
        if err != nil {
                log.Fatal(err)
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
                go fmt.Println(scanner.Text())
        }

        if err := scanner.Err(); err != nil {
                log.Fatal(err)
        }
        secs := time.Since(start).Seconds()
        fmt.Printf("Took %.2fs", secs)
}

但是,当运行此命令时,会出现此错误;

紧急:单个文件或套接字上的并发操作过多(最大1048575)

我还没有在网上找到任何可以解决此特定错误的信息。我不确定这是否是文件描述符问题,错误中列出的最大值远高于我的ulimit -n限制500,000。

做这个的最好方式是什么?

不太明显,它fmt.Println是我在处理数据时将调用的实际功能的替代品。


问题答案:

在考虑并行化过程之前,您应该研究输入和计算以确保它有意义。

需要按顺序处理的输入不是很好的匹配,因为并行处理将需要附加的复杂指令来使事物保持顺序,如果这种策略是成功的话,很难预先评估。

同样,为了利用并行化,要运行的计算所花费的时间必须比同步并行任务所需的时间更长。通过批量处理数据可能会超过此成本,但是生成的算法将更加复杂,并会产生其他不良副作用(例如分配)。

否则,请勿并行化。

请参见下面各种计算时间长/短的实现示例及其产生的基准。

结论是,除非您计算出一个长时间运行的异步任务,该任务显然将超过同步成本,否则顺序处理会更快。

main.go

package main

import (
    "bufio"
    "fmt"
    "io"
    "runtime"
    "strings"
    "sync"
    "time"
)

func main() {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    run_line_short(data, true)
    run_line_long(data, true)
    run_line_short_workers(data, true)
    run_line_long_workers(data, true)
    run_bulk_short(data, true)
    run_bulk_long(data, true)
    run_seq_short(data, true)
    run_seq_long(data, true)
}

func run_line_short(data string, stat bool) {
    if stat {
        s := stats("run_line_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := process(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_line_long(data string, stat bool) {
    if stat {
        s := stats("run_line_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := process(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_line_short_workers(data string, stat bool) {
    if stat {
        s := stats("run_line_short_workers")
        defer s()
    }
    r := strings.NewReader(data)
    err := processWorkers(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_line_long_workers(data string, stat bool) {
    if stat {
        s := stats("run_line_long_workers")
        defer s()
    }
    r := strings.NewReader(data)
    err := processWorkers(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_bulk_short(data string, stat bool) {
    if stat {
        s := stats("run_bulk_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := processBulk(r, bulk_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_bulk_long(data string, stat bool) {
    if stat {
        s := stats("run_bulk_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := processBulk(r, bulk_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_seq_short(data string, stat bool) {
    if stat {
        s := stats("run_seq_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := processSeq(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_seq_long(data string, stat bool) {
    if stat {
        s := stats("run_seq_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := processSeq(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}

func line_handler_short(k string) error {
    _ = len(k)
    return nil
}

func line_handler_long(k string) error {
    <-time.After(time.Millisecond * 5)
    _ = len(k)
    return nil
}

func bulk_handler_short(b []string) error {
    for _, k := range b {
        _ = len(k)
    }
    return nil
}

func bulk_handler_long(b []string) error {
    <-time.After(time.Millisecond * 5)
    for _, k := range b {
        _ = len(k)
    }
    return nil
}

func stats(name string) func() {
    fmt.Printf("======================\n")
    fmt.Printf("%v\n", name)
    start := time.Now()
    return func() {
        fmt.Printf("time to run %v\n", time.Since(start))
        var ms runtime.MemStats
        runtime.ReadMemStats(&ms)
        fmt.Printf("Alloc: %d MB, TotalAlloc: %d MB, Sys: %d MB\n",
            ms.Alloc/1024/1024, ms.TotalAlloc/1024/1024, ms.Sys/1024/1024)
        fmt.Printf("Mallocs: %d, Frees: %d\n",
            ms.Mallocs, ms.Frees)
        fmt.Printf("HeapAlloc: %d MB, HeapSys: %d MB, HeapIdle: %d MB\n",
            ms.HeapAlloc/1024/1024, ms.HeapSys/1024/1024, ms.HeapIdle/1024/1024)
        fmt.Printf("HeapObjects: %d\n", ms.HeapObjects)
        fmt.Printf("\n")
    }
}

func process(r io.Reader, h func(string) error) error {
    errs := make(chan error)
    workers := make(chan struct{}, 4)
    var wg sync.WaitGroup
    go func() {
        scanner := bufio.NewScanner(r)
        for scanner.Scan() {
            workers <- struct{}{} // acquire a token
            wg.Add(1)
            go func(line string) {
                defer wg.Done()
                if err := h(line); err != nil {
                    errs <- err
                }
                <-workers
            }(scanner.Text())
        }
        wg.Wait()
        if err := scanner.Err(); err != nil {
            errs <- err
        }
        close(errs)
    }()
    var err error
    for e := range errs {
        if e != nil && err == nil {
            err = e
        }
    }
    return err
}

func processWorkers(r io.Reader, h func(string) error) error {
    errs := make(chan error)
    input := make(chan string)
    y := 4
    var wg sync.WaitGroup
    for i := 0; i < y; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for line := range input {
                if err := h(line); err != nil {
                    errs <- err
                }
            }
        }()
    }
    go func() {
        scanner := bufio.NewScanner(r)
        for scanner.Scan() {
            input <- scanner.Text()
        }
        close(input)
        wg.Wait()
        if err := scanner.Err(); err != nil {
            errs <- err
        }
        close(errs)
    }()
    var err error
    for e := range errs {
        if err == nil && e != nil {
            err = e
        }
    }
    return err
}

func processBulk(r io.Reader, h func([]string) error) error {
    errs := make(chan error)
    input := make(chan []string)
    y := 4
    var wg sync.WaitGroup
    for i := 0; i < y; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for bulk := range input {
                if err := h(bulk); err != nil {
                    errs <- err
                }
            }
        }()
    }
    go func() {
        scanner := bufio.NewScanner(r)
        l := 50
        bulk := make([]string, l)
        i := 0
        for scanner.Scan() {
            text := scanner.Text()
            bulk[i] = text
            i++
            if i == l {
                copied := make([]string, l, l)
                copy(copied, bulk)
                i = 0
                input <- copied
            }
        }
        if i > 0 {
            input <- bulk[:i]
        }
        close(input)
        if err := scanner.Err(); err != nil {
            errs <- err
        }
    }()
    go func() {
        wg.Wait()
        close(errs)
    }()
    var err error
    for e := range errs {
        if err == nil && e != nil {
            err = e
        }
    }
    return err
}

func processSeq(r io.Reader, h func(string) error) error {
    scanner := bufio.NewScanner(r)
    for scanner.Scan() {
        text := scanner.Text()
        if err := h(text); err != nil {
            return err
        }
    }
    return scanner.Err()
}

main_test.go

package main

import (
    "strings"
    "testing"
)

func Benchmark_run_line_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_short(data, false)
    }
}

func Benchmark_run_line_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_long(data, false)
    }
}
func Benchmark_run_line_short_workers(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_short_workers(data, false)
    }
}
func Benchmark_run_line_long_workers(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_long_workers(data, false)
    }
}
func Benchmark_run_bulk_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_bulk_short(data, false)
    }
}
func Benchmark_run_bulk_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_bulk_long(data, false)
    }
}
func Benchmark_run_seq_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_seq_short(data, false)
    }
}
func Benchmark_run_seq_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_seq_long(data, false)
    }
}

结果

$ go run main.go 
======================
run_line_short
time to run 2.747827ms
Alloc: 2 MB, TotalAlloc: 2 MB, Sys: 68 MB
Mallocs: 1378, Frees: 1
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1377

======================
run_line_long
time to run 1.30987804s
Alloc: 3 MB, TotalAlloc: 3 MB, Sys: 68 MB
Mallocs: 5619, Frees: 5
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5614

======================
run_line_short_workers
time to run 4.54926ms
Alloc: 1 MB, TotalAlloc: 4 MB, Sys: 68 MB
Mallocs: 6648, Frees: 5743
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 905

======================
run_line_long_workers
time to run 1.29874118s
Alloc: 2 MB, TotalAlloc: 5 MB, Sys: 68 MB
Mallocs: 10670, Frees: 5747
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 60 MB
HeapObjects: 4923

======================
run_bulk_short
time to run 1.279059ms
Alloc: 3 MB, TotalAlloc: 6 MB, Sys: 68 MB
Mallocs: 11695, Frees: 5751
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5944

======================
run_bulk_long
time to run 31.328652ms
Alloc: 1 MB, TotalAlloc: 7 MB, Sys: 68 MB
Mallocs: 12728, Frees: 11361
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1367

======================
run_seq_short
time to run 956.991µs
Alloc: 3 MB, TotalAlloc: 8 MB, Sys: 68 MB
Mallocs: 13746, Frees: 11160
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 2586

======================
run_seq_long
time to run 5.195705859s
Alloc: 1 MB, TotalAlloc: 9 MB, Sys: 68 MB
Mallocs: 17766, Frees: 15973
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1793

[mh-cbon@Host-001 bulk] $ go test -bench=. -benchmem -count=4
goos: linux
goarch: amd64
pkg: test/bulk
Benchmark_run_line_short-4                  1000       1750824 ns/op     1029354 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1747408 ns/op     1029348 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1757826 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1758427 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_long-4                      1    1303037704 ns/op     2253776 B/op       4075 allocs/op
Benchmark_run_line_long-4                      1    1305074974 ns/op     2247792 B/op       4032 allocs/op
Benchmark_run_line_long-4                      1    1305353658 ns/op     2246320 B/op       4013 allocs/op
Benchmark_run_line_long-4                      1    1305725817 ns/op     2247792 B/op       4031 allocs/op
Benchmark_run_line_short_workers-4          1000       2148354 ns/op     1029366 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       2139629 ns/op     1029370 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       1983352 ns/op     1029359 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       1909968 ns/op     1029363 B/op       1005 allocs/op
Benchmark_run_line_long_workers-4              1    1298321093 ns/op     2247856 B/op       4013 allocs/op
Benchmark_run_line_long_workers-4              1    1299846127 ns/op     2246384 B/op       4012 allocs/op
Benchmark_run_line_long_workers-4              1    1300003625 ns/op     2246288 B/op       4011 allocs/op
Benchmark_run_line_long_workers-4              1    1302779911 ns/op     2246256 B/op       4011 allocs/op
Benchmark_run_bulk_short-4                  2000        704358 ns/op     1082154 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        708563 ns/op     1082147 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        714687 ns/op     1082148 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        705546 ns/op     1082156 B/op       1011 allocs/op
Benchmark_run_bulk_long-4                     50      31411412 ns/op     1051497 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31513018 ns/op     1051544 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31539311 ns/op     1051502 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31564940 ns/op     1051505 B/op       1088 allocs/op
Benchmark_run_seq_short-4                   2000        574346 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   3000        572857 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   2000        580493 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   3000        572240 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_long-4                       1    5196313302 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5199995649 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5200460425 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5201080570 ns/op     2245792 B/op       4005 allocs/op
PASS
ok      test/bulk   68.944s

注意:令我惊讶的run_line_short_workers是,它的速度慢于run_line_short,我没有解释该结果,但是使用pprof进行更深入的分析应该可以提供答案。



 类似资料:
  • 本文向大家介绍python实现的用于搜索文件并进行内容替换的类实例,包括了python实现的用于搜索文件并进行内容替换的类实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了python实现的用于搜索文件并进行内容替换的类。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的Python程序设计有所帮助。

  • 问题内容: 我试图做到这一点,这样我的站点上的mp3可以通过单击鼠标左键来下载,而不必单击鼠标右键并另存为。因此,为此,我必须设置Content- Disposition:附件。这是我的第一个网站,所以我不知道如何实际执行此操作,但是我是在html标记中执行此操作还是在托管网站上以某种方式进行设置? 这是我的标记外观的示例。 问题答案: MP3列表示例: download.php:

  • MaxMesssAgesPerPoll 线程(10) 聚合器 但是我在这里与聚合器没有任何关系,只需要从一个远程位置处理每个文件,处理它,然后将它们放在另一个远程位置。

  • null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB

  • 考虑一个阶跃豆: 要求:在Reader中,它从文件中读取(Entity1的)记录。在处理器中,它进行处理,在Writer中,它写入数据库。 在TaskExecutor之前,只创建了一个线程,它将在读取器和处理器中循环1000次,如上面的块设置中所定义的。然后它将移动到writer并写入所有1000条记录。它将再次从记录编号1001开始,然后在读取器和处理器中处理另外1000条记录。这是一个同步执行

  • 本文向大家介绍node.js实现逐行读取文件内容的代码,包括了node.js实现逐行读取文件内容的代码的使用技巧和注意事项,需要的朋友参考一下 在此之前先介绍一个逐行读取文件内容NPM:https://github.com/nickewing/line-reader,需要的朋友可以看看。 直接上代码: