KRAZY感情TEXTYLE

"くれいじー かんじよう てきしたいる" と読みます

sshuttleっていうOSSにcontributeした

色々あって sshuttle/sshuttle というPythonで書かれたOSSソースコードを読んでいたのですが、デグレっぽいところがあったのでPR投げてマージされました。
github.com

これまで能動的なOSSプロダクトへのコントリビュートは予めIssueが立っているものの修正や、ドキュメントへのtypoぐらいだったんですけど初めてソースコードを眺めて自分から*1見つけることができたので嬉しいです。でも本質のソケットプログラミング的な部分はマジで何もわからんので助けて

*1: 僕が見つけて「これデグレってる?」って直接聞いてくれたのは同僚でした、感謝

ISUCON10にTrust Rustチームとして参加して予選敗退しました

てがきはてなブログがリリースされた日に書いたんですけど誤操作で消しちゃって萎えてましたが再度書きました。一緒に出てくれてた id:dekokunid:stefafafan には感謝です。僕は実装を読んでゴリゴリやる役目だったのに筋力が足りなくて全然出来ませんでした、来年はもっと強くなります。 f:id:Krouton:20201007170640j:plain

チームメイトのエントリ

具体的に何したとかはチームメイトの奴を読んだほうが早いです

dekotech.dekokun.info

stefafafan.hatenablog.com

std::sync::mpsc::channelで少しハマったのでメモ

std::sync::mpsc - Rustの挙動についてちょっと知見を持ったので綴っていく。

やりたいこと

めっちゃ重たいタスクがN個あって、それぞれにスレッドを作ってmpsc::channel経由でメインスレッドに値を返してもらう例を考える。

use std::thread;
use std::sync::mpsc;

let (tx, rx) = mpsc::channel();
for costly_task in costly_tasks.into_iter() {
    let tx = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let val = costly_task();
        tx.send(val).unwrap();
    });
}

for val in rx {
    dbg!(val);
}

これだと実は下のfor文を抜けることは出来ない。実際にplaygroundでやると強制的にkillされていることが確認できる。

play.rust-lang.org

ではどうするのか?というと for文の前で drop(tx); を呼んであげる必要がある。

for costly_task in costly_tasks.into_iter() {
 // ...
}

drop(tx);

for val in rx {
//  ...
}

僕みたいになぜ?となった方へこの問題をもう少し簡単にしてみる。ならなかった人はスター押してはてブ追加して頂ければ大丈夫です。

簡単にした例

use std::sync::mpsc;

let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);

tx1.send(1).unwrap();

for val in rx {
    dbg!(val);
}

これも同様にfor文を抜けることが出来ない。というわけで drop(tx); drop(tx1); を追加してみると同様に抜けることができる。

play.rust-lang.org

どうやらfor _ in rx { ... }はsenderが値が送りきるのを待つ挙動をしているようだ、とのことでドキュメント 1 を確認しに行く。

Returns an iterator that will block waiting for messages, but never panic!. It will return None when the channel has hung up.

うんうん、推測は当たっていたようだ。Sender::send&self なので、1回値を送っただけではdropしない。なので、rxtx達が生きていることが分かっているので明示的にdropしてあげないとforから抜けれなかったわけか。

ということは

use std::thread;
use std::sync::mpsc;

// このtxは使ってないけど明示的にdropさせる必要がある
let (tx, rx) = mpsc::channel();
for costly_task in costly_tasks.into_iter() {
    // ここの tx は for が回るたびにスコープから抜ける => drop する
    let tx = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let val = costly_task();
        tx.send(val).unwrap();
    });
}

// 明示的にdropしてあげる
drop(tx);

// 全部のSenderがdropしているのでこのforはちゃんと抜けられる
for val in rx {
    dbg!(val);
}

ってわけですね、少しハマったのでメモです

宣伝

