当前位置: 首页 > 工具软件 > go-mosaic > 使用案例 >

《Go Web 编程》之第9章 发挥Go的并发优势

冀耀
2023-12-01

第9章 发挥Go的并发优势

9.1 并发与并行的区别

并发(concurrency),多个任务同一时间段内启动、运行并结束,任务间可能会互动,使用和分享相同的资源。
并行(parallelism),多个任务同时启动并执行(大任务分割成小任务),需要独立的资源(CPU等),相互平行无重叠。
并发是同时处理多项任务,并行是同时执行多项任务。

9.2 goroutine

goroutine是对线程的复用,关联函数,轻量级,启动时需要很小的栈(按需扩大和缩小)。
goroutine被阻塞时,其所在线程被阻塞,运行时环境(runtime)会将阻塞线程上的其它goroutine移动到其它未阻塞的线程上运行。

9.2.1 使用goroutine

goroutine.go

package main

import (
	"fmt"
	"time"
)

func printNumbers1() {
	for i := 0; i < 10; i++ {
		fmt.Printf("%d ", i)
	}
}

func printLetters1() {
	for i := 'A'; i < 'A'+10; i++ {
		fmt.Printf("%c ", i)
	}
}

func printNumbers2() {
	for i := 0; i < 10; i++ {
		time.Sleep(1 * time.Microsecond)
		fmt.Printf("%d ", i)
	}
}

func printLetters2() {
	for i := 'A'; i < 'A'+10; i++ {
		time.Sleep(1 * time.Microsecond)
		fmt.Printf("%c ", i)
	}
}

func print1() {
	printNumbers1()
	printLetters1()
}

func goPrint1() {
	go printNumbers1()
	go printLetters1()
}

func print2() {
	printNumbers2()
	printLetters2()
}

func goPrint2() {
	go printNumbers2()
	go printLetters2()
}

func main() {
}

goroutine_test.go

package main

import (
	"testing"
	"time"
)

// Test cases

// normal run
func TestPrint1(t *testing.T) {
	print1()
}

// run with goroutines
func TestGoPrint1(t *testing.T) {
	goPrint1()
	time.Sleep(100 * time.Millisecond)
}

func TestPrint2(t *testing.T) {
	print2()
}

// run with goroutines and some work
func TestGoPrint2(t *testing.T) {
	goPrint2()
	time.Sleep(100 * time.Millisecond)
}

// Benchmark cases

/*
// normal run
func BenchmarkPrint1(b *testing.B) {
	for i := 0; i < b.N; i++ {
		print1()
	}
}

// run with goroutines
func BenchmarkGoPrint1(b *testing.B) {
	for i := 0; i < b.N; i++ {
		goPrint1()
	}
}

// run with some work
func BenchmarkPrint2(b *testing.B) {
	for i := 0; i < b.N; i++ {
		print2()
	}
}

// run with goroutines and some work
func BenchmarkGoPrint2(b *testing.B) {
	for i := 0; i < b.N; i++ {
		goPrint2()
	}
}
*/
go test -v
go test -run x -bench . -cpu 1

9.2.2 goroutine与性能

goroutine.go

package main

import "time"

func printNumbers1() {
	for i := 0; i < 10; i++ {
		//fmt.Printf("%d ", i)
	}
}

func printLetters1() {
	for i := 'A'; i < 'A'+10; i++ {
		//fmt.Printf("%c ", i)
	}
}

func printNumbers2() {
	for i := 0; i < 10; i++ {
		time.Sleep(1 * time.Microsecond)
		//fmt.Printf("%d ", i)
	}
}

func printLetters2() {
	for i := 'A'; i < 'A'+10; i++ {
		time.Sleep(1 * time.Microsecond)
		//fmt.Printf("%c ", i)
	}
}

func print1() {
	printNumbers1()
	printLetters1()
}

func goPrint1() {
	go printNumbers1()
	go printLetters1()
}

func print2() {
	printNumbers2()
	printLetters2()
}

func goPrint2() {
	go printNumbers2()
	go printLetters2()
}

func main() {
}

goroutine_test.go

package main

import (
	"testing"
	"time"
)

// Test cases

// normal run
func TestPrint1(t *testing.T) {
	print1()
}

// run with goroutines
func TestGoPrint1(t *testing.T) {
	goPrint1()
	time.Sleep(100 * time.Millisecond)
}

