From 439fec4cae870ec768d47a0b2564c32ea84e0ffe Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Mon, 23 Oct 2023 18:32:02 +0300 Subject: [PATCH] Introduce `BinlogStreamRequest` --- src/conn/mod.rs | 130 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 109 insertions(+), 21 deletions(-) diff --git a/src/conn/mod.rs b/src/conn/mod.rs index 9dbbd5ed..d2a11646 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -14,9 +14,10 @@ use mysql_common::{ crypto, io::ParseBuf, packets::{ - binlog_request::BinlogRequest, AuthPlugin, AuthSwitchRequest, CommonOkPacket, ErrPacket, - HandshakePacket, HandshakeResponse, OkPacket, OkPacketDeserializer, OldAuthSwitchRequest, - OldEofPacket, ResultSetTerminator, SslRequest, + binlog_request::BinlogRequest, AuthPlugin, AuthSwitchRequest, BinlogDumpFlags, + ComRegisterSlave, CommonOkPacket, ErrPacket, HandshakePacket, HandshakeResponse, OkPacket, + OkPacketDeserializer, OldAuthSwitchRequest, OldEofPacket, ResultSetTerminator, Sid, + SslRequest, }, proto::MySerialize, row::Row, @@ -1260,12 +1261,9 @@ impl Conn { Ok(self) } - async fn register_as_slave(&mut self, server_id: u32) -> Result<()> { - use mysql_common::packets::ComRegisterSlave; - + async fn register_as_slave(&mut self, com_register_slave: ComRegisterSlave<'_>) -> Result<()> { self.query_drop("SET @master_binlog_checksum='ALL'").await?; - self.write_command(&ComRegisterSlave::new(server_id)) - .await?; + self.write_command(&com_register_slave).await?; // Server will respond with OK. self.read_packet().await?; @@ -1273,19 +1271,109 @@ impl Conn { Ok(()) } - async fn request_binlog(&mut self, request: BinlogRequest<'_>) -> Result<()> { - self.register_as_slave(request.server_id()).await?; - self.write_command(&request.as_cmd()).await?; + async fn request_binlog(&mut self, request: BinlogStreamRequest<'_>) -> Result<()> { + self.register_as_slave(request.register_slave).await?; + self.write_command(&request.binlog_request.as_cmd()).await?; Ok(()) } - pub async fn get_binlog_stream(mut self, request: BinlogRequest<'_>) -> Result { + /// Turns this connection into a binlog stream. + /// + /// You can use SHOW BINARY LOGS to get the current logfile and position from the master. + /// If the request’s filename is empty, the server will send the binlog-stream of the first known binlog. + pub async fn get_binlog_stream( + mut self, + request: BinlogStreamRequest<'_>, + ) -> Result { self.request_binlog(request).await?; Ok(BinlogStream::new(self)) } } +/// Binlog stream request builder. +pub struct BinlogStreamRequest<'a> { + binlog_request: BinlogRequest<'a>, + register_slave: ComRegisterSlave<'a>, +} + +impl<'a> BinlogStreamRequest<'a> { + /// Creates a new request with the given slave server id. + pub fn new(server_id: u32) -> Self { + Self { + binlog_request: BinlogRequest::new(server_id), + register_slave: ComRegisterSlave::new(server_id), + } + } + + /// Enables GTID-based replication (disabled by default). + pub fn with_gtid(mut self) -> Self { + self.binlog_request = self.binlog_request.with_use_gtid(true); + self + } + + /// Enables `NON_BLOCK` flag. Stream will be terminated as soon as there are no events. + pub fn with_non_blocking(mut self) -> Self { + self.binlog_request = self + .binlog_request + .with_flags(BinlogDumpFlags::BINLOG_DUMP_NON_BLOCK); + self + } + + /// Sets the filename of the binlog on the master (try `SHOW BINARY LOGS`). + pub fn with_filename(mut self, filename: &'a [u8]) -> Self { + self.binlog_request = self.binlog_request.with_filename(filename); + self + } + + /// Sets the start position (defaults to `4`). + pub fn with_pos(mut self, position: u64) -> Self { + self.binlog_request = self.binlog_request.with_pos(position); + self + } + + /// Adds the given set of GTIDs to the request (ignored if not GTID-based). + pub fn with_gtid_set(mut self, set: T) -> Self + where + T: IntoIterator>, + { + self.binlog_request = self.binlog_request.with_sids(set); + self + } + + /// This hostname will be reported to the server (max len 255, default to an empty string). + /// + /// Usually left default. + pub fn with_hostname(mut self, hostname: &'a [u8]) -> Self { + self.register_slave = self.register_slave.with_hostname(hostname); + self + } + + /// This username will be reported to the server (max len 255, default to an empty string). + /// + /// Usually left default. + pub fn with_user(mut self, user: &'a [u8]) -> Self { + self.register_slave = self.register_slave.with_user(user); + self + } + + /// This password will be reported to the server (max len 255, default to an empty string). + /// + /// Usually left default. + pub fn with_password(mut self, password: &'a [u8]) -> Self { + self.register_slave = self.register_slave.with_password(password); + self + } + + /// This port number will be reported to the server (defaults to `0`). + /// + /// Usually left default. + pub fn with_port(mut self, port: u16) -> Self { + self.register_slave = self.register_slave.with_port(port); + self + } +} + #[cfg(test)] mod test { use bytes::Bytes; @@ -1297,7 +1385,7 @@ mod test { use std::time::Duration; use crate::{ - from_row, params, prelude::*, test_misc::get_opts, BinlogDumpFlags, BinlogRequest, + conn::BinlogStreamRequest, from_row, params, prelude::*, test_misc::get_opts, ChangeUserOpts, Conn, Error, OptsBuilder, Pool, Value, WhiteListFsHandler, }; @@ -1429,8 +1517,8 @@ mod test { let mut binlog_stream = conn .get_binlog_stream( - BinlogRequest::new(binlog_server_ids.0) - .with_filename(filename) + BinlogStreamRequest::new(binlog_server_ids.0) + .with_filename(&filename) .with_pos(pos), ) .await @@ -1467,9 +1555,9 @@ mod test { let mut binlog_stream = conn .get_binlog_stream( - BinlogRequest::new(binlog_server_ids.1) - .with_use_gtid(true) - .with_filename(filename) + BinlogStreamRequest::new(binlog_server_ids.1) + .with_gtid() + .with_filename(&filename) .with_pos(pos), ) .await @@ -1507,10 +1595,10 @@ mod test { let mut binlog_stream = conn .get_binlog_stream( - BinlogRequest::new(binlog_server_ids.2) - .with_filename(filename) + BinlogStreamRequest::new(binlog_server_ids.2) + .with_filename(&filename) .with_pos(pos) - .with_flags(BinlogDumpFlags::BINLOG_DUMP_NON_BLOCK), + .with_non_blocking(), ) .await .unwrap();