Custom Providers
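Extensions can register custom model providers via pi.registerProvider(). This enables:
- Proxies - route requests through a corporate proxy or API gateway
- Custom endpoints - use self-hosted or private model deployments
- OAuth/SSO - add authentication flows for enterprise providers
- Custom APIs - implement streaming for non-standard LLM APIs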
Quick Reference
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
export default function (pi: ExtensionAPI) {
// Override baseUrl for existing provider
pi.registerProvider("anthropic", {
baseUrl: "https://proxy.example.com"
});
// Register new provider with models
pi.registerProvider("my-provider", {
name: "My Provider",
baseUrl: "https://api.example.com",
apiKey: "$MY_API_KEY",
api: "openai-completions",
models: [
{
id: "my-model",
name: "My Model",
reasoning: false,
input: ["text", "image"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 128000,
maxTokens: 4096
}
]
});
}
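The extension factory can also be async. If you need to discover models dynamically, fetch and register them in the factory rather than in session_start. pi waits for the factory to finish before it continues startup, so the provider is available during interactive startup and in pi --list-models.
Overriding an Existing Provider
The simplest use case: redirect an existing provider through a proxy.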
// All Anthropic requests now go through your proxy
pi.registerProvider("anthropic", {
baseUrl: "https://proxy.example.com"
});
// Add custom headers to OpenAI requests
pi.registerProvider("openai", {
headers: {
"X-Custom-Header": "value"
}
});
// Both baseUrl and headers
pi.registerProvider("google", {
baseUrl: "https://ai-gateway.corp.com/google",
headers: {
"X-Corp-Auth": "$CORP_AUTH_TOKEN" // env var or literal
}
});
If you provide only baseUrl and/or headers (and no models), all of the provider's existing models are kept and simply routed to the new endpoint.
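To make the override rule concrete, here is a minimal sketch that models it with plain objects. It is not Pi's implementation: ProviderState, ProviderOverride, and applyOverride are hypothetical names, and only baseUrl and models are modeled.

```typescript
// Hypothetical, simplified shapes; Pi's real types carry many more fields.
interface ModelEntry {
  id: string;
  baseUrl: string;
}

interface ProviderState {
  baseUrl: string;
  models: ModelEntry[];
}

interface ProviderOverride {
  baseUrl?: string;
  models?: { id: string }[];
}

// Toy model of the documented rule: without `models`, every existing model is
// kept and pointed at the new endpoint; with `models`, the list is replaced.
function applyOverride(current: ProviderState, override: ProviderOverride): ProviderState {
  const baseUrl = override.baseUrl ?? current.baseUrl;
  const source = override.models ?? current.models;
  return {
    baseUrl,
    models: source.map((model) => ({ id: model.id, baseUrl })),
  };
}

// Illustrative starting state with two placeholder models.
const builtin: ProviderState = {
  baseUrl: "https://upstream.example.com",
  models: [
    { id: "model-a", baseUrl: "https://upstream.example.com" },
    { id: "model-b", baseUrl: "https://upstream.example.com" },
  ],
};

const proxied = applyOverride(builtin, { baseUrl: "https://proxy.example.com" });
console.log(proxied.models.map((m) => `${m.id} -> ${m.baseUrl}`));
```

Passing a models array to the same function drops model-a and model-b, which is the replacement behavior to keep in mind when adding models to a built-in provider.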
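Registering a New Provider
To add a completely new provider, specify models along with the required configuration.
If the model list comes from a remote endpoint, use an async extension factory: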
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
export default async function (pi: ExtensionAPI) {
const response = await fetch("http://localhost:1234/v1/models");
const payload = (await response.json()) as {
data: Array<{
id: string;
name?: string;
context_window?: number;
max_tokens?: number;
}>;
};
pi.registerProvider("local-openai", {
baseUrl: "http://localhost:1234/v1",
apiKey: "$LOCAL_OPENAI_API_KEY",
api: "openai-completions",
models: payload.data.map((model) => ({
id: model.id,
name: model.name ?? model.id,
reasoning: false,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: model.context_window ?? 128000,
maxTokens: model.max_tokens ?? 4096,
})),
});
}
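This registers the fetched models before startup completes. A provider with a statically defined model list looks like this: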
pi.registerProvider("my-llm", {
baseUrl: "https://api.my-llm.com/v1",
apiKey: "$MY_LLM_API_KEY", // env var reference
api: "openai-completions", // which streaming API to use
models: [
{
id: "my-llm-large",
name: "My LLM Large",
reasoning: true, // supports extended thinking
input: ["text", "image"],
cost: {
input: 3.0, // $/million tokens
output: 15.0,
cacheRead: 0.3,
cacheWrite: 3.75
},
contextWindow: 200000,
maxTokens: 16384
}
]
});
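Whenever models is provided, it replaces all of that provider's existing models.
apiKey and custom header values use the same config value syntax as models.json: a leading !command runs a command to produce the whole value; $ENV_VAR and ${ENV_VAR} interpolate environment variables; $$ produces a literal $; and $! produces a literal !.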
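The value syntax above can be sketched as a small resolver. This is an illustration only, not Pi's resolver: resolveConfigValue, Env, and the injected runCommand callback are hypothetical names, and two behaviors are assumptions (the command is everything after the leading !, and unset variables expand to an empty string).

```typescript
// Hypothetical helper; Pi's real resolver may differ in details.
type Env = Record<string, string | undefined>;

function resolveConfigValue(
  value: string,
  env: Env,
  runCommand: (command: string) => string,
): string {
  // A leading "!" means the whole value comes from a command.
  // ASSUMPTION: the command is everything after the "!" and its output is trimmed.
  if (value.startsWith("!")) {
    return runCommand(value.slice(1)).trim();
  }
  // Otherwise expand $$, $!, ${NAME}, and $NAME in a single left-to-right pass.
  const pattern = /\$(\$|!|\{([A-Za-z_][A-Za-z0-9_]*)\}|([A-Za-z_][A-Za-z0-9_]*))/g;
  return value.replace(
    pattern,
    (match: string, token: string, braced: string | undefined, bare: string | undefined) => {
      if (token === "$" || token === "!") return token; // escaped literal
      const name = braced ?? bare;
      // ASSUMPTION: unset variables expand to an empty string.
      return name === undefined ? match : (env[name] ?? "");
    },
  );
}

const exampleEnv: Env = { MY_API_KEY: "sk-test" };
// Stand-in for a real command runner so the sketch has no side effects.
const fakeRunner = (command: string): string => `output of ${command}\n`;

console.log(resolveConfigValue("$MY_API_KEY", exampleEnv, fakeRunner));
console.log(resolveConfigValue("!pass show my-key", exampleEnv, fakeRunner));
```

The command runner is injected so the sketch stays testable; a real runner could wrap execSync from node:child_process.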
Unregistering a Provider
Use pi.unregisterProvider(name) to remove a provider that was previously registered with pi.registerProvider(name, ...):
// Register
pi.registerProvider("my-llm", {
baseUrl: "https://api.my-llm.com/v1",
apiKey: "$MY_LLM_API_KEY",
api: "openai-completions",
models: [
{
id: "my-llm-large",
name: "My LLM Large",
reasoning: true,
input: ["text", "image"],
cost: { input: 3.0, output: 15.0, cacheRead: 0.3, cacheWrite: 3.75 },
contextWindow: 200000,
maxTokens: 16384
}
]
});
// Later, remove it
pi.unregisterProvider("my-llm");
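Unregistering removes the provider's dynamic models, API key fallback, OAuth provider registration, and custom stream handler registration. Any built-in models or provider behavior that were overridden are restored.
Calls made after the initial extension load phase take effect immediately, so no /reload is needed.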
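One way to picture the restore behavior is a registry that layers extension registrations on top of built-ins. The sketch below is a toy model under that assumption; ToyProviderRegistry and ProviderSettings are hypothetical names and say nothing about how Pi stores providers internally.

```typescript
// Hypothetical toy registry; names and shapes are illustrative only.
type ProviderSettings = { baseUrl: string };

class ToyProviderRegistry {
  private readonly builtins: Map<string, ProviderSettings>;
  private readonly overrides = new Map<string, ProviderSettings>();

  constructor(builtins: Map<string, ProviderSettings>) {
    this.builtins = builtins;
  }

  // Extension registrations shadow built-ins instead of overwriting them.
  register(name: string, settings: ProviderSettings): void {
    this.overrides.set(name, settings);
  }

  // Dropping the override makes the built-in (if any) visible again.
  unregister(name: string): void {
    this.overrides.delete(name);
  }

  resolve(name: string): ProviderSettings | undefined {
    return this.overrides.get(name) ?? this.builtins.get(name);
  }
}

const registry = new ToyProviderRegistry(
  new Map([["anthropic", { baseUrl: "https://upstream.example.com" }]]),
);
registry.register("anthropic", { baseUrl: "https://proxy.example.com" });
registry.unregister("anthropic");
console.log(registry.resolve("anthropic"));
```

Because the built-in entry is never mutated, unregistering a brand-new provider removes it entirely, while unregistering an override simply uncovers the original.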
API Types
The api field determines which streaming implementation is used:
| API | Use for |
|---|---|
| anthropic-messages | Anthropic Claude API and compatible implementations |
| openai-completions | OpenAI Chat Completions API and compatible implementations |
| openai-responses | OpenAI Responses API |
| azure-openai-responses | Azure OpenAI Responses API |
| openai-codex-responses | OpenAI Codex Responses API |
| mistral-conversations | Mistral SDK Conversations/Chat streaming |
| google-generative-ai | Google Generative AI API |
| google-vertex | Google Vertex AI API |
| bedrock-converse-stream | Amazon Bedrock Converse API |
Most OpenAI-compatible providers work with openai-completions. To set thinking levels per model, use the model-level thinkingLevelMap; to handle provider quirks, use compat:
models: [{
id: "custom-model",
// ...
reasoning: true,
thinkingLevelMap: { // map pi levels to provider values; null hides unsupported levels
minimal: null,
low: null,
medium: null,
high: "default",
xhigh: "max"
},
compat: {
supportsDeveloperRole: false, // use "system" instead of "developer"
supportsReasoningEffort: true,
maxTokensField: "max_tokens", // instead of "max_completion_tokens"
requiresToolResultName: true, // tool results need name field
thinkingFormat: "qwen", // top-level enable_thinking: true
cacheControlFormat: "anthropic" // Anthropic-style cache_control markers
}
}]
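The thinkingLevelMap in the snippet above can be read as a lookup table with null as a "hide this level" marker. Here is a minimal sketch of that reading. The helper names selectableLevels and providerValue are hypothetical, and the handling of levels that are missing from the map (kept selectable, passed through under their own name) is an assumption rather than documented behavior.

```typescript
type ThinkingLevel = "off" | "minimal" | "low" | "medium" | "high" | "xhigh";
type ThinkingLevelMap = Partial<Record<ThinkingLevel, string | null>>;

const ALL_LEVELS: ThinkingLevel[] = ["off", "minimal", "low", "medium", "high", "xhigh"];

// Levels mapped to null are hidden.
// ASSUMPTION: levels that are absent from the map stay selectable.
function selectableLevels(map: ThinkingLevelMap): ThinkingLevel[] {
  return ALL_LEVELS.filter((level) => map[level] !== null);
}

// Returns the provider-specific value for a level, or undefined if unsupported.
// ASSUMPTION: an unmapped level is sent under its own name.
function providerValue(level: ThinkingLevel, map: ThinkingLevelMap): string | undefined {
  const mapped = map[level];
  if (mapped === null) return undefined;
  return mapped ?? level;
}

// Same map as in the example above.
const exampleMap: ThinkingLevelMap = {
  minimal: null,
  low: null,
  medium: null,
  high: "default",
  xhigh: "max",
};

console.log(selectableLevels(exampleMap));
console.log(providerValue("xhigh", exampleMap));
```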
For OpenRouter-style reasoning: { effort } control, set thinkingFormat to openrouter. For Together-style reasoning: { enabled } control, use together; combined with supportsReasoningEffort, it also sends reasoning_effort. For local Qwen-compatible servers that read chat_template_kwargs.enable_thinking and need preserve_thinking, use qwen-chat-template.
For OpenAI-compatible providers that expose Anthropic-style prompt caching through cache_control on the system prompt, the last tool definition, and the last user/assistant text content, use cacheControlFormat: "anthropic".
For Anthropic-compatible providers using api: "anthropic-messages", set compat.forceAdaptiveThinking: true on the model or provider if the upstream model requires adaptive thinking (thinking.type: "adaptive" plus output_config.effort). Built-in adaptive Claude models do this automatically. Set compat.allowEmptySignature: true only when the provider emits empty thinking signatures and requires signature: "" on replay.
Migration note: Mistral has moved from openai-completions to mistral-conversations. Use mistral-conversations for native Mistral models. If you intentionally route Mistral-compatible or custom endpoints through openai-completions, set compat flags explicitly as needed.
Auth Header
If your provider expects Authorization: Bearer <key> but does not use a standard API, set authHeader: true:
pi.registerProvider("custom-api", {
baseUrl: "https://api.example.com",
apiKey: "$MY_API_KEY",
authHeader: true, // adds Authorization: Bearer header
api: "openai-completions",
models: [...]
});
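The effect of authHeader on an outgoing request can be sketched as a header-building step. This is a simplified illustration: AuthConfig and buildRequestHeaders are hypothetical names, apiKey is assumed to be already resolved, and letting the generated header overwrite a custom Authorization entry is an assumption.

```typescript
// Hypothetical, simplified view of the relevant provider fields.
interface AuthConfig {
  apiKey?: string; // assumed to be resolved already (no $ENV_VAR left)
  authHeader?: boolean;
  headers?: Record<string, string>;
}

function buildRequestHeaders(config: AuthConfig): Record<string, string> {
  // Start from the custom headers, then add the bearer token if requested.
  const headers: Record<string, string> = { ...(config.headers ?? {}) };
  if (config.authHeader && config.apiKey) {
    // ASSUMPTION: the generated header wins over a custom Authorization entry.
    headers["Authorization"] = `Bearer ${config.apiKey}`;
  }
  return headers;
}

console.log(
  buildRequestHeaders({
    apiKey: "sk-test",
    authHeader: true,
    headers: { "X-Custom-Header": "value" },
  }),
);
```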
OAuth Support
Add OAuth/SSO authentication that integrates with /login:
import type { OAuthCredentials, OAuthLoginCallbacks } from "@earendil-works/pi-ai";
pi.registerProvider("corporate-ai", {
baseUrl: "https://ai.corp.com/v1",
api: "openai-responses",
models: [...],
oauth: {
name: "Corporate AI (SSO)",
async login(callbacks: OAuthLoginCallbacks): Promise<OAuthCredentials> {
const method = await callbacks.onSelect({
message: "Select login method:",
options: [
{ id: "browser", label: "Browser OAuth" },
{ id: "device", label: "Device code" }
]
});
if (!method) throw new Error("Login cancelled");
let code: string;
if (method === "device") {
callbacks.onDeviceCode({
userCode: "ABCD-1234",
verificationUri: "https://sso.corp.com/device",
intervalSeconds: 5,
expiresInSeconds: 900
});
code = await pollDeviceCodeUntilComplete();
} else {
callbacks.onAuth({ url: "https://sso.corp.com/authorize?..." });
code = await callbacks.onPrompt({ message: "Enter SSO code:" });
}
// Exchange for tokens (your implementation)
const tokens = await exchangeCodeForTokens(code);
return {
refresh: tokens.refreshToken,
access: tokens.accessToken,
expires: Date.now() + tokens.expiresIn * 1000
};
},
async refreshToken(credentials: OAuthCredentials): Promise<OAuthCredentials> {
const tokens = await refreshAccessToken(credentials.refresh);
return {
refresh: tokens.refreshToken ?? credentials.refresh,
access: tokens.accessToken,
expires: Date.now() + tokens.expiresIn * 1000
};
},
getApiKey(credentials: OAuthCredentials): string {
return credentials.access;
},
// Optional: modify models based on user's subscription
modifyModels(models, credentials) {
const region = decodeRegionFromToken(credentials.access);
return models.map(m => ({
...m,
baseUrl: `https://${region}.ai.corp.com/v1`
}));
}
}
});
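Once registered, users can authenticate with /login corporate-ai.
OAuthLoginCallbacks
The callbacks object provides three ways to authenticate (onAuth, onDeviceCode, and onPrompt), plus onSelect for letting the user choose between them: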
interface OAuthLoginCallbacks {
// Open URL in browser (for OAuth redirects)
onAuth(params: { url: string }): void;
// Show device code (for device authorization flow)
onDeviceCode(params: {
userCode: string;
verificationUri: string;
intervalSeconds?: number;
expiresInSeconds?: number;
}): void;
// Prompt user for input (for manual token entry)
onPrompt(params: { message: string }): Promise<string>;
// Show an interactive selector, e.g. to choose browser OAuth vs device code
onSelect(params: {
message: string;
options: { id: string; label: string }[];
}): Promise<string | undefined>;
}
OAuthCredentials
Credentials are persisted to ~/.pi/agent/auth.json:
interface OAuthCredentials {
refresh: string; // Refresh token (for refreshToken())
access: string; // Access token (returned by getApiKey())
expires: number; // Expiration timestamp in milliseconds
}
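Because expires is an absolute timestamp in milliseconds while token endpoints usually report a lifetime in seconds, the conversion is easy to get wrong. The sketch below isolates it. TokenResponse, toCredentials, and needsRefresh are hypothetical helpers, and the 60-second safety margin is an illustrative choice, not something Pi prescribes.

```typescript
interface OAuthCredentials {
  refresh: string;
  access: string;
  expires: number; // absolute timestamp in milliseconds
}

// Hypothetical token endpoint response, mirroring the login() example.
interface TokenResponse {
  accessToken: string;
  refreshToken?: string;
  expiresIn: number; // lifetime in seconds
}

// Converts a token response into credentials, keeping the previous refresh
// token when the server does not rotate it.
function toCredentials(tokens: TokenResponse, previousRefresh: string, now: number): OAuthCredentials {
  return {
    refresh: tokens.refreshToken ?? previousRefresh,
    access: tokens.accessToken,
    expires: now + tokens.expiresIn * 1000,
  };
}

// ASSUMPTION: refresh slightly early so a request never starts with a token
// that expires mid-flight. The margin is illustrative.
function needsRefresh(credentials: OAuthCredentials, now: number, marginMs = 60_000): boolean {
  return now >= credentials.expires - marginMs;
}

const issued = toCredentials({ accessToken: "access-1", expiresIn: 3600 }, "refresh-1", Date.now());
console.log(needsRefresh(issued, Date.now()));
```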
Custom Streaming API
For providers with non-standard APIs, implement streamSimple. Study the existing provider implementations before writing your own.
Reference implementations:
- anthropic.ts - Anthropic Messages API
- mistral.ts - Mistral Conversations API
- openai-completions.ts - OpenAI Chat Completions
- openai-responses.ts - OpenAI Responses API
- google.ts - Google Generative AI
- amazon-bedrock.ts - AWS Bedrock
Stream Pattern
All providers follow the same pattern:
import {
type AssistantMessage,
type AssistantMessageEventStream,
type Context,
type Model,
type SimpleStreamOptions,
calculateCost,
createAssistantMessageEventStream,
} from "@earendil-works/pi-ai";
function streamMyProvider(
model: Model<any>,
context: Context,
options?: SimpleStreamOptions
): AssistantMessageEventStream {
const stream = createAssistantMessageEventStream();
(async () => {
// Initialize output message
const output: AssistantMessage = {
role: "assistant",
content: [],
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop",
timestamp: Date.now(),
};
try {
// Push start event
stream.push({ type: "start", partial: output });
// Make API request and process response...
// Push content events as they arrive...
// Push done event
stream.push({
type: "done",
reason: output.stopReason as "stop" | "length" | "toolUse",
message: output
});
stream.end();
} catch (error) {
output.stopReason = options?.signal?.aborted ? "aborted" : "error";
output.errorMessage = error instanceof Error ? error.message : String(error);
stream.push({ type: "error", reason: output.stopReason, error: output });
stream.end();
}
})();
return stream;
}
Event Types
Push events via stream.push() in this order:
- { type: "start", partial: output } - stream started
- Content events (repeatable; track contentIndex for each block):
  - { type: "text_start", contentIndex, partial } - text block started
  - { type: "text_delta", contentIndex, delta, partial } - text chunk
  - { type: "text_end", contentIndex, content, partial } - text block ended
  - { type: "thinking_start", contentIndex, partial } - thinking started
  - { type: "thinking_delta", contentIndex, delta, partial } - thinking chunk
  - { type: "thinking_end", contentIndex, content, partial } - thinking ended
  - { type: "toolcall_start", contentIndex, partial } - tool call started
  - { type: "toolcall_delta", contentIndex, delta, partial } - tool call JSON chunk
  - { type: "toolcall_end", contentIndex, toolCall, partial } - tool call ended
- { type: "done", reason, message } or { type: "error", reason, error } - stream ended
The partial field on every event contains the current AssistantMessage state. As data arrives, update output.content first, then pass output as partial.
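A quick way to check that a stream implementation respects this ordering is to replay its events through a small consumer. The sketch below covers only the text events and omits partial for brevity; StreamEvent and collectText are hypothetical names written for this illustration, not part of pi-ai.

```typescript
// Subset of the event shapes listed above; `partial` is omitted for brevity.
type StreamEvent =
  | { type: "start" }
  | { type: "text_start"; contentIndex: number }
  | { type: "text_delta"; contentIndex: number; delta: string }
  | { type: "text_end"; contentIndex: number; content: string }
  | { type: "done"; reason: "stop" | "length" | "toolUse" }
  | { type: "error"; reason: "aborted" | "error" };

// Rebuilds text blocks from deltas and rejects out-of-order events.
function collectText(events: StreamEvent[]): string[] {
  const blocks: string[] = [];
  let started = false;
  let finished = false;
  for (const event of events) {
    if (finished) throw new Error(`event after end of stream: ${event.type}`);
    if (event.type === "start") {
      if (started) throw new Error("duplicate start event");
      started = true;
      continue;
    }
    if (!started) throw new Error(`event before start: ${event.type}`);
    switch (event.type) {
      case "text_start":
        blocks[event.contentIndex] = "";
        break;
      case "text_delta":
        blocks[event.contentIndex] = (blocks[event.contentIndex] ?? "") + event.delta;
        break;
      case "text_end":
        // The end event carries the full text, so it overrides the deltas.
        blocks[event.contentIndex] = event.content;
        break;
      case "done":
      case "error":
        finished = true;
        break;
    }
  }
  if (!finished) throw new Error("stream did not end with done or error");
  return blocks;
}

const sampleEvents: StreamEvent[] = [
  { type: "start" },
  { type: "text_start", contentIndex: 0 },
  { type: "text_delta", contentIndex: 0, delta: "Hello, " },
  { type: "text_delta", contentIndex: 0, delta: "world" },
  { type: "text_end", contentIndex: 0, content: "Hello, world" },
  { type: "done", reason: "stop" },
];
console.log(collectText(sampleEvents));
```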
Content Blocks
As content arrives, append content blocks to output.content:
// Text block
output.content.push({ type: "text", text: "" });
stream.push({ type: "text_start", contentIndex: output.content.length - 1, partial: output });
// As text arrives
const block = output.content[contentIndex];
if (block.type === "text") {
block.text += delta;
stream.push({ type: "text_delta", contentIndex, delta, partial: output });
}
// When block completes
stream.push({ type: "text_end", contentIndex, content: block.text, partial: output });
Tool Calls
Tool calls require accumulating the JSON arguments and then parsing them:
// Start tool call
output.content.push({
type: "toolCall",
id: toolCallId,
name: toolName,
arguments: {}
});
stream.push({ type: "toolcall_start", contentIndex: output.content.length - 1, partial: output });
// Accumulate JSON
let partialJson = "";
partialJson += jsonDelta;
try {
block.arguments = JSON.parse(partialJson);
} catch {}
stream.push({ type: "toolcall_delta", contentIndex, delta: jsonDelta, partial: output });
// Complete
stream.push({
type: "toolcall_end",
contentIndex,
toolCall: { type: "toolCall", id, name, arguments: block.arguments },
partial: output
});
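The accumulate-then-parse step is easier to reuse when it is wrapped in a small buffer, one per tool call block, so that the partial JSON is not reset between deltas. The class below is a sketch of that idea; ToolCallArgumentBuffer is a hypothetical helper, not something exported by pi-ai.

```typescript
// Hypothetical helper: one instance per tool call content block.
class ToolCallArgumentBuffer {
  private partialJson = "";
  private lastParsed: Record<string, unknown> = {};

  // Appends a JSON fragment and returns the best arguments known so far.
  append(jsonDelta: string): Record<string, unknown> {
    this.partialJson += jsonDelta;
    try {
      const parsed: unknown = JSON.parse(this.partialJson);
      if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
        this.lastParsed = parsed as Record<string, unknown>;
      }
    } catch {
      // The JSON is still incomplete; keep the last successful parse.
    }
    return this.lastParsed;
  }
}

const buffer = new ToolCallArgumentBuffer();
console.log(buffer.append('{"path":'));
console.log(buffer.append('"a.txt"}'));
```

In a stream function, the value returned by append() would be assigned to block.arguments before pushing each toolcall_delta event.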
Usage and Cost
Update usage from the API response and calculate the cost:
output.usage.input = response.usage.input_tokens;
output.usage.output = response.usage.output_tokens;
output.usage.cacheRead = response.usage.cache_read_tokens ?? 0;
output.usage.cacheWrite = response.usage.cache_write_tokens ?? 0;
output.usage.totalTokens = output.usage.input + output.usage.output +
output.usage.cacheRead + output.usage.cacheWrite;
calculateCost(model, output.usage);
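Model costs are declared in dollars per million tokens, so the arithmetic behind a cost figure is a per-category multiplication. The sketch below shows that arithmetic on its own. estimateCost is a hypothetical stand-in written for this illustration and is not the calculateCost function from pi-ai, whose exact behavior may differ.

```typescript
// Rates in dollars per million tokens, as in the model `cost` field.
interface TokenRates {
  input: number;
  output: number;
  cacheRead: number;
  cacheWrite: number;
}

// Token counts for one response.
interface TokenUsage {
  input: number;
  output: number;
  cacheRead: number;
  cacheWrite: number;
}

// Hypothetical helper: multiplies each token count by its per-million rate.
function estimateCost(rates: TokenRates, usage: TokenUsage) {
  const dollars = (rate: number, tokens: number): number => (rate * tokens) / 1_000_000;
  const input = dollars(rates.input, usage.input);
  const output = dollars(rates.output, usage.output);
  const cacheRead = dollars(rates.cacheRead, usage.cacheRead);
  const cacheWrite = dollars(rates.cacheWrite, usage.cacheWrite);
  return { input, output, cacheRead, cacheWrite, total: input + output + cacheRead + cacheWrite };
}

// Rates taken from the my-llm-large example earlier in this document.
const exampleRates: TokenRates = { input: 3.0, output: 15.0, cacheRead: 0.3, cacheWrite: 3.75 };
const exampleUsage: TokenUsage = { input: 1_000_000, output: 500_000, cacheRead: 0, cacheWrite: 0 };

console.log(estimateCost(exampleRates, exampleUsage));
```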
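Context Overflow Errors
When a request exceeds the model's context window, Pi can recover automatically by compacting the session context and retrying. This recovery triggers only when Pi recognizes the failure as an overflow.
Detection runs on the finalized assistant message:
- stopReason === "error"
- errorMessage matches one of Pi's known overflow patterns (see packages/ai/src/utils/overflow.ts)
If your provider returns an overflow error message that Pi does not recognize, normalize the error in the same extension that registers the provider. Use a message_end handler to rewrite the assistant message so that its errorMessage starts with a phrase Pi recognizes. The generic fallback context_length_exceeded is the safest choice.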
const MY_PROVIDER_OVERFLOW_PATTERN = /your provider's overflow phrase/i;
export default function (pi: ExtensionAPI) {
pi.registerProvider("my-provider", { /* ... */ });
pi.on("message_end", (event, ctx) => {
const message = event.message;
if (message.role !== "assistant") return;
if (message.stopReason !== "error") return;
if (
message.provider !== "my-provider" &&
ctx.model?.provider !== "my-provider"
)
return;
const errorMessage = message.errorMessage ?? "";
if (errorMessage.includes("context_length_exceeded")) return;
if (!MY_PROVIDER_OVERFLOW_PATTERN.test(errorMessage)) return;
return {
message: {
...message,
errorMessage: `context_length_exceeded: ${errorMessage}`,
},
};
});
}
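message_end runs before Pi starts tracking the assistant message for automatic context compaction, so the rewritten errorMessage is what Pi actually inspects. With this in place, Pi will:
- Detect the overflow from errorMessage.
- Remove the failed assistant message from the live context.
- Compact the context.
- Retry the request once.
Be careful when rewriting:
- Scope the rewrite to your provider (message.provider and ctx.model?.provider) so that unrelated errors from other providers are left alone.
- Match your provider's specific pattern, not Pi's generic overflow patterns. Rewriting rate limit or throttling errors (rate limit, too many requests) as well would trigger context compaction by mistake instead of Pi's normal retry-with-backoff path.
- Skip the rewrite if errorMessage already contains context_length_exceeded, so the handler stays idempotent.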
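The three precautions above are easier to verify when the rewrite logic lives in a pure function that the message_end handler merely calls. The sketch below shows one way to factor it out. FailedAssistantMessage and normalizeOverflow are hypothetical names, the message shape is reduced to the fields used here, and the example pattern is a placeholder for your provider's real overflow phrase.

```typescript
// Reduced message shape; the real assistant message has more fields.
interface FailedAssistantMessage {
  provider: string;
  stopReason: string;
  errorMessage?: string;
}

const OVERFLOW_MARKER = "context_length_exceeded";

// Returns a rewritten message, or undefined when nothing should change.
function normalizeOverflow(
  message: FailedAssistantMessage,
  providerName: string,
  providerPattern: RegExp, // must not use the g flag, since test() would become stateful
): FailedAssistantMessage | undefined {
  if (message.stopReason !== "error") return undefined;
  if (message.provider !== providerName) return undefined; // scope to one provider
  const errorMessage = message.errorMessage ?? "";
  if (errorMessage.includes(OVERFLOW_MARKER)) return undefined; // idempotent
  if (!providerPattern.test(errorMessage)) return undefined; // provider-specific only
  return { ...message, errorMessage: `${OVERFLOW_MARKER}: ${errorMessage}` };
}

// Placeholder pattern; replace it with your provider's actual overflow phrase.
const examplePattern = /prompt is too long/i;

console.log(
  normalizeOverflow(
    { provider: "my-provider", stopReason: "error", errorMessage: "Prompt is too long: 300000 tokens" },
    "my-provider",
    examplePattern,
  ),
);
```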
Registration
Register your stream function:
pi.registerProvider("my-provider", {
baseUrl: "https://api.example.com",
apiKey: "$MY_API_KEY",
api: "my-custom-api",
models: [...],
streamSimple: streamMyProvider
});
Testing Your Implementation
Test your provider with the same test suite used for the built-in providers. Copy and adapt these test files from packages/ai/test/:
| Test | Purpose |
|---|---|
| stream.test.ts | Basic streaming, text output |
| tokens.test.ts | Token counting and usage |
| abort.test.ts | AbortSignal handling |
| empty.test.ts | Empty and minimal responses |
| context-overflow.test.ts | Context window limits |
| image-limits.test.ts | Image input handling |
| unicode-surrogate.test.ts | Unicode edge cases |
| tool-call-without-result.test.ts | Tool call edge cases |
| image-tool-result.test.ts | Images in tool results |
| total-tokens.test.ts | Total token calculation |
| cross-provider-handoff.test.ts | Context handoff between providers |
Run the tests with your provider/model combination to verify compatibility.
Config Reference
interface ProviderConfig {
/** Display name for the provider in UI such as /login. */
name?: string;
/** API endpoint URL. Required when defining models. */
baseUrl?: string;
/** API key literal, env interpolation ($ENV_VAR or ${ENV_VAR}), or !command. Required when defining models (unless oauth). */
apiKey?: string;
/** API type for streaming. Required at provider or model level when defining models. */
api?: Api;
/** Custom streaming implementation for non-standard APIs. */
streamSimple?: (
model: Model<Api>,
context: Context,
options?: SimpleStreamOptions
) => AssistantMessageEventStream;
/** Custom headers to include in requests. Values use the same resolution syntax as apiKey. */
headers?: Record<string, string>;
/** If true, adds Authorization: Bearer header with the resolved API key. */
authHeader?: boolean;
/** Models to register. If provided, replaces all existing models for this provider. */
models?: ProviderModelConfig[];
/** OAuth provider for /login support. */
oauth?: {
name: string;
login(callbacks: OAuthLoginCallbacks): Promise<OAuthCredentials>;
refreshToken(credentials: OAuthCredentials): Promise<OAuthCredentials>;
getApiKey(credentials: OAuthCredentials): string;
modifyModels?(models: Model<Api>[], credentials: OAuthCredentials): Model<Api>[];
};
}
Model Definition Reference
interface ProviderModelConfig {
/** Model ID (e.g., "claude-sonnet-4-20250514"). */
id: string;
/** Display name (e.g., "Claude 4 Sonnet"). */
name: string;
/** API type override for this specific model. */
api?: Api;
/** API endpoint URL override for this specific model. */
baseUrl?: string;
/** Whether the model supports extended thinking. */
reasoning: boolean;
/** Maps pi thinking levels to provider/model-specific values; null marks a level unsupported. */
thinkingLevelMap?: Partial<Record<"off" | "minimal" | "low" | "medium" | "high" | "xhigh", string | null>>;
/** Supported input types. */
input: ("text" | "image")[];
/** Cost per million tokens (for usage tracking). */
cost: {
input: number;
output: number;
cacheRead: number;
cacheWrite: number;
};
/** Maximum context window size in tokens. */
contextWindow: number;
/** Maximum output tokens. */
maxTokens: number;
/** Custom headers for this specific model. */
headers?: Record<string, string>;
/** Compatibility settings for the selected API. */
compat?: {
// openai-completions
supportsStore?: boolean;
supportsDeveloperRole?: boolean;
supportsReasoningEffort?: boolean;
supportsUsageInStreaming?: boolean;
maxTokensField?: "max_completion_tokens" | "max_tokens";
requiresToolResultName?: boolean;
requiresAssistantAfterToolResult?: boolean;
requiresThinkingAsText?: boolean;
requiresReasoningContentOnAssistantMessages?: boolean;
thinkingFormat?: "openai" | "openrouter" | "deepseek" | "together" | "zai" | "qwen" | "chat-template" | "qwen-chat-template" | "string-thinking" | "ant-ling";
chatTemplateKwargs?: Record<string, string | number | boolean | null | { "$var": "thinking.enabled" | "thinking.effort"; omitWhenOff?: boolean }>;
cacheControlFormat?: "anthropic";
// anthropic-messages
supportsEagerToolInputStreaming?: boolean;
supportsLongCacheRetention?: boolean;
sendSessionAffinityHeaders?: boolean;
supportsCacheControlOnTools?: boolean;
forceAdaptiveThinking?: boolean;
allowEmptySignature?: boolean;
};
}
thinkingFormat values: openrouter sends reasoning: { effort }. deepseek sends thinking: { type: "enabled" | "disabled" } and, when enabled, reasoning_effort. together sends reasoning: { enabled } and also sends reasoning_effort when supportsReasoningEffort is enabled. qwen is for DashScope-style top-level enable_thinking. For local Qwen-compatible servers that read chat_template_kwargs.enable_thinking and need preserve_thinking, use qwen-chat-template. Use chat-template when chat_template_kwargs needs to be configurable, for example DeepSeek V3.x behind vLLM with chatTemplateKwargs: { "thinking": { "$var": "thinking.enabled" } }.
cacheControlFormat: "anthropic" applies Anthropic-style cache_control markers to the system prompt, the last tool definition, and the last user/assistant text content.