並列処理

並列処理 #

処理を並列で実行する #

Go 言語では goroutine という仕組みで並列処理を実行する。

goroutine は任意の関数をメイン関数と並列で実行する仕組みで、go 関数に任意の関数を渡して実行する。

以下にメイン関数と並列処理関数で定期的に出力をする処理を記載した。

処理結果が末尾に記載されており、メイン関数と並列処理関数が同時に実行されていることが確認できる。

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Printf("[MAIN] start\n")

	// go 関数で並列処理を開始
	go sleep(3)

	// 処理の終了を待つ
	for i := 0; i < 5; i++ {
		fmt.Printf("[MAIN] %d\n", i)
		time.Sleep(1 * time.Second)
	}

	fmt.Printf("[MAIN] end\n")
}

func sleep(n int) {
	fmt.Printf("[GOROUTINE] start sleep: %d\n", n)

	for i := 0; i < 5; i++ {
		fmt.Printf("[GOROUTINE] %d\n", i)
		time.Sleep(1 * time.Second)
	}

	fmt.Printf("[GOROUTINE] end sleep: %d\n", n)
}

/* 出力結果

[MAIN] start
[MAIN] 0
[GOROUTINE] start sleep: 3
[GOROUTINE] 0
[GOROUTINE] 1
[MAIN] 1
[MAIN] 2
[GOROUTINE] 2
[GOROUTINE] 3
[MAIN] 3
[GOROUTINE] 4
[MAIN] 4
[MAIN] end

*/

play_circleRun open_in_newRun In The Playground

参考ドキュメント: time

並列処理へ値を渡す #

メイン関数から並列実行関数へ値を渡すためにチャネルを使う。

チャネルは変数の亜種であり、並列実行される関数間で値をやり取りするためのキューのような仕組みである。

チャネルは型の定義によって読み込み専用・書き込み専用という指定ができる。 これによりデータの流れる方向をコントロールすることができ、データの同時更新による破損などを避けて安全にデータのやり取りができる。

また、チャネルは読み込み待ちで処理をブロックできるので、並列実行している関数間で実行待ちしたり実行タイミングを調整したりできる。

この例ではメイン関数から並列実行関数へチャネル経由で値を渡す。 並列実行する関数の引数として読み取り専用のチャネル <-chan int 型の引数を受け取る。

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("[MAIN] start")

	// チャネルの定義
	// 長さが 1 で int 型
	c := make(chan int, 1)

	// 並列処理の実行開始
	// go に無名関数を渡して即時実行する場合
	go func(c <-chan int) {
		fmt.Println("[GOROUTINE] start")

		// チャネルから値を受け取る
		// 値が渡ってくるまで処理が停止する
		n := <-c
		fmt.Println("[GOROUTINE] ", n)

		fmt.Println("[GOROUTINE] end")
	}(c)

	for i := 0; i < 3; i++ {
		fmt.Printf("[MAIN] %d\n", i)
		time.Sleep(1 * time.Second)
	}

	// チャネルへ値を渡す
	c <- 100

	time.Sleep(1 * time.Second)

	fmt.Println("[MAIN] end")
}

/* 出力結果

[MAIN] start
[MAIN] 0
[GOROUTINE] start
[MAIN] 1
[MAIN] 2
[GOROUTINE]  100
[GOROUTINE] end
[MAIN] end

*/

play_circleRun open_in_newRun In The Playground

参考ドキュメント: time

並列処理から値を受け取る #

並列実行関数からメイン関数へチャネル経由で値を渡す。

並列実行する関数の引数として読み取り専用のチャネル <-chan int 型の引数を受け取る。

package main

import (
	"fmt"
)

