Skip to content

Commit

Permalink
Add support for opendistro destinations, monitors.
Browse files Browse the repository at this point in the history
  • Loading branch information
phillbaker committed Jun 10, 2019
1 parent 4b3e6df commit b7f6d6c
Show file tree
Hide file tree
Showing 11 changed files with 867 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go:
- master
env:
- ES_VERSION=5.6.16 ES_OSS_IMAGE=elasticsearch:${ES_VERSION} ES_IMAGE=docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION} ES_COMMAND="elasticsearch -Epath.repo=/tmp"
- ES_VERSION=6.8.0 ES_OSS_IMAGE=docker.elastic.co/elasticsearch/elasticsearch-oss:${ES_VERSION} ES_IMAGE=docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION}
- ES_VERSION=6.8.0 ES_OSS_IMAGE=docker.elastic.co/elasticsearch/elasticsearch-oss:${ES_VERSION} ES_IMAGE=docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION} ES_OPENDISTRO_IMAGE=amazon/opendistro-for-elasticsearch:0.9.0
- ES_VERSION=7.0.1 ES_OSS_IMAGE=docker.elastic.co/elasticsearch/elasticsearch-oss:${ES_VERSION} ES_IMAGE=docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION}
matrix:
allow_failures:
Expand Down
30 changes: 27 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ services:
hard: -1
ports:
- 9200:9200
platinum:
xpack:
image: ${ES_IMAGE}
hostname: elasticsearch-platinum
hostname: elasticsearch-xpack
environment:
- cluster.name=platinum
- cluster.name=xpack
- bootstrap.memory_lock=true
- discovery.type=single-node
- path.repo=/tmp
Expand All @@ -49,3 +49,27 @@ services:
hard: -1
ports:
- 9210:9210
opendistro:
image: ${ES_OPENDISTRO_IMAGE:-rwgrim/docker-noop}
hostname: elasticsearch-opendistro
environment:
- cluster.name=opendistro
- bootstrap.memory_lock=true
- discovery.type=single-node
- path.repo=/tmp
- opendistro_security.disabled=true
- http.port=9220
- network.publish_host=127.0.0.1
- logger.org.elasticsearch=warn
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
- ELASTIC_PASSWORD=elastic
ulimits:
nproc: 65536
nofile:
soft: 65536
hard: 65536
memlock:
soft: -1
hard: -1
ports:
- 9220:9220
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/deoxxa/aws_signing_client v0.0.0-20161109131055-c20ee106809e
github.com/grpc-ecosystem/grpc-gateway v1.6.2 // indirect
github.com/hashicorp/terraform v0.12.0
github.com/olivere/elastic v6.2.18+incompatible // indirect
github.com/olivere/elastic v6.2.18+incompatible
github.com/olivere/elastic/v7 v7.0.1
gopkg.in/olivere/elastic.v5 v5.0.81
gopkg.in/olivere/elastic.v6 v6.2.18
Expand Down
2 changes: 2 additions & 0 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func Provider() terraform.ResourceProvider {
"elasticsearch_snapshot_repository": resourceElasticsearchSnapshotRepository(),
"elasticsearch_kibana_object": resourceElasticsearchKibanaObject(),
"elasticsearch_watch": resourceElasticsearchWatch(),
"elasticsearch_monitor": resourceElasticsearchMonitor(),
"elasticsearch_destination": resourceElasticsearchDestination(),
},

ConfigureFunc: providerConfigure,
Expand Down
18 changes: 16 additions & 2 deletions provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ var testAccProvider *schema.Provider
var testAccXPackProviders map[string]terraform.ResourceProvider
var testAccXPackProvider *schema.Provider

var testAccOpendistroProviders map[string]terraform.ResourceProvider
var testAccOpendistroProvider *schema.Provider

func init() {
testAccProvider = Provider().(*schema.Provider)
testAccProviders = map[string]terraform.ResourceProvider{
Expand All @@ -25,10 +28,21 @@ func init() {
"elasticsearch": testAccXPackProvider,
}

originalConfigureFunc := testAccXPackProvider.ConfigureFunc
xPackOriginalConfigureFunc := testAccXPackProvider.ConfigureFunc
testAccXPackProvider.ConfigureFunc = func(d *schema.ResourceData) (interface{}, error) {
d.Set("url", "http://elastic:[email protected]:9210")
return originalConfigureFunc(d)
return xPackOriginalConfigureFunc(d)
}

testAccOpendistroProvider = Provider().(*schema.Provider)
testAccOpendistroProviders = map[string]terraform.ResourceProvider{
"elasticsearch": testAccOpendistroProvider,
}

opendistroOriginalConfigureFunc := testAccOpendistroProvider.ConfigureFunc
testAccOpendistroProvider.ConfigureFunc = func(d *schema.ResourceData) (interface{}, error) {
d.Set("url", "http://elastic:[email protected]:9220")
return opendistroOriginalConfigureFunc(d)
}
}

Expand Down
236 changes: 236 additions & 0 deletions resource_elasticsearch_destination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"

"github.com/hashicorp/terraform/helper/schema"
"github.com/olivere/elastic/uritemplates"

elastic7 "github.com/olivere/elastic/v7"
elastic6 "gopkg.in/olivere/elastic.v6"
)

const DESTINATION_TYPE = "_doc"
const DESTINATION_INDEX = ".opendistro-alerting-config"

func resourceElasticsearchDestination() *schema.Resource {
return &schema.Resource{
Create: resourceElasticsearchDestinationCreate,
Read: resourceElasticsearchDestinationRead,
Update: resourceElasticsearchDestinationUpdate,
Delete: resourceElasticsearchDestinationDelete,
Schema: map[string]*schema.Schema{
"body": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
},
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
}
}

