最近の砂場活動その26: goroutineとchannelを使って、クロールしつつ書き込みを並行に行なう

背景

最近、社内の他グループで「Go言語による並行処理」の読書会に参加させてもらっており、goroutineとchannelの理解が深まりつつある。自分は4章の前半と5章の後半を担当してます。

Go言語による並行処理

Go言語による並行処理

Amazon

しかし「読むのは読めるが、自分で同じようなことが書けたり思い付いたりするかは別問題だな...」という感じている。手を動かさないとダメだなと思ったので、ML-Newsのクローラを題材に遊んでみた。正直、やりたいことに対してやっている内容はオーバーエンジニアリング感は否めないのだが、砂場で遊ぶにはもってこいの内容だった。

元々やっていたことと、goroutineとchannelの練習のために課した追加の仕様

やる内容はとてもシンプル。

  • 1: いくつかの主要なRSSのリストを元に、最近のエントリの一覧をクロールする
  • 2: クロールした結果をDynamoDBに書き込む
    • 2.1: Batchでまとめて書き込むのだが、DynamoDBの仕様として一度に書き込めるのは最大で25個という仕様があり、25個のチャンクに分割して書き込む必要がある

今回は「Go言語による並行処理」で勉強した内容を踏まえて、以下のように内容を変更した。

  • 1: いくつかの主要なRSSのリストを元に、最近のエントリの一覧をクロールする。ただし、クロールは最大10並列で行なう
  • 2: クロールした結果をDynamoDBに書き込む
    • ただし、1のクロールした結果が出揃ってから書き込むのではなく、クロールの処理が終わったものがあれば他の結果を待たずして書き込む
      • 書き込むのはなるべく25個揃うのが待つ
    • 一つのサイトでクロールエラーが発生しても気にしないが、一定数以上のクロールが失敗したら、他を含めて終了の処理に入るようにする

準備

ひとまず今回の仕様の修正に関係ないパーツの部分の説明。

サイトのURLを渡すと、エントリの一覧を取ってくる関数。特に面白いところはない。

import (
    "context"
    "github.com/mmcdole/gofeed"
)

func GetUrlsFromFeed(ctx context.Context, feedUrl string) ([]string, error) {
    urls := make([]string, 0)
    fp := gofeed.NewParser()
    feed, err := fp.ParseURLWithContext(feedUrl, ctx)
    if err != nil {
        return nil, err
    }
    now := time.Now()
    for _, item := range feed.Items {
        urls = append(urls, item.Link)
        }
    }
    return urls, nil
}

最終的にはこういう形で動くものを作りたい。

  • 1の要件を満たすため、GetUrlsFromSourceRSSListは並行でクロールさせたい
  • 2の要件を満たすため、splitToChunkWriteUrlsToDynamoDBは1にブロックされず、できあがったところから処理させたい
FeedFetcherOptions := FeedFetcherOptions{
    Concurrent:      10,
    TimeoutDuration: time.Second * 10,
}
feedResult := GetUrlsFromSourceRSSList(ctx, getSourceUrls(), FeedFetcherOptions)

for chunk := range splitToChunk(ctx, feedResult, chunkingOptions) {
    fmt.Println("Writing batch...")
    WriteUrlsToDynamoDB(ctx, svc, chunk)
}

やったこと

コード概要

概観としては以下のような感じになった(クリックすると内容が展開されます)。

並行にクロールさせるためのコード

type CrawlResult struct {
    Urls   []string
    Source string
    Error  error
}

type FeedFetcherOptions struct {
    Concurrent      int
    TimeoutDuration time.Duration
}