func main() {
	fmt.Println("[MAIN] start")

	c := make(chan int, 5)

	go func(c chan<- int) {
		fmt.Println("[GOROUTINE] start")

		for i := 0; i < 5; i++ {
			fmt.Printf("[GOROUTINE] send %d\n", i)

			// チャネルへ値を入れる
			c <- i
		}

		fmt.Println("[GOROUTINE] end")
	}(c)

	for i := 0; i < 5; i++ {

		// チャネルから値を受け取る
		// goroutine が値を渡すまで処理が停止する
		n := <-c
		fmt.Printf("[MAIN] receive %d\n", n)
	}

	fmt.Println("[MAIN] end")
}

/* 出力結果

[MAIN] start
[GOROUTINE] start
[GOROUTINE] send 0
[GOROUTINE] send 1
[GOROUTINE] send 2
[GOROUTINE] send 3
[GOROUTINE] send 4
[GOROUTINE] end
[MAIN] receive 0
[MAIN] receive 1
[MAIN] receive 2
[MAIN] receive 3
[MAIN] receive 4
[MAIN] end

*/

play_circleRun open_in_newRun In The Playground

参考ドキュメント: time

並列処理の終了を待つ #

メイン関数で並列処理関数の終了を待つためには、並列処理関数にあらかじめチャネルを渡しておき処理終了時にチャネル経由で通知させる。

その際、メイン関数はチャネルの読み込み待ちで処理を停止させておく。

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("[MAIN] start")

	c := make(chan bool, 1)

	go func(c chan<- bool) {
		fmt.Println("[GOROUTINE] start")

		for i := 0; i < 5; i++ {
			fmt.Printf("[GOROUTINE] sleep %d\n", i)

			time.Sleep(1 * time.Second)
		}

		// チャネルへ値を入れる
		c <- true

		fmt.Println("[GOROUTINE] end")
	}(c)

	fmt.Println("[MAIN] wait")

	// チャネルから値を受け取る
	// goroutine が値を渡すまで処理が停止する
	<-c

	fmt.Println("[MAIN] end")
}

/* 出力結果

[MAIN] start
[MAIN] wait
[GOROUTINE] start
[GOROUTINE] sleep 0
[GOROUTINE] sleep 1
[GOROUTINE] sleep 2
[GOROUTINE] sleep 3
[GOROUTINE] sleep 4
[GOROUTINE] end
[MAIN] end

*/

play_circleRun open_in_newRun In The Playground

参考ドキュメント: time

並列処理を終了させる #

メイン関数から並列処理関数を終了させるためには、並列処理関数にあらかじめチャネルを渡しておき、処理を終了させたいタイミングでメイン関数からチャネルに値を送る。

並列処理関数側では本来の処理を行いながら渡されたチャネルも定期的にチェックして適切に終了処理に入れるようにする。

この例では並列処理関数で select を使い複数のチャネルを同時にリッスンすることで本処理をしながらいつでも終了処理に入れるように実装した。

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("[MAIN] start")

	// time.Ticker は指定した間隔でチャネルに値を送る機能を持つ
	// 一定時間ごとに何らかの処理を行わせるためのきっかけとできる
	ticker := time.NewTicker(500 * time.Millisecond)

	// 終了を通知するチャネル
	quit := make(chan bool)

	go func(quit <-chan bool, ticker *time.Ticker) {
		fmt.Println("[GOROUTINE] start")

		// for で無限ループを行い select で複数のチャネルを待ち受ける
		//
		// select は case で受けているいずれかのチャネルへ書き込みがあるまで待ち続け
		// チャネルへの書き込みを受けて処理が終わったら for によってまた待ち受け状態に戻る
		for {
			select {
			case <-quit:
				// quit チャネルから値が読めれば関数を終了する
				fmt.Println("[GOROUTINE] end")
				return
			case t := <-ticker.C:
				// ticker から定期的に値が送られてくるので何らかの処理を定期定期に実行できる
				fmt.Println("Tick at", t)
			}
		}

	}(quit, ticker)

	fmt.Println("[MAIN] wait")

	// 少し待ってから並列処理を停止させる
	time.Sleep(3 * time.Second)
	quit <- true

	fmt.Println("[MAIN] end")
}

