Skip to content

Commit

Permalink
feat: add network topology and probes storage structs (#2254)
Browse files Browse the repository at this point in the history
Signed-off-by: XZ <[email protected]>
  • Loading branch information
fcgxz2003 authored Apr 20, 2023
1 parent 905203e commit cf367fe
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 50 deletions.
6 changes: 1 addition & 5 deletions scheduler/networktopology/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ type Probes interface {
}

type probes struct {
// host stores host metadata.
host *resource.Host

// limit is the length limit of probe queue.
limit int

Expand All @@ -104,10 +101,9 @@ type probes struct {
}

// NewProbes creates a new probe list instance.
func NewProbes(limit int, host *resource.Host) Probes {
func NewProbes(limit int) Probes {
return &probes{
limit: limit,
host: host,
items: list.New(),
averageRTT: atomic.NewDuration(0),
createdAt: atomic.NewTime(time.Now()),
Expand Down
67 changes: 22 additions & 45 deletions scheduler/networktopology/probes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,6 @@ var (
UpdatedAt: atomic.NewTime(time.Now()),
}

mockSeedHost = &resource.Host{
ID: idgen.HostIDV2("127.0.0.1", "hostname_seed"),
Type: types.HostTypeSuperSeed,
Hostname: "hostname_seed",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
CreatedAt: atomic.NewTime(time.Now()),
UpdatedAt: atomic.NewTime(time.Now()),
}

mockCPU = resource.CPU{
LogicalCount: 4,
PhysicalCount: 2,
Expand Down Expand Up @@ -136,7 +115,6 @@ var (
}

mockProbes = &probes{
host: mockHost,
limit: mockQueueLength,
items: list.New(),
averageRTT: atomic.NewDuration(0),
Expand Down Expand Up @@ -179,7 +157,6 @@ func Test_NewProbes(t *testing.T) {
assert := assert.New(t)
probes := p.(*probes)
assert.Equal(probes.limit, mockQueueLength)
assert.EqualValues(probes.host, mockSeedHost)
assert.Equal(probes.items.Len(), 0)
assert.Equal(probes.averageRTT.Load().Nanoseconds(), int64(0))
assert.NotEqual(probes.createdAt.Load(), 0)
Expand All @@ -189,7 +166,7 @@ func Test_NewProbes(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, NewProbes(mockQueueLength, mockSeedHost))
tc.expect(t, NewProbes(mockQueueLength))
})
}
}
Expand All @@ -203,7 +180,7 @@ func TestProbes_Peek(t *testing.T) {
}{
{
name: "queue has one probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -219,7 +196,7 @@ func TestProbes_Peek(t *testing.T) {
},
{
name: "queue has three probes",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -243,7 +220,7 @@ func TestProbes_Peek(t *testing.T) {
},
{
name: "queue has no probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {},
expect: func(t *testing.T, p Probes) {
assert := assert.New(t)
Expand All @@ -270,7 +247,7 @@ func TestProbes_Enqueue(t *testing.T) {
}{
{
name: "enqueue probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -288,7 +265,7 @@ func TestProbes_Enqueue(t *testing.T) {
},
{
name: "enqueue five probes",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -322,7 +299,7 @@ func TestProbes_Enqueue(t *testing.T) {
},
{
name: "enqueue six probes",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(NewProbe(mockHost, 3400000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -376,7 +353,7 @@ func TestProbes_Dequeue(t *testing.T) {
}{
{
name: "dequeue probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -395,7 +372,7 @@ func TestProbes_Dequeue(t *testing.T) {
},
{
name: "dequeue probe from empty queue",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {},
expect: func(t *testing.T, p Probes) {
assert := assert.New(t)
Expand All @@ -422,7 +399,7 @@ func TestProbes_Items(t *testing.T) {
}{
{
name: "queue has one probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -439,7 +416,7 @@ func TestProbes_Items(t *testing.T) {
},
{
name: "queue has three probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -468,7 +445,7 @@ func TestProbes_Items(t *testing.T) {
},
{
name: "queue is empty",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {},
expect: func(t *testing.T, probes *list.List) {
assert := assert.New(t)
Expand All @@ -494,7 +471,7 @@ func TestProbes_Length(t *testing.T) {
}{
{
name: "queue has one probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -507,7 +484,7 @@ func TestProbes_Length(t *testing.T) {
},
{
name: "queue has three probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -528,7 +505,7 @@ func TestProbes_Length(t *testing.T) {
},
{
name: "queue is empty",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {},
expect: func(t *testing.T, length int) {
assert := assert.New(t)
Expand Down Expand Up @@ -575,7 +552,7 @@ func TestProbes_UpdatedAt(t *testing.T) {
}{
{
name: "enqueue one probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -588,7 +565,7 @@ func TestProbes_UpdatedAt(t *testing.T) {
},
{
name: "enqueue three probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(NewProbe(mockHost, 1000000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
Expand All @@ -609,7 +586,7 @@ func TestProbes_UpdatedAt(t *testing.T) {
},
{
name: "queue is empty",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {},
expect: func(t *testing.T, updatedAt time.Time) {
assert := assert.New(t)
Expand All @@ -635,7 +612,7 @@ func TestProbes_AverageRTT(t *testing.T) {
}{
{
name: "queue has one probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
Expand All @@ -648,7 +625,7 @@ func TestProbes_AverageRTT(t *testing.T) {
},
{
name: "queue has five probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(NewProbe(mockHost, 3000000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -677,7 +654,7 @@ func TestProbes_AverageRTT(t *testing.T) {
},
{
name: "queue has six probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {
if err := probes.Enqueue(NewProbe(mockHost, 3400000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -709,7 +686,7 @@ func TestProbes_AverageRTT(t *testing.T) {
},
{
name: "queue is empty",
probes: NewProbes(mockQueueLength, mockSeedHost),
probes: NewProbes(mockQueueLength),
mock: func(probes Probes) {},
expect: func(t *testing.T, averageRTT time.Duration) {
assert := assert.New(t)
Expand Down
33 changes: 33 additions & 0 deletions scheduler/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,36 @@ type Download struct {
// UpdatedAt is peer update nanosecond time.
UpdatedAt int64 `csv:"updatedAt"`
}

// Probes contains content for probes.
type Probes struct {
// AverageRTT is the average round-trip time of probes.
AverageRTT int64 `csv:"averageRTT"`

// CreatedAt is probe create nanosecond time.
CreatedAt int64 `csv:"createdAt"`

// UpdatedAt is probe update nanosecond time.
UpdatedAt int64 `csv:"updatedAt"`
}

// DestHost contains content for destination host.
type DestHost struct {
// Host is probe destination host.
Host Host `csv:"host"`

// Probes is the network information probed to destination host.
Probes Probes `csv:"probes"`
}

// NetworkTopology contains content for network topology.
type NetworkTopology struct {
// ID is network topology id.
ID string `csv:"id"`

// Host is probe source host.
Host Host `csv:"host"`

// DestHosts is the destination hosts probed from source host.
DestHosts []DestHost `csv:"destHosts" csv[]:"10"`
}

0 comments on commit cf367fe

Please sign in to comment.