func GetUrlsFromSourceRSSList(
    ctx context.Context,
    feedUrls []string,
    options FeedFetcherOptions,
) <-chan CrawlResult {
    feedResult := make(chan CrawlResult)
    var wg sync.WaitGroup
    sem := make(chan struct{}, options.Concurrent)

    crawl := func(ctx context.Context, feedUrl string) {
        defer func() { <-sem }()
        defer wg.Done()

        result := CrawlResult{Source: feedUrl}
        urls, err := GetUrlsFromFeed(ctx, feedUrl)
        if err != nil {
            result.Error = err
        } else {
            result.Urls = urls
        }
        feedResult <- result
    }

    wg.Add(len(feedUrls))
    go func() {
        for _, feedUrl := range feedUrls {
            sem <- struct{}{}
            ctx, cancel := context.WithTimeout(ctx, options.TimeoutDuration)
            go func(feedUrl string) {
                defer cancel()
                crawl(ctx, feedUrl)
            }(feedUrl)
        }
    }()

    go func() {
        wg.Wait()
        close(feedResult)
    }()

    return feedResult
}

DynamoDBへの書き込みのために適切なチャンクに分割するコード

type ChunkingOptions struct {
    MaxChunkSize      int
    AllowedErrorCount int
}

func splitToChunk(ctx context.Context, crawlResult <-chan CrawlResult, options ChunkingOptions) <-chan []string {
    chunk := make(chan []string)
    var totalErrCnt uint32

    go func(crawlResult <-chan CrawlResult) {
        defer close(chunk)

        tmp := make([]string, 0)
        for result := range crawlResult {
            if result.Error != nil {
                fmt.Printf("error (%s) for processing %s\n", result.Error, result.Source)
                atomic.AddUint32(&totalErrCnt, 1)
                if totalErrCnt > uint32(options.AllowedErrorCount) {
                    fmt.Printf("too many errors: totalErrCnt = %d\n", totalErrCnt)
                    return
                }
            }
            for _, url := range result.Urls {
                tmp = append(tmp, url)
                if len(tmp) >= options.MaxChunkSize {
                    chunk <- tmp
                    tmp = make([]string, 0)
                }
            }
        }
        chunk <- tmp
    }(crawlResult)
    return chunk
}

見所ポイントはいくつかあるので、それぞれ紹介する。

戻り値は読み込み専用のチャネル

1のクロールの処理を待たず、終わったものからDynamoDBに書き出すという2の仕様を満たすため、戻り値は「読み込み専用」の「チャネル」(<-chan CrawlResult)にしている。それぞれ説明する。

まず、結果が(スライスなどではなく)チャネルであるという点について。以下のコードように、並行でクロールはさせるものの別のゴルーチンで走らせており、return feedResultで次の処理に渡すことができる。

   wg.Add(len(feedUrls))
    go func() {
        for _, feedUrl := range feedUrls {
            sem <- struct{}{}
            ctx, cancel := context.WithTimeout(ctx, options.TimeoutDuration)
            go func(feedUrl string) {
                defer cancel()
                crawl(ctx, feedUrl)
            }(feedUrl)
        }
    }()

    go func() {
        wg.Wait()
        close(feedResult)
    }()

    return feedResult

「Go言語による並行処理」ではチャネルを表わす変数名にstreamと入っていることが多いのだが、処理が出揃わなくても他に処理を渡せる流動的なものという意味でstreamという名前はなるほどと思った。なお、別のゴルーチンでクローラを起動(上のコードの2行目)させないと、クロールが全て終わるまでreturnできずに後段の処理がブロックされてしまうので注意が必要。私ははまった。

また、チャネルが「読み込み専用」というのも重要なポイント。読み込み専用であるということが型で明示されているため、意図せぬところでcloseされてしまうということはないし、このチャネルの生産者であるGetUrlsFromSourceRSSListcloseするまでの責任を持たなければならない、ということが表現できている(破ろうとすればコンパイル時点で叩き落としてくれる)。本でいうと4.1 拘束で詳しく説明されている。

goroutineの終了まで待つ + 終了したらチャネルを閉じる

並行で走らせるものの、全てのサイトをクロールし終わるまではプログラムは終了して欲しくないので、WaitGroupで待たせるようにする(本では3.2 syncパッケージで説明されている)。wg.Wait()を別goroutineで起動させておかないと、全部完了するまで戻り値を返せなくなってしまうので、注意。

また、wg.Wait()した後にclose(feedResult)するのも忘れないように。チャネルはスライルのようにfor rangeで回すことができ、これもまたstreamっぽいなと思うんだけど、closeしないと「他にもまだ書き込みがあるかも」となり、後段がブロックされてしまう。

