Skip to content

Commit

Permalink
Merge pull request #2736 from lucas-clemente/qlog-ndjson
Browse files Browse the repository at this point in the history
use the new, streaming-friendly NDJSON-based qlog encoding
  • Loading branch information
marten-seemann authored Dec 6, 2020
2 parents 5a0ce24 + dd93d96 commit a7ddb34
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 178 deletions.
25 changes: 5 additions & 20 deletions qlog/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,6 @@ import (

func milliseconds(dur time.Duration) float64 { return float64(dur.Nanoseconds()) / 1e6 }

var eventFields = [4]string{"relative_time", "category", "event", "data"}

type events []event

var _ gojay.MarshalerJSONArray = events{}

func (e events) IsNil() bool { return e == nil }

func (e events) MarshalJSONArray(enc *gojay.Encoder) {
for _, ev := range e {
enc.Array(ev)
}
}

type eventDetails interface {
Category() category
Name() string
Expand All @@ -40,14 +26,13 @@ type event struct {
eventDetails
}

var _ gojay.MarshalerJSONArray = event{}
var _ gojay.MarshalerJSONObject = event{}

func (e event) IsNil() bool { return false }
func (e event) MarshalJSONArray(enc *gojay.Encoder) {
enc.Float64(milliseconds(e.RelativeTime))
enc.String(e.Category().String())
enc.String(e.Name())
enc.Object(e.eventDetails)
func (e event) MarshalJSONObject(enc *gojay.Encoder) {
enc.Float64Key("time", milliseconds(e.RelativeTime))
enc.StringKey("name", e.Category().String()+":"+e.Name())
enc.ObjectKey("data", e.eventDetails)
}

type versions []versionNumber
Expand Down
27 changes: 9 additions & 18 deletions qlog/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,15 @@ var _ = Describe("Events", func() {
eventDetails: mevent{},
})).To(Succeed())

var decoded []interface{}
var decoded interface{}
Expect(json.Unmarshal(buf.Bytes(), &decoded)).To(Succeed())
Expect(decoded).To(HaveLen(4))

// 1st field
Expect(eventFields[0]).To(Equal("relative_time"))
Expect(decoded[0].(float64)).To(Equal(1.337))

// 2nd field
Expect(eventFields[1]).To(Equal("category"))
Expect(decoded[1].(string)).To(Equal(categoryConnectivity.String()))

// 3rd field
Expect(eventFields[2]).To(Equal("event"))
Expect(decoded[2].(string)).To(Equal("mevent"))

// 4th field
Expect(eventFields[3]).To(Equal("data"))
Expect(decoded[3].(map[string]interface{})["event"]).To(Equal("details"))
Expect(decoded).To(HaveLen(3))

Expect(decoded).To(HaveKeyWithValue("time", 1.337))
Expect(decoded).To(HaveKeyWithValue("name", "connectivity:mevent"))
Expect(decoded).To(HaveKey("data"))
data := decoded.(map[string]interface{})["data"].(map[string]interface{})
Expect(data).To(HaveLen(1))
Expect(data).To(HaveKeyWithValue("event", "details"))
})
})
35 changes: 14 additions & 21 deletions qlog/qlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type connectionTracer struct {
perspective protocol.Perspective
referenceTime time.Time

suffix []byte
events chan event
encodeErr error
runStopped chan struct{}
Expand Down Expand Up @@ -79,39 +78,36 @@ func (t *connectionTracer) run() {
buf := &bytes.Buffer{}
enc := gojay.NewEncoder(buf)
tl := &topLevel{
traces: traces{
{
VantagePoint: vantagePoint{Type: t.perspective},
CommonFields: commonFields{
ODCID: connectionID(t.odcid),
GroupID: connectionID(t.odcid),
ReferenceTime: t.referenceTime,
},
EventFields: eventFields[:],
trace: trace{
VantagePoint: vantagePoint{Type: t.perspective},
CommonFields: commonFields{
ODCID: connectionID(t.odcid),
GroupID: connectionID(t.odcid),
ReferenceTime: t.referenceTime,
},
},
}
if err := enc.Encode(tl); err != nil {
panic(fmt.Sprintf("qlog encoding into a bytes.Buffer failed: %s", err))
}
data := buf.Bytes()
t.suffix = data[buf.Len()-4:]
if _, err := t.w.Write(data[:buf.Len()-4]); err != nil {
if err := buf.WriteByte('\n'); err != nil {
panic(fmt.Sprintf("qlog encoding into a bytes.Buffer failed: %s", err))
}
if _, err := t.w.Write(buf.Bytes()); err != nil {
t.encodeErr = err
}
enc = gojay.NewEncoder(t.w)
isFirst := true
for ev := range t.events {
if t.encodeErr != nil { // if encoding failed, just continue draining the event channel
continue
}
if !isFirst {
t.w.Write([]byte(","))
}
if err := enc.Encode(ev); err != nil {
t.encodeErr = err
continue
}
if _, err := t.w.Write([]byte{'\n'}); err != nil {
t.encodeErr = err
}
isFirst = false
}
}

Expand All @@ -128,9 +124,6 @@ func (t *connectionTracer) export() error {
if t.encodeErr != nil {
return t.encodeErr
}
if _, err := t.w.Write(t.suffix); err != nil {
return err
}
return t.w.Close()
}

Expand Down
Loading

0 comments on commit a7ddb34

Please sign in to comment.