std::sync::mpsc::channelで少しハマったのでメモ

std::sync::mpsc - Rustの挙動についてちょっと知見を持ったので綴っていく。

やりたいこと

めっちゃ重たいタスクがN個あって、それぞれにスレッドを作ってmpsc::channel経由でメインスレッドに値を返してもらう例を考える。

use std::thread;
use std::sync::mpsc;

let (tx, rx) = mpsc::channel();
for costly_task in costly_tasks.into_iter() {
    let tx = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let val = costly_task();
        tx.send(val).unwrap();
    });
}

for val in rx {
    dbg!(val);
}

これだと実は下のfor文を抜けることは出来ない。実際にplaygroundでやると強制的にkillされていることが確認できる。

play.rust-lang.org

ではどうするのか?というと for文の前で drop(tx); を呼んであげる必要がある。

for costly_task in costly_tasks.into_iter() {
 // ...
}

drop(tx);

for val in rx {
//  ...
}

僕みたいになぜ?となった方へこの問題をもう少し簡単にしてみる。ならなかった人はスター押してはてブ追加して頂ければ大丈夫です。

簡単にした例

use std::sync::mpsc;

let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);

tx1.send(1).unwrap();

for val in rx {
    dbg!(val);
}

これも同様にfor文を抜けることが出来ない。というわけで drop(tx); drop(tx1); を追加してみると同様に抜けることができる。

play.rust-lang.org

どうやらfor _ in rx { ... }はsenderが値が送りきるのを待つ挙動をしているようだ、とのことでドキュメント 1 を確認しに行く。

Returns an iterator that will block waiting for messages, but never panic!. It will return None when the channel has hung up.

うんうん、推測は当たっていたようだ。Sender::send&self なので、1回値を送っただけではdropしない。なので、rxtx達が生きていることが分かっているので明示的にdropしてあげないとforから抜けれなかったわけか。

ということは

use std::thread;
use std::sync::mpsc;

// このtxは使ってないけど明示的にdropさせる必要がある
let (tx, rx) = mpsc::channel();
for costly_task in costly_tasks.into_iter() {
    // ここの tx は for が回るたびにスコープから抜ける => drop する
    let tx = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let val = costly_task();
        tx.send(val).unwrap();
    });
}

// 明示的にdropしてあげる
drop(tx);

// 全部のSenderがdropしているのでこのforはちゃんと抜けられる
for val in rx {
    dbg!(val);
}

ってわけですね、少しハマったのでメモです

宣伝

今回の挙動を確認したりドキュメントを読んだりするのにrust-jpのSlackで質問を投げていました。活発なコミュニティで優しい方が多いので皆さん入りましょう!!

rust-jp.herokuapp.com


  1. 本当はinto_iterなんだけどなかったしiterでも同じでしょう