Files
llm-compass/nvidia_router.py
aszerW 2afe976a31 feat: 启用 Apple Silicon MPS 加速 + 兼容 transformers 5.x + 本地运行配置
nvidia_router.py 变更:
- device 默认值从 'cpu' 改为 'auto',自动检测 MPS/CUDA/CPU
- AutoConfig 替换为 DebertaV2Config + 手动解析 config.json
  (nvidia/prompt-task-and-complexity-classifier 的 config.json 无 model_type,
   transformers 5.x 的 AutoConfig 会直接报错)
- MPS 设备自动转换 float16,修复 MPS 矩阵乘法数据类型冲突崩溃
  (MPS NDArrayMatrixMultiplication 要求 dst/accumulator 同类型)
- 日志增加设备和精度信息输出

docker-compose.yml 变更:
- 端口映射改为 402:8000 (本地开发端口)
- volume 从 named volume 改为 ./data 本地目录映射
- API Key 改回环境变量引用 (密钥存 .env 文件,已在 .gitignore 中)

测试环境: Mac Mini M4 Pro / 64GB / macOS 15.3.1
运行方式: .venv/bin/python -m uvicorn main:app --host 0.0.0.0 --port 402
测试结果:
- MPS + FP16 分类器正常工作,稳态路由延迟 ~53ms
- NVIDIA 3-tier 路由决策正确 (simple/medium/complex)
- OpenAI 兼容 API 正常响应,DashScope Qwen 模型调用正常
2026-04-19 00:17:38 +08:00

