From 50067d7880c6a35aee5ac33f870414c6380681d1 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 31 Aug 2018 00:38:11 +0200 Subject: [PATCH 1/5] Fix mysql parser handling of connection phase (#8139) The mysql protocol parser assumes that transactions can only be started by the client. This is true once the connection has been negotiated ("command phase"), but not during initial handshake ("connection phase"). This causes parsing problems when a connection is monitored from the start, as sometimes the connection phase leaves the parser confused on which side is client. This patch modifies how client-side is detected, which can only be done by looking at the destination port. --- packetbeat/protos/mysql/mysql.go | 34 ++++++++++-------- packetbeat/protos/mysql/mysql_test.go | 17 +++++---- .../tests/system/pcaps/mysql_connection.pcap | Bin 0 -> 2021 bytes .../test_0067_mysql_connection_phase.py | 22 ++++++++++++ 4 files changed, 52 insertions(+), 21 deletions(-) create mode 100644 packetbeat/tests/system/pcaps/mysql_connection.pcap create mode 100644 packetbeat/tests/system/test_0067_mysql_connection_phase.py diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index dacbdb5c769d..57b152223fe0 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -208,10 +208,18 @@ func (stream *mysqlStream) prepareForNewMessage() { stream.data = stream.data[stream.parseOffset:] stream.parseState = mysqlStateStart stream.parseOffset = 0 - stream.isClient = false stream.message = nil } +func (mysql *mysqlPlugin) isServerPort(port uint16) bool { + for _, sPort := range mysql.ports { + if uint16(sPort) == port { + return true + } + } + return false +} + func mysqlMessageParser(s *mysqlStream) (bool, bool) { logp.Debug("mysqldetailed", "MySQL parser called. parseState = %s", s.parseState) @@ -229,12 +237,11 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) { m.seq = hdr[3] m.typ = hdr[4] - logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d", m.packetLength, m.seq, m.typ) + logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d isClient=%v", m.packetLength, m.seq, m.typ, s.isClient) - if m.seq == 0 { + if s.isClient { // starts Command Phase - - if m.typ == mysqlCmdQuery { + if m.seq == 0 && m.typ == mysqlCmdQuery { // parse request m.isRequest = true m.start = s.parseOffset @@ -245,11 +252,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) { m.ignoreMessage = true s.parseState = mysqlStateEatMessage } - - if !s.isClient { - s.isClient = true - } - } else if !s.isClient { // parse response m.isRequest = false @@ -275,7 +277,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) { m.ignoreMessage = true s.parseState = mysqlStateEatMessage } - } else { // something else, not expected logp.Debug("mysql", "Unexpected MySQL message of type %d received.", m.typ) @@ -500,9 +501,14 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, } if priv.data[dir] == nil { + dstPort := tcptuple.DstPort + if dir == 0 { + dstPort = tcptuple.SrcPort + } priv.data[dir] = &mysqlStream{ - data: pkt.Payload, - message: &mysqlMessage{ts: pkt.Ts}, + data: pkt.Payload, + message: &mysqlMessage{ts: pkt.Ts}, + isClient: mysql.isServerPort(dstPort), } } else { // concatenate bytes @@ -521,7 +527,7 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, } ok, complete := mysqlMessageParser(priv.data[dir]) - //logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%b complete=%b", ok, complete) + logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%v complete=%v", ok, complete) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it diff --git a/packetbeat/protos/mysql/mysql_test.go b/packetbeat/protos/mysql/mysql_test.go index 1a44d0534909..b2f445e85696 100644 --- a/packetbeat/protos/mysql/mysql_test.go +++ b/packetbeat/protos/mysql/mysql_test.go @@ -35,6 +35,8 @@ import ( "time" ) +const serverPort = 3306 + type eventStore struct { events []beat.Event } @@ -55,6 +57,7 @@ func mysqlModForTests(store *eventStore) *mysqlPlugin { var mysql mysqlPlugin config := defaultConfig + config.Ports = []int{serverPort} mysql.init(callback, &config) return &mysql } @@ -82,7 +85,7 @@ func TestMySQLParser_simpleRequest(t *testing.T) { t.Errorf("Failed to decode hex string") } - stream := &mysqlStream{data: message, message: new(mysqlMessage)} + stream := &mysqlStream{data: message, message: new(mysqlMessage), isClient: true} ok, complete := mysqlMessageParser(stream) @@ -482,7 +485,7 @@ func testTCPTuple() *common.TCPTuple { IPLength: 4, BaseTuple: common.BaseTuple{ SrcIP: net.IPv4(192, 168, 0, 1), DstIP: net.IPv4(192, 168, 0, 2), - SrcPort: 6512, DstPort: 3306, + SrcPort: 6512, DstPort: serverPort, }, } t.ComputeHashables() @@ -540,17 +543,17 @@ func Test_gap_in_response(t *testing.T) { private := protos.ProtocolData(new(mysqlPrivateData)) - private = mysql.Parse(&req, tcptuple, 0, private) - private = mysql.Parse(&resp, tcptuple, 1, private) + private = mysql.Parse(&req, tcptuple, 1, private) + private = mysql.Parse(&resp, tcptuple, 0, private) logp.Debug("mysql", "Now sending gap..") - _, drop := mysql.GapInStream(tcptuple, 1, 10, private) + _, drop := mysql.GapInStream(tcptuple, 0, 10, private) assert.Equal(t, true, drop) trans := expectTransaction(t, store) assert.NotNil(t, trans) - assert.Equal(t, trans["notes"], []string{"Packet loss while capturing the response"}) + assert.Equal(t, []string{"Packet loss while capturing the response"}, trans["notes"]) } // Test that loss of data during the request doesn't result in a @@ -567,7 +570,7 @@ func Test_gap_in_eat_message(t *testing.T) { "66726f6d20746573") assert.Nil(t, err) - stream := &mysqlStream{data: reqData, message: new(mysqlMessage)} + stream := &mysqlStream{data: reqData, message: new(mysqlMessage), isClient: true} ok, complete := mysqlMessageParser(stream) assert.Equal(t, true, ok) assert.Equal(t, false, complete) diff --git a/packetbeat/tests/system/pcaps/mysql_connection.pcap b/packetbeat/tests/system/pcaps/mysql_connection.pcap new file mode 100644 index 0000000000000000000000000000000000000000..ee83703282861c1043cbbbaa404dc3fd78daac7c GIT binary patch literal 2021 zcmZvdPfQeN7{8*LKynI-+T*x*dZ^>zwdc} z&-=^_3$r)>2(TKqx%vCv*Yns|aGLX4ebOreUrdV66y{qO41 z^76{$&iJA?j+08b{_8)}pHdydy84dh&zj>c>|^X(?uqh=p<;@yuQvwR`YfFjTlu50 z?}uYUdw=Y%m@J>mkD0|&x-*t*(MvhUIc--)8BuAq8v7eds5lj4OX;vo_4Y#K6%kFR zR}O+|!FwW8UBhAU${hs~N~37M^-smNUGPpg<$_+!x#qaB$ziIx?SyUTOq`5G^nAfI ziY{)mjBvpm;freYk~tdeY-?}p{GfWnxKVLTyBKO~{-U|1wN5V;a_&jHVuj9hwCnAk zM@kj@R3&F|p&(Zw$E|QY20SW}!|2mC*|DF)%E6`O=U3tQi8mxW-jAi;{|~hqi8O!e zn?>I)^rRA#?!57X+}EG<^Q)Nj1p3Z)S@iEkc~!`P=1eBaI#WJ$qy9eh)q@31UM>8p-WF!F9Rkr4UldE2rOXtZEj zrW=h@Gb60AN!&~&_|LxnU2Z1$r@%jlW{sFh zyBMnjTe#1*jFF0Qn#_BCZ{i8pX>1U8QVH{iZyu3(g!ws`zt4G+G7r6cYR}PBGMkF_ zCbP){$xJHh8jed1W*)vD*p+8;17Y|+48KH!hwlV7NN+!w75r2K`{a?vj-V%%U_O*+ zlqCi8OE52T=C3xGNpf&IF=tZ!sh;cxbJIQoz6kX)u2PAFtXpMST>vivc#Q++WObfU z6zfGo_0_R7?W1TYG(Q^LsIG2_WCj43FfU=RYq~=Q5cbxl)2j^tnDgQ?K#<_85&ZU8 zWBoWmB^>pw?Un_7N2?7>%P&IQ(U-Dd0}g3<0>5gNU`bwQ8ns;CK|zPJb#xDa4*>YzpV81oPzaaamU|?*a1;u8R+?_@|Mfi~+1D z=;fyn^2M2!&5aZcM>mT)nb>lObY{>J>0m=J>O1esWiSokG?noBP3V%$B)sl`*AkfS zdEF9I=0mJidyfvJH=eqUH)Y_TEu##u-rGOP^1^ZnmSd2A;BAib7%XQnN*+^^+9(SD hzFmfH7mld}cIVw+WnF 0 From 5cd9070fe6306931ef295cc3f0f41ac00d2965b7 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 31 Aug 2018 12:21:40 +0200 Subject: [PATCH 2/5] Updated CHANGELOG --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 82a565f78613..6113bcdbac4f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -80,6 +80,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Fixed a seccomp related error where the `fcntl64` syscall was not permitted on 32-bit Linux and the sniffer failed to start. {issue}7839[7839] - Added missing `cmdline` and `client_cmdline` fields to index template. {pull}8258[8258] +- Fixed the mysql missing transactions if monitoring a connection from the start. {pull}8173[8173] *Winlogbeat* From a38aeda0503dc839afd0827f7b51871caf0e597d Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 31 Aug 2018 14:17:46 +0200 Subject: [PATCH 3/5] Remove unreachable branch --- packetbeat/protos/mysql/mysql.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index 57b152223fe0..cf7035258feb 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -246,7 +246,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) { m.isRequest = true m.start = s.parseOffset s.parseState = mysqlStateEatMessage - } else { // ignore command m.ignoreMessage = true @@ -277,10 +276,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) { m.ignoreMessage = true s.parseState = mysqlStateEatMessage } - } else { - // something else, not expected - logp.Debug("mysql", "Unexpected MySQL message of type %d received.", m.typ) - return false, false } case mysqlStateEatMessage: From ecf8264603a987651d19a1f7f29251eb9c7661d0 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 31 Aug 2018 14:57:33 +0200 Subject: [PATCH 4/5] Use tcp.Direction{Original|Reverse} constants --- packetbeat/protos/mysql/mysql.go | 2 +- packetbeat/protos/mysql/mysql_test.go | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index cf7035258feb..89bd0221a77c 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -497,7 +497,7 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, if priv.data[dir] == nil { dstPort := tcptuple.DstPort - if dir == 0 { + if dir == tcp.TCPDirectionReverse { dstPort = tcptuple.SrcPort } priv.data[dir] = &mysqlStream{ diff --git a/packetbeat/protos/mysql/mysql_test.go b/packetbeat/protos/mysql/mysql_test.go index b2f445e85696..d832231add2b 100644 --- a/packetbeat/protos/mysql/mysql_test.go +++ b/packetbeat/protos/mysql/mysql_test.go @@ -23,6 +23,7 @@ import ( "encoding/hex" "net" "testing" + "time" "github.com/stretchr/testify/assert" @@ -31,8 +32,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/packetbeat/protos" - - "time" + "github.com/elastic/beats/packetbeat/protos/tcp" ) const serverPort = 3306 @@ -367,7 +367,7 @@ func TestParseMySQL_simpleUpdateResponse(t *testing.T) { countHandleMysql++ } - mysql.Parse(&pkt, &tuple, 1, private) + mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private) if countHandleMysql != 1 { t.Errorf("handleMysql not called") @@ -408,7 +408,7 @@ func TestParseMySQL_threeResponses(t *testing.T) { countHandleMysql++ } - mysql.Parse(&pkt, &tuple, 1, private) + mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private) if countHandleMysql != 3 { t.Errorf("handleMysql not called three times") @@ -450,7 +450,7 @@ func TestParseMySQL_splitResponse(t *testing.T) { countHandleMysql++ } - private = mysql.Parse(&pkt, &tuple, 1, private).(mysqlPrivateData) + private = mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private).(mysqlPrivateData) if countHandleMysql != 0 { t.Errorf("handleMysql called on first run") } @@ -543,12 +543,12 @@ func Test_gap_in_response(t *testing.T) { private := protos.ProtocolData(new(mysqlPrivateData)) - private = mysql.Parse(&req, tcptuple, 1, private) - private = mysql.Parse(&resp, tcptuple, 0, private) + private = mysql.Parse(&req, tcptuple, tcp.TCPDirectionOriginal, private) + private = mysql.Parse(&resp, tcptuple, tcp.TCPDirectionReverse, private) logp.Debug("mysql", "Now sending gap..") - _, drop := mysql.GapInStream(tcptuple, 0, 10, private) + _, drop := mysql.GapInStream(tcptuple, tcp.TCPDirectionReverse, 10, private) assert.Equal(t, true, drop) trans := expectTransaction(t, store) From bce039a6b10cb6fbd33e076d9a9464dd3cb35ab5 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 31 Aug 2018 15:36:30 +0200 Subject: [PATCH 5/5] Improve test --- packetbeat/tests/system/test_0067_mysql_connection_phase.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packetbeat/tests/system/test_0067_mysql_connection_phase.py b/packetbeat/tests/system/test_0067_mysql_connection_phase.py index 62170e51c29d..ac4118e332c0 100644 --- a/packetbeat/tests/system/test_0067_mysql_connection_phase.py +++ b/packetbeat/tests/system/test_0067_mysql_connection_phase.py @@ -19,4 +19,5 @@ def test_connection_phase(self): self.run_packetbeat(pcap="mysql_connection.pcap") objs = self.read_output() - assert len(objs) > 0 + assert len(objs) == 1 + assert objs[0]['query'] == 'SELECT DATABASE()'