func TestPrint2(t *testing.T) {
	print2()
}

// run with goroutines and some work
func TestGoPrint2(t *testing.T) {
	goPrint2()
	time.Sleep(100 * time.Millisecond)
}

// Benchmark cases


// normal run
func BenchmarkPrint1(b *testing.B) {
	for i := 0; i < b.N; i++ {
		print1()
	}
}

// run with goroutines
func BenchmarkGoPrint1(b *testing.B) {
	for i := 0; i < b.N; i++ {
		goPrint1()
	}
}

// run with some work
func BenchmarkPrint2(b *testing.B) {
	for i := 0; i < b.N; i++ {
		print2()
	}
}

// run with goroutines and some work
func BenchmarkGoPrint2(b *testing.B) {
	for i := 0; i < b.N; i++ {
		goPrint2()
	}
}

go test -run x -bench . -cpu 1
go test -run x -bench . -cpu 2
go test -run x -bench . -cpu 4

启动goroutine有一定代价。
在多个CPU上调度和运行任务需要消耗一定的资源,如果使用多个CPU带来的性能优势不足以抵消随之而来的额外消耗,那么程序的性能就会不升反降。

9.2.3 等待goroutine

等待组(WaitGroup)机制:

  • 声明等待组;
  • Add为等待组计数器设置值;
  • 一个goroutine结束时,Done方法让等待组计数器减一;
  • Wait阻塞,直到等待组计数器为0。
package main

import (
	"fmt"
	"sync"
	"time"
)

func printNumbers2(wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		time.Sleep(1 * time.Microsecond)
		fmt.Printf("%d ", i)
	}
}

