Skip to content

Commit

Permalink
Introduce a mechanism for the scheduler to actively trigger rescheduling
Browse files Browse the repository at this point in the history
Signed-off-by: chaosi-zju <[email protected]>
  • Loading branch information
chaosi-zju committed Apr 18, 2024
1 parent fdad87e commit 17d792e
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 189 deletions.
10 changes: 10 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -19151,6 +19151,11 @@
"$ref": "#/definitions/com.github.karmada-io.karmada.pkg.apis.work.v1alpha2.BindingSnapshot"
}
},
"rescheduleTriggeredAt": {
"description": "RescheduleTriggeredAt is a timestamp representing when the referenced resource was triggered to reschedule. Only when this timestamp is later than the timestamp in status.lastScheduledTime will the rescheduling actually execute.\n\nIt is represented in RFC3339 form (like '2006-01-02T15:04:05Z') and is in UTC. It is recommended to be populated by the REST handler of the command.karmada.io/Reschedule API.",
"default": {},
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
},
"resource": {
"description": "Resource represents the Kubernetes resource to be propagated.",
"default": {},
Expand Down Expand Up @@ -19182,6 +19187,11 @@
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Condition"
}
},
"lastScheduledTime": {
"description": "LastScheduledTime is a timestamp representing when the scheduler successfully finished a scheduling. It is represented in RFC3339 form (like '2006-01-02T15:04:05Z') and is in UTC.",
"default": {},
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
},
"schedulerObservedGeneration": {
"description": "SchedulerObservedGeneration is the generation(.metadata.generation) observed by the scheduler. If SchedulerObservedGeneration is less than the generation in metadata means the scheduler hasn't confirmed the scheduling result or hasn't done the schedule yet.",
"type": "integer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,16 @@ spec:
- name
type: object
type: array
rescheduleTriggeredAt:
description: "RescheduleTriggeredAt is a timestamp representing when
the referenced resource was triggered to reschedule. Only when this
timestamp is later than the timestamp in status.lastScheduledTime will
the rescheduling actually execute. \n It is represented in RFC3339
form (like '2006-01-02T15:04:05Z') and is in UTC. It is recommended
to be populated by the REST handler of the command.karmada.io/Reschedule
API."
format: date-time
type: string
resource:
description: Resource represents the Kubernetes resource to be propagated.
properties:
Expand Down Expand Up @@ -1279,6 +1289,12 @@ spec:
- type
type: object
type: array
lastScheduledTime:
description: LastScheduledTime is a timestamp representing when the
scheduler successfully finished a scheduling. It is represented in
RFC3339 form (like '2006-01-02T15:04:05Z') and is in UTC.
format: date-time
type: string
schedulerObservedGeneration:
description: SchedulerObservedGeneration is the generation(.metadata.generation)
observed by the scheduler. If SchedulerObservedGeneration is less
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,16 @@ spec:
- name
type: object
type: array
rescheduleTriggeredAt:
description: "RescheduleTriggeredAt is a timestamp representing when
the referenced resource was triggered to reschedule. Only when this
timestamp is later than the timestamp in status.lastScheduledTime will
the rescheduling actually execute. \n It is represented in RFC3339
form (like '2006-01-02T15:04:05Z') and is in UTC. It is recommended
to be populated by the REST handler of the command.karmada.io/Reschedule
API."
format: date-time
type: string
resource:
description: Resource represents the Kubernetes resource to be propagated.
properties:
Expand Down Expand Up @@ -1279,6 +1289,12 @@ spec:
- type
type: object
type: array
lastScheduledTime:
description: LastScheduledTime is a timestamp representing when the
scheduler successfully finished a scheduling. It is represented in
RFC3339 form (like '2006-01-02T15:04:05Z') and is in UTC.
format: date-time
type: string
schedulerObservedGeneration:
description: SchedulerObservedGeneration is the generation(.metadata.generation)
observed by the scheduler. If SchedulerObservedGeneration is less
Expand Down
13 changes: 13 additions & 0 deletions pkg/apis/work/v1alpha2/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ type ResourceBindingSpec struct {
// +kubebuilder:validation:Enum=Abort;Overwrite
// +optional
ConflictResolution policyv1alpha1.ConflictResolution `json:"conflictResolution,omitempty"`

// RescheduleTriggeredAt is a timestamp representing when the referenced resource was triggered to reschedule.
// Only when this timestamp is later than the timestamp in status.lastScheduledTime will the rescheduling actually execute.
//
// It is represented in RFC3339 form (like '2006-01-02T15:04:05Z') and is in UTC.
// It is recommended to be populated by the REST handler of the command.karmada.io/Reschedule API.
// +optional
RescheduleTriggeredAt metav1.Time `json:"rescheduleTriggeredAt,omitempty"`
}

// ObjectReference contains enough information to locate the referenced object inside current cluster.
Expand Down Expand Up @@ -297,6 +305,11 @@ type ResourceBindingStatus struct {
// +optional
SchedulerObservedAffinityName string `json:"schedulerObservingAffinityName,omitempty"`

// LastScheduledTime is a timestamp representing when the scheduler successfully finished a scheduling.
// It is represented in RFC3339 form (like '2006-01-02T15:04:05Z') and is in UTC.
// +optional
LastScheduledTime metav1.Time `json:"lastScheduledTime,omitempty"`

// Conditions contain the different condition statuses.
// +optional
Conditions []metav1.Condition `json:"conditions,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/work/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 18 additions & 5 deletions pkg/scheduler/core/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,34 @@ type assignState struct {

// targetReplicas is the replicas that we need to schedule in this round
targetReplicas int32

// rescheduleSpecified is true when spec.rescheduleTriggeredAt is later than status.lastScheduledTime in the binding,
// meaning a rescheduling was explicitly requested by the user and the scheduler should do a pure rescale.
rescheduleSpecified bool
}

func newAssignState(candidates []*clusterv1alpha1.Cluster, placement *policyv1alpha1.Placement, obj *workv1alpha2.ResourceBindingSpec) *assignState {
func newAssignState(candidates []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec,
status *workv1alpha2.ResourceBindingStatus) *assignState {
var strategyType string

switch placement.ReplicaSchedulingType() {
switch spec.Placement.ReplicaSchedulingType() {
case policyv1alpha1.ReplicaSchedulingTypeDuplicated:
strategyType = DuplicatedStrategy
case policyv1alpha1.ReplicaSchedulingTypeDivided:
switch placement.ReplicaScheduling.ReplicaDivisionPreference {
switch spec.Placement.ReplicaScheduling.ReplicaDivisionPreference {
case policyv1alpha1.ReplicaDivisionPreferenceAggregated:
strategyType = AggregatedStrategy
case policyv1alpha1.ReplicaDivisionPreferenceWeighted:
if placement.ReplicaScheduling.WeightPreference != nil && len(placement.ReplicaScheduling.WeightPreference.DynamicWeight) != 0 {
if spec.Placement.ReplicaScheduling.WeightPreference != nil && len(spec.Placement.ReplicaScheduling.WeightPreference.DynamicWeight) != 0 {
strategyType = DynamicWeightStrategy
} else {
strategyType = StaticWeightStrategy
}
}
}
rescheduleSpecified := spec.RescheduleTriggeredAt.After(status.LastScheduledTime.Time)

return &assignState{candidates: candidates, strategy: placement.ReplicaScheduling, spec: obj, strategyType: strategyType}
return &assignState{candidates: candidates, strategy: spec.Placement.ReplicaScheduling, spec: spec, strategyType: strategyType, rescheduleSpecified: rescheduleSpecified}
}

func (as *assignState) buildScheduledClusters() {
Expand Down Expand Up @@ -194,6 +200,13 @@ func assignByDynamicStrategy(state *assignState) ([]workv1alpha2.TargetCluster,
return nil, fmt.Errorf("failed to scale up: %v", err)
}
return result, nil
} else if state.rescheduleSpecified {
// when a rescheduling is explicitly requested by the user, the scheduler should do a pure rescale.
result, err := dynamicReScale(state)
if err != nil {
return nil, fmt.Errorf("failed to do rescale: %v", err)
}
return result, nil
}

return state.scheduledClusters, nil
Expand Down
59 changes: 29 additions & 30 deletions pkg/scheduler/core/assignment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ func Test_dynamicScale(t *testing.T) {
name string
candidates []*clusterv1alpha1.Cluster
object *workv1alpha2.ResourceBindingSpec
placement *policyv1alpha1.Placement
want []workv1alpha2.TargetCluster
wantErr bool
}{
Expand All @@ -326,9 +325,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember2, Replicas: 4},
{Name: ClusterMember3, Replicas: 6},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: dynamicWeightStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: dynamicWeightStrategy,
},
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember1, Replicas: 1},
Expand Down Expand Up @@ -360,9 +359,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember2, Replicas: 4},
{Name: ClusterMember3, Replicas: 6},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: dynamicWeightStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: dynamicWeightStrategy,
},
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember1, Replicas: 6},
Expand Down Expand Up @@ -394,9 +393,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember2, Replicas: 4},
{Name: ClusterMember3, Replicas: 6},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: dynamicWeightStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: dynamicWeightStrategy,
},
},
wantErr: true,
},
Expand All @@ -422,9 +421,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember1, Replicas: 4},
{Name: ClusterMember2, Replicas: 8},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
},
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember2, Replicas: 6},
Expand All @@ -450,9 +449,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember1, Replicas: 4},
{Name: ClusterMember2, Replicas: 8},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
},
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember2, Replicas: 8},
Expand Down Expand Up @@ -481,9 +480,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember1, Replicas: 4},
{Name: ClusterMember2, Replicas: 8},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
},
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember1, Replicas: 6},
Expand Down Expand Up @@ -514,9 +513,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember1, Replicas: 4},
{Name: ClusterMember2, Replicas: 8},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
},
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember1, Replicas: 10},
Expand Down Expand Up @@ -546,9 +545,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember1, Replicas: 4},
{Name: ClusterMember2, Replicas: 8},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
},
},
wantErr: true,
},
Expand All @@ -574,9 +573,9 @@ func Test_dynamicScale(t *testing.T) {
{Name: ClusterMember1, Replicas: 4},
{Name: ClusterMember2, Replicas: 8},
},
},
placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
Placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
},
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember1, Replicas: 7},
Expand All @@ -588,7 +587,7 @@ func Test_dynamicScale(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
state := newAssignState(tt.candidates, tt.placement, tt.object)
state := newAssignState(tt.candidates, tt.object, &workv1alpha2.ResourceBindingStatus{})
got, err := assignByDynamicStrategy(state)
if (err != nil) != tt.wantErr {
t.Errorf("assignByDynamicStrategy() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -819,7 +818,7 @@ func Test_dynamicScaleUp(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
state := newAssignState(tt.candidates, tt.placement, tt.object)
state := newAssignState(tt.candidates, tt.object, &workv1alpha2.ResourceBindingStatus{})
state.buildScheduledClusters()
got, err := dynamicScaleUp(state)
if (err != nil) != tt.wantErr {
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/core/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func SelectClusters(clustersScore framework.ClusterScoreList,
// AssignReplicas assigns replicas to clusters based on the placement and resource binding spec.
func AssignReplicas(
clusters []*clusterv1alpha1.Cluster,
placement *policyv1alpha1.Placement,
object *workv1alpha2.ResourceBindingSpec,
spec *workv1alpha2.ResourceBindingSpec,
status *workv1alpha2.ResourceBindingStatus,
) ([]workv1alpha2.TargetCluster, error) {
startTime := time.Now()
defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, startTime)
Expand All @@ -51,13 +51,13 @@ func AssignReplicas(
return nil, fmt.Errorf("no clusters available to schedule")
}

if object.Replicas > 0 {
state := newAssignState(clusters, placement, object)
if spec.Replicas > 0 {
state := newAssignState(clusters, spec, status)
assignFunc, ok := assignFuncMap[state.strategyType]
if !ok {
// should never happen at present
return nil, fmt.Errorf("unsupported replica scheduling strategy, replicaSchedulingType: %s, replicaDivisionPreference: %s, "+
"please try another scheduling strategy", placement.ReplicaSchedulingType(), placement.ReplicaScheduling.ReplicaDivisionPreference)
"please try another scheduling strategy", spec.Placement.ReplicaSchedulingType(), spec.Placement.ReplicaScheduling.ReplicaDivisionPreference)
}
assignResults, err := assignFunc(state)
if err != nil {
Expand Down
Loading

0 comments on commit 17d792e

Please sign in to comment.