Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pipeline route for mqttproxy #453

Merged
merged 15 commits into from
Jan 12, 2022
33 changes: 13 additions & 20 deletions pkg/context/mqttcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ type (
MQTTContext interface {
Context
Client() MQTTClient
Backend() MQTTBackend
Cancel(error)
Canceled() bool
Duration() time.Duration
Finish()

PacketType() MQTTPacketType
ConnectPacket() *packets.ConnectPacket // read only
PublishPacket() *packets.PublishPacket // read only
DisconnectPacket() *packets.DisconnectPacket // read only
SubscribePacket() *packets.SubscribePacket // read only
UnsubscribePacket() *packets.UnsubscribePacket // read only

PublishPacket() MQTTPublishPacket

SetDrop() // set drop value to true
Drop() bool // if true, this mqtt packet will be dropped
SetDisconnect() // set disconnect value to true
Expand All @@ -52,11 +52,6 @@ type (
EarlyStop() bool // if early stop is true, pipeline will skip following filters and return
}

// MQTTBackend is backend of MQTT proxy
MQTTBackend interface {
Publish(target string, data []byte, headers map[string]string) error
}

// MQTTClient contains client info that send this packet
MQTTClient interface {
ClientID() string
Expand All @@ -77,10 +72,11 @@ type (
startTime time.Time
endTime time.Time
client MQTTClient
backend MQTTBackend
packet packets.ControlPacket
packetType MQTTPacketType

publishPacket MQTTPublishPacket

err error
drop int32
disconnect int32
Expand All @@ -101,13 +97,13 @@ const (
MQTTPublish MQTTPacketType = 2

// MQTTDisconnect is mqtt packet type of disconnect
MQTTDisconnect = 3
MQTTDisconnect MQTTPacketType = 3

// MQTTSubscribe is mqtt packet type of subscribe
MQTTSubscribe = 4
MQTTSubscribe MQTTPacketType = 4

// MQTTUnsubscribe is mqtt packet type of unsubscribe
MQTTUnsubscribe = 5
MQTTUnsubscribe MQTTPacketType = 5

// MQTTOther is all other mqtt packet type
MQTTOther MQTTPacketType = 99
Expand All @@ -116,22 +112,22 @@ const (
var _ MQTTContext = (*mqttContext)(nil)

// NewMQTTContext create new MQTTContext
func NewMQTTContext(ctx stdcontext.Context, backend MQTTBackend, client MQTTClient, packet packets.ControlPacket) MQTTContext {
func NewMQTTContext(ctx stdcontext.Context, client MQTTClient, packet packets.ControlPacket) MQTTContext {
stdctx, cancelFunc := stdcontext.WithCancel(ctx)
startTime := time.Now()
mqttCtx := &mqttContext{
ctx: stdctx,
cancelFunc: cancelFunc,
startTime: startTime,
client: client,
backend: backend,
}

switch packet.(type) {
switch packet := packet.(type) {
case *packets.ConnectPacket:
mqttCtx.packetType = MQTTConnect
case *packets.PublishPacket:
mqttCtx.packetType = MQTTPublish
mqttCtx.publishPacket = newMQTTPublishPacket(packet)
case *packets.DisconnectPacket:
mqttCtx.packetType = MQTTDisconnect
case *packets.SubscribePacket:
Expand All @@ -142,6 +138,7 @@ func NewMQTTContext(ctx stdcontext.Context, backend MQTTBackend, client MQTTClie
mqttCtx.packetType = MQTTOther
}
mqttCtx.packet = packet

return mqttCtx
}

Expand Down Expand Up @@ -226,8 +223,8 @@ func (ctx *mqttContext) ConnectPacket() *packets.ConnectPacket {
return ctx.packet.(*packets.ConnectPacket)
}

func (ctx *mqttContext) PublishPacket() *packets.PublishPacket {
return ctx.packet.(*packets.PublishPacket)
func (ctx *mqttContext) PublishPacket() MQTTPublishPacket {
return ctx.publishPacket
}

func (ctx *mqttContext) DisconnectPacket() *packets.DisconnectPacket {
Expand Down Expand Up @@ -265,7 +262,3 @@ func (ctx *mqttContext) SetEarlyStop() {
func (ctx *mqttContext) EarlyStop() bool {
return atomic.LoadInt32(&ctx.earlyStop) == 1
}

func (ctx *mqttContext) Backend() MQTTBackend {
return ctx.backend
}
24 changes: 0 additions & 24 deletions pkg/context/mqttmock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,3 @@ func (m *MockMQTTClient) Store(key interface{}, value interface{}) {
func (m *MockMQTTClient) Delete(key interface{}) {
m.MockKVMap.Delete(key)
}

// MockMQTTMsg is message send to MockMQTTBackend
type MockMQTTMsg struct {
Target string
Data []byte
Headers map[string]string
}

// MockMQTTBackend is mocked MQTT backend
type MockMQTTBackend struct {
Messages map[string]MockMQTTMsg
}

var _ MQTTBackend = (*MockMQTTBackend)(nil)

// Publish publish msg to MockMQTTBackend
func (m *MockMQTTBackend) Publish(target string, data []byte, headers map[string]string) error {
m.Messages[target] = MockMQTTMsg{
Target: target,
Data: data,
Headers: headers,
}
return nil
}
90 changes: 90 additions & 0 deletions pkg/context/mqttpublish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package context

import "github.com/eclipse/paho.mqtt.golang/packets"

type (

// MQTTPublishPacket defines publish packet for MQTTContext
MQTTPublishPacket interface {
Topic() string
SetTopic(string)

GetHeader(string) string
SetAllHeader(map[string]string)
SetHeader(string, string)
VisitAllHeader(func(string, string))

Payload() []byte
}

mqttPublishPacket struct {
topic string
header map[string]string
payload []byte
}
)

var _ MQTTPublishPacket = (*mqttPublishPacket)(nil)

func newMQTTPublishPacket(packet *packets.PublishPacket) MQTTPublishPacket {
publish := &mqttPublishPacket{
topic: packet.TopicName,
header: make(map[string]string),
payload: packet.Payload,
}
return publish
}

// Topic return topic of mqttPublishPacket
func (p *mqttPublishPacket) Topic() string {
return p.topic
}

// SetTopic set topic of mqttPublishPacket
func (p *mqttPublishPacket) SetTopic(topic string) {
p.topic = topic
}

// GetHeader get header of given key, if key not exist, return empty string
func (p *mqttPublishPacket) GetHeader(key string) string {
return p.header[key]
}

// SetAllHeader set header of mqttPublishPacket
func (p *mqttPublishPacket) SetAllHeader(header map[string]string) {
p.header = header
}

// SetHeader add key, value pair to mqttPublishPacket header
func (p *mqttPublishPacket) SetHeader(key, value string) {
p.header[key] = value
}

// VisitAllHeader run function to all header
func (p *mqttPublishPacket) VisitAllHeader(fn func(key string, value string)) {
for k, v := range p.header {
fn(k, v)
}
}

// Payload return payload of mqttPublishPacket
func (p *mqttPublishPacket) Payload() []byte {
return p.payload
}
2 changes: 1 addition & 1 deletion pkg/filter/connectcontrol/connectcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (cc *ConnectControl) checkBan(ctx context.MQTTContext) bool {
if _, ok := cc.bannedClients[cid]; ok {
return true
}
topic := ctx.PublishPacket().TopicName
topic := ctx.PublishPacket().Topic()
if cc.bannedTopicRe != nil && cc.bannedTopicRe.MatchString(topic) {
return true
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/filter/connectcontrol/connectcontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,12 @@ func init() {
}

func newContext(cid string, topic string) context.MQTTContext {
backend := &context.MockMQTTBackend{
Messages: make(map[string]context.MockMQTTMsg),
}
client := &context.MockMQTTClient{
MockClientID: cid,
}
packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
packet.TopicName = topic
return context.NewMQTTContext(stdcontext.Background(), backend, client, packet)
return context.NewMQTTContext(stdcontext.Background(), client, packet)
}

func defaultFilterSpec(spec *Spec) *pipeline.FilterSpec {
Expand Down
Loading