今回の挙動を確認したりドキュメントを読んだりするのにrust-jpのSlackで質問を投げていました。活発なコミュニティで優しい方が多いので皆さん入りましょう!!

rust-jp.herokuapp.com


  1. 本当はinto_iterなんだけどなかったしiterでも同じでしょう

Derive macrosを使ってstructにBuilder Patternを生やしてくれるやつを書いてみた

こんばんは、id:Krouton です。ISUCON10にRustで参加した話をてがきはてなブログで書こうと思ったら消えたので萎えました。ISUCON11までには書きます。多分...。

本題

Derive macros使ってますか?serdeとか使うと出てくる #[derive(Serialize, Deserialize)]の部分ですね。

Derive macrosでstructにBuilder Patternの実装を提供してくれる良さそうなライブラリ*1があって、練習のために実装の真似をしてました

use proc_builder_pattern::Builder;

#[derive(Default, Builder)]
struct Point {
    x: usize,
    y: usize,
}

ってやったら

struct Point {
  x: usize,
  y: usize,
}

impl Point {
    fn x(self, val: usize) -> Self {
        Self { x: val, ..self }
    }
}

impl Point {
    fn y(self, val: usize) -> Self {
        Self { y: val, ..self }
    }
}

Point::default().x(1).y(2)

って感じに使うことができます。 最初は解説を書こうと思ったけど実装とドキュメント読めば自明じゃね!?って思ったので実装したリポジトリだけ置きます、参考にしたライブラリは初期化してないメンバを検知する機構があったり、Intoを満たしてるように変換したりしてて便利だったので実用するならそっちを使ってください

github.com

binutilsが入ってるとopamでswitch createした時のビルドに失敗するっぽい?

➜  ~ opam switch create 4.09.0

<><> Gathering sources ><><><><><><><><><><><><><><><><><><><><><><><><><><>  🐫 
[ocaml-base-compiler.4.09.0] downloaded from cache at https://opam.ocaml.org/cache

<><> Processing actions <><><><><><><><><><><><><><><><><><><><><><><><><><>  🐫 
∗ installed base-bigarray.base
∗ installed base-threads.base
∗ installed base-unix.base
[ERROR] The compilation of ocaml-base-compiler failed at "/Users/krouton/.opam/opam-init/hooks/sandbox.sh build make -j15 world".

#=== ERROR while compiling ocaml-base-compiler.4.09.0 =========================#
# context     2.0.7 | macos/x86_64 |  | https://opam.ocaml.org#0f1d4fba
# path        ~/.opam/4.09.0/.opam-switch/build/ocaml-base-compiler.4.09.0
# command     ~/.opam/opam-init/hooks/sandbox.sh build make -j15 world
# exit-code   2
# env-file    ~/.opam/log/ocaml-base-compiler-4548-6cc03f.env
# output-file ~/.opam/log/ocaml-base-compiler-4548-6cc03f.out
### output ###
# ld: symbol(s) not found for architecture x86_64
# [...]
#   "_caml_weak_set", referenced from:
#       _caml_builtin_cprim in prims.o
#   "_main", referenced from:
#      implicit entry/start for main executable
# ld: symbol(s) not found for architecture x86_64
# clang: error: linker command failed with exit code 1 (use -v to see invocation)
# clang: error: linker command failed with exit code 1 (use -v to see invocation)
# make[1]: *** [ocamlrund] Error 1
# make[1]: *** Waiting for unfinished jobs....
# make[1]: *** [ocamlruni] Error 1
# make: *** [coldstart] Error 2



<><> Error report <><><><><><><><><><><><><><><><><><><><><><><><><><><><><>  🐫 
┌─ The following actions failed
│ λ build ocaml-base-compiler 4.09.0
└─ 
┌─ The following changes have been performed (the rest was aborted)
│ ∗ install base-bigarray base
│ ∗ install base-threads  base
│ ∗ install base-unix     base
└─ 

