Skip to content

Commit

Permalink
API: koding/kite -> http
Browse files Browse the repository at this point in the history
  • Loading branch information
boz committed Apr 25, 2017
1 parent 0e829d9 commit 8f6a426
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 90 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ This is an alias for `redis.exec` with a default command of `"FLUSHALL"`.
* Allow yaml
* Allow built-in defaults (postgres, redis, etc...)
* Polish/Optimize/Cleanup UI.
* Use simple JSON API instead of [koding/kite](https://github.com/koding/kite).
* Re-add remote actions
* Use websockets instead of [koding/kite](https://github.com/koding/kite)
* Nodejs client example
* Re-add remote actions (websockets API)
* Clients: nodejs, ruby, python, etc...
* Documentation
* Tests
6 changes: 3 additions & 3 deletions example/demo.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"pools": {
"redis": {
"image": "redis",
"size": 3,
"size": 5,
"port": 6379,
"params": {
"database": "0",
Expand All @@ -21,7 +21,7 @@
},
"vault": {
"image": "vault",
"size": 3,
"size": 5,
"port": 8200,
"container": {
"env": ["SKIP_SETCAP=1"]
Expand All @@ -37,7 +37,7 @@
},
"postgres": {
"image": "postgres",
"size": 3,
"size": 5,
"port": 5432,
"params": {
"username": "postgres",
Expand Down
82 changes: 53 additions & 29 deletions net/client.go
Original file line number Diff line number Diff line change
@@ -1,65 +1,89 @@
package net

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"

"github.com/boz/ephemerald/params"
"github.com/koding/kite"
)

type ClientBuilder struct {
kclient *kite.Client
kite *kite.Kite
var (
DefaultConnectAddress = net.JoinHostPort("localhost", strconv.Itoa(DefaultPort))
)

host string
port int
type ClientBuilder struct {
address string
}

type Client struct {
kclient *kite.Client
address string
}

func NewClientBuilder() *ClientBuilder {
k := kite.New(kiteName+"-client", kiteVersion)
c := k.NewClient("")
// XXX: race condition
//k.SetLogLevel(kite.DEBUG)
c.Concurrent = true
c.ConcurrentCallbacks = true
return &ClientBuilder{c, k, "localhost", DefaultPort}
return &ClientBuilder{DefaultConnectAddress}
}

func (b *ClientBuilder) WithHost(host string) *ClientBuilder {
b.host = host
func (b *ClientBuilder) WithAddress(address string) *ClientBuilder {
b.address = address
return b
}

func (b *ClientBuilder) WithPort(port int) *ClientBuilder {
b.port = port
address, _, _ := net.SplitHostPort(b.address)
b.address = net.JoinHostPort(address, strconv.Itoa(port))
return b
}

func (b *ClientBuilder) Create() (*Client, error) {
b.kclient.URL = fmt.Sprintf("http://%v:%v/kite", b.host, b.port)
b.kite.Config.Environment = b.host

if err := b.kclient.Dial(); err != nil {
return nil, err
}
return &Client{b.kclient}, nil
return &Client{b.address}, nil
}

func (c *Client) Checkout(names ...string) (params.Set, error) {
ps := params.Set{}
response, err := c.kclient.Tell(rpcCheckoutName, names)

req, err := http.NewRequest("PUT", c.url(rpcCheckoutName), &bytes.Buffer{})

if err != nil {
return ps, err
}

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return ps, err
}
response.MustUnmarshal(&ps)
return ps, nil
defer resp.Body.Close()

buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
return ps, err
}

err = json.Unmarshal(buf, &ps)
return ps, err
}

func (c *Client) Return(ps params.Set) error {
_, err := c.kclient.Tell(rpcReturnName, ps)
return err
buf, err := json.Marshal(ps)
if err != nil {
return err
}

req, err := http.NewRequest("PUT", c.url(rpcReturnName), bytes.NewBuffer(buf))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}

func (c *Client) url(path string) string {
return fmt.Sprintf("http://%v%v", c.address, path)
}
14 changes: 2 additions & 12 deletions net/net.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
package net

import "os"

const (
rpcCheckoutName = "checkout"
rpcReturnName = "return"
rpcCheckoutName = "/checkout"
rpcReturnName = "/return"
)

// XXX: prevent koding/logging race condition

func init() {
if os.Getenv("KITE_LOG_LEVEL") == "" {
os.Setenv("KITE_LOG_LEVEL", "DEBUG")
}
}
2 changes: 0 additions & 2 deletions net/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,13 @@ func TestClientServer(t *testing.T) {
require.NoError(t, err)
}

readych := server.ServerReadyNotify()
donech := server.ServerCloseNotify()
defer func() {
<-donech
}()
defer server.Close()

go server.Run()
<-readych

client, err := net.NewClientBuilder().
WithPort(server.Port()).
Expand Down
111 changes: 73 additions & 38 deletions net/server.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
package net

import (
"encoding/json"
"fmt"
"net"
"net/http"
"strconv"

"github.com/boz/ephemerald"
"github.com/boz/ephemerald/params"
"github.com/koding/kite"
"github.com/gorilla/mux"
)

const (
kiteName = "ephemerald"
kiteVersion = "0.0.1"
DefaultPort = 6000
DefaultPort = 6000
DefaultAddress = ":6000"
)

type Server struct {
kite *kite.Kite
l *net.TCPListener
srv *http.Server

pools ephemerald.PoolSet

closech chan bool
}

type ServerBuilder struct {
port int
pools ephemerald.PoolSet
address string
pools ephemerald.PoolSet
}

func NewServerBuilder() *ServerBuilder {
return &ServerBuilder{
port: DefaultPort,
address: DefaultAddress,
}
}

Expand All @@ -36,66 +42,77 @@ func (sb *ServerBuilder) WithPoolSet(pools ephemerald.PoolSet) *ServerBuilder {
return sb
}

func (sb *ServerBuilder) WithAddress(address string) *ServerBuilder {
sb.address = address
return sb
}

func (sb *ServerBuilder) WithPort(port int) *ServerBuilder {
sb.port = port
address, _, _ := net.SplitHostPort(sb.address)
sb.address = net.JoinHostPort(address, strconv.Itoa(port))
return sb
}

func (sb *ServerBuilder) Create() (*Server, error) {
k := kite.New(kiteName, kiteVersion)

k.Config.Port = sb.port
k.Config.DisableAuthentication = true

s := &Server{
kite: k,
server := &Server{
closech: make(chan bool),
pools: sb.pools,
}

s.kite.HandleFunc(rpcCheckoutName, s.handleCheckout)
s.kite.HandleFunc(rpcReturnName, s.handleReturn)
r := mux.NewRouter()

r.HandleFunc(rpcCheckoutName, server.handleCheckout).
Methods("PUT")

r.HandleFunc(rpcReturnName, server.handleReturn).
Methods("PUT")

l, err := net.Listen("tcp", sb.address)
if err != nil {
return nil, err
}

server.l = l.(*net.TCPListener)

return s, nil
server.srv = &http.Server{
Handler: r,
}

return server, nil
}

func (s *Server) Run() {
defer close(s.closech)
ch := s.kite.ServerCloseNotify()
go s.kite.Run()
<-ch
s.srv.Serve(s.l)
s.stopPools()
}

func (s *Server) Close() {
s.kite.Close()
s.l.Close()
}

func (s *Server) ServerCloseNotify() chan bool {
return s.closech
}

func (s *Server) ServerReadyNotify() chan bool {
return s.kite.ServerReadyNotify()
func (s *Server) Address() string {
return s.l.Addr().String()
}

func (s *Server) Port() int {
return s.kite.Port()
_, port, _ := net.SplitHostPort(s.Address())
p, _ := strconv.Atoi(port)
return p
}

func (s *Server) stopPools() {
s.pools.Stop()
}

func (s *Server) handleCheckout(r *kite.Request) (interface{}, error) {
var names []string
r.Args.MustUnmarshal(names)
ps, err := s.pools.Checkout(names...)
if err != nil {
return ps, err
}
func (s *Server) handleCheckout(w http.ResponseWriter, r *http.Request) {
host, _, _ := net.SplitHostPort(r.Host)

host := r.Client.Environment
ps, err := s.pools.CheckoutWith(r.Context())

for name, p := range ps {
p2, e := p.ForHost(host)
Expand All @@ -105,15 +122,33 @@ func (s *Server) handleCheckout(r *kite.Request) (interface{}, error) {
}
ps[name] = p2
}

if err != nil {
s.pools.ReturnAll(ps)
http.Error(w, fmt.Sprint(err), http.StatusRequestTimeout)
return
}

buf, err := json.Marshal(ps)
if err != nil {
http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
return
}
return ps, err

w.Header().Set("Content-Type", "text/json; charset=utf-8")

w.Write(buf)
}

func (s *Server) handleReturn(r *kite.Request) (interface{}, error) {
func (s *Server) handleReturn(w http.ResponseWriter, r *http.Request) {
ps := params.Set{}
r.Args.One().MustUnmarshal(&ps)
defer r.Body.Close()
dec := json.NewDecoder(r.Body)

if err := dec.Decode(&ps); err != nil {
http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
return
}
s.pools.ReturnAll(ps)
return nil, nil
w.WriteHeader(http.StatusOK)
}
Loading

0 comments on commit 8f6a426

Please sign in to comment.