vLLM-01-Engine模块-数据结构
核心数据结构列表
Engine 模块包含以下核心数据结构:
| 数据结构 | 类型 | 用途 | 更新时机 | 文件位置 |
|---|---|---|---|---|
EngineCoreRequest |
msgspec.Struct | 引擎内部请求表示 | 请求添加时创建 | vllm/v1/engine/init.py |
EngineCoreOutput |
msgspec.Struct | 单个请求的输出 | 每次 step 生成 | vllm/v1/engine/init.py |
EngineCoreOutputs |
dataclass | 批量输出 + 统计信息 | 每次 step 生成 | vllm/v1/engine/init.py |
RequestState |
class | 请求状态跟踪 | 请求生命周期持续更新 | vllm/v1/engine/output_processor.py |
RequestOutput |
class | 返回给用户的输出 | 每次 step 生成 | vllm/outputs.py |
CompletionOutput |
class | 单个完成输出 | 每次 step 生成 | vllm/outputs.py |
SamplingParams |
class | 采样参数 | 请求创建时设置 | vllm/sampling_params.py |
数据结构详细说明
1. EngineCoreRequest
基本信息
用途:表示提交给 EngineCore 的内部请求格式,经过 Processor 处理后的标准化请求表示。
定义:
class EngineCoreRequest(
msgspec.Struct,
array_like=True,
omit_defaults=True,
gc=False
):
"""引擎核心请求结构"""
request_id: str # 请求唯一标识符
prompt_token_ids: Optional[list[int]] # prompt 的 token IDs
mm_features: Optional[list[MultiModalFeatureSpec]] # 多模态特征
sampling_params: Optional[SamplingParams] # 采样参数
pooling_params: Optional[PoolingParams] # 池化参数(embedding 任务)
eos_token_id: Optional[int] # EOS token ID
arrival_time: float # 请求到达时间
lora_request: Optional[LoRARequest] # LoRA 适配器请求
cache_salt: Optional[str] # 缓存盐(用于 cache invalidation)
data_parallel_rank: Optional[int] # 数据并行 rank
prompt_embeds: Optional[torch.Tensor] = None # Prompt embeddings(如使用)
client_index: int = 0 # 客户端索引(前端扩展)
current_wave: int = 0 # 当前波次(DP 模式)
priority: int = 0 # 请求优先级
trace_headers: Optional[Mapping[str, str]] = None # 追踪请求头
字段说明表
| 字段 | 类型 | 必填 | 默认值 | 约束 | 说明 |
|---|---|---|---|---|---|
| request_id | str | ✓ | - | 全局唯一 | 请求的唯一标识符,用于跟踪和输出匹配 |
| prompt_token_ids | list[int] | ✗ | None | 长度 ≤ max_model_len | 已 tokenize 的 prompt;与 prompt_embeds 二选一 |
| mm_features | list[MultiModalFeatureSpec] | ✗ | None | - | 多模态输入特征(图片、音频、视频) |
| sampling_params | SamplingParams | ✗ | None | - | 采样参数;与 pooling_params 二选一 |
| pooling_params | PoolingParams | ✗ | None | - | 池化参数(embedding 任务);与 sampling_params 二选一 |
| eos_token_id | int | ✗ | None | 有效 token ID | EOS token ID,用于判断生成结束 |
| arrival_time | float | ✓ | - | > 0 | 请求到达时间戳(秒),用于调度和统计 |
| lora_request | LoRARequest | ✗ | None | - | LoRA 适配器信息,如需使用 LoRA 推理 |
| cache_salt | str | ✗ | None | - | 缓存盐值,用于区分相同 prompt 的不同缓存版本 |
| data_parallel_rank | int | ✗ | None | 0 ≤ rank < dp_size | 数据并行时的 rank 分配 |
| prompt_embeds | torch.Tensor | ✗ | None | shape: [seq_len, hidden_size] | 预计算的 prompt embeddings(替代 token IDs) |
| client_index | int | ✗ | 0 | ≥ 0 | 客户端索引,用于多前端场景 |
| current_wave | int | ✗ | 0 | ≥ 0 | 当前波次(DP 模式下避免竞态) |
| priority | int | ✗ | 0 | 任意整数 | 请求优先级,数值越大优先级越高 |
| trace_headers | dict | ✗ | None | - | OpenTelemetry 追踪头 |
用途和更新时机
创建时机:
- 用户调用
LLMEngine.add_request()时 - 经过
Processor.process_inputs()处理
更新时机:
- 不更新(immutable,请求创建后不变)
使用场景:
- Processor 将用户输入转换为 EngineCoreRequest
- EngineCore 接收 EngineCoreRequest 并加入调度队列
- Scheduler 根据 EngineCoreRequest 创建内部 Request 对象
数据流向
flowchart LR
A[用户输入<br/>prompt + params] --> B[Processor]
B --> C[EngineCoreRequest]
C --> D[EngineCore]
D --> E[Scheduler]
E --> F[Request<br/>内部表示]
2. EngineCoreOutput
基本信息
用途:表示单个请求在某次 step 中的输出,包含新生成的 token、logprobs、完成状态等。
定义:
class EngineCoreOutput(
msgspec.Struct,
array_like=True,
omit_defaults=True,
gc=False
):
"""单个请求的引擎输出"""
request_id: str # 请求 ID
new_token_ids: list[int] # 新生成的 token IDs
new_logprobs: Optional[LogprobsLists] = None # 新 token 的 logprobs
new_prompt_logprobs_tensors: Optional[LogprobsTensors] = None # Prompt logprobs
pooling_output: Optional[torch.Tensor] = None # 池化输出(embedding)
finish_reason: Optional[FinishReason] = None # 完成原因
stop_reason: Union[int, str, None] = None # 停止原因
events: Optional[list[EngineCoreEvent]] = None # 事件列表
kv_transfer_params: Optional[dict[str, Any]] = None # KV 传输参数
trace_headers: Optional[Mapping[str, str]] = None # 追踪头
num_cached_tokens: int = 0 # 缓存命中的 token 数
@property
def finished(self) -> bool:
"""是否完成"""
return self.finish_reason is not None
字段说明表
| 字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| request_id | str | ✓ | - | 对应的请求 ID |
| new_token_ids | list[int] | ✓ | - | 本次 step 新生成的 token IDs(通常 1 个,speculative decoding 可能多个) |
| new_logprobs | LogprobsLists | ✗ | None | 新 token 的 logprobs(如 logprobs > 0) |
| new_prompt_logprobs_tensors | LogprobsTensors | ✗ | None | Prompt 的 logprobs(仅 prefill 阶段) |
| pooling_output | torch.Tensor | ✗ | None | 池化输出(embedding 任务),shape: [hidden_size] |
| finish_reason | FinishReason | ✗ | None | 完成原因:LENGTH / STOP / ABORT / ERROR |
| stop_reason | int/str | ✗ | None | 停止原因详情:stop token ID 或 stop string |
| events | list[EngineCoreEvent] | ✗ | None | 事件列表(如缓存命中、抢占等) |
| kv_transfer_params | dict | ✗ | None | KV 传输参数(disaggregated serving) |
| trace_headers | dict | ✗ | None | 追踪头(继承自请求) |
| num_cached_tokens | int | ✗ | 0 | Prefix Caching 命中的 token 数量 |
用途和更新时机
创建时机:
- 每次
EngineCore.get_output()时为每个活跃请求生成
更新时机:
- 每次 step 生成新的 EngineCoreOutput(不复用)
使用场景:
- EngineCore 从 Scheduler 和 Executor 收集输出
- OutputProcessor 处理 EngineCoreOutput,生成 RequestOutput
- 用户通过
step()获取最终的 RequestOutput
3. EngineCoreOutputs
基本信息
用途:批量输出 + 统计信息的封装。
定义:
@dataclass
class EngineCoreOutputs:
"""引擎核心输出集合"""
outputs: list[EngineCoreOutput] # 所有请求的输出
scheduler_stats: Optional[SchedulerStats] # 调度器统计信息
timestamp: float # 时间戳
字段说明表
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| outputs | list[EngineCoreOutput] | ✓ | 本次 step 所有请求的输出列表 |
| scheduler_stats | SchedulerStats | ✗ | 调度器统计信息(队列长度、缓存命中率等) |
| timestamp | float | ✓ | 输出生成的时间戳(秒) |
4. RequestState
基本信息
用途:跟踪单个请求在 OutputProcessor 中的状态,用于增量 detokenization 和输出聚合。
定义:
class RequestState:
"""请求状态(OutputProcessor 内部)"""
def __init__(
self,
request_id: str,
parent_req: Optional[ParentRequest],
request_index: int,
lora_name: Optional[str],
output_kind: RequestOutputKind,
prompt: Optional[str],
prompt_token_ids: Optional[list[int]],
prompt_embeds: Optional[torch.Tensor],
logprobs_processor: Optional[LogprobsProcessor],
detokenizer: Optional[IncrementalDetokenizer],
max_tokens_param: Optional[int],
arrival_time: float,
queue: Optional[RequestOutputCollector],
log_stats: bool,
top_p: Optional[float] = None,
n: Optional[int] = None,
temperature: Optional[float] = None,
):
self.request_id = request_id
self.parent_req = parent_req # 父请求(n > 1 时)
self.request_index = request_index # 子请求索引
self.lora_name = lora_name # LoRA 名称
self.output_kind = output_kind # 输出类型
self.prompt = prompt # 原始 prompt 文本
self.prompt_token_ids = prompt_token_ids # Prompt token IDs
self.prompt_embeds = prompt_embeds # Prompt embeddings
self.prompt_len = ... # Prompt 长度
self.logprobs_processor = logprobs_processor # Logprobs 处理器
self.detokenizer = detokenizer # 增量 detokenizer
self.max_tokens_param = max_tokens_param # max_tokens 参数
self.top_p = top_p
self.n = n
self.temperature = temperature
self.is_prefilling = True # 是否在 prefill 阶段
self.queue = queue # 输出队列
self.num_cached_tokens = 0 # 缓存命中数
self.stats = RequestStateStats(...) # 统计信息
字段说明表
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| request_id | str | ✓ | 请求 ID |
| parent_req | ParentRequest | ✗ | 父请求(n > 1 时,用于聚合多个候选) |
| request_index | int | ✓ | 子请求索引(n > 1 时) |
| lora_name | str | ✗ | LoRA 适配器名称 |
| output_kind | RequestOutputKind | ✓ | 输出类型:COMPLETION / POOLING / CLASSIFICATION / SCORING |
| prompt | str | ✗ | 原始 prompt 文本 |
| prompt_token_ids | list[int] | ✗ | Prompt 的 token IDs |
| prompt_embeds | torch.Tensor | ✗ | Prompt embeddings |
| prompt_len | int | ✓ | Prompt 长度 |
| logprobs_processor | LogprobsProcessor | ✗ | Logprobs 处理器 |
| detokenizer | IncrementalDetokenizer | ✗ | 增量 detokenizer(流式生成文本) |
| max_tokens_param | int | ✗ | max_tokens 参数(用于进度计算) |
| is_prefilling | bool | ✓ | 是否在 prefill 阶段 |
| queue | RequestOutputCollector | ✗ | 输出队列(流式输出) |
| num_cached_tokens | int | ✓ | 缓存命中的 token 数 |
| stats | RequestStateStats | ✗ | 统计信息(TTFT、TPOT 等) |
用途和更新时机
创建时机:
LLMEngine.add_request()时,通过OutputProcessor.add_request()创建
更新时机:
- 每次
OutputProcessor.process_outputs()时更新 - 更新内容:新 token IDs、detokenization、logprobs、完成状态
使用场景:
- 跟踪请求的 detokenization 状态
- 增量生成文本(避免重复 detokenize)
- 收集统计信息(TTFT、TPOT)
- 检测 stop strings
5. RequestOutput
基本信息
用途:返回给用户的最终输出结构,包含生成的文本、token IDs、logprobs、完成状态等。
定义:
class RequestOutput:
"""请求输出(返回给用户)"""
def __init__(
self,
request_id: str,
prompt: Optional[str],
prompt_token_ids: list[int],
prompt_logprobs: Optional[PromptLogprobs],
outputs: list[CompletionOutput],
finished: bool,
metadata: Optional[RequestMetadata] = None,
):
self.request_id = request_id
self.prompt = prompt
self.prompt_token_ids = prompt_token_ids
self.prompt_logprobs = prompt_logprobs
self.outputs = outputs # list[CompletionOutput]
self.finished = finished
self.metadata = metadata
字段说明表
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| request_id | str | ✓ | 请求 ID |
| prompt | str | ✗ | 原始 prompt 文本 |
| prompt_token_ids | list[int] | ✓ | Prompt 的 token IDs |
| prompt_logprobs | PromptLogprobs | ✗ | Prompt 的 logprobs(如 prompt_logprobs > 0) |
| outputs | list[CompletionOutput] | ✓ | 生成的输出列表(n 个候选) |
| finished | bool | ✓ | 是否完成 |
| metadata | RequestMetadata | ✗ | 元数据(统计信息、事件等) |
6. CompletionOutput
基本信息
用途:单个完成输出(当 n > 1 时,RequestOutput 包含多个 CompletionOutput)。
定义:
class CompletionOutput:
"""单个完成输出"""
def __init__(
self,
index: int,
text: str,
token_ids: list[int],
cumulative_logprob: float,
logprobs: Optional[list[dict]],
finish_reason: Optional[str],
stop_reason: Optional[Union[int, str]],
lora_request: Optional[LoRARequest] = None,
):
self.index = index
self.text = text
self.token_ids = token_ids
self.cumulative_logprob = cumulative_logprob
self.logprobs = logprobs
self.finish_reason = finish_reason
self.stop_reason = stop_reason
self.lora_request = lora_request
字段说明表
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| index | int | ✓ | 输出索引(0 到 n-1) |
| text | str | ✓ | 生成的文本 |
| token_ids | list[int] | ✓ | 生成的 token IDs(增量或累积,取决于配置) |
| cumulative_logprob | float | ✓ | 累积 log 概率 |
| logprobs | list[dict] | ✗ | 每个 token 的 logprobs 详细信息 |
| finish_reason | str | ✗ | 完成原因:"length" / "stop" / "abort" |
| stop_reason | int/str | ✗ | 停止原因详情:stop token ID 或 stop string |
| lora_request | LoRARequest | ✗ | 使用的 LoRA 适配器 |
7. SamplingParams
基本信息
用途:控制生成过程的采样参数。
定义:
class SamplingParams:
"""采样参数"""
def __init__(
self,
n: int = 1,
best_of: Optional[int] = None,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
repetition_penalty: float = 1.0,
temperature: float = 1.0,
top_p: float = 1.0,
top_k: int = -1,
min_p: float = 0.0,
seed: Optional[int] = None,
use_beam_search: bool = False,
length_penalty: float = 1.0,
early_stopping: Union[bool, str] = False,
stop: Optional[Union[str, list[str]]] = None,
stop_token_ids: Optional[list[int]] = None,
include_stop_str_in_output: bool = False,
ignore_eos: bool = False,
max_tokens: Optional[int] = None,
min_tokens: int = 0,
logprobs: Optional[int] = None,
prompt_logprobs: Optional[int] = None,
detokenize: bool = True,
skip_special_tokens: bool = True,
spaces_between_special_tokens: bool = True,
logits_processors: Optional[list[LogitsProcessor]] = None,
):
...
字段说明表(常用字段)
| 字段 | 类型 | 默认值 | 约束 | 说明 |
|---|---|---|---|---|
| n | int | 1 | > 0 | 生成的候选数量 |
| temperature | float | 1.0 | > 0 | 采样温度(0 = greedy,> 1 = 更随机) |
| top_p | float | 1.0 | 0 < p ≤ 1 | Nucleus sampling 阈值 |
| top_k | int | -1 | -1 或 > 0 | Top-K sampling(-1 = 不限制) |
| max_tokens | int | None | > 0 | 最大生成 token 数(None = 直到 EOS) |
| stop | str/list[str] | None | - | 停止字符串 |
| stop_token_ids | list[int] | None | 有效 token IDs | 停止 token IDs |
| presence_penalty | float | 0.0 | -2.0 ~ 2.0 | 出现惩罚(惩罚重复) |
| frequency_penalty | float | 0.0 | -2.0 ~ 2.0 | 频率惩罚(惩罚高频词) |
| repetition_penalty | float | 1.0 | > 0 | 重复惩罚(> 1 惩罚,< 1 鼓励) |
| logprobs | int | None | > 0 | 返回 Top-N logprobs |
| seed | int | None | - | 随机种子(可复现) |
数据结构关系图(类图)
classDiagram
class EngineCoreRequest {
+str request_id
+list~int~ prompt_token_ids
+SamplingParams sampling_params
+float arrival_time
+int priority
}
class EngineCoreOutput {
+str request_id
+list~int~ new_token_ids
+FinishReason finish_reason
+int num_cached_tokens
+finished() bool
}
class EngineCoreOutputs {
+list~EngineCoreOutput~ outputs
+SchedulerStats scheduler_stats
+float timestamp
}
class RequestState {
+str request_id
+str prompt
+list~int~ prompt_token_ids
+IncrementalDetokenizer detokenizer
+bool is_prefilling
+int num_cached_tokens
}
class RequestOutput {
+str request_id
+str prompt
+list~int~ prompt_token_ids
+list~CompletionOutput~ outputs
+bool finished
}
class CompletionOutput {
+int index
+str text
+list~int~ token_ids
+float cumulative_logprob
+str finish_reason
}
class SamplingParams {
+int n
+float temperature
+float top_p
+int max_tokens
+list~str~ stop
}
EngineCoreRequest --> SamplingParams : contains
EngineCoreOutputs --> EngineCoreOutput : contains many
RequestOutput --> CompletionOutput : contains many
RequestState --> RequestOutput : generates
EngineCoreOutput --> RequestState : processed by
数据流图
flowchart TB
A[用户输入<br/>prompt + SamplingParams] --> B[Processor]
B --> C[EngineCoreRequest]
C --> D[EngineCore]
D --> E[Scheduler]
E --> F[Executor]
F --> G[ModelRunnerOutput]
G --> D
D --> H[EngineCoreOutput]
H --> I[OutputProcessor]
I --> J[RequestState<br/>增量 detokenization]
J --> K[RequestOutput<br/>+ CompletionOutput]
K --> L[用户]
style C fill:#e1f5ff
style H fill:#ffe1e1
style K fill:#e1ffe1
流程说明:
- 输入转换:用户输入 → Processor → EngineCoreRequest
- 调度与执行:EngineCoreRequest → EngineCore → Scheduler + Executor
- 输出生成:ModelRunnerOutput → EngineCoreOutput
- 输出处理:EngineCoreOutput → OutputProcessor → RequestState → RequestOutput
- 返回用户:RequestOutput(包含 CompletionOutput)
状态转换图
stateDiagram-v2
[*] --> WAITING: add_request()
WAITING --> RUNNING: schedule()
RUNNING --> RUNNING: step() (generating)
RUNNING --> FINISHED: 完成条件满足
RUNNING --> ABORTED: abort_request()
WAITING --> ABORTED: abort_request()
FINISHED --> [*]
ABORTED --> [*]
note right of WAITING
队列中等待调度
未分配 KV cache
end note
note right of RUNNING
已分配 KV cache
正在生成 tokens
end note
note right of FINISHED
完成原因:
- LENGTH (达到 max_tokens)
- STOP (遇到 stop token/string)
end note
note right of ABORTED
被取消:
- 用户调用 abort_request()
- 资源不足被抢占
end note
字段映射与版本演进
EngineCoreRequest 映射
| 用户 API 字段 | EngineCoreRequest 字段 | 转换逻辑 |
|---|---|---|
prompt (str) |
prompt_token_ids (list[int]) |
Tokenizer.encode() |
prompt (dict with multi_modal_data) |
mm_features |
多模态处理器提取特征 |
sampling_params.max_tokens |
sampling_params.max_tokens |
直接复制 |
arrival_time |
arrival_time |
默认 time.time() |
priority |
priority |
直接复制(默认 0) |
RequestOutput 映射
| EngineCoreOutput 字段 | RequestOutput 字段 | 转换逻辑 |
|---|---|---|
new_token_ids |
CompletionOutput.token_ids |
累积或增量(取决于配置) |
| N/A(需 detokenize) | CompletionOutput.text |
Tokenizer.decode() |
finish_reason |
CompletionOutput.finish_reason |
枚举转字符串 |
num_cached_tokens |
RequestMetadata.cached_tokens |
统计信息 |
版本兼容
V0 → V1 变更:
| V0 类型 | V1 类型 | 变更说明 |
|---|---|---|
Sequence |
Request |
重命名,字段大幅精简 |
SequenceGroup |
N/A | 移除(V1 不使用 SequenceGroup) |
SequenceOutput |
EngineCoreOutput |
重命名,结构简化 |
SequenceGroupOutput |
RequestOutput |
重命名,去除 group 概念 |
兼容性保证:
- 用户 API(
LLM、LLMEngine)完全兼容 - 内部数据结构不保证兼容(V1 架构重构)
性能考虑
内存占用
| 数据结构 | 内存占用 | 说明 |
|---|---|---|
| EngineCoreRequest | ~1 KB | 小对象,主要是字段引用 |
| RequestState | ~2 KB + detokenizer state | 包含增量 detokenization 状态 |
| CompletionOutput | ~4 KB / 1000 tokens | 主要是 token IDs 和 text |
优化策略:
- 零拷贝:EngineCoreOutput 使用 msgspec.Struct,支持零拷贝序列化
- 增量 detokenization:避免重复 detokenize 整个序列
- Lazy evaluation:logprobs 按需计算
序列化性能
| 数据结构 | 序列化方式 | 性能 |
|---|---|---|
| EngineCoreRequest | msgspec | ~1 μs / request |
| EngineCoreOutput | msgspec | ~0.5 μs / output |
| RequestOutput | Python dataclass | ~5 μs / output |
msgspec 优势:
- 比 pickle 快 10-20x
- 比 JSON 快 5-10x
- 支持零拷贝(array_like=True)
使用示例
创建 EngineCoreRequest
from vllm.v1.engine import EngineCoreRequest
from vllm.sampling_params import SamplingParams
import time
sampling_params = SamplingParams(
temperature=0.8,
top_p=0.95,
max_tokens=100,
stop=["</s>"]
)
request = EngineCoreRequest(
request_id="request-123",
prompt_token_ids=[1, 2345, 678, 90], # 已 tokenize
mm_features=None, # 无多模态输入
sampling_params=sampling_params,
pooling_params=None,
eos_token_id=2, # </s>
arrival_time=time.time(),
lora_request=None,
cache_salt=None,
data_parallel_rank=None,
priority=5, # 高优先级
)
处理 EngineCoreOutput
from vllm.v1.engine import EngineCoreOutput, FinishReason
output = EngineCoreOutput(
request_id="request-123",
new_token_ids=[42], # 本次生成 1 个 token
new_logprobs=None,
finish_reason=None, # 未完成
num_cached_tokens=10, # 10 个 token 来自缓存
)
if output.finished:
print(f"请求完成,原因:{output.finish_reason}")
else:
print(f"生成了 {len(output.new_token_ids)} 个 token")
print(f"缓存命中:{output.num_cached_tokens} tokens")
构造 RequestOutput
from vllm.outputs import RequestOutput, CompletionOutput
completion = CompletionOutput(
index=0,
text="The capital of France is Paris.",
token_ids=[464, 3139, 310, 3444, 338, 3681, 29889],
cumulative_logprob=-5.23,
logprobs=None,
finish_reason="stop",
stop_reason="</s>",
)
request_output = RequestOutput(
request_id="request-123",
prompt="What is the capital of France?",
prompt_token_ids=[1, 1724, 338, 278, 7483, 310, 3444, 29973],
prompt_logprobs=None,
outputs=[completion],
finished=True,
)
print(f"生成的文本: {request_output.outputs[0].text}")
print(f"完成原因: {request_output.outputs[0].finish_reason}")