func printLetters2(wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 'A'; i < 'A'+10; i++ {
		time.Sleep(1 * time.Microsecond)
		fmt.Printf("%c ", i)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go printNumbers2(&wg)
	go printLetters2(&wg)
	wg.Wait()
}

9.3 通道

channel,带有类型的值(typed value),让goroutine相互通信。

//默认无缓冲通道(unbuffered channel),同步。
ch := make(chan int)

//有缓冲通道,buffered channel
ch := make(chan int, 10)

ch <- 1
i := <-ch

//默认双向(bidirectional),可只读,可只写
ch := make(chan<- string)
ch := make( <-chan string)

9.3.1 通过通道实现同步

channel_wait.go

package main

import (
	"fmt"
	"time"
)

func printNumbers(w chan bool) {
	for i := 0; i < 10; i++ {
		time.Sleep(1 * time.Microsecond)
		fmt.Printf("%d ", i)
	}
	w <- true
}

func printLetters(w chan bool) {
	for i := 'A'; i < 'A'+10; i++ {
		time.Sleep(1 * time.Microsecond)
		fmt.Printf("%c ", i)
	}
	w <- true
}

func main() {
	w1, w2 := make(chan bool), make(chan bool)
	go printNumbers(w1)
	go printLetters(w2)
	<-w1
	<-w2
}

9.3.2 通过通道实现消息传递

channel_message.go

package main

import (
	"fmt"
	"time"
)

func thrower(c chan int) {
	for i := 0; i < 5; i++ {
		c <- i
		fmt.Println("Threw  >>", i)
	}
}

func catcher(c chan int) {
	for i := 0; i < 5; i++ {
		num := <-c
		fmt.Println("Caught <<", num)
	}
}

func main() {
	c := make(chan int)
	go thrower(c)
	go catcher(c)
	time.Sleep(100 * time.Millisecond)
}

9.3.3 有缓冲通道

channel_buffered.go

package main

import (
	"fmt"
	"time"
)

func thrower(c chan int) {
	for i := 0; i < 5; i++ {
		c <- i
		fmt.Println("Threw  >>", i)
	}
}

func catcher(c chan int) {
	for i := 0; i < 5; i++ {
		num := <-c
		fmt.Println("Caught <<", num)
	}
}

func main() {
	c := make(chan int, 3)
	go thrower(c)
	go catcher(c)
	time.Sleep(100 * time.Millisecond)
}

9.3.4 从多个通道中选择

channel_select.go

package main

import (
	"fmt"
	//"time"
)

func callerA(c chan string) {
	c <- "Hello World!"
	close(c)
}

func callerB(c chan string) {
	c <- "Hola Mundo!"
	close(c)
}

func main() {
	a, b := make(chan string), make(chan string)
	go callerA(a)
	go callerB(b)
	var msg string
	openA, openB := true, true
	for openA || openB {
		select {
		case msg, openA = <-a:
			if openA {
				fmt.Printf("%s from A\n", msg)
			}			
		case msg, openB = <-b:
			if openB {
				fmt.Printf("%s from B\n", msg)
			}			
		}
	}
}
/*
func main() {
	a, b := make(chan string), make(chan string)
	go callerA(a)
	go callerB(b)
	msg1, msg2 := "A", "B"
	for {
		time.Sleep(1 * time.Microsecond)

		select {
		case msg1 = <-a:
			fmt.Printf("%s from A\n", msg1)
		case msg2 = <-b:
			fmt.Printf("%s from B\n", msg2)
		default:
			fmt.Println("Default")
		}
		if msg1 == "" && msg2 == "" {
			break
		}

	}
}
*/

9.3.5 通过通道共享数据

多goroutine中,map必须加锁。

channel_shared.go

package main

import (
	"fmt"
	"time"
	"runtime"
)

type Store struct {
	hash map[string]string
	in chan [2]string
}

var DB Store

func StoreInit() {
	DB = Store{
		hash: make(map[string]string),
		in: make(chan [2]string),
	}
	go func() {
		for {
			a := <-DB.in
			DB.hash[a[0]] = a[1]
		}
	}()
}

func (store *Store) Get(key string) (value string, err error) {
	value = store.hash[key]
	return
}

func (store *Store) Add(key string, value string) (err error) {
	a := [2]string{key, value}
	store.in <- a
	return
}

func (store *Store) Set(key string, value string) (err error) {
	return
}

func (store *Store) Del(key string) (err error) {
	return
}

func (store *Store) Pop(key string) (value string, err error) {
	return
}

func main() {
	runtime.GOMAXPROCS(4)
	StoreInit()
	for i := 0; i < 10; i++ {
		go DB.Add("a", "A")
		go DB.Add("a", "B")
		go DB.Add("a", "C")
	
		time.Sleep(1 * time.Microsecond)
	
		s, _ := DB.Get("a")
		fmt.Printf("%s ", s)
	}
}

9.4 Web应用中使用并发

9.4.1 创建马赛克图片

  • mosaic_original.go
package main

import (
	"fmt"
	"image"
	"image/color"
	"io/ioutil"
	"math"
	"os"
)

// 平方
func sq(n float64) float64 {
	return n * n
}

// 欧几里得距离
func distance(p1 [3]float64, p2 [3]float64) float64 {
	return math.Sqrt(sq(p2[0]-p1[0]) + sq(p2[1]-p1[1]) + sq(p2[2]-p1[2]))
}

// 最匹配图像文件
func nearest(target [3]float64, db *map[string][3]float64) string {
	var filename string
	smallest := 1000000.0
	for k, v := range *db {
		if dist := distance(target, v); dist < smallest {
			filename, smallest = k, dist
		}
	}
	delete(*db, filename)
	return filename
}

// 计算图像平均颜色
func averageColor(img image.Image) [3]float64 {
	bounds := img.Bounds()
	width := bounds.Dx()
	height := bounds.Dy()
	totalPixels := float64(width * height)

	r, g, b := 0.0, 0.0, 0.0
	for y := 0; y < height; y++ {
		for x := 0; x < width; x++ {
			r1, g1, b1, _ := img.At(x, y).RGBA()
			r, g, b = r+float64(r1), g+float64(g1), b+float64(b1)
		}
	}
	return [3]float64{r / totalPixels, g / totalPixels, b / totalPixels}
}

// 缩放图像到指定大小
func resize(in image.Image, newWidth int, newHeight int) image.Image {
	bounds := in.Bounds()
	width := bounds.Dx()
	height := bounds.Dy()
	newBounds := image.Rect(0, 0, newWidth, newHeight)
	out := image.NewNRGBA64(newBounds)

	for j := 0; j < newHeight; j++ {
		y := j * (height - 1) / (newHeight - 1)
		for i := 0; i < newWidth; i++ {
			x := i * (width - 1) / (newWidth - 1)
			r, g, b, a := in.At(x, y).RGBA()
			out.SetNRGBA64(i, j, color.NRGBA64{uint16(r), uint16(g), uint16(b), uint16(a)})
		}
	}
	return out.SubImage(newBounds)
}

// 创建瓷砖图像数据库
func tilesDB() map[string][3]float64 {
	fmt.Println("Start populating tiles db ...")
	db := make(map[string][3]float64)
	files, _ := ioutil.ReadDir("tiles")
	for _, f := range files {
		name := "tiles/" + f.Name()
		file, err := os.Open(name)
		if err == nil {
			img, _, err := image.Decode(file)
			if err == nil {
				db[name] = averageColor(img)
			} else {
				fmt.Println("error in populating tiles db:", err, name)
			}
		} else {
			fmt.Println("cannot open file", name, "when populating tiles db:", err)
		}
		file.Close()
	}
	fmt.Println("Finished populating tiles db.")
	return db
}

// 克隆瓷砖图像数据库
var TILESDB map[string][3]float64

func cloneTilesDB() map[string][3]float64 {
	db := make(map[string][3]float64)
	for k, v := range TILESDB {
		db[k] = v
	}
	return db
}

9.4.2 马赛克图片Web应用

main.go

package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"html/template"
	"image"
	"image/draw"
	"image/jpeg"
	"net/http"
	"os"
	"strconv"
	"time"
	// "runtime"
)

