はじめに
どうも、最近「FLASH」を見たいがために Hulu を契約したら意外と面白いコンテンツ多くてなんのために契約したのか忘れたけんつです。
いつもは「 Redis はフェチ」なんていうよくわからないことを言っていますが今日は MySQL の話です。
といってもジョブキューに応用したいなって話なのである程度は平常運転です。
何をやるか
MySQL の BLACKHOLE Engine というストレージエンジンとみんな大好きバイナリログを使って雑にジョブキュー的な何かを作ってみようねということをやります。
ジョブキューと言ってしまうと大層なものを考えてしまいそうなので、ただのリアルタイムハンドラーを作ってみただけの話です。
暇になったらジョブキューを作ってみようかなと思っていますが、Redis 使ったほうが絶対いいでしょっていうのが作ってみた感想です。
以下が雑に作ったレポジトリです。
github.com
どうしてやるか
できると思ったから。
BLACKHOLE Engine とは
BLACKHOLE Engine とは「データは受け取るが保存しない」という特性を持つストレージエンジンのこと。Linux で言うところの /dev/null のデータベース版と考えてくれれば大体は良いはず。
dev.mysql.com
詳しくはこのドキュメントを見てもらえればわかると思う。
色々と使いみちはあるが、実データが存在しないから autoincrement が使えなかったりする点とかまぁまぁ注意が必要な部分がある。
そして今回のポイントとして、実データは保持しないがバイナリログにイベントは残るという点もある意味では注目するべき観点なのかなと思っている。
バイナリログとは
バイナリログにはcreate/drop/insert/update/deleteとか、ある意味での CRUD 的な処理がトランザクションが完了した段階でイベントとして記録されるログファイルのこととなっている。
公式ドキュメントは以下のもの。
dev.mysql.com
このブログが割とライトでわかりやすい。
purple-jwl.hateblo.jp
本来はデータのリカバリとかに使われたりする。
実装する
MySQL (Master - Slave 構成)
とりあえず、DB は MySQL 5.7 を使用した。
Master Slave 構成になっているけど、実際は Master だけで出来ることがわかったので Slave は存在しているが使っていない。
両方共バイナリログ周りの設定をしている。
version: '3' services: master: build: ./docker/mysql/master ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=root expose: - "3306" slave: build: ./docker/mysql/slave image: mysql:5.7 depends_on: - master environment: - MYSQL_ROOT_PASSWORD=root - MYSQL_MASTER_SERVER=master - MYSQL_MASTER_ROOT_PASS=root - MYSQL_MASTER_WAIT_TIME=5 expose: - "3306" ports: - "3307:3306"
#master cnf [mysqld] log-bin=/var/log/mysql/bin-log max_binlog_size=256M expire_logs_days=2 innodb_flush_log_at_trx_commit=1 sync_binlog=1 sysdate_is_now server-id=1
#slave cnf [mysqld] log-bin=/var/log/mysql/bin-log max_binlog_size=256M expire_logs_days=2 innodb_flush_log_at_trx_commit=1 sync_binlog=1 sysdate_is_now server-id=2
# init.sql create database blackhole; create table blackhole.queue (msg text) ENGINE = BLACKHOLE;
BlackHole Engine を設定してテーブルを作る。
Golang 実装
package main import ( "context" "fmt" "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" ) func main() { cfg := replication.BinlogSyncerConfig{ ServerID: 1, Flavor: "mysql", Host: "127.0.0.1", Port: 3306, User: "root", Password: "root", } syncer := replication.NewBinlogSyncer(cfg) streamer, err := syncer.StartSync(mysql.Position{}) if err != nil { panic(err) } ch1 := make(chan string, 4) go receiver(ch1) for { ev, _ := streamer.GetEvent(context.Background()) if ev.Header.EventType.String() == "WriteRowsEventV2" { size := int(ev.RawData[32]) value := string(ev.RawData[33:34+size]) ch1 <- value } } } func receiver(ch <-chan string) { for { value := <- ch fmt.Println(value) } }
これだけで出来る。
channel に対してバッファのサイズを 4 としているため、同時に 4 つ以上のデータを受け取った場合 FIFO 的な動作をするようになっている。
また go-mysql が何故か binlog をリアルタイムでハンドリングする機能を搭載しているのでそれを使っている。
これの辛い所
- このジョブキューもどきが死んだとき、どのイベントで死んだかどうかを記録しないと復旧時にバイナリログに記載されているすべてのイベントがロードされてしまうので完了したタスクを再度走らせてしまいそう。
- 一応ログとしてすべてのイベントを持っているが↑に関連してアトミックにイベントを扱うことが難しそう
Position を的確に取るべきだったけど、docker container ベースにしたから権限周りがだるくてやらなかった
- バイナリログのパーサがないと辛い
- Canal を何故かパッケージで認識していないからパーサーが使えなかった
if ev.Header.EventType.String() == "WriteRowsEventV2" { size := int(ev.RawData[32]) value := string(ev.RawData[33:34+size]) ch1 <- value }
何でここで 32 番目を参照しているかというと、 ナマのバイナリでは 32 文字目に Values のサイズが記載されているため
これはテーブルのサイズによるのでパーサーがないと微妙につらい。
まとめ
Redis で Pub/Sub とか使って作った方がいい。
4時間あれば意外といける。