Skip to content

Commit

Permalink
some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Novgorodov Igor, PMK-TV-OP authored and Novgorodov Igor, PMK-TV-OP committed Oct 29, 2019
1 parent e7a0de6 commit 2bc2eba
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
2 changes: 2 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type eventJSON struct {
}

var (
// List of valid fields in an event
// Others will be treated as attributes
eventJSONFields = map[string]bool{
"host": true,
"service": true,
Expand Down
2 changes: 1 addition & 1 deletion helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ var (
},
}

flatbufPoolSmall = sync.Pool{
flatbufPool = sync.Pool{
New: func() interface{} {
return fb.NewBuilder(524288)
},
Expand Down
35 changes: 27 additions & 8 deletions outputTgtConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,24 @@ func (c *tConn) dispatch() {
}

case e = <-c.chanIn:
c.connMtx.Lock()
if c.httpCli == nil {
c.connMtx.Lock()
}

if err = c.push(e); err != nil {
c.Errorf("Unable to flush batch: %s", err)
// Requeue the event
c.chanIn <- e
// Try to requeue the event
select {
case c.chanIn <- e:
default:
}

time.Sleep(1 * time.Second)
}

if c.httpCli == nil {
c.connMtx.Unlock()
}
c.connMtx.Unlock()
}
}
}
Expand Down Expand Up @@ -276,9 +287,15 @@ func (c *tConn) periodicFlush() {
for {
select {
case <-tick.C:
c.connMtx.Lock()
if c.httpCli == nil {
c.connMtx.Lock()
}

c.tryFlush()
c.connMtx.Unlock()

if c.httpCli == nil {
c.connMtx.Unlock()
}

case <-c.ctx.Done():
return
Expand Down Expand Up @@ -330,6 +347,7 @@ func (c *tConn) close() {
c.wg.Wait()

if c.httpCli == nil {
c.Infof("Closing connection...")
c.disconnect()
}

Expand Down Expand Up @@ -404,6 +422,7 @@ func (c *tConn) writeBatchRiemann(batch []*rpb.Event) (err error) {

func (c *tConn) writeBatchClickhouse(batch []*rpb.Event) (err error) {
rr, wr := io.Pipe()

go func() {
defer wr.Close()
for _, e := range batch {
Expand Down Expand Up @@ -437,13 +456,13 @@ func (c *tConn) writeBatchClickhouse(batch []*rpb.Event) (err error) {
}

func (c *tConn) writeBatchFlatbuf(batch []*rpb.Event) (err error) {
fbb := flatbufPoolSmall.Get().(*fb.Builder)
fbb := flatbufPool.Get().(*fb.Builder)
req := fh.AcquireRequest()
resp := fh.AcquireResponse()

defer func() {
fbb.Reset()
flatbufPoolSmall.Put(fbb)
flatbufPool.Put(fbb)
fh.ReleaseRequest(req)
fh.ReleaseResponse(resp)
}()
Expand Down
2 changes: 1 addition & 1 deletion riemann-relay.dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ outputs = [ "fb1" ]
[output.fb1]
type = "flatbuf"
algo = "broadcast"
buffer_size = 100000
buffer_size = 1000000
batch_size = 1024
batch_timeout = "5s"

Expand Down

0 comments on commit 2bc2eba

Please sign in to comment.