<><> ocaml-base-compiler.4.09.0 troubleshooting <><><><><><><><><><><><><><>  🐫 
=> A failure in the middle of the build may be caused by build parallelism
      (enabled by default).
      Please file a bug report at https://github.com/ocaml/ocaml/issues
=> You can try installing again including --jobs=1
      to force a sequential build instead.
Switch initialisation failed: clean up? ('n' will leave the switch partially installed) [Y/n] Y

特になぜとかは調べてないが、多分同様に詰まる人がいるかもしれないしまた詰まりそうなので自分用にメモを残します
ここではunlinkしているが、私の環境ではuninstallをしないと入らなかった。
github.com

goroutineで自作ランタイム上にsetTimeoutを再現する(実装の手順とcallback-hellもついてくるよ!!)

こんばんは、id:Kroutonです。RustのFutureについて調べていたはずが*1いつの間にかcallbackでsetTimeoutを再現するコードをGoで書いていたのでその実装手順について簡単に書きたいと思います。実用性はほぼないです。

注意書き

筆者がプログラミングを始めた頃にはもうES6(というかBluebirdを使ったPromise)が広まりかけた頃なので、体験したことありませんし間違ってるかもしれません。また、本エントリを読んだことによるPTSDやフラッシュバックなどの症例については一切の責任を負いません。

要件

var rt = NewRuntime()

func main() {
	rt.run(func() {
		println("これからsetTimeoutをやっていきます")
		setTimeout(1500, func() {
			println(3)
		})
		setTimeout(2000, func() {
			println(4)
		})
		setTimeout(2, func() {
			println(1)
			setTimeout(3000, func() {
				println("こんな風にネストもできるよ、はいおしまい!")
			})
		})
		setTimeout(1000, func() {
			println(2)
		})
		println("1->2->3->4の順番で出るはず")
	})
}

標準出力

これからsetTimeoutをやっていきます
1->2->3->4の順番で出るはず
1
2
3
4
こんな風にネストもできるよ!はいおしまい!

Playground

play.golang.org

初手

package main

import (
	"time"
)

// TODO: そのうちフィールドを用意する
type Runtime struct{}

func NewRuntime() *Runtime {
	return &Runtime{}
}

func (rt *Runtime) run(program func()) {
	program()
}

var runtime = NewRuntime()

// とりあえず今はsleepして実行するだけ
func setTimeout(ms int, callback func()) {
	time.Sleep(time.Duration(ms) * time.Millisecond)
	callback()
}

func main() {
	runtime.run(func() {
		println("開始")
		setTimeout(100, func() {
			println(2)
		})
		// 待つ時間が少ないこっちが先に実行されたい
		setTimeout(20, func() {
			println(1)
		})
                 setTimeout(1000, func() {
		        println("終わり")
                 })
	})
}

このコードは

開始
2
1
終わり

の順に出力されるので改良が必要ですね(といっても並行処理も何もしてないの当たり前ですが)

setTimeoutでgoroutineを呼んでみる

setTimeoutの実装を次のように変えてみましょう

func setTimeout(ms int, callback func()) {
	go func() {
		time.Sleep(time.Duration(ms) * time.Millisecond)
		callback()
	}()
}

そうすると、実行結果が

開始

になりますね。goroutine自体は作れているのですが、その中でprintlnを呼ぶ前にmainの処理が終わってしまうのでこのようになってしまいます。
なので、次はmain関数が終わらないようにしてみましょう。

メインスレッドを無限ループさせる

Runtime#runを次のようにしましょう

func (rt *Runtime) run(program func()) {
	program()
	for {
	}
}

こうすると結果は

開始
まずはこっち
OK
終わり
...(ただし実行は終わらない)

と正しい順番になりつつも無限ループが終わらないといった感じになりますね。ここまで来たら、channelを用意してgoroutine内で値を上手く渡せば無限ループをbreakできそうな感じがしませんか?というわけでそろそろRuntimeのフィールドを追加しましょう。

