diff --git a/go.mod b/go.mod index 1bca0d2ae7c..b15f4e4cd20 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,8 @@ require ( github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a github.com/go-playground/validator/v10 v10.23.0 github.com/go-redis/cache/v9 v9.0.0 + github.com/go-redis/redis v6.15.9+incompatible + github.com/go-redis/redismock/v9 v9.2.0 github.com/go-redsync/redsync/v4 v4.13.0 github.com/go-sql-driver/mysql v1.7.0 github.com/gofrs/flock v0.8.1 diff --git a/go.sum b/go.sum index cfaff65459d..c0f5e38c309 100644 --- a/go.sum +++ b/go.sum @@ -631,6 +631,8 @@ github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRf github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw= +github.com/go-redis/redismock/v9 v9.2.0/go.mod h1:18KHfGDK4Y6c2R0H38EUGWAdc7ZQS9gfYxc94k7rWT0= github.com/go-redsync/redsync/v4 v4.8.1/go.mod h1:LmUAsQuQxhzZAoGY7JS6+dNhNmZyonMZiiEDY9plotM= github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA= github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ= diff --git a/scheduler/resource/persistentcache/host_test.go b/scheduler/resource/persistentcache/host_test.go new file mode 100644 index 00000000000..41f1f52c98a --- /dev/null +++ b/scheduler/resource/persistentcache/host_test.go @@ -0,0 +1,273 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package persistentcache + +import ( + "testing" + "time" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/types" + "github.com/stretchr/testify/assert" +) + +var ( + mockRawHost = Host{ + ID: mockHostID, + Type: types.HostTypeNormal, + Hostname: "foo", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Log: logger.With("host", "foo"), + } + + mockRawSeedHost = Host{ + ID: mockSeedHostID, + Type: types.HostTypeSuperSeed, + Hostname: "bar", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Log: logger.With("host", "foo"), + } + + mockCPU = CPU{ + LogicalCount: 4, + PhysicalCount: 2, + Percent: 1, + ProcessPercent: 0.5, + Times: CPUTimes{ + User: 240662.2, + System: 317950.1, + Idle: 3393691.3, + Nice: 0, + Iowait: 0, + Irq: 0, + Softirq: 0, + Steal: 0, + Guest: 0, + GuestNice: 0, + }, + } + + mockMemory = Memory{ + Total: 17179869184, + Available: 5962813440, + Used: 11217055744, + UsedPercent: 65.291858, + ProcessUsedPercent: 41.525125, + Free: 2749598908, + } + + mockNetwork = Network{ + TCPConnectionCount: 10, + UploadTCPConnectionCount: 1, + Location: mockHostLocation, + IDC: mockHostIDC, + DownloadRate: 100, + DownloadRateLimit: 200, + UploadRate: 100, + UploadRateLimit: 200, + } + + mockDisk = Disk{ + Total: 499963174912, + Free: 37226479616, + Used: 423809622016, + UsedPercent: 91.92547406065952, + InodesTotal: 4882452880, + InodesUsed: 7835772, + InodesFree: 4874617108, + InodesUsedPercent: 0.1604884305611568, + } + + mockBuild = Build{ + GitVersion: "v1.0.0", + GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", + GoVersion: "1.18", + Platform: "darwin", + } + + mockAnnounceInterval = 5 * time.Minute + + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) + mockHostLocation = "baz" + mockHostIDC = "bas" +) + +func TestNewHost(t *testing.T) { + tests := []struct { + name string + id string + hostname string + ip string + os string + platform string + platformFamily string + platformVersion string + kernelVersion string + port int32 + downloadPort int32 + schedulerClusterId uint64 + disableShared bool + typ types.HostType + cpu CPU + memory Memory + network Network + disk Disk + build Build + announceInterval time.Duration + createdAt time.Time + updatedAt time.Time + log *logger.SugaredLoggerOnWith + }{ + { + name: "new host", + id: "test-id", + hostname: "test-host", + ip: "127.0.0.1", + os: "linux", + platform: "amd64", + platformFamily: "debian", + platformVersion: "11", + kernelVersion: "5.10.0", + port: 8002, + downloadPort: 8001, + schedulerClusterId: 1, + disableShared: false, + typ: types.HostTypeNormal, + cpu: CPU{ + LogicalCount: 4, + PhysicalCount: 2, + Percent: 50.0, + ProcessPercent: 25.0, + Times: CPUTimes{ + User: 100.0, + System: 50.0, + Idle: 200.0, + Nice: 10.0, + Iowait: 5.0, + Irq: 1.0, + Softirq: 2.0, + Steal: 0.0, + Guest: 0.0, + GuestNice: 0.0, + }, + }, + memory: Memory{ + Total: 16 * 1024 * 1024 * 1024, + Available: 8 * 1024 * 1024 * 1024, + Used: 8 * 1024 * 1024 * 1024, + UsedPercent: 50.0, + ProcessUsedPercent: 25.0, + Free: 4 * 1024 * 1024 * 1024, + }, + network: Network{ + TCPConnectionCount: 100, + UploadTCPConnectionCount: 50, + Location: "us-west", + IDC: "test-idc", + DownloadRate: 1024 * 1024, + DownloadRateLimit: 2 * 1024 * 1024, + UploadRate: 512 * 1024, + UploadRateLimit: 1024 * 1024, + }, + disk: Disk{ + Total: 1000 * 1024 * 1024 * 1024, + Free: 500 * 1024 * 1024 * 1024, + Used: 500 * 1024 * 1024 * 1024, + UsedPercent: 50.0, + InodesTotal: 1000000, + InodesUsed: 500000, + InodesFree: 500000, + InodesUsedPercent: 50.0, + WriteBandwidth: 100 * 1024 * 1024, + ReadBandwidth: 200 * 1024 * 1024, + }, + build: Build{ + GitVersion: "v1.0.0", + GitCommit: "abc123", + GoVersion: "go1.17", + RustVersion: "1.57", + Platform: "linux/amd64", + }, + announceInterval: 30 * time.Second, + createdAt: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + updatedAt: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := NewHost( + tc.id, tc.hostname, tc.ip, tc.os, tc.platform, tc.platformFamily, tc.platformVersion, + tc.kernelVersion, tc.port, tc.downloadPort, tc.schedulerClusterId, tc.disableShared, + tc.typ, tc.cpu, tc.memory, tc.network, tc.disk, tc.build, tc.announceInterval, + tc.createdAt, tc.updatedAt, tc.log, + ) + + assert.Equal(t, tc.id, got.ID) + assert.Equal(t, tc.hostname, got.Hostname) + assert.Equal(t, tc.ip, got.IP) + assert.Equal(t, tc.os, got.OS) + assert.Equal(t, tc.platform, got.Platform) + assert.Equal(t, tc.platformFamily, got.PlatformFamily) + assert.Equal(t, tc.platformVersion, got.PlatformVersion) + assert.Equal(t, tc.kernelVersion, got.KernelVersion) + assert.Equal(t, tc.port, got.Port) + assert.Equal(t, tc.downloadPort, got.DownloadPort) + assert.Equal(t, tc.schedulerClusterId, got.SchedulerClusterID) + assert.Equal(t, tc.disableShared, got.DisableShared) + assert.Equal(t, tc.typ, got.Type) + assert.Equal(t, tc.cpu, got.CPU) + assert.Equal(t, tc.memory, got.Memory) + assert.Equal(t, tc.network, got.Network) + assert.Equal(t, tc.disk, got.Disk) + assert.Equal(t, tc.build, got.Build) + assert.Equal(t, tc.announceInterval, got.AnnounceInterval) + assert.Equal(t, tc.createdAt, got.CreatedAt) + assert.Equal(t, tc.updatedAt, got.UpdatedAt) + }) + } +} diff --git a/scheduler/resource/persistentcache/peer_test.go b/scheduler/resource/persistentcache/peer_test.go new file mode 100644 index 00000000000..994cfc636c3 --- /dev/null +++ b/scheduler/resource/persistentcache/peer_test.go @@ -0,0 +1,115 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package persistentcache + +import ( + "testing" + "time" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "github.com/bits-and-blooms/bitset" + "github.com/stretchr/testify/assert" +) + +func TestNewPeer(t *testing.T) { + tests := []struct { + name string + id string + state string + persistent bool + finishedPieces *bitset.BitSet + blockParents []string + task *Task + host *Host + cost time.Duration + createdAt time.Time + updatedAt time.Time + log *logger.SugaredLoggerOnWith + expectedState string + }{ + { + name: "new peer with pending state", + id: "peer-1", + state: PeerStatePending, + persistent: true, + finishedPieces: bitset.New(64), + blockParents: []string{"parent-1"}, + task: &Task{ + ID: "task-1", + }, + host: &Host{ + ID: "host-1", + Hostname: "host-1", + IP: "127.0.0.1", + }, + cost: time.Second, + createdAt: time.Now(), + updatedAt: time.Now(), + expectedState: PeerStatePending, + }, + { + name: "new peer with running state", + id: "peer-2", + state: PeerStateRunning, + persistent: false, + finishedPieces: bitset.New(128), + blockParents: []string{"parent-2", "parent-3"}, + task: &Task{ + ID: "task-2", + }, + host: &Host{ + ID: "host-2", + Hostname: "host-2", + IP: "127.0.0.2", + }, + cost: 2 * time.Second, + createdAt: time.Now(), + updatedAt: time.Now(), + expectedState: PeerStateRunning, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + peer := NewPeer( + tc.id, + tc.state, + tc.persistent, + tc.finishedPieces, + tc.blockParents, + tc.task, + tc.host, + tc.cost, + tc.createdAt, + tc.updatedAt, + tc.log, + ) + + assert.Equal(t, tc.id, peer.ID) + assert.Equal(t, tc.persistent, peer.Persistent) + assert.Equal(t, tc.finishedPieces, peer.FinishedPieces) + assert.Equal(t, tc.blockParents, peer.BlockParents) + assert.Equal(t, tc.task, peer.Task) + assert.Equal(t, tc.host, peer.Host) + assert.Equal(t, tc.cost, peer.Cost) + assert.Equal(t, tc.createdAt, peer.CreatedAt) + assert.Equal(t, tc.updatedAt, peer.UpdatedAt) + assert.Equal(t, tc.expectedState, peer.FSM.Current()) + assert.NotNil(t, peer.Log) + }) + } +} diff --git a/scheduler/resource/persistentcache/task_test.go b/scheduler/resource/persistentcache/task_test.go new file mode 100644 index 00000000000..e3fd1a3f4e0 --- /dev/null +++ b/scheduler/resource/persistentcache/task_test.go @@ -0,0 +1,169 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package persistentcache + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + commonv2 "d7y.io/api/v2/pkg/apis/common/v2" + logger "d7y.io/dragonfly/v2/internal/dflog" +) + +func TestNewTask(t *testing.T) { + tests := []struct { + name string + id string + tag string + application string + state string + persistentReplicaCount uint64 + pieceLength uint64 + contentLength uint64 + totalPieceCount uint32 + ttl time.Duration + createdAt time.Time + updatedAt time.Time + log *logger.SugaredLoggerOnWith + expectedState string + }{ + { + name: "new task with pending state", + id: "task-1", + tag: "tag-1", + application: "app-1", + state: TaskStatePending, + persistentReplicaCount: 3, + pieceLength: 1024 * 1024, + contentLength: 1024 * 1024 * 10, + totalPieceCount: 10, + ttl: time.Hour, + createdAt: time.Now(), + updatedAt: time.Now(), + expectedState: TaskStatePending, + }, + { + name: "new task with uploading state", + id: "task-2", + tag: "tag-2", + application: "app-2", + state: TaskStateUploading, + persistentReplicaCount: 5, + pieceLength: 1024 * 1024, + contentLength: 1024 * 1024 * 20, + totalPieceCount: 20, + ttl: 2 * time.Hour, + createdAt: time.Now(), + updatedAt: time.Now(), + expectedState: TaskStateUploading, + }, + { + name: "new task with tiny file", + id: "task-3", + tag: "tag-3", + application: "app-3", + state: TaskStateSucceeded, + persistentReplicaCount: 2, + pieceLength: 128, + contentLength: TinyFileSize, + totalPieceCount: 1, + ttl: 30 * time.Minute, + createdAt: time.Now(), + updatedAt: time.Now(), + expectedState: TaskStateSucceeded, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task := NewTask( + tc.id, + tc.tag, + tc.application, + tc.state, + tc.persistentReplicaCount, + tc.pieceLength, + tc.contentLength, + tc.totalPieceCount, + tc.ttl, + tc.createdAt, + tc.updatedAt, + tc.log, + ) + + assert.Equal(t, tc.id, task.ID) + assert.Equal(t, tc.tag, task.Tag) + assert.Equal(t, tc.application, task.Application) + assert.Equal(t, tc.persistentReplicaCount, task.PersistentReplicaCount) + assert.Equal(t, tc.pieceLength, task.PieceLength) + assert.Equal(t, tc.contentLength, task.ContentLength) + assert.Equal(t, tc.totalPieceCount, task.TotalPieceCount) + assert.Equal(t, tc.ttl, task.TTL) + assert.Equal(t, tc.createdAt, task.CreatedAt) + assert.Equal(t, tc.updatedAt, task.UpdatedAt) + assert.Equal(t, tc.expectedState, task.FSM.Current()) + assert.NotNil(t, task.Log) + }) + } +} + +func TestTask_SizeScope(t *testing.T) { + tests := []struct { + name string + contentLength uint64 + totalPieceCount uint32 + expectedSizeScope commonv2.SizeScope + }{ + { + name: "empty file", + contentLength: EmptyFileSize, + totalPieceCount: 0, + expectedSizeScope: commonv2.SizeScope_EMPTY, + }, + { + name: "tiny file", + contentLength: TinyFileSize, + totalPieceCount: 1, + expectedSizeScope: commonv2.SizeScope_TINY, + }, + { + name: "small file", + contentLength: TinyFileSize + 1, + totalPieceCount: 1, + expectedSizeScope: commonv2.SizeScope_SMALL, + }, + { + name: "normal file", + contentLength: 1024 * 1024, + totalPieceCount: 10, + expectedSizeScope: commonv2.SizeScope_NORMAL, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task := &Task{ + ContentLength: tc.contentLength, + TotalPieceCount: tc.totalPieceCount, + } + got := task.SizeScope() + assert.Equal(t, tc.expectedSizeScope, got) + }) + } +}