前回 Ruby の TCPソケット通信で MySQL の認証を通すやり方を書いたので、その続き。 認証を通せれば、Rubyから SQL を発行して実行できるようになるのでやり方をまとめる。
MySQL には各種処理を実行するためのコマンドというのがあり、SQL の実行は COM_QUERY というコマンドになる。
主要なコマンド一覧
| コマンド | 値 | 渡すもの |
|---|---|---|
| COM_QUIT | 0x01 | なし |
| COM_INIT_DB | 0x02 | database名 |
| COM_QUERY | 0x03 | SQLクエリ文字列 |
| COM_PING | 0x0E | なし |
| COM_BINLOG_DUMP | 0x12 | binlog位置情報 |
| COM_REGISTER_SLAVE | 0x15 | スレーブ情報 |
| COM_STMT_PREPARE | 0x16 | SQLクエリ |
| COM_STMT_EXECUTE | 0x17 | statement_id + パラメータ |
| COM_STMT_CLOSE | 0x19 | statement_id |
コマンド値は 8ビット符号なし整数でパケット送信する。
例えば、接続確認用の PING コマンドの実行と結果の確認はこんな感じで取れる。
socket = TCPSocket.new(host, port) socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) sequence_id = 0 def read_packet header = socket.read(4) packet_length = header[0].unpack1('C') | (header[1].unpack1('C') << 8) | (header[2].unpack1('C') << 16) sequence_id = header[3]unpack1('C') payload = socket.read(packet_length) { packet_length:, payload: } end def send_packet(payload) sequence_id += 1 packet_length = payload.length header = [packet_length].pack('V')[0..2] + [sequence_id].pack('C') socket.write(header + payload) end payload = [0x0E].pack('C') # PINGコマンド send_packet(payload) response = read_packet(payload) if response[:payload][0].unpack1('C') == 0x00 # 成功 puts 'PONG' else puts 'Oops' end
COM_QUERY コマンドを実行する
SQL を実行させるための COM_QUERY コマンドのペイロードはこれでOK
payload = [0x03].pack('C') + sql.encode('UTF-8')
クエリ実行のパケットを送信すると、MySQL でクエリが実行されて結果をパケット受信できるようになる。
実行結果は OK Packet / ERR Packet / Result Set の3パターンあって、構造が違うのでそれぞれパースする必要がある。
どのパターンかはパケットのペイロードの 1 バイト目を見ればわかるようになっている。
- 0x00 -> OK Packet
- 0xFF -> Err Packet
- 0x01~0xFA -> Result Set
- 0xFB -> Local INFILE Request
LOCAL INFILE Request は特殊なので今回は未対応。
OK Packet の場合
結果セットを返さないクエリが成功したときがこれ。
メジャーなところだと、INSERT / UPDATE / DELETE文を実行したときこれが返る。
パケットの構造はこんな感じ。
| サイズ | フィールド | |
|---|---|---|
| 1 byte integer | header | 0x00が入る |
| length-encoded integer | affected_rows | |
| length-encoded integer | last_insert_id | |
| 2 bytes integer | status_flags | サーバ状態を表すフラグ |
| 2 bytes integer | warnings | |
| N bytes string | info | NULL終端 |
length-encoded integer は最初の 1 バイトで長さが決まる整数。
最初の 1 バイトごとにこんな感じで長さと値を取る。
- 0x00 から 0xFA(250)
- 最初の 1 バイトがそのまま値になる
- 0xFC
- 後続の 2 バイトが値
- 0xFD
- 後続の 3 バイトが値
- 0xFE
- 後続の 8 バイトが値
- 0xFB
- NULL
def length_encoded_integer(payload, offset) first_byte = payload[offset].unpack1('C') case first_byte when 0..250 { value: first_byte, bytes_read: 1 } when 0xFC value = payload[(offset + 1)..(offset + 2)].unpack1('s<') { value: value, bytes_read: 3 } when 0xFD value = payload[(offset + 1)..(offset + 3)].unpack1('V') & 0xFFFFFF { value: value, bytes_read: 4 } when 0xFE value = payload[(offset + 1)..(offset + 8)].unpack1('Q<') { value: value, bytes_read: 9 } else # Included 0xFB { value: nil, bytes_read: 1 } end end
ERR Packet の場合
文字通りクエリ実行がエラーになった時がこれ。
パケットの構造はこんな感じ。
| サイズ | フィールド | |
|---|---|---|
| 1 byte integer | header | 0xFFが入る |
| 2 bytes integer | error_code | |
| 1 byte integer | sql_state_marker | "#"固定 |
| 5 bytes string | sql_state | |
| N bytes string | error_message | NULL終端 |
Result Set の場合
SELECT みたいな結果行レコードが返るクエリがこれ。
こいつは複雑で、複数のパケットで構成される。
- カラム数パケット
- この後何回カラム定義パケットを取ればいいかを伝える
- カラム定義パケット
- EOFパケット
- カラム定義パケットが終わったことを伝える
- 行データパケット
- EOFパケット
- 行データパケットが終わったことを伝える
カラム数パケット
| サイズ | フィールド | |
|---|---|---|
| 1 byte integer | column_count | カラムの数が入る |
カラム定義パケット
| サイズ | フィールド | |
|---|---|---|
| length-encoded string | catalog | "def" というダミー値固定 |
| length-encoded string | schema | データベース名 |
| length-encoded string | table | テーブル名(エイリアス) |
| length-encoded string | org_table | 物理テーブル名 |
| length-encoded string | name | カラム名(エイリアス) |
| length-encoded string | org_name | 物理カラム名 |
| length-encoded integer | length_of_fixed_fields | "0x0c" 固定 |
| 2 bytes integer | charset | |
| 4 bytes integer | column_length | カラムの最大長 |
| 1 byte integer | column_type | データ型 |
| 2 bytes integer | flags | カラムの属性を表すビットフラグ |
| 1 byte integer | decimals | 小数点以下の桁数 |
| 2 bytes integer | filter | "0x00 0x00" 固定 |
length-encoded string は length-encoded integer + length-encoded integer が返す値の長さの文字列。
こんな感じで取る。
def length_encoded_string(payload, offset) length_info = length_encoded_integer(payload, offset) return { value: '', bytes_read: length_info[:bytes_read] } if length_info[:value].nil? string_start = length_info[:bytes_read] string_end = string_start + length_info[:value] - 1 value = payload[string_start..string_end] || '' { value: value, bytes_read: length_info[:bytes_read] + length_info[:value] } end
EOF パケット
| サイズ | フィールド | |
|---|---|---|
| 1 byte integer | eof | "0xFE" 固定 |
行データパケット
カラム数分 length-encoded string が繰り返され、取れる値がカラムの値になる。
ただし、1 バイト目が 0xFB の場合は値が NULL になっているので最初に NULL チェックする。
これらをまとめると、Result Set パケットを処理するコードはこんな感じになる。
# カラム定義パケットのパース def parse_column_definition(payload) offset = 0 catalog = length_encoded_string(payload, offset) offset += catalog[:bytes_read] schema = length_encoded_string(payload, offset) offset += schema[:bytes_read] table = length_encoded_string(payload, offset) offset += table[:bytes_read] org_table = length_encoded_string(payload, offset) offset += org_table[:bytes_read] name = length_encoded_string(payload, offset) offset += name[:bytes_read] org_name = length_encoded_string(payload, offset) offset += org_name[:bytes_read] # length of fixed-length fields (1 byte) offset += 1 charset = payload[offset..(offset + 1)].unpack1('v') offset += 2 column_length = payload[offset..(offset + 3)].unpack1('V') offset += 4 column_type = payload[offset].unpack1('C') { schema: schema[:value], table: table[:value], org_table: org_table[:value], name: name[:value], org_name: org_name[:value], charset: charset, column_length: column_length, column_type: column_type } end # 行データパケットのパース def parse_row_data(payload, columns) first_byte = payload[0].unpack1('C') row = {} offset = 0 columns.each do |column| column_name_key = column[:name].downcase.to_sym if offset >= payload.length row[column_name_key] = nil next end if first_byte == 0xFB # NULL value row[column_name_key] = nil offset += 1 else # row data (length-encoded string) value = length_encoded_string(payload, offset) row[column_name_key] = value[:value] offset += value[:bytes_read] end end row end # カラム数パケット、カラム定義パケットを処理 columns = [] column_count = length_encoded_integer(payload, 0)[:value].to_i column_count.times do column_packet = read_packet column_info = parse_column_definition(column_packet[:payload]) columns << column_info end # EOF パケットを読み進める connection.read_packet # 行データパケットを処理 rows = [] loop do row_packet = read_packet # EOF パケットになったら終了 if row_packet[:payload][0].unpack1('C') == 0xFE break end rows << parse_row_data(row_packet[:payload], columns) end puts rows