diff --git a/Gopkg.lock b/Gopkg.lock index 889a38ef909a3..cd8226d284c46 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -3,33 +3,44 @@ [[projects]] branch = "master" + digest = "1:be3ccd9f881604e4dd6d15cccfa126aa309232f0ba075ae5f92d3ef729a62758" name = "github.com/BurntSushi/toml" packages = ["."] + pruneopts = "NUT" revision = "a368813c5e648fee92e5f6c30e3944ff9d5e8895" [[projects]] + digest = "1:9752dad5e89cd779096bf2477a4ded16bea7ac62de453c8d6b4bf841d51a8512" name = "github.com/apache/thrift" packages = ["lib/go/thrift"] + pruneopts = "NUT" revision = "b2a4d4ae21c789b689dd162deb819665567f481c" version = "0.10.0" [[projects]] + digest = "1:75d40fa0c338f4c56056a3985e91fa371e8fcd0293e45b80afa7debecaf56012" name = "github.com/beorn7/perks" packages = ["quantile"] + pruneopts = "NUT" revision = "3ac7bf7a47d159a033b107610db8a1b6575507a4" [[projects]] branch = "master" + digest = "1:7b12da6e82292eb06e24dfe544c628115a3f4316c152f9dcb87d4f60cbf7cd7d" name = "github.com/blacktear23/go-proxyprotocol" packages = ["."] + pruneopts = "NUT" revision = "62e368e1c4700c34b4b6f77afd49b215211574c2" [[projects]] + digest = "1:60142a898f3808b3e6aa604e5f3296bdef921e625ef3223b6019c1a345b8765c" name = "github.com/codahale/hdrhistogram" packages = ["."] + pruneopts = "NUT" revision = "f8ad88b59a584afeee9d334eff879b104439117b" [[projects]] + digest = "1:7a5c43af23a0c21f4f1762d54af95586f9c04836257e66631557d8f8200ef0e1" name = "github.com/coreos/etcd" packages = [ "auth/authpb", @@ -38,40 +49,52 @@ "etcdserver/api/v3rpc/rpctypes", "etcdserver/etcdserverpb", "mvcc/mvccpb", - "pkg/types" + "pkg/types", ] + pruneopts = "NUT" revision = "eddf599c689ec85f4752060edff5a72e81e9106a" version = "v3.2.18" [[projects]] + digest = "1:f80ed82cae006d02025cd63bd7cbe63a7e593de2714db785ea36d6323cc995eb" name = "github.com/cznic/mathutil" packages = ["."] + pruneopts = "NUT" revision = "78ad7f262603437f0ecfebc835d80094f89c8f54" [[projects]] branch = "master" + digest = "1:809006f9378a46bcc70bc4330d14f43c1a7818ae9d93c09cab062e575d7e95a2" name = "github.com/cznic/sortutil" packages = ["."] + pruneopts = "NUT" revision = "4c7342852e65c2088c981288f2c5610d10b9f7f4" [[projects]] branch = "master" + digest = "1:f18dbc529543fe5fd5294f8385ea1f71681be964c43461f5f45335bb51ba83ae" name = "github.com/etcd-io/gofail" packages = ["runtime"] + pruneopts = "NUT" revision = "51ce9a71510a58bad5ae66ddd278ef28762a1550" [[projects]] + digest = "1:973dbcbbb1be662b61604319582383c315add70906a68d14894f31542ffc3a25" name = "github.com/go-sql-driver/mysql" packages = ["."] + pruneopts = "NUT" revision = "3955978caca48c1658a4bb7a9c6a0f084e326af3" [[projects]] + digest = "1:38e684375ef5b55e812332266d63f9fc5b6329ab303067c4cdda051db6d29ca4" name = "github.com/gogo/protobuf" packages = ["proto"] + pruneopts = "NUT" revision = "636bf0302bc95575d69441b25a2603156ffdddf1" version = "v1.1.1" [[projects]] + digest = "1:6aef947ba53156da1a66ee891d70d61835e0dcfc9f0d728ae1132db651e81c22" name = "github.com/golang/protobuf" packages = [ "jsonpb", @@ -81,111 +104,143 @@ "ptypes/any", "ptypes/duration", "ptypes/struct", - "ptypes/timestamp" + "ptypes/timestamp", ] + pruneopts = "NUT" revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265" version = "v1.1.0" [[projects]] + digest = "1:c6dfb6c55c1989f1d89622b3c45b786127f76f47322c50e487585f823cb12543" name = "github.com/golang/snappy" packages = ["."] + pruneopts = "NUT" revision = "723cc1e459b8eea2dea4583200fd60757d40097a" [[projects]] branch = "master" + digest = "1:c0883bc20a7c1ff552ff53d414f3cdc28fe847d15fab48c4486aa772ab2fb131" name = 
"github.com/google/btree" packages = ["."] + pruneopts = "NUT" revision = "316fb6d3f031ae8f4d457c6c5186b9e3ded70435" [[projects]] + digest = "1:dbd86d229eacaa86a98b10f8fb3e3fc69a1913e0f4e010e7cc1f92bf12edca92" name = "github.com/gorilla/context" packages = ["."] + pruneopts = "NUT" revision = "1ea25387ff6f684839d82767c1733ff4d4d15d0a" version = "v1.1" [[projects]] + digest = "1:054b11c45900b575d23fa8e0fa3636a784e2c4d1d43d05e8e20fd592ebe0d5db" name = "github.com/gorilla/mux" packages = ["."] + pruneopts = "NUT" revision = "599cba5e7b6137d46ddf58fb1765f5d928e69604" [[projects]] + digest = "1:0aa5274053fdc232896f0835c712f4a39992d959f8e2363f189a7b0df36f136b" name = "github.com/grpc-ecosystem/go-grpc-middleware" packages = [ ".", "tags", "tracing/opentracing", - "util/metautils" + "util/metautils", ] + pruneopts = "NUT" revision = "82921fcf811d228d2fa202bc31238b356bf9f8d5" [[projects]] + digest = "1:96c558cff0532e2e9ffc0b6d7c8c7431c592d781b109343aa51e27f9fd9a6b82" name = "github.com/grpc-ecosystem/go-grpc-prometheus" packages = ["."] + pruneopts = "NUT" revision = "6b7015e65d366bf3f19b2b2a000a831940f0f7e0" version = "v1.1" [[projects]] + digest = "1:5e55a8699c9ff7aba1e4c8952aeda209685d88d4cb63a8766c338e333b8e65d6" name = "github.com/klauspost/cpuid" packages = ["."] + pruneopts = "NUT" revision = "ae7887de9fa5d2db4eaa8174a7eff2c1ac00f2da" version = "v1.1" [[projects]] branch = "master" + digest = "1:5985ef4caf91ece5d54817c11ea25f182697534f8ae6521eadcd628c142ac4b6" name = "github.com/matttproud/golang_protobuf_extensions" packages = ["pbutil"] + pruneopts = "NUT" revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" [[projects]] branch = "master" + digest = "1:b95cb972bfd3eb737d334b1703a51a82d1d1d3e92f7f355798d94796c7120c73" name = "github.com/ngaut/pools" packages = ["."] + pruneopts = "NUT" revision = "b7bc8c42aac787667ba45adea78233f53f548443" [[projects]] branch = "master" + digest = "1:7d5f99346aa63d23681f4d92708469f32384d0c26722a4de5725bd0f22caedac" name = "github.com/ngaut/sync2" packages = ["."] + pruneopts = "NUT" revision = "7a24ed77b2efb460c1468b7dc917821c66e80e55" [[projects]] + digest = "1:cc405544fecfb5a8e0c409127ef67ce3b91d11143a00121e5b822e4f8eabe7d2" name = "github.com/opentracing/basictracer-go" packages = [ ".", - "wire" + "wire", ] + pruneopts = "NUT" revision = "1b32af207119a14b1b231d451df3ed04a72efebf" version = "v1.0.0" [[projects]] + digest = "1:7da29c22bcc5c2ffb308324377dc00b5084650348c2799e573ed226d8cc9faf0" name = "github.com/opentracing/opentracing-go" packages = [ ".", "ext", - "log" + "log", ] + pruneopts = "NUT" revision = "1949ddbfd147afd4d964a9f00b24eb291e0e7c38" version = "v1.0.2" [[projects]] branch = "master" + digest = "1:3bf17a6e6eaa6ad24152148a631d18662f7212e21637c2699bff3369b7f00fa2" name = "github.com/petar/GoLLRB" packages = ["llrb"] + pruneopts = "NUT" revision = "53be0d36a84c2a886ca057d34b6aa4468df9ccb4" [[projects]] branch = "master" + digest = "1:dc85cc12f7f97c2b033157f9d40395b2f0458b7bd3083257f8f661ea6f48f89a" name = "github.com/pingcap/check" packages = ["."] + pruneopts = "NUT" revision = "1c287c953996ab3a0bf535dba9d53d809d3dc0b6" [[projects]] + digest = "1:677ff2ee188099669fbecf583de8c23111020adee48d71a6f60c733f6b2fea7a" name = "github.com/pingcap/errors" packages = ["."] + pruneopts = "NUT" revision = "31ffda8a65b0f910c6d65f1fef40e761fd606384" version = "v0.10.1" [[projects]] + digest = "1:8fd099a567b1e9b3a7e1f66d8547a0d2b1852427e86a0dae96fa59e9583e13e6" name = "github.com/pingcap/goleveldb" packages = [ "leveldb", @@ -199,12 +254,14 @@ 
"leveldb/opt", "leveldb/storage", "leveldb/table", - "leveldb/util" + "leveldb/util", ] + pruneopts = "NUT" revision = "8d44bfdf1030639ae7130922c95df12d6d4da3b6" [[projects]] branch = "master" + digest = "1:84db0209caf3c48beaec43d15bd22ca6256d958b807c2ac626e2425e232e92c4" name = "github.com/pingcap/kvproto" packages = [ "pkg/coprocessor", @@ -214,12 +271,14 @@ "pkg/metapb", "pkg/pdpb", "pkg/raft_serverpb", - "pkg/tikvpb" + "pkg/tikvpb", ] + pruneopts = "NUT" revision = "529c652955d8fa74faf56f91b2f428d5779fd7d5" [[projects]] branch = "master" + digest = "1:5c94c36f7291e82207aff9f34fcdf0382a7d7d78a85a7d21f30ee1b54637d843" name = "github.com/pingcap/parser" packages = [ ".", @@ -231,78 +290,112 @@ "mysql", "opcode", "terror", - "types" + "types", ] + pruneopts = "NUT" revision = "53ac409ed043f89a850043d2440aaaa7dd863a1c" [[projects]] branch = "master" + digest = "1:8f576565a8479071ca1951bf678519b1c32480aad8dec2379a64858c31d9e6a9" name = "github.com/pingcap/pd" packages = ["client"] + pruneopts = "NUT" revision = "eb892dda1e33a0b76191d39894ad4a806f313f6e" +[[projects]] + digest = "1:ef1a3a4694f068c0f914a809fbfa48e0a44d50dc1154d72e85b15be01facda23" + name = "github.com/pingcap/tidb-tools" + packages = [ + "pkg/etcd", + "pkg/utils", + "tidb-binlog/node", + "tidb-binlog/pump_client", + ] + pruneopts = "NUT" + revision = "5db58e3b7e6613456551c40d011806a346b2f44a" + [[projects]] branch = "master" + digest = "1:9d9d9bf758fc9bbf1a4976534c1622dbca61a8da53d98416dbb711e309e379f7" name = "github.com/pingcap/tipb" packages = [ "go-binlog", "go-tipb", - "sharedbytes" + "sharedbytes", ] + pruneopts = "NUT" revision = "11e33c750323a0267a6aee335eaaeb7831bdae67" [[projects]] + digest = "1:4b96ce0f151b10c52b1d6876c179e9a6d2329f98939c4cf8148daaf27f981413" name = "github.com/pkg/errors" packages = ["."] - revision = "9316aeb006f59424c65ff505c217f90c43d6445d" + pruneopts = "NUT" + revision = "1176802fff62540cc87d289bd40c52a2d6b2ea16" source = "https://github.com/pingcap/errors.git" - version = "v0.9.0" + version = "v0.11.0" [[projects]] + digest = "1:1ef3c4d6e78616bd3d1b5b7d8899febb9aa1b83d3373fbbdc2804408c7977b57" name = "github.com/prometheus/client_golang" packages = [ "prometheus", - "prometheus/push" + "prometheus/push", ] + pruneopts = "NUT" revision = "c5b7fccd204277076155f10851dad72b76a49317" version = "v0.8.0" [[projects]] + digest = "1:9fe8945a11a9f588a9d306b4741cad634da9015a704271b9506810e2cc77fa17" name = "github.com/prometheus/client_model" packages = ["go"] + pruneopts = "NUT" revision = "fa8ad6fec33561be4280a8f0514318c79d7f6cb6" [[projects]] + digest = "1:c90717fa0864d47e19eaa855af60b202b537795f485052c7f48333c679dd7310" name = "github.com/prometheus/common" packages = [ "expfmt", "internal/bitbucket.org/ww/goautoneg", - "model" + "model", ] + pruneopts = "NUT" revision = "4402f4e5ea79ec15f3c574773b6a5198fbea215f" [[projects]] + digest = "1:dcfff2d5e99e01dcb856dd8afb0b509c1d05443f0b523cc5333b33a819829ed9" name = "github.com/prometheus/procfs" packages = ["."] + pruneopts = "NUT" revision = "abf152e5f3e97f2fafac028d2cc06c1feb87ffa5" [[projects]] + digest = "1:4c173651d2deb815a0420aeb1b3f7ca3c4aef2d980ba164a501a53f6abf368ef" name = "github.com/sirupsen/logrus" packages = ["."] + pruneopts = "NUT" revision = "3bcb09397d6d88e7676a9bc8433ca11ba5304837" [[projects]] + digest = "1:0af5ed795eeb9df3e3d32e2c0229b012e2c107945f75a0556733d643c94e55be" name = "github.com/spaolacci/murmur3" packages = ["."] + pruneopts = "NUT" revision = "0d12bf811670bf6a1a63828dfbd003eded177fce" version = "v1.0" [[projects]] + 
digest = "1:194c26fad062f6b1530720ee1afd6cd6f40d79274b2434caef2693b1da5d2ab2" name = "github.com/twinj/uuid" packages = ["."] + pruneopts = "NUT" revision = "70cac2bcd273ef6a371bb96cde363d28b68734c3" [[projects]] + digest = "1:0e28a98b9579858cb5de885935499fefebc8bc44a652cde08a6d035ee7435603" name = "github.com/uber/jaeger-client-go" packages = [ ".", @@ -314,18 +407,22 @@ "thrift-gen/jaeger", "thrift-gen/sampling", "thrift-gen/zipkincore", - "utils" + "utils", ] + pruneopts = "NUT" revision = "d021e646f5187d77b55592c3efee1a2810e895d7" version = "v2.8.0" [[projects]] + digest = "1:0da2810678a062e0567c3215911869b0423da0e497c56683ff8e87e7a6952597" name = "github.com/uber/jaeger-lib" packages = ["metrics"] + pruneopts = "NUT" revision = "3b2a9ad2a045881ab7a0f81d465be54c8292ee4f" version = "v1.1.0" [[projects]] + digest = "1:686219a880e6ec42870431372756a66c19c1396e9fe203b659179422d3c6bf96" name = "golang.org/x/net" packages = [ "context", @@ -334,17 +431,21 @@ "idna", "internal/timeseries", "lex/httplex", - "trace" + "trace", ] + pruneopts = "NUT" revision = "d1e1b351919c6738fdeb9893d5c998b161464f0c" [[projects]] branch = "master" + digest = "1:2f375ec82e53522eb4a1670f2f24f064f407ef2b32e01bb217f5daa4a4d226d6" name = "golang.org/x/sys" packages = ["unix"] + pruneopts = "NUT" revision = "7dfd1290c7917b7ba22824b9d24954ab3002fe24" [[projects]] + digest = "1:a005696b163ffe1842de27eeb3ccfece9d3c2e70d02b83ea1d8c0eeab597c9e0" name = "golang.org/x/text" packages = [ "encoding", @@ -366,19 +467,23 @@ "unicode/bidi", "unicode/cldr", "unicode/norm", - "unicode/rangetable" + "unicode/rangetable", ] + pruneopts = "NUT" revision = "4ee4af566555f5fbe026368b75596286a312663a" [[projects]] + digest = "1:0efcfe82e59b828eb6f4115bba88ff45c0898c38e823fbe7f450bdffed9e739b" name = "google.golang.org/genproto" packages = [ "googleapis/api/annotations", - "googleapis/rpc/status" + "googleapis/rpc/status", ] + pruneopts = "NUT" revision = "6b7d9516179cd47f4714cfeb0103ad1dede756c4" [[projects]] + digest = "1:7cd8d3d0020bbbe235958c87a2c042c4a8505e401b6e0496880990228fc34f4b" name = "google.golang.org/grpc" packages = [ ".", @@ -405,21 +510,88 @@ "stats", "status", "tap", - "transport" + "transport", ] + pruneopts = "NUT" revision = "7a6a684ca69eb4cae85ad0a484f2e531598c047b" source = "https://github.com/grpc/grpc-go.git" version = "v1.12.2" [[projects]] + digest = "1:7d95d61ff5828a65cf072a46f3c68d67daffdd11e97d758b0af2176cde717fcd" name = "gopkg.in/natefinch/lumberjack.v2" packages = ["."] + pruneopts = "NUT" revision = "dd45e6a67c53f673bb49ca8a001fd3a63ceb640e" version = "v2.0" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "97e05f1d47a56848ca61ce3ca55d52257dca3643cb3b540136da4029fb4c45e8" + input-imports = [ + "github.com/BurntSushi/toml", + "github.com/blacktear23/go-proxyprotocol", + "github.com/coreos/etcd/clientv3", + "github.com/coreos/etcd/clientv3/concurrency", + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes", + "github.com/coreos/etcd/mvcc/mvccpb", + "github.com/cznic/mathutil", + "github.com/cznic/sortutil", + "github.com/etcd-io/gofail/runtime", + "github.com/go-sql-driver/mysql", + "github.com/golang/protobuf/jsonpb", + "github.com/golang/protobuf/proto", + "github.com/google/btree", + "github.com/gorilla/mux", + "github.com/grpc-ecosystem/go-grpc-middleware", + "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing", + "github.com/grpc-ecosystem/go-grpc-prometheus", + "github.com/klauspost/cpuid", + "github.com/ngaut/pools", + "github.com/ngaut/sync2", + 
"github.com/opentracing/basictracer-go", + "github.com/opentracing/opentracing-go", + "github.com/pingcap/check", + "github.com/pingcap/goleveldb/leveldb", + "github.com/pingcap/goleveldb/leveldb/comparer", + "github.com/pingcap/goleveldb/leveldb/iterator", + "github.com/pingcap/goleveldb/leveldb/memdb", + "github.com/pingcap/goleveldb/leveldb/opt", + "github.com/pingcap/goleveldb/leveldb/storage", + "github.com/pingcap/goleveldb/leveldb/util", + "github.com/pingcap/kvproto/pkg/coprocessor", + "github.com/pingcap/kvproto/pkg/errorpb", + "github.com/pingcap/kvproto/pkg/kvrpcpb", + "github.com/pingcap/kvproto/pkg/metapb", + "github.com/pingcap/kvproto/pkg/tikvpb", + "github.com/pingcap/parser", + "github.com/pingcap/parser/ast", + "github.com/pingcap/parser/auth", + "github.com/pingcap/parser/charset", + "github.com/pingcap/parser/model", + "github.com/pingcap/parser/mysql", + "github.com/pingcap/parser/opcode", + "github.com/pingcap/parser/terror", + "github.com/pingcap/parser/types", + "github.com/pingcap/pd/client", + "github.com/pingcap/tidb-tools/tidb-binlog/pump_client", + "github.com/pingcap/tipb/go-binlog", + "github.com/pingcap/tipb/go-tipb", + "github.com/pkg/errors", + "github.com/prometheus/client_golang/prometheus", + "github.com/prometheus/client_golang/prometheus/push", + "github.com/sirupsen/logrus", + "github.com/spaolacci/murmur3", + "github.com/twinj/uuid", + "github.com/uber/jaeger-client-go/config", + "golang.org/x/net/context", + "golang.org/x/text/transform", + "google.golang.org/grpc", + "google.golang.org/grpc/codes", + "google.golang.org/grpc/credentials", + "google.golang.org/grpc/keepalive", + "google.golang.org/grpc/metadata", + "gopkg.in/natefinch/lumberjack.v2", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index b67407783d317..57a9061159479 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -95,9 +95,14 @@ required = ["github.com/golang/protobuf/jsonpb"] [[constraint]] name = "github.com/pkg/errors" - version = "0.9.0" + version = "0.11.0" source = "https://github.com/pingcap/errors.git" [[constraint]] branch = "master" - name = "github.com/pingcap/parser" \ No newline at end of file + name = "github.com/pingcap/parser" + + +[[constraint]] + name = "github.com/pingcap/tidb-tools" + revision = "5db58e3b7e6613456551c40d011806a346b2f44a" diff --git a/vendor/github.com/pingcap/tidb-tools/LICENSE b/vendor/github.com/pingcap/tidb-tools/LICENSE new file mode 100644 index 0000000000000..b67d9091009d1 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/LICENSE @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. 
+ + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/pingcap/tidb-tools/pkg/etcd/etcd.go b/vendor/github.com/pingcap/tidb-tools/pkg/etcd/etcd.go new file mode 100644 index 0000000000000..9919ad7da3069 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/pkg/etcd/etcd.go @@ -0,0 +1,236 @@ +// Copyright 2018 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "crypto/tls" + "path" + "strings" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +// Node organizes the ectd query result as a Trie tree +type Node struct { + Value []byte + Childs map[string]*Node +} + +// Client is a wrapped etcd client that support some simple method +type Client struct { + client *clientv3.Client + rootPath string +} + +// NewClient returns a wrapped etcd client +func NewClient(cli *clientv3.Client, root string) *Client { + return &Client{ + client: cli, + rootPath: root, + } +} + +// NewClientFromCfg returns a wrapped etcd client +func NewClientFromCfg(endpoints []string, dialTimeout time.Duration, root string, security *tls.Config) (*Client, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: dialTimeout, + TLS: security, + }) + if err != nil { + return nil, errors.Trace(err) + } + + return &Client{ + client: cli, + rootPath: root, + }, nil +} + +// Close shutdowns the connection to etcd +func (e *Client) Close() error { + if err := e.client.Close(); err != nil { + return errors.Trace(err) + } + return nil +} + +// Create guarantees to set a key = value with some options(like ttl) +func (e *Client) Create(ctx context.Context, key string, val string, opts []clientv3.OpOption) error { + key = keyWithPrefix(e.rootPath, key) + txnResp, err := e.client.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", 0), + ).Then( + clientv3.OpPut(key, val, opts...), + ).Commit() + if err != nil { + return errors.Trace(err) + } + + if !txnResp.Succeeded { + return errors.AlreadyExistsf("key %s in etcd", key) + } + + return nil +} + +// Get returns a key/value matchs the given key +func (e *Client) Get(ctx context.Context, key string) ([]byte, error) { + key = keyWithPrefix(e.rootPath, key) + resp, err := e.client.KV.Get(ctx, key) + if err != nil { + return nil, errors.Trace(err) + } + + if len(resp.Kvs) == 0 { + return nil, errors.NotFoundf("key %s in etcd", key) + } + + return resp.Kvs[0].Value, nil +} + +// Update updates a key/value. 
+// set ttl 0 to disable the Lease ttl feature +func (e *Client) Update(ctx context.Context, key string, val string, ttl int64) error { + key = keyWithPrefix(e.rootPath, key) + + var opts []clientv3.OpOption + if ttl > 0 { + lcr, err := e.client.Lease.Grant(ctx, ttl) + if err != nil { + return errors.Trace(err) + } + + opts = []clientv3.OpOption{clientv3.WithLease(lcr.ID)} + } + + txnResp, err := e.client.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), ">", 0), + ).Then( + clientv3.OpPut(key, val, opts...), + ).Commit() + if err != nil { + return errors.Trace(err) + } + + if !txnResp.Succeeded { + return errors.NotFoundf("key %s in etcd", key) + } + + return nil +} + +// UpdateOrCreate updates a key/value, if the key does not exist then create, or update +func (e *Client) UpdateOrCreate(ctx context.Context, key string, val string, ttl int64) error { + key = keyWithPrefix(e.rootPath, key) + + var opts []clientv3.OpOption + if ttl > 0 { + lcr, err := e.client.Lease.Grant(ctx, ttl) + if err != nil { + return errors.Trace(err) + } + + opts = []clientv3.OpOption{clientv3.WithLease(lcr.ID)} + } + + _, err := e.client.KV.Do(ctx, clientv3.OpPut(key, val, opts...)) + if err != nil { + return errors.Trace(err) + } + return nil +} + +// List returns the trie struct that constructed by the key/value with same prefix +func (e *Client) List(ctx context.Context, key string) (*Node, error) { + key = keyWithPrefix(e.rootPath, key) + if !strings.HasSuffix(key, "/") { + key += "/" + } + + resp, err := e.client.KV.Get(ctx, key, clientv3.WithPrefix()) + if err != nil { + return nil, errors.Trace(err) + } + + root := new(Node) + length := len(key) + for _, kv := range resp.Kvs { + key := string(kv.Key) + if len(key) <= length { + continue + } + + keyTail := key[length:] + tailNode := parseToDirTree(root, keyTail) + tailNode.Value = kv.Value + } + + return root, nil +} + +// Delete deletes the key/values with matching prefix or key +func (e *Client) Delete(ctx context.Context, key string, withPrefix bool) error { + key = keyWithPrefix(e.rootPath, key) + var opts []clientv3.OpOption + if withPrefix { + opts = []clientv3.OpOption{clientv3.WithPrefix()} + } + + _, err := e.client.KV.Delete(ctx, key, opts...) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +// Watch watchs the events of key with prefix. +func (e *Client) Watch(ctx context.Context, prefix string) clientv3.WatchChan { + return e.client.Watch(ctx, prefix, clientv3.WithPrefix()) +} + +func parseToDirTree(root *Node, path string) *Node { + pathDirs := strings.Split(path, "/") + current := root + var next *Node + var ok bool + + for _, dir := range pathDirs { + if current.Childs == nil { + current.Childs = make(map[string]*Node) + } + + next, ok = current.Childs[dir] + if !ok { + current.Childs[dir] = new(Node) + next = current.Childs[dir] + } + + current = next + } + + return current +} + +func keyWithPrefix(prefix, key string) string { + if strings.HasPrefix(key, prefix) { + return key + } + + return path.Join(prefix, key) +} diff --git a/vendor/github.com/pingcap/tidb-tools/pkg/utils/cpu.go b/vendor/github.com/pingcap/tidb-tools/pkg/utils/cpu.go new file mode 100644 index 0000000000000..71e2a9d235ed0 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/pkg/utils/cpu.go @@ -0,0 +1,37 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "syscall" + "time" +) + +var ( + lastInspectUnixNano int64 + lastCPUUsageTime int64 +) + +// GetCPUPercentage calculates CPU usage and returns percentage in float64(e.g. 2.5 means 2.5%). +// http://man7.org/linux/man-pages/man2/getrusage.2.html +func GetCPUPercentage() float64 { + var ru syscall.Rusage + syscall.Getrusage(syscall.RUSAGE_SELF, &ru) + usageTime := ru.Utime.Nano() + ru.Stime.Nano() + nowTime := time.Now().UnixNano() + perc := float64(usageTime-lastCPUUsageTime) / float64(nowTime-lastInspectUnixNano) * 100.0 + lastInspectUnixNano = nowTime + lastCPUUsageTime = usageTime + return perc +} diff --git a/vendor/github.com/pingcap/tidb-tools/pkg/utils/errors.go b/vendor/github.com/pingcap/tidb-tools/pkg/utils/errors.go new file mode 100644 index 0000000000000..cc55919be7620 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/pkg/utils/errors.go @@ -0,0 +1,30 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "github.com/pkg/errors" +) + +//OriginError return original err +func OriginError(err error) error { + for { + e := errors.Cause(err) + if e == err { + break + } + err = e + } + return err +} diff --git a/vendor/github.com/pingcap/tidb-tools/pkg/utils/printer.go b/vendor/github.com/pingcap/tidb-tools/pkg/utils/printer.go new file mode 100644 index 0000000000000..94bac09a0a412 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/pkg/utils/printer.go @@ -0,0 +1,36 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "fmt" + "runtime" +) + +// Version information. 
+var ( + Version = "None" + BuildTS = "None" + GitHash = "None" +) + +// GetRawInfo do what its name tells +func GetRawInfo(app string) string { + info := "" + info += fmt.Sprintf("%s: v%s\n", app, Version) + info += fmt.Sprintf("Git Commit Hash: %s\n", GitHash) + info += fmt.Sprintf("UTC Build Time: %s\n", BuildTS) + info += fmt.Sprintf("Go Version: %s\n", runtime.Version()) + return info +} diff --git a/vendor/github.com/pingcap/tidb-tools/pkg/utils/security.go b/vendor/github.com/pingcap/tidb-tools/pkg/utils/security.go new file mode 100644 index 0000000000000..a330bf0de08e7 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/pkg/utils/security.go @@ -0,0 +1,57 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + + "github.com/pkg/errors" +) + +// ToTLSConfig generates tls's config. +func ToTLSConfig(SSLCA, SSLCert, SSLKey string) (*tls.Config, error) { + var tlsConfig *tls.Config + if len(SSLCA) != 0 { + var certificates = make([]tls.Certificate, 0) + if len(SSLCert) != 0 && len(SSLKey) != 0 { + // Load the client certificates from disk + certificate, err := tls.LoadX509KeyPair(SSLCert, SSLKey) + if err != nil { + return nil, errors.Errorf("could not load client key pair: %s", err) + } + certificates = append(certificates, certificate) + } + + // Create a certificate pool from the certificate authority + certPool := x509.NewCertPool() + ca, err := ioutil.ReadFile(SSLCA) + if err != nil { + return nil, errors.Errorf("could not read ca certificate: %s", err) + } + + // Append the certificates from the CA + if !certPool.AppendCertsFromPEM(ca) { + return nil, errors.New("failed to append ca certs") + } + + tlsConfig = &tls.Config{ + Certificates: certificates, + RootCAs: certPool, + } + } + + return tlsConfig, nil +} diff --git a/vendor/github.com/pingcap/tidb-tools/pkg/utils/tso.go b/vendor/github.com/pingcap/tidb-tools/pkg/utils/tso.go new file mode 100644 index 0000000000000..221a896c3229c --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/pkg/utils/tso.go @@ -0,0 +1,24 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package utils + +import ( + "time" +) + +// TSOToRoughTime translates tso to rough time that used to display +func TSOToRoughTime(ts int64) time.Time { + t := time.Unix(ts>>18/1000, 0) + return t +} diff --git a/vendor/github.com/pingcap/tidb-tools/pkg/utils/urls.go b/vendor/github.com/pingcap/tidb-tools/pkg/utils/urls.go new file mode 100644 index 0000000000000..34651e68d0f19 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/pkg/utils/urls.go @@ -0,0 +1,55 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "net" + "net/url" + "strings" + + "github.com/pkg/errors" +) + +// ParseHostPortAddr returns a scheme://host:port or host:port list +func ParseHostPortAddr(s string) ([]string, error) { + strs := strings.Split(s, ",") + addrs := make([]string, 0, len(strs)) + + for _, str := range strs { + str = strings.TrimSpace(str) + + // str may looks like 127.0.0.1:8000 + if _, _, err := net.SplitHostPort(str); err == nil { + addrs = append(addrs, str) + continue + } + + u, err := url.Parse(str) + if err != nil { + return nil, errors.Errorf("parse url %s failed %v", str, err) + } + if u.Scheme != "http" && u.Scheme != "https" && u.Scheme != "unix" && u.Scheme != "unixs" { + return nil, errors.Errorf("URL scheme must be http, https, unix, or unixs: %s", str) + } + if _, _, err := net.SplitHostPort(u.Host); err != nil { + return nil, errors.Errorf(`URL address does not have the form "host:port": %s`, str) + } + if u.Path != "" { + return nil, errors.Errorf("URL must not contain a path: %s", str) + } + addrs = append(addrs, u.String()) + } + + return addrs, nil +} diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/node.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/node.go new file mode 100644 index 0000000000000..d1716842a6337 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/node.go @@ -0,0 +1,83 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package node + +var ( + // DefaultRootPath is the root path of the keys stored in etcd, the `v1` is the tidb-binlog's version. + DefaultRootPath = "/tidb-binlog/v1" + + // PumpNode is the name of pump. + PumpNode = "pump" + + // DrainerNode is the name of drainer. + DrainerNode = "drainer" + + // NodePrefix is the map (node => it's prefix in storage) + NodePrefix = map[string]string{ + PumpNode: "pumps", + DrainerNode: "drainers", + } +) + +const ( + // Online means the node can receive request. + Online = "online" + + // Pausing means the node is pausing. + Pausing = "pausing" + + // Paused means the node is already paused. 
+ Paused = "paused" + + // Closing means the node is closing, and the state will be Offline when closed. + Closing = "closing" + + // Offline means the node is offline, and will not provide service. + Offline = "offline" +) + +// Label is key/value pairs that are attached to objects +type Label struct { + Labels map[string]string `json:"labels"` +} + +// Status describes the status information of a tidb-binlog node in etcd. +type Status struct { + // the id of node. + NodeID string `json:"nodeId"` + + // the host of pump or node. + Addr string `json:"host"` + + // the state of pump. + State string `json:"state"` + + // the node is alive or not. + IsAlive bool `json:"isAlive"` + + // the score of node, it is report by node, calculated by node's qps, disk usage and binlog's data size. + // if Score is less than 0, means this node is useless. Now only used for pump. + Score int64 `json:"score"` + + // the label of this node. Now only used for pump. + // pump client will only send to a pump which label is matched. + Label *Label `json:"label"` + + // for pump: max commit ts in pump + // for drainer: drainer has consume all binlog less than or equal MaxCommitTS + MaxCommitTS int64 `json:"maxCommitTS"` + + // UpdateTS is the last update ts of node's status. + UpdateTS int64 `json:"updateTS"` +} diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/registry.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/registry.go new file mode 100644 index 0000000000000..c1dc08794c96b --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/registry.go @@ -0,0 +1,187 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package node + +import ( + "encoding/json" + "path" + "strings" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/pingcap/tidb-tools/pkg/etcd" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "golang.org/x/net/context" +) + +// EtcdRegistry wraps the reactions with etcd +type EtcdRegistry struct { + client *etcd.Client + reqTimeout time.Duration +} + +// NewEtcdRegistry returns an EtcdRegistry client +func NewEtcdRegistry(cli *etcd.Client, reqTimeout time.Duration) *EtcdRegistry { + return &EtcdRegistry{ + client: cli, + reqTimeout: reqTimeout, + } +} + +// Close closes the etcd client +func (r *EtcdRegistry) Close() error { + err := r.client.Close() + return errors.Trace(err) +} + +func (r *EtcdRegistry) prefixed(p ...string) string { + return path.Join(p...) 
+} + +// Node returns the nodeStatus that matchs nodeID in the etcd +func (r *EtcdRegistry) Node(pctx context.Context, prefix, nodeID string) (*Status, error) { + ctx, cancel := context.WithTimeout(pctx, r.reqTimeout) + defer cancel() + + data, err := r.client.Get(ctx, r.prefixed(prefix, nodeID)) + if err != nil { + return nil, errors.Trace(err) + } + + status := &Status{} + if err = json.Unmarshal(data, &status); err != nil { + return nil, errors.Annotatef(err, "Invalid nodeID(%s)", nodeID) + } + return status, nil +} + +// Nodes retruns all the nodeStatuses in the etcd +func (r *EtcdRegistry) Nodes(pctx context.Context, prefix string) ([]*Status, error) { + ctx, cancel := context.WithTimeout(pctx, r.reqTimeout) + defer cancel() + + resp, err := r.client.List(ctx, r.prefixed(prefix)) + if err != nil { + return nil, errors.Trace(err) + } + status, err := NodesStatusFromEtcdNode(resp) + if err != nil { + return nil, errors.Trace(err) + } + return status, nil +} + +// UpdateNode update the node information. +func (r *EtcdRegistry) UpdateNode(pctx context.Context, prefix string, status *Status) error { + ctx, cancel := context.WithTimeout(pctx, r.reqTimeout) + defer cancel() + + if exists, err := r.checkNodeExists(ctx, prefix, status.NodeID); err != nil { + return errors.Trace(err) + } else if !exists { + // not found then create a new node + log.Infof("node %s dosen't exist, will create one", status.NodeID) + return r.createNode(ctx, prefix, status) + } else { + // found it, update status infomation of the node + return r.updateNode(ctx, prefix, status) + } +} + +func (r *EtcdRegistry) checkNodeExists(ctx context.Context, prefix, nodeID string) (bool, error) { + _, err := r.client.Get(ctx, r.prefixed(prefix, nodeID)) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, errors.Trace(err) + } + return true, nil +} + +func (r *EtcdRegistry) updateNode(ctx context.Context, prefix string, status *Status) error { + objstr, err := json.Marshal(status) + if err != nil { + return errors.Annotatef(err, "error marshal NodeStatus(%v)", status) + } + key := r.prefixed(prefix, status.NodeID) + err = r.client.Update(ctx, key, string(objstr), 0) + return errors.Trace(err) +} + +func (r *EtcdRegistry) createNode(ctx context.Context, prefix string, status *Status) error { + objstr, err := json.Marshal(status) + if err != nil { + return errors.Annotatef(err, "error marshal NodeStatus(%v)", status) + } + key := r.prefixed(prefix, status.NodeID) + err = r.client.Create(ctx, key, string(objstr), nil) + return errors.Trace(err) +} + +// WatchNode watchs node's event +func (r *EtcdRegistry) WatchNode(pctx context.Context, prefix string) clientv3.WatchChan { + return r.client.Watch(pctx, prefix) +} + +func nodeStatusFromEtcdNode(id string, node *etcd.Node) (*Status, error) { + status := &Status{} + + if err := json.Unmarshal(node.Value, &status); err != nil { + return nil, errors.Annotatef(err, "error unmarshal NodeStatus with nodeID(%s), node value(%s)", id, node.Value) + } + + return status, nil +} + +// NodesStatusFromEtcdNode returns nodes' status under root node. +func NodesStatusFromEtcdNode(root *etcd.Node) ([]*Status, error) { + var statuses []*Status + for id, n := range root.Childs { + status, err := nodeStatusFromEtcdNode(id, n) + if err != nil { + return nil, err + } + if status == nil { + continue + } + statuses = append(statuses, status) + } + return statuses, nil +} + +// AnalyzeNodeID returns nodeID by analyze key path. 
+func AnalyzeNodeID(key string) string { + // the key looks like: /tidb-binlog/v1/pumps/nodeID, or /tidb-binlog/pumps/nodeID for old binlog version. + paths := strings.Split(key, "/") + nodeIDOffset := 3 + + if len(paths) >= 2 { + // version string start with 'v' + if !strings.HasPrefix(paths[1], "v") { + nodeIDOffset = 2 + } + } else { + log.Errorf("can't get nodeID or node type from key %s", key) + return "" + } + + if len(paths) < nodeIDOffset+1 { + log.Errorf("can't get nodeID or node type from key %s", key) + return "" + } + + return paths[nodeIDOffset] +} diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/client.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/client.go new file mode 100644 index 0000000000000..192ef1a5be7b1 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/client.go @@ -0,0 +1,455 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "crypto/tls" + "encoding/json" + "path" + "strings" + "sync" + "time" + + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/pingcap/pd/client" + "github.com/pingcap/tidb-tools/pkg/etcd" + "github.com/pingcap/tidb-tools/pkg/utils" + "github.com/pingcap/tidb-tools/tidb-binlog/node" + pb "github.com/pingcap/tipb/go-binlog" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "golang.org/x/net/context" +) + +const ( + // DefaultEtcdTimeout is the default timeout config for etcd. + DefaultEtcdTimeout = 5 * time.Second + + // DefaultRetryTime is the default time of retry. + DefaultRetryTime = 20 + + // DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump. + DefaultBinlogWriteTimeout = 15 * time.Second + + // CheckInterval is the default interval for check unavaliable pumps. + CheckInterval = 30 * time.Second + + // RetryInterval is the default interval of retrying to write binlog. + RetryInterval = 100 * time.Millisecond +) + +var ( + // Logger is ... + Logger = log.New() + + // ErrNoAvaliablePump means no avaliable pump to write binlog. + ErrNoAvaliablePump = errors.New("no avaliable pump to write binlog") +) + +// PumpInfos saves pumps' infomations in pumps client. +type PumpInfos struct { + sync.RWMutex + // Pumps saves the map of pump's nodeID and pump status. + Pumps map[string]*PumpStatus + + // AvliablePumps saves the whole avaliable pumps' status. + AvaliablePumps map[string]*PumpStatus + + // UnAvaliablePumps saves the unAvaliable pumps. + // And only pump with Online state in this map need check is it avaliable. + UnAvaliablePumps map[string]*PumpStatus +} + +// PumpsClient is the client of pumps. +type PumpsClient struct { + ctx context.Context + + cancel context.CancelFunc + + wg sync.WaitGroup + + // ClusterID is the cluster ID of this tidb cluster. + ClusterID uint64 + + // the registry of etcd. + EtcdRegistry *node.EtcdRegistry + + // Pumps saves the pumps' information. + Pumps *PumpInfos + + // Selector will select a suitable pump. + Selector PumpSelector + + // the max retry time if write binlog failed. 
+ RetryTime int + + // BinlogWriteTimeout is the max time binlog can use to write to pump. + BinlogWriteTimeout time.Duration + + // Security is the security config + Security *tls.Config +} + +// NewPumpsClient returns a PumpsClient. +func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) { + // TODO: get strategy from etcd, and can update strategy in real-time. now use Range as default. + strategy := Range + selector := NewSelector(strategy) + + ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs) + if err != nil { + return nil, errors.Trace(err) + } + + // get clusterid + pdCli, err := pd.NewClient(ectdEndpoints, securityOpt) + if err != nil { + return nil, errors.Trace(err) + } + clusterID := pdCli.GetClusterID(context.Background()) + pdCli.Close() + + security, err := utils.ToTLSConfig(securityOpt.CAPath, securityOpt.CertPath, securityOpt.KeyPath) + if err != nil { + return nil, errors.Trace(err) + } + + rootPath := path.Join(node.DefaultRootPath, node.NodePrefix[node.PumpNode]) + cli, err := etcd.NewClientFromCfg(ectdEndpoints, DefaultEtcdTimeout, rootPath, security) + if err != nil { + return nil, errors.Trace(err) + } + + pumpInfos := &PumpInfos{ + Pumps: make(map[string]*PumpStatus), + AvaliablePumps: make(map[string]*PumpStatus), + UnAvaliablePumps: make(map[string]*PumpStatus), + } + + ctx, cancel := context.WithCancel(context.Background()) + newPumpsClient := &PumpsClient{ + ctx: ctx, + cancel: cancel, + ClusterID: clusterID, + EtcdRegistry: node.NewEtcdRegistry(cli, DefaultEtcdTimeout), + Pumps: pumpInfos, + Selector: selector, + RetryTime: DefaultRetryTime, + BinlogWriteTimeout: timeout, + Security: security, + } + + err = newPumpsClient.getPumpStatus(ctx) + if err != nil { + return nil, errors.Trace(err) + } + newPumpsClient.Selector.SetPumps(copyPumps(newPumpsClient.Pumps.AvaliablePumps)) + + newPumpsClient.wg.Add(2) + go newPumpsClient.watchStatus() + go newPumpsClient.detect() + + return newPumpsClient, nil +} + +// getPumpStatus retruns all the pumps status in the etcd. +func (c *PumpsClient) getPumpStatus(pctx context.Context) error { + nodesStatus, err := c.EtcdRegistry.Nodes(pctx, node.DefaultRootPath) + if err != nil { + return errors.Trace(err) + } + + for _, status := range nodesStatus { + Logger.Debugf("[pumps client] get pump %v from etcd", status) + c.addPump(NewPumpStatus(status, c.Security), false) + } + + return nil +} + +// WriteBinlog writes binlog to a situable pump. +func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error { + pump := c.Selector.Select(binlog) + if pump == nil { + return ErrNoAvaliablePump + } + Logger.Debugf("[pumps client] write binlog choose pump %s", pump.NodeID) + + commitData, err := binlog.Marshal() + if err != nil { + return errors.Trace(err) + } + req := &pb.WriteBinlogReq{ClusterID: c.ClusterID, Payload: commitData} + + retryTime := 0 + startTime := time.Now() + var resp *pb.WriteBinlogResp + + for { + if pump == nil { + return ErrNoAvaliablePump + } + + resp, err = pump.writeBinlog(req, c.BinlogWriteTimeout) + if err == nil && resp.Errmsg != "" { + err = errors.New(resp.Errmsg) + } + if err == nil { + return nil + } + Logger.Errorf("[pumps client] write binlog error %v", err) + + if binlog.Tp == pb.BinlogType_Commit { + // only use one pump to write commit binlog, util write success or blocked for ten minutes. + if time.Since(startTime) > 10*time.Minute { + break + } + } else { + if !isRetryableError(err) { + // this kind of error is not retryable, return directly. 
+ return err + } + + // every pump can retry 5 times, if retry 5 times and still failed, set this pump unavaliable, and choose a new pump. + if (retryTime+1)%5 == 0 { + c.setPumpAvaliable(pump, false) + pump = c.Selector.Next(binlog, retryTime/5+1) + Logger.Debugf("[pumps client] avaliable pumps: %v, write binlog choose pump %v", c.Pumps.AvaliablePumps, pump) + } + + retryTime++ + if retryTime > c.RetryTime { + break + } + } + + time.Sleep(RetryInterval) + } + + return err +} + +// setPumpAvaliable set pump's isAvaliable, and modify UnAvaliablePumps or AvaliablePumps. +func (c *PumpsClient) setPumpAvaliable(pump *PumpStatus, avaliable bool) { + pump.IsAvaliable = avaliable + if pump.IsAvaliable { + err := pump.createGrpcClient(c.Security) + if err != nil { + Logger.Errorf("[pumps client] create grpc client for pump %s failed, error: %v", pump.NodeID, err) + pump.IsAvaliable = false + return + } + + c.Pumps.Lock() + delete(c.Pumps.UnAvaliablePumps, pump.NodeID) + if _, ok := c.Pumps.Pumps[pump.NodeID]; ok { + c.Pumps.AvaliablePumps[pump.NodeID] = pump + } + c.Pumps.Unlock() + + } else { + c.Pumps.Lock() + delete(c.Pumps.AvaliablePumps, pump.NodeID) + if _, ok := c.Pumps.Pumps[pump.NodeID]; ok { + c.Pumps.UnAvaliablePumps[pump.NodeID] = pump + } + c.Pumps.Unlock() + } + + c.Pumps.RLock() + c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps)) + c.Pumps.RUnlock() +} + +// addPump add a new pump. +func (c *PumpsClient) addPump(pump *PumpStatus, updateSelector bool) { + c.Pumps.Lock() + + if pump.State == node.Online { + c.Pumps.AvaliablePumps[pump.NodeID] = pump + } else { + c.Pumps.UnAvaliablePumps[pump.NodeID] = pump + } + c.Pumps.Pumps[pump.NodeID] = pump + + if updateSelector { + c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps)) + } + + c.Pumps.Unlock() +} + +// updatePump update pump's status, and return whether pump's IsAvaliable should be changed. +func (c *PumpsClient) updatePump(status *node.Status) (pump *PumpStatus, avaliableChanged, avaliable bool) { + var ok bool + c.Pumps.Lock() + if pump, ok = c.Pumps.Pumps[status.NodeID]; ok { + if pump.Status.State != status.State { + if status.State == node.Online { + avaliableChanged = true + avaliable = true + } else if pump.Status.State == node.Online { + avaliableChanged = true + avaliable = false + } + } + pump.Status = *status + } + c.Pumps.Unlock() + + return +} + +// removePump removes a pump. +func (c *PumpsClient) removePump(nodeID string) { + c.Pumps.Lock() + if pump, ok := c.Pumps.Pumps[nodeID]; ok { + pump.closeGrpcClient() + } + delete(c.Pumps.Pumps, nodeID) + delete(c.Pumps.UnAvaliablePumps, nodeID) + delete(c.Pumps.AvaliablePumps, nodeID) + c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps)) + c.Pumps.Unlock() +} + +// exist returns true if pumps client has pump matched this nodeID. +func (c *PumpsClient) exist(nodeID string) bool { + c.Pumps.RLock() + _, ok := c.Pumps.Pumps[nodeID] + c.Pumps.RUnlock() + return ok +} + +// watchStatus watchs pump's status in etcd. 
+func (c *PumpsClient) watchStatus() { + defer c.wg.Done() + rootPath := path.Join(node.DefaultRootPath, node.NodePrefix[node.PumpNode]) + rch := c.EtcdRegistry.WatchNode(c.ctx, rootPath) + for { + select { + case <-c.ctx.Done(): + Logger.Info("[pumps client] watch status finished") + return + case wresp := <-rch: + for _, ev := range wresp.Events { + status := &node.Status{} + err := json.Unmarshal(ev.Kv.Value, &status) + if err != nil { + Logger.Errorf("[pumps client] unmarshal pump status %q failed", ev.Kv.Value) + continue + } + + switch ev.Type { + case mvccpb.PUT: + if !c.exist(status.NodeID) { + Logger.Infof("[pumps client] find a new pump %s", status.NodeID) + c.addPump(NewPumpStatus(status, c.Security), true) + continue + } + + pump, avaliableChanged, avaliable := c.updatePump(status) + if avaliableChanged { + Logger.Infof("[pumps client] pump %s's state is changed to %s", pump.Status.NodeID, status.State) + c.setPumpAvaliable(pump, avaliable) + } + + case mvccpb.DELETE: + // now will not delete pump node in fact, just for compatibility. + nodeID := node.AnalyzeNodeID(string(ev.Kv.Key)) + Logger.Infof("[pumps client] remove pump %s", nodeID) + c.removePump(nodeID) + } + } + } + } +} + +// detect send detect binlog to pumps with online state in UnAvaliablePumps, +func (c *PumpsClient) detect() { + defer c.wg.Done() + for { + select { + case <-c.ctx.Done(): + Logger.Infof("[pumps client] heartbeat finished") + return + default: + // send detect binlog to pump, if this pump can return response without error + // means this pump is avaliable. + needCheckPumps := make([]*PumpStatus, 0, len(c.Pumps.UnAvaliablePumps)) + checkPassPumps := make([]*PumpStatus, 0, 1) + req := &pb.WriteBinlogReq{ClusterID: c.ClusterID, Payload: nil} + c.Pumps.RLock() + for _, pump := range c.Pumps.UnAvaliablePumps { + if pump.Status.State == node.Online { + needCheckPumps = append(needCheckPumps, pump) + } + } + c.Pumps.RUnlock() + + for _, pump := range needCheckPumps { + err := pump.createGrpcClient(c.Security) + if err != nil { + Logger.Errorf("[pumps client] create grpc client for pump %s failed, error %v", pump.NodeID, errors.Trace(err)) + continue + } + if pump.Client == nil { + continue + } + + _, err = pump.writeBinlog(req, c.BinlogWriteTimeout) + if err == nil { + checkPassPumps = append(checkPassPumps, pump) + } else { + Logger.Errorf("[pumps client] write detect binlog to pump %s error %v", pump.NodeID, err) + } + } + + for _, pump := range checkPassPumps { + c.setPumpAvaliable(pump, true) + } + + time.Sleep(CheckInterval) + } + } +} + +// Close closes the PumpsClient. +func (c *PumpsClient) Close() { + Logger.Infof("[pumps client] is closing") + c.cancel() + c.wg.Wait() + Logger.Infof("[pumps client] is closed") +} + +func isRetryableError(err error) bool { + // ResourceExhausted is a error code in grpc. + // ResourceExhausted indicates some resource has been exhausted, perhaps + // a per-user quota, or perhaps the entire file system is out of space. 
+ // https://github.com/grpc/grpc-go/blob/9cc4fdbde2304827ffdbc7896f49db40c5536600/codes/codes.go#L76 + if strings.Contains(err.Error(), "ResourceExhausted") { + return false + } + + return true +} + +func copyPumps(pumps map[string]*PumpStatus) []*PumpStatus { + ps := make([]*PumpStatus, 0, len(pumps)) + for _, pump := range pumps { + ps = append(ps, pump) + } + + return ps +} diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/pump.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/pump.go new file mode 100644 index 0000000000000..0a509b004729b --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/pump.go @@ -0,0 +1,120 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "crypto/tls" + "net" + "time" + + "github.com/pingcap/tidb-tools/tidb-binlog/node" + pb "github.com/pingcap/tipb/go-binlog" + "github.com/pkg/errors" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// PumpStatus saves pump's status. +type PumpStatus struct { + /* + Pump has these state: + Online: + only when pump's state is online that pumps client can write binlog to. + Pausing: + this pump is pausing, and can't provide write binlog service. And this state will turn into Paused when pump is quit. + Paused: + this pump is paused, and can't provide write binlog service. + Closing: + this pump is closing, and can't provide write binlog service. And this state will turn into Offline when pump is quit. + Offline: + this pump is offline, and can't provide write binlog service forever. + */ + node.Status + + // the pump is avaliable or not + IsAvaliable bool + + grpcConn *grpc.ClientConn + + // the client of this pump + Client pb.PumpClient +} + +// NewPumpStatus returns a new PumpStatus according to node's status. +func NewPumpStatus(status *node.Status, security *tls.Config) *PumpStatus { + pumpStatus := &PumpStatus{} + pumpStatus.Status = *status + pumpStatus.IsAvaliable = (status.State == node.Online) + + if status.State != node.Online { + return pumpStatus + } + + err := pumpStatus.createGrpcClient(security) + if err != nil { + Logger.Errorf("[pumps client] create grpc client for %s failed, error %v", status.NodeID, err) + pumpStatus.IsAvaliable = false + } + + return pumpStatus +} + +// createGrpcClient create grpc client for online pump. 
+func (p *PumpStatus) createGrpcClient(security *tls.Config) error { + // release the old connection, and create a new one + if p.grpcConn != nil { + p.grpcConn.Close() + } + + dialerOpt := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("tcp", addr, timeout) + }) + Logger.Debugf("[pumps client] create gcpc client at %s", p.Addr) + var clientConn *grpc.ClientConn + var err error + if security != nil { + clientConn, err = grpc.Dial(p.Addr, dialerOpt, grpc.WithTransportCredentials(credentials.NewTLS(security))) + } else { + clientConn, err = grpc.Dial(p.Addr, dialerOpt, grpc.WithInsecure()) + } + if err != nil { + return err + } + + p.grpcConn = clientConn + p.Client = pb.NewPumpClient(clientConn) + + return nil +} + +// closeGrpcClient closes the pump's grpc connection. +func (p *PumpStatus) closeGrpcClient() { + if p.grpcConn != nil { + p.grpcConn.Close() + p.Client = nil + } +} + +func (p *PumpStatus) writeBinlog(req *pb.WriteBinlogReq, timeout time.Duration) (*pb.WriteBinlogResp, error) { + if p.Client == nil { + return nil, errors.Errorf("pump %s don't have avaliable pump client", p.NodeID) + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + resp, err := p.Client.WriteBinlog(ctx, req) + cancel() + + return resp, err +} diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/selector.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/selector.go new file mode 100644 index 0000000000000..343c250e1cc67 --- /dev/null +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/selector.go @@ -0,0 +1,274 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "hash/fnv" + "strconv" + "sync" + + pb "github.com/pingcap/tipb/go-binlog" +) + +const ( + // Range means range algorithm. + Range = "range" + + // Hash means hash algorithm. + Hash = "hash" + + // Score means choose pump by it's score. + Score = "score" +) + +// PumpSelector selects pump for sending binlog. +type PumpSelector interface { + // SetPumps set pumps to be selected. + SetPumps([]*PumpStatus) + + // Select returns a situable pump. + Select(*pb.Binlog) *PumpStatus + + // returns the next pump. + Next(*pb.Binlog, int) *PumpStatus +} + +// HashSelector select a pump by hash. +type HashSelector struct { + sync.RWMutex + + // TsMap saves the map of start_ts with pump when send prepare binlog. + // And Commit binlog should send to the same pump. + TsMap map[int64]*PumpStatus + + // PumpMap saves the map of pump's node id with pump. + PumpMap map[string]*PumpStatus + + // the pumps to be selected. + Pumps []*PumpStatus +} + +// NewHashSelector returns a new HashSelector. +func NewHashSelector() PumpSelector { + return &HashSelector{ + TsMap: make(map[int64]*PumpStatus), + PumpMap: make(map[string]*PumpStatus), + Pumps: make([]*PumpStatus, 0, 10), + } +} + +// SetPumps implement PumpSelector.SetPumps. 
+func (h *HashSelector) SetPumps(pumps []*PumpStatus) { + h.Lock() + h.PumpMap = make(map[string]*PumpStatus) + h.Pumps = pumps + for _, pump := range pumps { + h.PumpMap[pump.NodeID] = pump + } + h.Unlock() +} + +// Select implement PumpSelector.Select. +func (h *HashSelector) Select(binlog *pb.Binlog) *PumpStatus { + // TODO: use status' label to match situale pump. + h.Lock() + defer h.Unlock() + + if pump, ok := h.TsMap[binlog.StartTs]; ok { + // binlog is commit binlog or rollback binlog, choose the same pump by start ts map. + delete(h.TsMap, binlog.StartTs) + return pump + } + + if len(h.Pumps) == 0 { + return nil + } + + if binlog.Tp == pb.BinlogType_Prewrite { + pump := h.Pumps[hashTs(binlog.StartTs)%len(h.Pumps)] + h.TsMap[binlog.StartTs] = pump + return pump + } + + // can't find pump in ts map, or unkow binlog type, choose a new one. + return h.Pumps[hashTs(binlog.StartTs)%len(h.Pumps)] +} + +// Next implement PumpSelector.Next. Only for Prewrite binlog. +func (h *HashSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus { + h.Lock() + defer h.Unlock() + + if len(h.Pumps) == 0 { + return nil + } + + nextPump := h.Pumps[(hashTs(binlog.StartTs)+int(retryTime))%len(h.Pumps)] + if binlog.Tp == pb.BinlogType_Prewrite { + h.TsMap[binlog.StartTs] = nextPump + } + + return nextPump +} + +// RangeSelector select a pump by range. +type RangeSelector struct { + sync.RWMutex + + // Offset saves the offset in Pumps. + Offset int + + // TsMap saves the map of start_ts with pump when send prepare binlog. + // And Commit binlog should send to the same pump. + TsMap map[int64]*PumpStatus + + // PumpMap saves the map of pump's node id with pump. + PumpMap map[string]*PumpStatus + + // the pumps to be selected. + Pumps []*PumpStatus +} + +// NewRangeSelector returns a new ScoreSelector. +func NewRangeSelector() PumpSelector { + return &RangeSelector{ + Offset: 0, + TsMap: make(map[int64]*PumpStatus), + PumpMap: make(map[string]*PumpStatus), + Pumps: make([]*PumpStatus, 0, 10), + } +} + +// SetPumps implement PumpSelector.SetPumps. +func (r *RangeSelector) SetPumps(pumps []*PumpStatus) { + r.Lock() + r.PumpMap = make(map[string]*PumpStatus) + r.Pumps = pumps + for _, pump := range pumps { + r.PumpMap[pump.NodeID] = pump + } + r.Offset = 0 + r.Unlock() +} + +// Select implement PumpSelector.Select. +func (r *RangeSelector) Select(binlog *pb.Binlog) *PumpStatus { + // TODO: use status' label to match situale pump. + r.Lock() + defer func() { + if r.Offset == len(r.Pumps) { + r.Offset = 0 + } + r.Unlock() + }() + + if pump, ok := r.TsMap[binlog.StartTs]; ok { + // binlog is commit binlog or rollback binlog, choose the same pump by start ts map. + delete(r.TsMap, binlog.StartTs) + return pump + } + + if len(r.Pumps) == 0 { + return nil + } + + if r.Offset >= len(r.Pumps) { + r.Offset = 0 + } + + if binlog.Tp == pb.BinlogType_Prewrite { + pump := r.Pumps[r.Offset] + r.TsMap[binlog.StartTs] = pump + r.Offset++ + return pump + } + + // can't find pump in ts map, or the pump is not avaliable, choose a new one. + return r.Pumps[r.Offset] +} + +// Next implement PumpSelector.Next. Only for Prewrite binlog. 
+func (r *RangeSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus { + r.Lock() + defer func() { + if len(r.Pumps) != 0 { + r.Offset = (r.Offset + 1) % len(r.Pumps) + } + r.Unlock() + }() + + if len(r.Pumps) == 0 { + return nil + } + + if r.Offset >= len(r.Pumps) { + r.Offset = 0 + } + + nextPump := r.Pumps[r.Offset] + if binlog.Tp == pb.BinlogType_Prewrite { + r.TsMap[binlog.StartTs] = nextPump + } + + return nextPump +} + +// ScoreSelector select a pump by pump's score. +type ScoreSelector struct{} + +// NewScoreSelector returns a new ScoreSelector. +func NewScoreSelector() PumpSelector { + return &ScoreSelector{} +} + +// SetPumps implement PumpSelector.SetPumps. +func (s *ScoreSelector) SetPumps(pumps []*PumpStatus) { + // TODO +} + +// Select implement PumpSelector.Select. +func (s *ScoreSelector) Select(binlog *pb.Binlog) *PumpStatus { + // TODO + return nil +} + +// Next implement PumpSelector.Next. Only for Prewrite binlog. +func (s *ScoreSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus { + // TODO + return nil +} + +// NewSelector returns a PumpSelector according to the algorithm. +func NewSelector(algorithm string) PumpSelector { + var selector PumpSelector + switch algorithm { + case Range: + selector = NewRangeSelector() + case Hash: + selector = NewHashSelector() + case Score: + selector = NewScoreSelector() + default: + Logger.Warnf("unknow algorithm %s, use range as default", algorithm) + selector = NewRangeSelector() + } + + return selector +} + +func hashTs(ts int64) int { + h := fnv.New32a() + h.Write([]byte(strconv.FormatInt(ts, 10))) + return int(h.Sum32()) +} diff --git a/vendor/github.com/pkg/errors/errors.go b/vendor/github.com/pkg/errors/errors.go index 0168061fd5276..2e1d3f6289668 100644 --- a/vendor/github.com/pkg/errors/errors.go +++ b/vendor/github.com/pkg/errors/errors.go @@ -152,7 +152,8 @@ func (f *fundamental) Format(s fmt.State, verb rune) { // WithStack annotates err with a stack trace at the point WithStack was called. // If err is nil, WithStack returns nil. // -// Deprecated: use AddStack +// For most use cases this is deprecated and AddStack should be used (which will ensure just one stack trace). +// However, one may want to use this in some situations, for example to create a 2nd trace across a goroutine. func WithStack(err error) error { if err == nil { return nil @@ -173,22 +174,6 @@ func AddStack(err error) error { return WithStack(err) } -// GetStackTracer will return the first StackTracer in the causer chain. -// This function is used by AddStack to avoid creating redundant stack traces. -// -// You can also use the StackTracer interface on the returned error to get the stack trace. -func GetStackTracer(origErr error) StackTracer { - var stacked StackTracer - WalkDeep(origErr, func(err error) bool { - if stackTracer, ok := err.(StackTracer); ok { - stacked = stackTracer - return true - } - return false - }) - return stacked -} - type withStack struct { error *stack @@ -213,10 +198,11 @@ func (w *withStack) Format(s fmt.State, verb rune) { } // Wrap returns an error annotating err with a stack trace -// at the point Annotate is called, and the supplied message. -// If err is nil, Annotate returns nil. +// at the point Wrap is called, and the supplied message. +// If err is nil, Wrap returns nil. // -// Deprecated: use Annotate instead +// For most use cases this is deprecated in favor of Annotate. +// Annotate avoids creating duplicate stack traces. 
func Wrap(err error, message string) error { if err == nil { return nil @@ -234,10 +220,11 @@ func Wrap(err error, message string) error { } // Wrapf returns an error annotating err with a stack trace -// at the point Annotatef is call, and the format specifier. -// If err is nil, Annotatef returns nil. +// at the point Wrapf is call, and the format specifier. +// If err is nil, Wrapf returns nil. // -// Deprecated: use Annotatef instead +// For most use cases this is deprecated in favor of Annotatef. +// Annotatef avoids creating duplicate stack traces. func Wrapf(err error, format string, args ...interface{}) error { if err == nil { return nil diff --git a/vendor/github.com/pkg/errors/group.go b/vendor/github.com/pkg/errors/group.go index 003932c95e865..e5a969ab76f76 100644 --- a/vendor/github.com/pkg/errors/group.go +++ b/vendor/github.com/pkg/errors/group.go @@ -6,6 +6,15 @@ type ErrorGroup interface { Errors() []error } +// Errors uses the ErrorGroup interface to return a slice of errors. +// If the ErrorGroup interface is not implemented it returns an array containing just the given error. +func Errors(err error) []error { + if eg, ok := err.(ErrorGroup); ok { + return eg.Errors() + } + return []error{err} +} + // WalkDeep does a depth-first traversal of all errors. // Any ErrorGroup is traversed (after going deep). // The visitor function can return true to end the traversal early diff --git a/vendor/github.com/pkg/errors/juju_adaptor.go b/vendor/github.com/pkg/errors/juju_adaptor.go index 773a1970866c7..8bcfe2f37bd5b 100644 --- a/vendor/github.com/pkg/errors/juju_adaptor.go +++ b/vendor/github.com/pkg/errors/juju_adaptor.go @@ -2,16 +2,17 @@ package errors import ( "fmt" + "strings" ) // ==================== juju adaptor start ======================== -// Trace annotates err with a stack trace at the point WithStack was called. -// If err is nil or already contain stack trace return directly. +// Trace just calls AddStack. func Trace(err error) error { return AddStack(err) } +// Annotate adds a message and ensures there is a stack trace. func Annotate(err error, message string) error { if err == nil { return nil @@ -31,6 +32,7 @@ func Annotate(err error, message string) error { } } +// Annotatef adds a message and ensures there is a stack trace. func Annotatef(err error, format string, args ...interface{}) error { if err == nil { return nil @@ -51,6 +53,8 @@ func Annotatef(err error, format string, args ...interface{}) error { } // ErrorStack will format a stack trace if it is available, otherwise it will be Error() +// If the error is nil, the empty string is returned +// Note that this just calls fmt.Sprintf("%+v", err) func ErrorStack(err error) string { if err == nil { return "" @@ -58,6 +62,11 @@ func ErrorStack(err error) string { return fmt.Sprintf("%+v", err) } +// IsNotFound reports whether err was not found error. +func IsNotFound(err error) bool { + return strings.Contains(err.Error(), "not found") +} + // NotFoundf represents an error with not found message. func NotFoundf(format string, args ...interface{}) error { return Errorf(format+" not found", args...) @@ -73,4 +82,19 @@ func NotSupportedf(format string, args ...interface{}) error { return Errorf(format+" not supported", args...) } +// NotValidf represents an error with not valid message. +func NotValidf(format string, args ...interface{}) error { + return Errorf(format+" not valid", args...) +} + +// IsAlreadyExists reports whether err was already exists error. 
+func IsAlreadyExists(err error) bool { + return strings.Contains(err.Error(), "already exists") +} + +// AlreadyExistsf represents an error with already exists message. +func AlreadyExistsf(format string, args ...interface{}) error { + return Errorf(format+" already exists", args...) +} + // ==================== juju adaptor end ======================== diff --git a/vendor/github.com/pkg/errors/stack.go b/vendor/github.com/pkg/errors/stack.go index 6edd7e5699f79..bb1e6a84f339d 100644 --- a/vendor/github.com/pkg/errors/stack.go +++ b/vendor/github.com/pkg/errors/stack.go @@ -1,10 +1,12 @@ package errors import ( + "bytes" "fmt" "io" "path" "runtime" + "strconv" "strings" ) @@ -14,6 +16,22 @@ type StackTracer interface { StackTrace() StackTrace } +// GetStackTracer will return the first StackTracer in the causer chain. +// This function is used by AddStack to avoid creating redundant stack traces. +// +// You can also use the StackTracer interface on the returned error to get the stack trace. +func GetStackTracer(origErr error) StackTracer { + var stacked StackTracer + WalkDeep(origErr, func(err error) bool { + if stackTracer, ok := err.(StackTracer); ok { + stacked = stackTracer + return true + } + return false + }) + return stacked +} + // Frame represents a program counter inside a stack frame. type Frame uintptr @@ -56,6 +74,11 @@ func (f Frame) line() int { // GOPATH separated by \n\t (\n\t) // %+v equivalent to %+s:%d func (f Frame) Format(s fmt.State, verb rune) { + f.format(s, s, verb) +} + +// format allows stack trace printing calls to be made with a bytes.Buffer. +func (f Frame) format(w io.Writer, s fmt.State, verb rune) { switch verb { case 's': switch { @@ -63,23 +86,25 @@ func (f Frame) Format(s fmt.State, verb rune) { pc := f.pc() fn := runtime.FuncForPC(pc) if fn == nil { - io.WriteString(s, "unknown") + io.WriteString(w, "unknown") } else { file, _ := fn.FileLine(pc) - fmt.Fprintf(s, "%s\n\t%s", fn.Name(), file) + io.WriteString(w, fn.Name()) + io.WriteString(w, "\n\t") + io.WriteString(w, file) } default: - io.WriteString(s, path.Base(f.file())) + io.WriteString(w, path.Base(f.file())) } case 'd': - fmt.Fprintf(s, "%d", f.line()) + io.WriteString(w, strconv.Itoa(f.line())) case 'n': name := runtime.FuncForPC(f.pc()).Name() - io.WriteString(s, funcname(name)) + io.WriteString(w, funcname(name)) case 'v': - f.Format(s, 's') - io.WriteString(s, ":") - f.Format(s, 'd') + f.format(w, s, 's') + io.WriteString(w, ":") + f.format(w, s, 'd') } } @@ -95,23 +120,50 @@ type StackTrace []Frame // // %+v Prints filename, function, and line number for each Frame in the stack. func (st StackTrace) Format(s fmt.State, verb rune) { + var b bytes.Buffer switch verb { case 'v': switch { case s.Flag('+'): - for _, f := range st { - fmt.Fprintf(s, "\n%+v", f) + b.Grow(len(st) * stackMinLen) + for _, fr := range st { + b.WriteByte('\n') + fr.format(&b, s, verb) } case s.Flag('#'): - fmt.Fprintf(s, "%#v", []Frame(st)) + fmt.Fprintf(&b, "%#v", []Frame(st)) default: - fmt.Fprintf(s, "%v", []Frame(st)) + st.formatSlice(&b, s, verb) } case 's': - fmt.Fprintf(s, "%s", []Frame(st)) + st.formatSlice(&b, s, verb) } + io.Copy(s, &b) } +// formatSlice will format this StackTrace into the given buffer as a slice of +// Frame, only valid when called with '%s' or '%v'. 
+func (st StackTrace) formatSlice(b *bytes.Buffer, s fmt.State, verb rune) { + b.WriteByte('[') + if len(st) == 0 { + b.WriteByte(']') + return + } + + b.Grow(len(st) * (stackMinLen / 4)) + st[0].format(b, s, verb) + for _, fr := range st[1:] { + b.WriteByte(' ') + fr.format(b, s, verb) + } + b.WriteByte(']') +} + +// stackMinLen is a best-guess at the minimum length of a stack trace. It +// doesn't need to be exact, just give a good enough head start for the buffer +// to avoid the expensive early growth. +const stackMinLen = 96 + // stack represents a stack of program counters. type stack []uintptr @@ -120,10 +172,14 @@ func (s *stack) Format(st fmt.State, verb rune) { case 'v': switch { case st.Flag('+'): + var b bytes.Buffer + b.Grow(len(*s) * stackMinLen) for _, pc := range *s { f := Frame(pc) - fmt.Fprintf(st, "\n%+v", f) + b.WriteByte('\n') + f.format(&b, st, 'v') } + io.Copy(st, &b) } } }
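Editor's note — the hunks above add the pump_client package wholesale, so a short usage sketch may help reviewers see how the new pieces (NewPumpsClient, WriteBinlog, Close) fit together. This is an illustrative sketch only, not part of the patch: the etcd URL, timeout, and timestamps are placeholders, and the pd-client import path is an assumption inferred from the pd.SecurityOption parameter of NewPumpsClient.

package main

import (
	"log"
	"time"

	pd "github.com/pingcap/pd/pd-client" // assumed import path providing pd.SecurityOption
	pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
	pb "github.com/pingcap/tipb/go-binlog"
)

func main() {
	// NewPumpsClient reads the pump topology from etcd (placeholder URL here)
	// and starts the watchStatus/detect goroutines added in this patch.
	cli, err := pumpcli.NewPumpsClient("http://127.0.0.1:2379", 15*time.Second, pd.SecurityOption{})
	if err != nil {
		log.Fatalf("create pumps client: %v", err)
	}
	defer cli.Close()

	// A prewrite binlog is routed by the selector; the commit binlog that
	// carries the same StartTs is then sent to the same pump via the TsMap.
	startTs := int64(400000000000000001) // placeholder timestamp
	if err := cli.WriteBinlog(&pb.Binlog{Tp: pb.BinlogType_Prewrite, StartTs: startTs}); err != nil {
		log.Fatalf("write prewrite binlog: %v", err)
	}
	if err := cli.WriteBinlog(&pb.Binlog{Tp: pb.BinlogType_Commit, StartTs: startTs, CommitTs: startTs + 1}); err != nil {
		log.Fatalf("write commit binlog: %v", err)
	}
}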
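The selector added in selector.go can also be exercised on its own. The sketch below shows the Range strategy's round-robin choice and how a commit binlog is pinned to the pump that took the matching prewrite. Again illustrative only: the node IDs are made up, and the node.Status literal assumes the exported NodeID/State fields that the patch itself reads.

package main

import (
	"fmt"

	"github.com/pingcap/tidb-tools/tidb-binlog/node"
	pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
	pb "github.com/pingcap/tipb/go-binlog"
)

func main() {
	sel := pumpcli.NewSelector(pumpcli.Range)
	// PumpStatus values built by hand for illustration; in the client they
	// come from NewPumpStatus, which also sets up the grpc connection.
	sel.SetPumps([]*pumpcli.PumpStatus{
		{Status: node.Status{NodeID: "pump-0", State: node.Online}},
		{Status: node.Status{NodeID: "pump-1", State: node.Online}},
	})

	pre := &pb.Binlog{Tp: pb.BinlogType_Prewrite, StartTs: 1}
	p1 := sel.Select(pre) // round-robin pick, remembered in TsMap under StartTs=1

	commit := &pb.Binlog{Tp: pb.BinlogType_Commit, StartTs: 1}
	p2 := sel.Select(commit) // looked up (and removed) from TsMap

	fmt.Println(p1.NodeID == p2.NodeID) // true: the commit goes to the prewrite's pump
}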
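For the pkg/errors changes in this patch (the documented juju adaptor, the new Errors helper, and the buffered stack formatting), the intended call pattern looks roughly like the following; the error text is invented purely to trigger IsNotFound.

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	base := errors.New("pump pump-0 not found") // hypothetical error message
	wrapped := errors.Annotate(base, "write binlog failed")

	fmt.Println(errors.IsNotFound(wrapped))       // true: the message contains "not found"
	fmt.Println(errors.ErrorStack(wrapped) != "") // true: "%+v" rendering with a single stack trace
	fmt.Println(len(errors.Errors(wrapped)))      // 1: wrapped does not implement ErrorGroup
}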