ナニワヤのローストビーフはいいぞという話

うちは毎年、年末年始や記念日を家で過ごす場合、結構高確率で麻布十番にある「スーパーナニワヤ」というスーパー内の精肉店で売っているローストビーフを買って食べることが多い。
ナニワヤのローストビーフはこれまでの人生で一番美味しいローストビーフといっても全然過言でないくらい美味しいのである。

赤身なのである程度量を食べてもしつこくならないし、そこそこ厚めに切っても歯で噛み切れる柔らかさで食べやすくて最高。
ついでにお惣菜コーナーでポテトサラダを買って帰って一緒に食べるのが定番のパターン。(本当はマッシュポテトがいいけど売ってないので仕方がない)

ナニワヤのローストビーフとの出会いは、前職のオフィスがまだ麻布十番にあった頃、夜遅くにオフィスで飲んでいたら社長がおつまみ代わりに買ってきてくれたのが最初で、食べてみたらものすごい美味しくて、「何これ!どこのなんすか!?????」と聞いたら「十番商店街にあるスーパーだよ」と教えてもらったのがきっかけ。

この情報を聞けただけでも前職に入社した甲斐があったというものである。
話を聞いた当時はシュッと買えるほどにはお金がなかったので、それなりに稼げるようになってからホクホク買いに行くようになったのであった。

スーパーナニワヤは麻布十番駅から7,8分くらい歩いた十番商店街の外れ、というかちょっと抜けたところにある、めちゃめちゃ昔ながらのスーパーという感じ。
中に精肉店があるので、そこでローストビーフが買える。

以前は14時過ぎにならないとローストビーフが焼きあがらないので14時くらいに行って焼きあがりを待って買うという感じだったのだけど、お店の人に聞いたところ最近は午前中も提供されているらしい。
モモ肉の塊肉で提供されていて、1パック200gから300gくらい。
本当に人気なので、週末は夕方以降だともう売り切れていることがあるので注意。以前16時くらいに行ったらもう売り切れてたのは悲しい思い出。
特に年末、この記事を書いている前日の12/30に買いにいったら14時過ぎ時点で20分ほど行列に並ぶ必要があったので、買うのは大変だけど、その大変さの価値はある。

よいお年を。

lexicalエディタでMermaid記法で図を描けるようにするプラグインを作った

facebookが公開しているlexicalエディタというリッチテキストエディタのライブラリがあって、便利に使っている。

lexical.dev

エディタstateを管理するためのコアライブラリといくつかのプラグインが提供されていて、プラグインの仕様に沿ったReactコンポーネントを作ることでエディタ上で実現したい機能を拡張していくことができる。
プラグインの仕様がとても良くできているし、selection APIに介入したりdecorateができたりと非常に柔軟な拡張ができるので、2025年12月時点ではReactを使ったフロントエンドに高機能なリッチテキストエディタを導入したい場合はおすすめできる。

最近MermaidでER図を作る機会があり、lexicalエディタでも描けるといいなーと思ったのでプラグインを作って公開してみた。 せっかくなのでコードは全てClaude Codeに書かせてみた。

https://www.npmjs.com/package/lexical-mermaid

GItHubはこちら

github.com

READMEに詳しく載せているが、こんな感じでlexicalエディタ上でMermaidで描いた図と入力フォームを切り替えて使える。 Storybookが付いているので簡単に動きを試せます。

