Skip to content

Commit

Permalink
Move Adjacency info inside the NodeMetadata struct; move pseudo node …
Browse files Browse the repository at this point in the history
…generation to the probes.
  • Loading branch information
Tom Wilkie committed Aug 28, 2015
1 parent 9edd507 commit 443e92c
Show file tree
Hide file tree
Showing 24 changed files with 369 additions and 584 deletions.
2 changes: 0 additions & 2 deletions probe/docker/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ var (
func TestReporter(t *testing.T) {
want := report.MakeReport()
want.Container = report.Topology{
Adjacency: report.Adjacency{},
EdgeMetadatas: report.EdgeMetadatas{},
NodeMetadatas: report.NodeMetadatas{
report.MakeContainerNodeID("", "ping"): report.MakeNodeMetadataWith(map[string]string{
Expand All @@ -61,7 +60,6 @@ func TestReporter(t *testing.T) {
},
}
want.ContainerImage = report.Topology{
Adjacency: report.Adjacency{},
EdgeMetadatas: report.EdgeMetadatas{},
NodeMetadatas: report.NodeMetadatas{
report.MakeContainerNodeID("", "baz"): report.MakeNodeMetadataWith(map[string]string{
Expand Down
78 changes: 40 additions & 38 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,78 +125,80 @@ func (r *Reporter) Report() (report.Report, error) {
}

func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, proc *procspy.Proc) {
localIsClient := int(localPort) > int(remotePort)
var (
localIsClient = int(localPort) > int(remotePort)
hostNodeID = report.MakeHostNodeID(r.hostID)
addNode = func(t report.Topology, nodeID string, nmd report.NodeMetadata) {
if existing, ok := t.NodeMetadatas[nodeID]; ok {
nmd = nmd.Merge(existing)
}
t.NodeMetadatas[nodeID] = nmd
}
)

// Update address topology
{
var (
localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr)
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr)
adjacencyID = ""
edgeID = ""

localNode = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName,
Addr: localAddr,
report.HostNodeID: hostNodeID,
})
remoteNode = report.MakeNodeMetadataWith(map[string]string{
Addr: remoteAddr,
})
)

if localIsClient {
adjacencyID = report.MakeAdjacencyID(localAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID)

localNode.Adjacency = localNode.Adjacency.Add(remoteAddressNodeID)
edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID)
} else {
adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID)

remoteNode.Adjacency = localNode.Adjacency.Add(localAddressNodeID)
edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID)
}

addNode(rpt.Address, localAddressNodeID, localNode)
addNode(rpt.Address, remoteAddressNodeID, remoteNode)
countTCPConnection(rpt.Address.EdgeMetadatas, edgeID)

if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok {
rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName,
Addr: localAddr,
})
}
}

// Update endpoint topology
if r.includeProcesses {
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort)))
adjacencyID = ""
edgeID = ""

localNode = report.MakeNodeMetadataWith(map[string]string{
Addr: localAddr,
Port: strconv.Itoa(int(localPort)),
report.HostNodeID: hostNodeID,
})
remoteNode = report.MakeNodeMetadataWith(map[string]string{
Addr: remoteAddr,
Port: strconv.Itoa(int(remotePort)),
})
)

if localIsClient {
adjacencyID = report.MakeAdjacencyID(localEndpointNodeID)
rpt.Endpoint.Adjacency[adjacencyID] = rpt.Endpoint.Adjacency[adjacencyID].Add(remoteEndpointNodeID)

localNode.Adjacency = localNode.Adjacency.Add(remoteEndpointNodeID)
edgeID = report.MakeEdgeID(localEndpointNodeID, remoteEndpointNodeID)
} else {
adjacencyID = report.MakeAdjacencyID(remoteEndpointNodeID)
rpt.Endpoint.Adjacency[adjacencyID] = rpt.Endpoint.Adjacency[adjacencyID].Add(localEndpointNodeID)

remoteNode.Adjacency = remoteNode.Adjacency.Add(localEndpointNodeID)
edgeID = report.MakeEdgeID(remoteEndpointNodeID, localEndpointNodeID)
}

countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID)

md, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]
updated := !ok
if !ok {
md = report.MakeNodeMetadataWith(map[string]string{
Addr: localAddr,
Port: strconv.Itoa(int(localPort)),
})
}
if proc != nil && proc.PID > 0 {
pid := strconv.FormatUint(uint64(proc.PID), 10)
updated = updated || md.Metadata[process.PID] != pid
md.Metadata[process.PID] = pid
}
if updated {
rpt.Endpoint.NodeMetadatas[localEndpointNodeID] = md
localNode.Metadata[process.PID] = strconv.FormatUint(uint64(proc.PID), 10)
}

addNode(rpt.Endpoint, localEndpointNodeID, localNode)
addNode(rpt.Endpoint, remoteEndpointNodeID, remoteNode)
countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID)
}
}

Expand Down
18 changes: 8 additions & 10 deletions probe/endpoint/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,24 @@ func TestSpyNoProcesses(t *testing.T) {
//t.Logf("\n%s\n", buf)

// No process nodes, please
if want, have := 0, len(r.Endpoint.Adjacency); want != have {
if want, have := 0, len(r.Endpoint.NodeMetadatas); want != have {
t.Fatalf("want %d, have %d", want, have)
}

var (
scopedLocal = report.MakeAddressNodeID(nodeID, fixLocalAddress.String())
scopedRemote = report.MakeAddressNodeID(nodeID, fixRemoteAddress.String())
remoteKey = report.MakeAdjacencyID(scopedRemote)
)

if want, have := 1, len(r.Address.Adjacency[remoteKey]); want != have {
t.Fatalf("want %d, have %d", want, have)
if want, have := nodeName, r.Address.NodeMetadatas[scopedLocal].Metadata[docker.Name]; want != have {
t.Fatalf("want %q, have %q", want, have)
}

if want, have := scopedLocal, r.Address.Adjacency[remoteKey][0]; want != have {
t.Fatalf("want %q, have %q", want, have)
if want, have := 1, len(r.Address.NodeMetadatas[scopedRemote].Adjacency); want != have {
t.Fatalf("want %d, have %d", want, have)
}

if want, have := nodeName, r.Address.NodeMetadatas[scopedLocal].Metadata[docker.Name]; want != have {
if want, have := scopedLocal, r.Address.NodeMetadatas[scopedRemote].Adjacency[0]; want != have {
t.Fatalf("want %q, have %q", want, have)
}
}
Expand All @@ -115,14 +114,13 @@ func TestSpyWithProcesses(t *testing.T) {
var (
scopedLocal = report.MakeEndpointNodeID(nodeID, fixLocalAddress.String(), strconv.Itoa(int(fixLocalPort)))
scopedRemote = report.MakeEndpointNodeID(nodeID, fixRemoteAddress.String(), strconv.Itoa(int(fixRemotePort)))
remoteKey = report.MakeAdjacencyID(scopedRemote)
)

if want, have := 1, len(r.Endpoint.Adjacency[remoteKey]); want != have {
if want, have := 1, len(r.Endpoint.NodeMetadatas[scopedRemote].Adjacency); want != have {
t.Fatalf("want %d, have %d", want, have)
}

if want, have := scopedLocal, r.Endpoint.Adjacency[remoteKey][0]; want != have {
if want, have := scopedLocal, r.Endpoint.NodeMetadatas[scopedRemote].Adjacency[0]; want != have {
t.Fatalf("want %q, have %q", want, have)
}

Expand Down
5 changes: 4 additions & 1 deletion probe/host/tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ func NewTagger(hostID string) Tagger {
// Tag implements Tagger.
func (t Tagger) Tag(r report.Report) (report.Report, error) {
other := report.MakeNodeMetadataWith(map[string]string{report.HostNodeID: t.hostNodeID})
for _, topology := range r.Topologies() {

// Explicity don't tag Endpoints and Addresses - These topologies include pseudo nodes,
// and as such do their own host tagging
for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host, r.Overlay} {
for id, md := range topology.NodeMetadatas {
topology.NodeMetadatas[id] = md.Merge(other)
}
Expand Down
4 changes: 2 additions & 2 deletions probe/host/tagger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func TestTagger(t *testing.T) {
)

r := report.MakeReport()
r.Endpoint.NodeMetadatas[endpointNodeID] = nodeMetadata
r.Process.NodeMetadatas[endpointNodeID] = nodeMetadata
want := nodeMetadata.Merge(report.MakeNodeMetadataWith(map[string]string{
report.HostNodeID: report.MakeHostNodeID(hostID),
}))
rpt, _ := host.NewTagger(hostID).Tag(r)
have := rpt.Endpoint.NodeMetadatas[endpointNodeID].Copy()
have := rpt.Process.NodeMetadatas[endpointNodeID].Copy()
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
Expand Down
1 change: 0 additions & 1 deletion probe/overlay/weave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
t.Fatal(err)
}
if want, have := (report.Topology{
Adjacency: report.Adjacency{},
EdgeMetadatas: report.EdgeMetadatas{},
NodeMetadatas: report.NodeMetadatas{
report.MakeOverlayNodeID(mockWeavePeerName): report.MakeNodeMetadataWith(map[string]string{
Expand Down
1 change: 0 additions & 1 deletion probe/process/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func TestReporter(t *testing.T) {
reporter := process.NewReporter(walker, "")
want := report.MakeReport()
want.Process = report.Topology{
Adjacency: report.Adjacency{},
EdgeMetadatas: report.EdgeMetadatas{},
NodeMetadatas: report.NodeMetadatas{
report.MakeProcessNodeID("", "1"): report.MakeNodeMetadataWith(map[string]string{
Expand Down
41 changes: 21 additions & 20 deletions probe/sniff/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,21 +250,30 @@ func (s *Sniffer) Merge(p Packet, rpt report.Report) {
return
}

addAdjacency := func(t report.Topology, srcNodeID, dstNodeID string) {
srcNode, ok := t.NodeMetadatas[srcNodeID]
if !ok {
srcNode = report.MakeNodeMetadata()
}
srcNode.Adjacency = srcNode.Adjacency.Add(dstNodeID)
t.NodeMetadatas[srcNodeID] = srcNode

if _, ok := t.NodeMetadatas[dstNodeID]; !ok {
t.NodeMetadatas[dstNodeID] = report.MakeNodeMetadata()
}
}

// For sure, we can add to the address topology.
{
var (
srcNodeID = report.MakeAddressNodeID(s.hostID, localIP)
dstNodeID = report.MakeAddressNodeID(s.hostID, remoteIP)
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
srcAdjacencyID = report.MakeAdjacencyID(srcNodeID)
srcNodeID = report.MakeAddressNodeID(s.hostID, localIP)
dstNodeID = report.MakeAddressNodeID(s.hostID, remoteIP)
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
)

if _, ok := rpt.Address.NodeMetadatas[srcNodeID]; !ok {
rpt.Address.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata()
}
addAdjacency(rpt.Address, srcNodeID, dstNodeID)

emd := rpt.Address.EdgeMetadatas[edgeID]

if egress {
if emd.EgressPacketCount == nil {
emd.EgressPacketCount = new(uint64)
Expand All @@ -284,26 +293,20 @@ func (s *Sniffer) Merge(p Packet, rpt report.Report) {
}
*emd.IngressByteCount += uint64(p.Network)
}

rpt.Address.EdgeMetadatas[edgeID] = emd
rpt.Address.Adjacency[srcAdjacencyID] = rpt.Address.Adjacency[srcAdjacencyID].Add(dstNodeID)
}

// If we have ports, we can add to the endpoint topology, too.
if p.SrcPort != "" && p.DstPort != "" {
var (
srcNodeID = report.MakeEndpointNodeID(s.hostID, localIP, localPort)
dstNodeID = report.MakeEndpointNodeID(s.hostID, remoteIP, remotePort)
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
srcAdjacencyID = report.MakeAdjacencyID(srcNodeID)
srcNodeID = report.MakeEndpointNodeID(s.hostID, localIP, localPort)
dstNodeID = report.MakeEndpointNodeID(s.hostID, remoteIP, remotePort)
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
)

if _, ok := rpt.Endpoint.NodeMetadatas[srcNodeID]; !ok {
rpt.Endpoint.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata()
}
addAdjacency(rpt.Endpoint, srcNodeID, dstNodeID)

emd := rpt.Endpoint.EdgeMetadatas[edgeID]

if egress {
if emd.EgressPacketCount == nil {
emd.EgressPacketCount = new(uint64)
Expand All @@ -323,8 +326,6 @@ func (s *Sniffer) Merge(p Packet, rpt report.Report) {
}
*emd.IngressByteCount += uint64(p.Transport)
}

rpt.Endpoint.EdgeMetadatas[edgeID] = emd
rpt.Endpoint.Adjacency[srcAdjacencyID] = rpt.Endpoint.Adjacency[srcAdjacencyID].Add(dstNodeID)
}
}
32 changes: 20 additions & 12 deletions probe/sniff/sniffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,23 @@ func TestMerge(t *testing.T) {
dstEndpointNodeID = report.MakeEndpointNodeID(hostID, p.DstIP, p.DstPort)
)
if want, have := (report.Topology{
Adjacency: report.Adjacency{
report.MakeAdjacencyID(srcEndpointNodeID): report.MakeIDList(
dstEndpointNodeID,
),
},
EdgeMetadatas: report.EdgeMetadatas{
report.MakeEdgeID(srcEndpointNodeID, dstEndpointNodeID): report.EdgeMetadata{
EgressPacketCount: newu64(1),
EgressByteCount: newu64(256),
},
},
NodeMetadatas: report.NodeMetadatas{
srcEndpointNodeID: report.MakeNodeMetadata(),
srcEndpointNodeID: {
Metadata: map[string]string{},
Counters: map[string]int{},
Adjacency: report.MakeIDList(dstEndpointNodeID),
},
dstEndpointNodeID: report.NodeMetadata{
Metadata: map[string]string{},
Counters: map[string]int{},
Adjacency: report.MakeIDList(),
},
},
}), rpt.Endpoint; !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
Expand All @@ -88,19 +92,23 @@ func TestMerge(t *testing.T) {
dstAddressNodeID = report.MakeAddressNodeID(hostID, p.DstIP)
)
if want, have := (report.Topology{
Adjacency: report.Adjacency{
report.MakeAdjacencyID(srcAddressNodeID): report.MakeIDList(
dstAddressNodeID,
),
},
EdgeMetadatas: report.EdgeMetadatas{
report.MakeEdgeID(srcAddressNodeID, dstAddressNodeID): report.EdgeMetadata{
EgressPacketCount: newu64(1),
EgressByteCount: newu64(512),
},
},
NodeMetadatas: report.NodeMetadatas{
srcAddressNodeID: report.MakeNodeMetadata(),
srcAddressNodeID: report.NodeMetadata{
Metadata: map[string]string{},
Counters: map[string]int{},
Adjacency: report.MakeIDList(dstAddressNodeID),
},
dstAddressNodeID: report.NodeMetadata{
Metadata: map[string]string{},
Counters: map[string]int{},
Adjacency: report.MakeIDList(),
},
},
}), rpt.Address; !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
Expand Down
12 changes: 4 additions & 8 deletions render/detailed_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ func connectionDetailsRows(topology report.Topology, originID string) []Row {
return rows
}
// Firstly, collection outgoing connections from this node.
originAdjID := report.MakeAdjacencyID(originID)
for _, serverNodeID := range topology.Adjacency[originAdjID] {
for _, serverNodeID := range topology.NodeMetadatas[originID].Adjacency {
remote, ok := labeler(serverNodeID)
if !ok {
continue
Expand All @@ -232,17 +231,14 @@ func connectionDetailsRows(topology report.Topology, originID string) []Row {
})
}
// Next, scan the topology for incoming connections to this node.
for clientAdjID, serverNodeIDs := range topology.Adjacency {
if clientAdjID == originAdjID {
for clientNodeID, clientNodeMetadata := range topology.NodeMetadatas {
if clientNodeID == originID {
continue
}
serverNodeIDs := clientNodeMetadata.Adjacency
if !serverNodeIDs.Contains(originID) {
continue
}
clientNodeID, ok := report.ParseAdjacencyID(clientAdjID)
if !ok {
continue
}
remote, ok := labeler(clientNodeID)
if !ok {
continue
Expand Down
Loading

0 comments on commit 443e92c

Please sign in to comment.