diff --git a/.travis.yml b/.travis.yml index a3ac7e4..b715616 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,6 +9,7 @@ env: go: - 1.11.x - 1.12.x + - 1.13.x install: false script: diff --git a/dialects/kafka/consumer/main.go b/dialects/kafka/consumer/main.go index ef485ff..dd4066b 100644 --- a/dialects/kafka/consumer/main.go +++ b/dialects/kafka/consumer/main.go @@ -90,7 +90,7 @@ func (client *Client) Connect(brokers []string, config *sarama.Config, initialOf topics := []string{} for _, topic := range ts { - topics = append(topics, topic.Name) + topics = append(topics, topic.Name()) } client.conn = conn @@ -123,11 +123,11 @@ func (client *Client) Subscribe(topics ...types.Topic) (<-chan *types.Message, e } for _, topic := range topics { - if client.topics[topic.Name] == nil { - client.topics[topic.Name] = NewTopic() + if client.topics[topic.Name()] == nil { + client.topics[topic.Name()] = NewTopic() } - client.topics[topic.Name].subscriptions[subscription.messages] = subscription + client.topics[topic.Name()].subscriptions[subscription.messages] = subscription } return subscription.messages, nil @@ -167,7 +167,8 @@ func (client *Client) Claim(consumed *sarama.ConsumerMessage) (err error) { defer client.topics[topic].mutex.RUnlock() for _, subscription := range client.topics[topic].subscriptions { - message.Async() + message.Reset() + select { case subscription.messages <- message: result := message.Await() diff --git a/dialects/kafka/metadata/message.go b/dialects/kafka/metadata/message.go index f2a82ac..ad0b8ce 100644 --- a/dialects/kafka/metadata/message.go +++ b/dialects/kafka/metadata/message.go @@ -21,9 +21,7 @@ func MessageFromMessage(consumed *sarama.ConsumerMessage) *commander.Message { }) message := &types.Message{ - Topic: types.Topic{ - Name: consumed.Topic, - }, + Topic: types.NewTopic(consumed.Topic, nil, types.EventMessage, types.DefaultMode), Data: consumed.Value, Key: consumed.Key, Timestamp: consumed.Timestamp, @@ -135,7 +133,7 @@ func MessageToMessage(produce *commander.Message) *sarama.ProducerMessage { } return &sarama.ProducerMessage{ - Topic: produce.Topic.Name, + Topic: produce.Topic.Name(), Key: sarama.ByteEncoder(produce.Key), Value: sarama.ByteEncoder(produce.Data), Headers: headers, diff --git a/dialects/mock/consumer.go b/dialects/mock/consumer.go index d795b51..47a5f6d 100644 --- a/dialects/mock/consumer.go +++ b/dialects/mock/consumer.go @@ -25,7 +25,7 @@ func (consumer *Consumer) Emit(message *types.Message) { defer consumer.consumptions.Done() - collection, has := consumer.subscriptions[message.Topic.Name] + collection, has := consumer.subscriptions[message.Topic.Name()] if !has { consumer.mutex.Unlock() return @@ -42,7 +42,7 @@ func (consumer *Consumer) Emit(message *types.Message) { go func(collection *SubscriptionCollection, message *types.Message) { collection.mutex.Lock() for _, subscription := range collection.list { - message.Async() + message.Reset() subscription.messages <- message message.Await() } @@ -68,11 +68,11 @@ func (consumer *Consumer) Subscribe(topics ...types.Topic) (<-chan *types.Messag defer consumer.mutex.Unlock() for _, topic := range topics { - if consumer.subscriptions[topic.Name] == nil { - consumer.subscriptions[topic.Name] = NewTopic() + if consumer.subscriptions[topic.Name()] == nil { + consumer.subscriptions[topic.Name()] = NewTopic() } - consumer.subscriptions[topic.Name].list[subscription.messages] = subscription + consumer.subscriptions[topic.Name()].list[subscription.messages] = subscription } consumer.logger.Debugf("subscribing to: %+v, %v", topics, subscription.messages) diff --git a/examples/mock/go.mod b/examples/mock/go.mod index e52cf03..8746f5d 100644 --- a/examples/mock/go.mod +++ b/examples/mock/go.mod @@ -3,6 +3,7 @@ module github.com/jeroenrinzema/commander/examples/mock require ( github.com/gofrs/uuid v3.2.0+incompatible github.com/jeroenrinzema/commander v1.0.0-rc.25 + google.golang.org/api v0.9.0 // indirect ) replace github.com/jeroenrinzema/commander => ../../ diff --git a/examples/mock/go.sum b/examples/mock/go.sum index 91b1a9c..3d2b80d 100644 --- a/examples/mock/go.sum +++ b/examples/mock/go.sum @@ -1,4 +1,9 @@ +cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.38.0 h1:ROfEUZz+Gh5pa62DJWXSaonyu3StP6EA6lPEXPI6mCo= +cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -12,10 +17,24 @@ github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= +github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= +github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= +github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= +github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jeroenrinzema/commander/examples/kafka v0.0.0-20190530123436-e19f7e417aeb/go.mod h1:VW4mbxUMl4lofIwZz4mOZglXHzqWQ4HD25qMvAsNEJo= github.com/jeroenrinzema/commander/examples/kafka v0.0.0-20190613124800-6c8bc78e3138/go.mod h1:VW4mbxUMl4lofIwZz4mOZglXHzqWQ4HD25qMvAsNEJo= @@ -25,6 +44,7 @@ github.com/jeroenrinzema/commander/examples/mock-multiple-groups v0.0.0-20190530 github.com/jeroenrinzema/commander/examples/mock-multiple-groups v0.0.0-20190613124800-6c8bc78e3138/go.mod h1:0DeSCXOO2GXIi4n+FUWGS6o/pCI/ZBpsKebVuW6hIj8= github.com/jeroenrinzema/commander/examples/streaming v0.0.0-20190702094603-24861dd4d416/go.mod h1:7463oz3hsyxpJu+n9qvoXo63DISG29e8OXbSD/Jpx10= github.com/jeroenrinzema/commander/examples/zipkin v0.0.0-20190613124800-6c8bc78e3138/go.mod h1:0kH96eT7K8FRLOQj/93XMym10lFwlLKJD9MSthr90z0= +github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -38,25 +58,75 @@ github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg= +go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw= +golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0= +golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b h1:ag/x1USPSsqHud38I9BAC88qdNLDHHtQ4mlgQIZPPNA= +golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= +google.golang.org/api v0.9.0 h1:jbyannxz0XFD3zdjgrSUsaJbgpH4eTrkdhRChkHPfO8= +google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873 h1:nfPFGzJkUDX6uBmpN/pSw7MbOAWegH5QDQuoXFHedLg= +google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/examples/mock/main.go b/examples/mock/main.go index 40e5a80..612f054 100644 --- a/examples/mock/main.go +++ b/examples/mock/main.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os" + "time" "github.com/gofrs/uuid" "github.com/jeroenrinzema/commander" @@ -16,6 +17,8 @@ func main() { dialect := mock.NewDialect() group := commander.NewGroup( + commander.WithJSONCodec(), + commander.WithAwaitTimeout(1*time.Second), commander.NewTopic("commands", dialect, commander.CommandMessage, commander.ConsumeMode|commander.ProduceMode), commander.NewTopic("events", dialect, commander.EventMessage, commander.ConsumeMode|commander.ProduceMode), ) @@ -44,7 +47,7 @@ func main() { */ http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { key := uuid.Must(uuid.NewV4()).Bytes() - command := commander.NewMessage("example", 1, key, nil) + command := commander.NewMessage("example", 1, key, []byte(`{"message":"hello world"}`)) event, err := group.SyncCommand(command) if err != nil { diff --git a/examples/streaming/main.go b/examples/streaming/main.go index 8ca4d73..01b36bc 100644 --- a/examples/streaming/main.go +++ b/examples/streaming/main.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "net/http" - "os" "time" "github.com/gofrs/uuid" @@ -14,7 +13,7 @@ import ( ) func main() { - os.Setenv("DEBUG", "true") + // os.Setenv("DEBUG", "true") dialect := mock.NewDialect() group := commander.NewGroup( @@ -69,9 +68,9 @@ func main() { // The connection is closed when a timeout is reached of a EOS event is consumed. for message := range messages { parent, has := types.ParentIDFromContext(message.Ctx) - if !has || parent != types.ParentID(message.ID) { + if !has || parent != types.ParentID(command.ID) { message.Next() - break + continue } json.NewEncoder(w).Encode(message) diff --git a/group.go b/group.go index 5c54946..9b51d42 100644 --- a/group.go +++ b/group.go @@ -19,13 +19,6 @@ var ( ErrNoAction = errors.New("no action defined") ) -const ( - // DefaultAttempts represents the default amount of retry attempts - DefaultAttempts = 5 - // DefaultTimeout represents the default timeout when awaiting a "sync" command to complete - DefaultTimeout = 5 * time.Second -) - // EventType represents a middleware event type type EventType string @@ -40,25 +33,19 @@ const ( ) // NewGroup initializes a new commander group. -func NewGroup(t ...Topic) *Group { - topics := make(map[types.TopicMode][]types.Topic) - for _, topic := range t { - if topic.HasMode(ConsumeMode) { - topics[ConsumeMode] = append(topics[ConsumeMode], topic) - } - - if topic.HasMode(ProduceMode) { - topics[ProduceMode] = append(topics[ProduceMode], topic) - } - } +func NewGroup(definitions ...types.GroupOption) *Group { + options := types.NewGroupOptions(definitions) group := &Group{ - Timeout: DefaultTimeout, - Retries: DefaultAttempts, - Topics: topics, + Timeout: options.Timeout, + Retries: options.Retries, + Topics: options.Topics, + Codec: options.Codec, logger: log.New(), } + // NOTE: possible creation of a "universal" logger interface that could easily be implemented. + // Log levels should be defined/set outside of commander if os.Getenv(DebugEnv) != "" { group.logger.SetLevel(log.DebugLevel) } @@ -73,24 +60,23 @@ func NewGroup(t ...Topic) *Group { type Group struct { Middleware *middleware.Client Timeout time.Duration - Topics map[types.TopicMode][]types.Topic - Retries int + Topics []types.Topic + Codec types.Codec + Retries int8 logger *log.Logger } // Close represents a closing method -type Close func() +type Close = types.Close // Next indicates that the next message could be called -type Next func() +type Next = types.Next // Handle message handle message, writer implementation -type Handle func(*Message, Writer) +type Handle = types.Handle // Handler interface handle wrapper -type Handler interface { - Handle(*Message, Writer) -} +type Handler = types.Handler // AsyncCommand produces a message to the given group command topic // and does not await for the responding event. If no command key is set will the command id be used. @@ -215,8 +201,12 @@ func (group *Group) AwaitEOS(messages <-chan *types.Message, parent types.Parent func (group *Group) FetchTopics(t types.MessageType, m types.TopicMode) []types.Topic { topics := []Topic{} - for _, topic := range group.Topics[m] { - if topic.Type != t { + for _, topic := range group.Topics { + if topic.Type() != t { + continue + } + + if !topic.HasMode(m) { continue } @@ -243,13 +233,8 @@ func (group *Group) ProduceCommand(message *Message) error { topic := topics[0] message.Topic = topic - amount := group.Retries - if amount == 0 { - amount = DefaultAttempts - } - retry := Retry{ - Amount: amount, + Amount: group.Retries, } err := retry.Attempt(func() error { @@ -280,13 +265,8 @@ func (group *Group) ProduceEvent(message *Message) error { topic := topics[0] message.Topic = topic - amount := group.Retries - if amount == 0 { - amount = DefaultAttempts - } - retry := Retry{ - Amount: amount, + Amount: group.Retries, } err := retry.Attempt(func() error { @@ -306,7 +286,7 @@ func (group *Group) Publish(message *Message) error { group.Middleware.Emit(message.Ctx, BeforePublish, message) defer group.Middleware.Emit(message.Ctx, AfterPublish, message) - err := message.Topic.Dialect.Producer().Publish(message) + err := message.Topic.Dialect().Producer().Publish(message) if err != nil { return err } @@ -330,7 +310,7 @@ func (group *Group) NewConsumer(sort types.MessageType) (<-chan *types.Message, topic := topics[0] sink := make(chan *Message, 0) - messages, err := topic.Dialect.Consumer().Subscribe(topics...) + messages, err := topic.Dialect().Consumer().Subscribe(topics...) if err != nil { close(sink) @@ -370,7 +350,7 @@ func (group *Group) NewConsumer(sort types.MessageType) (<-chan *types.Message, breaker.Open() close(sink) - go topic.Dialect.Consumer().Unsubscribe(messages) + go topic.Dialect().Consumer().Unsubscribe(messages) } return sink, closer, nil @@ -402,28 +382,52 @@ func (group *Group) NewConsumerWithDeadline(timeout time.Duration, t types.Messa return messages, closing, nil } +// Handle awaits messages from the given MessageType and action. +// Once a message is received is the callback method called with the received command. +// The handle is closed once the consumer receives a close signal. +func (group *Group) Handle(sort types.MessageType, action string, handler Handler) (Close, error) { + return group.HandleFunc(sort, action, handler.Handle) +} + // HandleFunc awaits messages from the given MessageType and action. // Once a message is received is the callback method called with the received command. // The handle is closed once the consumer receives a close signal. func (group *Group) HandleFunc(sort types.MessageType, action string, callback Handle) (Close, error) { - group.logger.Debugf("setting up new consumer handle: %d, %s", sort, action) + return group.HandleContext( + WithAction(action), + WithMessageType(sort), + WithCallback(callback), + WithMessageSchema(func() interface{} { + return group.Codec.Schema() + }), + ) +} + +// HandleContext constructs a handle context based on the given definitions. +func (group *Group) HandleContext(definitions ...types.HandleOption) (Close, error) { + options := types.NewHandleOptions(definitions) + group.logger.Debugf("setting up new consumer handle: %d, %s", options.MessageType, options.Action) - messages, closing, err := group.NewConsumer(sort) + messages, closing, err := group.NewConsumer(options.MessageType) if err != nil { return nil, err } go func() { for message := range messages { - if message.Action != action { + if options.Action != "" && message.Action != options.Action { message.Next() continue } + schema := options.Schema() + group.Codec.Unmarshal(message.Data, &schema) + message.NewSchema(schema) + group.Middleware.Emit(message.Ctx, BeforeActionConsumption, message) writer := NewWriter(group, message) - callback(message, writer) + options.Callback(message, writer) message.Next() @@ -433,10 +437,3 @@ func (group *Group) HandleFunc(sort types.MessageType, action string, callback H return closing, nil } - -// Handle awaits messages from the given MessageType and action. -// Once a message is received is the callback method called with the received command. -// The handle is closed once the consumer receives a close signal. -func (group *Group) Handle(sort types.MessageType, action string, handler Handler) (Close, error) { - return group.HandleFunc(sort, action, handler.Handle) -} diff --git a/group_test.go b/group_test.go index 540f89a..dac10b2 100644 --- a/group_test.go +++ b/group_test.go @@ -458,7 +458,7 @@ func TestCommandTimestampPassed(t *testing.T) { // TestMessageMarked tests if the command timestamp is passed to the produced event func TestMessageMarked(t *testing.T) { message := types.NewMessage("testing", 1, nil, nil) - message.Async() + message.Reset() go func() { message.Next() }() diff --git a/main.go b/main.go index b453292..c5591b7 100644 --- a/main.go +++ b/main.go @@ -37,21 +37,18 @@ func NewClient(groups ...*Group) (*Client, error) { for _, group := range groups { group.Middleware = client.Middleware - - for _, t := range group.Topics { - topics = append(topics, t...) - } + topics = append(topics, group.Topics...) } topic: for _, topic := range topics { for _, dialect := range dialects { - if topic.Dialect == dialect { + if topic.Dialect() == dialect { continue topic } } - dialects = append(dialects, topic.Dialect) + dialects = append(dialects, topic.Dialect()) } for _, dialect := range dialects { @@ -75,15 +72,13 @@ func (client *Client) Close() error { dialects := make(map[types.Dialect]bool) for _, group := range client.Groups { - for _, topics := range group.Topics { - for _, topic := range topics { - if dialects[topic.Dialect] { - continue - } - - topic.Dialect.Close() - dialects[topic.Dialect] = true + for _, topic := range group.Topics { + if dialects[topic.Dialect()] { + continue } + + topic.Dialect().Close() + dialects[topic.Dialect()] = true } } diff --git a/options.go b/options.go new file mode 100644 index 0000000..4c1049a --- /dev/null +++ b/options.go @@ -0,0 +1,72 @@ +package commander + +import ( + "time" + + "github.com/jeroenrinzema/commander/types" +) + +type timeout struct { + duration time.Duration +} + +func (t *timeout) Apply(options *types.GroupOptions) { + options.Timeout = t.duration +} + +// WithAwaitTimeout returns a GroupOption that configures the timeout period for the given group +func WithAwaitTimeout(d time.Duration) types.GroupOption { + return &timeout{d} +} + +type action struct { + name string +} + +func (a *action) Apply(options *types.HandleOptions) { + options.Action = a.name +} + +// WithAction returns a HandleOptions that configures the action handle +func WithAction(n string) types.HandleOption { + return &action{n} +} + +type messageType struct { + value types.MessageType +} + +func (t *messageType) Apply(options *types.HandleOptions) { + options.MessageType = t.value +} + +// WithMessageType returns a HandleOptions that configures the message type handle +func WithMessageType(t types.MessageType) types.HandleOption { + return &messageType{t} +} + +type callback struct { + handle types.Handle +} + +func (c *callback) Apply(options *types.HandleOptions) { + options.Callback = c.handle +} + +// WithCallback returns a HandleOptions that configures the callback method for a given handle +func WithCallback(h types.Handle) types.HandleOption { + return &callback{h} +} + +type schema struct { + handle func() interface{} +} + +func (s *schema) Apply(options *types.HandleOptions) { + options.Schema = s.handle +} + +// WithMessageSchema returns a HandleOptions that configures the message schema for a handle +func WithMessageSchema(f func() interface{}) types.HandleOption { + return &schema{f} +} diff --git a/retry.go b/retry.go index b4f1d86..25eda58 100644 --- a/retry.go +++ b/retry.go @@ -2,8 +2,8 @@ package commander // Retry allowes a given method to be retried x amount of times. type Retry struct { - Amount int `json:"amount"` - Retries int + Amount int8 `json:"amount"` + Retries int8 } // Attempt tries to attempt the given method for the given amount of retries. diff --git a/types.go b/types.go index 05ebea0..13dbf38 100644 --- a/types.go +++ b/types.go @@ -46,3 +46,6 @@ type Topic = types.Topic // NewMessage types.NewMessage alias var NewMessage = types.NewMessage + +// WithJSONCodec types.WithJSONCodec alias +var WithJSONCodec = types.WithJSONCodec diff --git a/types/codec.go b/types/codec.go new file mode 100644 index 0000000..34d43db --- /dev/null +++ b/types/codec.go @@ -0,0 +1,79 @@ +package types + +import "encoding/json" + +// Codec Codec defines the interface commander uses to encode and decode messages. +// Note that implementations of this interface must be thread safe; a Codec's methods can be called from concurrent goroutines. +type Codec interface { + // Marshal returns the wire format of s. + // The s interface could be nil which represents a unkown schema format. + // When no schema is defined should a default schema be used or a error be thrown. + Marshal(s interface{}) ([]byte, error) + // Unmarshal parses the wire format into s. + // The s interface could be nil which represents a unkown schema format. + // When no schema is defined should a default schema be used or a error be thrown. + Unmarshal(data []byte, s interface{}) error + // Schema returns the default schema implementation for the given codec. + Schema() interface{} +} + +// DefaultCodec returns the default codec that preforms no action during marshalling ur unmarshalling +func DefaultCodec() Codec { + return &IgnoreCodec{} +} + +// IgnoreCodec is the default codec interperter that preforms no action during marshalling ur unmarshalling +type IgnoreCodec struct { +} + +// Schema returns the default schema implementation for the JSON codec +func (codec *IgnoreCodec) Schema() interface{} { + return make([]byte, 0) +} + +// Apply applies the given JSON codec to the given server options +func (codec *IgnoreCodec) Apply(options *GroupOptions) { + options.Codec = codec +} + +// Marshal returns the wire format of s. +func (codec *IgnoreCodec) Marshal(s interface{}) (bb []byte, err error) { + return nil, nil +} + +// Unmarshal parses the wire format into s. +func (codec *IgnoreCodec) Unmarshal(data []byte, s interface{}) (err error) { + return nil +} + +// WithJSONCodec constructs a new JSON message interperter used for encoding and decoding messages. +func WithJSONCodec() GroupOption { + return &JSONCodec{} +} + +// JSONCodec is a JSON message codec interperter used for encoding and decoding messages. +// All messages are decoded as map[string]interface{}. +type JSONCodec struct { +} + +// Schema returns the default schema implementation for the JSON codec +func (codec *JSONCodec) Schema() interface{} { + return make(map[string]interface{}) +} + +// Apply applies the given JSON codec to the given server options +func (codec *JSONCodec) Apply(options *GroupOptions) { + options.Codec = codec +} + +// Marshal returns the wire format of s. +func (codec *JSONCodec) Marshal(s interface{}) (bb []byte, err error) { + bb, err = json.Marshal(s) + return bb, err +} + +// Unmarshal parses the wire format into s. +func (codec *JSONCodec) Unmarshal(data []byte, s interface{}) (err error) { + err = json.Unmarshal(data, &s) + return err +} diff --git a/types/group.go b/types/group.go new file mode 100644 index 0000000..5b33536 --- /dev/null +++ b/types/group.go @@ -0,0 +1,15 @@ +package types + +// Close represents a closing method +type Close func() + +// Next indicates that the next message could be called +type Next func() + +// Handle message handle message, writer implementation +type Handle func(*Message, Writer) + +// Handler interface handle wrapper +type Handler interface { + Handle(*Message, Writer) +} diff --git a/types/message.go b/types/message.go index 0ba8b29..fa5110c 100644 --- a/types/message.go +++ b/types/message.go @@ -102,12 +102,23 @@ type Message struct { Ctx context.Context `json:"-"` // NOTE: include message topic origin? + schema interface{} async chan struct{} result error once sync.Once mutex sync.RWMutex } +// Schema returns the decoded message schema +func (message *Message) Schema() interface{} { + return message.schema +} + +// NewSchema overrides the message schema with the given value +func (message *Message) NewSchema(v interface{}) { + message.schema = v +} + // NewError construct a new error message with the given message as parent func (message *Message) NewError(action string, status StatusCode, err error) *Message { child := message.NewMessage(action, message.Version, message.Key, []byte(err.Error())) @@ -143,8 +154,8 @@ func (message *Message) NewMessage(action string, version Version, key Key, data return child } -// Async set's up a new async that awaits untill resolved -func (message *Message) Async() { +// Reset set's up a new async resolver that awaits untill resolved +func (message *Message) Reset() { if message == nil { return } diff --git a/types/options.go b/types/options.go new file mode 100644 index 0000000..27bc8a9 --- /dev/null +++ b/types/options.go @@ -0,0 +1,81 @@ +package types + +import "time" + +const ( + // DefaultRetries represents the default amount of retry attempts + DefaultRetries = 5 + // DefaultTimeout represents the default timeout when awaiting a "sync" command to complete + DefaultTimeout = 5 * time.Second +) + +// NewServerOptions applies the given serve options to construct a new server options definition +func NewServerOptions(options []ServerOption) (result *ServerOptions) { + result = &ServerOptions{} + for _, option := range options { + option.Apply(result) + } + return result +} + +// ServerOption sets options such as timeouts, codec and retries +type ServerOption interface { + Apply(*ServerOptions) +} + +// ServerOptions represent the available set of server options +type ServerOptions struct { + Timeout time.Duration + Retries int8 +} + +// NewGroupOptions applies the given serve options to construct a new group options definition +func NewGroupOptions(options []GroupOption) (result *GroupOptions) { + result = &GroupOptions{ + Timeout: DefaultTimeout, + Retries: DefaultRetries, + Codec: DefaultCodec(), + } + + for _, option := range options { + option.Apply(result) + } + + return result +} + +// GroupOption sets options such as topic definitions and timeouts +type GroupOption interface { + Apply(*GroupOptions) +} + +// GroupOptions represent the available set of group options +type GroupOptions struct { + Timeout time.Duration + Codec Codec + Retries int8 + Topics []Topic +} + +// NewHandleOptions applies the given serve options to construct a new handle options definition +func NewHandleOptions(options []HandleOption) (result *HandleOptions) { + // TODO: define default options + result = &HandleOptions{} + for _, option := range options { + option.Apply(result) + } + return result +} + +// HandleOption sets options such as codec interfaces and timeouts +type HandleOption interface { + Apply(*HandleOptions) +} + +// HandleOptions represent the available set of handle options +type HandleOptions struct { + Action string + MessageType MessageType + Schema func() interface{} + Callback Handle +} diff --git a/types/topic.go b/types/topic.go index 46135a5..e1a9976 100644 --- a/types/topic.go +++ b/types/topic.go @@ -19,31 +19,62 @@ func NewTopic(name string, dialect Dialect, t MessageType, m TopicMode) Topic { m = DefaultMode } - topic := Topic{ - Name: name, - Dialect: dialect, - Type: t, - Mode: m, + topic := &topic{ + name: name, + dialect: dialect, + messages: t, + mode: m, } dialect.Assigned(topic) return topic } -// Topic contains information of a kafka topic -type Topic struct { - Name string - Dialect Dialect - Type MessageType - Mode TopicMode +// Topic represents a subject for a dialect including it's +// consumer/producer mode. +type Topic interface { + // Dialect returns the topic Dialect + Dialect() Dialect + // Type returns the topic type + Type() MessageType + // Mode returns the topic mode + Mode() TopicMode + // HasMode checks if the topic represents the given topic type + HasMode(TopicMode) bool + // Name returns the topic name + Name() string + // Apply applies the topic to the given group configuration + Apply(*GroupOptions) } -// HasMode checks if the topic represents the given topic type -func (topic *Topic) HasMode(m TopicMode) bool { - return topic.Mode&(m) > 0 +// Topic interface implementation +type topic struct { + name string + dialect Dialect + messages MessageType + mode TopicMode } -// String returns the topic name -func (topic *Topic) String() string { - return topic.Name +func (topic *topic) Apply(options *GroupOptions) { + options.Topics = append(options.Topics, topic) +} + +func (topic *topic) Dialect() Dialect { + return topic.dialect +} + +func (topic *topic) Type() MessageType { + return topic.messages +} + +func (topic *topic) Mode() TopicMode { + return topic.mode +} + +func (topic *topic) HasMode(m TopicMode) bool { + return topic.mode&(m) > 0 +} + +func (topic *topic) Name() string { + return topic.name } diff --git a/types/writer.go b/types/writer.go new file mode 100644 index 0000000..b949189 --- /dev/null +++ b/types/writer.go @@ -0,0 +1,37 @@ +package types + +// Writer handle implementation for a given group and message +type Writer interface { + // Event creates and produces a new event to the assigned group. + // The produced event is marked as EOS (end of stream). + Event(action string, version int8, key []byte, data []byte) (*Message, error) + + // EventStream creates and produces a new event to the assigned group. + // The produced event is one of many events in the event stream. + EventStream(action string, version int8, key []byte, data []byte) (*Message, error) + + // EventEOS alias of Event + EventEOS(action string, version int8, key []byte, data []byte) (*Message, error) + + // Error produces a new error event to the assigned group. + // The produced error event is marked as EOS (end of stream). + Error(action string, status StatusCode, err error) (*Message, error) + + // ErrorStream produces a new error event to the assigned group. + // The produced error is one of many events in the event stream. + ErrorStream(action string, status StatusCode, err error) (*Message, error) + + // ErrorEOS alias of Error + ErrorEOS(action string, status StatusCode, err error) (*Message, error) + + // Command produces a new command to the assigned group. + // The produced error event is marked as EOS (end of stream). + Command(action string, version int8, key []byte, data []byte) (*Message, error) + + // CommandStream produces a new command to the assigned group. + // The produced comamnd is one of many commands in the command stream. + CommandStream(action string, version int8, key []byte, data []byte) (*Message, error) + + // CommandEOS alias of Command + CommandEOS(action string, version int8, key []byte, data []byte) (*Message, error) +} diff --git a/writer.go b/writer.go index b12a95a..e34c740 100644 --- a/writer.go +++ b/writer.go @@ -1,8 +1,6 @@ package commander import ( - "log" - "github.com/jeroenrinzema/commander/types" ) @@ -17,40 +15,7 @@ func NewWriter(group *Group, parent *Message) Writer { } // Writer handle implementation for a given group and message -type Writer interface { - // Event creates and produces a new event to the assigned group. - // The produced event is marked as EOS (end of stream). - Event(action string, version int8, key []byte, data []byte) (*Message, error) - - // EventStream creates and produces a new event to the assigned group. - // The produced event is one of many events in the event stream. - EventStream(action string, version int8, key []byte, data []byte) (*Message, error) - - // EventEOS alias of Event - EventEOS(action string, version int8, key []byte, data []byte) (*Message, error) - - // Error produces a new error event to the assigned group. - // The produced error event is marked as EOS (end of stream). - Error(action string, status types.StatusCode, err error) (*Message, error) - - // ErrorStream produces a new error event to the assigned group. - // The produced error is one of many events in the event stream. - ErrorStream(action string, status types.StatusCode, err error) (*Message, error) - - // ErrorEOS alias of Error - ErrorEOS(action string, status types.StatusCode, err error) (*Message, error) - - // Command produces a new command to the assigned group. - // The produced error event is marked as EOS (end of stream). - Command(action string, version int8, key []byte, data []byte) (*Message, error) - - // CommandStream produces a new command to the assigned group. - // The produced comamnd is one of many commands in the command stream. - CommandStream(action string, version int8, key []byte, data []byte) (*Message, error) - - // CommandEOS alias of Command - CommandEOS(action string, version int8, key []byte, data []byte) (*Message, error) -} +type Writer = types.Writer // Writer is a struct representing the ResponseWriter interface type writer struct { @@ -60,7 +25,6 @@ type writer struct { // NewMessage constructs a new message or a child of the parent. func (writer *writer) NewMessage(action string, version int8, key []byte, data []byte) *Message { - log.Println(writer.parent) if writer.parent != nil { return writer.parent.NewMessage(action, types.Version(version), types.Key(key), data) }