背景
最近、社内の他グループで「Go言語による並行処理」の読書会に参加させてもらっており、goroutineとchannelの理解が深まりつつある。自分は4章の前半と5章の後半を担当してます。
しかし「読むのは読めるが、自分で同じようなことが書けたり思い付いたりするかは別問題だな...」という感じている。手を動かさないとダメだなと思ったので、ML-Newsのクローラを題材に遊んでみた。正直、やりたいことに対してやっている内容はオーバーエンジニアリング感は否めないのだが、砂場で遊ぶにはもってこいの内容だった。
元々やっていたことと、goroutineとchannelの練習のために課した追加の仕様
やる内容はとてもシンプル。
- 1: いくつかの主要なRSSのリストを元に、最近のエントリの一覧をクロールする
- 2: クロールした結果をDynamoDBに書き込む
- 2.1: Batchでまとめて書き込むのだが、DynamoDBの仕様として一度に書き込めるのは最大で25個という仕様があり、25個のチャンクに分割して書き込む必要がある
今回は「Go言語による並行処理」で勉強した内容を踏まえて、以下のように内容を変更した。
- 1: いくつかの主要なRSSのリストを元に、最近のエントリの一覧をクロールする。ただし、クロールは最大10並列で行なう
- 2: クロールした結果をDynamoDBに書き込む
- ただし、1のクロールした結果が出揃ってから書き込むのではなく、クロールの処理が終わったものがあれば他の結果を待たずして書き込む
- 書き込むのはなるべく25個揃うのが待つ
- 一つのサイトでクロールエラーが発生しても気にしないが、一定数以上のクロールが失敗したら、他を含めて終了の処理に入るようにする
- ただし、1のクロールした結果が出揃ってから書き込むのではなく、クロールの処理が終わったものがあれば他の結果を待たずして書き込む
準備
ひとまず今回の仕様の修正に関係ないパーツの部分の説明。
サイトのURLを渡すと、エントリの一覧を取ってくる関数。特に面白いところはない。
import ( "context" "github.com/mmcdole/gofeed" ) func GetUrlsFromFeed(ctx context.Context, feedUrl string) ([]string, error) { urls := make([]string, 0) fp := gofeed.NewParser() feed, err := fp.ParseURLWithContext(feedUrl, ctx) if err != nil { return nil, err } now := time.Now() for _, item := range feed.Items { urls = append(urls, item.Link) } } return urls, nil }
最終的にはこういう形で動くものを作りたい。
- 1の要件を満たすため、
GetUrlsFromSourceRSSList
は並行でクロールさせたい - 2の要件を満たすため、
splitToChunk
やWriteUrlsToDynamoDB
は1にブロックされず、できあがったところから処理させたい
FeedFetcherOptions := FeedFetcherOptions{ Concurrent: 10, TimeoutDuration: time.Second * 10, } feedResult := GetUrlsFromSourceRSSList(ctx, getSourceUrls(), FeedFetcherOptions) for chunk := range splitToChunk(ctx, feedResult, chunkingOptions) { fmt.Println("Writing batch...") WriteUrlsToDynamoDB(ctx, svc, chunk) }
やったこと
コード概要
概観としては以下のような感じになった(クリックすると内容が展開されます)。
並行にクロールさせるためのコード
type CrawlResult struct { Urls []string Source string Error error } type FeedFetcherOptions struct { Concurrent int TimeoutDuration time.Duration } func GetUrlsFromSourceRSSList( ctx context.Context, feedUrls []string, options FeedFetcherOptions, ) <-chan CrawlResult { feedResult := make(chan CrawlResult) var wg sync.WaitGroup sem := make(chan struct{}, options.Concurrent) crawl := func(ctx context.Context, feedUrl string) { defer func() { <-sem }() defer wg.Done() result := CrawlResult{Source: feedUrl} urls, err := GetUrlsFromFeed(ctx, feedUrl) if err != nil { result.Error = err } else { result.Urls = urls } feedResult <- result } wg.Add(len(feedUrls)) go func() { for _, feedUrl := range feedUrls { sem <- struct{}{} ctx, cancel := context.WithTimeout(ctx, options.TimeoutDuration) go func(feedUrl string) { defer cancel() crawl(ctx, feedUrl) }(feedUrl) } }() go func() { wg.Wait() close(feedResult) }() return feedResult }
DynamoDBへの書き込みのために適切なチャンクに分割するコード
type ChunkingOptions struct { MaxChunkSize int AllowedErrorCount int } func splitToChunk(ctx context.Context, crawlResult <-chan CrawlResult, options ChunkingOptions) <-chan []string { chunk := make(chan []string) var totalErrCnt uint32 go func(crawlResult <-chan CrawlResult) { defer close(chunk) tmp := make([]string, 0) for result := range crawlResult { if result.Error != nil { fmt.Printf("error (%s) for processing %s\n", result.Error, result.Source) atomic.AddUint32(&totalErrCnt, 1) if totalErrCnt > uint32(options.AllowedErrorCount) { fmt.Printf("too many errors: totalErrCnt = %d\n", totalErrCnt) return } } for _, url := range result.Urls { tmp = append(tmp, url) if len(tmp) >= options.MaxChunkSize { chunk <- tmp tmp = make([]string, 0) } } } chunk <- tmp }(crawlResult) return chunk }
見所ポイントはいくつかあるので、それぞれ紹介する。
戻り値は読み込み専用のチャネル
1のクロールの処理を待たず、終わったものからDynamoDBに書き出すという2の仕様を満たすため、戻り値は「読み込み専用」の「チャネル」(<-chan CrawlResult
)にしている。それぞれ説明する。
まず、結果が(スライスなどではなく)チャネルであるという点について。以下のコードように、並行でクロールはさせるものの別のゴルーチンで走らせており、return feedResult
で次の処理に渡すことができる。
wg.Add(len(feedUrls)) go func() { for _, feedUrl := range feedUrls { sem <- struct{}{} ctx, cancel := context.WithTimeout(ctx, options.TimeoutDuration) go func(feedUrl string) { defer cancel() crawl(ctx, feedUrl) }(feedUrl) } }() go func() { wg.Wait() close(feedResult) }() return feedResult
「Go言語による並行処理」ではチャネルを表わす変数名にstream
と入っていることが多いのだが、処理が出揃わなくても他に処理を渡せる流動的なものという意味でstreamという名前はなるほどと思った。なお、別のゴルーチンでクローラを起動(上のコードの2行目)させないと、クロールが全て終わるまでreturnできずに後段の処理がブロックされてしまうので注意が必要。私ははまった。
また、チャネルが「読み込み専用」というのも重要なポイント。読み込み専用であるということが型で明示されているため、意図せぬところでcloseされてしまうということはないし、このチャネルの生産者であるGetUrlsFromSourceRSSList
がclose
するまでの責任を持たなければならない、ということが表現できている(破ろうとすればコンパイル時点で叩き落としてくれる)。本でいうと4.1 拘束
で詳しく説明されている。
goroutineの終了まで待つ + 終了したらチャネルを閉じる
並行で走らせるものの、全てのサイトをクロールし終わるまではプログラムは終了して欲しくないので、WaitGroup
で待たせるようにする(本では3.2 syncパッケージ
で説明されている)。wg.Wait()
を別goroutineで起動させておかないと、全部完了するまで戻り値を返せなくなってしまうので、注意。
また、wg.Wait()
した後にclose(feedResult)
するのも忘れないように。チャネルはスライルのようにfor range
で回すことができ、これもまたstreamっぽいなと思うんだけど、close
しないと「他にもまだ書き込みがあるかも」となり、後段がブロックされてしまう。
上限付きの並行処理をさせたい場合はgoroutine + バッファ付きチャネルを使う
あまりにも並行でクロールしまくると、クロール先にも迷惑だし、動作させる環境のネットワークリソースも枯渇してしまう。そのため、並行でクロール自体はさせたいが、並行で走る処理数の上限は設けたい。こうしたことを実現したければバッファ付きのチャネルを使うとよい。バッファが埋まっていれば、席が空くまでブロックしてくれるので、並行処理数の上限になってくれる。
sem := make(chan struct{}, options.Concurrent) // 最大の処理数を制限 // 新しくgoroutineを動かすときにはsemに何か入れておく sem <- struct{}{} // 処理が終わったら、他の処理が動かせるように席を空けておく defer func() { <-sem }()
バッファ付きチャネルは本でいうと、3.3 チャネル(channel)
で説明されている。
並行処理におけるエラーハンドリング
並行処理におけるエラーハンドリングでは、呼び出し元に情報を失なわずに伝えてあげることが重要。そのため、結果(今回の場合はUrls
)だけでなく、エラー内容もペアになったstruct
を作っている。
type CrawlResult struct { Urls []string Source string // これはなくてもいいが、どの情報源からエラーが起きているかエラーメッセージに含めたかったので追加した Error error }
例えば、URLのリストを25個ずつのチャンクに分けるといった後段の処理において、クロールエラーの合計数が一定数以上であれば終了させる、といったことができるようになる。
- エラー数が一定以上になると、goroutineで起動している関数が
return
される - その結果、
defer close(chunk)
が呼ばれ、後段のチャネルを読んでいる処理も終了される
この辺りは本だと4.5 エラーハンドリング
に詳しく説明されている。
func splitToChunk(ctx context.Context, crawlResult <-chan CrawlResult, options ChunkingOptions) <-chan []string { chunk := make(chan []string) var totalErrCnt uint32 go func(crawlResult <-chan CrawlResult) { defer close(chunk) tmp := make([]string, 0) for result := range crawlResult { if result.Error != nil { fmt.Printf("error (%s) for processing %s\n", result.Error, result.Source) atomic.AddUint32(&totalErrCnt, 1) if totalErrCnt > uint32(options.AllowedErrorCount) { fmt.Printf("too many errors: totalErrCnt = %d\n", totalErrCnt) return } } for _, url := range result.Urls { tmp = append(tmp, url) if len(tmp) >= options.MaxChunkSize { chunk <- tmp tmp = make([]string, 0) } } } chunk <- tmp }(crawlResult) return chunk }
キャンセルを送れるようにする
クローラやクラウドのAPIコールなど外部リソースを叩く場合、適切にタイムアウトを仕込むことは大事。ある箇所でタイムアウトやキャンセルが起こった場合、適切に起動したgoroutineを終了させる必要がある。さもないと、本の4.3 ゴルーチンリークを避ける
で説明されているようなゴルーチンのリークが発生してしまう。4章の最初ではdone
チャネルを使って説明されているが、この辺りを抽象化しつつ使いやすくしたcontextパッケージを積極的に使っていこう。本では4.12 contextパッケージ
に詳しく説明されている。
所感
Go言語を書くときは並行処理を使う機会が結構あるものの、goroutineやchannelのことを雰囲気で使っていた(あまりちゃんと使えていなかった、が正確か...)が、「Go言語による並行処理」を読んだことで、そもそもどういうことができるのか、どういうことに気を付けるべきかあたりが体系的だったり、よく使うものに関してはデザインパターンのように知ることができた。読むだけだと実践するには少し難しい部分はあったが、砂場である程度手を動かしたら少しだけ自身が持てたので、必要な場所ではgoroutineとchannelを積極的に使っていきたいなと思った。