Runtimeにフィールドを追加してsetTimeoutを実装しきる

type Runtime struct {
	nextID    int
	callbacks map[int]func()
	recv      chan int
}

func NewRuntime() *Runtime {
	recv := make(chan int)
	return &Runtime{recv: recv, callbacks: map[int]func(){}}
}

func (rt *Runtime) register(callback func()) int {
	registeredID := rt.nextID
	rt.callbacks[registeredID] = callback
	rt.nextID++
	return registeredID
}

func setTimeout(ms int, callback func()) {
	callbackID := runtime.register(callback)
	go func() {
		time.Sleep(time.Duration(ms) * time.Millisecond)
		runtime.recv <-callbackID
	}()
}

setTimeoutが呼ばれると、Runtimeのcallbacksにユニークなid(この場合はint)をkey、渡されたcallbackをvalueとしてmapに追加します。そして、goroutine内で指定された時間だけ待った後、 用意したchannel(この場合はruntime.recv)にmapに追加したkeyを送信します。

完成

func (rt *Runtime) run(program func()) {
	program()
	for {
		callbackID := <-rt.recv
		callback := rt.callbacks[callbackID]
		delete(rt.callbacks, callbackID)
		callback()
		if len(rt.callbacks) == 0 {
			break
		}
	}
}

最後はRuntime#runを弄って完成です。 setTimeoutで送信されたkeyを受信して、そのcallbackを引いてきてmapから削除し、実行します。登録されたcallbackがすべて空になったら無限ループが終わる。といった感じです。

あとがき

本当はRustを使ったmspcで書きたかったけどGoの方需要がある気がしたので書きなおしました、まあチャンネルがある言語なら大体同じだと思うので好きな言語で書いてみてください、全然わからんとか間違えてるとかご意見あったら
https://twitter.com/Krout0nまで

追記 2020/07/06

package main
<200b>
import (
	"sync"
	"time"
)
<200b>
func run(program func()) {
	program()
	wg.Wait()
	println("Done")
}
<200b>
var wg = &sync.WaitGroup{}
<200b>
func setTimeout(ms int, callback func()) {
	wg.Add(1)
	go func() {
		time.Sleep(time.Duration(ms) * time.Millisecond)
		callback()
		wg.Done()
	}()
}
<200b>
func main() {
	run(func() {
		println("開始")
		setTimeout(100, func() {
			println("OK")
		})
		// 待つ秒数が少ないこっちが先に実行されたい
		setTimeout(20, func() {
			println("まずはこっち")
		})
		setTimeout(1000, func() {
			println("終わり")
		})
	})
}

これだけでいいらしい、goroutine周りの道具がしっかりあってGoいい感じですね

*1: Introduction - Futures Explained in 200 Lines of Rust このRuntimeのコードもほぼここに乗ってる実装と同じです。 cssamsonさんに圧倒的感謝

thread_localメモ

自分用覚書、thread_local!で定義したstatic変数はスレッド毎に LocalKeyという構造体でwrapされる。各スレッド変数はそれぞれ外側に不変である、可変参照をしたいならRefCellを被せたりする。

use std::cell::RefCell;

thread_local! {
    static X: RefCell<Vec<usize>> = RefCell::new(vec![]);
}

fn main() {
    X.with(|v| {
        println!("initial {:?}", v.borrow());
    });

    let j = std::thread::spawn(move || {
        X.with(|v| {
            v.borrow_mut().push(1000);
            println!("other thread {:?}", v.borrow());
        });
    });
    j.join().unwrap();
    X.with(|v| {
        println!("after other thread {:?}", v.borrow());
        v.borrow_mut().push(1024);
    });

    main_other_fn();
}

fn main_other_fn() {
    X.with(|v| {
        println!("main_other_fn {:?}", v.borrow());
    });
}

initial
other thread [1000]
after other thread

main_other_fn [1024]