
feat: implement websockets transport mode selection for chat #5410

Open
Soulter wants to merge 3 commits into master from feat/ws-chatui

Conversation


@Soulter Soulter commented Feb 24, 2026

  • Added transport mode selection (SSE/WebSocket) in the chat component.
  • Updated conversation sidebar to include transport mode options.
  • Integrated transport mode handling in message sending logic.
  • Refactored message sending functions to support both SSE and WebSocket.
  • Enhanced WebSocket connection management and message handling.
  • Updated localization files for transport mode labels.
  • Configured Vite to support WebSocket proxying.

Modifications

  • This is NOT a breaking change.

Screenshots or Test Results


Checklist

  • 😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.


Summary by Sourcery

Add selectable SSE/WebSocket transport for chat and integrate it end-to-end across frontend and webchat backend.

New Features:

  • Introduce user-selectable chat transport mode (SSE or WebSocket) persisted in local storage and exposed in the conversation sidebar.
  • Add unified chat WebSocket endpoint that supports binding to chat sessions, sending chat messages, and interrupting runs with streaming responses.
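
As a rough illustration of the first point, transport-mode persistence can be sketched as below. The storage key, helper names, and SSE default are assumptions for illustration, not values taken from this PR:

```typescript
// Hypothetical sketch of persisting the selected chat transport mode.
type TransportMode = 'sse' | 'websocket';

// Minimal storage interface; window.localStorage satisfies it in the browser.
interface KVStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
}

const TRANSPORT_KEY = 'chat_transport_mode'; // assumed key name

function loadTransportMode(store: KVStore): TransportMode {
  // Fall back to SSE for missing or unrecognized stored values.
  return store.getItem(TRANSPORT_KEY) === 'websocket' ? 'websocket' : 'sse';
}

function saveTransportMode(store: KVStore, mode: TransportMode): void {
  store.setItem(TRANSPORT_KEY, mode);
}
```

In the sidebar component, the selected option would call `saveTransportMode`, and the chat composable would read `loadTransportMode` on setup.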

Enhancements:

  • Refactor message streaming pipeline on both frontend and backend to share common chunk processing logic and support both SSE and WebSocket transports.
  • Improve webchat platform to support proactive bot messages, normalized conversation IDs, and persistent storage of WebSocket-delivered messages and attachments.
  • Extract reusable helpers for building webchat message parts, converting between message payloads and message chains, and stripping transient fields before storage.

Build:

  • Enable WebSocket proxy support for /api in the Vite dev server configuration.
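
For reference, enabling WebSocket proxying in a Vite dev server is typically a one-flag change. A minimal sketch, where the target address is a placeholder rather than the value used in this PR:

```typescript
// vite.config.ts (sketch): proxy /api and let WebSocket upgrades through.
import { defineConfig } from 'vite';

export default defineConfig({
  server: {
    proxy: {
      '/api': {
        target: 'http://localhost:6185', // assumed backend address
        changeOrigin: true,
        ws: true, // also proxy WebSocket upgrade requests under /api
      },
    },
  },
});
```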

Documentation:

  • Add localization strings for chat transport mode labels in English and Chinese.

@dosubot dosubot bot added the `size:XXL` label (This PR changes 1000+ lines, ignoring generated files) on Feb 24, 2026
@gemini-code-assist

Summary of Changes

Hello @Soulter, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the chat functionality by introducing selectable transport modes: Server-Sent Events (SSE) and WebSockets. This change provides users with more flexible and potentially more performant real-time communication options. The implementation involves a comprehensive refactoring of message handling across the application, from the frontend UI to the backend message processing and storage, ensuring robust support for both streaming protocols.

Highlights

  • Transport Mode Selection: Implemented the ability to select between Server-Sent Events (SSE) and WebSockets as the communication transport mode within the chat component.
  • Unified WebSocket Endpoint: Introduced a new unified WebSocket endpoint (/api/unified_chat/ws) to handle both 'live' and 'chat' modes, replacing the legacy live chat WebSocket.
  • Message Sending Refactoring: Refactored message sending logic on both frontend and backend to support distinct handling for SSE and WebSocket transport modes, including attachment processing and message chain construction.
  • WebSocket Connection Management: Enhanced WebSocket connection management, including session binding, message dispatching, and subscription handling for real-time updates.
  • Localization and Configuration Updates: Added new localization keys for transport mode options and configured Vite to support WebSocket proxying for development.
Changelog
  • astrbot/core/platform/sources/webchat/message_parts_helper.py
    • Added a new module to centralize message part processing, including building message parts from payloads, converting to message chains, and handling attachments.
  • astrbot/core/platform/sources/webchat/webchat_adapter.py
    • Updated message sending logic to support proactive messages and integrated new message part helper functions, along with a new conversation ID extraction utility.
  • astrbot/core/platform/sources/webchat/webchat_event.py
    • Modified message event handling to use a new conversation ID extraction function for consistency.
  • astrbot/core/platform/sources/webchat/webchat_queue_mgr.py
    • Added a utility method to retrieve active back-queue request IDs for a given conversation.
  • astrbot/dashboard/routes/chat.py
    • Refactored message processing by delegating message part building and attachment creation to helper functions, and updated message content validation.
  • astrbot/dashboard/routes/live_chat.py
    • Introduced a unified WebSocket endpoint, implemented WebSocket-specific message handling, subscription management, and integrated message part helper functions for chat mode.
  • astrbot/dashboard/routes/open_api.py
    • Refactored message chain construction to utilize the new build_message_chain_from_payload helper function.
  • dashboard/src/components/chat/Chat.vue
    • Integrated the transport mode selection from the conversation sidebar and passed the selected mode to the useMessages composable.
  • dashboard/src/components/chat/ConversationSidebar.vue
    • Added UI elements for selecting between SSE and WebSocket transport modes and emitted the selected mode.
  • dashboard/src/composables/useMessages.ts
    • Implemented WebSocket connection management, message sending via WebSocket, and refactored SSE message sending, including state management for transport mode and cleanup.
  • dashboard/src/i18n/locales/en-US/features/chat.json
    • Added new localization strings for "Transport Mode", "SSE", and "WebSocket".
  • dashboard/src/i18n/locales/zh-CN/features/chat.json
    • Added new localization strings for "通信传输模式", "SSE", and "WebSocket" in Chinese.
  • dashboard/vite.config.ts
    • Enabled WebSocket proxying in the development server configuration to support WebSocket connections.
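
The bind/send/interrupt commands on the unified endpoint suggest a small discriminated-union helper on the client. A hedged sketch: the `t` discriminator, the `session_id` field, and the three command names appear in this PR's review discussion, while the remaining field names are assumptions for illustration:

```typescript
// Hypothetical client-side payload builders for the unified chat WebSocket.
type ChatWsMessage =
  | { t: 'bind'; session_id: string }
  | { t: 'send'; session_id: string; message: unknown[] }
  | { t: 'interrupt' };

function bindMessage(sessionId: string): ChatWsMessage {
  // Bind the socket to an existing chat session before sending.
  return { t: 'bind', session_id: sessionId };
}

function sendChatMessage(sessionId: string, parts: unknown[]): ChatWsMessage {
  // `parts` would carry the message parts built by the frontend.
  return { t: 'send', session_id: sessionId, message: parts };
}

function interruptMessage(): ChatWsMessage {
  // Ask the server to interrupt the current streaming run.
  return { t: 'interrupt' };
}
```

Centralizing payload construction this way keeps the wire format in one place if the protocol evolves.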

@dosubot dosubot bot added the `feature:chatui` label (the bug / feature is about astrbot's chatui, webchat) on Feb 24, 2026

dosubot bot commented Feb 24, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.



@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 3 issues, and left some high level feedback:

  • In ConversationSidebar.vue, transportOptions uses tm('transport.sse')/tm('transport.websocket') in the script setup but only t is imported from useI18n, so you should also destructure tm (or switch to t) to avoid a runtime/TS error.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `ConversationSidebar.vue`, `transportOptions` uses `tm('transport.sse')`/`tm('transport.websocket')` in the script setup but only `t` is imported from `useI18n`, so you should also destructure `tm` (or switch to `t`) to avoid a runtime/TS error.

## Individual Comments

### Comment 1
<location path="dashboard/src/composables/useMessages.ts" line_range="727-730" />
<code_context>
         }
     }

+    function buildBackendMessageParts(
+        prompt: string,
+        stagedFiles: { attachment_id: string; url: string; original_name: string; type: string }[],
+        replyTo: ReplyInfo | null
+    ): MessagePart[] {
+        const parts: MessagePart[] = [];
</code_context>
<issue_to_address>
**issue (bug_risk):** Audio-only messages are not included in the backend payload for SSE/WebSocket, so they will be dropped server-side.

`sendMessage` adds the local audio URL to `userMessageParts`, but `buildBackendMessageParts(prompt, stagedFiles, replyTo)` ignores audio entirely. As a result:

- Audio-only messages produce an empty `backendMessageParts`.
- For mixed messages, the recording is never included in the payload sent to `/api/chat/send`.

To keep backend behavior aligned with the local view, either:

- Extend `buildBackendMessageParts` to take `audioName` and emit a `record` part in the format the backend expects, or
- Derive `backendMessageParts` from `userMessageParts`, transforming audio parts into the backend's attachment-based representation.

Otherwise, audio will be silently dropped by the new SSE/WebSocket flow.
</issue_to_address>

### Comment 2
<location path="dashboard/src/composables/useMessages.ts" line_range="205" />
<code_context>
+        currentBoundSessionId.value = sessionId;
+    }
+
+    async function handlePassiveWebSocketChunk(payload: StreamChunk) {
+        if (!payload.type) {
+            return;
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared chunk-to-message rendering logic into a single helper used by both `createStreamChunkProcessor` and `handlePassiveWebSocketChunk` to avoid duplicated behavior.

You can significantly reduce duplication and make future changes safer by extracting the shared chunk→message handling into a single helper and reusing it in both `createStreamChunkProcessor` and `handlePassiveWebSocketChunk`.

### 1. Extract shared media/plain/reasoning handling

Right now:

- `createStreamChunkProcessor` handles `plain` / `image` / `record` / `file` / `agent_stats` / `update_title` / `message_saved` plus streaming state, etc.
- `handlePassiveWebSocketChunk` reimplements most of the `plain` / `image` / `record` / `file` handling for the "passive" WebSocket path.

This means any change to how chunks are rendered must be updated in two places.

You can factor out a single helper that updates `messages` given a `StreamChunk` and some minimal state (`messageObj`, `inStreaming`), then call it from both code paths.

#### Example: shared processor core

```ts
type StreamProcessorState = {
  inStreaming: boolean;
  messageObj: MessageContent | null;
};

async function processStreamChunkCore(
  chunkJson: StreamChunk,
  state: StreamProcessorState,
) {
  if (!chunkJson || typeof chunkJson !== 'object' || !chunkJson.type) return;

  // remove loading placeholder if present
  const lastMsg = messages.value[messages.value.length - 1];
  if (lastMsg?.content?.isLoading) {
    messages.value.pop();
  }

  if (chunkJson.type === 'error') {
    console.error('Error received:', chunkJson.data);
    return;
  }

  if (chunkJson.type === 'image') {
    const img = String(chunkJson.data || '').replace('[IMAGE]', '');
    const imageUrl = await getMediaFile(img);
    const botResp: MessageContent = {
      type: 'bot',
      message: [{ type: 'image', embedded_url: imageUrl }],
    };
    messages.value.push({ content: botResp });
    return;
  }

  if (chunkJson.type === 'record') {
    const audio = String(chunkJson.data || '').replace('[RECORD]', '');
    const audioUrl = await getMediaFile(audio);
    const botResp: MessageContent = {
      type: 'bot',
      message: [{ type: 'record', embedded_url: audioUrl }],
    };
    messages.value.push({ content: botResp });
    return;
  }

  if (chunkJson.type === 'file') {
    const fileData = String(chunkJson.data || '').replace('[FILE]', '');
    const [filename, originalName] = fileData.includes('|')
      ? fileData.split('|', 2)
      : [fileData, fileData];
    const fileUrl = await getMediaFile(filename);
    const botResp: MessageContent = {
      type: 'bot',
      message: [{
        type: 'file',
        embedded_file: { url: fileUrl, filename: originalName },
      }],
    };
    messages.value.push({ content: botResp });
    return;
  }

  if (chunkJson.type === 'plain') {
    const chainType = chunkJson.chain_type || 'normal';

    if (chainType === 'tool_call') {
      let toolCallData: any;
      try {
        toolCallData = JSON.parse(String(chunkJson.data || '{}'));
      } catch {
        return;
      }
      const toolCall: ToolCall = {
        id: toolCallData.id,
        name: toolCallData.name,
        args: toolCallData.args,
        ts: toolCallData.ts,
      };

      if (!state.inStreaming) {
        state.messageObj = reactive<MessageContent>({
          type: 'bot',
          message: [{ type: 'tool_call', tool_calls: [toolCall] }],
        });
        messages.value.push({ content: state.messageObj });
        state.inStreaming = true;
      } else {
        const lastPart = state.messageObj!.message[state.messageObj!.message.length - 1];
        if (lastPart?.type === 'tool_call') {
          const existingIndex = lastPart.tool_calls!.findIndex(
            (tc: ToolCall) => tc.id === toolCall.id,
          );
          if (existingIndex === -1) {
            lastPart.tool_calls!.push(toolCall);
          }
        } else {
          state.messageObj!.message.push({
            type: 'tool_call',
            tool_calls: [toolCall],
          });
        }
      }
      return;
    }

    if (chainType === 'tool_call_result') {
      let resultData: any;
      try {
        resultData = JSON.parse(String(chunkJson.data || '{}'));
      } catch {
        return;
      }

      if (state.messageObj) {
        for (const part of state.messageObj.message) {
          if (part.type === 'tool_call' && part.tool_calls) {
            const toolCall = part.tool_calls.find(
              (tc: ToolCall) => tc.id === resultData.id,
            );
            if (toolCall) {
              toolCall.result = resultData.result;
              toolCall.finished_ts = resultData.ts;
              break;
            }
          }
        }
      }
      return;
    }

    if (chainType === 'reasoning') {
      if (!state.inStreaming) {
        state.messageObj = reactive<MessageContent>({
          type: 'bot',
          message: [],
          reasoning: String(chunkJson.data || ''),
        });
        messages.value.push({ content: state.messageObj });
        state.inStreaming = true;
      } else {
        state.messageObj!.reasoning =
          (state.messageObj!.reasoning || '') + String(chunkJson.data || '');
      }
      return;
    }

    // normal plain text
    if (!state.inStreaming) {
      state.messageObj = reactive<MessageContent>({
        type: 'bot',
        message: [{
          type: 'plain',
          text: String(chunkJson.data || ''),
        }],
      });
      messages.value.push({ content: state.messageObj });
      state.inStreaming = true;
    } else {
      const lastPart = state.messageObj!.message[state.messageObj!.message.length - 1];
      if (lastPart?.type === 'plain') {
        lastPart.text = (lastPart.text || '') + String(chunkJson.data || '');
      } else {
        state.messageObj!.message.push({
          type: 'plain',
          text: String(chunkJson.data || ''),
        });
      }
    }
    return;
  }

  if (chunkJson.type === 'update_title') {
    if (chunkJson.session_id) {
      updateSessionTitle(chunkJson.session_id, chunkJson.data);
    }
    return;
  }

  if (chunkJson.type === 'message_saved') {
    const lastBotMsg = messages.value[messages.value.length - 1];
    if (lastBotMsg && lastBotMsg.content?.type === 'bot') {
      lastBotMsg.id = chunkJson.data?.id;
      lastBotMsg.created_at = chunkJson.data?.created_at;
    }
    return;
  }

  if (chunkJson.type === 'agent_stats') {
    if (state.messageObj) {
      state.messageObj.agentStats = chunkJson.data;
    }
    return;
  }
}
```

Then add a tiny helper to handle the common streaming flag logic and `isStreaming`:

```ts
function updateStreamingState(chunkJson: StreamChunk, state: StreamProcessorState) {
  if (typeof chunkJson.streaming !== 'boolean') return;

  if ((chunkJson.type === 'break' && chunkJson.streaming) || !chunkJson.streaming) {
    state.inStreaming = false;
    if (!chunkJson.streaming) {
      isStreaming.value = false;
    }
  }
}
```

### 2. Reuse in `createStreamChunkProcessor` and `handlePassiveWebSocketChunk`

`createStreamChunkProcessor` can become a thin wrapper:

```ts
function createStreamChunkProcessor() {
  const state: StreamProcessorState = {
    inStreaming: false,
    messageObj: null,
  };

  return async (chunkJson: StreamChunk) => {
    if (!chunkJson || typeof chunkJson !== 'object') return;
    if (chunkJson.type === 'session_id') return;

    await processStreamChunkCore(chunkJson, state);
    updateStreamingState(chunkJson, state);
  };
}
```

`handlePassiveWebSocketChunk` can reuse the same core, but with an isolated state that does not affect `isStreaming` (if passive pushes should not toggle `isStreaming`, pass a flag or use a separate variant of `updateStreamingState`):

```ts
const passiveWsState: StreamProcessorState = {
  inStreaming: false,
  messageObj: null,
};

async function handlePassiveWebSocketChunk(payload: StreamChunk) {
  if (!payload.type) return;

  // reuse the same chunk -> messages logic
  await processStreamChunkCore(payload, passiveWsState);
  // optional: skip updateStreamingState here if passive pushes should not affect the UI streaming state
}
```

This preserves all current behavior while:

- removing the duplicated `plain` / `image` / `record` / `file` / `reasoning` / `tool_call` handling;
- ensuring future changes to the rendering logic only need to be made in one place;
- keeping `createStreamChunkProcessor` smaller and more focused (and easier to split further later).
</issue_to_address>

### Comment 3
<location path="astrbot/dashboard/routes/live_chat.py" line_range="338" />
<code_context>
+        session.chat_subscriptions.clear()
+        session.chat_subscription_tasks.clear()
+
+    async def _handle_chat_message(
+        self, session: LiveChatSession, message: dict
+    ) -> None:
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting a shared chat error-handling helper and splitting `_handle_chat_message` into smaller per-command handlers to keep the function focused and easier to extend.

You can keep all of the new capability while containing the complexity by layering two small abstractions **on top of** the existing logic:

1. a standardized chat error helper
2. a per-message-type dispatcher for `_handle_chat_message`

These changes do not alter behavior, but they make `_handle_chat_message` easier to read and extend.

---

### 1. Standardize error responses

You construct the same error payload shape several times in `_handle_chat_message`. A small helper cuts the noise and makes future changes to the error format trivial:

```python
async def _send_chat_error(
    self,
    session: LiveChatSession,
    code: str,
    data: str,
) -> None:
    await self._send_chat_payload(
        session,
        {
            "ct": "chat",
            "t": "error",
            "code": code,
            "data": data,
        },
    )
```

Example usage (the semantics are unchanged):

```python
if not isinstance(chat_session_id, str) or not chat_session_id:
    await self._send_chat_error(
        session,
        code="INVALID_MESSAGE_FORMAT",
        data="session_id is required",
    )
    return

# ...

if msg_type != "send":
    await self._send_chat_error(
        session,
        code="INVALID_MESSAGE_FORMAT",
        data=f"Unsupported message type: {msg_type}",
    )
    return

# ...

if session.is_processing:
    await self._send_chat_error(
        session,
        code="PROCESSING_ERROR",
        data="Session is busy",
    )
    return
```

The `except` block at the end of `_handle_chat_message` can use this helper too.

---

### 2. Split `_handle_chat_message` by command type

Right now `_handle_chat_message` handles `bind`, `interrupt`, and `send` inline. Extracting small handlers clarifies the control flow considerably without touching the heavy accumulation loop.

```python
async def _handle_chat_bind(
    self,
    session: LiveChatSession,
    chat_session_id: str | None,
) -> None:
    if not isinstance(chat_session_id, str) or not chat_session_id:
        await self._send_chat_error(
            session,
            code="INVALID_MESSAGE_FORMAT",
            data="session_id is required",
        )
        return

    request_id = await self._ensure_chat_subscription(session, chat_session_id)
    await self._send_chat_payload(
        session,
        {
            "ct": "chat",
            "type": "session_bound",
            "session_id": chat_session_id,
            "message_id": request_id,
        },
    )
```

```python
async def _handle_chat_interrupt(self, session: LiveChatSession) -> None:
    session.should_interrupt = True
    await self._send_chat_error(
        session,
        code="INTERRUPTED",
        data="INTERRUPTED",
    )
```

The top of `_handle_chat_message` then becomes a lean dispatcher, with only the `send` branch keeping the larger flow:

```python
async def _handle_chat_message(
    self,
    session: LiveChatSession,
    message: dict,
) -> None:
    msg_type = message.get("t")

    if msg_type == "bind":
        await self._handle_chat_bind(session, message.get("session_id"))
        return

    if msg_type == "interrupt":
        await self._handle_chat_interrupt(session)
        return

    if msg_type != "send":
        await self._send_chat_error(
            session,
            code="INVALID_MESSAGE_FORMAT",
            data=f"Unsupported message type: {msg_type}",
        )
        return

    await self._handle_chat_send(session, message)
```

The current `send` branch (validation, enqueueing, history, accumulation loop) can then be moved verbatim into `_handle_chat_send`:

```python
async def _handle_chat_send(
    self,
    session: LiveChatSession,
    message: dict,
) -> None:
    # the body of the current `send` branch of _handle_chat_message moves here,
    # unchanged (payload validation, enqueueing, history handling, accumulation loop, etc.)
    ...
```

This keeps all functionality intact while:

- unifying the error response format;
- keeping `_handle_chat_message` short and focused (routing only on `t`);
- isolating the "heavy" send flow in `_handle_chat_send`, which can be split further later (validation, enqueueing, accumulation) if needed.
</issue_to_address>

Original comment in English

Hey - I've found 3 issues, and left some high level feedback:

  • In ConversationSidebar.vue, transportOptions uses tm('transport.sse')/tm('transport.websocket') in the script setup but only t is imported from useI18n, so you should also destructure tm (or switch to t) to avoid a runtime/TS error.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `ConversationSidebar.vue`, `transportOptions` uses `tm('transport.sse')`/`tm('transport.websocket')` in the script setup but only `t` is imported from `useI18n`, so you should also destructure `tm` (or switch to `t`) to avoid a runtime/TS error.

## Individual Comments

### Comment 1
<location path="dashboard/src/composables/useMessages.ts" line_range="727-730" />
<code_context>
         }
     }

+    function buildBackendMessageParts(
+        prompt: string,
+        stagedFiles: { attachment_id: string; url: string; original_name: string; type: string }[],
+        replyTo: ReplyInfo | null
+    ): MessagePart[] {
+        const parts: MessagePart[] = [];
</code_context>
<issue_to_address>
**issue (bug_risk):** Audio-only messages are not included in backend payload for SSE/WebSocket, so they will be dropped server-side.

`sendMessage` adds the local audio URL to `userMessageParts`, but `buildBackendMessageParts(prompt, stagedFiles, replyTo)` ignores audio entirely. As a result:

- Audio-only messages produce an empty `backendMessageParts`.
- For mixed messages, the recording is never included in the payload sent to `/api/chat/send`.

To keep backend behavior aligned with the local view, either:

- Extend `buildBackendMessageParts` to take `audioName` and emit a `record` part in the format the backend expects, or
- Derive `backendMessageParts` from `userMessageParts`, transforming audio parts into the backend’s attachment-based representation.

Otherwise, audio will be silently dropped by the new SSE/WebSocket flow.
</issue_to_address>

### Comment 2
<location path="dashboard/src/composables/useMessages.ts" line_range="205" />
<code_context>
+        currentBoundSessionId.value = sessionId;
+    }
+
+    async function handlePassiveWebSocketChunk(payload: StreamChunk) {
+        if (!payload.type) {
+            return;
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared chunk-to-message rendering logic into a single helper used by both `createStreamChunkProcessor` and `handlePassiveWebSocketChunk` to avoid duplicated behavior.

You can significantly reduce duplication and make future changes safer by extracting shared chunk→message handling into a single helper and reusing it in both `createStreamChunkProcessor` and `handlePassiveWebSocketChunk`.

### 1. Extract shared media/plain/reasoning handling

Right now:

- `createStreamChunkProcessor` handles `plain` / `image` / `record` / `file` / `agent_stats` / `update_title` / `message_saved` plus streaming state, etc.
- `handlePassiveWebSocketChunk` reimplements most of `plain` / `image` / `record` / `file` handling for the “passive” WebSocket path.

This means any change to how chunks are rendered must be updated in two places.

You can factor out a single helper that is responsible for updating `messages` given a `StreamChunk` and some minimal state (`messageObj`, `inStreaming`) and then call it from both code paths.

#### Example: shared processor core

```ts
type StreamProcessorState = {
  inStreaming: boolean;
  messageObj: MessageContent | null;
};

async function processStreamChunkCore(
  chunkJson: StreamChunk,
  state: StreamProcessorState,
) {
  if (!chunkJson || typeof chunkJson !== 'object' || !chunkJson.type) return;

  // remove loading placeholder if present
  const lastMsg = messages.value[messages.value.length - 1];
  if (lastMsg?.content?.isLoading) {
    messages.value.pop();
  }

  if (chunkJson.type === 'error') {
    console.error('Error received:', chunkJson.data);
    return;
  }

  if (chunkJson.type === 'image') {
    const img = String(chunkJson.data || '').replace('[IMAGE]', '');
    const imageUrl = await getMediaFile(img);
    const botResp: MessageContent = {
      type: 'bot',
      message: [{ type: 'image', embedded_url: imageUrl }],
    };
    messages.value.push({ content: botResp });
    return;
  }

  if (chunkJson.type === 'record') {
    const audio = String(chunkJson.data || '').replace('[RECORD]', '');
    const audioUrl = await getMediaFile(audio);
    const botResp: MessageContent = {
      type: 'bot',
      message: [{ type: 'record', embedded_url: audioUrl }],
    };
    messages.value.push({ content: botResp });
    return;
  }

  if (chunkJson.type === 'file') {
    const fileData = String(chunkJson.data || '').replace('[FILE]', '');
    const [filename, originalName] = fileData.includes('|')
      ? fileData.split('|', 2)
      : [fileData, fileData];
    const fileUrl = await getMediaFile(filename);
    const botResp: MessageContent = {
      type: 'bot',
      message: [{
        type: 'file',
        embedded_file: { url: fileUrl, filename: originalName },
      }],
    };
    messages.value.push({ content: botResp });
    return;
  }

  if (chunkJson.type === 'plain') {
    const chainType = chunkJson.chain_type || 'normal';

    if (chainType === 'tool_call') {
      let toolCallData: any;
      try {
        toolCallData = JSON.parse(String(chunkJson.data || '{}'));
      } catch {
        return;
      }
      const toolCall: ToolCall = {
        id: toolCallData.id,
        name: toolCallData.name,
        args: toolCallData.args,
        ts: toolCallData.ts,
      };

      if (!state.inStreaming) {
        state.messageObj = reactive<MessageContent>({
          type: 'bot',
          message: [{ type: 'tool_call', tool_calls: [toolCall] }],
        });
        messages.value.push({ content: state.messageObj });
        state.inStreaming = true;
      } else {
        const lastPart = state.messageObj!.message[state.messageObj!.message.length - 1];
        if (lastPart?.type === 'tool_call') {
          const existingIndex = lastPart.tool_calls!.findIndex(
            (tc: ToolCall) => tc.id === toolCall.id,
          );
          if (existingIndex === -1) {
            lastPart.tool_calls!.push(toolCall);
          }
        } else {
          state.messageObj!.message.push({
            type: 'tool_call',
            tool_calls: [toolCall],
          });
        }
      }
      return;
    }

    if (chainType === 'tool_call_result') {
      let resultData: any;
      try {
        resultData = JSON.parse(String(chunkJson.data || '{}'));
      } catch {
        return;
      }

      if (state.messageObj) {
        for (const part of state.messageObj.message) {
          if (part.type === 'tool_call' && part.tool_calls) {
            const toolCall = part.tool_calls.find(
              (tc: ToolCall) => tc.id === resultData.id,
            );
            if (toolCall) {
              toolCall.result = resultData.result;
              toolCall.finished_ts = resultData.ts;
              break;
            }
          }
        }
      }
      return;
    }

    if (chainType === 'reasoning') {
      if (!state.inStreaming) {
        state.messageObj = reactive<MessageContent>({
          type: 'bot',
          message: [],
          reasoning: String(chunkJson.data || ''),
        });
        messages.value.push({ content: state.messageObj });
        state.inStreaming = true;
      } else {
        state.messageObj!.reasoning =
          (state.messageObj!.reasoning || '') + String(chunkJson.data || '');
      }
      return;
    }

    // normal plain text
    if (!state.inStreaming) {
      state.messageObj = reactive<MessageContent>({
        type: 'bot',
        message: [{
          type: 'plain',
          text: String(chunkJson.data || ''),
        }],
      });
      messages.value.push({ content: state.messageObj });
      state.inStreaming = true;
    } else {
      const lastPart = state.messageObj!.message[state.messageObj!.message.length - 1];
      if (lastPart?.type === 'plain') {
        lastPart.text = (lastPart.text || '') + String(chunkJson.data || '');
      } else {
        state.messageObj!.message.push({
          type: 'plain',
          text: String(chunkJson.data || ''),
        });
      }
    }
    return;
  }

  if (chunkJson.type === 'update_title') {
    if (chunkJson.session_id) {
      updateSessionTitle(chunkJson.session_id, chunkJson.data);
    }
    return;
  }

  if (chunkJson.type === 'message_saved') {
    const lastBotMsg = messages.value[messages.value.length - 1];
    if (lastBotMsg && lastBotMsg.content?.type === 'bot') {
      lastBotMsg.id = chunkJson.data?.id;
      lastBotMsg.created_at = chunkJson.data?.created_at;
    }
    return;
  }

  if (chunkJson.type === 'agent_stats') {
    if (state.messageObj) {
      state.messageObj.agentStats = chunkJson.data;
    }
    return;
  }
}
```

Then add a tiny helper to handle the common streaming flag logic and `isStreaming`:

```ts
function updateStreamingState(chunkJson: StreamChunk, state: StreamProcessorState) {
  if (typeof chunkJson.streaming !== 'boolean') return;

  if ((chunkJson.type === 'break' && chunkJson.streaming) || !chunkJson.streaming) {
    state.inStreaming = false;
    if (!chunkJson.streaming) {
      isStreaming.value = false;
    }
  }
}
```

### 2. Reuse in `createStreamChunkProcessor` and `handlePassiveWebSocketChunk`

`createStreamChunkProcessor` can become a thin wrapper:

```ts
function createStreamChunkProcessor() {
  const state: StreamProcessorState = {
    inStreaming: false,
    messageObj: null,
  };

  return async (chunkJson: StreamChunk) => {
    if (!chunkJson || typeof chunkJson !== 'object') return;
    if (chunkJson.type === 'session_id') return;

    await processStreamChunkCore(chunkJson, state);
    updateStreamingState(chunkJson, state);
  };
}
```

`handlePassiveWebSocketChunk` can reuse the same core, but with an isolated state that doesn’t affect `isStreaming`. If passive pushes shouldn’t toggle `isStreaming`, pass a flag or use a separate `updateStreamingState` variant:

```ts
const passiveWsState: StreamProcessorState = {
  inStreaming: false,
  messageObj: null,
};

async function handlePassiveWebSocketChunk(payload: StreamChunk) {
  if (!payload.type) return;

  // reuse the same chunk -> messages logic
  await processStreamChunkCore(payload, passiveWsState);
  // optionally: skip updateStreamingState here if passive pushes shouldn't affect UI streaming state
}
```

This keeps all current behavior but:

- Removes duplicated `plain` / `image` / `record` / `file` / `reasoning` / `tool_call` logic.
- Ensures future changes to rendering are made in one place.
- Makes `createStreamChunkProcessor` smaller and more focused (and easier to further split later if needed).
</issue_to_address>

### Comment 3
<location path="astrbot/dashboard/routes/live_chat.py" line_range="338" />
<code_context>
+        session.chat_subscriptions.clear()
+        session.chat_subscription_tasks.clear()
+
+    async def _handle_chat_message(
+        self, session: LiveChatSession, message: dict
+    ) -> None:
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting a shared chat error helper and delegating `_handle_chat_message` to small per-command handlers to keep the function focused and easier to extend.

You can keep all the new capabilities but still trim complexity by extracting two small layers that sit *on top* of your existing logic:

1. a standardized chat error helper
2. a per-message-type dispatcher for `_handle_chat_message`

These don’t change behavior, but they make `_handle_chat_message` much easier to read and extend.

---

### 1. Standardize error responses

You repeat the same error payload shape many times inside `_handle_chat_message`. A small helper reduces noise and makes future changes to the error format trivial:

```python
async def _send_chat_error(
    self,
    session: LiveChatSession,
    code: str,
    data: str,
) -> None:
    await self._send_chat_payload(
        session,
        {
            "ct": "chat",
            "t": "error",
            "code": code,
            "data": data,
        },
    )
```

Usage example (keeps semantics identical):

```python
if not isinstance(chat_session_id, str) or not chat_session_id:
    await self._send_chat_error(
        session,
        code="INVALID_MESSAGE_FORMAT",
        data="session_id is required",
    )
    return

# ...

if msg_type != "send":
    await self._send_chat_error(
        session,
        code="INVALID_MESSAGE_FORMAT",
        data=f"Unsupported message type: {msg_type}",
    )
    return

# ...

if session.is_processing:
    await self._send_chat_error(
        session,
        code="PROCESSING_ERROR",
        data="Session is busy",
    )
    return
```

The `except` block at the end of `_handle_chat_message` can also use this helper.
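
For instance, the trailing `except` could collapse to a single helper call. The runnable stub below is purely illustrative: the class, the forced failure, and the `PROCESSING_ERROR` code on exceptions are assumptions standing in for the real route and send flow:

```python
import asyncio


class _ChatRouteStub:
    """Hypothetical stub showing the trailing except block delegating
    to _send_chat_error; the real class and send flow are elided."""

    def __init__(self):
        self.sent = []

    async def _send_chat_payload(self, session, payload):
        self.sent.append(payload)

    async def _send_chat_error(self, session, code, data):
        await self._send_chat_payload(
            session,
            {"ct": "chat", "t": "error", "code": code, "data": data},
        )

    async def _handle_chat_message(self, session, message):
        try:
            raise ValueError("boom")  # stand-in for the send flow failing
        except Exception as exc:
            await self._send_chat_error(
                session, code="PROCESSING_ERROR", data=str(exc)
            )


route = _ChatRouteStub()
asyncio.run(route._handle_chat_message(None, {}))
```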

---

### 2. Split `_handle_chat_message` by command type

Right now `_handle_chat_message` handles `bind`, `interrupt`, and `send` inline. Extracting small handlers makes the control flow much clearer without touching the heavy accumulation loop yet.

```python
async def _handle_chat_bind(
    self,
    session: LiveChatSession,
    chat_session_id: str | None,
) -> None:
    if not isinstance(chat_session_id, str) or not chat_session_id:
        await self._send_chat_error(
            session,
            code="INVALID_MESSAGE_FORMAT",
            data="session_id is required",
        )
        return

    request_id = await self._ensure_chat_subscription(session, chat_session_id)
    await self._send_chat_payload(
        session,
        {
            "ct": "chat",
            "type": "session_bound",
            "session_id": chat_session_id,
            "message_id": request_id,
        },
    )
```

```python
async def _handle_chat_interrupt(self, session: LiveChatSession) -> None:
    session.should_interrupt = True
    await self._send_chat_error(
        session,
        code="INTERRUPTED",
        data="INTERRUPTED",
    )
```

Then `_handle_chat_message` becomes a thin dispatcher at the top, and only the `send` path contains the big flow:

```python
async def _handle_chat_message(
    self,
    session: LiveChatSession,
    message: dict,
) -> None:
    msg_type = message.get("t")

    if msg_type == "bind":
        await self._handle_chat_bind(session, message.get("session_id"))
        return

    if msg_type == "interrupt":
        await self._handle_chat_interrupt(session)
        return

    if msg_type != "send":
        await self._send_chat_error(
            session,
            code="INVALID_MESSAGE_FORMAT",
            data=f"Unsupported message type: {msg_type}",
        )
        return

    await self._handle_chat_send(session, message)
```

And you can move the existing “send” logic (validation, queueing, history, accumulation loop) into `_handle_chat_send` with a straight cut & paste:

```python
async def _handle_chat_send(
    self,
    session: LiveChatSession,
    message: dict,
) -> None:
    # current `send` branch body from _handle_chat_message goes here,
    # unchanged (payload validation, queue enqueue, accumulation loop, etc.)
    ...
```

This keeps all functionality intact but:

- localizes error-response formatting
- makes `_handle_chat_message` short and focused (just routing by `t`)
- isolates the “big” send flow into `_handle_chat_send`, which can be further split later (validation, enqueue, accumulation) when you’re ready.
</issue_to_address>


Comment on lines +727 to +730
function buildBackendMessageParts(
prompt: string,
stagedFiles: { attachment_id: string; url: string; original_name: string; type: string }[],
replyTo: ReplyInfo | null
**issue (bug_risk):** Audio-only messages are not included in the backend payload for SSE/WebSocket, so they will be dropped server-side.

`sendMessage` adds the local audio URL to `userMessageParts`, but `buildBackendMessageParts(prompt, stagedFiles, replyTo)` ignores audio entirely. As a result:

- Audio-only messages produce an empty `backendMessageParts`.
- For mixed messages, the recording is never included in the payload sent to `/api/chat/send`.

To keep backend behavior aligned with the local view, either:

- Extend `buildBackendMessageParts` to take `audioName` and emit a `record` part in the format the backend expects, or
- Derive `backendMessageParts` from `userMessageParts`, transforming audio parts into the backend’s attachment-based representation.

Otherwise, audio will be silently dropped by the new SSE/WebSocket flow.
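
The first option can be sketched as follows. This is a hypothetical, simplified version: `replyTo` handling is omitted, the `audio` parameter is invented for illustration, and the part shapes are assumptions about the backend contract, not the real payload format:

```ts
type StagedFile = { attachment_id: string; url: string; original_name: string; type: string };
type BackendPart =
  | { type: 'plain'; text: string }
  | { type: 'file'; attachment_id: string; original_name: string }
  | { type: 'record'; attachment_id: string; original_name: string };

function buildBackendMessageParts(
  prompt: string,
  stagedFiles: StagedFile[],
  audio: { attachment_id: string; name: string } | null,
): BackendPart[] {
  const parts: BackendPart[] = [];
  if (prompt) parts.push({ type: 'plain', text: prompt });
  for (const f of stagedFiles) {
    parts.push({ type: 'file', attachment_id: f.attachment_id, original_name: f.original_name });
  }
  if (audio) {
    // previously dropped: audio-only messages now produce a non-empty payload
    parts.push({ type: 'record', attachment_id: audio.attachment_id, original_name: audio.name });
  }
  return parts;
}
```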



@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant new feature: selectable transport modes (SSE/WebSocket) for the chat functionality, involving extensive backend and frontend changes. However, it also introduces several critical security vulnerabilities, primarily related to broken access control and potential denial of service. Specifically, Insecure Direct Object Reference (IDOR) vulnerabilities in WebSocket session binding and attachment access could allow users to eavesdrop on other sessions or access unauthorized files. Additionally, there are potential DoS vectors through malformed WebSocket messages and blocking queues. Beyond security, general code quality issues include code duplication, maintainability challenges due to very large functions, and silent error handling. Addressing these security and code quality concerns is crucial for the stability and reliability of this new feature.

Comment on lines +344 to +368
if msg_type == "bind":
    chat_session_id = message.get("session_id")
    if not isinstance(chat_session_id, str) or not chat_session_id:
        await self._send_chat_payload(
            session,
            {
                "ct": "chat",
                "t": "error",
                "data": "session_id is required",
                "code": "INVALID_MESSAGE_FORMAT",
            },
        )
        return

    request_id = await self._ensure_chat_subscription(session, chat_session_id)
    await self._send_chat_payload(
        session,
        {
            "ct": "chat",
            "type": "session_bound",
            "session_id": chat_session_id,
            "message_id": request_id,
        },
    )
    return

security-high high

A critical Insecure Direct Object Reference (IDOR) vulnerability exists within the bind message type in the _handle_chat_message method. It allows users to specify any session_id without ownership verification, potentially enabling eavesdropping on other chat sessions. This specific security concern is part of a larger issue where the _handle_chat_message method is overly complex and handles various message types. Refactoring this method into smaller, more focused functions, particularly for handling different message types like bind, interrupt, and send, would improve both security and maintainability.
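A minimal shape for the missing check, sketched here before `_ensure_chat_subscription` would be called. The `owner_lookup` callable and the requester id are assumptions about the session model, which this PR does not show:

```python
# Hypothetical ownership gate for `bind` requests.  `owner_lookup` is an
# assumed async accessor (e.g. backed by the sessions table); it is not an
# existing helper in this codebase.
async def authorize_bind(owner_lookup, chat_session_id, requester_id) -> bool:
    """Allow binding only when the chat session belongs to the requester."""
    if not isinstance(chat_session_id, str) or not chat_session_id:
        return False
    owner = await owner_lookup(chat_session_id)
    # Unknown sessions and foreign sessions are rejected identically, so the
    # error reply does not reveal whether a given session_id exists.
    return owner is not None and owner == requester_id
```

The handler would then answer a failed check with the same `INVALID_MESSAGE_FORMAT`-style error it already sends for a missing session_id, rather than a distinct "forbidden" code.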

Comment on lines 173 to +179
async def _build_user_message_parts(self, message: str | list) -> list[dict]:
    """构建用户消息的部分列表

    Args:
        message: 文本消息 (str) 或消息段列表 (list)
    """
    parts = []

    if isinstance(message, list):
        for part in message:
            part_type = part.get("type")
            if part_type == "plain":
                parts.append({"type": "plain", "text": part.get("text", "")})
            elif part_type == "reply":
                parts.append(
                    {
                        "type": "reply",
                        "message_id": part.get("message_id"),
                        "selected_text": part.get("selected_text", ""),
                    }
                )
            elif attachment_id := part.get("attachment_id"):
                attachment = await self.db.get_attachment_by_id(attachment_id)
                if attachment:
                    parts.append(
                        {
                            "type": attachment.type,
                            "attachment_id": attachment.attachment_id,
                            "filename": os.path.basename(attachment.path),
                            "path": attachment.path,  # will be deleted
                        }
                    )
        return parts

    if message:
        parts.append({"type": "plain", "text": message})

    return parts
    """构建用户消息的部分列表。"""
    return await build_webchat_message_parts(
        message,
        get_attachment_by_id=self.db.get_attachment_by_id,
        strict=False,
    )

security-high high

The function _build_user_message_parts uses self.db.get_attachment_by_id to retrieve attachment details based on an attachment_id provided by the user. There is no ownership check to ensure that the attachment belongs to the user making the request. An attacker can provide an attachment_id belonging to another user, and the bot will retrieve the corresponding file path and process it.
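One way to close this hole is to wrap the lookup in an ownership guard. The `user_id` field on the attachment record and the `requester_id` argument are assumed names; the project's actual schema may differ:

```python
# Hypothetical ownership guard around attachment lookup.  For the sketch,
# attachment records are plain dicts; in the real code they are DB rows.
async def get_owned_attachment(get_attachment_by_id, attachment_id, requester_id):
    """Return the attachment only if it belongs to the requester."""
    attachment = await get_attachment_by_id(attachment_id)
    if attachment is None:
        return None
    if attachment.get("user_id") != requester_id:
        # A foreign attachment behaves exactly like a missing one, so a
        # caller cannot probe which attachment ids exist.
        return None
    return attachment
```

All three call sites flagged in this review (`_build_user_message_parts`, `_build_chat_message_parts`, `_build_message_chain_from_payload`) funnel through `get_attachment_by_id`, so a single guard at that boundary would cover them.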

Comment on lines +653 to +659
async def _build_chat_message_parts(self, message: list[dict]) -> list[dict]:
    """构建 chat websocket 用户消息段(复用 webchat 逻辑)"""
    return await build_webchat_message_parts(
        message,
        get_attachment_by_id=self.db.get_attachment_by_id,
        strict=False,
    )

security-high high

The function _build_chat_message_parts uses self.db.get_attachment_by_id to retrieve attachment details based on an attachment_id provided by the user. There is no ownership check to ensure that the attachment belongs to the user making the request. An attacker can provide an attachment_id belonging to another user, and the bot will retrieve the corresponding file path and process it.

Comment on lines +258 to +262
return await build_message_chain_from_payload(
    message_payload,
    get_attachment_by_id=self.db.get_attachment_by_id,
    strict=True,
)

security-high high

The function _build_message_chain_from_payload uses self.db.get_attachment_by_id to retrieve attachment details based on an attachment_id provided by the user. There is no ownership check to ensure that the attachment belongs to the user making the request. An attacker can provide an attachment_id belonging to another user, and the bot will retrieve the corresponding file path and process it.

Comment on lines +534 to +535
except Exception:
    pass

high

Silently passing on all exceptions with except Exception: pass can hide bugs and make debugging very difficult. It's better to at least log the exception as a warning, so you're aware of potential issues during JSON parsing or other operations. This applies to other similar blocks in this function as well.

Suggested change
except Exception:
    pass
except json.JSONDecodeError as e:
    logger.warning(f"Failed to parse tool_call JSON: {e}")

Comment on lines 166 to +167
message = await websocket.receive_json()
await self._handle_message(live_session, message)
ct = force_ct or message.get("ct", "live")

security-medium medium

The WebSocket message handling loop receives JSON data and immediately calls .get() on it. If a client sends a JSON array (which parses as a Python list) instead of a JSON object (which parses as a dict), the call to message.get("ct", "live") will raise an AttributeError, causing the WebSocket handler for that session to crash and disconnect the user.
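A cheap guard is to validate the decoded payload's type before dispatching. This sketch models only that check, detached from the surrounding websocket/session objects:

```python
# Defensive routing for a decoded WebSocket frame.  Returns the channel
# tag ("live", "chat", ...) or None for malformed payloads, which the
# real loop would answer with an error frame instead of crashing.
def route_incoming(message, force_ct=None):
    if not isinstance(message, dict):
        # A JSON array, string, or number decodes to list/str/int;
        # calling .get() on those raises AttributeError and would kill
        # the handler loop for the whole session.
        return None
    return force_ct or message.get("ct", "live")
```

With this in place, a malformed frame costs one error response instead of a disconnect.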

Comment on lines +113 to +118
message_id = f"active_{uuid.uuid4()!s}"
await WebChatMessageEvent._send(
    message_id,
    message_chain,
    session.session_id,
)

security-medium medium

When sending proactive messages and no active requests are found for a conversation, the adapter creates a new back queue with a random message_id and attempts to send the message. Since no client is listening for this random message_id, the messages will accumulate in the queue. Once the queue reaches its maximum size (default 512), subsequent calls to WebChatMessageEvent._send (which uses await queue.put()) will block indefinitely, potentially hanging the bot's execution for that task.
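One way to avoid the indefinite `await queue.put()` is a non-blocking enqueue that surfaces back-pressure to the caller. This is a sketch, not the adapter's actual queue API; the real fix would also log the drop and clean up the orphaned back queue:

```python
import asyncio


def offer(queue: asyncio.Queue, item) -> bool:
    """Try to enqueue without blocking; report failure when the queue is full.

    Returning False lets the caller drop the proactive message (or tear down
    the listener-less queue) instead of hanging the bot's task forever.
    """
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False
```

An alternative with the same effect is bounding the wait, e.g. `asyncio.wait_for(queue.put(item), timeout=...)`, which turns the hang into a catchable `TimeoutError`.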

Comment on lines +17 to +23
def _extract_conversation_id(session_id: str) -> str:
    """Extract raw webchat conversation id from event/session id."""
    if session_id.startswith("webchat!"):
        parts = session_id.split("!", 2)
        if len(parts) == 3:
            return parts[2]
    return session_id

medium

This function _extract_conversation_id is also defined in astrbot/core/platform/sources/webchat/webchat_adapter.py. To adhere to the Don't Repeat Yourself (DRY) principle and improve maintainability, this duplicated logic should be consolidated into a single, shared location. Consider moving it to a utility file, such as the newly created message_parts_helper.py, and importing it where needed.
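The consolidated helper could be a drop-in for both private copies; the placement in `message_parts_helper.py` follows the reviewer's suggestion and is not an existing module layout decision:

```python
# Shared copy of the duplicated logic: both the webchat adapter and the
# chat websocket handler would import this instead of defining their own.
def extract_conversation_id(session_id: str) -> str:
    """Extract the raw webchat conversation id from an event/session id.

    Session ids look like "webchat!<user>!<conversation>"; anything that
    does not match that shape is returned unchanged.
    """
    if session_id.startswith("webchat!"):
        parts = session_id.split("!", 2)
        if len(parts) == 3:
            return parts[2]
    return session_id
```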

Comment on lines +120 to +124
should_persist = (
    bool(subscription_request_ids)
    or not active_request_ids
    or all(req_id.startswith("active_") for req_id in active_request_ids)
)

medium

The logic to determine should_persist is quite complex and relies on the naming conventions of request IDs. To improve code clarity and make it easier for future developers to understand, please add a comment explaining the conditions under which a proactive message is persisted versus when persistence is delegated to another handler (e.g., for a standard request-response cycle).
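The condition could also be lifted into a named helper carrying that comment. The boolean logic below is identical to the original expression; the rationale in the docstring is inferred from this review's description of proactive-message handling, not confirmed against the adapter:

```python
def should_persist_proactive(subscription_request_ids, active_request_ids) -> bool:
    """Decide whether this adapter stores a proactive message itself.

    Persist here when (inferred rationale, see lead-in):
      * a websocket subscriber exists, since it never writes history, or
      * nobody is listening at all, so no other handler will persist, or
      * every listener is an `active_*` push queue, which also never
        persists.
    Otherwise a normal request-response cycle owns persistence.
    """
    if subscription_request_ids:
        return True
    if not active_request_ids:
        return True
    return all(req_id.startswith("active_") for req_id in active_request_ids)
```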

Comment on lines +416 to 604
function createStreamChunkProcessor() {
  let inStreaming = false;
  let messageObj: MessageContent | null = null;

  return async (chunkJson: StreamChunk) => {
    if (!chunkJson || typeof chunkJson !== 'object') {
      return;
    }

    if (chunkJson.type === 'session_id') {
      return;
    }

    if (!chunkJson.type) {
      return;
    }

    const lastMsg = messages.value[messages.value.length - 1];
    if (lastMsg?.content?.isLoading) {
      messages.value.pop();
    }

    if (chunkJson.type === 'error') {
      console.error('Error received:', chunkJson.data);
      return;
    }

    if (chunkJson.type === 'image') {
      const img = String(chunkJson.data || '').replace('[IMAGE]', '');
      const imageUrl = await getMediaFile(img);
      const botResp: MessageContent = {
        type: 'bot',
        message: [{
          type: 'image',
          embedded_url: imageUrl
        }]
      };
      messages.value.push({ content: botResp });
    } else if (chunkJson.type === 'record') {
      const audio = String(chunkJson.data || '').replace('[RECORD]', '');
      const audioUrl = await getMediaFile(audio);
      const botResp: MessageContent = {
        type: 'bot',
        message: [{
          type: 'record',
          embedded_url: audioUrl
        }]
      };
      messages.value.push({ content: botResp });
    } else if (chunkJson.type === 'file') {
      const fileData = String(chunkJson.data || '').replace('[FILE]', '');
      const [filename, originalName] = fileData.includes('|')
        ? fileData.split('|', 2)
        : [fileData, fileData];
      const fileUrl = await getMediaFile(filename);
      const botResp: MessageContent = {
        type: 'bot',
        message: [{
          type: 'file',
          embedded_file: {
            url: fileUrl,
            filename: originalName
          }
        }]
      };
      messages.value.push({ content: botResp });
    } else if (chunkJson.type === 'plain') {
      const chainType = chunkJson.chain_type || 'normal';

      if (chainType === 'tool_call') {
        let toolCallData: any;
        try {
          toolCallData = JSON.parse(String(chunkJson.data || '{}'));
        } catch {
          return;
        }

        const toolCall: ToolCall = {
          id: toolCallData.id,
          name: toolCallData.name,
          args: toolCallData.args,
          ts: toolCallData.ts
        };

        if (!inStreaming) {
          messageObj = reactive<MessageContent>({
            type: 'bot',
            message: [{
              type: 'tool_call',
              tool_calls: [toolCall]
            }]
          });
          messages.value.push({ content: messageObj });
          inStreaming = true;
        } else {
          const lastPart = messageObj!.message[messageObj!.message.length - 1];
          if (lastPart?.type === 'tool_call') {
            const existingIndex = lastPart.tool_calls!.findIndex((tc: ToolCall) => tc.id === toolCall.id);
            if (existingIndex === -1) {
              lastPart.tool_calls!.push(toolCall);
            }
          } else {
            messageObj!.message.push({
              type: 'tool_call',
              tool_calls: [toolCall]
            });
          }
        }
      } else if (chainType === 'tool_call_result') {
        let resultData: any;
        try {
          resultData = JSON.parse(String(chunkJson.data || '{}'));
        } catch {
          return;
        }

        if (messageObj) {
          for (const part of messageObj.message) {
            if (part.type === 'tool_call' && part.tool_calls) {
              const toolCall = part.tool_calls.find((tc: ToolCall) => tc.id === resultData.id);
              if (toolCall) {
                toolCall.result = resultData.result;
                toolCall.finished_ts = resultData.ts;
                break;
              }
            }
          }
        }
      } else if (chainType === 'reasoning') {
        if (!inStreaming) {
          messageObj = reactive<MessageContent>({
            type: 'bot',
            message: [],
            reasoning: String(chunkJson.data || '')
          });
          messages.value.push({ content: messageObj });
          inStreaming = true;
        } else {
          messageObj!.reasoning = (messageObj!.reasoning || '') + String(chunkJson.data || '');
        }
      } else {
        if (!inStreaming) {
          messageObj = reactive<MessageContent>({
            type: 'bot',
            message: [{
              type: 'plain',
              text: String(chunkJson.data || '')
            }]
          });
          messages.value.push({ content: messageObj });
          inStreaming = true;
        } else {
          const lastPart = messageObj!.message[messageObj!.message.length - 1];
          if (lastPart?.type === 'plain') {
            lastPart.text = (lastPart.text || '') + String(chunkJson.data || '');
          } else {
            messageObj!.message.push({
              type: 'plain',
              text: String(chunkJson.data || '')
            });
          }
        }
      }
    } else if (chunkJson.type === 'update_title') {
      if (chunkJson.session_id) {
        updateSessionTitle(chunkJson.session_id, chunkJson.data);
      }
    } else if (chunkJson.type === 'message_saved') {
      const lastBotMsg = messages.value[messages.value.length - 1];
      if (lastBotMsg && lastBotMsg.content?.type === 'bot') {
        lastBotMsg.id = chunkJson.data?.id;
        lastBotMsg.created_at = chunkJson.data?.created_at;
      }
    } else if (chunkJson.type === 'agent_stats') {
      if (messageObj) {
        messageObj.agentStats = chunkJson.data;
      }
    }

    if (typeof chunkJson.streaming === 'boolean') {
      if ((chunkJson.type === 'break' && chunkJson.streaming) || !chunkJson.streaming) {
        inStreaming = false;
        if (!chunkJson.streaming) {
          isStreaming.value = false;
        }
      }
    }
  };
}

medium

The createStreamChunkProcessor function is very long and contains complex conditional logic for handling different types of stream chunks. This makes it difficult to read and maintain. To improve its structure, consider breaking down the logic into smaller helper functions, each responsible for processing a specific chunk type (e.g., processPlainTextChunk, processToolCallChunk, etc.). This will make the code more modular and easier to understand.


Labels

  • feature:chatui: The bug / feature is about astrbot's chatui, webchat
  • size:XXL: This PR changes 1000+ lines, ignoring generated files.