func main() {
	// runtime.GOMAXPROCS(runtime.NumCPU())
	mux := http.NewServeMux()
	files := http.FileServer(http.Dir("public"))
	mux.Handle("/static/", http.StripPrefix("/static/", files))
	mux.HandleFunc("/", upload)
	mux.HandleFunc("/mosaic", mosaic)
	server := &http.Server{
		Addr:    "127.0.0.1:8080",
		Handler: mux,
	}
	// building up the source tile database
	TILESDB = tilesDB()
	fmt.Println("Mosaic server started.")
	server.ListenAndServe()
}

func upload(w http.ResponseWriter, r *http.Request) {
	t, _ := template.ParseFiles("upload.html")
	t.Execute(w, nil)
}

func mosaic(w http.ResponseWriter, r *http.Request) {
	t0 := time.Now()

	// get the content from the POSTed form
	r.ParseMultipartForm(10485760) // max body in memory is 10MB

	file, _, _ := r.FormFile("image")
	defer file.Close()

	tileSize, _ := strconv.Atoi(r.FormValue("tile_size"))

	// decode and get original image
	original, _, _ := image.Decode(file)
	bounds := original.Bounds()
	width := bounds.Dx()
	height := bounds.Dy()

	// create a new image for the mosaic
	newimage := image.NewNRGBA(image.Rect(0, 0, width, height))

	// build up the tiles database
	db := cloneTilesDB()

	// source point for each tile, which starts with 0, 0 of each tile
	sp := image.Point{0, 0}
	for y := 0; y < height; y += tileSize {
		//for y := 0; y+tileSize < height; y += tileSize {
		for x := 0; x < width; x += tileSize {
			//for x := 0; x+tileSize < width; x += tileSize {
			// use the top left most pixel as the average color
			r, g, b, _ := original.At(x, y).RGBA()
			color := [3]float64{float64(r), float64(g), float64(b)}
			// get the closest tile from the tiles DB
			nearest := nearest(color, &db)
			file, err := os.Open(nearest)
			if err == nil {
				img, _, err := image.Decode(file)
				if err == nil {
					// resize the tile to the correct size
					t := resize(img, tileSize, tileSize)
					tileBounds := image.Rect(x, y, x+tileSize, y+tileSize)
					// draw the tile into the mosaic
					draw.Draw(newimage, tileBounds, t, sp, draw.Src)
				} else {
					fmt.Println("error:", err, nearest)
				}
			} else {
				fmt.Println("error:", nearest)
			}
			file.Close()
		}
	}

	//保存图像
	//outFile, _ := os.Create("gopher3.png")
	//defer outFile.Close()
	//png.Encode(outFile, img)

	buf1 := new(bytes.Buffer)
	jpeg.Encode(buf1, original, nil)
	originalStr := base64.StdEncoding.EncodeToString(buf1.Bytes())

	buf2 := new(bytes.Buffer)
	jpeg.Encode(buf2, newimage, nil)
	mosaic := base64.StdEncoding.EncodeToString(buf2.Bytes())

	t1 := time.Now()
	images := map[string]string{
		"original": originalStr,
		"mosaic":   mosaic,
		"duration": fmt.Sprintf("%v ", t1.Sub(t0)),
	}
	t, _ := template.ParseFiles("results.html")
	t.Execute(w, images)
}

