Add multithreading to the csv, grok, json, and nginx parsers #709
7fe4d33
to
cec6521
Compare
f1ff457
to
706b030
Compare
logkit.go
Outdated
@@ -239,8 +239,9 @@ func main() {
times.AddLayout(conf.TimeLayouts)
}
if conf.MaxProcs == 0 {
conf.MaxProcs = runtime.NumCPU()
conf.MaxProcs = NumCpu
s/NumCpu/NumCPU/
parser/csv/csv.go
Outdated
@@ -39,6 +41,7 @@ type Parser struct {
allmoreStartNUmber int
allowNotMatch bool
ignoreInvalid bool
routineNumber int
routineNumber sounds like the ID of some task. I suggest renaming it to numRoutines, here and everywhere else.
parser/json/json.go
Outdated
@@ -9,6 +9,10 @@ import (

"github.com/qiniu/log"

"sync"

"sort"
Group these imports.
2f2d5a6
to
8911b8b
Compare
utils/models/models.go
Outdated
@@ -60,6 +61,11 @@ const (
InputNumber = "inputNumber"
)

var (
MaxProcs = 0
Initialize this to 1.
utils/utils.go
Outdated
@@ -10,3 +12,15 @@ func IsExist(path string) bool {
_, err := os.Stat(path)
return err == nil || os.IsExist(err)
}

func GetTestData(line string) []string {
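What this function actually does is replicate the data up to the default maxBatchSize, so it should be named for what it does. Since it is mainly used in tests, that can be stated in its doc comment.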
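A possible shape for this suggestion, as a sketch only: the name `repeatLine` and the explicit `n` parameter are hypothetical (the PR's helper takes only the line and uses the default maxBatchSize, whose value is not shown in this thread).

```go
package main

import "fmt"

// repeatLine returns a slice holding n copies of line.
// It is intended mainly for building parser test and benchmark input.
// The name and the n parameter are illustrative, not taken from the PR.
func repeatLine(line string, n int) []string {
	if n <= 0 {
		return nil
	}
	lines := make([]string, n)
	for i := range lines {
		lines[i] = line
	}
	return lines
}

func main() {
	batch := repeatLine(`{"a":1}`, 3)
	fmt.Println(len(batch), batch[0])
}
```

Naming the function after the operation keeps call sites readable, and the doc comment carries the "mainly for tests" note.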
@@ -95,6 +98,10 @@ func NewParser(c conf.MapConf) (parser.Parser, error) {
}
allmoreStartNumber, _ := c.GetIntOr(parser.KeyCSVAllowMoreStartNum, 0)
ignoreInvalid, _ := c.GetBoolOr(parser.KeyCSVIgnoreInvalidField, false)
numRoutine := MaxProcs
if numRoutine == 0 {
In theory MaxProcs can never be 0; if it is, there is a bug somewhere. In that case I suggest not letting numRoutine run wild here: just set it to 1 to stay safe.
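A minimal sketch of the defensive default described here. The helper name `numRoutinesFrom` is hypothetical, and `MaxProcs` below is a stand-in for the package-level variable in the diff, initialized to 1 as suggested earlier in the review.

```go
package main

import "fmt"

// MaxProcs stands in for the package-level setting from the diff.
// It starts at 1 so that a missed initialization stays single-threaded.
var MaxProcs = 1

// numRoutinesFrom is a hypothetical helper that never returns less than 1,
// so an unexpected 0 (or negative value) falls back to a single goroutine.
func numRoutinesFrom(maxProcs int) int {
	if maxProcs < 1 {
		return 1
	}
	return maxProcs
}

func main() {
	fmt.Println(numRoutinesFrom(0), numRoutinesFrom(MaxProcs), numRoutinesFrom(8))
}
```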
parser/csv/csv.go
Outdated
@@ -451,6 +463,15 @@ func (p *Parser) Rename(datas []Data) []Data {
return newData
}

func (p *Parser) RenameData(data Data) Data {
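The Rename above should call this RenameData method. Also, this function does not need to hang off (p *Parser); making it a plain function also makes it easier to test.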
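One way this refactor could look, as a sketch. The `Data` type is assumed to be a string-keyed map, and the renaming rule used here (replacing "-" with "_" in keys) is an assumption for illustration; the PR's actual rule is not shown in this thread.

```go
package main

import (
	"fmt"
	"strings"
)

// Data is assumed to be a string-keyed record.
type Data map[string]interface{}

// renameData returns a copy of data with "-" in each key replaced by "_".
// The replacement rule is an illustrative assumption.
// As a plain function it can be tested without constructing a Parser.
func renameData(data Data) Data {
	renamed := make(Data, len(data))
	for key, val := range data {
		renamed[strings.Replace(key, "-", "_", -1)] = val
	}
	return renamed
}

// rename applies renameData to every record, so the per-record logic
// lives in exactly one place.
func rename(datas []Data) []Data {
	newData := make([]Data, 0, len(datas))
	for _, d := range datas {
		newData = append(newData, renameData(d))
	}
	return newData
}

func main() {
	fmt.Println(rename([]Data{{"user-id": 1}, {"host": "a"}}))
}
```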
parser/json/json.go
Outdated
return dataSlice, nil
}

func (p *Parser) parseLine(sendChan chan parser.ParseInfo, resultChan chan parser.ParseResult, wg *sync.WaitGroup) {
Your sendChan should be called dataPipeline: the other end sends, but this end actually receives. The channel type can also be declared receive-only: <-chan
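To illustrate the directional channel types, here is a self-contained sketch. The `parseInfo` and `parseResult` structs are simplified stand-ins for `parser.ParseInfo` and `parser.ParseResult`, `runWorkers` is a hypothetical driver, and upper-casing the line is a placeholder for real parsing.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Simplified stand-ins for parser.ParseInfo and parser.ParseResult.
type parseInfo struct {
	Line  string
	Index int
}

type parseResult struct {
	Line  string
	Index int
	Data  string
}

// parseLine only receives from dataPipeline and only sends on resultChan.
// With directional types the compiler rejects misuse of either channel.
func parseLine(dataPipeline <-chan parseInfo, resultChan chan<- parseResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for info := range dataPipeline {
		// Placeholder "parse": upper-case the line.
		resultChan <- parseResult{Line: info.Line, Index: info.Index, Data: strings.ToUpper(info.Line)}
	}
}

// runWorkers is a hypothetical driver that fans lines out to numRoutines workers.
func runWorkers(lines []string, numRoutines int) []parseResult {
	if numRoutines < 1 {
		numRoutines = 1
	}
	dataPipeline := make(chan parseInfo)
	// Buffered to len(lines) so workers never block on sending results.
	resultChan := make(chan parseResult, len(lines))
	var wg sync.WaitGroup
	for i := 0; i < numRoutines; i++ {
		wg.Add(1)
		go parseLine(dataPipeline, resultChan, &wg)
	}
	for i, line := range lines {
		dataPipeline <- parseInfo{Line: line, Index: i}
	}
	close(dataPipeline)
	wg.Wait()
	close(resultChan)
	results := make([]parseResult, 0, len(lines))
	for r := range resultChan {
		results = append(results, r)
	}
	return results
}

func main() {
	fmt.Println(len(runWorkers([]string{"a", "b", "c"}, 2)))
}
```

A bidirectional `chan` value converts implicitly to either directional type at the call site, so callers do not need to change.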
Also, why write a separate one here? I see there is already one in the parser package.
This one differs from the other parsers: it returns a slice and the others do not, because a JSON line can yield an array.
LGTM
// Labels must not overwrite data. Other parsers do not need this check because their schema is fixed, whereas the JSON schema is not.
if _, ok := data[l.Name]; ok {
continue
func (p *Parser) parse(line string) (dataSlice []Data, err error) {
Why return a slice?
Because the JSON may be an array.
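A sketch of why a JSON line parser naturally returns a slice, using only the standard `encoding/json` package. This is not the PR's implementation; the error handling for non-object values is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Data is assumed to be a string-keyed record.
type Data map[string]interface{}

// parse decodes one line. A top-level object yields one record;
// a top-level array of objects yields one record per element.
func parse(line string) ([]Data, error) {
	var v interface{}
	if err := json.Unmarshal([]byte(line), &v); err != nil {
		return nil, err
	}
	switch val := v.(type) {
	case map[string]interface{}:
		return []Data{Data(val)}, nil
	case []interface{}:
		out := make([]Data, 0, len(val))
		for _, item := range val {
			obj, ok := item.(map[string]interface{})
			if !ok {
				return nil, fmt.Errorf("array element is not a JSON object: %v", item)
			}
			out = append(out, Data(obj))
		}
		return out, nil
	default:
		return nil, fmt.Errorf("unsupported top-level JSON value: %v", v)
	}
}

func main() {
	one, _ := parse(`{"a":1}`)
	many, _ := parse(`[{"a":1},{"b":2}]`)
	fmt.Println(len(one), len(many))
}
```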
5cde0ce
to
1f0b5d4
Compare
Ubuntu Parse Test | GROK, data: 1GB, ParseTime(ns) | CSV, data: 1GB, ParseTime(ns) | NGINX, data: 1GB, ParseTime(ns) | JSON, data: 1GB, ParseTime(ns)
93eadfd
to
8939a44
Compare
parser/csv/csv.go
Outdated
for resultInfo := range resultChan {
parseResultSlice = append(parseResultSlice, resultInfo)
}
sort.Stable(parseResultSlice)
Add a check everywhere we sort: if routine == 1, skip the sort.
8752cdb
to
5cb09b8
Compare
parser/csv/csv.go
Outdated
for resultInfo := range resultChan {
parseResultSlice = append(parseResultSlice, resultInfo)
}
if len(parseResultSlice) > 1 {
This condition is wrong.
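The check should be on the number of routines, not the slice. The slice length reflects how much data is in the batch being parsed; a single routine can produce a very long slice, but it is already in order.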
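The guard being asked for might look like the sketch below. The `parseResult` struct, the `byIndex` sort wrapper, and `orderResults` are hypothetical names; only the idea of keying the check on the routine count comes from the review.

```go
package main

import (
	"fmt"
	"sort"
)

// parseResult is a simplified stand-in for parser.ParseResult.
type parseResult struct {
	Index int
	Line  string
}

// byIndex orders results by their original line index.
type byIndex []parseResult

func (s byIndex) Len() int           { return len(s) }
func (s byIndex) Less(i, j int) bool { return s[i].Index < s[j].Index }
func (s byIndex) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// orderResults sorts in place only when more than one goroutine produced
// the results. A single goroutine emits them in input order already,
// no matter how long the batch is.
func orderResults(results []parseResult, numRoutines int) []parseResult {
	if numRoutines > 1 {
		sort.Stable(byIndex(results))
	}
	return results
}

func main() {
	results := []parseResult{{2, "c"}, {0, "a"}, {1, "b"}}
	fmt.Println(orderResults(results, 4))
}
```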
parser/grok/grok.go
Outdated
if err != nil {

if parseResult.Err != nil {
log.Debug(parseResult.Err)
Delete this.
parser/json/json.go
Outdated
se.AddSuccess()

if parseResult.Err != nil {
log.Debug(parseResult.Err)
Delete this debug log.
Line: parseInfo.Line,
Index: parseInfo.Index,
}
continue
Do empty lines still need to be passed along? Is that for the index?
Yes, DatasourceSkipIndex needs to record them.
LGTM
@wonderflow
@unknwon