Skip to content

Commit

Permalink
Merge pull request #98 from aliyun/update_consumer
Browse files Browse the repository at this point in the history
1. fix consumer readonly shard checkpoint
2. add list saved search and dashboard display name
  • Loading branch information
shabicheng authored Oct 15, 2020
2 parents 821cae7 + c4820ae commit 736a691
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 1 deletion.
37 changes: 37 additions & 0 deletions client_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type SavedSearch struct {
DisplayName string `json:"displayName"`
}

type ResponseSavedSearchItem struct {
SavedSearchName string `json:"savedsearchName"`
DisplayName string `json:"displayName"`
}

const (
NotificationTypeSMS = "SMS"
NotificationTypeWebhook = "Webhook"
Expand Down Expand Up @@ -203,6 +208,38 @@ func (c *Client) ListSavedSearch(project string, savedSearchName string, offset,
return listSavedSearch.Savedsearches, listSavedSearch.Total, listSavedSearch.Count, err
}


func (c *Client) ListSavedSearchV2(project string, savedSearchName string, offset, size int) (savedSearches []string, savedsearchItems []ResponseSavedSearchItem, total int, count int, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
"savedsearchName": savedSearchName,
"offset": strconv.Itoa(offset),
"size": strconv.Itoa(size),
}

uri := "/savedsearches"
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return nil, nil,0, 0, err
}
defer r.Body.Close()

type ListSavedSearch struct {
Total int `json:"total"`
Count int `json:"count"`
Savedsearches []string `json:"savedsearches"`
SavedsearchItems []ResponseSavedSearchItem `json:"savedsearchItems"`
}

buf, _ := ioutil.ReadAll(r.Body)
listSavedSearch := &ListSavedSearch{}
if err = json.Unmarshal(buf, listSavedSearch); err != nil {
err = NewClientError(err)
}
return listSavedSearch.Savedsearches, listSavedSearch.SavedsearchItems, listSavedSearch.Total, listSavedSearch.Count, err
}

func (c *Client) CreateAlert(project string, alert *Alert) error {
body, err := json.Marshal(alert)
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions client_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type Dashboard struct {
DisplayName string `json:"displayName"`
}

type ResponseDashboardItem struct {
DashboardName string `json:"dashboardName"`
DisplayName string `json:"displayName"`
}


func (c *Client) CreateChart(project, dashboardName string, chart Chart) error {
body, err := json.Marshal(chart)
if err != nil {
Expand Down Expand Up @@ -265,3 +271,34 @@ func (c *Client) ListDashboard(project string, dashboardName string, offset, siz
}
return dashboards.DashboardList, dashboards.Count, dashboards.Total, err
}

func (c *Client) ListDashboardV2(project string, dashboardName string, offset, size int) (dashboardList []string, dashboardItems []ResponseDashboardItem, count, total int, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
"dashboardName": dashboardName,
"offset": strconv.Itoa(offset),
"size": strconv.Itoa(size),
}
uri := "/dashboards"
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return nil, nil,0, 0, err
}
defer r.Body.Close()

type ListDashboardResponse struct {
DashboardList []string `json:"dashboards"`
Total int `json:"total"`
Count int `json:"count"`
DashboardItems []ResponseDashboardItem `json:"dashboardItems"`
}

buf, _ := ioutil.ReadAll(r.Body)
fmt.Println(string(buf))
dashboards := &ListDashboardResponse{}
if err = json.Unmarshal(buf, dashboards); err != nil {
err = NewClientError(err)
}
return dashboards.DashboardList, dashboards.DashboardItems, dashboards.Count, dashboards.Total, err
}
3 changes: 3 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func CreateTokenAutoUpdateClient(endpoint string, tokenUpdateFunc UpdateTokenFun
return tauc, nil
}


// ClientInterface for all log's open api
type ClientInterface interface {
// #################### Client Operations #####################
Expand Down Expand Up @@ -202,6 +203,7 @@ type ClientInterface interface {

// #################### Chart&Dashboard Operations #####################
ListDashboard(project string, dashboardName string, offset, size int) (dashboardList []string, count, total int, err error)
ListDashboardV2(project string, dashboardName string, offset, size int) (dashboardList []string, dashboardItems []ResponseDashboardItem, count, total int, err error)
GetDashboard(project, name string) (dashboard *Dashboard, err error)
GetDashboardString(project, name string) (dashboard string, err error)
DeleteDashboard(project, name string) error
Expand All @@ -220,6 +222,7 @@ type ClientInterface interface {
DeleteSavedSearch(project string, savedSearchName string) error
GetSavedSearch(project string, savedSearchName string) (*SavedSearch, error)
ListSavedSearch(project string, savedSearchName string, offset, size int) (savedSearches []string, total int, count int, err error)
ListSavedSearchV2(project string, savedSearchName string, offset, size int) (savedSearches []string, savedsearchItems []ResponseSavedSearchItem, total int, count int, err error)
CreateAlert(project string, alert *Alert) error
UpdateAlert(project string, alert *Alert) error
DeleteAlert(project string, alertName string) error
Expand Down
2 changes: 1 addition & 1 deletion consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (consumer *ShardConsumerWorker) consume() {
} else {
consumer.lastFetchTimeForForceFlushCpt = time.Now().Unix()
}
if consumer.lastFetchTimeForForceFlushCpt != 0 && time.Now().Unix()-consumer.lastFetchTimeForForceFlushCpt > 30 {
if time.Now().Unix()-consumer.lastFetchTimeForForceFlushCpt > 30 {
err := consumer.consumerCheckPointTracker.flushCheckPoint()
if err != nil {
level.Warn(consumer.logger).Log("msg", "Failed to save the final checkpoint", "error:", err)
Expand Down
21 changes: 21 additions & 0 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,17 @@ func (c *TokenAutoUpdateClient) ListDashboard(project string, dashboardName stri
}
return
}

func (c *TokenAutoUpdateClient) ListDashboardV2(project string, dashboardName string, offset, size int) (dashboardList []string, dashboardItems []ResponseDashboardItem, count, total int, err error) {
for i := 0; i < c.maxTryTimes; i++ {
dashboardList, dashboardItems, count, total, err = c.logClient.ListDashboardV2(project, dashboardName, offset, size)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) GetDashboard(project, name string) (dashboard *Dashboard, err error) {
for i := 0; i < c.maxTryTimes; i++ {
dashboard, err = c.logClient.GetDashboard(project, name)
Expand Down Expand Up @@ -842,6 +853,16 @@ func (c *TokenAutoUpdateClient) ListSavedSearch(project string, savedSearchName
return
}

func (c *TokenAutoUpdateClient) ListSavedSearchV2(project string, savedSearchName string, offset, size int) (savedSearches []string, savedsearchItems []ResponseSavedSearchItem, total int, count int, err error) {
for i := 0; i < c.maxTryTimes; i++ {
savedSearches, savedsearchItems, total, count, err = c.logClient.ListSavedSearchV2(project, savedSearchName, offset, size)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) CreateAlert(project string, alert *Alert) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.CreateAlert(project, alert)
Expand Down

0 comments on commit 736a691

Please sign in to comment.