9.4.3 并发版马赛克图片生成Web应用

mosaic_concurrent.go

package main

import (
	"fmt"
	"image"
	"image/color"
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"sync"
)

// 平方
func sq(n float64) float64 {
	return n * n
}

// 欧几里得距离
func distance(p1 [3]float64, p2 [3]float64) float64 {
	return math.Sqrt(sq(p2[0]-p1[0]) + sq(p2[1]-p1[1]) + sq(p2[2]-p1[2]))
}

// 计算图像平均颜色
func averageColor(img image.Image) [3]float64 {
	bounds := img.Bounds()
	width := bounds.Dx()
	height := bounds.Dy()
	totalPixels := float64(width * height)

	r, g, b := 0.0, 0.0, 0.0
	for y := 0; y < height; y++ {
		for x := 0; x < width; x++ {
			r1, g1, b1, _ := img.At(x, y).RGBA()
			r, g, b = r+float64(r1), g+float64(g1), b+float64(b1)
		}
	}
	return [3]float64{r / totalPixels, g / totalPixels, b / totalPixels}
}

// 缩放图像到指定大小
func scale(in image.Image, newWidth int, newHeight int) image.Image {
	bounds := in.Bounds()
	width := bounds.Dx()
	height := bounds.Dy()
	newBounds := image.Rect(0, 0, newWidth, newHeight)
	out := image.NewNRGBA64(newBounds)

	for j := 0; j < newHeight; j++ {
		y := j * (height - 1) / (newHeight - 1)
		for i := 0; i < newWidth; i++ {
			x := i * (width - 1) / (newWidth - 1)
			r, g, b, a := in.At(x, y).RGBA()
			out.SetNRGBA64(i, j, color.NRGBA64{uint16(r), uint16(g), uint16(b), uint16(a)})
		}
	}
	return out.SubImage(newBounds)
}

type DB struct {
	mutex *sync.Mutex
	store map[string][3]float64
}

// 最匹配图像文件
func (db *DB) nearest(target [3]float64) string {
	db.mutex.Lock()
	defer db.mutex.Unlock()

	var filename string
	smallest := 1000000.0
	for k, v := range db.store {
		dist := distance(target, v)
		if dist < smallest {
			filename, smallest = k, dist
		}
	}
	delete(db.store, filename)
	return filename
}

var TILESDB map[string][3]float64

func cloneTilesDB() DB {
	db := make(map[string][3]float64)
	for k, v := range TILESDB {
		db[k] = v
	}
	tiles := DB{
		store: db,
		mutex: &sync.Mutex{},
	}
	return tiles
}

//并发使用map,必须加锁
var lock sync.Mutex

func oneDB(db *map[string][3]float64, name string) {
	file, err := os.Open(name)
	if err == nil {
		img, _, err := image.Decode(file)
		if err == nil {
			lock.Lock()
			(*db)[name] = averageColor(img)
			lock.Unlock()
		} else {
			fmt.Println("error in populating tiles db:", err, name)
		}
	} else {
		fmt.Println("cannot open file", name, "when populating tiles db:", err)
	}
	file.Close()
}

// populate a tiles database in memory
func tilesDB() map[string][3]float64 {
	fmt.Println("Start populating tiles db ...")
	db := make(map[string][3]float64)

	//等待所有goroutine执行结束
	var wg sync.WaitGroup

	//控制最多同时运行的goroutine数量
	tokens := make(chan struct{}, 20)

	files, _ := ioutil.ReadDir("tiles")
	for _, f := range files {
		name := filepath.Join("tiles", f.Name())

		wg.Add(1)
		tokens <- struct{}{}
		go func(name string) {
			defer wg.Done()

			oneDB(&db, name)

			<-tokens
		}(name)

	}

	wg.Wait()
	fmt.Println("Finished populating tiles db.")
	return db
}

main.go

package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"html/template"
	"image"
	"image/draw"
	"image/jpeg"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"
)

