Skip to content

Commit

Permalink
optimize interface for etl jobs, add test cases and examples for etl
Browse files Browse the repository at this point in the history
  • Loading branch information
licheng committed Oct 10, 2022
1 parent 48be6df commit 503d91c
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 55 deletions.
77 changes: 45 additions & 32 deletions client_etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,50 +48,45 @@ type ETLSink struct {
}

type ListETLResponse struct {
Total int `json:"total"`
Count int `json:"count"`
Total int `json:"total"`
Count int `json:"count"`
Results []*ETL `json:"results"`
}


func NewETL(endpoint, accessKeyId, accessKeySecret, logstore, name, project string) ETL {
sink := ETLSink{
AccessKeyId:accessKeyId,
AccessKeySecret:accessKeySecret,
Endpoint:endpoint,
Logstore:logstore,
Name:name,
Project:project,
Type: ETLSinksType,
}
config := ETLConfiguration {
AccessKeyId:accessKeyId,
AccessKeySecret:accessKeySecret,
FromTime: time.Now().Unix(),
Script: "e_set('new','aliyun')",
Version:ETLVersion,
Logstore:logstore,
ETLSinks:[]ETLSink{sink},
Parameters: map[string]string{},

AccessKeyId: accessKeyId,
AccessKeySecret: accessKeySecret,
Endpoint: endpoint,
Logstore: logstore,
Name: name,
Project: project,
Type: ETLSinksType,
}
config := ETLConfiguration{
AccessKeyId: accessKeyId,
AccessKeySecret: accessKeySecret,
FromTime: time.Now().Unix(),
Script: "e_set('new','aliyun')",
Version: ETLVersion,
Logstore: logstore,
ETLSinks: []ETLSink{sink},
Parameters: map[string]string{},
}
schedule := ETLSchedule{
Type:"Resident",
Type: "Resident",
}
etljob := ETL {
Configuration:config,
DisplayName:"displayname",
Description:"go sdk case",
Name:name,
Schedule:schedule,
Type:ETLType,

etljob := ETL{
Configuration: config,
DisplayName: "displayname",
Description: "go sdk case",
Name: name,
Schedule: schedule,
Type: ETLType,
}
return etljob
}



func (c *Client) CreateETL(project string, etljob ETL) error {
body, err := json.Marshal(etljob)
if err != nil {
Expand Down Expand Up @@ -212,3 +207,21 @@ func (c *Client) StopETL(project, name string) error {
r.Body.Close()
return nil
}

func (c *Client) RestartETL(project string, etljob ETL) error {
body, err := json.Marshal(etljob)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
}
uri := fmt.Sprintf("/jobs/%s?action=RESTART", etljob.Name)
r, err := c.request(project, "PUT", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}
88 changes: 65 additions & 23 deletions client_etl_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package sls

import (
"github.com/stretchr/testify/suite"
"os"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

func TestETLJobV2(t *testing.T) {
Expand All @@ -19,7 +20,6 @@ type ETLJobTestV2Suite struct {
accessKeyID string
accessKeySecret string
targetLogstoreName string
etlName string
client *Client
}

Expand All @@ -37,12 +37,12 @@ func (s *ETLJobTestV2Suite) SetupTest() {
}
}

func (s *ETLJobTestV2Suite) createETLJobV2() error {
func (s *ETLJobTestV2Suite) createETLJobV2(etlName string) error {
sink := ETLSink{
AccessKeyId: s.accessKeyID,
AccessKeySecret: s.accessKeySecret,
Endpoint: s.endpoint,
Logstore: s.logstoreName,
Logstore: s.targetLogstoreName,
Name: "aliyun-etl-test",
Project: s.projectName,
}
Expand All @@ -63,86 +63,128 @@ func (s *ETLJobTestV2Suite) createETLJobV2() error {
Configuration: config,
DisplayName: "displayName",
Description: "go sdk case",
Name: s.etlName,
Name: etlName,
Schedule: schedule,
Type: "ETL",
}
return s.client.CreateETL(s.projectName, etljob)
}

func (s *ETLJobTestV2Suite) TestClient_UpdateETLJobV2() {
err := s.createETLJobV2()
etlName := "test_update_etl"
err := s.createETLJobV2(etlName)
s.Require().Nil(err)
etljob, err := s.client.GetETL(s.projectName, s.etlName)
etljob, err := s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
etljob.DisplayName = "update"
etljob.Description = "update description"
etljob.Configuration.Script = "e_set('update','update')"
err = s.client.UpdateETL(s.projectName, *etljob)
s.Require().Nil(err)
etljob, err = s.client.GetETL(s.projectName, s.etlName)
etljob, err = s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
s.Require().Equal("update", etljob.DisplayName)
s.Require().Equal("update description", etljob.Description)
err = s.client.DeleteETL(s.projectName, s.etlName)
err = s.client.DeleteETL(s.projectName, etlName)
s.Require().Nil(err)
}

func (s *ETLJobTestV2Suite) TestClient_DeleteETLJobV2() {
err := s.createETLJobV2()
etlName := "test_delete_etl"
err := s.createETLJobV2(etlName)
s.Require().Nil(err)
_, err = s.client.GetETL(s.projectName, s.etlName)
_, err = s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
err = s.client.DeleteETL(s.projectName, s.etlName)
err = s.client.DeleteETL(s.projectName, etlName)
s.Require().Nil(err)
time.Sleep(time.Second * 100)
_, err = s.client.GetETL(s.projectName, s.etlName)
_, err = s.client.GetETL(s.projectName, etlName)
s.Require().NotNil(err)

}

func (s *ETLJobTestV2Suite) TestClient_ListETLJobV2() {
err := s.createETLJobV2()
etlName := "test_list_etl"
err := s.createETLJobV2(etlName)
s.Require().Nil(err)
etljobList, err := s.client.ListETL(s.projectName, 0, 100)
s.Require().Nil(err)
s.Require().Equal(1, etljobList.Total)
s.Require().Equal(1, etljobList.Count)
err = s.client.DeleteETL(s.projectName, s.etlName)
err = s.client.DeleteETL(s.projectName, etlName)
s.Require().Nil(err)

}

func (s *ETLJobTestV2Suite) TestClient_StartStopETLJobV2() {
err := s.createETLJobV2()
etlName := "test_start_stop_etl"
err := s.createETLJobV2(etlName)
s.Require().Nil(err)
for {
etljob, err := s.client.GetETL(s.projectName, s.etlName)
etljob, err := s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
time.Sleep(10 * time.Second)
if etljob.Status == "RUNNING" {
break
}
}

err = s.client.StopETL(s.projectName, s.etlName)
err = s.client.StopETL(s.projectName, etlName)
for {
etljob, err := s.client.GetETL(s.projectName, s.etlName)
etljob, err := s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
time.Sleep(10 * time.Second)
if etljob.Status == "STOPPED" {
break
}
}
err = s.client.StartETL(s.projectName, s.etlName)
err = s.client.StartETL(s.projectName, etlName)
for {
etljob, err := s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
time.Sleep(10 * time.Second)
if etljob.Status == "RUNNING" {
break
}
}
err = s.client.DeleteETL(s.projectName, etlName)
s.Require().Nil(err)
}

func (s *ETLJobTestV2Suite) TestClient_RestartETLJobV2() {
etlName := "test_restart_etl"
err := s.createETLJobV2(etlName)
s.Require().Nil(err)
for {
etljob, err := s.client.GetETL(s.projectName, s.etlName)
etljob, err := s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
time.Sleep(10 * time.Second)
if etljob.Status == "RUNNING" {
break
}
}

etljob, err := s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
etljob.DisplayName = "update"
etljob.Description = "update description"
etljob.Configuration.Script = "e_set('update','update')"

err = s.client.RestartETL(s.projectName, *etljob)
s.Require().Nil(err)

for {
time.Sleep(10 * time.Second)
etljob, err := s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
if etljob.Status == "RUNNING" {
break
}
}

etljob, err = s.client.GetETL(s.projectName, etlName)
s.Require().Nil(err)
s.Require().Equal("update", etljob.DisplayName)
s.Require().Equal("update description", etljob.Description)

err = s.client.DeleteETL(s.projectName, etlName)
s.Require().Nil(err)
}
9 changes: 9 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ type ClientInterface interface {
RemoveConfigFromMachineGroup(project string, confName, groupName string) (err error)

// #################### ETL Operations #####################
CreateETL(project string, etljob ETL) error
UpdateETL(project string, etljob ETL) error
GetETL(project string, etlName string) (ETLJob *ETL, err error)
ListETL(project string, offset int, size int) (*ListETLResponse, error)
DeleteETL(project string, etlName string) error
StartETL(project, name string) error
StopETL(project, name string) error
RestartETL(project string, etljob ETL) error

CreateEtlMeta(project string, etlMeta *EtlMeta) (err error)
UpdateEtlMeta(project string, etlMeta *EtlMeta) (err error)
DeleteEtlMeta(project string, etlMetaName, etlMetaKey string) (err error)
Expand Down
80 changes: 80 additions & 0 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,86 @@ func (c *TokenAutoUpdateClient) RemoveConfigFromMachineGroup(project string, con
return
}

func (c *TokenAutoUpdateClient) CreateETL(project string, etljob ETL) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.CreateETL(project, etljob)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) UpdateETL(project string, etljob ETL) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.UpdateETL(project, etljob)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) GetETL(project string, etlName string) (ETLJob *ETL, err error) {
for i := 0; i < c.maxTryTimes; i++ {
ETLJob, err = c.logClient.GetETL(project, etlName)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) ListETL(project string, offset int, size int) (ETLResponse *ListETLResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
ETLResponse, err = c.logClient.ListETL(project, offset, size)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) DeleteETL(project string, etlName string) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.DeleteETL(project, etlName)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) StartETL(project string, name string) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.StartETL(project, name)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) StopETL(project string, name string) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.StopETL(project, name)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) RestartETL(project string, etljob ETL) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.RestartETL(project, etljob)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) CreateEtlMeta(project string, etlMeta *EtlMeta) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.CreateEtlMeta(project, etlMeta)
Expand Down

0 comments on commit 503d91c

Please sign in to comment.