ローカル環境で、Ruby で MySQL のレプリケーションプロトコルを扱う処理を作りたいのだけど、いつも使っている mysql2 gem がレプリケーションプロトコルに対応していないので自分で作ることにした。
普段はナイーブに TCP ソケットを使った処理を書くことはなかったので、よい機会ではある。
そのうち Ruby で MySQL に接続するプログラムを作る人の役に立つかもしれないし。
まずは Ruby から MySQL に接続しないと話にならないので、プログラムで接続するやり方を調べた。
MySQL はバージョン8 以降からデフォルトの認証方式が従来の mysql_native_password から caching_sha2_password に変わっている。
主な違いとしてはこういう感じらしい。
- ハッシュアルゴリズムに SHA256 を採用(mysql_native_password は SHA-1)
- 認証成功後、サーバー側でクレデンシャル情報をキャッシュすることで再接続時の認証処理を高速化する
- これを高速認証と呼ぶそう
- 初回認証時は SSL/TLS 接続または RSA 暗号化を使ったUnix ソケット接続が必要で、平文でのパスワード送信を防止する
ローカル環境なので SSL/TLS 接続ではなく、RSA 暗号化を使ったUnix ソケット接続を採用する。
caching_sha2_password 認証の仕様については公式ドキュメントを見ながら実装していくことになる
8.4.1.2 Caching SHA-2 Pluggable Authentication 6.1.4 Caching SHA-2 Pluggable Authentication
ざっくり認証時の処理の流れはこんな感じになる。
- TCP ソケットで MySQL サーバに接続し、Handshake パケットを解析する
- Handshake に含まれるサーバー情報(バージョン、認証プラグイン等)を以降の処理で使う
- caching_sha2_password 認証を行う
MySQLとのパケット送受信
通信パケットの仕様は公式に載っているのでそれを見ればOK
MySQL Packets
送受信共に同じ構造。
| サイズ | フィールド名 | |
|---|---|---|
| 3 bytes | ペイロード長 | payloadのバイト数(リトルエンディアン) |
| 1 byte | シーケンス番号 | パケットのシーケンス番号(0始まり) |
| n bytes | ペイロード | 実際のデータ |
payloadを取り出してもろもろの処理を行う、payloadを作ってもろもろの命令を送信する、というのが基本になる。
送信時に受信時に受け取ったsequence_idをインクリメントして使う必要があるのに注意。
Rubyだとこんな感じになる
socket = TCPSocket.new(host, port) socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) sequence_id = 0 # 受信 def read_packet header = socket.read(4) packet_length = header[0].unpack1('C') | (header[1].unpack1('C') << 8) | (header[2].unpack1('C') << 16) sequence_id = header[3]unpack1('C') payload = socket.read(packet_length) end # 送信 def send_packet(payload) packet_length = payload.length header = [packet_length].pack('V')[0..2] + [sequence_id + 1].pack('C') socket.write(header + payload) end
Handshakeパケットについて
TCP ソケットで接続したらすぐに Handshake パケットが送られてくるので解析して後続の処理に使う。
Handshake パケットの仕様は公式に載っている。
Protocol::HandshakeV10
| サイズ | フィールド名 | |
|---|---|---|
| 1 byte | プロトコルバージョン | |
| 変動 | サーバーバージョン | NULL終端。8.0.4 みたいな文字列が入る |
| 4 bytes | スレッドID | |
| 8bytes | 認証データ(前半) | |
| 1 byte | フィラー | |
| 2 bytes | 機能フラグ(下位2バイト) | |
| 1 byte | 文字セット | |
| 2 bytes | ステータスフラグ | |
| 2 bytes | 機能フラグ(上位2バイト) | |
| 1 byte | 認証データ長 or 0x00 | |
| 10 bytes | 予約済み | すべて0x00 |
| 13+ bytes | 認証データ(後半) | |
| 変動 | 認証プラグイン名 | NULL終端。caching_sha2_password が入る |
こんな感じで愚直に仕様通りにパースしていけばいい。
offset = 0 # Protocol version (1 byte) protocol_version = MysqlReplicator::StringUtil.read_uint8(payload[offset]) offset += 1 # Server version is null-terminated string server_version_end = payload.index("\0", offset) || 0 server_version = MysqlReplicator::StringUtil.read_str(payload[offset...server_version_end]) offset = server_version_end + 1 # ConnectionID is 4bytes and little endian connection_id = MysqlReplicator::StringUtil.read_uint32(payload[offset..(offset + 3)]) offset += 4 # Authentication plugin data (first 8 bytes) auth_plugin_data_part1 = MysqlReplicator::StringUtil.read_str(payload[offset..(offset + 7)]) offset += 8 # Reserved (1 byte, always 0x00) offset += 1 # Server capability flags (lower 2 bytes) capability_flags_lower = MysqlReplicator::StringUtil.read_uint16(payload[offset..(offset + 1)]) offset += 2 # Character set (1 byte) charset = MysqlReplicator::StringUtil.read_uint8(payload[offset]) offset += 1 # Status flags (2 bytes) status_flags = MysqlReplicator::StringUtil.read_uint16(payload[offset..(offset + 1)]) offset += 2 # Server capability flags (upper 2 bytes) capability_flags_upper = MysqlReplicator::StringUtil.read_uint16(payload[offset..(offset + 1)]) offset += 2 # Feature flags capability_flags = capability_flags_lower | (capability_flags_upper << 16) # Authentication plugin data length (1 byte) auth_plugin_data_len = MysqlReplicator::StringUtil.read_uint8(payload[offset]) offset += 1 # Reserved (10 bytes) offset += 10 # Authentication plugin data (part 2) remaining_auth_data_len = [auth_plugin_data_len - 8, 13].max auth_plugin_data_part2 = MysqlReplicator::StringUtil.read_str(payload[offset..(offset + remaining_auth_data_len - 1)]) offset += remaining_auth_data_len # Authentication plugin name (null-terminated string) plugin_name_end = payload.index("\0", offset) auth_plugin_name = MysqlReplicator::StringUtil.read_str(payload[offset...plugin_name_end]) auth_plugin_data = auth_plugin_data_part1 + MysqlReplicator::StringUtil.read_str(auth_plugin_data_part2[0..11]) # Adjust 20 bytes if auth_plugin_data.length > 20 auth_plugin_data = auth_plugin_data[0..19] || '' elsif auth_plugin_data.length < 20 auth_plugin_data += "\x00" * (20 - auth_plugin_data.length) end
caching_sha2_password 認証について
Handshake パケットを取得できたらいよいよ認証を通せる。
あらためて caching_sha2_password 認証の処理フローはこんな感じになる。
- caching_sha2_password 認証用ペイロードを作ってパケット送信
- レスポンスを受け取って、高速認証が使われていればここで認証完了
- RSA 暗号化のために、公開鍵をリクエストするパケットを送信
- 公開鍵を受け取って、パスワードを RSA 暗号化する
- RSA 暗号化したパスワードでペイロードを作ってパケット送信
- 認証完了
仕様に従って、ペイロードの作成、解析を行っていけばいい。
caching_sha2_password 認証用ペイロードの構造
| サイズ | フィールド名 | |
|---|---|---|
| 4 bytes | ケイパビリティフラグ | クライアントがサポートする機能のビットマスク |
| 4 bytes | 最大パケットサイズ | クライアントが受信可能な最大パケット長 |
| 1 byte | 文字セット | 使用する文字エンコーディング。utf8mb4だったら「45」 |
| 23 bytes | 予約領域 | すべて0x00で埋める |
| ユーザー名 | NULL終端 | |
| 認証データ長 | 認証データのバイト数を長さエンコード整数にしたもの | |
| 32 bytes | 認証データ | 暗号化したパスワード |
| データベース名 | NULL終端。CLIENT_CONNECT_WITH_DBフラグが立っている場合のみ入れる | |
| 認証プラグイン名 | NULL終端。caching_sha2_passwordが入る |
暗号化したパスワードは、Handshake パケットで受け取ったスクランブル(auth_plugin_data がそう)とパスワードを使って作る。
# SHA256(password) hash1 = Digest::SHA256.digest(password.encode('utf-8')) # SHA256(SHA256(password)) hash2 = Digest::SHA256.digest(hash1) # SHA256(SHA256(SHA256(password)), salt) hash3 = Digest::SHA256.digest(hash2 + salt) # XOR hash1 and hash3 payload = '' hash1.each_byte.with_index do |byte, i| payload += (byte ^ hash3[i].to_s.ord).chr end
ケイパビリティフラグは基本的な接続であればこれで大丈夫。
データベースを指定して接続する場合はフラグを立てる。
CLIENT_PLUGIN_AUTH = 0x00080000 CLIENT_SECURE_CONNECTION = 0x00008000 CLIENT_PROTOCOL_41 = 0x00000200 CLIENT_CONNECT_WITH_DB = 0x00000008 CLIENT_MULTI_STATEMENTS = 0x00010000 CLIENT_MULTI_RESULTS = 0x00020000 client_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_MULTI_STATEMENTS | CLIENT_MULTI_RESULTS client_flags |= CLIENT_CONNECT_WITH_DB if database.present?
文字セットは Handshake パケットに入っているものを使えばよいはず。
認証用ペイロードの送信に対するレスポンス
1バイト目が「0x00」だったら高速認証成功なのでそこで認証完了として処理を打ち切る。
「0x01」だったら、次の 1 バイトが「0x03」なら高速認証成功、「0x04」なら初回接続なので RSA暗号化を使った後続の認証処理を行う。
first_byte = payload[0].unpack1('C') case first_byte when 0x00 :success when 0x01 command = payload[1].unpack1('C') case command when 0x03 :success when 0x04 :challenge else # エラー end else # エラー end
公開鍵をリクエストするパケット
仕様で「0x02」を8ビット符号なし整数でリクエストしろとあるので、それを送るだけ。
public_key_payload = [0x02].pack('C') send_packet(public_key_payload)
送ったらパケットを受信すると、ペイロードに公開鍵が入っている(ペイロード = 公開鍵)。
パスワードの RSA 暗号化
パスワードとHandshake パケットのスクランブル(auth_plugin_data がそう)で XOR 演算したものを RSA 暗号化して送信する。
Ruby だと OpenSSL ライブラリを使って暗号化できる。
注意として、パディング方式が MySQL 8.0.5 以降とそれ以前で異なるので、場合分けする必要がある。
| MySQLバージョン | パディング方式 |
|---|---|
| 8.0.4 以下 | PKCS#1 v1.5 |
| 8.0.5 以上 | OAEP (PKCS#1 v2.1) |
また、パスワードは NULL 終端の文字列にする必要がある(これにハマった)。
require 'openssl' rsa_public_key = OpenSSL::PKey::RSA.new(public_key) password_with_null = password + "\x00" password_bytes = password_with_null.encode(Encoding::UTF_8).bytes scramble_bytes = scramble.bytes xor_result = [] password_bytes.each_with_index do |byte, index| scramble_byte = scramble_bytes[index % scramble_bytes.length] xor_result << (byte ^ scramble_byte) end data_to_encrypt = xor_result.pack('C*') begin # First, try OAEP padding (MySQL 8.0.5+) rsa_public_key.public_encrypt(data_to_encrypt, OpenSSL::PKey::RSA::PKCS1_OAEP_PADDING) rescue OpenSSL::PKey::RSAError # If OAEP fails, use PKCS#1 (MySQL 8.0.4 and earlier) rsa_public_key.public_encrypt(data_to_encrypt, OpenSSL::PKey::RSA::PKCS1_PADDING) end
この RSA 暗号化したパスワードを送信すれば認証成功のレスポンスが取れるはず!
認証さえ終われば後は自由に MySQL を扱えるようになる。