From 1e9307e88aa468fe853f319151572bc31b4aae7e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 9 Jul 2013 13:14:12 -0700 Subject: [PATCH] clean up tree.go and watcher.go --- command.go | 16 +-- etcd.go | 11 +- handlers.go | 9 +- store/store.go | 304 +++++++++++++++++++++++++++-------------------- store/watcher.go | 63 +++++----- 5 files changed, 223 insertions(+), 180 deletions(-) diff --git a/command.go b/command.go index a43ab43675c..780e7bb6306 100644 --- a/command.go +++ b/command.go @@ -33,7 +33,7 @@ func (c *SetCommand) CommandName() string { // Set the value of key to value func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) { - return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex()) + return etcdStore.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex()) } // TestAndSet command @@ -51,7 +51,7 @@ func (c *TestAndSetCommand) CommandName() string { // Set the value of key to value func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { - return store.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex()) + return etcdStore.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex()) } // Get command @@ -66,7 +66,7 @@ func (c *GetCommand) CommandName() string { // Set the value of key to value func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) { - res := store.Get(c.Key) + res := etcdStore.Get(c.Key) return json.Marshal(res) } @@ -82,7 +82,7 @@ func (c *ListCommand) CommandName() string { // Set the value of key to value func (c *ListCommand) Apply(server *raft.Server) (interface{}, error) { - return store.List(c.Prefix) + return etcdStore.List(c.Prefix) } // Delete command @@ -97,7 +97,7 @@ func (c *DeleteCommand) CommandName() string { // Delete the key func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { - return store.Delete(c.Key, server.CommitIndex()) + return etcdStore.Delete(c.Key, server.CommitIndex()) } // Watch command @@ -112,13 +112,13 @@ func (c *WatchCommand) CommandName() string { } func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { - ch := make(chan store.Response, 1) + watcher := store.CreateWatcher() // add to the watchers list - store.AddWatcher(c.Key, ch, c.SinceIndex) + etcdStore.AddWatcher(c.Key, watcher, c.SinceIndex) // wait for the notification for any changing - res := <-ch + res := <-watcher.C return json.Marshal(res) } diff --git a/etcd.go b/etcd.go index 813efe3f006..efae00ec000 100644 --- a/etcd.go +++ b/etcd.go @@ -113,9 +113,7 @@ type Info struct { var server *raft.Server var serverTransHandler transHandler -var logger *log.Logger - -var storeMsg chan string +var etcdStore *store.Store //------------------------------------------------------------------------------ // @@ -129,7 +127,6 @@ var storeMsg chan string func main() { var err error - logger = log.New(os.Stdout, "", log.LstdFlags) flag.Parse() // Setup commands. @@ -162,10 +159,10 @@ func main() { serverTransHandler = createTranHandler(st) // Setup new raft server. - s := store.CreateStore(maxSize) + etcdStore = store.CreateStore(maxSize) // create raft server - server, err = raft.NewServer(name, dirPath, serverTransHandler, s, nil) + server, err = raft.NewServer(name, dirPath, serverTransHandler, etcdStore, nil) if err != nil { fatal("%v", err) @@ -226,7 +223,7 @@ func main() { if webPort != -1 { // start web - s.SetMessager(&storeMsg) + etcdStore.SetMessager(&storeMsg) go webHelper() go web.Start(server, webPort) } diff --git a/handlers.go b/handlers.go index 1c56f5beffe..1bcd4fd91d2 100644 --- a/handlers.go +++ b/handlers.go @@ -38,6 +38,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.AppendEntriesRequest{} err := decodeJsonRequest(req, aereq) + if err == nil { debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries)) if resp := server.AppendEntries(aereq); resp != nil { @@ -121,7 +122,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { duration, err := strconv.Atoi(strDuration) if err != nil { - warn("raftd: Bad duration: %v", err) + warn("Bad duration: %v", err) (*w).WriteHeader(http.StatusInternalServerError) return } @@ -150,7 +151,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) { duration, err := strconv.Atoi(strDuration) if err != nil { - warn("raftd: Bad duration: %v", err) + warn("Bad duration: %v", err) w.WriteHeader(http.StatusInternalServerError) return } @@ -266,7 +267,7 @@ func ListHttpHandler(w http.ResponseWriter, req *http.Request) { command.Prefix = prefix if body, err := command.Apply(server); err != nil { - warn("raftd: Unable to write file: %v", err) + warn("Unable to write file: %v", err) w.WriteHeader(http.StatusInternalServerError) return } else { @@ -309,7 +310,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { } if body, err := command.Apply(server); err != nil { - warn("raftd: Unable to write file: %v", err) + warn("Unable to write file: %v", err) w.WriteHeader(http.StatusInternalServerError) return } else { diff --git a/store/store.go b/store/store.go index 72ff06a5bcf..d4bc4e8f49c 100644 --- a/store/store.go +++ b/store/store.go @@ -4,186 +4,205 @@ import ( "encoding/json" "fmt" "path" - "time" "strconv" + "time" ) -// global store -var s *Store - -// CONSTANTS -const ( - ERROR = -1 + iota - SET - DELETE - GET -) - -var PERMANENT = time.Unix(0, 0) +//------------------------------------------------------------------------------ +// +// Typedefs +// +//------------------------------------------------------------------------------ +// The main struct of the Key-Value store type Store struct { - // // use the build-in hash map as the key-value store structure - // Nodes map[string]Node `json:"nodes"` - // use treeMap as the key-value stroe structure + // key-value store structure Tree *tree - // the string channel to send messages to the outside world - // now we use it to send changes to the hub of the web service + + // WatcherHub is where we register all the clients + // who issue a watch request + watcher *WatcherHub + + // The string channel to send messages to the outside world + // Now we use it to send changes to the hub of the web service messager *chan string - // + // A map to keep the recent response to the clients ResponseMap map[string]Response - // + // The max number of the recent responses we can record ResponseMaxSize int + // The current number of the recent responses we have recorded ResponseCurrSize uint - // at some point, we may need to compact the Response + // The index of the first recent responses we have ResponseStartIndex uint64 - // current Index + // Current index of the raft machine Index uint64 } +// A Node represents a Value in the Key-Value pair in the store +// It has its value, expire time and a channel used to update the +// expire time (since we do countdown in a go routine, we need to +// communicate with it via channel) type Node struct { + // The string value of the node Value string `json:"value"` - // if the node is a permanent one the ExprieTime will be Unix(0,0) + // If the node is a permanent one the ExprieTime will be Unix(0,0) // Otherwise after the expireTime, the node will be deleted ExpireTime time.Time `json:"expireTime"` - // a channel to update the expireTime of the node + // A channel to update the expireTime of the node update chan time.Time `json:"-"` } +// The response from the store to the user who issue a command type Response struct { - Action int `json:"action"` - Key string `json:"key"` + Action string `json:"action"` + Key string `json:"key"` PrevValue string `json:"prevValue"` - Value string `json:"value"` + Value string `json:"value"` - // if the key existed before the action, this field should be true - // if the key did not exist before the action, this field should be false + // If the key existed before the action, this field should be true + // If the key did not exist before the action, this field should be false Exist bool `json:"exist"` Expiration time.Time `json:"expiration"` - // countdown until expiration in seconds + // Time to live in second TTL int64 `json:"ttl"` + // The command index of the raft machine when the command is executed Index uint64 `json:"index"` } +// A listNode represent the simplest Key-Value pair with its type +// It is only used when do list opeartion +// We want to have a file system like store, thus we distingush "file" +// and "directory" type ListNode struct { - Key string - Value string - Type string + Key string + Value string + Type string } -// make a new stroe +var PERMANENT = time.Unix(0, 0) + +//------------------------------------------------------------------------------ +// +// Methods +// +//------------------------------------------------------------------------------ + +// Create a new stroe +// Arguement max is the max number of response we want to record func CreateStore(max int) *Store { - s = new(Store) + s := new(Store) + s.messager = nil + s.ResponseMap = make(map[string]Response) s.ResponseStartIndex = 0 s.ResponseMaxSize = max s.ResponseCurrSize = 0 - s.Tree = &tree{ - &treeNode{ - Node { + s.Tree = &tree{ + &treeNode{ + Node{ "/", - time.Unix(0,0), + time.Unix(0, 0), nil, }, - true, + true, make(map[string]*treeNode), }, - } + } - return s -} + s.watcher = createWatcherHub() -// return a pointer to the store -func GetStore() *Store { return s } -// set the messager of the store +// Set the messager of the store func (s *Store) SetMessager(messager *chan string) { s.messager = messager } -// set the key to value, return the old value if the key exists -func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { +// Set the key to value with expiration time +func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { - //update index + //Update index s.Index = index - - key = "/" + key - - key = path.Clean(key) - var isExpire bool = false + key = path.Clean("/" + key) - isExpire = !expireTime.Equal(PERMANENT) + isExpire := !expireTime.Equal(PERMANENT) - // when the slow follower receive the set command + // When the slow follower receive the set command // the key may be expired, we should not add the node // also if the node exist, we need to delete the node if isExpire && expireTime.Sub(time.Now()) < 0 { - return Delete(key, index) + return s.Delete(key, index) } var TTL int64 - // update ttl + + // Update ttl if isExpire { TTL = int64(expireTime.Sub(time.Now()) / time.Second) } else { + // For permanent value, we set ttl to -1 TTL = -1 } - // get the node + // Get the node node, ok := s.Tree.get(key) if ok { - // if node is not permanent before - // update its expireTime + // Update when node exists + + // Node is not permanent if !node.ExpireTime.Equal(PERMANENT) { + // If node is not permanent + // Update its expireTime node.update <- expireTime } else { - // if we want the permanent node to have expire time - // we need to create a chan and create a go routine + + // If we want the permanent node to have expire time + // We need to create create a go routine with a channel if isExpire { node.update = make(chan time.Time) - go expire(key, node.update, expireTime) + go s.monitorExpiration(key, node.update, expireTime) } - + } - // update the information of the node + // Update the information of the node s.Tree.set(key, Node{value, expireTime, node.update}) - resp := Response{SET, key, node.Value, value, true, expireTime, TTL, index} + resp := Response{"SET", key, node.Value, value, true, expireTime, TTL, index} - msg, err := json.Marshal(resp) + s.watcher.notify(resp) - notify(resp) + msg, err := json.Marshal(resp) - // send to the messager + // Send to the messager if s.messager != nil && err == nil { *s.messager <- string(msg) } - updateMap(index, &resp) + s.addToResponseMap(index, &resp) return msg, err - // add new node + // Add new node } else { update := make(chan time.Time) @@ -191,32 +210,31 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, s.Tree.set(key, Node{value, expireTime, update}) if isExpire { - go expire(key, update, expireTime) + go s.monitorExpiration(key, update, expireTime) } - resp := Response{SET, key, "", value, false, expireTime, TTL, index} + resp := Response{"SET", key, "", value, false, expireTime, TTL, index} msg, err := json.Marshal(resp) - // nofity the watcher - notify(resp) + // Nofity the watcher + s.watcher.notify(resp) - // notify the web interface + // Send to the messager if s.messager != nil && err == nil { *s.messager <- string(msg) } - updateMap(index, &resp) + s.addToResponseMap(index, &resp) return msg, err } } -// get the value of the key -func Get(key string) Response { - key = "/" + key - - key = path.Clean(key) +// Get the value of the key +func (s *Store) Get(key string) Response { + + key = path.Clean("/" + key) node, ok := s.Tree.get(key) @@ -226,22 +244,24 @@ func Get(key string) Response { isExpire = !node.ExpireTime.Equal(PERMANENT) - // update ttl + // Update ttl if isExpire { TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second) } else { TTL = -1 } - return Response{GET, key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index} - } else { + return Response{"GET", key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index} - return Response{GET, key, "", "", false, time.Unix(0, 0), 0, s.Index} + } else { + // we do not found the key + return Response{"GET", key, "", "", false, time.Unix(0, 0), 0, s.Index} } } -// // List all the item in the prefix -func List(prefix string) ([]byte, error) { +// List all the item in the prefix +func (s *Store) List(prefix string) ([]byte, error) { + nodes, keys, dirs, ok := s.Tree.list(prefix) var ln []ListNode @@ -256,14 +276,13 @@ func List(prefix string) ([]byte, error) { return json.Marshal(ln) } -// delete the key -func Delete(key string, index uint64) ([]byte, error) { - //update index - key = "/" + key +// Delete the key +func (s *Store) Delete(key string, index uint64) ([]byte, error) { - s.Index = index + key = path.Clean("/" + key) - key = path.Clean(key) + //Update index + s.Index = index node, ok := s.Tree.get(key) @@ -275,17 +294,17 @@ func Delete(key string, index uint64) ([]byte, error) { } else { - // kill the expire go routine + // Kill the expire go routine node.update <- PERMANENT s.Tree.delete(key) } - resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, index} + resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, index} msg, err := json.Marshal(resp) - notify(resp) + s.watcher.notify(resp) // notify the messager if s.messager != nil && err == nil { @@ -293,50 +312,70 @@ func Delete(key string, index uint64) ([]byte, error) { *s.messager <- string(msg) } - updateMap(index, &resp) + s.addToResponseMap(index, &resp) + return msg, err } else { - resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index} + resp := Response{"DELETE", key, "", "", false, time.Unix(0, 0), 0, index} - updateMap(index, &resp) + s.addToResponseMap(index, &resp) return json.Marshal(resp) } } -// set the value of the key to the value if the given prevValue is equal to the value of the key -func TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { - resp := Get(key) +// Set the value of the key to the value if the given prevValue is equal to the value of the key +func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { + resp := s.Get(key) if resp.PrevValue == prevValue { - return Set(key, value, expireTime, index) + + // If test success, do set + return s.Set(key, value, expireTime, index) } else { + + // If fails, return the result of get which contains the current + // status of the key-value pair return json.Marshal(resp) } + } -// should be used as a go routine to delete the key when it expires -func expire(key string, update chan time.Time, expireTime time.Time) { +// Add a channel to the watchHub. +// The watchHub will send response to the channel when any key under the prefix +// changes [since the sinceIndex if given] +func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error { + return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, &s.ResponseMap) +} + +// This function should be created as a go routine to delete the key-value pair +// when it reaches expiration time + +func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) { + duration := expireTime.Sub(time.Now()) for { select { - // timeout delete the node + + // Timeout delete the node case <-time.After(duration): node, ok := s.Tree.get(key) + if !ok { return + } else { s.Tree.delete(key) - resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index} + resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, s.Index} msg, err := json.Marshal(resp) - notify(resp) + s.watcher.notify(resp) // notify the messager if s.messager != nil && err == nil { @@ -349,21 +388,25 @@ func expire(key string, update chan time.Time, expireTime time.Time) { } case updateTime := <-update: - //update duration - // if the node become a permanent one, the go routine is + // Update duration + // If the node become a permanent one, the go routine is // not needed if updateTime.Equal(PERMANENT) { - fmt.Println("permanent") return } - // update duration + + // Update duration duration = updateTime.Sub(time.Now()) } } } -func updateMap(index uint64, resp *Response) { +// When we receive a command that will change the state of the key-value store +// We will add the result of it to the ResponseMap for the use of watch command +// Also we may remove the oldest response when we add new one +func (s *Store) addToResponseMap(index uint64, resp *Response) { + // zero case if s.ResponseMaxSize == 0 { return } @@ -372,11 +415,13 @@ func updateMap(index uint64, resp *Response) { s.ResponseMap[strIndex] = *resp // unlimited - if s.ResponseMaxSize < 0{ + if s.ResponseMaxSize < 0 { s.ResponseCurrSize++ return } + // if we reach the max point, we need to delete the most latest + // response and update the startIndex if s.ResponseCurrSize == uint(s.ResponseMaxSize) { s.ResponseStartIndex++ delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10)) @@ -385,8 +430,7 @@ func updateMap(index uint64, resp *Response) { } } - -// save the current state of the storage system +// Save the current state of the storage system func (s *Store) Save() ([]byte, error) { b, err := json.Marshal(s) if err != nil { @@ -396,31 +440,35 @@ func (s *Store) Save() ([]byte, error) { return b, nil } -// recovery the state of the stroage system from a previous state +// Recovery the state of the stroage system from a previous state func (s *Store) Recovery(state []byte) error { err := json.Unmarshal(state, s) - // clean the expired nodes - clean() + // The only thing need to change after the recovery is the + // node with expiration time, we need to delete all the node + // that have been expired and setup go routines to monitor the + // other ones + s.checkExpiration() return err } -// clean all expired keys -func clean() { - s.Tree.traverse(cleanNode, false) +// Clean the expired nodes +// Set up go routines to mon +func (s *Store) checkExpiration() { + s.Tree.traverse(s.checkNode, false) } - -func cleanNode(key string, node *Node) { +// Check each node +func (s *Store) checkNode(key string, node *Node) { if node.ExpireTime.Equal(PERMANENT) { return } else { - if node.ExpireTime.Sub(time.Now()) >= time.Second { + node.update = make(chan time.Time) - go expire(key, node.update, node.ExpireTime) + go s.monitorExpiration(key, node.update, node.ExpireTime) } else { // we should delete this node diff --git a/store/watcher.go b/store/watcher.go index 0f55a018ea1..df590f4bad6 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -5,43 +5,44 @@ import ( "strconv" "strings" ) +//------------------------------------------------------------------------------ +// +// Typedefs +// +//------------------------------------------------------------------------------ +// WatcherHub is where the client register its watcher type WatcherHub struct { - watchers map[string][]Watcher + watchers map[string][]*Watcher } +// Currently watcher only contains a response channel type Watcher struct { - c chan Response + C chan Response } -// global watcher -var w *WatcherHub - -// init the global watcher -func init() { - w = createWatcherHub() -} - -// create a new watcher +// Create a new watcherHub func createWatcherHub() *WatcherHub { w := new(WatcherHub) - w.watchers = make(map[string][]Watcher) + w.watchers = make(map[string][]*Watcher) return w } -func GetWatcherHub() *WatcherHub { - return w +// Create a new watcher +func CreateWatcher() *Watcher { + return &Watcher{C: make(chan Response, 1)} } -// register a function with channel and prefix to the watcher -func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error { +// Add a watcher to the watcherHub +func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64, + responseStartIndex uint64, currentIndex uint64, resMap *map[string]Response) error { - prefix = "/" + path.Clean(prefix) + prefix = path.Clean("/" + prefix) - if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex { - for i := sinceIndex; i <= s.Index; i++ { - if check(prefix, i) { - c <- s.ResponseMap[strconv.FormatUint(i, 10)] + if sinceIndex != 0 && sinceIndex >= responseStartIndex { + for i := sinceIndex; i <= currentIndex; i++ { + if checkResponse(prefix, i, resMap) { + watcher.C <- (*resMap)[strconv.FormatUint(i, 10)] return nil } } @@ -51,25 +52,21 @@ func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error { if !ok { - w.watchers[prefix] = make([]Watcher, 0) - - watcher := Watcher{c} + w.watchers[prefix] = make([]*Watcher, 0) w.watchers[prefix] = append(w.watchers[prefix], watcher) } else { - watcher := Watcher{c} - w.watchers[prefix] = append(w.watchers[prefix], watcher) } return nil } -// check if the response has what we are watching -func check(prefix string, index uint64) bool { +// Check if the response has what we are watching +func checkResponse(prefix string, index uint64, resMap *map[string]Response) bool { - resp, ok := s.ResponseMap[strconv.FormatUint(index, 10)] + resp, ok := (*resMap)[strconv.FormatUint(index, 10)] if !ok { // not storage system command @@ -89,8 +86,8 @@ func check(prefix string, index uint64) bool { } -// notify the watcher a action happened -func notify(resp Response) error { +// Notify the watcher a action happened +func (w *WatcherHub) notify(resp Response) error { resp.Key = path.Clean(resp.Key) segments := strings.Split(resp.Key, "/") @@ -104,10 +101,10 @@ func notify(resp Response) error { if ok { - newWatchers := make([]Watcher, 0) + newWatchers := make([]*Watcher, 0) // notify all the watchers for _, watcher := range watchers { - watcher.c <- resp + watcher.C <- resp } if len(newWatchers) == 0 {