func main() {
	fmt.Println("Starting mosaic server ...")
	mux := http.NewServeMux()
	files := http.FileServer(http.Dir("public"))
	mux.Handle("/static/", http.StripPrefix("/static/", files))

	mux.HandleFunc("/", upload)
	mux.HandleFunc("/mosaic", mosaic)

	server := &http.Server{
		Addr:    "127.0.0.1:8080",
		Handler: mux,
	}
	TILESDB = tilesDB()

	fmt.Println("Mosaic server started.")
	server.ListenAndServe()
}

func upload(w http.ResponseWriter, r *http.Request) {
	t, _ := template.ParseFiles("upload.html")
	t.Execute(w, nil)
}

func oneTile(nearest string, tileSize int, x int, y int, newimage *image.NRGBA) {
	sp := image.Point{0, 0}
	file, err := os.Open(nearest)
	if err == nil {
		img, _, err := image.Decode(file)
		if err == nil {
			// resize the tile to the correct size
			t := scale(img, tileSize, tileSize)
			tileBounds := image.Rect(x, y, x+tileSize, y+tileSize)
			// draw the tile into the mosaic
			draw.Draw(newimage, tileBounds, t, sp, draw.Src)
		} else {
			fmt.Println("error:", err, nearest)
		}
	} else {
		fmt.Println("error:", nearest)
	}
	file.Close()
}

//  Handler function for fan-out and fan-in
func mosaic(w http.ResponseWriter, r *http.Request) {
	t0 := time.Now()
	// get the content from the POSTed form
	r.ParseMultipartForm(10485760) // max body in memory is 10MB
	file, _, _ := r.FormFile("image")
	defer file.Close()
	tileSize, _ := strconv.Atoi(r.FormValue("tile_size"))

	//   // decode and get original image
	original, _, _ := image.Decode(file)
	bounds := original.Bounds()
	width := bounds.Dx()
	height := bounds.Dy()
	newimage := image.NewNRGBA(image.Rect(0, 0, width, height))

	db := cloneTilesDB()

	//等待所有goroutine执行结束
	var wg sync.WaitGroup

	//控制最多同时运行的goroutine数量
	tokens := make(chan struct{}, 20)

	//sp := image.Point{0, 0}
	for y := 0; y < height; y += tileSize {
		for x := 0; x < width; x += tileSize {
			// use the top left most pixel as the average color
			r, g, b, _ := original.At(x, y).RGBA()
			color := [3]float64{float64(r), float64(g), float64(b)}
			// get the closest tile from the tiles DB
			nearest := db.nearest(color)

			wg.Add(1)
			tokens <- struct{}{}
			go func(name string, x int, y int) {
				defer wg.Done()

				oneTile(nearest, tileSize, x, y, newimage)

				<-tokens
			}(nearest, x, y)
		}
	}
	wg.Wait()

	buf1 := new(bytes.Buffer)
	jpeg.Encode(buf1, original, nil)
	originalStr := base64.StdEncoding.EncodeToString(buf1.Bytes())

	buf2 := new(bytes.Buffer)
	jpeg.Encode(buf2, newimage, nil)
	mosaic := base64.StdEncoding.EncodeToString(buf2.Bytes())

	t1 := time.Now()
	images := map[string]string{
		"original": originalStr,
		"mosaic":   mosaic,
		"duration": fmt.Sprintf("%v ", t1.Sub(t0)),
	}

	t, _ := template.ParseFiles("results.html")
	t.Execute(w, images)
}

main.go.bak

package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"html/template"
	"image"
	"image/draw"
	"image/jpeg"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"
)

func main() {
	fmt.Println("Starting mosaic server ...")
	mux := http.NewServeMux()
	files := http.FileServer(http.Dir("public"))
	mux.Handle("/static/", http.StripPrefix("/static/", files))

	mux.HandleFunc("/", upload)
	mux.HandleFunc("/mosaic", mosaic)

	server := &http.Server{
		Addr:    "127.0.0.1:8080",
		Handler: mux,
	}
	TILESDB = tilesDB()

	fmt.Println("Mosaic server started.")
	server.ListenAndServe()
}

func upload(w http.ResponseWriter, r *http.Request) {
	t, _ := template.ParseFiles("upload.html")
	t.Execute(w, nil)
}

