Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

properly groupArray macros parsing golang part #200

Merged
merged 1 commit into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 119 additions & 25 deletions datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math"
"net/http"
"net/url"
"reflect"
"runtime/debug"
"strconv"
"strings"

"github.com/bitly/go-simplejson"
"github.com/grafana/grafana-plugin-model/go/datasource"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-plugin"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
)
Expand All @@ -30,7 +34,7 @@ func (t *ClickhouseDatasource) Query(ctx context.Context, req *datasource.Dataso
// catch all panics and override err return value
defer func() {
if panicMsg := recover(); panicMsg != nil {
err = fmt.Errorf("clickhouse plugin panicked: %#w", panicMsg)
err = fmt.Errorf("clickhouse plugin panicked: %+v stacktrace:\n%s", panicMsg, debug.Stack())
}
}()

Expand All @@ -50,7 +54,11 @@ func (t *ClickhouseDatasource) Query(ctx context.Context, req *datasource.Dataso
if err != nil {
return nil, err
}
defer response.Body.Close()
defer func() {
if err := response.Body.Close(); err!=nil {
log.Fatal("can't close HTTP Response body")
}
}()

// Body must be drained and closed on each request as per the docs: https://golang.org/pkg/net/http/#Client.Do
// otherwise the http client connection cannot be reused
Expand All @@ -70,12 +78,12 @@ func createRequest(req *datasource.DatasourceRequest, query string) (*http.Reque
body := ""
method := http.MethodGet
headers := http.Header{}
url, err := url.Parse(req.Datasource.Url)
dataSourceUrl, err := url.Parse(req.Datasource.Url)
if err != nil {
return nil, fmt.Errorf("unable to parse clickhouse url: %w", err)
return nil, fmt.Errorf("unable to parse clickhouse dataSourceUrl: %w", err)
}

params := url.Query()
params := dataSourceUrl.Query()
params.Add("query", query+" FORMAT JSON")

/*
Expand Down Expand Up @@ -133,8 +141,8 @@ func createRequest(req *datasource.DatasourceRequest, query string) (*http.Reque
}
}

url.RawQuery = params.Encode()
request, err := http.NewRequest(method, url.String(), bytes.NewBufferString(body))
dataSourceUrl.RawQuery = params.Encode()
request, err := http.NewRequest(method, dataSourceUrl.String(), bytes.NewBufferString(body))
if err != nil {
return nil, err
}
Expand All @@ -143,50 +151,135 @@ func createRequest(req *datasource.DatasourceRequest, query string) (*http.Reque
return request, nil
}

var floatType = reflect.TypeOf(float64(0))
var stringType = reflect.TypeOf("")

func parseFloat64(v interface{}) (float64, error) {
switch i := v.(type) {
case string:
return strconv.ParseFloat(i, 64)
case float64:
return i, nil
case float32:
return float64(i), nil
case int64:
return float64(i), nil
case int32:
return float64(i), nil
case int:
return float64(i), nil
case uint64:
return float64(i), nil
case uint32:
return float64(i), nil
case uint:
return float64(i), nil
default:
tv := reflect.ValueOf(i)
tv = reflect.Indirect(tv)
if tv.Type().ConvertibleTo(floatType) {
fv := tv.Convert(floatType)
return fv.Float(), nil
} else if tv.Type().ConvertibleTo(stringType) {
sv := tv.Convert(stringType)
s := sv.String()
return strconv.ParseFloat(s, 64)
} else {
return math.NaN(), fmt.Errorf("can't convert %v to float64", tv.Type())
}
}
}

func parseResponse(body []byte, refId string) (*datasource.DatasourceResponse, error) {

parsedBody := ClickHouseResponse{}
err := json.Unmarshal(body, &parsedBody)
if err != nil {
return nil, fmt.Errorf("unable to parse response json: %s: %w", body, err)
}

seriesMap := map[string]*datasource.TimeSeries{}
metaTypesMap := map[string]string{}
// expect first column as timestamp
tsMetaName := parsedBody.Meta[0].Name
for _, meta := range parsedBody.Meta {
if meta.Name != "t" {
if meta.Name != tsMetaName && !strings.HasPrefix(meta.Type,"Array(Tuple("){
seriesMap[meta.Name] = &datasource.TimeSeries{Name: meta.Name, Points: []*datasource.Point{}}
}
metaTypesMap[meta.Name] = meta.Type
}

for _, dataPoint := range parsedBody.Data {
timestamp, err := strconv.ParseInt(dataPoint[tsMetaName].(string), 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse timestamp with alias t: %w", err)
}
for k, v := range dataPoint {
if k != "t" {
timestamp, err := strconv.ParseInt(dataPoint["t"], 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse timestamp with alias t: %w", err)
}
if k != tsMetaName {
var point float64
var err error

point, err := strconv.ParseFloat(v, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse value for '%s': %w", k, err)
}
if !strings.HasPrefix(metaTypesMap[k],"Array(Tuple(") {

seriesMap[k].Points = append(seriesMap[k].Points, &datasource.Point{
Timestamp: timestamp,
Value: point,
})
point, err = parseFloat64(v)
if err != nil {
return nil, fmt.Errorf("unable to parse value %v for '%s': %w", v, k, err)
}

seriesMap[k].Points = append(seriesMap[k].Points, &datasource.Point{
Timestamp: timestamp,
Value: point,
})
} else {
var arrayOfTuples [][]string
switch arrays := v.(type) {
case []interface{}:
for _, array := range arrays {
switch tuple := array.(type) {
case []interface{}:
var t []string
for _, s := range tuple {
t = append(t, fmt.Sprintf("%v",s))
}
arrayOfTuples = append(arrayOfTuples, t)
default:
return nil, fmt.Errorf("unable to parse data section type=%T in response json: %s", tuple, tuple)
}
}
default:
return nil, fmt.Errorf("unable to parse data section type=%T in response json: %s", v, v)
}
for _, tuple := range arrayOfTuples {
tsName := tuple[0]
tsValue := tuple[1]
ts, isExists := seriesMap[tsName]
if !isExists {
ts = &datasource.TimeSeries{Name: tsName, Points: []*datasource.Point{}}
seriesMap[tsName] = ts
}
point, err = parseFloat64(tsValue)
if err != nil {
return nil, fmt.Errorf("unable to parse value %v for '%s': %w", tsValue, tsName, err)
}
ts.Points = append(ts.Points, &datasource.Point{
Timestamp: timestamp,
Value: point,
})
}
}
}
}
}

series := []*datasource.TimeSeries{}
var series []*datasource.TimeSeries
for _, timeSeries := range seriesMap {
series = append(series, timeSeries)
}

metaJSON, _ := json.Marshal(parsedBody.Meta)
return &datasource.DatasourceResponse{
Results: []*datasource.QueryResult{
&datasource.QueryResult{
{
Series: series,
RefId: refId,
MetaJson: string(metaJSON),
Expand All @@ -195,9 +288,10 @@ func parseResponse(body []byte, refId string) (*datasource.DatasourceResponse, e
}, nil
}


type ClickHouseResponse struct {
Meta []ClickHouseMeta
Data []map[string]string
Data []map[string]interface{}
Rows int
}

Expand Down
Binary file modified dist/vertamedia-clickhouse-plugin_linux_amd64
Binary file not shown.
2 changes: 1 addition & 1 deletion plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"

"github.com/grafana/grafana-plugin-model/go/datasource"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-plugin"
)

func main() {
Expand Down