MySQLのレプリケーションプロトコルを使ったBinlogイベントを処理できるライブラリを作った

久々にライブラリを作って Rubygems に publish した。

github.com

ライブラリの名前は MysqlReplicator とした。これは MySQL の Binlog イベントを Ruby のプログラムで受け取って自由に処理を書くためのライブラリ。

MySQL にはレプリケーションプロトコルというのがあって、これを使うと自分の書いたプログラムが接続した MySQL のレプリカとして振る舞うことが可能になる。
要は Binlog イベントをプログラムで受け取って処理することができる。

どういうユースケースで使えるかというと、例えば Binlog イベントで INSERT / UPDATE / DELETE 文の実行結果を受け取って、Elasticsearch や DynamoDB といった別のデータベースにデータを同期する、といったことができる。

mysqldump を使って Binlog ファイルをストリーミング読み込みする手もあるのだけど、MySQL サーバを docker で動かす場合別の docker から読むのが大変なので、接続できればいいレプリケーションプロトコルの方が実装は大変だけど環境構築はやりやすいので、レプリケーションプロトコルを活用するためのライブラリを作った。

モチベーション

今年の7月に AWS OpenSearch が RDS からのデータ同期をサポートしたリリースがあって、これは自社サービスで使えそうならぜひ検討したいと思ったのがきっかけ。

今まで MySQL のデータを Elasticsearch に同期する場合、ジョブキューを用意して、何らかの原因でリクエストに失敗したらデッドレターキューに入れて再送して・・とインフラコストやアーキテクチャが大きくなってしまうのが課題感としてあって、同期処理を AWS 側でやってくれるんだったら最高だなと思った。

ただ、仮にこれを採用するとなった場合、今度は開発環境どうするという問題が起きる。なるべく開発環境の挙動とステージング/本番の挙動は揃えたい。

記事を追っていくと、どうもログベースで同期処理をかける仕組みのようだ。そういえば MySQL は Binlog を見れば追加/更新/削除した行データが分かるな、よしローカルの開発環境でも Binlog イベントを受け取って Elasticsearch にデータを投げるようにすれば挙動を揃えられるじゃんという。

自社のサービスは Ruby on Rails を採用しているので、Ruby でやりたい。
初めは mysql2 gem で出来ないか見てみたところ、mysql2 はレプリケーションプロトコルには対応していないことが分かった。

Rubygems で Binlog イベントを受け取れるライブラリはないかなと探したのだけど、無さそうだったので自作するしかなさそうだったというのと、レプリケーションに関する知識も深まりそうなので作ってみたくなったのがきっかけ。
アプリケーションロジックとは切り離された別プロセスになるのでアプリケーションへの影響もないし。

何か参考になるものはないかなーと調べてみると、過去に似たようなことを試していた事例は見つけたのだけど、今は亡き Bitbucket のリンクしかなく無念(mysql-replication-listener 自体もう10年以上メンテナンスされてないが・・) https://so-wh.at/entry/20120827/p1

ちなみに Golanggo-mysqlレプリケーションプロトコルに対応している模様。 go-mysqlを使ったレプリケーション この Qiita も書いているのは先のブログと同じ winebarrel さんだった。先駆者すぎる。

MySQLレプリケーションプロトコルについて

最初に書いたように、自分のプログラムを MySQL サーバのレプリカとして振る舞わせる、Binlog イベントをリアルタイムで受け取れるようにするための仕組み。

これを実装するための前段として、プログラムから MySQL サーバの認証を通し、プログラムから SQL を実行できるようにする必要があるので、記事にしておいた。

Rubyでcaching_sha2_password認証を使ってMySQLに接続する
RubyのTCPソケットでMySQLにクエリを発行する

レプリケーションプロトコルを使ってやり取りするには、普段 MySQLサーバをレプリカとして設定する際と同様の命令をプログラムからパケット送信して実行すればいい。

こんな感じでプログラムからやることになります。

  1. SHOW MASTER STATUS クエリを実行し、Binlog のファイル名と読み取り位置を取得する
  2. SHOW VARIABLES LIKE "binlog_checksum"クエリを実行し、チェックサムのあり/なしを取得する
  3. COM_REGISTER_SLAVE コマンドを実行し、プログラムをレプリカとして登録する
  4. COM_BINLOG_DUMP コマンドを実行し、Binlog イベントをストリームで受け取れるようにする
  5. Binlog イベントのパケットを受信して、イベントタイプごとに処理をする

公式のドキュメントはここ
使うことになるイベントタイプはこの辺り。

  • ROTATE_EVENT
    • Binlog ファイルのローテーションが起きたときに発火されるイベント
  • FORMAT_DESCRIPTION_EVENT
    • レプリケーション接続の開始時に初期化のための情報を取得するために発火されるイベント
  • QUERY_EVENT
  • TABLE_MAP_EVENT
    • 行データの変更前に対象になるテーブル情報を取得するために発火されるイベント
  • WRITE_ROWS_V2_EVENT
    • 行データが挿入された時に発火されるイベント
    • 行データは複数になることもある
  • UPDATE_ROWS_V2_EVENT
    • 行データが更新された時に発火されるイベント
    • 行データは複数になることもある
    • 変更前と変更後の行データを両方受け取れる
  • DELETE_ROWS_V2_EVENT
    • 行データが削除された時に発火されるイベント
    • 行データは複数になることもある
  • XID_EVENT

公式を見れば各種イベントタイプごとのパケットの仕様が書かれている...かと思いきや、共通のヘッダー部くらいしか載っていなかったりするので参考にならない。
MySQL のヘッダーファイルを見ると、各種イベントタイプの仕様がコメントでしっかり書かれているので、これを見るのが一番良いと思います。

https://github.com/mysql/mysql-server/blob/8.0/libbinlogevents/include/statement_events.h https://github.com/mysql/mysql-server/blob/8.0/libbinlogevents/include/rows_event.h

後は書かれている仕様通りに各種イベントタイプを処理するためのパーサーを愚直に書いていけばいいです。

MysqlReplicator を作っていて、地味にしんどかったのは MySQLJSON 型は当たり前なんですけどバイナリーフォーマットで保存されているので、Binary JSON 用のパーサーも作らないといけなかったこと。この辺もいい感じに使える gem が多分ないので自作せざるを得なかった。
自作の JSON パーサーを書いている過程で、MySQLJSON には Opaque 型という MySQL の型を JSON に含められる特殊な型があることが分かって勉強になった。
なお Opaque 型は使ったことないので MysqlReplicator では対応はしていない。

こういう感じで入れられるらしい。

-- JSONにDATE型を含める例
INSERT INTO tests (json) VALUES (JSON_OBJECT('created', CAST('2025-12-10' AS DATE)));

次は MysqlReplicator をベースにして、Elasticsearch への同期処理を作って Docker Image として動かせるようにしていく。