はじめに
Goの並行処理プリミティブを使用すると、I/Oとマルチコアを効率的に使用するストリーミング・データ・パイプラインを簡単に構築できます。この記事では、パイプラインの例をいくつか紹介し、操作に失敗したときのニュアンスを強調し、失敗を優雅に処理するテクニックを紹介します。
パイプラインとは?
Goにはパイプラインの正式な定義はありません。パイプラインは、多くの並行プログラムの1つのクラスにすぎません。一般的に、パイプラインはチャネルで接続されたステージのシーケンスです。各ステージには同じロジックを実行するゴルーチンがあり、各ステージでゴルーチン
- チャネルからのアップストリームデータの読み込み
- データに対して何らかの操作を行い、通常は新しいデータを生成します。
- チャネル経由でデータを下流に送信
各ステージは任意の数の入力チャンネルと出力チャンネルを持つことができますが、最初のチャンネルとほとんどのチャンネルを持ちます。最初のステップは通常、データソースまたはプロデューサーと呼ばれ、最後のステップはストレージプールまたはコンシューマーと呼ばれます。
この概念とテクニックは、まず単純なパイプラインの例から説明し、より複雑な例は後で紹介します。
数の二乗
パイプラインには3つのステージがあると仮定します。
最初のステップであるgen関数は、数値のリストをチャネルに変換する関数です。gen関数はゴルーチンを開始し、チャネルに数値を送信し、すべての数値が送信された後にチャネルを閉じます。
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第2ステージのsqは、上記のチャネルから数値を受信し、受信したすべての数値の2乗を含むチャネルを返します。上流のチャネルが閉じられた後、このステージはすべての結果を下流に送信し、その後、出力チャネルを閉じます:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
メイン関数はこのパイプラインを構築し、第1ステージを実行し、第2ステージから結果を受け取り、チャネルがクローズされるまで1つずつプリントします。
func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)
// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
sqは入力チャンネルと出力チャンネルに同じ型を持つため、それらを何度でも組み合わせることができます。また、他のステージのように、メイン関数をループ・トラバーサルとして書き換えることも可能です。
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
ファンアウトとファンイン
複数の関数が同じチャネルからそのチャネルが閉じられるまでデータを読み取ることができ、これをファンアウトと呼びます。これは、CPUとI/Oを並列に利用するために、複数の作業インスタンスが分散して連携する方法です。
関数は複数の入力からデータを読み込み、すべての入力チャンネルがクローズされるまでデータを処理することができます。この関数はすべての入力チャンネルを1つのチャンネルにインポートします。これをファンインと呼びます。
パイプラインは、sqの2つのインスタンスを実行するように設定できます。新しい関数mergeが導入され、結果のファン化
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge関数は各入力チャンネルに対してgoroutineを開始し、同じ出力チャンネルにデータをコピーします。すべての出力ゴルーチンが立ち上がると、mergeは別のゴルーチンを開始し、すべての入力がコピーされた後に出力チャネルを閉じます。
クローズされたチャネルにデータを送信すると例外が発生するので、closeを呼び出してすべての送信アクションが実行されたことを確認することが重要です。
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
停止する技術
パイプラインの機能はすべてパターンに従っています:
- 送信者は送信を終えると出力チャネルを閉じます。
- レシーバーは、入力パイプが閉じられるまで、入力パイプからのデータを受信し続けます。
このパターンでは、各受信関数をRANGEループとして記述することができ、データがダウンストリームに正常に送信されると、すべてのゴルーチンがシャットダウンされます。
しかし、実際のケースでは、すべての入力データがレシーバで処理される必要はありません。これは意図的な場合もあります。レシーバはデータのサブセットだけで十分な場合もありますし、より一般的には、入力データにエラーがあるためにレシーバ関数が早期に終了する場合もあります。どちらの場合でも、レシーバは後続のデータが到着するのを待ち続けるべきではありませんし、後続のステップで不要になったデータの生成を上流関数が停止することを期待します。
パイプラインの例では、ステージが入力データをすべて消費できない場合、そのデータを送信するゴルーチンはブロックし続けます:
// Consume the first value from output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Since we didn't receive the second value from out,
// one of the output goroutines is hung attempting to send it.
}
これはリソース・リークです:ゴルーチンはメモリとランタイム・リソースを占有します。ゴルーチン・スタックが保持するヒープ参照は、GCがリソースを取り戻すのを妨げます。そして、ゴルーチンはゴミ箱に回収されることはなく、自発的に終了しなければなりません。
パイプラインの上流側の関数は、下流側の関数がすべての入力データを受け取ることができない場合に終了するように設計し直す必要があります。これを実現する1つの方法は、出力チャネルに一定のキャッシュを持たせることです。キャッシュは一定量のデータを保存できます。十分なキャッシュ領域がある場合、送信操作はすぐに戻ります。
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
キャッシュを持つチャネルは、チャネル作成時に送信するデータ量がわかっていれば、コードを単純化します。例えば、gen関数をオーバーライドして、新しいgoroutineを作成する代わりに、一連の整数をキャッシュされたチャネルにコピーすることができます:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
次に、パイプラインのブロックされたゴルーチンを見て、マージ関数が返す出力チャネルのキャッシュを追加することを検討します:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
これはプログラム内でゴルーチンをブロックすることは避けられますが、かなりくだらないコードです。1というキャッシュサイズを選択することは、マージ関数が受け取る数値の数と、下流関数が消費する数値の数を知ることに依存します。これは非常に不安定で、genに送られる数値が1つ増えたり、下流関数が消費する数値が少なくなったりすると、再びgoroutineをブロックしてしまいます。
しかし、ダウンストリーム機能がアップストリームの送信者に、ダウンストリームがデータの受信を停止することを通知する方法を提供する必要があります。
#p#
明示的キャンセル
メイン関数がoutからすべてのデータを受け取らずに終了することを決定した場合、上流のgoroutineに送信しようとしているデータをキャンセルするように通知する必要があります。これは、doneと呼ばれるチャネルにデータを送信することで可能です。潜在的にブロックしているゴルーチンが2つあるので、メイン関数は2つのデータを送信します:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving.
done <- struct{}{}
done <- struct{}{}
}
sendゴルーチンでは、sendオペレーションをselect文に置き換える必要があり、outに対してsendオペレーションが発生するか、doneからデータが受信されます。アップストリーム関数はブロックしません。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...
このアプローチには問題があります。各ダウンストリーム関数は、ブロックする可能性のあるアップストリームの送信者の数を知る必要があります。これらの数を追跡し続けることは面白くありませんし、エラーが発生しやすくなります。
未知または無制限の数のゴルーチンがすべてダウンストリームへのデータ送信を停止できるようにする方法が必要です。Goでは、チャネルを閉じることでこれを実現できます。閉じたチャネルからの受信操作は常に直ちに成功し、適切なデータ型のゼロ値を返すからです。
つまり、メイン関数はdoneをクローズするだけで、すべての送信者のブロックを解除することができます。クローズ操作は、送信者への効率的なブロードキャストシグナルです。拡張パイプラインのすべての関数は引数として done を受け取り、defer を介して対応するチャネルの close 操作を実装します。そのため、メイン関数がどの行で終了しても、上流の終了が通知されます。
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.
}
マージ関数のoutputは、受信パイプラインのデータが消費された後に戻ることができます。なぜなら、output関数は、上流のsenderqがdoneが閉じられた後にデータの生成を停止することを知っているからです。また、outputは、defer文によって、すべての終了パスでwq.Doneが呼び出されることを保証します。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... the rest is unchanged ...
同様に、sqはdoneが閉じた直後に戻ることができます。sqはdeferステートメントを使用して、任意の終了パスをその出力チャネルを閉じるようにします。
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
パイプライン建設のガイドラインは以下の通りです:
- 各ステージは、すべての送信操作が完了した後、出力チャンネルを閉じます。
- 各ステージは、入力チャンネルが閉じられるか、プロデューサーのブロックが解除されるまで、入力チャンネルからデータを受信し続けます。
パイプラインがプロデューサのブロックを解除する方法は2つあります。生成されようとしているデータを格納するのに十分なキャッシュ領域があることを確認するか、プロデューサ-コンシューマにデータの受信をキャンセルすることを明示的に通知するかです。
ツリーアブストラクト
より実用的なパイプラインを見てみましょう。
MD5は、ファイルのチェックサムに便利なメッセージ・ダイジェスト・アルゴリズムです。コマンドラインツールのmd5sumは、ファイルの一連のダイジェスト値を表示するのに便利です。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
サンプルのプログラムはmd5sumと似ていますが、引数として1つのフォルダを受け取り、そのフォルダ内のすべての共通ファイルの要約値をパス名でソートして表示します。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
このプログラムのメイン関数は、パス名からダイジェスト値へのハッシュテーブルを返すユーティリティ関数MD5ALLを呼び出し、結果をソートして出力します:
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s
", m[path], path)
}
}
MD5ALLが議論の中心です。serial.goでは、並行処理は使用されず、フォルダをトラバースしてファイルを読み取り、ダイジェスト値を見つけるだけです。
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
#p#
要求値の同時抽出
parallel.goでは、MD5ALLは2段階のパイプラインに分割されています。最初のステージであるsumFilesは、要約値のためにファイルごとに1つのgoroutineでフォルダをトラバースし、resultのデータ型を持つチャネルで結果を送信します:
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFilesは2つのチャンネルを返します: 1つは結果を生成するためのチャンネル、もう1つはfilepathのチャンネルです。
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the result on c. Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() {
wg.Wait()
close(c)
}()
// No select needed here, since errc is buffered.
errc <- err
}()
return c, errc
}
MD5Allはcからサマリー値を受け取ります。MD5Allはエラーが発生すると早期に終了し、deferで終了します。
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
境界並列性
parallel.goに実装されているMD5ALLは、ファイルごとにゴルーチンを起動します。大きなファイルを大量に含むフォルダでは、マシンで使用可能なメモリを超えるメモリ割り当てにつながる可能性があります。
これは、ファイルを読み込む際の並行性の程度を制限することで回避できます。bounded.goでは、ある数のgoroutineを作成することでファイルを読み込みます。これで、パイプラインは、フォルダのトラバース、ファイルの読み取りとダイジェスト値の計算、ダイジェスト値の収集の3段階になりました。
最初のステージである walkFiles は、フォルダ内の共通ファイルのファイルパスを出力します:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Close the paths channel after Walk returns.
defer close(paths)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
中間ステージは一定数の消化器ゴルーチンを起動し、パスからファイル名を受け取り、RESULT構造体をc:に送ります。
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
前の例とは異なり、出力チャネルは共有され、複数のゴルーチンが同じチャネルにデータを送信するため、消化器はその出力チャネルを閉じません。
// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
各消化器がそれ自身の出力チャンネルを作成し、返すようにすることも可能ですが、その場合、それらの結果をファンにするために追加のゴルーチンが必要になります。
最後のステージは、cからすべての結果データを受け取り、errcからエラーをチェックします。このチェックは、walkFilesが下流へのデータ送信をブロックする可能性があるステージでは行うことができません:
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
結論
この投稿では、Goでストリーミングデータパイプラインを構築する場合のテクニックについて説明します。パイプラインの各ステージが下流へのデータ送信をブロックされたり、下流のステージが入力データを気にしなくなったりする可能性があるためです。チャネルを閉じることで、パイプラインを開始するすべてのゴルーチンに完了シグナルをブロードキャストする方法を示し、パイプラインを正しく構築するためのガイドラインを定義します。
深読み:
- Go同時実行パターンは、Go同時実行プリミティブの基本的な概念と、そのいくつかの実装を示します。
- 高度なGo同時実行パターンには、より複雑ないくつかのGo同時実行プリミティブの使用が含まれます。
- ダグラス・マキロイ(Douglas McIlroy)の「Squinting at Power Series」論文は、Goのような並行性パターンが複雑な計算をいかにエレガントにサポートするかを示しています。