上限付きの並行処理をさせたい場合はgoroutine + バッファ付きチャネルを使う

あまりにも並行でクロールしまくると、クロール先にも迷惑だし、動作させる環境のネットワークリソースも枯渇してしまう。そのため、並行でクロール自体はさせたいが、並行で走る処理数の上限は設けたい。こうしたことを実現したければバッファ付きのチャネルを使うとよい。バッファが埋まっていれば、席が空くまでブロックしてくれるので、並行処理数の上限になってくれる。

sem := make(chan struct{}, options.Concurrent) // 最大の処理数を制限

// 新しくgoroutineを動かすときにはsemに何か入れておく
sem <- struct{}{}

// 処理が終わったら、他の処理が動かせるように席を空けておく
defer func() { <-sem }()

バッファ付きチャネルは本でいうと、3.3 チャネル(channel)で説明されている。

並行処理におけるエラーハンドリング

並行処理におけるエラーハンドリングでは、呼び出し元に情報を失なわずに伝えてあげることが重要。そのため、結果(今回の場合はUrls)だけでなく、エラー内容もペアになったstructを作っている。

type CrawlResult struct {
    Urls   []string
    Source string // これはなくてもいいが、どの情報源からエラーが起きているかエラーメッセージに含めたかったので追加した
    Error  error
}

例えば、URLのリストを25個ずつのチャンクに分けるといった後段の処理において、クロールエラーの合計数が一定数以上であれば終了させる、といったことができるようになる。

  • エラー数が一定以上になると、goroutineで起動している関数がreturnされる
  • その結果、defer close(chunk)が呼ばれ、後段のチャネルを読んでいる処理も終了される

この辺りは本だと4.5 エラーハンドリングに詳しく説明されている。

func splitToChunk(ctx context.Context, crawlResult <-chan CrawlResult, options ChunkingOptions) <-chan []string {
    chunk := make(chan []string)
    var totalErrCnt uint32

    go func(crawlResult <-chan CrawlResult) {
        defer close(chunk)

        tmp := make([]string, 0)
        for result := range crawlResult {
            if result.Error != nil {
                fmt.Printf("error (%s) for processing %s\n", result.Error, result.Source)
                atomic.AddUint32(&totalErrCnt, 1)
                if totalErrCnt > uint32(options.AllowedErrorCount) {
                    fmt.Printf("too many errors: totalErrCnt = %d\n", totalErrCnt)
                    return
                }
            }
            for _, url := range result.Urls {
                tmp = append(tmp, url)
                if len(tmp) >= options.MaxChunkSize {
                    chunk <- tmp
                    tmp = make([]string, 0)
                }
            }
        }
        chunk <- tmp
    }(crawlResult)
    return chunk
}

キャンセルを送れるようにする

クローラやクラウドのAPIコールなど外部リソースを叩く場合、適切にタイムアウトを仕込むことは大事。ある箇所でタイムアウトやキャンセルが起こった場合、適切に起動したgoroutineを終了させる必要がある。さもないと、本の4.3 ゴルーチンリークを避けるで説明されているようなゴルーチンのリークが発生してしまう。4章の最初ではdoneチャネルを使って説明されているが、この辺りを抽象化しつつ使いやすくしたcontextパッケージを積極的に使っていこう。本では4.12 contextパッケージに詳しく説明されている。

所感

Go言語を書くときは並行処理を使う機会が結構あるものの、goroutineやchannelのことを雰囲気で使っていた(あまりちゃんと使えていなかった、が正確か...)が、「Go言語による並行処理」を読んだことで、そもそもどういうことができるのか、どういうことに気を付けるべきかあたりが体系的だったり、よく使うものに関してはデザインパターンのように知ることができた。読むだけだと実践するには少し難しい部分はあったが、砂場である程度手を動かしたら少しだけ自身が持てたので、必要な場所ではgoroutineとchannelを積極的に使っていきたいなと思った。

Go言語による並行処理

Go言語による並行処理

Amazon