// cut out the image and return individual channels with image.Image
// no encoding of JPEG
func cut(original image.Image, db *DB, tileSize, x1, y1, x2, y2 int) <-chan image.Image {
	c := make(chan image.Image)
	sp := image.Point{0, 0}
	go func() {
		newimage := image.NewNRGBA(image.Rect(x1, y1, x2, y2))
		for y := y1; y < y2; y = y + tileSize {
			for x := x1; x < x2; x = x + tileSize {
				r, g, b, _ := original.At(x, y).RGBA()
				color := [3]float64{float64(r), float64(g), float64(b)}
				nearest := db.nearest(color)
				file, err := os.Open(nearest)
				if err == nil {
					img, _, err := image.Decode(file)
					if err == nil {
						tile := scale(img, tileSize, tileSize)
						//tile := t.SubImage(t.Bounds())
						tileBounds := image.Rect(x, y, x+tileSize, y+tileSize)
						draw.Draw(newimage, tileBounds, tile, sp, draw.Src)
					} else {
						fmt.Println("error in decoding nearest", err, nearest)
					}
				} else {
					fmt.Println("error opening file when creating mosaic:", nearest)
				}
				file.Close()
			}
		}
		c <- newimage.SubImage(newimage.Rect)
	}()

	return c
}

// combine the images and return the encoding string
func combine(r image.Rectangle, c1, c2, c3, c4 <-chan image.Image) <-chan string {
	c := make(chan string)
	// start a goroutine
	go func() {
		var wg sync.WaitGroup
		newimage := image.NewNRGBA(r)
		copy := func(dst draw.Image, r image.Rectangle, src image.Image, sp image.Point) {
			defer wg.Done()
			draw.Draw(dst, r, src, sp, draw.Src)
		}
		wg.Add(4)
		var s1, s2, s3, s4 image.Image
		var ok1, ok2, ok3, ok4 bool
		for {
			select {
			case s1, ok1 = <-c1:
				go copy(newimage, s1.Bounds(), s1, image.Point{r.Min.X, r.Min.Y})
			case s2, ok2 = <-c2:
				go copy(newimage, s2.Bounds(), s2, image.Point{r.Max.X / 2, r.Min.Y})
			case s3, ok3 = <-c3:
				go copy(newimage, s3.Bounds(), s3, image.Point{r.Min.X, r.Max.Y / 2})
			case s4, ok4 = <-c4:
				go copy(newimage, s4.Bounds(), s4, image.Point{r.Max.X / 2, r.Max.Y / 2})
			}
			if ok1 && ok2 && ok3 && ok4 {
				break
			}
		}
		// wait till all copy goroutines are complete
		wg.Wait()

		buf2 := new(bytes.Buffer)
		jpeg.Encode(buf2, newimage, nil)
		c <- base64.StdEncoding.EncodeToString(buf2.Bytes())
	}()
	return c
}

//  Handler function for fan-out and fan-in
func mosaic(w http.ResponseWriter, r *http.Request) {
	t0 := time.Now()
	// get the content from the POSTed form
	r.ParseMultipartForm(10485760) // max body in memory is 10MB
	file, _, _ := r.FormFile("image")
	defer file.Close()
	tileSize, _ := strconv.Atoi(r.FormValue("tile_size"))

	//   // decode and get original image
	original, _, _ := image.Decode(file)
	bounds := original.Bounds()
	db := cloneTilesDB()
	width := bounds.Dx()
	height := bounds.Dy()
	// fan-out
	c1 := cut(original, &db, tileSize, 0, 0, width/2, height/2)
	c2 := cut(original, &db, tileSize, width/2, 0, width, height/2)
	c3 := cut(original, &db, tileSize, 0, height/2, width/2, height)
	c4 := cut(original, &db, tileSize, width/2, height/2, width, height)

	// fan-in
	c := combine(bounds, c1, c2, c3, c4)

	buf1 := new(bytes.Buffer)
	jpeg.Encode(buf1, original, nil)
	originalStr := base64.StdEncoding.EncodeToString(buf1.Bytes())

	t1 := time.Now()
	images := map[string]string{
		"original": originalStr,
		"mosaic":   <-c,
		"duration": fmt.Sprintf("%v ", t1.Sub(t0)),
	}

	t, _ := template.ParseFiles("results.html")
	t.Execute(w, images)
}
 类似资料: