diff --git a/.changelog/13407.txt b/.changelog/13407.txt
new file mode 100644
index 00000000000..7a2fb340bac
--- /dev/null
+++ b/.changelog/13407.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+core: Fixed a bug where the plan applier could deadlock if the leader's state lagged behind a plan's creation index for more than 5 seconds.
+```
diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 7f1fffc54f8..58002b2258a 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -154,6 +154,7 @@ func (p *planner) planApply() {
 		// This also limits how out of date our snapshot can be.
 		if planIndexCh != nil {
 			idx := <-planIndexCh
+			planIndexCh = nil
 			prevPlanResultIndex = max(prevPlanResultIndex, idx)
 			snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
 			if err != nil {
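Nil-ing out `planIndexCh` immediately after draining it is the heart of the fix: the channel delivers at most one value per async apply, and if a later error sends the loop back around, the `planIndexCh != nil` guard must now skip the receive instead of blocking on a channel that will never produce another value. A minimal, self-contained sketch of the pattern (illustrative names, not Nomad code):

```go
package main

import "fmt"

// One-shot result channel pattern: the consumer nils the channel variable
// right after the first receive so a later loop iteration cannot block on
// an already-drained channel, and the producer always closes the channel
// so a pending receive can never hang.
func main() {
	var resultCh chan uint64

	for i := 0; i < 3; i++ {
		if resultCh != nil {
			idx := <-resultCh // blocks only until the producer sends or closes
			resultCh = nil    // one-shot: never receive from this channel again
			fmt.Println("previous apply reached index", idx)
		}

		// Kick off the next async apply; buffer of 1 so the send never blocks.
		resultCh = make(chan uint64, 1)
		go func(ch chan<- uint64, idx uint64) {
			defer close(ch) // every return path unblocks the receiver
			ch <- idx
		}(resultCh, uint64(100+i))
	}
}
```

The buffer of one lets the producer in the sketch send its result and exit even if the consumer has not reached its receive yet.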
@@ -177,7 +178,7 @@ func (p *planner) planApply() {
 	}
 }
 
-// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout
+// snapshotMinIndex wraps SnapshotAfter with a 10s timeout and converts timeout
 // errors to a more descriptive error message. The snapshot is guaranteed to
 // include both the previous plan and all objects referenced by the plan or
 // return an error.
@@ -188,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
 	// plan result's and current plan's snapshot index.
 	minIndex := max(prevPlanResultIndex, planSnapshotIndex)
 
-	const timeout = 5 * time.Second
+	// This timeout creates backpressure: any concurrent Plan.Submit
+	// RPCs will block waiting for results, which sheds load across
+	// all servers and gives raft some CPU to catch up, because
+	// schedulers won't dequeue more work while they wait.
+	const timeout = 10 * time.Second
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
 	cancel()
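The comment explains why bounding the wait is the right call; the shape of the call itself, a blocking min-index snapshot wrapped in `context.WithTimeout` with the deadline error translated into something actionable, is worth a standalone look. A hedged sketch under assumed names (`indexWaiter` and its simplified `SnapshotMinIndex` signature are stand-ins, not the real state store API):

```go
// Package planexample sketches the shape of snapshotMinIndex: wait for the
// FSM to reach the larger of two indexes, but never for longer than the
// timeout, and convert a context deadline into a descriptive error.
package planexample

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// indexWaiter is a stand-in for the state store; the real method also
// returns a snapshot, which is omitted here to keep the sketch small.
type indexWaiter interface {
	SnapshotMinIndex(ctx context.Context, index uint64) error
}

func waitAtLeast(w indexWaiter, prevResultIndex, planSnapshotIndex uint64, timeout time.Duration) error {
	// The snapshot must cover both the previous plan result and the plan's
	// own snapshot, so wait on whichever index is larger.
	minIndex := prevResultIndex
	if planSnapshotIndex > minIndex {
		minIndex = planSnapshotIndex
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	err := w.SnapshotMinIndex(ctx, minIndex)
	if errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("timed out after %s waiting for index >= %d", timeout, minIndex)
	}
	return err
}
```

The backpressure described in the comment falls out of this shape on its own: while the applier is stuck inside this call, nothing is dequeued from the plan queue, so concurrent Plan.Submit callers simply queue up behind it.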
@@ -368,14 +373,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
 func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
 	result *structs.PlanResult, pending *pendingPlan) {
 	defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
+	defer close(indexCh)
 
 	// Wait for the plan to apply
 	if err := future.Error(); err != nil {
 		p.logger.Error("failed to apply plan", "error", err)
 		pending.respond(nil, err)
-
-		// Close indexCh on error
-		close(indexCh)
 		return
 	}
 
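Deferring the close means every return path in `asyncPlanWait`, including the early error return above, closes `indexCh`, so the receive in `planApply` can never hang; this is the same producer-side contract shown in the sketch after the first hunk. On the error path nothing is sent, the receiver sees the channel's zero value, and `max(prevPlanResultIndex, idx)` leaves the previous result index untouched.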
diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go
index 8c02c2ba9a9..c36cb4f1989 100644
--- a/nomad/plan_endpoint_test.go
+++ b/nomad/plan_endpoint_test.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"sync"
 	"testing"
 	"time"
 
@@ -9,6 +10,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) {
 	// Ensure no plans were enqueued
 	require.Zero(t, s1.planner.planQueue.Stats().Depth)
 }
+
+func TestPlanEndpoint_ApplyConcurrent(t *testing.T) {
+	t.Parallel()
+
+	s1, cleanupS1 := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+	defer cleanupS1()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	plans := []*structs.Plan{}
+
+	for i := 0; i < 5; i++ {
+
+		// Create a node to place on
+		node := mock.Node()
+		store := s1.fsm.State()
+		require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))
+
+		// Create the eval
+		eval1 := mock.Eval()
+		s1.evalBroker.Enqueue(eval1)
+		require.NoError(t, store.UpsertEvals(
+			structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1}))
+
+		evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
+		require.NoError(t, err)
+		require.Equal(t, eval1, evalOut)
+
+		// Submit a plan
+		plan := mock.Plan()
+		plan.EvalID = eval1.ID
+		plan.EvalToken = token
+		plan.Job = mock.Job()
+
+		alloc := mock.Alloc()
+		alloc.JobID = plan.Job.ID
+		alloc.Job = plan.Job
+
+		plan.NodeAllocation = map[string][]*structs.Allocation{
+			node.ID: []*structs.Allocation{alloc}}
+
+		plans = append(plans, plan)
+	}
+
+	var wg sync.WaitGroup
+
+	for _, plan := range plans {
+		plan := plan
+		wg.Add(1)
+		go func() {
+
+			req := &structs.PlanRequest{
+				Plan:         plan,
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+			var resp structs.PlanResponse
+			err := s1.RPC("Plan.Submit", req, &resp)
+			assert.NoError(t, err)
+			assert.NotNil(t, resp.Result, "missing result")
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+}
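One note on the testify split in the new test: `require` stops a test via `t.FailNow`, which must run on the goroutine executing the test function, so the spawned workers use `assert` and rely on `wg.Wait()` to hand control back before the test finishes. Running the test under the race detector (`go test -race -run TestPlanEndpoint_ApplyConcurrent`) is a natural way to exercise the concurrent submit path.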