/* 出力結果

[MAIN] start
[MAIN] wait
[GOROUTINE] start
Tick at 2009-11-10 23:00:00.5 +0000 UTC m=+0.500000001
Tick at 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
Tick at 2009-11-10 23:00:01.5 +0000 UTC m=+1.500000001
Tick at 2009-11-10 23:00:02 +0000 UTC m=+2.000000001
Tick at 2009-11-10 23:00:02.5 +0000 UTC m=+2.500000001
Tick at 2009-11-10 23:00:03 +0000 UTC m=+3.000000001
[GOROUTINE] end
[MAIN] end

*/

play_circleRun open_in_newRun In The Playground

参考ドキュメント: time

複数の並列処理の終了を待つ #

sync.WaitGroup を使うと複数の並列処理の終了を待てる。

使い方は単純で、WaitGroup が内部でカウンタを持っているので、並列処理の開始でカウンタを +1 して、終了でカウンタを -1 する。

メイン関数ではこのカウンタが 0 になるのを待ち受ける。

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	fmt.Println("[MAIN] start")

	// 初期化
	wg := new(sync.WaitGroup)

	for i := 0; i < 3; i++ {
		// 開始時にカウンタを +1
		wg.Add(1)

		go func(wg *sync.WaitGroup, i int) {
			fmt.Printf("[GOROUTINE][%d] start\n", i)

			time.Sleep(time.Duration(i) * time.Second)

			// 終了したのでカウンタを -1
			wg.Done()

			fmt.Printf("[GOROUTINE][%d] end\n", i)
		}(wg, i)

	}

	// カウンタが 0 になるまで待つ
	wg.Wait()

	fmt.Println("[MAIN] end")
}

/* 出力結果

[MAIN] start
[GOROUTINE][2] start
[GOROUTINE][0] start
[GOROUTINE][0] end
[GOROUTINE][1] start
[GOROUTINE][1] end
[GOROUTINE][2] end
[MAIN] end

*/

play_circleRun open_in_newRun In The Playground

参考ドキュメント: time , sync

並列処理で安全に値を変更する #

複数の並列処理で同時に同じ値を変更するとデータの破損が発生したりエラーが発生する。

並列処理での値の変更を安全に行うためにはチャネルを使うか専用の仕組みを用いるのが良い。

数値を安全に変更するためには sync/atomic パッケージの func AddUint64(addr *uint64, delta uint64) (new uint64) などの関数が用意されている。

マップやスライス、構造体などを安全に変更するためには sync パッケージの Mutex によるロック機構を用いる。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var ui uint64 = 0
var m = map[int]int{}
var mu = &sync.Mutex{}

func main() {
	fmt.Println("[MAIN] start")

	wg := new(sync.WaitGroup)

	for i := 1; i <= 10; i++ {
		go func(wg *sync.WaitGroup, mu *sync.Mutex) {
			for i := 0; i < 1000; i++ {

				// 並列処理でも安全な map の更新
				mu.Lock()
				m[i%3] += 1
				mu.Unlock()

				// 並列処理でも安全な uint64 の加算
				atomic.AddUint64(&ui, 1)

				// 危険な map の更新. コメントを外して実行すると同時書き込みのため panic が起こる
				// m[i%3] += 1

				// 危険な uint64 の加算. コメントを外して実行すると同時書き込みにより正常に加算できず最終結果が 10000 にならない
				// ui += 1
			}
			wg.Done()
		}(wg, mu)

		wg.Add(1)
	}
	wg.Wait()

	fmt.Printf("[MAIN] ui:%v, m:%v\n", ui, m)

	fmt.Println("[MAIN] end")
}

/* 出力結果

[MAIN] start
[MAIN] ui:10000, m:map[0:3340 1:3330 2:3330]
[MAIN] end

*/

play_circleRun open_in_newRun In The Playground

参考ドキュメント: sync , sync/atomic