func resourceElasticsearchDestinationCreate(d *schema.ResourceData, m interface{}) error {
res, err := resourceElasticsearchPostDestination(d, m)

if err != nil {
log.Printf("[INFO] Failed to put destination: %+v", err)
return err
}

d.SetId(res.ID)
d.Set("body", res.Destination)
log.Printf("[INFO] Object ID: %s", d.Id())

return nil
}

func resourceElasticsearchDestinationRead(d *schema.ResourceData, m interface{}) error {
res, err := resourceElasticsearchGetDestination(d.Id(), m)

if elastic6.IsNotFound(err) || elastic7.IsNotFound(err) {
log.Printf("[WARN] Destination (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
}

if err != nil {
return err
}

d.Set("body", res.Destination)
d.SetId(res.ID)

return nil
}

func resourceElasticsearchDestinationUpdate(d *schema.ResourceData, m interface{}) error {
_, err := resourceElasticsearchPutDestination(d, m)

if err != nil {
return err
}

return resourceElasticsearchDestinationRead(d, m)
}

func resourceElasticsearchDestinationDelete(d *schema.ResourceData, m interface{}) error {
var err error

path, err := uritemplates.Expand("/_opendistro/_alerting/destinations/{id}", map[string]string{
"id": d.Id(),
})
if err != nil {
return fmt.Errorf("error building URL path for destination: %+v", err)
}

switch m.(type) {
case *elastic7.Client:
client := m.(*elastic7.Client)
_, err = client.PerformRequest(context.TODO(), elastic7.PerformRequestOptions{
Method: "DELETE",
Path: path,
})
case *elastic6.Client:
client := m.(*elastic6.Client)
_, err = client.PerformRequest(context.TODO(), elastic6.PerformRequestOptions{
Method: "DELETE",
Path: path,
})
default:
err = errors.New("destination resource not implemented prior to Elastic v6")
}

return err
}

func resourceElasticsearchGetDestination(destinationID string, m interface{}) (*destinationResponse, error) {
var err error
response := new(destinationResponse)

// See https://github.com/opendistro-for-elasticsearch/alerting/issues/56, no API endpoint for retrieving destination
var body *json.RawMessage
switch m.(type) {
case *elastic7.Client:
client := m.(*elastic7.Client)
body, err = elastic7GetObject(client, DESTINATION_TYPE, DESTINATION_INDEX, destinationID)
case *elastic6.Client:
client := m.(*elastic6.Client)
body, err = elastic6GetObject(client, DESTINATION_TYPE, DESTINATION_INDEX, destinationID)
default:
err = errors.New("destination resource not implemented prior to Elastic v6")
}

if err != nil {
return response, err
}

if err := json.Unmarshal(*body, response); err != nil {
return response, fmt.Errorf("error unmarshalling destination body: %+v: %+v", err, body)
}

response.ID = destinationID
return response, err
}

func resourceElasticsearchPostDestination(d *schema.ResourceData, m interface{}) (*destinationResponse, error) {
destinationJSON := d.Get("body").(string)

var err error
response := new(destinationResponse)

path := "/_opendistro/_alerting/destinations/"

var body json.RawMessage
switch m.(type) {
case *elastic7.Client:
client := m.(*elastic7.Client)
var res *elastic7.Response
res, err = client.PerformRequest(context.TODO(), elastic7.PerformRequestOptions{
Method: "POST",
Path: path,
Body: destinationJSON,
})
body = res.Body
case *elastic6.Client:
client := m.(*elastic6.Client)
var res *elastic6.Response
res, err = client.PerformRequest(context.TODO(), elastic6.PerformRequestOptions{
Method: "POST",
Path: path,
Body: destinationJSON,
})
body = res.Body
default:
err = errors.New("destination resource not implemented prior to Elastic v6")
}

if err != nil {
return response, err
}

if err := json.Unmarshal(body, response); err != nil {
return response, fmt.Errorf("error unmarshalling destination body: %+v: %+v", err, body)
}

return response, nil
}

func resourceElasticsearchPutDestination(d *schema.ResourceData, m interface{}) (*destinationResponse, error) {
destinationJSON := d.Get("body").(string)

var err error
response := new(destinationResponse)

path, err := uritemplates.Expand("/_opendistro/_alerting/destinations/{id}", map[string]string{
"id": d.Id(),
})
if err != nil {
return response, fmt.Errorf("error building URL path for destination: %+v", err)
}

var body json.RawMessage
switch m.(type) {
case *elastic7.Client:
client := m.(*elastic7.Client)
var res *elastic7.Response
res, err = client.PerformRequest(context.TODO(), elastic7.PerformRequestOptions{
Method: "PUT",
Path: path,
Body: destinationJSON,
})
body = res.Body
case *elastic6.Client:
client := m.(*elastic6.Client)
var res *elastic6.Response
res, err = client.PerformRequest(context.TODO(), elastic6.PerformRequestOptions{
Method: "PUT",
Path: path,
Body: destinationJSON,
})
body = res.Body
default:
err = errors.New("destination resource not implemented prior to Elastic v6")
}

if err != nil {
return response, err
}

if err := json.Unmarshal(body, response); err != nil {
return response, fmt.Errorf("error unmarshalling destination body: %+v: %+v", err, body)
}

return response, nil
}

type destinationResponse struct {
Version int `json:"_version"`
ID string `json:"_id"`
Destination interface{} `json:"destination"`
}
Loading

0 comments on commit b7f6d6c

Please sign in to comment.