```mermaidMarkdown風に入力して改行するとMermaid記法入力コンポーネントが挿入されるようにもできる。

使い方はプラグイン(Reactコンポーネント)を置くだけ。

<LexicalComposer initialConfig={editorConfig}>
  <div className="editor-container">
    <RichTextPlugin
      contentEditable={<ContentEditable className="editor-input" />}
      placeholder={<div className="editor-placeholder">テキストを入力...</div>}
      ErrorBoundary={LexicalErrorBoundary}
    />
    <MermaidPlugin />{/* Mermaidの図&入力フォームコンポーネント */}
    <MermaidMarkdownPlugin />{/* Markdown風の入力受け付け */}
  </div>
</LexicalComposer>

入力フォームのスタイルはpropsで指定できるのと、INSERT_MERMAID_COMMAND コマンドを用意しているので、図のタイプごとに初期テンプレートを入れるみたいなこともできます。
興味があれば、詳しくはREADMEを見てみてください。

Claude Codeだけで作ってみた

これまでClaude Codeはちょっとしたタスクをやってもらうのに使っていたけど、これくらいのライブラリをClaude Codeに全部書かせてnpmに公開できる品質のものが作れるのかを試したかったので、自分はコードを書かない縛りで作ってみた。

ざっくりこんな感じで進めた。

  1. Claude.ai で今回必要そうな知識と設計の相談
    • 「lexicalエディタのプラグインの仕様を教えてください」
    • 「Mermaid.jsの仕様と内部の仕組みを教えてください」
    • 「lexicalエディタのMermaid.jsのプラグインを作りたいのですが、どういう設計が考えられますか?」
  2. npmプロジェクトを作ってClaude Codeを起動
    • 「lexicalというリッチテキストエディタのライブラリがあるのですが、Mermaidが書けるプラグインを作りたいです」
    • 「動作確認したいので、storybookを用意してください」
    • storybookを動かして見つかったバグを修正
  3. Claude Codeに追加したい機能をお願い
    • PNGエクスポートがしたいので図の右上にエクスポートするためのボタンを追加してください」
  4. Claude Codeに公開するにあたりリポジトリの整備をお願い
    • 「このライブラリはlexicalエディタを使っている人が組み込むのを前提としているので、package.jsonの依存関係を修正してください」
    • 「このリポジトリのREADME.mdを作ってください。README.mdは英語でお願いします」
    • 「eslintの設定ファイルとチェックコマンドを作ってください」
    • 「これまでの作業内容を踏まえてCLAUDE.mdを作ってください。CLAUDE.mdは英語でお願いします」

これだけ。

Claude Codeすごい。CLAUDE.md を最初に作らずとも、プロンプトの指示だけで先のGitHubに上げたくらいのコードであれば作れてしまうのがすごい。
「lexicalというリッチテキストエディタのライブラリがあるのですが、Mermaidが書けるプラグインを作りたいです」と指示したときの plan mode が思った通りの内容で感動した。
これくらいの規模のライブラリの新規実装であれば、CLAUDE.md 作らなくても十分な品質のコードが作れることが分かったのは収穫。
よくあるライブラリ構成のフロントエンドの場合、CLAUDE.md は作らなくても結構いけるよねと思ってたので合ってた。
もちろん大規模リポジトリやコミットする人のレベル感がばらけている場合は CLAUDE.md や slash commands は最初に作った方がいいと思う。

品質に一番効いた指示は、第一版ができた直後にstorybookを作らせたこと。
これによって、修正のたびにClaude Codeがstorybookを起動して動作を確認してブラウザ上のエラーが起きない状態にしてから完了通知をくれるので、挙動に満足できるかのチェックと生成されたコードのチェックだけに集中できるようになりました。

  • メジャーなフロントエンドライブラリはみんな使っているので学習されている情報がおそらく多く、plan modeの精度が高い
  • APIはモック、UIはstorybookを用意するなど、外部サービスに依存せず個別で高速に動く動作環境を簡単に起動できるので、生成AIの確認が正確且つ高速でストレスにならない

という点からフロントエンドは生成AIとの相性がいいなあと実感できた。
サーバサイドは見落としがあった際のデメリットが大きすぎるので、まだ補助的な使い方でいきたい感はあるけど

改めてstorybookは生成AI時代のフロントエンドとの相性がいいな。 デザインシステムレベルの細かいコンポーネントのカタログという固定観念があったんだけど、APIモック周りを整備した上で、コンテナコンポーネントレベルでもstorybook作って確認させればよさそう。

MySQLのレプリケーションプロトコルを使ったBinlogイベントを処理できるライブラリを作った

久々にライブラリを作って Rubygems に publish した。

github.com

ライブラリの名前は MysqlReplicator とした。これは MySQL の Binlog イベントを Ruby のプログラムで受け取って自由に処理を書くためのライブラリ。

MySQL にはレプリケーションプロトコルというのがあって、これを使うと自分の書いたプログラムが接続した MySQL のレプリカとして振る舞うことが可能になる。
要は Binlog イベントをプログラムで受け取って処理することができる。

どういうユースケースで使えるかというと、例えば Binlog イベントで INSERT / UPDATE / DELETE 文の実行結果を受け取って、Elasticsearch や DynamoDB といった別のデータベースにデータを同期する、といったことができる。

mysqldump を使って Binlog ファイルをストリーミング読み込みする手もあるのだけど、MySQL サーバを docker で動かす場合別の docker から読むのが大変なので、接続できればいいレプリケーションプロトコルの方が実装は大変だけど環境構築はやりやすいので、レプリケーションプロトコルを活用するためのライブラリを作った。

モチベーション

今年の7月に AWS OpenSearch が RDS からのデータ同期をサポートしたリリースがあって、これは自社サービスで使えそうならぜひ検討したいと思ったのがきっかけ。

今まで MySQL のデータを Elasticsearch に同期する場合、ジョブキューを用意して、何らかの原因でリクエストに失敗したらデッドレターキューに入れて再送して・・とインフラコストやアーキテクチャが大きくなってしまうのが課題感としてあって、同期処理を AWS 側でやってくれるんだったら最高だなと思った。

ただ、仮にこれを採用するとなった場合、今度は開発環境どうするという問題が起きる。なるべく開発環境の挙動とステージング/本番の挙動は揃えたい。

記事を追っていくと、どうもログベースで同期処理をかける仕組みのようだ。そういえば MySQL は Binlog を見れば追加/更新/削除した行データが分かるな、よしローカルの開発環境でも Binlog イベントを受け取って Elasticsearch にデータを投げるようにすれば挙動を揃えられるじゃんという。

自社のサービスは Ruby on Rails を採用しているので、Ruby でやりたい。
初めは mysql2 gem で出来ないか見てみたところ、mysql2 はレプリケーションプロトコルには対応していないことが分かった。

Rubygems で Binlog イベントを受け取れるライブラリはないかなと探したのだけど、無さそうだったので自作するしかなさそうだったというのと、レプリケーションに関する知識も深まりそうなので作ってみたくなったのがきっかけ。
アプリケーションロジックとは切り離された別プロセスになるのでアプリケーションへの影響もないし。

何か参考になるものはないかなーと調べてみると、過去に似たようなことを試していた事例は見つけたのだけど、今は亡き Bitbucket のリンクしかなく無念(mysql-replication-listener 自体もう10年以上メンテナンスされてないが・・) https://so-wh.at/entry/20120827/p1

ちなみに Golanggo-mysqlレプリケーションプロトコルに対応している模様。 go-mysqlを使ったレプリケーション この Qiita も書いているのは先のブログと同じ winebarrel さんだった。先駆者すぎる。

MySQLレプリケーションプロトコルについて

最初に書いたように、自分のプログラムを MySQL サーバのレプリカとして振る舞わせる、Binlog イベントをリアルタイムで受け取れるようにするための仕組み。

これを実装するための前段として、プログラムから MySQL サーバの認証を通し、プログラムから SQL を実行できるようにする必要があるので、記事にしておいた。

Rubyでcaching_sha2_password認証を使ってMySQLに接続する
RubyのTCPソケットでMySQLにクエリを発行する

レプリケーションプロトコルを使ってやり取りするには、普段 MySQLサーバをレプリカとして設定する際と同様の命令をプログラムからパケット送信して実行すればいい。

こんな感じでプログラムからやることになります。

  1. SHOW MASTER STATUS クエリを実行し、Binlog のファイル名と読み取り位置を取得する
  2. SHOW VARIABLES LIKE "binlog_checksum"クエリを実行し、チェックサムのあり/なしを取得する
  3. COM_REGISTER_SLAVE コマンドを実行し、プログラムをレプリカとして登録する
  4. COM_BINLOG_DUMP コマンドを実行し、Binlog イベントをストリームで受け取れるようにする
  5. Binlog イベントのパケットを受信して、イベントタイプごとに処理をする

公式のドキュメントはここ
使うことになるイベントタイプはこの辺り。

  • ROTATE_EVENT
    • Binlog ファイルのローテーションが起きたときに発火されるイベント
  • FORMAT_DESCRIPTION_EVENT
    • レプリケーション接続の開始時に初期化のための情報を取得するために発火されるイベント
  • QUERY_EVENT
  • TABLE_MAP_EVENT
    • 行データの変更前に対象になるテーブル情報を取得するために発火されるイベント
  • WRITE_ROWS_V2_EVENT
    • 行データが挿入された時に発火されるイベント
    • 行データは複数になることもある
  • UPDATE_ROWS_V2_EVENT
    • 行データが更新された時に発火されるイベント
    • 行データは複数になることもある
    • 変更前と変更後の行データを両方受け取れる
  • DELETE_ROWS_V2_EVENT
    • 行データが削除された時に発火されるイベント
    • 行データは複数になることもある
  • XID_EVENT

公式を見れば各種イベントタイプごとのパケットの仕様が書かれている...かと思いきや、共通のヘッダー部くらいしか載っていなかったりするので参考にならない。
MySQL のヘッダーファイルを見ると、各種イベントタイプの仕様がコメントでしっかり書かれているので、これを見るのが一番良いと思います。

https://github.com/mysql/mysql-server/blob/8.0/libbinlogevents/include/statement_events.h https://github.com/mysql/mysql-server/blob/8.0/libbinlogevents/include/rows_event.h

後は書かれている仕様通りに各種イベントタイプを処理するためのパーサーを愚直に書いていけばいいです。

MysqlReplicator を作っていて、地味にしんどかったのは MySQLJSON 型は当たり前なんですけどバイナリーフォーマットで保存されているので、Binary JSON 用のパーサーも作らないといけなかったこと。この辺もいい感じに使える gem が多分ないので自作せざるを得なかった。
自作の JSON パーサーを書いている過程で、MySQLJSON には Opaque 型という MySQL の型を JSON に含められる特殊な型があることが分かって勉強になった。
なお Opaque 型は使ったことないので MysqlReplicator では対応はしていない。

こういう感じで入れられるらしい。

-- JSONにDATE型を含める例
INSERT INTO tests (json) VALUES (JSON_OBJECT('created', CAST('2025-12-10' AS DATE)));

次は MysqlReplicator をベースにして、Elasticsearch への同期処理を作って Docker Image として動かせるようにしていく。

RubyのTCPソケットでMySQLにクエリを発行する

前回 Ruby の TCPソケット通信で MySQL の認証を通すやり方を書いたので、その続き。 認証を通せれば、Rubyから SQL を発行して実行できるようになるのでやり方をまとめる。

MySQL には各種処理を実行するためのコマンドというのがあり、SQL の実行は COM_QUERY というコマンドになる。

主要なコマンド一覧

コマンド 渡すもの
COM_QUIT 0x01 なし
COM_INIT_DB 0x02 database名
COM_QUERY 0x03 SQLクエリ文字列
COM_PING 0x0E なし
COM_BINLOG_DUMP 0x12 binlog位置情報
COM_REGISTER_SLAVE 0x15 スレーブ情報
COM_STMT_PREPARE 0x16 SQLクエリ
COM_STMT_EXECUTE 0x17 statement_id + パラメータ
COM_STMT_CLOSE 0x19 statement_id

コマンド値は 8ビット符号なし整数でパケット送信する。

例えば、接続確認用の PING コマンドの実行と結果の確認はこんな感じで取れる。

socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

sequence_id = 0

def read_packet
  header = socket.read(4)

  packet_length = header[0].unpack1('C') | (header[1].unpack1('C') << 8) | (header[2].unpack1('C') << 16)
  sequence_id = header[3]unpack1('C')
  payload = socket.read(packet_length)

  { packet_length:, payload: }
end

def send_packet(payload)
  sequence_id += 1

  packet_length = payload.length
  header = [packet_length].pack('V')[0..2] + [sequence_id].pack('C')
  socket.write(header + payload)
end

payload = [0x0E].pack('C') # PINGコマンド
send_packet(payload)

response = read_packet(payload)
if response[:payload][0].unpack1('C') == 0x00 # 成功
  puts 'PONG'
else
  puts 'Oops'
end

COM_QUERY コマンドを実行する

SQL を実行させるための COM_QUERY コマンドのペイロードはこれでOK

payload = [0x03].pack('C') + sql.encode('UTF-8')

クエリ実行のパケットを送信すると、MySQL でクエリが実行されて結果をパケット受信できるようになる。
実行結果は OK Packet / ERR Packet / Result Set の3パターンあって、構造が違うのでそれぞれパースする必要がある。

どのパターンかはパケットのペイロードの 1 バイト目を見ればわかるようになっている。

  • 0x00 -> OK Packet
  • 0xFF -> Err Packet
  • 0x01~0xFA -> Result Set
  • 0xFB -> Local INFILE Request

LOCAL INFILE Request は特殊なので今回は未対応。

OK Packet の場合

結果セットを返さないクエリが成功したときがこれ。
メジャーなところだと、INSERT / UPDATE / DELETE文を実行したときこれが返る。

パケットの構造はこんな感じ。

サイズ フィールド
1 byte integer header 0x00が入る
length-encoded integer affected_rows
length-encoded integer last_insert_id
2 bytes integer status_flags サーバ状態を表すフラグ
2 bytes integer warnings
N bytes string info NULL終端

length-encoded integer は最初の 1 バイトで長さが決まる整数。
最初の 1 バイトごとにこんな感じで長さと値を取る。

  • 0x00 から 0xFA(250)
    • 最初の 1 バイトがそのまま値になる
  • 0xFC
    • 後続の 2 バイトが値
  • 0xFD
    • 後続の 3 バイトが値
  • 0xFE
    • 後続の 8 バイトが値
  • 0xFB
    • NULL
def length_encoded_integer(payload, offset)
  first_byte = payload[offset].unpack1('C')

  case first_byte
  when 0..250
    { value: first_byte, bytes_read: 1 }
  when 0xFC
    value = payload[(offset + 1)..(offset + 2)].unpack1('s<')
    { value: value, bytes_read: 3 }
  when 0xFD
    value = payload[(offset + 1)..(offset + 3)].unpack1('V') & 0xFFFFFF
    { value: value, bytes_read: 4 }
  when 0xFE
    value = payload[(offset + 1)..(offset + 8)].unpack1('Q<')
    { value: value, bytes_read: 9 }
  else # Included 0xFB
    { value: nil, bytes_read: 1 }
  end
end

ERR Packet の場合

文字通りクエリ実行がエラーになった時がこれ。

パケットの構造はこんな感じ。

サイズ フィールド
1 byte integer header 0xFFが入る
2 bytes integer error_code
1 byte integer sql_state_marker "#"固定
5 bytes string sql_state
N bytes string error_message NULL終端

Result Set の場合

SELECT みたいな結果行レコードが返るクエリがこれ。

こいつは複雑で、複数のパケットで構成される。

  • カラム数パケット
    • この後何回カラム定義パケットを取ればいいかを伝える
  • カラム定義パケット
  • EOFパケット
    • カラム定義パケットが終わったことを伝える
  • 行データパケット
  • EOFパケット
    • 行データパケットが終わったことを伝える

カラム数パケット

サイズ フィールド
1 byte integer column_count カラムの数が入る

カラム定義パケット

サイズ フィールド
length-encoded string catalog "def" というダミー値固定
length-encoded string schema データベース名
length-encoded string table テーブル名(エイリアス
length-encoded string org_table 物理テーブル名
length-encoded string name カラム名エイリアス
length-encoded string org_name 物理カラム名
length-encoded integer length_of_fixed_fields "0x0c" 固定
2 bytes integer charset
4 bytes integer column_length カラムの最大長
1 byte integer column_type データ型
2 bytes integer flags カラムの属性を表すビットフラグ
1 byte integer decimals 小数点以下の桁数
2 bytes integer filter "0x00 0x00" 固定

length-encoded stringlength-encoded integer + length-encoded integer が返す値の長さの文字列。 こんな感じで取る。

def length_encoded_string(payload, offset)
  length_info = length_encoded_integer(payload, offset)
  return { value: '', bytes_read: length_info[:bytes_read] } if length_info[:value].nil?

  string_start = length_info[:bytes_read]
  string_end = string_start + length_info[:value] - 1
  value = payload[string_start..string_end] || ''

  { value: value, bytes_read: length_info[:bytes_read] + length_info[:value] }
end

EOF パケット

サイズ フィールド
1 byte integer eof "0xFE" 固定

行データパケット

カラム数分 length-encoded string が繰り返され、取れる値がカラムの値になる。
ただし、1 バイト目が 0xFB の場合は値が NULL になっているので最初に NULL チェックする。

これらをまとめると、Result Set パケットを処理するコードはこんな感じになる。

# カラム定義パケットのパース
def parse_column_definition(payload)
  offset = 0

  catalog = length_encoded_string(payload, offset)
  offset += catalog[:bytes_read]

  schema = length_encoded_string(payload, offset)
  offset += schema[:bytes_read]

  table = length_encoded_string(payload, offset)
  offset += table[:bytes_read]

  org_table = length_encoded_string(payload, offset)
  offset += org_table[:bytes_read]

  name = length_encoded_string(payload, offset)
  offset += name[:bytes_read]

  org_name = length_encoded_string(payload, offset)
  offset += org_name[:bytes_read]

  # length of fixed-length fields (1 byte)
  offset += 1

  charset = payload[offset..(offset + 1)].unpack1('v')
  offset += 2

  column_length = payload[offset..(offset + 3)].unpack1('V')
  offset += 4

  column_type = payload[offset].unpack1('C')

  {
    schema: schema[:value],
    table: table[:value],
    org_table: org_table[:value],
    name: name[:value],
    org_name: org_name[:value],
    charset: charset,
    column_length: column_length,
    column_type: column_type
  }
end

# 行データパケットのパース
def parse_row_data(payload, columns)
  first_byte = payload[0].unpack1('C')
  row = {}
  offset = 0

  columns.each do |column|
    column_name_key = column[:name].downcase.to_sym

    if offset >= payload.length
      row[column_name_key] = nil
      next
    end

    if first_byte == 0xFB
      # NULL value
      row[column_name_key] = nil
      offset += 1
    else
      # row data (length-encoded string)
      value = length_encoded_string(payload, offset)
      row[column_name_key] = value[:value]
      offset += value[:bytes_read]
    end
  end

  row
end

# カラム数パケット、カラム定義パケットを処理
columns = []
column_count = length_encoded_integer(payload, 0)[:value].to_i
column_count.times do
  column_packet = read_packet
  column_info = parse_column_definition(column_packet[:payload])
  columns << column_info
end

# EOF パケットを読み進める
connection.read_packet

# 行データパケットを処理
rows = []
loop do
  row_packet = read_packet

  # EOF パケットになったら終了
  if row_packet[:payload][0].unpack1('C') == 0xFE
    break
  end

  rows << parse_row_data(row_packet[:payload], columns)
end

puts rows

Rubyでcaching_sha2_password認証を使ってMySQLに接続する

ローカル環境で、RubyMySQLレプリケーションプロトコルを扱う処理を作りたいのだけど、いつも使っている mysql2 gemレプリケーションプロトコルに対応していないので自分で作ることにした。

普段はナイーブに TCP ソケットを使った処理を書くことはなかったので、よい機会ではある。
そのうち RubyMySQL に接続するプログラムを作る人の役に立つかもしれないし。

まずは Ruby から MySQL に接続しないと話にならないので、プログラムで接続するやり方を調べた。

MySQL はバージョン8 以降からデフォルトの認証方式が従来の mysql_native_password から caching_sha2_password に変わっている。
主な違いとしてはこういう感じらしい。

  • ハッシュアルゴリズムに SHA256 を採用(mysql_native_password は SHA-1
  • 認証成功後、サーバー側でクレデンシャル情報をキャッシュすることで再接続時の認証処理を高速化する
    • これを高速認証と呼ぶそう
  • 初回認証時は SSL/TLS 接続または RSA 暗号化を使ったUnix ソケット接続が必要で、平文でのパスワード送信を防止する

ローカル環境なので SSL/TLS 接続ではなく、RSA 暗号化を使ったUnix ソケット接続を採用する。

caching_sha2_password 認証の仕様については公式ドキュメントを見ながら実装していくことになる

8.4.1.2 Caching SHA-2 Pluggable Authentication 6.1.4 Caching SHA-2 Pluggable Authentication

ざっくり認証時の処理の流れはこんな感じになる。

  • TCP ソケットで MySQL サーバに接続し、Handshake パケットを解析する
    • Handshake に含まれるサーバー情報(バージョン、認証プラグイン等)を以降の処理で使う
  • caching_sha2_password 認証を行う
    • サーバ起動後初回認証時は、RSA公開鍵をサーバーから取得し、パスワードをRSA暗号化して送信する
      • 2回目以降の認証はこの処理はスキップする

MySQLとのパケット送受信

通信パケットの仕様は公式に載っているのでそれを見ればOK
MySQL Packets

送受信共に同じ構造。

サイズ フィールド名
3 bytes ペイロード payloadのバイト数(リトルエンディアン)
1 byte シーケンス番号 パケットのシーケンス番号(0始まり)
n bytes ペイロード 実際のデータ

payloadを取り出してもろもろの処理を行う、payloadを作ってもろもろの命令を送信する、というのが基本になる。
送信時に受信時に受け取ったsequence_idをインクリメントして使う必要があるのに注意。

Rubyだとこんな感じになる

socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

sequence_id = 0

# 受信
def read_packet
  header = socket.read(4)

  packet_length = header[0].unpack1('C') | (header[1].unpack1('C') << 8) | (header[2].unpack1('C') << 16)
  sequence_id = header[3]unpack1('C')
  payload = socket.read(packet_length)
end

# 送信
def send_packet(payload)
  packet_length = payload.length
  header = [packet_length].pack('V')[0..2] + [sequence_id + 1].pack('C')
  socket.write(header + payload)
end

Handshakeパケットについて

TCP ソケットで接続したらすぐに Handshake パケットが送られてくるので解析して後続の処理に使う。

Handshake パケットの仕様は公式に載っている。
Protocol::HandshakeV10

サイズ フィールド名
1 byte プロトコルバージョン
変動 サーバーバージョン NULL終端。8.0.4 みたいな文字列が入る
4 bytes スレッドID
8bytes 認証データ(前半)
1 byte フィラー
2 bytes 機能フラグ(下位2バイト)
1 byte 文字セット
2 bytes ステータスフラグ
2 bytes 機能フラグ(上位2バイト)
1 byte 認証データ長 or 0x00
10 bytes 予約済み すべて0x00
13+ bytes 認証データ(後半)
変動 認証プラグイン NULL終端。caching_sha2_password が入る

こんな感じで愚直に仕様通りにパースしていけばいい。

offset = 0

# Protocol version (1 byte)
protocol_version = MysqlReplicator::StringUtil.read_uint8(payload[offset])
offset += 1

# Server version is null-terminated string
server_version_end = payload.index("\0", offset) || 0
server_version = MysqlReplicator::StringUtil.read_str(payload[offset...server_version_end])
offset = server_version_end + 1

# ConnectionID is 4bytes and little endian
connection_id = MysqlReplicator::StringUtil.read_uint32(payload[offset..(offset + 3)])
offset += 4

# Authentication plugin data (first 8 bytes)
auth_plugin_data_part1 = MysqlReplicator::StringUtil.read_str(payload[offset..(offset + 7)])
offset += 8

# Reserved (1 byte, always 0x00)
offset += 1

# Server capability flags (lower 2 bytes)
capability_flags_lower = MysqlReplicator::StringUtil.read_uint16(payload[offset..(offset + 1)])
offset += 2

# Character set (1 byte)
charset = MysqlReplicator::StringUtil.read_uint8(payload[offset])
offset += 1

# Status flags (2 bytes)
status_flags = MysqlReplicator::StringUtil.read_uint16(payload[offset..(offset + 1)])
offset += 2

# Server capability flags (upper 2 bytes)
capability_flags_upper = MysqlReplicator::StringUtil.read_uint16(payload[offset..(offset + 1)])
offset += 2

# Feature flags
capability_flags = capability_flags_lower | (capability_flags_upper << 16)

# Authentication plugin data length (1 byte)
auth_plugin_data_len = MysqlReplicator::StringUtil.read_uint8(payload[offset])
offset += 1

# Reserved (10 bytes)
offset += 10

# Authentication plugin data (part 2)
remaining_auth_data_len = [auth_plugin_data_len - 8, 13].max
auth_plugin_data_part2 = MysqlReplicator::StringUtil.read_str(payload[offset..(offset + remaining_auth_data_len - 1)])
offset += remaining_auth_data_len

# Authentication plugin name (null-terminated string)
plugin_name_end = payload.index("\0", offset)
auth_plugin_name = MysqlReplicator::StringUtil.read_str(payload[offset...plugin_name_end])
auth_plugin_data = auth_plugin_data_part1 + MysqlReplicator::StringUtil.read_str(auth_plugin_data_part2[0..11])
# Adjust 20 bytes
if auth_plugin_data.length > 20
  auth_plugin_data = auth_plugin_data[0..19] || ''
elsif auth_plugin_data.length < 20
  auth_plugin_data += "\x00" * (20 - auth_plugin_data.length)
end

caching_sha2_password 認証について

Handshake パケットを取得できたらいよいよ認証を通せる。

あらためて caching_sha2_password 認証の処理フローはこんな感じになる。

  1. caching_sha2_password 認証用ペイロードを作ってパケット送信
  2. レスポンスを受け取って、高速認証が使われていればここで認証完了
  3. RSA 暗号化のために、公開鍵をリクエストするパケットを送信
  4. 公開鍵を受け取って、パスワードを RSA 暗号化する
  5. RSA 暗号化したパスワードでペイロードを作ってパケット送信
  6. 認証完了

仕様に従って、ペイロードの作成、解析を行っていけばいい。

caching_sha2_password 認証用ペイロードの構造

サイズ フィールド名
4 bytes ケイパビリティフラグ クライアントがサポートする機能のビットマスク
4 bytes 最大パケットサイズ クライアントが受信可能な最大パケット長
1 byte 文字セット 使用する文字エンコーディング。utf8mb4だったら「45」
23 bytes 予約領域 すべて0x00で埋める
ユーザー名 NULL終端
認証データ長 認証データのバイト数を長さエンコード整数にしたもの
32 bytes 認証データ 暗号化したパスワード
データベース名 NULL終端。CLIENT_CONNECT_WITH_DBフラグが立っている場合のみ入れる
認証プラグイン NULL終端。caching_sha2_passwordが入る

暗号化したパスワードは、Handshake パケットで受け取ったスクランブル(auth_plugin_data がそう)とパスワードを使って作る。

# SHA256(password)
hash1 = Digest::SHA256.digest(password.encode('utf-8'))
# SHA256(SHA256(password))
hash2 = Digest::SHA256.digest(hash1)
# SHA256(SHA256(SHA256(password)), salt)
hash3 = Digest::SHA256.digest(hash2 + salt)

# XOR hash1 and hash3
payload = ''
hash1.each_byte.with_index do |byte, i|
   payload += (byte ^ hash3[i].to_s.ord).chr
end

ケイパビリティフラグは基本的な接続であればこれで大丈夫。
データベースを指定して接続する場合はフラグを立てる。

CLIENT_PLUGIN_AUTH = 0x00080000
CLIENT_SECURE_CONNECTION = 0x00008000
CLIENT_PROTOCOL_41 = 0x00000200
CLIENT_CONNECT_WITH_DB = 0x00000008
CLIENT_MULTI_STATEMENTS = 0x00010000
CLIENT_MULTI_RESULTS = 0x00020000

client_flags = CLIENT_PROTOCOL_41 |
                       CLIENT_SECURE_CONNECTION |
                       CLIENT_PLUGIN_AUTH |
                       CLIENT_MULTI_STATEMENTS |
                       CLIENT_MULTI_RESULTS
client_flags |= CLIENT_CONNECT_WITH_DB if database.present?

文字セットは Handshake パケットに入っているものを使えばよいはず。

認証用ペイロードの送信に対するレスポンス

1バイト目が「0x00」だったら高速認証成功なのでそこで認証完了として処理を打ち切る。
「0x01」だったら、次の 1 バイトが「0x03」なら高速認証成功、「0x04」なら初回接続なので RSA暗号化を使った後続の認証処理を行う。

first_byte = payload[0].unpack1('C')
case first_byte
when 0x00
  :success
when 0x01
  command = payload[1].unpack1('C')
  case command
  when 0x03
    :success
  when 0x04
    :challenge
  else
    # エラー
  end
else
  # エラー
end

公開鍵をリクエストするパケット

仕様で「0x02」を8ビット符号なし整数でリクエストしろとあるので、それを送るだけ。

public_key_payload = [0x02].pack('C')
send_packet(public_key_payload)

送ったらパケットを受信すると、ペイロードに公開鍵が入っている(ペイロード = 公開鍵)。

パスワードの RSA 暗号化

パスワードとHandshake パケットのスクランブル(auth_plugin_data がそう)で XOR 演算したものを RSA 暗号化して送信する。
Ruby だと OpenSSL ライブラリを使って暗号化できる。

注意として、パディング方式が MySQL 8.0.5 以降とそれ以前で異なるので、場合分けする必要がある。

MySQLバージョン パディング方式
8.0.4 以下 PKCS#1 v1.5
8.0.5 以上 OAEP (PKCS#1 v2.1)

また、パスワードは NULL 終端の文字列にする必要がある(これにハマった)。

require 'openssl'

rsa_public_key = OpenSSL::PKey::RSA.new(public_key)

password_with_null = password + "\x00"
password_bytes = password_with_null.encode(Encoding::UTF_8).bytes
scramble_bytes = scramble.bytes

xor_result = []
password_bytes.each_with_index do |byte, index|
  scramble_byte = scramble_bytes[index % scramble_bytes.length]
  xor_result << (byte ^ scramble_byte)
end
data_to_encrypt = xor_result.pack('C*')

begin
  # First, try OAEP padding (MySQL 8.0.5+)
  rsa_public_key.public_encrypt(data_to_encrypt, OpenSSL::PKey::RSA::PKCS1_OAEP_PADDING)
rescue OpenSSL::PKey::RSAError
  # If OAEP fails, use PKCS#1 (MySQL 8.0.4 and earlier)
  rsa_public_key.public_encrypt(data_to_encrypt, OpenSSL::PKey::RSA::PKCS1_PADDING)
end

この RSA 暗号化したパスワードを送信すれば認証成功のレスポンスが取れるはず!
認証さえ終われば後は自由に MySQL を扱えるようになる。

別記事で RubyMySQLSQL を発行するやり方を書く。