Skip to content

Commit

Permalink
start of a v8 data importer via the cli
Browse files Browse the repository at this point in the history
  • Loading branch information
corylanou committed Aug 6, 2015
1 parent 5aacb34 commit 30a0ca0
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 49 deletions.
104 changes: 103 additions & 1 deletion client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,66 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
)

const (
// DEFAULT_HOST is the default host used to connect to an InfluxDB instance
DEFAULT_HOST = "localhost"
// DEFAULT_PORT is the default port used to connect to an InfluxDB instance
DEFAULT_PORT = 8086
// DEFAULT_TIMEOUT is the default connection timeout used to connect to an InfluxDB instance
DEFAULT_TIMEOUT = 0
)

// Query is used to send a command to the server. Both Command and Database are required.
type Query struct {
Command string
Database string
}

// ParseConnectionString will parse a string to create a valid connection URL
func ParseConnectionString(path string, ssl bool) (url.URL, error) {
var host string
var port int

if strings.Contains(path, ":") {
h := strings.Split(path, ":")
i, e := strconv.Atoi(h[1])
if e != nil {
return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, e)
}
port = i
if h[0] == "" {
host = DEFAULT_HOST
} else {
host = h[0]
}
} else {
host = path
// If they didn't specify a port, always use the default port
port = DEFAULT_PORT
}

u := url.URL{
Scheme: "http",
}
if ssl {
u.Scheme = "https"
}
u.Host = net.JoinHostPort(host, strconv.Itoa(port))

return u, nil
}

// Config is used to specify what server to connect to.
// URL: The URL of the server connecting to.
// Username/Password are optional. They will be passed via basic auth if provided.
Expand All @@ -34,6 +80,16 @@ type Config struct {
Timeout time.Duration
}

// NewConfig will create a config to be used in connecting to the client
func NewConfig(u url.URL, username, password, userAgent string, timeout time.Duration) *Config {
return &Config{
URL: u,
Username: username,
Password: password,
UserAgent: userAgent,
}
}

// Client is used to make calls to the server.
type Client struct {
url url.URL
Expand All @@ -51,7 +107,7 @@ const (
)

// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
func NewClient(c *Config) (*Client, error) {
client := Client{
url: c.URL,
username: c.Username,
Expand Down Expand Up @@ -183,6 +239,52 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
return nil, nil
}

// WriteLineProtocol takes a string with line returns to delimit each write
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
c.url.Path = "write"

var b bytes.Buffer
b.WriteString(data)

req, err := http.NewRequest("POST", c.url.String(), &b)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Add("db", database)
params.Add("rp", retentionPolicy)
params.Add("precision", precision)
params.Add("consistency", writeConsistency)
req.URL.RawQuery = params.Encode()

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil && err.Error() != "EOF" {
return nil, err
}

if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
response.Err = err
return &response, err
}

return nil, nil
}

// Ping will check to see if the server is up
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *Client) Ping() (time.Duration, string, error) {
Expand Down
208 changes: 208 additions & 0 deletions cmd/influx/importer/v8.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package importer

import (
"bufio"
"compress/gzip"
"fmt"
"io"
"log"
"net/url"
"os"
"strings"
"sync"

"github.com/influxdb/influxdb/client"
)

const batchSize = 5000

type V8Config struct {
username, password string
url url.URL
precision string
writeConsistency string
file, version string
compressed bool
}

func NewV8Config(username, password, precision, writeConsistency, file, version string, u url.URL, compressed bool) *V8Config {
return &V8Config{
username: username,
password: password,
precision: precision,
writeConsistency: writeConsistency,
file: file,
version: version,
url: u,
compressed: compressed,
}
}

type V8 struct {
client *client.Client
database string
retentionPolicy string
config *V8Config
wg sync.WaitGroup
line, command chan string
done chan struct{}
batch []string
totalInserts, totalCommands int
}

func NewV8(config *V8Config) *V8 {
return &V8{
config: config,
done: make(chan struct{}),
line: make(chan string),
command: make(chan string),
batch: make([]string, 0, batchSize),
}
}

func (v8 *V8) Import() error {
// Create a client and try to connect
config := client.NewConfig(v8.config.url, v8.config.username, v8.config.password, v8.config.version, client.DEFAULT_TIMEOUT)
cl, err := client.NewClient(config)
if err != nil {
return fmt.Errorf("could not create client %s", err)
}
v8.client = cl
if _, _, e := v8.client.Ping(); e != nil {
return fmt.Errorf("failed to connect to %s\n", v8.client.Addr())
}

// Validate args
if v8.config.file == "" {
return fmt.Errorf("file argument required")
}

defer func() {
v8.wg.Wait()
if v8.totalInserts > 0 {
log.Printf("Processed %d commands\n", v8.totalCommands)
log.Printf("Processed %d inserts\n", v8.totalInserts)
}
}()

// Open the file
f, err := os.Open(v8.config.file)
if err != nil {
return err
}
defer f.Close()

var r io.Reader

// If gzipped, wrap in a gzip reader
if v8.config.compressed {
gr, err := gzip.NewReader(f)
if err != nil {
return err
}
defer gr.Close()
// Set the reader to the gzip reader
r = gr
} else {
// Standard text file so our reader can just be the file
r = f
}

// start our accumulator
go v8.batchAccumulator()

// start our command executor
go v8.queryExecutor()

// Get our reader
scanner := bufio.NewScanner(r)

// Process the scanner
v8.processDDL(scanner)
v8.processDML(scanner)

// Signal go routines we are done
close(v8.done)

// Check if we had any errors scanning the file
if err := scanner.Err(); err != nil {
return fmt.Errorf("reading standard input: %s", err)
}

return nil
}

func (v8 *V8) processDDL(scanner *bufio.Scanner) {
for scanner.Scan() {
line := scanner.Text()
// If we find the DML token, we are done with DDL
if strings.HasPrefix(line, "# DML") {
return
}
if strings.HasPrefix(line, "#") {
continue
}
v8.command <- line
}
}

func (v8 *V8) processDML(scanner *bufio.Scanner) {
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "#") {
continue
}
v8.line <- line
}
}

func (v8 *V8) execute(command string) {
response, err := v8.client.Query(client.Query{Command: command, Database: v8.database})
if err != nil {
log.Printf("error: %s\n", err)
return
}
if err := response.Error(); err != nil {
log.Printf("error: %s\n", response.Error())
}
}

func (v8 *V8) queryExecutor() {
v8.wg.Add(1)
defer v8.wg.Done()
for {
select {
case c := <-v8.command:
v8.totalCommands++
v8.execute(c)
case <-v8.done:
return
}
}
}

func (v8 *V8) batchAccumulator() {
v8.wg.Add(1)
defer v8.wg.Done()
for {
select {
case l := <-v8.line:
v8.batch = append(v8.batch, l)
if len(v8.batch) == batchSize {
v8.batchWrite()
v8.totalInserts += len(v8.batch)
v8.batch = v8.batch[:0]
}
case <-v8.done:
v8.totalInserts += len(v8.batch)
return
}
}
}

func (v8 *V8) batchWrite() {
_, e := v8.client.WriteLineProtocol(strings.Join(v8.batch, "\n"), v8.database, v8.retentionPolicy, v8.config.precision, v8.config.writeConsistency)
if e != nil {
log.Println("error writing batch: ", e)
}
}
Loading

0 comments on commit 30a0ca0

Please sign in to comment.