Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more context handling #139

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ steps:
- name: build
image: grafana/grafana-plugin-ci:1.9.5
commands:
- mage -v build
Copy link
Contributor Author

@njvrzm njvrzm May 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the build steps were randomly failing, I believe because they share a docker volume and one step's mage process is deleting the mage_output_file.go before another step is done with it. This change appears to fix it.

- mage --keep -v build

- name: lint
image: grafana/grafana-plugin-ci:1.9.5
commands:
- mage -v lint
- mage --keep -v lint

- name: test
image: grafana/grafana-plugin-ci:1.9.5
commands:
- mage -v test
- mage --keep -v test

- name: vuln check
image: golang:1.22
Expand Down
8 changes: 4 additions & 4 deletions pkg/awsds/asyncDatasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func isAsyncFlow(query backend.DataQuery) bool {
}

func (ds *AsyncAWSDatasource) NewDatasource(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
db, err := ds.driver.GetAsyncDB(settings, nil)
db, err := ds.driver.GetAsyncDB(ctx, settings, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (ds *AsyncAWSDatasource) CheckHealth(ctx context.Context, req *backend.Chec
}, nil
}

func (ds *AsyncAWSDatasource) getAsyncDBFromQuery(q *AsyncQuery, datasourceUID string) (AsyncDB, error) {
func (ds *AsyncAWSDatasource) getAsyncDBFromQuery(ctx context.Context, q *AsyncQuery, datasourceUID string) (AsyncDB, error) {
if !ds.EnableMultipleConnections && len(q.ConnectionArgs) > 0 {
return nil, sqlds.ErrorMissingMultipleConnectionsConfig
}
Expand All @@ -174,7 +174,7 @@ func (ds *AsyncAWSDatasource) getAsyncDBFromQuery(q *AsyncQuery, datasourceUID s
}

var err error
db, err := ds.driver.GetAsyncDB(dbConn.settings, q.ConnectionArgs)
db, err := ds.driver.GetAsyncDB(ctx, dbConn.settings, q.ConnectionArgs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func (ds *AsyncAWSDatasource) handleAsyncQuery(ctx context.Context, req backend.
fillMode = q.FillMissing
}

asyncDB, err := ds.getAsyncDBFromQuery(q, datasourceUID)
asyncDB, err := ds.getAsyncDBFromQuery(ctx, q, datasourceUID)
if err != nil {
return getErrorFrameFromQuery(q), err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/awsds/asyncDatasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type fakeDriver struct {
AsyncDriver
}

func (d fakeDriver) GetAsyncDB(backend.DataSourceInstanceSettings, json.RawMessage) (db AsyncDB, err error) {
func (d fakeDriver) GetAsyncDB(context.Context, backend.DataSourceInstanceSettings, json.RawMessage) (db AsyncDB, err error) {
return d.openDBfn()
}

Expand Down Expand Up @@ -96,7 +96,7 @@ func Test_getDBConnectionFromQuery(t *testing.T) {
ds.storeDBConnection(key, dbConnection{tt.existingDB, settings})
}

dbConn, err := ds.getAsyncDBFromQuery(&AsyncQuery{Query: sqlutil.Query{ConnectionArgs: json.RawMessage(tt.args)}}, tt.dsUID)
dbConn, err := ds.getAsyncDBFromQuery(context.Background(), &AsyncQuery{Query: sqlutil.Query{ConnectionArgs: json.RawMessage(tt.args)}}, tt.dsUID)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/awsds/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ type AsyncDB interface {
// AsyncDriver extends the driver interface to also connect to async SQL datasources
type AsyncDriver interface {
sqlds.Driver
GetAsyncDB(settings backend.DataSourceInstanceSettings, queryArgs json.RawMessage) (AsyncDB, error)
GetAsyncDB(ctx context.Context, settings backend.DataSourceInstanceSettings, queryArgs json.RawMessage) (AsyncDB, error)
}
3 changes: 0 additions & 3 deletions pkg/sql/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-aws-sdk/pkg/sql/models"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/sqlds/v3"
"github.com/jpillora/backoff"
Expand Down Expand Up @@ -54,8 +53,6 @@ type AWSAPI interface {
Resources
}

type Loader func(cache *awsds.SessionCache, settings models.Settings) (AWSAPI, error)

// WaitOnQuery polls the datasource api until the query finishes, returning an error if it failed.
func WaitOnQuery(ctx context.Context, api SQL, output *ExecuteQueryOutput) error {
backoffInstance := backoff.Backoff{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWaitOnQuery(t *testing.T) {

for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
err := WaitOnQuery(context.TODO(), tc.ds, &ExecuteQueryOutput{})
err := WaitOnQuery(context.Background(), tc.ds, &ExecuteQueryOutput{})
if tc.ds.statusCounter != len(tc.ds.status) {
t.Errorf("status not called the right amount of times. Want %d got %d", len(tc.ds.status), tc.ds.statusCounter)
}
Expand Down
85 changes: 49 additions & 36 deletions pkg/sql/datasource/datasource.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datasource

import (
"context"
"database/sql"
"fmt"
"sync"
Expand All @@ -14,28 +15,45 @@ import (
"github.com/grafana/sqlds/v3"
)

// AWSDatasource stores a cache of several instances.
// AWSClient provides creation and caching of sessions, database connections, and API clients
type AWSClient interface {
Init(config backend.DataSourceInstanceSettings)
GetDB(ctx context.Context, id int64, options sqlds.Options) (*sql.DB, error)
GetAsyncDB(ctx context.Context, id int64, options sqlds.Options) (awsds.AsyncDB, error)
GetAPI(ctx context.Context, id int64, options sqlds.Options) (api.AWSAPI, error)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Athena and redshift were defining this interface for themselves, which seemed backwards to me; rather than update both copies with context arguments I've moved it here.


type Loader interface {
LoadSettings(context.Context) models.Settings
LoadAPI(context.Context, *awsds.SessionCache, models.Settings) (api.AWSAPI, error)
LoadDriver(context.Context, api.AWSAPI) (driver.Driver, error)
LoadAsyncDriver(context.Context, api.AWSAPI) (asyncDriver.Driver, error)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this approach but I like it better than passing around a bunch of typed functions. I'm open to alternatives :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think its better (probably equivalent tbh) but you could make a struct of the options and pass that around?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work! I think you're right though that it's not really better. Gonna keep it this way since it's already done.


// awsClient provides creation and caching of several types of instances.
// Each Map will depend on the datasource ID (and connection options):
// - sessionCache: AWS cache. This is not a Map since it does not depend on the datasource.
// - config: Base configuration. It will be used as base to populate datasource settings.
// It does not depend on connection options (only one per datasource)
// - api: API instance with the common methods to contact the data source API.
type AWSDatasource struct {
type awsClient struct {
sessionCache *awsds.SessionCache
config sync.Map
api sync.Map

loader Loader
}

func New() *AWSDatasource {
ds := &AWSDatasource{sessionCache: awsds.NewSessionCache()}
func New(loader Loader) AWSClient {
ds := &awsClient{sessionCache: awsds.NewSessionCache(), loader: loader}
return ds
}

func (ds *AWSDatasource) storeConfig(config backend.DataSourceInstanceSettings) {
func (ds *awsClient) storeConfig(config backend.DataSourceInstanceSettings) {
ds.config.Store(config.ID, config)
}

func (ds *AWSDatasource) createDB(dr driver.Driver) (*sql.DB, error) {
func (ds *awsClient) createDB(dr driver.Driver) (*sql.DB, error) {
db, err := dr.OpenDB()
if err != nil {
return nil, fmt.Errorf("%w: failed to connect to database (check hostname and port?)", err)
Expand All @@ -44,7 +62,7 @@ func (ds *AWSDatasource) createDB(dr driver.Driver) (*sql.DB, error) {
return db, nil
}

func (ds *AWSDatasource) createAsyncDB(dr asyncDriver.Driver) (awsds.AsyncDB, error) {
func (ds *awsClient) createAsyncDB(dr asyncDriver.Driver) (awsds.AsyncDB, error) {
db, err := dr.GetAsyncDB()
if err != nil {
return nil, fmt.Errorf("%w: failed to connect to database (check hostname and port)", err)
Expand All @@ -53,12 +71,12 @@ func (ds *AWSDatasource) createAsyncDB(dr asyncDriver.Driver) (awsds.AsyncDB, er
return db, nil
}

func (ds *AWSDatasource) storeAPI(id int64, args sqlds.Options, dsAPI api.AWSAPI) {
func (ds *awsClient) storeAPI(id int64, args sqlds.Options, dsAPI api.AWSAPI) {
key := connectionKey(id, args)
ds.api.Store(key, dsAPI)
}

func (ds *AWSDatasource) loadAPI(id int64, args sqlds.Options) (api.AWSAPI, bool) {
func (ds *awsClient) loadAPI(id int64, args sqlds.Options) (api.AWSAPI, bool) {
key := connectionKey(id, args)
dsAPI, exists := ds.api.Load(key)
if exists {
Expand All @@ -67,34 +85,34 @@ func (ds *AWSDatasource) loadAPI(id int64, args sqlds.Options) (api.AWSAPI, bool
return nil, false
}

func (ds *AWSDatasource) createAPI(id int64, args sqlds.Options, settings models.Settings, loader api.Loader) (api.AWSAPI, error) {
dsAPI, err := loader(ds.sessionCache, settings)
func (ds *awsClient) createAPI(ctx context.Context, id int64, args sqlds.Options, settings models.Settings) (api.AWSAPI, error) {
dsAPI, err := ds.loader.LoadAPI(ctx, ds.sessionCache, settings)
if err != nil {
return nil, fmt.Errorf("%w: Failed to create client", err)
}
ds.storeAPI(id, args, dsAPI)
return dsAPI, err
}

func (ds *AWSDatasource) createDriver(dsAPI api.AWSAPI, loader driver.Loader) (driver.Driver, error) {
dr, err := loader(dsAPI)
func (ds *awsClient) createDriver(ctx context.Context, dsAPI api.AWSAPI) (driver.Driver, error) {
dr, err := ds.loader.LoadDriver(ctx, dsAPI)
if err != nil {
return nil, fmt.Errorf("%w: Failed to create client", err)
}

return dr, nil
}

func (ds *AWSDatasource) createAsyncDriver(dsAPI api.AWSAPI, loader asyncDriver.Loader) (asyncDriver.Driver, error) {
dr, err := loader(dsAPI)
func (ds *awsClient) createAsyncDriver(ctx context.Context, dsAPI api.AWSAPI) (asyncDriver.Driver, error) {
dr, err := ds.loader.LoadAsyncDriver(ctx, dsAPI)
if err != nil {
return nil, fmt.Errorf("%w: Failed to create client", err)
}

return dr, nil
}

func (ds *AWSDatasource) parseSettings(id int64, args sqlds.Options, settings models.Settings) error {
func (ds *awsClient) parseSettings(id int64, args sqlds.Options, settings models.Settings) error {
config, ok := ds.config.Load(id)
if !ok {
return fmt.Errorf("unable to find stored configuration for datasource %d. Initialize it first", id)
Expand All @@ -108,31 +126,29 @@ func (ds *AWSDatasource) parseSettings(id int64, args sqlds.Options, settings mo
}

// Init stores the data source configuration. It's needed for the GetDB and GetAPI functions
func (ds *AWSDatasource) Init(config backend.DataSourceInstanceSettings) {
func (ds *awsClient) Init(config backend.DataSourceInstanceSettings) {
ds.storeConfig(config)
}

// GetDB returns a *sql.DB. It will use the loader functions to initialize the required
// settings, API and driver and finally create a DB.
func (ds *AWSDatasource) GetDB(
func (ds *awsClient) GetDB(
ctx context.Context,
id int64,
options sqlds.Options,
settingsLoader models.Loader,
apiLoader api.Loader,
driverLoader driver.Loader,
) (*sql.DB, error) {
settings := settingsLoader()
settings := ds.loader.LoadSettings(ctx)
err := ds.parseSettings(id, options, settings)
if err != nil {
return nil, err
}

dsAPI, err := ds.createAPI(id, options, settings, apiLoader)
dsAPI, err := ds.createAPI(ctx, id, options, settings)
if err != nil {
return nil, err
}

dr, err := ds.createDriver(dsAPI, driverLoader)
dr, err := ds.createDriver(ctx, dsAPI)
if err != nil {
return nil, err
}
Expand All @@ -142,25 +158,23 @@ func (ds *AWSDatasource) GetDB(

// GetAsyncDB returns a sqlds.AsyncDB. It will use the loader functions to initialize the required
// settings, API and driver and finally create a DB.
func (ds *AWSDatasource) GetAsyncDB(
func (ds *awsClient) GetAsyncDB(
ctx context.Context,
id int64,
options sqlds.Options,
settingsLoader models.Loader,
apiLoader api.Loader,
driverLoader asyncDriver.Loader,
) (awsds.AsyncDB, error) {
settings := settingsLoader()
settings := ds.loader.LoadSettings(ctx)
err := ds.parseSettings(id, options, settings)
if err != nil {
return nil, err
}

dsAPI, err := ds.createAPI(id, options, settings, apiLoader)
dsAPI, err := ds.createAPI(ctx, id, options, settings)
if err != nil {
return nil, err
}

dr, err := ds.createAsyncDriver(dsAPI, driverLoader)
dr, err := ds.createAsyncDriver(ctx, dsAPI)
if err != nil {
return nil, err
}
Expand All @@ -171,22 +185,21 @@ func (ds *AWSDatasource) GetAsyncDB(
// GetAPI returns an API interface. When called multiple times with the same id and options, it
// will return a cached version of the API. The first time, it will use the loader
// functions to initialize the required settings and API.
func (ds *AWSDatasource) GetAPI(
func (ds *awsClient) GetAPI(
ctx context.Context,
id int64,
options sqlds.Options,
settingsLoader models.Loader,
apiLoader api.Loader,
) (api.AWSAPI, error) {
cachedAPI, exists := ds.loadAPI(id, options)
if exists {
return cachedAPI, nil
}

// create new api
settings := settingsLoader()
settings := ds.loader.LoadSettings(ctx)
err := ds.parseSettings(id, options, settings)
if err != nil {
return nil, err
}
return ds.createAPI(id, options, settings, apiLoader)
return ds.createAPI(ctx, id, options, settings)
}
Loading