Skip to content

Commit

Permalink
Merge pull request blackbeam#217 from blackbeam/result-set-terminator…
Browse files Browse the repository at this point in the history
…-detection

Fix result set terminator detection
  • Loading branch information
blackbeam authored Dec 10, 2022
2 parents 0cfb682 + 9fb9efa commit e2bfff0
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ futures-sink = "0.3"
lazy_static = "1"
lru = "0.8.1"
mio = { version = "0.8.0", features = ["os-poll", "net"] }
mysql_common = { version = "0.29.0", default-features = false }
mysql_common = { version = "0.29.2", default-features = false }
once_cell = "1.7.2"
pem = "1.0.1"
percent-encoding = "2.1.0"
Expand Down
33 changes: 21 additions & 12 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use mysql_common::{
packets::{
binlog_request::BinlogRequest, AuthPlugin, AuthSwitchRequest, CommonOkPacket, ErrPacket,
HandshakePacket, HandshakeResponse, OkPacket, OkPacketDeserializer, OldAuthSwitchRequest,
ResultSetTerminator, SslRequest,
OldEofPacket, ResultSetTerminator, SslRequest,
},
proto::MySerialize,
};
Expand Down Expand Up @@ -697,9 +697,18 @@ impl Conn {
/// Returns `true` for ProgressReport packet.
fn handle_packet(&mut self, packet: &PooledBuf) -> Result<bool> {
let ok_packet = if self.has_pending_result() {
ParseBuf(&*packet)
.parse::<OkPacketDeserializer<ResultSetTerminator>>(self.capabilities())
.map(|x| x.into_inner())
if self
.capabilities()
.contains(CapabilityFlags::CLIENT_DEPRECATE_EOF)
{
ParseBuf(&*packet)
.parse::<OkPacketDeserializer<ResultSetTerminator>>(self.capabilities())
.map(|x| x.into_inner())
} else {
ParseBuf(&*packet)
.parse::<OkPacketDeserializer<OldEofPacket>>(self.capabilities())
.map(|x| x.into_inner())
}
} else {
ParseBuf(&*packet)
.parse::<OkPacketDeserializer<CommonOkPacket>>(self.capabilities())
Expand Down Expand Up @@ -1059,7 +1068,7 @@ impl Conn {
mod test {
use bytes::Bytes;
use futures_util::stream::{self, StreamExt};
use mysql_common::binlog::events::EventData;
use mysql_common::{binlog::events::EventData, constants::MAX_PAYLOAD_LEN};
use tokio::time::timeout;

use std::time::Duration;
Expand Down Expand Up @@ -1448,15 +1457,15 @@ mod test {

#[tokio::test]
async fn should_perform_queries() -> super::Result<()> {
let long_string = ::std::iter::repeat('A')
.take(18 * 1024 * 1024)
.collect::<String>();
let mut conn = Conn::new(get_opts()).await?;
let result: Vec<(String, u8)> = conn
.query(format!(r"SELECT '{}', 231", long_string))
.await?;
for x in (MAX_PAYLOAD_LEN - 2)..=(MAX_PAYLOAD_LEN + 2) {
let long_string = ::std::iter::repeat('A').take(x).collect::<String>();
let result: Vec<(String, u8)> = conn
.query(format!(r"SELECT '{}', 231", long_string))
.await?;
assert_eq!((long_string, 231_u8), result[0]);
}
conn.disconnect().await?;
assert_eq!((long_string, 231_u8), result[0]);
Ok(())
}

Expand Down
11 changes: 6 additions & 5 deletions src/queryable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

use futures_util::FutureExt;
use mysql_common::{
constants::MAX_PAYLOAD_LEN,
io::ParseBuf,
packets::{OkPacketDeserializer, ResultSetTerminator},
proto::{Binary, Text},
row::RowDeserializer,
value::ServerSide,
Expand Down Expand Up @@ -42,10 +42,11 @@ pub trait Protocol: fmt::Debug + Send + Sync + 'static {
fn result_set_meta(columns: Arc<[Column]>) -> ResultSetMeta;
fn read_result_set_row(packet: &[u8], columns: Arc<[Column]>) -> Result<Row>;
fn is_last_result_set_packet(capabilities: CapabilityFlags, packet: &[u8]) -> bool {
packet.len() < 8
&& ParseBuf(packet)
.parse::<OkPacketDeserializer<ResultSetTerminator>>(capabilities)
.is_ok()
if capabilities.contains(CapabilityFlags::CLIENT_DEPRECATE_EOF) {
packet[0] == 0xFE && packet.len() < MAX_PAYLOAD_LEN
} else {
packet[0] == 0xFE && packet.len() < 8
}
}
}

Expand Down

0 comments on commit e2bfff0

Please sign in to comment.