diff --git a/vendor/github.com/coreos/etcd/pkg/fileutil/dir_unix.go b/vendor/github.com/coreos/etcd/pkg/fileutil/dir_unix.go new file mode 100644 index 0000000000..58a77dfc1a --- /dev/null +++ b/vendor/github.com/coreos/etcd/pkg/fileutil/dir_unix.go @@ -0,0 +1,22 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows + +package fileutil + +import "os" + +// OpenDir opens a directory for syncing. +func OpenDir(path string) (*os.File, error) { return os.Open(path) } diff --git a/vendor/github.com/coreos/etcd/pkg/fileutil/dir_windows.go b/vendor/github.com/coreos/etcd/pkg/fileutil/dir_windows.go new file mode 100644 index 0000000000..c123395c00 --- /dev/null +++ b/vendor/github.com/coreos/etcd/pkg/fileutil/dir_windows.go @@ -0,0 +1,46 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package fileutil + +import ( + "os" + "syscall" +) + +// OpenDir opens a directory in windows with write access for syncing. +func OpenDir(path string) (*os.File, error) { + fd, err := openDir(path) + if err != nil { + return nil, err + } + return os.NewFile(uintptr(fd), path), nil +} + +func openDir(path string) (fd syscall.Handle, err error) { + if len(path) == 0 { + return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND + } + pathp, err := syscall.UTF16PtrFromString(path) + if err != nil { + return syscall.InvalidHandle, err + } + access := uint32(syscall.GENERIC_READ | syscall.GENERIC_WRITE) + sharemode := uint32(syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE) + createmode := uint32(syscall.OPEN_EXISTING) + fl := uint32(syscall.FILE_FLAG_BACKUP_SEMANTICS) + return syscall.CreateFile(pathp, access, sharemode, nil, createmode, fl, 0) +} diff --git a/vendor/github.com/coreos/etcd/pkg/ioutil/pagewriter.go b/vendor/github.com/coreos/etcd/pkg/ioutil/pagewriter.go new file mode 100644 index 0000000000..ed22d94201 --- /dev/null +++ b/vendor/github.com/coreos/etcd/pkg/ioutil/pagewriter.go @@ -0,0 +1,103 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ioutil + +import ( + "io" +) + +var defaultBufferBytes = 128 * 1024 + +// PageWriter implements the io.Writer interface so that writes will +// either be in page chunks or from flushing. +type PageWriter struct { + w io.Writer + // pageOffset tracks the page offset of the base of the buffer + pageOffset int + // pageBytes is the number of bytes per page + pageBytes int + // bufferedBytes counts the number of bytes pending for write in the buffer + bufferedBytes int + // buf holds the write buffer + buf []byte + // bufWatermarkBytes is the number of bytes the buffer can hold before it needs + // to be flushed. It is less than len(buf) so there is space for slack writes + // to bring the writer to page alignment. + bufWatermarkBytes int +} + +func NewPageWriter(w io.Writer, pageBytes int) *PageWriter { + return &PageWriter{ + w: w, + pageBytes: pageBytes, + buf: make([]byte, defaultBufferBytes+pageBytes), + bufWatermarkBytes: defaultBufferBytes, + } +} + +func (pw *PageWriter) Write(p []byte) (n int, err error) { + if len(p)+pw.bufferedBytes <= pw.bufWatermarkBytes { + // no overflow + copy(pw.buf[pw.bufferedBytes:], p) + pw.bufferedBytes += len(p) + return len(p), nil + } + // complete the slack page in the buffer if unaligned + slack := pw.pageBytes - ((pw.pageOffset + pw.bufferedBytes) % pw.pageBytes) + if slack != pw.pageBytes { + partial := slack > len(p) + if partial { + // not enough data to complete the slack page + slack = len(p) + } + // special case: writing to slack page in buffer + copy(pw.buf[pw.bufferedBytes:], p[:slack]) + pw.bufferedBytes += slack + n = slack + p = p[slack:] + if partial { + // avoid forcing an unaligned flush + return n, nil + } + } + // buffer contents are now page-aligned; clear out + if err = pw.Flush(); err != nil { + return n, err + } + // directly write all complete pages without copying + if len(p) > pw.pageBytes { + pages := len(p) / pw.pageBytes + c, werr := pw.w.Write(p[:pages*pw.pageBytes]) + n += c + if werr != nil { + return n, werr + } + p = p[pages*pw.pageBytes:] + } + // write remaining tail to buffer + c, werr := pw.Write(p) + n += c + return n, werr +} + +func (pw *PageWriter) Flush() error { + if pw.bufferedBytes == 0 { + return nil + } + _, err := pw.w.Write(pw.buf[:pw.bufferedBytes]) + pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes + pw.bufferedBytes = 0 + return err +} diff --git a/vendor/github.com/coreos/etcd/pkg/pbutil/pbutil.go b/vendor/github.com/coreos/etcd/pkg/pbutil/pbutil.go index 8f96b4d549..b618988c43 100644 --- a/vendor/github.com/coreos/etcd/pkg/pbutil/pbutil.go +++ b/vendor/github.com/coreos/etcd/pkg/pbutil/pbutil.go @@ -18,7 +18,7 @@ package pbutil import "github.com/coreos/pkg/capnslog" var ( - plog = capnslog.NewPackageLogger("github.com/coreos/etcd/pkg", "flags") + plog = capnslog.NewPackageLogger("github.com/coreos/etcd/pkg", "pbutil") ) type Marshaler interface { diff --git a/vendor/github.com/coreos/etcd/raft/progress.go b/vendor/github.com/coreos/etcd/raft/progress.go index 3954705347..71cb85772f 100644 --- a/vendor/github.com/coreos/etcd/raft/progress.go +++ b/vendor/github.com/coreos/etcd/raft/progress.go @@ -189,8 +189,7 @@ type inflights struct { func newInflights(size int) *inflights { return &inflights{ - size: size, - buffer: make([]uint64, size), + size: size, } } @@ -203,10 +202,28 @@ func (in *inflights) add(inflight uint64) { if next >= in.size { next -= in.size } + if next >= len(in.buffer) { + in.growBuf() + } in.buffer[next] = inflight in.count++ } +// grow the inflight buffer by doubling up to inflights.size. We grow on demand +// instead of preallocating to inflights.size to handle systems which have +// thousands of Raft groups per process. +func (in *inflights) growBuf() { + newSize := len(in.buffer) * 2 + if newSize == 0 { + newSize = 1 + } else if newSize > in.size { + newSize = in.size + } + newBuffer := make([]uint64, newSize) + copy(newBuffer, in.buffer) + in.buffer = newBuffer +} + // freeTo frees the inflights smaller or equal to the given `to` flight. func (in *inflights) freeTo(to uint64) { if in.count == 0 || to < in.buffer[in.start] { @@ -228,6 +245,11 @@ func (in *inflights) freeTo(to uint64) { // free i inflights and set new start index in.count -= i in.start = idx + if in.count == 0 { + // inflights is empty, reset the start index so that we don't grow the + // buffer unnecessarily. + in.start = 0 + } } func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) } diff --git a/vendor/github.com/coreos/etcd/raft/raft.go b/vendor/github.com/coreos/etcd/raft/raft.go index 740c832b8f..f236281e7c 100644 --- a/vendor/github.com/coreos/etcd/raft/raft.go +++ b/vendor/github.com/coreos/etcd/raft/raft.go @@ -22,6 +22,8 @@ import ( "math/rand" "sort" "strings" + "sync" + "time" pb "github.com/coreos/etcd/raft/raftpb" ) @@ -45,6 +47,25 @@ const ( campaignTransfer CampaignType = "CampaignTransfer" ) +// lockedRand is a small wrapper around rand.Rand to provide +// synchronization. Only the methods needed by the code are exposed +// (e.g. Intn). +type lockedRand struct { + mu sync.Mutex + rand *rand.Rand +} + +func (r *lockedRand) Intn(n int) int { + r.mu.Lock() + v := r.rand.Intn(n) + r.mu.Unlock() + return v +} + +var globalRand = &lockedRand{ + rand: rand.New(rand.NewSource(time.Now().UnixNano())), +} + // CampaignType represents the type of campaigning // the reason we use the type of string instead of uint64 // is because it's simpler to compare and fill in raft entries @@ -205,7 +226,6 @@ type raft struct { // when raft changes its state to follower or candidate. randomizedElectionTimeout int - rand *rand.Rand tick func() step stepFunc @@ -244,7 +264,6 @@ func newRaft(c *Config) *raft { logger: c.Logger, checkQuorum: c.CheckQuorum, } - r.rand = rand.New(rand.NewSource(int64(c.ID))) for _, p := range peers { r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} } @@ -598,7 +617,7 @@ func (r *raft) Step(m pb.Message) error { lead := m.From if m.Type == pb.MsgVote { force := bytes.Equal(m.Context, []byte(campaignTransfer)) - inLease := r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout + inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout if !force && inLease { // If a server receives a RequestVote request within the minimum election timeout // of hearing from a current leader, it does not update its term or grant its vote @@ -1024,7 +1043,7 @@ func (r *raft) pastElectionTimeout() bool { } func (r *raft) resetRandomizedElectionTimeout() { - r.randomizedElectionTimeout = r.electionTimeout + r.rand.Intn(r.electionTimeout) + r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout) } // checkQuorumActive returns true if the quorum is active from diff --git a/vendor/github.com/coreos/etcd/wal/doc.go b/vendor/github.com/coreos/etcd/wal/doc.go index 031a043a3d..a3abd69613 100644 --- a/vendor/github.com/coreos/etcd/wal/doc.go +++ b/vendor/github.com/coreos/etcd/wal/doc.go @@ -25,7 +25,7 @@ to it with the Save method: ... err := w.Save(s, ents) -After saving an raft snapshot to disk, SaveSnapshot method should be called to +After saving a raft snapshot to disk, SaveSnapshot method should be called to record it. So WAL can match with the saved snapshot when restarting. err := w.SaveSnapshot(walpb.Snapshot{Index: 10, Term: 2}) diff --git a/vendor/github.com/coreos/etcd/wal/encoder.go b/vendor/github.com/coreos/etcd/wal/encoder.go index fdeceaf800..edbd1785ab 100644 --- a/vendor/github.com/coreos/etcd/wal/encoder.go +++ b/vendor/github.com/coreos/etcd/wal/encoder.go @@ -15,19 +15,24 @@ package wal import ( - "bufio" "encoding/binary" "hash" "io" "sync" "github.com/coreos/etcd/pkg/crc" + "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/wal/walpb" ) +// walPageBytes is the alignment for flushing records to the backing Writer. +// It should be a multiple of the minimum sector size so that WAL repair can +// safely between torn writes and ordinary data corruption. +const walPageBytes = 8 * minSectorSize + type encoder struct { mu sync.Mutex - bw *bufio.Writer + bw *ioutil.PageWriter crc hash.Hash32 buf []byte @@ -36,7 +41,7 @@ type encoder struct { func newEncoder(w io.Writer, prevCrc uint32) *encoder { return &encoder{ - bw: bufio.NewWriter(w), + bw: ioutil.NewPageWriter(w, walPageBytes), crc: crc.New(prevCrc, crcTable), // 1MB buffer buf: make([]byte, 1024*1024), diff --git a/vendor/github.com/coreos/etcd/wal/wal.go b/vendor/github.com/coreos/etcd/wal/wal.go index 13193c064a..377f35fef4 100644 --- a/vendor/github.com/coreos/etcd/wal/wal.go +++ b/vendor/github.com/coreos/etcd/wal/wal.go @@ -69,7 +69,11 @@ var ( // A just opened WAL is in read mode, and ready for reading records. // The WAL will be ready for appending after reading out all the previous records. type WAL struct { - dir string // the living directory of the underlay files + dir string // the living directory of the underlay files + + // dirFile is a fd for the wal directory for syncing on Rename + dirFile *os.File + metadata []byte // metadata recorded at the head of each WAL state raftpb.HardState // hardstate recorded at the head of WAL @@ -108,10 +112,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { if err != nil { return nil, err } - if _, err := f.Seek(0, os.SEEK_END); err != nil { + if _, err = f.Seek(0, os.SEEK_END); err != nil { return nil, err } - if err := fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil { + if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil { return nil, err } @@ -121,17 +125,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { encoder: newEncoder(f, 0), } w.locks = append(w.locks, f) - if err := w.saveCrc(0); err != nil { + if err = w.saveCrc(0); err != nil { + return nil, err + } + if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { return nil, err } - if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { + if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { return nil, err } - if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil { + + if w, err = w.renameWal(tmpdirpath); err != nil { return nil, err } - return w.renameWal(tmpdirpath) + // directory was renamed; sync parent dir to persist rename + pdir, perr := fileutil.OpenDir(path.Dir(w.dir)) + if perr != nil { + return nil, perr + } + if perr = fileutil.Fsync(pdir); perr != nil { + return nil, perr + } + if perr = pdir.Close(); err != nil { + return nil, perr + } + + return w, nil } // Open opens the WAL at the given snap. @@ -141,7 +161,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { // the given snap. The WAL cannot be appended to before reading out all of its // previous records. func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) { - return openAtIndex(dirpath, snap, true) + w, err := openAtIndex(dirpath, snap, true) + if err != nil { + return nil, err + } + if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil { + return nil, err + } + return w, nil } // OpenForRead only opens the wal files for read. @@ -373,6 +400,10 @@ func (w *WAL) cut() error { if err = os.Rename(newTail.Name(), fpath); err != nil { return err } + if err = fileutil.Fsync(w.dirFile); err != nil { + return err + } + newTail.Close() if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil { @@ -475,6 +506,11 @@ func (w *WAL) Close() error { plog.Errorf("failed to unlock during closing wal: %s", err) } } + + if err := w.dirFile.Close(); err != nil { + return err + } + return nil } diff --git a/vendor/github.com/coreos/etcd/wal/wal_unix.go b/vendor/github.com/coreos/etcd/wal/wal_unix.go index 101ea6acc3..82fd6a17a7 100644 --- a/vendor/github.com/coreos/etcd/wal/wal_unix.go +++ b/vendor/github.com/coreos/etcd/wal/wal_unix.go @@ -16,7 +16,11 @@ package wal -import "os" +import ( + "os" + + "github.com/coreos/etcd/pkg/fileutil" +) func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { // On non-Windows platforms, hold the lock while renaming. Releasing @@ -34,5 +38,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { } w.fp = newFilePipeline(w.dir, SegmentSizeBytes) - return w, nil + df, err := fileutil.OpenDir(w.dir) + w.dirFile = df + return w, err }