Skip to content

Commit

Permalink
Merge pull request #211 from childe/json-include
Browse files Browse the repository at this point in the history
Json include
  • Loading branch information
childe authored Apr 10, 2023
2 parents 7a946f9 + 95d20c0 commit a6a1e08
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 28 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,8 @@ KV:
Json:
field: request
target: request_fields
include: ["client", "host"]
exclude: ["request", "user_agent"]
```
#### field
Expand All @@ -937,6 +939,14 @@ Json:
目标字段, 如果不设置, 则将Json Filter生成的所有字段写入到根一层.
#### include
只使用 include 中配置的字段,其他字段丢弃。优先级高于 exclude
#### exclude
exclude 中的字段不要。优先级低于 include
### LinkMetric
做简单的流式统计, 统计多个字段之间的聚合数据.
Expand Down
92 changes: 64 additions & 28 deletions filter/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ import (
"github.com/golang/glog"
)

type JsonFilter struct {
// JSONFilter will parse json string in `field` and put the result into `target` field
type JSONFilter struct {
field string
target string
overwrite bool
include []string
exclude []string
}

func init() {
Register("Json", newJsonFilter)
Register("Json", newJSONFilter)
}

func newJsonFilter(config map[interface{}]interface{}) topology.Filter {
plugin := &JsonFilter{
func newJSONFilter(config map[interface{}]interface{}) topology.Filter {
plugin := &JSONFilter{
overwrite: true,
target: "",
}
Expand All @@ -39,43 +42,76 @@ func newJsonFilter(config map[interface{}]interface{}) topology.Filter {
plugin.target = target.(string)
}

if include, ok := config["include"]; ok {
plugin.include = include.([]string)
}
if exclude, ok := config["exclude"]; ok {
plugin.exclude = exclude.([]string)
}

return plugin
}

func (plugin *JsonFilter) Filter(event map[string]interface{}) (map[string]interface{}, bool) {
if s, ok := event[plugin.field]; ok {
if reflect.TypeOf(s).Kind() != reflect.String {
// Filter will parse json string in `field` and put the result into `target` field
func (plugin *JSONFilter) Filter(event map[string]interface{}) (map[string]interface{}, bool) {
s, ok := event[plugin.field]
if !ok {
return event, false
}

ss, ok := s.(string)
if !ok {
return event, false
}

var o interface{} = nil
d := json.NewDecoder(strings.NewReader(ss))
d.UseNumber()
err := d.Decode(&o)
if err != nil || o == nil {
return event, false
}

if len(plugin.include) > 0 {
oo := map[string]interface{}{}
if o, ok := o.(map[string]interface{}); ok {
for _, k := range plugin.include {
oo[k] = o[k]
}
} else {
glog.V(5).Infof("%s field is not map type, could not get `include` fields from it", plugin.field)
return event, false
}
var o interface{} = nil
d := json.NewDecoder(strings.NewReader(s.(string)))
d.UseNumber()
err := d.Decode(&o)
if err != nil || o == nil {
o = oo
} else if len(plugin.exclude) > 0 {
if o, ok := o.(map[string]interface{}); ok {
for _, k := range plugin.exclude {
delete(o, k)
}
} else {
glog.V(5).Infof("%s field is not map type, could not get `include` fields from it", plugin.field)
return event, false
}
}

if plugin.target == "" {
if reflect.TypeOf(o).Kind() != reflect.Map {
glog.V(5).Infof("%s field is not map type, `target` must be set in config file", plugin.field)
return event, false
if plugin.target == "" {
if reflect.TypeOf(o).Kind() != reflect.Map {
glog.V(5).Infof("%s field is not map type, `target` must be set in config file", plugin.field)
return event, false
}
if plugin.overwrite {
for k, v := range o.(map[string]interface{}) {
event[k] = v
}
if plugin.overwrite {
for k, v := range o.(map[string]interface{}) {
} else {
for k, v := range o.(map[string]interface{}) {
if _, ok := event[k]; !ok {
event[k] = v
}
} else {
for k, v := range o.(map[string]interface{}) {
if _, ok := event[k]; !ok {
event[k] = v
}
}
}
} else {
event[plugin.target] = o
}
return event, true
} else {
return event, false
event[plugin.target] = o
}
return event, true
}
155 changes: 155 additions & 0 deletions filter/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package filter

import (
"encoding/json"
"reflect"
"testing"
)

func TestJson(t *testing.T) {
type testCase struct {
event map[string]interface{}
config map[interface{}]interface{}
want map[string]interface{}
success bool
}

cases := []testCase{
{
map[string]interface{}{
"message": `{"a":1,"b":2}`,
"a": 10,
},
map[interface{}]interface{}{
"field": "message",
"overwrite": true,
},
map[string]interface{}{
"message": `{"a":1,"b":2}`,
"a": json.Number("1"),
"b": json.Number("2"),
},
true,
},
{
map[string]interface{}{
"message": `{"a":1,"b":2}`,
"a": 10,
},
map[interface{}]interface{}{
"field": "message",
"overwrite": false,
},
map[string]interface{}{
"message": `{"a":1,"b":2}`,
"a": 10,
"b": json.Number("2"),
},
true,
},
{
map[string]interface{}{
"message": `{"a":1,"b":2}`,
"a": 10,
},
map[interface{}]interface{}{
"field": "message",
"overwrite": false,
"target": "c",
},
map[string]interface{}{
"message": `{"a":1,"b":2}`,
"a": 10,
"c": map[string]interface{}{
"a": json.Number("1"),
"b": json.Number("2"),
},
},
true,
},
}

for _, c := range cases {
f := newJSONFilter(c.config)
got, ok := f.Filter(c.event)
if !reflect.DeepEqual(got, c.want) {
t.Errorf("config: %#v event: %v: want %#v, got %#v", c.config, c.event, c.want, got)
}

if ok != c.success {
t.Errorf("config: %#v event: %v: want %v, got %v", c.config, c.event, c.success, ok)
}
}
}

func TestIncludeExclude(t *testing.T) {
type testCase struct {
event map[string]interface{}
config map[interface{}]interface{}
want map[string]interface{}
success bool
}

cases := []testCase{
{
map[string]interface{}{
"message": `{"a":1,"b":2, "c": 3}`,
},
map[interface{}]interface{}{
"field": "message",
"overwrite": true,
"include": []string{"a", "b"},
},
map[string]interface{}{
"message": `{"a":1,"b":2, "c": 3}`,
"a": json.Number("1"),
"b": json.Number("2"),
},
true,
},
{
map[string]interface{}{
"message": `{"a":1,"b":2, "c": 3}`,
},
map[interface{}]interface{}{
"field": "message",
"overwrite": true,
"exclude": []string{"a", "b"},
},
map[string]interface{}{
"message": `{"a":1,"b":2, "c": 3}`,
"c": json.Number("3"),
},
true,
},
{
map[string]interface{}{
"message": `{"a":1,"b":2, "c": 3}`,
},
map[interface{}]interface{}{
"field": "message",
"overwrite": true,
"include": []string{"a", "b"},
"exclude": []string{"a", "b"},
},
map[string]interface{}{
"message": `{"a":1,"b":2, "c": 3}`,
"a": json.Number("1"),
"b": json.Number("2"),
},
true,
},
}

for _, c := range cases {
f := newJSONFilter(c.config)
got, ok := f.Filter(c.event)
if !reflect.DeepEqual(got, c.want) {
t.Errorf("config: %#v event: %v: want %#v, got %#v", c.config, c.event, c.want, got)
}

if ok != c.success {
t.Errorf("config: %#v event: %v: want %v, got %v", c.config, c.event, c.success, ok)
}
}
}

0 comments on commit a6a1e08

Please sign in to comment.