344 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
NVIDIA Prompt Task & Complexity Classifier Router
手动加载自定义多头模型支持3-tier路由
模型: nvidia/prompt-task-and-complexity-classifier (184M参数)
架构: DeBERTa-v3-base backbone + 8个分类头
输出: task_type(12类), creativity(3类), reasoning(2类),
domain_knowledge(4类), complexity_score 等多维度
"""
import torch
import torch.nn as nn
from transformers import AutoTokenizer, DebertaV2Model, DebertaV2Config
from safetensors.torch import load_file
from huggingface_hub import hf_hub_download
from typing import Dict, Optional
import logging
import json
logger = logging.getLogger(__name__)
class ClassificationHead(nn.Module):
    """One classification head: dropout followed by a single linear layer.

    Args:
        input_dim: Size of the feature vector fed to the head.
        num_classes: Number of output logits.
        dropout: Dropout probability applied before the linear layer.
    """

    def __init__(self, input_dim: int, num_classes: int, dropout: float = 0.2):
        super().__init__()
        # Attribute names are kept as-is: they determine the parameter names
        # ("fc.weight", "fc.bias") under which weights are stored and loaded.
        self.dropout = nn.Dropout(p=dropout)
        self.fc = nn.Linear(in_features=input_dim, out_features=num_classes)

    def forward(self, x):
        """Return the logits produced by the linear layer for ``x``."""
        return self.fc(self.dropout(x))
class NvidiaMultiHeadClassifier(nn.Module):
    """DeBERTa backbone followed by eight independent classification heads.

    The heads are stored as attributes ``head_0`` .. ``head_7`` so that their
    parameter names line up with the ``head_0`` .. ``head_7`` entries of the
    checkpoint's state dict (per the original author's note).

    Args:
        config: Configuration object that must expose ``base_model`` (name or
            path of the backbone to load) and ``target_sizes`` (mapping from
            head name to number of classes). ``fc_dropout`` is optional and
            defaults to 0.2.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        # DeBERTa backbone
        self.backbone = DebertaV2Model.from_pretrained(
            config.base_model,
            ignore_mismatched_sizes=True,
            use_safetensors=True,
        )

        # Read the feature width from the backbone that was actually loaded
        # instead of hard-coding 768, so a different ``base_model`` still
        # produces heads of the right input size.
        hidden_size = self.backbone.config.hidden_size
        dropout = getattr(config, "fc_dropout", 0.2)
        target_sizes = config.target_sizes

        # Head names in checkpoint order: position i is stored as ``head_{i}``.
        # Class counts come from ``config.target_sizes``.
        self.head_names = [
            "task_type",             # head_0
            "creativity_scope",      # head_1
            "reasoning",             # head_2
            "contextual_knowledge",  # head_3
            "number_of_few_shots",   # head_4
            "domain_knowledge",      # head_5
            "no_label_reason",       # head_6
            "constraint_ct",         # head_7
        ]
        for index, name in enumerate(self.head_names):
            # setattr on an nn.Module registers the head as a submodule, which
            # is equivalent to writing ``self.head_<index> = ...`` by hand.
            setattr(
                self,
                f"head_{index}",
                ClassificationHead(hidden_size, target_sizes[name], dropout),
            )

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        """Run the backbone and every head.

        Returns:
            Dict mapping each name in ``head_names`` to that head's output,
            in ``head_0`` .. ``head_7`` order.
        """
        outputs = self.backbone(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        # Hidden state at sequence position 0 (the [CLS] token).
        cls_output = outputs.last_hidden_state[:, 0]
        return {
            name: getattr(self, f"head_{index}")(cls_output)
            for index, name in enumerate(self.head_names)
        }
class NvidiaComplexityRouter:
    """Three-tier prompt router built on the NVIDIA multi-head classifier.

    The model is loaded lazily, on the first call to :meth:`initialize` or
    :meth:`predict`. Each prediction is reduced to a weighted complexity
    score, which is bucketed into ``"simple"``, ``"medium"`` or ``"complex"``.

    Args:
        device: Torch device to run on. ``"auto"`` (default) picks MPS, then
            CUDA, then CPU, depending on what is available.
    """

    MODEL_NAME = "nvidia/prompt-task-and-complexity-classifier"

    # Class index -> label for the task_type head.
    TASK_TYPE_MAP = {
        0: "Brainstorming", 1: "Chatbot", 2: "Classification",
        3: "Closed QA", 4: "Code Generation", 5: "Extraction",
        6: "Open QA", 7: "Other", 8: "Rewrite",
        9: "Summarization", 10: "Text Generation", 11: "Unknown",
    }
    # Class index -> label for the domain_knowledge head.
    DOMAIN_MAP = {0: "High", 1: "Low", 2: "Medium", 3: "No"}
    # Class index -> label for the creativity_scope head.
    CREATIVITY_MAP = {0: "High", 1: "Low", 2: "No"}
    # Tier -> model name returned by select_model().
    TIER_MODEL_MAP = {
        "simple": "qwen-flash",
        "medium": "qwen-plus",
        "complex": "qwen-max",
    }

    def __init__(self, device: str = "auto"):
        if device == "auto":
            device = self._detect_device()
        self.device = device
        self.tokenizer = None
        self.model = None
        self.config = None
        self._initialized = False

    @staticmethod
    def _detect_device() -> str:
        """Pick the device for ``"auto"``: MPS first, then CUDA, else CPU."""
        # torch.backends.mps is absent on some torch builds; treat that as
        # "no MPS" rather than failing with AttributeError.
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            logger.info("MPS (Metal GPU) detected, using MPS acceleration")
            return "mps"
        if torch.cuda.is_available():
            logger.info("CUDA GPU detected, using CUDA acceleration")
            return "cuda"
        logger.info("No GPU detected, using CPU")
        return "cpu"

    def initialize(self):
        """Load config, tokenizer and weights; a no-op once already loaded."""
        if self._initialized:
            return
        logger.info("Loading NVIDIA classifier: %s", self.MODEL_NAME)

        # 1. Read the model's custom config.json by hand. Per the original
        #    author's note it has no model_type, so AutoConfig cannot parse it.
        config_path = hf_hub_download(self.MODEL_NAME, "config.json")
        with open(config_path, "r", encoding="utf-8") as f:
            custom_config = json.load(f)

        # The backbone config is taken from the base model ...
        base_model = custom_config.get("base_model", "microsoft/DeBERTa-v3-base")
        self.config = DebertaV2Config.from_pretrained(base_model)
        # ... and extended with the settings the classification heads need.
        self.config.target_sizes = custom_config["target_sizes"]
        self.config.fc_dropout = custom_config.get("fc_dropout", 0.2)
        self.config.base_model = base_model

        # 2. Tokenizer (slow implementation, chosen for compatibility).
        self.tokenizer = AutoTokenizer.from_pretrained(self.MODEL_NAME, use_fast=False)

        # 3. Build the model and load the checkpoint weights.
        self.model = NvidiaMultiHeadClassifier(self.config)
        model_path = hf_hub_download(self.MODEL_NAME, "model.safetensors")
        state_dict = load_file(model_path)
        # strict=False tolerates key mismatches; report them so weights that
        # were left at their initial values do not go unnoticed.
        load_result = self.model.load_state_dict(state_dict, strict=False)
        if load_result.missing_keys:
            logger.warning(
                "Checkpoint is missing %d parameter(s): %s",
                len(load_result.missing_keys), load_result.missing_keys,
            )
        if load_result.unexpected_keys:
            logger.warning(
                "Checkpoint has %d unexpected parameter(s): %s",
                len(load_result.unexpected_keys), load_result.unexpected_keys,
            )
        self.model.to(self.device)

        # MPS needs float16 to avoid a matmul dtype-mismatch crash (per the
        # original author's note). Compare the device *type* so that "mps:0"
        # or a torch.device instance is handled the same as "mps".
        use_half = torch.device(self.device).type == "mps"
        if use_half:
            self.model.half()
        self.model.eval()
        self._initialized = True

        dtype = "float16" if use_half else "float32"
        logger.info(
            "NVIDIA classifier loaded successfully on %s (%s)", self.device, dtype
        )

    @staticmethod
    def _argmax_index(logits) -> int:
        """Return the index of the largest value along the last dimension."""
        return torch.argmax(logits, dim=-1).item()

    def predict(self, query: str) -> Dict:
        """Classify ``query`` and derive its routing tier.

        Loads the model first if that has not happened yet.

        Returns:
            {
                "tier": "simple" | "medium" | "complex",
                "complexity_score": float (0-1),
                "task_type": str,
                "domain_knowledge": str,
                "reasoning": bool,
                "creativity": str
            }
        """
        if not self._initialized:
            self.initialize()

        inputs = self.tokenizer(
            query, return_tensors="pt", truncation=True, max_length=512, padding=True
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model(**inputs)

        # Turn each head's argmax into a label; indices absent from the maps
        # fall back to "Unknown".
        task_type = self.TASK_TYPE_MAP.get(
            self._argmax_index(outputs["task_type"]), "Unknown"
        )
        domain = self.DOMAIN_MAP.get(
            self._argmax_index(outputs["domain_knowledge"]), "Unknown"
        )
        creativity = self.CREATIVITY_MAP.get(
            self._argmax_index(outputs["creativity_scope"]), "Unknown"
        )
        # Index 1 of the reasoning head is treated as "reasoning required".
        needs_reasoning = self._argmax_index(outputs["reasoning"]) == 1

        complexity_score = self._compute_complexity_score(
            domain=domain,
            creativity=creativity,
            needs_reasoning=needs_reasoning,
            task_type=task_type,
        )
        return {
            "tier": self._score_to_tier(complexity_score),
            "complexity_score": complexity_score,
            "task_type": task_type,
            "domain_knowledge": domain,
            "reasoning": needs_reasoning,
            "creativity": creativity,
        }

    def _compute_complexity_score(self, domain, creativity, needs_reasoning, task_type) -> float:
        """Combine the predicted labels into one score, rounded to 3 decimals.

        Weights:
            - domain_knowledge: 40% (High=1.0, Medium=0.6, Low=0.3, No=0.0)
            - reasoning:        30% (True=1.0, False=0.0)
            - creativity:       20% (High=1.0, Low=0.4, No=0.0)
            - task_type:        10% (per-task value from the table below)

        A label missing from its table contributes 0.5 for that dimension.
        """
        domain_scores = {"High": 1.0, "Medium": 0.6, "Low": 0.3, "No": 0.0}
        creativity_scores = {"High": 1.0, "Low": 0.4, "No": 0.0}
        task_complexity = {
            "Code Generation": 0.8, "Text Generation": 0.7,
            "Summarization": 0.6, "Rewrite": 0.5,
            "Open QA": 0.5, "Closed QA": 0.4,
            "Classification": 0.3, "Extraction": 0.3,
            "Brainstorming": 0.6, "Chatbot": 0.2,
            "Other": 0.5, "Unknown": 0.5,
        }
        score = (
            0.4 * domain_scores.get(domain, 0.5) +
            0.3 * (1.0 if needs_reasoning else 0.0) +
            0.2 * creativity_scores.get(creativity, 0.5) +
            0.1 * task_complexity.get(task_type, 0.5)
        )
        return round(score, 3)

    def _score_to_tier(self, score: float) -> str:
        """Bucket a score: < 0.35 simple, < 0.65 medium, otherwise complex."""
        if score < 0.35:
            return "simple"
        if score < 0.65:
            return "medium"
        return "complex"

    def select_model(self, query: str) -> str:
        """Return the model name recommended for ``query``."""
        return self.TIER_MODEL_MAP[self.predict(query)["tier"]]

    def benchmark(self, queries: list) -> Dict:
        """Run :meth:`predict` on every query and time each call.

        Returns:
            Dict with ``"avg_ms"`` (mean latency in milliseconds, ``0.0`` when
            ``queries`` is empty) and ``"results"`` (one summary dict per
            query, with the query text cut to its first 50 characters).
        """
        import time

        results = []
        for query in queries:
            # perf_counter is monotonic and high-resolution, so a wall-clock
            # adjustment cannot distort the measurement.
            start = time.perf_counter()
            result = self.predict(query)
            elapsed = (time.perf_counter() - start) * 1000
            results.append({
                "query": query[:50],
                "tier": result["tier"],
                "score": result["complexity_score"],
                "task": result["task_type"],
                "domain": result["domain_knowledge"],
                "reasoning": result["reasoning"],
                "time_ms": round(elapsed, 1),
            })

        times = [r["time_ms"] for r in results]
        # Guard the empty case instead of dividing by zero.
        avg_ms = round(sum(times) / len(times), 1) if times else 0.0
        return {
            "avg_ms": avg_ms,
            "results": results,
        }
# Module-wide singleton, created on first use by get_nvidia_router().
_router_instance: Optional[NvidiaComplexityRouter] = None


def get_nvidia_router() -> NvidiaComplexityRouter:
    """Return the shared router instance, constructing it on the first call."""
    global _router_instance
    if _router_instance is not None:
        return _router_instance
    _router_instance = NvidiaComplexityRouter()
    return _router_instance
def select_model_by_nvidia(query: str) -> str:
    """Return the model name the shared router recommends for ``query``."""
    shared_router = get_nvidia_router()
    return shared_router.select_model(query)
if __name__ == "__main__":
    # Manual smoke test: classify a handful of sample prompts and print the
    # routing decision for each one.
    sample_prompts = (
        "你好",
        "What is 2+2?",
        "Explain quantum computing principles in detail",
        "Write a quicksort algorithm in Python with error handling",
        "Analyze this 10-page research paper and summarize the key innovations",
        "Rewrite this sentence to be more concise",
        "Generate a creative story about a robot",
    )
    demo_router = NvidiaComplexityRouter()
    banner = "=" * 80

    print(banner)
    print("NVIDIA Prompt Task & Complexity Classifier - 3-Tier Router Test")
    print(banner)

    for query in sample_prompts:
        result = demo_router.predict(query)
        model = demo_router.select_model(query)
        report = (
            f"\nQuery: {query}",
            f" Tier: {result['tier']}",
            f" Score: {result['complexity_score']}",
            f" Task: {result['task_type']}",
            f" Domain: {result['domain_knowledge']}",
            f" Reasoning: {result['reasoning']}",
            f" Creativity: {result['creativity']}",
            f" -> Model: {model}",
        )
        for line in report:
            print(line)