Rescheduler#
Introduction#
Llumnix’s rescheduler is a request rescheduling component that continuously rebalances load, maximizes SLO attainment and resource efficiency under fluctuating workloads, and proactively migrates requests from unhealthy instances. Operating in a scheduler + rescheduler architecture, the rescheduler complements the initial dispatch by performing ongoing rescheduling of running/waiting requests through two integrated components:
Rescheduling Planner: Evaluates instance load using configurable policies (metrics → filters → selectors) to generate migration decisions.
Migration Dispatcher: Initiates gRPC commands to llumlet processes to trigger request transfer with KV cache state.
The rescheduler serves three primary functions:
Load Balancing: Continuously monitors instance load and migrates requests from overloaded instances to underutilized ones, mitigating fragmentation and eliminating hotspots for improved latency.
Adaptive PD Rescheduling: Enhances adaptive prefill-decode disaggregation by migrating decode requests based on predicted TPOT, consolidating underutilized instances and mitigating overloaded ones to further improve SLO attainment and resource efficiency. See Adaptive PD Scheduling for details.
Failover: Detects unhealthy or unschedulable instances and proactively migrates their requests to healthy instances within configurable failure domains (instance/node/unit domain).
Architecture#
Architectural Overview#
The rescheduler operates within Llumnix’s scheduling layer:
Scheduler: Makes one-time routing decisions for incoming requests.
Rescheduler: Performs continuous migration decisions for running/waiting requests.
┌────────────────────────────────────────────────────────┐
│                   ReschedulerService                   │
│  ┌──────────────────────────────────────────────────┐  │
│  │                ReschedulingPolicy                │  │
│  │  ┌────────────────────────────────────────────┐  │  │
│  │  │ policies[] (multiple policy instances)     │  │  │
│  │  │  - load balance                            │  │  │
│  │  │  - failover                                │  │  │
│  │  │  - adaptive PD                             │  │  │
│  │  └────────────────────────────────────────────┘  │  │
│  │ - cmsClient (cluster view)                       │  │
│  │ - llumletClientManager (gRPC to engines)         │  │
│  └──────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────┘
                            │
                            │ periodic loop
                            ↓
       ┌───────────────────────────────────────────┐
       │             ReschedulingLoop              │
       │ For each policy in policies[]:            │
       │   1. Calculate metrics                    │
       │   2. Filter src/dst candidates            │
       │   3. Select migration pairs               │
       │   4. Validate & append to global list     │
       └───────────────────────────────────────────┘
                            │
                            │ aggregated pairs
                            ↓
       ┌───────────────────────────────────────────┐
       │            Execute Migrations             │
       │      Parallel gRPC calls to llumlets      │
       └───────────────────────────────────────────┘
                            │
                            │ gRPC Migrate()
                            ↓
       ┌───────────────────────────────────────────┐
       │           Llumlet (engine side)           │
       │ - Receives migration command              │
       │ - Selects migration requests              │
       │ - Coordinates KV cache transfer           │
       └───────────────────────────────────────────┘
Execution Flow#
The rescheduler executes in a continuous loop with the following steps:
1. Periodic Trigger: ReschedulingLoop() runs at intervals configured by reschedulingIntervalMs.
2. Cluster View Refresh: Pulls the latest instance status from the Cluster Metadata Store.
3. Multi-Policy Sequential Execution: Iterates through all policies in the policies[] list. Each policy executes independently:
   - Metric Calculation: Policy-specific metrics are computed for all instances (e.g., kv_cache_usage_ratio_projected).
   - Candidate Filtering: Applies single-instance filters (infer type, schedulability, staleness) and global filters (failover domain) to identify valid source and destination instances.
   - Pair Selection: Uses a policy-specific selector (e.g., metricBalanceSelector, roundRobinSelector) to generate migration pairs from high-load sources to low-load destinations.
   - Validation & Aggregation: Each selected pair is validated against existing pairs to avoid conflicts (no A→B together with B→A). Valid pairs are appended to the global reschedulingPairs list.
4. Migration Execution: executeMigrations() dispatches all aggregated pairs concurrently via gRPC and blocks until all results are collected.
5. Cycle Completion: Returns to sleep until the next interval, then repeats from step 2.
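The "Validation & Aggregation" step above can be sketched as follows. This is a minimal illustration, not Llumnix's actual code; MigrationPair and mergePairs are assumed names.

```go
package main

import "fmt"

// MigrationPair is a hypothetical stand-in for a source→destination
// migration decision produced by a policy.
type MigrationPair struct{ Src, Dst string }

// mergePairs aggregates pairs produced by each policy in turn, dropping any
// pair that would reverse an already accepted one (A→B vs. B→A), as in the
// "Validation & Aggregation" step above.
func mergePairs(perPolicy [][]MigrationPair) []MigrationPair {
	var accepted []MigrationPair
	for _, pairs := range perPolicy {
		for _, p := range pairs {
			conflict := false
			for _, a := range accepted {
				if a.Src == p.Dst && a.Dst == p.Src {
					conflict = true
					break
				}
			}
			if !conflict {
				accepted = append(accepted, p)
			}
		}
	}
	return accepted
}

func main() {
	perPolicy := [][]MigrationPair{
		{{"i1", "i2"}},               // e.g. a load balance policy
		{{"i2", "i1"}, {"i3", "i4"}}, // i2→i1 reverses i1→i2 and is dropped
	}
	fmt.Println(mergePairs(perPolicy)) // [{i1 i2} {i3 i4}]
}
```

Because policies run sequentially, earlier policies win ties: a later policy can never undo a migration direction an earlier policy already committed to in the same cycle.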
Rescheduling Policies#
The rescheduler implements a multi-policy architecture where each policy operates independently and produces migration pairs. Policies are configured via comma-separated lists (--rescheduling-policies).
Policy Registry#
Built-in policies (defined in pkg/consts/consts.go):
| Policy Name | Infer Type | Purpose |
|---|---|---|
| decode_load | Decode | Balance decode instance load |
| neutral_load | Neutral | Balance neutral instance load |
| prefill_failover | Prefill | Migrate from failing prefill instances |
| decode_failover | Decode | Migrate from failing decode instances |
| neutral_failover | Neutral | Migrate from failing neutral instances |
| binpacking_mitigation | Decode | Mitigate TPOT SLO violations |
| binpacking_consolidation | Decode | Consolidate underutilized instances |
Each policy implements its own metric calculation, instance filtering, and pair selection logic; the following sections describe each mechanism in detail.
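As a rough sketch of that metrics → filters → selectors shape (the interface, method names, and toy policy below are illustrative assumptions, not the actual Llumnix API):

```go
package main

import "fmt"

// Instance is a minimal stand-in for a cluster instance view.
type Instance struct {
	ID   string
	Load float64
}

// MigrationPair pairs a migration source with a destination.
type MigrationPair struct{ Src, Dst string }

// Policy captures the three stages each rescheduling policy implements.
type Policy interface {
	Metrics(insts []Instance) map[string]float64   // metric calculation
	Filter(insts []Instance) (src, dst []Instance) // candidate filtering
	Select(src, dst []Instance) []MigrationPair    // pair selection
}

// thresholdPolicy is a toy policy: load as the metric, a fixed threshold
// as the filter, and naive first-with-first pairing as the selector.
type thresholdPolicy struct{ threshold float64 }

func (p thresholdPolicy) Metrics(insts []Instance) map[string]float64 {
	m := make(map[string]float64, len(insts))
	for _, in := range insts {
		m[in.ID] = in.Load
	}
	return m
}

func (p thresholdPolicy) Filter(insts []Instance) (src, dst []Instance) {
	for _, in := range insts {
		if in.Load >= p.threshold {
			src = append(src, in)
		} else {
			dst = append(dst, in)
		}
	}
	return src, dst
}

func (p thresholdPolicy) Select(src, dst []Instance) []MigrationPair {
	var pairs []MigrationPair
	for i := 0; i < len(src) && i < len(dst); i++ {
		pairs = append(pairs, MigrationPair{src[i].ID, dst[i].ID})
	}
	return pairs
}

func main() {
	var p Policy = thresholdPolicy{threshold: 0.7}
	src, dst := p.Filter([]Instance{{"a", 0.9}, {"b", 0.3}})
	fmt.Println(p.Select(src, dst)) // [{a b}]
}
```

Splitting the stages this way lets each built-in policy swap in its own metric (load ratio, predicted TPOT) and selector while reusing the surrounding loop.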
Load Balance Policy#
Purpose: Reduces load imbalance across instances of the same infer type within a configurable scope (cluster-wide or per-unit).
Mechanism:
- Source filter: Instances with load metric >= threshold (configurable via --rescheduling-decode-load-threshold or --rescheduling-neutral-load-threshold).
- Destination filter: Instances with load metric < threshold.
- Selector: metricBalanceSelector pairs highest-load sources with lowest-load destinations.
- Scope: Supports cluster (global balancing) or unit (per-unit balancing) via --rescheduling-load-balance-scope.
Configuration:
--rescheduling-policies=decode_load,neutral_load
--rescheduling-decode-load-metric=kv_cache_usage_ratio_projected
--rescheduling-decode-load-threshold=0.7
--rescheduling-load-balance-scope=cluster
--rescheduling-req-select-rule=TOKEN
--rescheduling-req-select-order=SR
--rescheduling-req-select-value=1024
Example: In a decode cluster with 5 instances (loads: 0.9, 0.3, 0.8, 0.2, 0.4), the policy would:
1. Filter sources: instances with load >= 0.7 → [0.9, 0.8]
2. Filter destinations: instances with load < 0.7 → [0.3, 0.2, 0.4]
3. Pair: (0.9 → 0.2), (0.8 → 0.3)
4. Migrate the specified number of tokens from each source to its destination.
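The pairing logic in this example can be sketched as follows. This is a minimal reimplementation of what a metricBalanceSelector-style selector does, assuming highest-load sources are matched with lowest-load destinations; it is not the actual Llumnix code.

```go
package main

import (
	"fmt"
	"sort"
)

type Instance struct {
	ID   string
	Load float64
}

type MigrationPair struct{ Src, Dst string }

// balancePairs filters instances against the threshold and pairs the
// highest-load sources with the lowest-load destinations.
func balancePairs(insts []Instance, threshold float64) []MigrationPair {
	var srcs, dsts []Instance
	for _, in := range insts {
		if in.Load >= threshold {
			srcs = append(srcs, in)
		} else {
			dsts = append(dsts, in)
		}
	}
	sort.Slice(srcs, func(i, j int) bool { return srcs[i].Load > srcs[j].Load })
	sort.Slice(dsts, func(i, j int) bool { return dsts[i].Load < dsts[j].Load })
	var pairs []MigrationPair
	for i := 0; i < len(srcs) && i < len(dsts); i++ {
		pairs = append(pairs, MigrationPair{srcs[i].ID, dsts[i].ID})
	}
	return pairs
}

func main() {
	// The 5-instance example from the text: loads 0.9, 0.3, 0.8, 0.2, 0.4.
	insts := []Instance{
		{"d1", 0.9}, {"d2", 0.3}, {"d3", 0.8}, {"d4", 0.2}, {"d5", 0.4},
	}
	fmt.Println(balancePairs(insts, 0.7)) // [{d1 d4} {d3 d2}]
}
```

Pairing extremes first moves the most load per migration and leaves mid-load instances (0.4 here) untouched, which keeps migration traffic low.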
Adaptive PD Rescheduling Policies#
For the full design of adaptive PD scheduling (including scheduler dispatch logic and rescheduling), see Adaptive PD Scheduling.
Purpose: Enhances adaptive prefill-decode disaggregation by migrating decode requests based on predicted TPOT, consolidating underutilized instances and mitigating overloaded ones to further improve SLO attainment and resource efficiency.
Background: In PD-disaggregated deployments with static partitioning, load fluctuations cause suboptimal resource utilization: decode instances may become overloaded (TPOT exceeds SLO) or underutilized (TPOT well below SLO). The rescheduler complements the scheduler’s adaptive PD logic by proactively rebalancing decode requests across instances.
Migration Constraints:
Only migrate decode requests (prefill requests remain stationary).
At most one migration pair per policy per rescheduling cycle.
Excludes prefill-reserved instances from migration sources/destinations.
Built-in Policies:
| Policy Name | Purpose | Source Filter | Destination Filter | Selector |
|---|---|---|---|---|
| binpacking_mitigation | Mitigate TPOT SLO violations | Predicted TPOT >= ceiling threshold (default 0.95 × TPOT SLO) | Predicted TPOT < dispatch threshold (default 0.85 × TPOT SLO) | — |
| binpacking_consolidation | Consolidate underutilized instances | Predicted TPOT < floor threshold (default 0.60 × TPOT SLO) AND decode batch size > 0.1 | Predicted TPOT < dispatch threshold AND decode batch size > 0.1 | — |
Threshold Configuration:
--enable-adaptive-pd=true
--scheduling-policy=slo
--tpot-slo=50
--tpot-slo-dispatch-threshold=0.85
--tpot-migrate-out-ceil-threshold=0.95
--tpot-migrate-out-floor-threshold=0.60
--rescheduling-policies=binpacking_mitigation,binpacking_consolidation
--rescheduling-interval-ms=100
--colocated-rescheduling-mode=true
Example - Mitigation: Instance D1 has predicted TPOT = 48ms (exceeds 0.95 × 50ms = 47.5ms ceiling). Policy migrates decode requests from D1 to D2 (predicted TPOT = 35ms, below 0.85 × 50ms = 42.5ms dispatch threshold).
Example - Consolidation: Instance D3 has predicted TPOT = 25ms (below 0.60 × 50ms = 30ms floor) with active decode requests. Policy migrates all requests from D3 to D4 (predicted TPOT = 40ms, heavily loaded but SLO-compliant), freeing D3 for prefill assignment.
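The threshold arithmetic in these two examples can be checked with a small sketch. The function names are illustrative, and the real policies also account for decode batch size and prefill reservations:

```go
package main

import "fmt"

// Thresholds from the flags above: --tpot-slo=50,
// --tpot-slo-dispatch-threshold=0.85, --tpot-migrate-out-ceil-threshold=0.95,
// --tpot-migrate-out-floor-threshold=0.60.
const (
	tpotSLO  = 50.0
	dispatch = 0.85
	ceil     = 0.95
	floor    = 0.60
)

// mitigationSource: predicted TPOT at or above the ceiling threshold.
func mitigationSource(tpot float64) bool { return tpot >= ceil*tpotSLO }

// mitigationDest: predicted TPOT below the dispatch threshold.
func mitigationDest(tpot float64) bool { return tpot < dispatch*tpotSLO }

// consolidationSource: predicted TPOT below the floor threshold
// (the real policy additionally requires decode batch size > 0.1).
func consolidationSource(tpot float64) bool { return tpot < floor*tpotSLO }

func main() {
	fmt.Println(mitigationSource(48))    // true:  48 >= 47.5 (D1)
	fmt.Println(mitigationDest(35))      // true:  35 < 42.5 (D2)
	fmt.Println(consolidationSource(25)) // true:  25 < 30 (D3)
	fmt.Println(consolidationSource(40)) // false: 40 >= 30 (D4)
}
```

Note the hysteresis band between 0.85 and 0.95 of the SLO: an instance there neither sheds nor receives mitigation migrations, which avoids oscillating back-and-forth transfers.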
Note: For details, see Adaptive PD Scheduling.
Failover Policy#
Purpose: Responds to unhealthy or unschedulable instances by migrating their requests to healthy instances outside the failure domain.
Mechanism:
- Source filter: The global failoverMigrationSrcFilter identifies instances requiring failover based on needsFailover marks set by the schedulability and staleness filters.
- Destination filter: Excludes all instances within the failure domain (configured via --failover-domain).
- Selector: roundRobinSelector distributes requests evenly across the available destinations.
Failover Domains (defined in pkg/consts/consts.go):
- instance: Only the failed instance itself (default).
- node: All instances on the same physical node.
- instance-unit: All instances sharing the same unit with the failed instance.
- node-unit: All instances sharing units with instances on the failed node.
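A sketch of how a failure domain translates into a blocked destination set. The struct fields and the domain handling below are illustrative assumptions (only the instance, node, and instance-unit domains are shown):

```go
package main

import "fmt"

// Instance is a minimal view with the placement attributes the failover
// domains refer to; field names are assumptions.
type Instance struct {
	ID   string
	Node string
	Unit string
}

// blockedByDomain returns the instance IDs that must be excluded as
// migration destinations when `failed` needs failover under the given
// --failover-domain setting.
func blockedByDomain(failed Instance, all []Instance, domain string) map[string]bool {
	blocked := map[string]bool{failed.ID: true} // "instance" domain baseline
	for _, in := range all {
		switch domain {
		case "node":
			if in.Node == failed.Node {
				blocked[in.ID] = true
			}
		case "instance-unit":
			if in.Unit == failed.Unit {
				blocked[in.ID] = true
			}
		}
	}
	return blocked
}

func main() {
	all := []Instance{
		{"decode-1", "node-a", "u1"},
		{"decode-3", "node-a", "u2"},
		{"decode-4", "node-b", "u2"},
	}
	failed := all[1] // decode-3
	fmt.Println(blockedByDomain(failed, all, "node")["decode-1"]) // true: same node
	fmt.Println(blockedByDomain(failed, all, "node")["decode-4"]) // false: other node
}
```

Wider domains trade capacity for safety: a node-level domain sacrifices same-node destinations on the assumption that whatever made the instance unhealthy (e.g. a hardware fault) may affect its neighbors too.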
Configuration:
--rescheduling-policies=prefill_failover,decode_failover,neutral_failover
--failover-domain=node
--instance-staleness-seconds=100
Example: If instance decode-3 becomes unschedulable with failover-domain=node:
1. Source filter: Marks decode-3 as needing failover.
2. Global filter: Blocks all instances on the same node as decode-3.
3. Destination filter: Keeps only instances on other nodes.
4. Selector: Round-robin assigns requests from decode-3 to the remaining instances.
Migration Implementation#
Migration Request Types#
The rescheduler supports three migration granularities (defined in pkg/consts/consts.go):
| Type | Constant | Description |
|---|---|---|
| Request count | — | Migrate N requests |
| Token count | TOKEN | Migrate N tokens |
| KV cache ratio | — | Migrate N% of KV cache |
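As an illustration of the token granularity (the TOKEN rule used in the load-balance configuration example), the following sketch picks requests in a given order until a token budget is reached. The Request type and function are assumptions, not the engine-side implementation:

```go
package main

import "fmt"

// Request is a minimal stand-in for a running request on the source
// instance; only the sequence length matters for the TOKEN rule.
type Request struct {
	ID     string
	Tokens int
}

// selectByTokenBudget takes requests (already sorted by the configured
// selection order) until the migrated token count reaches the budget,
// e.g. --rescheduling-req-select-value=1024.
func selectByTokenBudget(ordered []Request, budget int) []Request {
	var picked []Request
	total := 0
	for _, r := range ordered {
		if total >= budget {
			break
		}
		picked = append(picked, r)
		total += r.Tokens
	}
	return picked
}

func main() {
	ordered := []Request{{"r1", 600}, {"r2", 500}, {"r3", 300}}
	for _, r := range selectByTokenBudget(ordered, 1024) {
		fmt.Println(r.ID) // r1 and r2: 600 + 500 reaches the 1024 budget
	}
}
```

A token budget tracks the actual KV cache volume moved more closely than a raw request count, since request sequence lengths vary widely.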
Migration Request Ordering#
When selecting which requests to migrate, the following ordering policies are supported:
| Order | Constant | Description |
|---|---|---|
| Last Come Running | — | The last request to come (among running requests) |
| First Come Running | — | The first request to come (among running requests) |
| Longest Running | — | The request with the longest sequence length |
| Shortest Running | SR | The request with the shortest sequence length |
| First Come Waiting | — | The first request to come (among waiting requests) |
| First Come Waiting, then Shortest Running | — | First Come Waiting; if no waiting requests exist, fall back to Shortest Running |
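The orderings above are essentially sort keys over the running and waiting queues. A sketch of the Shortest Running order (SR, the value used in the load-balance configuration example) and First Come Waiting might look like this; the Request type and its fields are assumptions:

```go
package main

import (
	"fmt"
	"sort"
)

// Request is a minimal stand-in; SeqLen is the current sequence length
// and Arrival is a monotonically increasing arrival index.
type Request struct {
	ID      string
	SeqLen  int
	Arrival int
}

// orderShortestRunning sorts running requests by ascending sequence
// length, so the shortest-running request is migrated first (SR).
func orderShortestRunning(running []Request) []Request {
	out := append([]Request(nil), running...)
	sort.Slice(out, func(i, j int) bool { return out[i].SeqLen < out[j].SeqLen })
	return out
}

// orderFirstComeWaiting sorts waiting requests by arrival, so the
// earliest-arriving waiting request is migrated first.
func orderFirstComeWaiting(waiting []Request) []Request {
	out := append([]Request(nil), waiting...)
	sort.Slice(out, func(i, j int) bool { return out[i].Arrival < out[j].Arrival })
	return out
}

func main() {
	running := []Request{{"a", 900, 1}, {"b", 120, 2}, {"c", 400, 3}}
	fmt.Println(orderShortestRunning(running)[0].ID) // b (shortest sequence)
}
```

Shortest Running minimizes the KV cache volume that has to be transferred per migrated request, while the waiting-first orders migrate requests whose KV cache has not been built yet, making the transfer nearly free.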
Execution Pipeline#
The migration execution pipeline (lines 177-259 in rescheduling_policy.go):
1. Channel Setup: Creates a buffered channel to collect results from concurrent migrations.
2. Parallel Dispatch: Launches a goroutine for each migration pair:
   - Retrieves a gRPC client from the connection pool.
   - Constructs a MigrateRequest with source/destination instance IDs, migration type, order, and value.
   - Issues the gRPC call with a timeout (configured via --llumlet-grpc-timeout-seconds).
3. Error Handling:
   - Logs timeout errors separately from other gRPC failures.
   - Records the rescheduling_failed_count metric on failure.
   - Continues executing the remaining migrations despite individual failures.
4. Result Aggregation: Collects all results and returns them for monitoring/debugging.
Concurrency Model: All migrations execute in parallel, bounded only by gRPC connection pool size (--llumlet-grpc-connection-pool-size).
Configuration#
Core Rescheduling Flags#
| Flag | Default | Description |
|---|---|---|
| — | — | Enable rescheduling |
| --rescheduling-policies | — | Comma-separated list of rescheduling policies |
| --rescheduling-interval-ms | — | Interval between rescheduling iterations |
| --colocated-rescheduling-mode | — | Run rescheduler inside the scheduler process |
| — | — | Run rescheduler as a separate process |
Load Balance Configuration#
| Flag | Default | Description |
|---|---|---|
| --rescheduling-decode-load-metric | — | Load metric for decode instances |
| --rescheduling-decode-load-threshold | — | Threshold for source/destination filtering: instances >= this value are migration sources, instances < this value are migration destinations |
| — | — | Load metric for neutral instances |
| --rescheduling-neutral-load-threshold | — | Threshold for source/destination filtering: instances >= this value are migration sources, instances < this value are migration destinations |
| — | — | Minimum load difference required to trigger migration |
| --rescheduling-load-balance-scope | — | Balancing scope: cluster or unit |
Adaptive PD Configuration#
| Flag | Default | Description |
|---|---|---|
| --enable-adaptive-pd | — | Enable adaptive PD scheduling |
| --scheduling-policy | — | Must be set to slo |
| --tpot-slo | — | TPOT SLO target (ms) |
| --tpot-slo-dispatch-threshold | 0.85 | Fraction of TPOT SLO used as dispatch/destination filter threshold |
| --tpot-migrate-out-ceil-threshold | 0.95 | Fraction of TPOT SLO above which mitigating rescheduling triggers (source filter) |
| --tpot-migrate-out-floor-threshold | 0.60 | Fraction of TPOT SLO below which consolidating rescheduling triggers (source filter) |
| --rescheduling-policies | — | Rescheduling policies for adaptive PD |
| --rescheduling-interval-ms | — | Interval between rescheduling iterations |
Note: For details, see Adaptive PD Scheduling.
Failover Configuration#
| Flag | Default | Description |
|---|---|---|
| --failover-domain | instance | Failure domain: instance, node, instance-unit, or node-unit |
| --instance-staleness-seconds | — | Time after which an instance is considered stale |
Migration Request Configuration#
| Flag | Default | Description |
|---|---|---|
| --rescheduling-req-select-rule | — | Migration request selection rule (e.g., TOKEN) |
| --rescheduling-req-select-order | — | Migration request selection order (e.g., SR) |
| --rescheduling-req-select-value | — | Number of requests/tokens or KV cache ratio to migrate |
gRPC Configuration#
| Flag | Default | Description |
|---|---|---|
| --llumlet-grpc-connection-pool-size | — | Size of gRPC connection pool per instance |
| --llumlet-grpc-timeout-seconds | — | Timeout for gRPC migration calls |
Deployment Modes#
| Mode | Flag | Process |
|---|---|---|
| Colocated | --colocated-rescheduling-mode=true | Inside scheduler process |
| Standalone | — | Separate process |