Skip to content

feat(platform): add CLI Tester for plugin testing and debugging#4787

Open
YukiRa1n wants to merge 39 commits intoAstrBotDevs:masterfrom
YukiRa1n:master
Open

feat(platform): add CLI Tester for plugin testing and debugging#4787
YukiRa1n wants to merge 39 commits intoAstrBotDevs:masterfrom
YukiRa1n:master

Conversation

@YukiRa1n
Copy link
Contributor

@YukiRa1n YukiRa1n commented Jan 31, 2026

CLI Tester - 命令行测试平台

概述

为 AstrBot 添加 CLI 平台适配器,支持通过命令行直接测试插件功能,无需配置 IM 平台。

功能特性

核心功能

  • Socket 模式: 支持 Unix Socket (Linux/macOS) 和 TCP Socket (Windows/容器)
  • File 模式: 批量文件输入测试
  • 会话隔离: 支持 use_isolated_sessions 配置,每个请求独立会话
  • Token 认证: 自动生成安全 Token,支持认证校验

消息处理

  • 富媒体支持: 图片自动转 Base64 编码
  • JSON 响应: 结构化响应格式,包含文本、图片、状态
  • 多轮回复收集: 支持 LLM 多轮响应聚合
  • CLI 客户端: pip 安装后全局可用 astr 命令(基于 click 库)

函数工具管理

  • 工具列表: astr tool ls 查看所有注册的函数工具(支持按来源过滤:plugin/mcp/builtin)
  • 工具详情: astr tool info <name> 查看工具的参数定义和描述
  • 工具调用: astr tool call <name> [json_args] 直接调用函数工具进行测试

文件架构

平台适配器(扁平结构,与 Telegram/DingTalk 等适配器风格一致)

astrbot/core/platform/sources/cli/          (1,863 行)
├── __init__.py              12    # 模块导出
├── cli_adapter.py          377    # 主适配器(含配置加载、Token管理、会话管理)
├── cli_event.py            364    # CLI 事件(含消息转换、响应构建、图片处理)
├── socket_server.py        249    # Socket 服务器(TCP/Unix 实现 + 工厂 + 平台检测)
├── socket_handler.py       736    # Socket 连接处理器(含连接信息写入、函数工具管理)
└── file_handler.py         125    # 文件轮询模式处理器

CLI 客户端(click 子命令架构)

astrbot/cli/client/                         (1,553 行)
├── __init__.py               4    # 客户端包入口
├── __main__.py             152    # astr 命令入口 (click Group + RawEpilogGroup)
├── connection.py           424    # Socket 通信(send_message / get_logs / list_tools / call_tool)
├── output.py                84    # 格式化输出(图片占位符 / Git Bash 路径兼容)
└── commands/
    ├── __init__.py          97    # 命令注册(含 help/sid/t2i/tts/batch 快捷别名)
    ├── send.py              47    # 发送消息命令
    ├── log.py              131    # 日志查看命令
    ├── conv.py              73    # 会话管理(ls/new/switch/del/rename/reset/history)
    ├── plugin.py            47    # 插件管理(ls/on/off/help)
    ├── provider.py          55    # Provider/Model/Key 管理
    ├── tool.py             133    # 函数工具管理(ls/info/call)
    ├── debug.py            135    # 调试工具(ping/status/test echo/test plugin)
    └── session.py          171    # 跨会话浏览(ls/convs/history)

代码风格对齐

适配器结构扁平化,与其他平台适配器风格一致:

  • 删除过度工程化模块(Protocol/ABC 接口层、AOP 装饰器集合、结构化 [TAG] 日志)
  • 配置加载改为 dict.get() 模式(同 Telegram)
  • 日志简化为 f-string 格式
  • Token管理、会话管理内联到 cli_adapter.py
  • 消息转换、响应构建、图片处理内联到 cli_event.py

采纳的 Review 修改

1. CLI 客户端改用 click(@LIghtJUNction review)

  • argparse -> click,与项目已有的 astrbot CLI 风格统一
  • click 子命令结构:astr send / astr log / astr conv / astr plugin
  • 支持 shell Tab 补全(Bash/Zsh/Fish 原生支持)
  • 完全向后兼容:所有旧用法不变

2. asyncio API 修复(@sourcery-ai review)

  • Socket 服务器: asyncio.get_event_loop() -> asyncio.get_running_loop()
  • Python 3.10+ 弃用 get_event_loop(),async 函数中应使用 get_running_loop()

使用方式

cd AstrBot
pip install -e .
astr --help                    # 查看所有支持的指令

常用命令示例:

astr 你好                      # 发送消息
astr conv ls                   # 列出对话
astr plugin ls                 # 列出插件
astr log                       # 查看日志
astr tool ls                   # 列出函数工具

服务端命令:

astrbot run                    # 运行
astrbot restart                # 重启
astrbot stop                   # 停止

兼容性

  • Linux: Unix Socket (默认)
  • macOS: Unix Socket (默认)
  • Windows: TCP Socket (自动检测)
  • Docker 容器: TCP Socket (自动检测)
  • Git Bash: 自动兼容路径转换

@auto-assign auto-assign bot requested review from Fridemn and anka-afk January 31, 2026 18:22
@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. labels Jan 31, 2026
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了两个问题,并留下了一些高层面的反馈:

  • 当前 CLI 适配器将诸如 /AstrBot/data/{config_file}/tmp/astrbot.sock 这样的路径写死;建议从现有的配置/根路径工具中派生这些路径,或者允许通过配置覆盖,这样在非 Docker 或非默认部署环境中该特性也能正常工作。
  • 白名单检查对 cli 平台无条件绕过校验;如果你预期有些环境会以更受控的方式使用 CLI,那么将该行为放在一个配置开关后面,可能会比硬编码豁免更安全。
  • 对于 Unix 套接字服务器,你可能需要显式控制文件权限/所有权(例如通过 os.chmod 或 umask),并在 JSON 载荷上增加最小的分帧/大小检查,以避免在多个客户端连接时因部分读取或超大读取导致的问题。
供 AI Agents 使用的提示词
请根据这次代码审查中的评论进行修改:

## 整体评论
- 当前 CLI 适配器将诸如 `/AstrBot/data/{config_file}``/tmp/astrbot.sock` 这样的路径写死;建议从现有的配置/根路径工具中派生这些路径,或者允许通过配置覆盖,这样在非 Docker 或非默认部署环境中该特性也能正常工作。
- 白名单检查对 `cli` 平台无条件绕过校验;如果你预期有些环境会以更受控的方式使用 CLI,那么将该行为放在一个配置开关后面,可能会比硬编码豁免更安全。
- 对于 Unix 套接字服务器,你可能需要显式控制文件权限/所有权(例如通过 `os.chmod` 或 umask),并在 JSON 载荷上增加最小的分帧/大小检查,以避免在多个客户端连接时因部分读取或超大读取导致的问题。

## 单独评论

### 评论 1
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:302` </location>
<code_context>
+            while self._running:
+                try:
+                    # 接受连接(非阻塞)
+                    loop = asyncio.get_event_loop()
+                    client_socket, _ = await loop.sock_accept(server_socket)
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** 在异步代码中使用 asyncio.get_event_loop() 并不推荐;asyncio.get_running_loop() 更安全且更具前向兼容性。

在 Python 3.10+ 中,`asyncio.get_event_loop()` 在异步代码中已被弃用,并且在某些策略下可能返回错误的事件循环。在 `_run_socket_mode` 中(以及类似的 `_handle_socket_client` / `_read_input` 中),请使用 `asyncio.get_running_loop()`,以确保你获取的是当前任务所在的事件循环。

建议实现:

```python
                    # 接受连接(非阻塞)
                    loop = asyncio.get_running_loop()
                    client_socket, _ = await loop.sock_accept(server_socket)

````astrbot/core/platform/sources/cli/cli_adapter.py` 中搜索其他在 `async def` 函数内部使用 `asyncio.get_event_loop()` 的地方,尤其是 `_handle_socket_client``_read_input`,并以类似方式替换:

- 当上下文为运行在事件循环上的异步代码时,将 `loop = asyncio.get_event_loop()` 替换为 `loop = asyncio.get_running_loop()`。

如果在纯同步初始化代码中(不在活动事件循环内)有 `asyncio.get_event_loop()` 的使用,应单独进行审查,因为它们可能需要不同的模式(例如使用 `asyncio.new_event_loop()` 显式创建事件循环,而不是使用 `get_running_loop()`)。
</issue_to_address>

### 评论 2
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:134` </location>
<code_context>
+        logger.info("[ENTRY] CLIPlatformAdapter.run inputs={}")
+        return self._run_loop()
+
+    async def _run_loop(self) -> None:
+        """主运行循环
+
</code_context>

<issue_to_address>
**issue (complexity):** 建议通过抽取小的辅助函数、集中复用逻辑以及将模式映射到处理函数的方式来重构 CLI 适配器,使代码在不改变行为的前提下更易理解。

在不改变行为的前提下,你可以通过抽取一些聚焦的小型辅助函数并整合重复逻辑,显著降低复杂度。下面是一些具体、局部化的重构建议,可以保留现有设计,但让代码更易于推理。

---

### 1. 使用策略映射简化模式选择

`_run_loop` 目前在函数体中同时包含 TTY 检测和分支逻辑。你可以将模式解析抽取到一个小的辅助函数中,并将模式映射到可调用对象:

```python
# in __init__
self._mode_handlers: dict[str, callable[[], Awaitable[None]]] = {
    "tty": self._run_tty_mode,
    "file": self._run_file_mode,
    "socket": self._run_socket_mode,
}

def _resolve_mode(self) -> str:
    has_tty = sys.stdin.isatty()
    if self.mode == "auto":
        return "file" if not has_tty else "tty"
    if self.mode in ("tty", "file", "socket"):
        if self.mode == "tty" and not has_tty:
            logger.warning(
                "[PROCESS] TTY mode requested but no TTY detected. "
                "CLI platform will not start."
            )
            return ""  # or None
        return self.mode
    logger.error("[ERROR] Unknown mode: %s", self.mode)
    return ""
```

```python
async def _run_loop(self) -> None:
    logger.info("[PROCESS] Starting CLI loop")

    if self.use_isolated_sessions:
        self._cleanup_task = asyncio.create_task(self._cleanup_expired_sessions())

    mode = self._resolve_mode()
    if not mode:
        return

    handler = self._mode_handlers.get(mode)
    if handler:
        logger.info("[PROCESS] Starting %s mode", mode)
        await handler()
```

这样可以在不改变行为的前提下,压平分支结构,并把 TTY 逻辑局部化。

---

### 2. 抽取一个小型 `SessionTracker` 辅助类

`_convert_input``_cleanup_expired_sessions` 都会操作 `_session_timestamps`。将它们移入一个小型辅助类中,可以将消息构建和会话生命周期解耦:

```python
class _SessionTracker:
    def __init__(self, ttl: int) -> None:
        import time
        self._ttl = ttl
        self._timestamps: dict[str, float] = {}
        self._time = time

    def ensure_session(self, base_session_id: str, request_id: str | None) -> str:
        if request_id is None:
            return base_session_id
        session_id = f"{base_session_id}_{request_id}"
        if session_id not in self._timestamps:
            self._timestamps[session_id] = self._time.time()
        return session_id

    def collect_expired(self) -> list[str]:
        now = self._time.time()
        expired = [
            s for s, ts in list(self._timestamps.items())
            if now - ts > self._ttl
        ]
        for s in expired:
            self._timestamps.pop(s, None)
        return expired
```

将其接入适配器:

```python
# __init__
self._session_tracker = _SessionTracker(self.session_ttl)
```

```python
def _convert_input(self, text: str, request_id: str | None = None) -> AstrBotMessage:
    ...
    if self.use_isolated_sessions and request_id:
        message.session_id = self._session_tracker.ensure_session(
            base_session_id="cli_session",
            request_id=request_id,
        )
    else:
        message.session_id = self.session_id
    ...
```

```python
async def _cleanup_expired_sessions(self) -> None:
    logger.info("[ENTRY] _cleanup_expired_sessions started, TTL=%s seconds", self.session_ttl)
    while self._running:
        try:
            await asyncio.sleep(10)
            if not self.use_isolated_sessions:
                continue

            expired_sessions = self._session_tracker.collect_expired()
            for session_id in expired_sessions:
                logger.info("[PROCESS] Cleaning expired session: %s", session_id)
                # TODO: DB cleanup if needed

            if expired_sessions:
                logger.info("[PROCESS] Cleaned %d expired sessions", len(expired_sessions))
        except Exception as e:
            logger.error("[ERROR] Session cleanup error: %s", e)
```

这样可以避免在多个位置直接修改 `_session_timestamps`,并将会话策略集中到一个地方。

---

### 3. 去重图片提取/归一化逻辑

你已经在 `_handle_socket_client` 中实现了图片提取和 base64 转换。如果 `CLIMessageEvent.send`(或类似逻辑)中有重叠代码,可以将其集中到一个可复用的辅助函数中:

```python
from astrbot.core.message.components import Image

def normalize_images_from_chain(message_chain: MessageChain) -> list[dict[str, Any]]:
    import base64
    images: list[dict[str, Any]] = []

    for comp in message_chain.chain:
        if not isinstance(comp, Image) or not comp.file:
            continue

        info: dict[str, Any] = {}
        if comp.file.startswith("http"):
            info["type"] = "url"
            info["url"] = comp.file

        elif comp.file.startswith("file:///"):
            info["type"] = "file"
            file_path = comp.file[8:]
            info["path"] = file_path
            try:
                with open(file_path, "rb") as f:
                    raw = f.read()
                info["base64_data"] = base64.b64encode(raw).decode("utf-8")
                info["size"] = len(raw)
            except Exception as e:
                logger.error("[ERROR] Failed to read image file %s: %s", file_path, e)
                info["error"] = str(e)

        elif comp.file.startswith("base64://"):
            info["type"] = "base64"
            base64_data = comp.file[9:]
            info["base64_data"] = base64_data
            info["base64_length"] = len(base64_data)

        images.append(info)

    return images
````_handle_socket_client` 中使用:

```python
from .image_utils import normalize_images_from_chain  # or local helper

...
message_chain = await asyncio.wait_for(response_future, timeout=30.0)
response_text = message_chain.get_plain_text()
images = normalize_images_from_chain(message_chain)

response = json.dumps(
    {
        "status": "success",
        "response": response_text,
        "images": images,
        "request_id": request_id,
    },
    ensure_ascii=False,
)
await loop.sock_sendall(client_socket, response.encode("utf-8"))
```

并在 `CLIMessageEvent` 或其他复用该逻辑的代码路径中同样复用该辅助函数。

---

### 4. 将 `_handle_socket_client` 拆分为更小的步骤

在不改变行为的前提下,你可以把该方法拆分成解析 / 处理 / 序列化等辅助函数,使其更易测试且更短:

```python
def _parse_socket_request(self, raw: bytes) -> tuple[dict[str, Any], str, str]:
    import json
    request = json.loads(raw.decode("utf-8"))
    message_text = request.get("message", "")
    request_id = request.get("request_id", str(uuid.uuid4()))
    return request, message_text, request_id

def _serialize_error(self, request_id: str, msg: str) -> bytes:
    import json
    return json.dumps(
        {"status": "error", "error": msg, "request_id": request_id},
        ensure_ascii=False,
    ).encode("utf-8")

def _serialize_success(self, request_id: str, chain: MessageChain) -> bytes:
    import json
    response_text = chain.get_plain_text()
    images = normalize_images_from_chain(chain)
    return json.dumps(
        {
            "status": "success",
            "response": response_text,
            "images": images,
            "request_id": request_id,
        },
        ensure_ascii=False,
    ).encode("utf-8")
````_handle_socket_client` 中使用它们:

```python
async def _handle_socket_client(self, client_socket) -> None:
    import json
    logger.debug("[ENTRY] _handle_socket_client")
    loop = asyncio.get_event_loop()

    try:
        data = await loop.sock_recv(client_socket, 4096)
        if not data:
            logger.debug("[PROCESS] Empty request, closing connection")
            return

        try:
            _, message_text, request_id = self._parse_socket_request(data)
        except json.JSONDecodeError:
            await loop.sock_sendall(
                client_socket, self._serialize_error("", "Invalid JSON format")
            )
            return

        response_future: asyncio.Future[MessageChain] = asyncio.Future()
        message = self._convert_input(message_text, request_id=request_id)

        message_event = CLIMessageEvent(
            message_str=message.message_str,
            message_obj=message,
            platform_meta=self.meta(),
            session_id=message.session_id,
            output_queue=self._output_queue,
            response_future=response_future,
        )
        self.commit_event(message_event)

        try:
            chain = await asyncio.wait_for(response_future, timeout=30.0)
            await loop.sock_sendall(
                client_socket, self._serialize_success(request_id, chain)
            )
        except asyncio.TimeoutError:
            await loop.sock_sendall(
                client_socket, self._serialize_error(request_id, "Request timeout")
            )
    except Exception as e:
        logger.error("[ERROR] Socket client handler error: %s", e)
        import traceback
        logger.error(traceback.format_exc())
    finally:
        client_socket.close()
        logger.debug("[EXIT] _handle_socket_client return=None")
```

这样可以在保持现有行为的同时,将这个较长的方法拆分为更小、可单独测试的步骤。

---

这些改动在保持现有特性和流程的前提下,可以:

- 将会话策略从消息转换逻辑中分离出来;
- 降低模式选择时的分支复杂度;
- 去除图片处理中的重复代码;
- 将套接字客户端处理逻辑拆分得更短、更清晰。
</issue_to_address>

Sourcery 对开源项目免费——如果你喜欢我们的评审,请考虑分享 ✨
帮我变得更有用!请在每条评论上点击 👍 或 👎,我会根据这些反馈来改进后续的评审。
Original comment in English

Hey - I've found 2 issues, and left some high level feedback:

  • The CLI adapter currently hardcodes paths like /AstrBot/data/{config_file} and /tmp/astrbot.sock; consider deriving these from existing config/root-path utilities or allowing them to be overridden so the feature works in non-Docker or non-default deployments.
  • The whitelist check unconditionally bypasses validation for the cli platform; if you expect some environments to use CLI in a more controlled way, it might be safer to gate this behavior behind a config flag rather than hardcoding the exemption.
  • For the Unix socket server, you may want to explicitly control file permissions/ownership (e.g., via os.chmod or umask) and add minimal framing/size checks on the JSON payload to avoid issues with partial or oversized reads when multiple clients connect.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The CLI adapter currently hardcodes paths like `/AstrBot/data/{config_file}` and `/tmp/astrbot.sock`; consider deriving these from existing config/root-path utilities or allowing them to be overridden so the feature works in non-Docker or non-default deployments.
- The whitelist check unconditionally bypasses validation for the `cli` platform; if you expect some environments to use CLI in a more controlled way, it might be safer to gate this behavior behind a config flag rather than hardcoding the exemption.
- For the Unix socket server, you may want to explicitly control file permissions/ownership (e.g., via `os.chmod` or umask) and add minimal framing/size checks on the JSON payload to avoid issues with partial or oversized reads when multiple clients connect.

## Individual Comments

### Comment 1
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:302` </location>
<code_context>
+            while self._running:
+                try:
+                    # 接受连接(非阻塞)
+                    loop = asyncio.get_event_loop()
+                    client_socket, _ = await loop.sock_accept(server_socket)
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Using asyncio.get_event_loop() in async code is discouraged; asyncio.get_running_loop() is safer and future‑proof.

In Python 3.10+, `asyncio.get_event_loop()` is deprecated in async code and can return the wrong loop under some policies. In `_run_socket_mode` (and similarly in `_handle_socket_client` / `_read_input`), please use `asyncio.get_running_loop()` so you reliably get the loop for the current task.

Suggested implementation:

```python
                    # 接受连接(非阻塞)
                    loop = asyncio.get_running_loop()
                    client_socket, _ = await loop.sock_accept(server_socket)

```

Search in `astrbot/core/platform/sources/cli/cli_adapter.py` for other occurrences of `asyncio.get_event_loop()` used inside `async def` functions, especially in `_handle_socket_client` and `_read_input`, and replace them similarly:

- Change `loop = asyncio.get_event_loop()` to `loop = asyncio.get_running_loop()` where the surrounding context is async code running on the event loop.

If there are any uses of `asyncio.get_event_loop()` in purely synchronous initialization code (outside of an active event loop), those should be reviewed separately, as they may need a different pattern (e.g., explicitly creating a loop with `asyncio.new_event_loop()` rather than `get_running_loop()`).
</issue_to_address>

### Comment 2
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:134` </location>
<code_context>
+        logger.info("[ENTRY] CLIPlatformAdapter.run inputs={}")
+        return self._run_loop()
+
+    async def _run_loop(self) -> None:
+        """主运行循环
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring the CLI adapter by extracting small helpers, centralizing shared logic, and mapping modes to handlers to make the code easier to follow without changing behavior.

You can reduce complexity meaningfully without changing behavior by extracting a few focused helpers and consolidating duplicated logic. Here are concrete, localized refactors that keep the existing design but make it easier to reason about.

---

### 1. Simplify mode selection with a strategy map

`_run_loop` currently mixes TTY detection and branching logic inline. You can pull the mode resolution into a small helper and map modes to callables:

```python
# in __init__
self._mode_handlers: dict[str, callable[[], Awaitable[None]]] = {
    "tty": self._run_tty_mode,
    "file": self._run_file_mode,
    "socket": self._run_socket_mode,
}

def _resolve_mode(self) -> str:
    has_tty = sys.stdin.isatty()
    if self.mode == "auto":
        return "file" if not has_tty else "tty"
    if self.mode in ("tty", "file", "socket"):
        if self.mode == "tty" and not has_tty:
            logger.warning(
                "[PROCESS] TTY mode requested but no TTY detected. "
                "CLI platform will not start."
            )
            return ""  # or None
        return self.mode
    logger.error("[ERROR] Unknown mode: %s", self.mode)
    return ""
```

```python
async def _run_loop(self) -> None:
    logger.info("[PROCESS] Starting CLI loop")

    if self.use_isolated_sessions:
        self._cleanup_task = asyncio.create_task(self._cleanup_expired_sessions())

    mode = self._resolve_mode()
    if not mode:
        return

    handler = self._mode_handlers.get(mode)
    if handler:
        logger.info("[PROCESS] Starting %s mode", mode)
        await handler()
```

This flattens the branching and localizes TTY logic without changing behavior.

---

### 2. Extract a small `SessionTracker` helper

`_convert_input` and `_cleanup_expired_sessions` both manipulate `_session_timestamps`. Move this into a tiny helper to decouple message construction from session lifecycle:

```python
class _SessionTracker:
    def __init__(self, ttl: int) -> None:
        import time
        self._ttl = ttl
        self._timestamps: dict[str, float] = {}
        self._time = time

    def ensure_session(self, base_session_id: str, request_id: str | None) -> str:
        if request_id is None:
            return base_session_id
        session_id = f"{base_session_id}_{request_id}"
        if session_id not in self._timestamps:
            self._timestamps[session_id] = self._time.time()
        return session_id

    def collect_expired(self) -> list[str]:
        now = self._time.time()
        expired = [
            s for s, ts in list(self._timestamps.items())
            if now - ts > self._ttl
        ]
        for s in expired:
            self._timestamps.pop(s, None)
        return expired
```

Wire it into the adapter:

```python
# __init__
self._session_tracker = _SessionTracker(self.session_ttl)
```

```python
def _convert_input(self, text: str, request_id: str | None = None) -> AstrBotMessage:
    ...
    if self.use_isolated_sessions and request_id:
        message.session_id = self._session_tracker.ensure_session(
            base_session_id="cli_session",
            request_id=request_id,
        )
    else:
        message.session_id = self.session_id
    ...
```

```python
async def _cleanup_expired_sessions(self) -> None:
    logger.info("[ENTRY] _cleanup_expired_sessions started, TTL=%s seconds", self.session_ttl)
    while self._running:
        try:
            await asyncio.sleep(10)
            if not self.use_isolated_sessions:
                continue

            expired_sessions = self._session_tracker.collect_expired()
            for session_id in expired_sessions:
                logger.info("[PROCESS] Cleaning expired session: %s", session_id)
                # TODO: DB cleanup if needed

            if expired_sessions:
                logger.info("[PROCESS] Cleaned %d expired sessions", len(expired_sessions))
        except Exception as e:
            logger.error("[ERROR] Session cleanup error: %s", e)
```

This removes direct mutation of `_session_timestamps` from multiple places and keeps session policy in one spot.

---

### 3. Deduplicate image extraction/normalization

You already do image extraction and base64 conversion in `_handle_socket_client`. If `CLIMessageEvent.send` (or similar) has overlapping logic, centralize it in a reusable helper:

```python
from astrbot.core.message.components import Image

def normalize_images_from_chain(message_chain: MessageChain) -> list[dict[str, Any]]:
    import base64
    images: list[dict[str, Any]] = []

    for comp in message_chain.chain:
        if not isinstance(comp, Image) or not comp.file:
            continue

        info: dict[str, Any] = {}
        if comp.file.startswith("http"):
            info["type"] = "url"
            info["url"] = comp.file

        elif comp.file.startswith("file:///"):
            info["type"] = "file"
            file_path = comp.file[8:]
            info["path"] = file_path
            try:
                with open(file_path, "rb") as f:
                    raw = f.read()
                info["base64_data"] = base64.b64encode(raw).decode("utf-8")
                info["size"] = len(raw)
            except Exception as e:
                logger.error("[ERROR] Failed to read image file %s: %s", file_path, e)
                info["error"] = str(e)

        elif comp.file.startswith("base64://"):
            info["type"] = "base64"
            base64_data = comp.file[9:]
            info["base64_data"] = base64_data
            info["base64_length"] = len(base64_data)

        images.append(info)

    return images
```

Then in `_handle_socket_client`:

```python
from .image_utils import normalize_images_from_chain  # or local helper

...
message_chain = await asyncio.wait_for(response_future, timeout=30.0)
response_text = message_chain.get_plain_text()
images = normalize_images_from_chain(message_chain)

response = json.dumps(
    {
        "status": "success",
        "response": response_text,
        "images": images,
        "request_id": request_id,
    },
    ensure_ascii=False,
)
await loop.sock_sendall(client_socket, response.encode("utf-8"))
```

And reuse the same helper wherever you currently replicate that logic in `CLIMessageEvent` or other code paths.

---

### 4. Factor `_handle_socket_client` into smaller steps

Without changing behavior, you can split the method into parsing / processing / serializing helpers to make it testable and shorter:

```python
def _parse_socket_request(self, raw: bytes) -> tuple[dict[str, Any], str, str]:
    import json
    request = json.loads(raw.decode("utf-8"))
    message_text = request.get("message", "")
    request_id = request.get("request_id", str(uuid.uuid4()))
    return request, message_text, request_id

def _serialize_error(self, request_id: str, msg: str) -> bytes:
    import json
    return json.dumps(
        {"status": "error", "error": msg, "request_id": request_id},
        ensure_ascii=False,
    ).encode("utf-8")

def _serialize_success(self, request_id: str, chain: MessageChain) -> bytes:
    import json
    response_text = chain.get_plain_text()
    images = normalize_images_from_chain(chain)
    return json.dumps(
        {
            "status": "success",
            "response": response_text,
            "images": images,
            "request_id": request_id,
        },
        ensure_ascii=False,
    ).encode("utf-8")
```

Use them in `_handle_socket_client`:

```python
async def _handle_socket_client(self, client_socket) -> None:
    import json
    logger.debug("[ENTRY] _handle_socket_client")
    loop = asyncio.get_event_loop()

    try:
        data = await loop.sock_recv(client_socket, 4096)
        if not data:
            logger.debug("[PROCESS] Empty request, closing connection")
            return

        try:
            _, message_text, request_id = self._parse_socket_request(data)
        except json.JSONDecodeError:
            await loop.sock_sendall(
                client_socket, self._serialize_error("", "Invalid JSON format")
            )
            return

        response_future: asyncio.Future[MessageChain] = asyncio.Future()
        message = self._convert_input(message_text, request_id=request_id)

        message_event = CLIMessageEvent(
            message_str=message.message_str,
            message_obj=message,
            platform_meta=self.meta(),
            session_id=message.session_id,
            output_queue=self._output_queue,
            response_future=response_future,
        )
        self.commit_event(message_event)

        try:
            chain = await asyncio.wait_for(response_future, timeout=30.0)
            await loop.sock_sendall(
                client_socket, self._serialize_success(request_id, chain)
            )
        except asyncio.TimeoutError:
            await loop.sock_sendall(
                client_socket, self._serialize_error(request_id, "Request timeout")
            )
    except Exception as e:
        logger.error("[ERROR] Socket client handler error: %s", e)
        import traceback
        logger.error(traceback.format_exc())
    finally:
        client_socket.close()
        logger.debug("[EXIT] _handle_socket_client return=None")
```

This keeps the same behavior but breaks the long method into smaller, independently testable pieces.

---

These changes preserve the existing feature set and flow, but:

- Isolate session policy from message conversion.
- Reduce branching complexity in mode selection.
- Remove duplication in image handling.
- Shorten and clarify the socket client handler into clear sub-steps.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

YukiRa1n added a commit to YukiRa1n/AstrBot that referenced this pull request Jan 31, 2026
- Replace hardcoded paths with dynamic path resolution
  - Use get_astrbot_data_path() for data directory
  - Use get_astrbot_temp_path() for temp directory
  - Support ASTRBOT_ROOT environment variable

- Fix asyncio deprecation warnings
  - Replace asyncio.get_event_loop() with get_running_loop()
  - Improve Python 3.10+ compatibility

- Add socket file permission control
  - Set socket permissions to 600 (owner-only)
  - Add security logging

- Update astrbot-cli client
  - Add dynamic path resolution functions
  - Match server-side path logic
  - Improve cross-environment compatibility

Addresses code review feedback from PR AstrBotDevs#4787
@Clhikari
Copy link
Contributor

Clhikari commented Feb 2, 2026

建议加个Windows兼容性支持

@YukiRa1n YukiRa1n force-pushed the master branch 2 times, most recently from 4aec9ef to b070282 Compare February 2, 2026 16:43
astrbot-doc-agent bot pushed a commit to AstrBotDevs/AstrBot-docs that referenced this pull request Feb 2, 2026
@astrbot-doc-agent
Copy link

已为该 PR 生成文档更新 PR(待人工审核):
AstrBotDevs/AstrBot-docs#115


AI 改动摘要:

根据 PR 内容,我已更新了文档,增加了关于 CLI 测试器 (CLI Tester) 的说明。

修改内容:

  1. 新增文档页面
    • zh/dev/cli-tester.md:详细介绍了 CLI 测试器的核心价值、启用方法、使用方式(包括 astrbot-cli 工具)以及核心功能。
    • en/dev/cli-tester.md:上述文档的英文版本。
  2. 更新侧边栏配置
    • .vitepress/config.mjs 中,将 "CLI 测试器" (CLI Tester) 添加到 "开发" (Development) 栏目下,方便开发者快速找到。

文档要点:

  • 定位:专为插件开发者设计,用于构建快速反馈循环,支持 Vibe Coding 模式。
  • 功能:支持 Unix Socket 模式、会话隔离、富媒体(图片)Base64 编码、白名单豁免。
  • 用法:通过 astrbot-cli 工具在命令行直接与机器人交互,无需登录 IM 平台。

@YukiRa1n
Copy link
Contributor Author

YukiRa1n commented Feb 2, 2026

建议加个Windows兼容性支持

现已支持

python astrbot-cli "你好"

在astrbot根目录执行这个就可以了

图片 这是效果

@YukiRa1n
Copy link
Contributor Author

YukiRa1n commented Feb 2, 2026

比如说后续对bot本体进行开发,也可以随时部署,随时就测试,而无需繁琐的适配器配置

astrbot-doc-agent bot pushed a commit to AstrBotDevs/AstrBot-docs that referenced this pull request Feb 2, 2026
@astrbot-doc-agent
Copy link

已为该 PR 生成文档更新 PR(待人工审核):
AstrBotDevs/AstrBot-docs#115


AI 改动摘要:

根据上游 PR #4787 的改动,我已更新了文档以包含新引入的 CLI Tester (CLI测试器)

主要改动:

  1. 新增详细指南页
    • 创建了 zh/dev/star/guides/test-with-cli.mden/dev/star/guides/test-with-cli.md,详细介绍了 CLI Tester 的核心价值、启用步骤、基础用法(包括 Docker 命令示例)以及进阶技巧(如会话隔离)。
  2. 新增平台接入页
    • 创建了 zh/deploy/platform/cli-tester.mden/deploy/platform/cli-tester.md,作为平台接入列表中的入口点。
  3. 更新插件开发指南
    • zh/dev/star/plugin-new.mden/dev/star/plugin-new.md 的“调试插件”章节中添加了关于使用 CLI Tester 的说明和链接。
  4. 更新侧边栏导航
    • .vitepress/config.mjs 中,将 “CLI 测试器” 添加到了“接入到消息平台”和“插件开发”两个侧边栏分类中,方便用户快速找到。

文档摘要:

  • CLI Tester:专为开发者设计的本地测试工具,无需连接 QQ/微信等平台即可通过命令行调试插件。
  • 快速反馈:支持通过 astrbot-cli 工具发送消息并即时获取文本或富媒体(Base64 格式)响应。
  • 会话隔离:支持模拟独立会话,适合并发测试和上下文逻辑验证。
  • 白名单豁免:CLI 平台自动豁免白名单检查,简化测试流程。

@YukiRa1n YukiRa1n force-pushed the master branch 3 times, most recently from 4cad0a4 to 0a865f2 Compare February 5, 2026 10:11
@LIghtJUNction
Copy link
Member

修改pyproject.toml
把命令行程序入口加在这后面:
[project.scripts]
astrbot = "astrbot.cli.main:cli"
其实现在已经有一个了
往后面新增
调用时前面就不需要加上python
直接astrbot-cli就好了
建议重命名为astr以免和现有的重名
功能划分:
astrbot:拉起daemon进程,以及一些基础功能

astr : 侧重于开发,以及更加复杂的功能,控制daemon进程等

我建议在后面加上
astr = "astrbot.core.platform.sources.cli.......:入口函数名称"

@LIghtJUNction
Copy link
Member

不要在根目录丢一个astrbot-cli
这样不会被打包工具打包
入口文件名建议使用前后双下划线main来命名
python执行包将以这个文件作为入口,参考astrbot/clli/main.py

@YukiRa1n
Copy link
Contributor Author

YukiRa1n commented Feb 5, 2026

修改pyproject.toml 把命令行程序入口加在这后面: [project.scripts] astrbot = "astrbot.cli.main:cli" 其实现在已经有一个了 往后面新增 调用时前面就不需要加上python 直接astrbot-cli就好了 建议重命名为astr以免和现有的重名 功能划分: astrbot:拉起daemon进程,以及一些基础功能

astr : 侧重于开发,以及更加复杂的功能,控制daemon进程等

我建议在后面加上 astr = "astrbot.core.platform.sources.cli.......:入口函数名称"

谢谢建议,我已修改,可以看一下我新的pr描述。
然后入口文件名放在了AstrBot\astrbot\cli\client_main_.py
同时完善了cli指令

@LIghtJUNction
Copy link
Member

astrbot既然都导入click这个命令行程序库了,你加入的这个也用click吧,就不要再用标准库的写法了,后面肯定还要加上命令补全的,按tab补全

@YukiRa1n
Copy link
Contributor Author

YukiRa1n commented Feb 6, 2026

astrbot既然都导入click这个命令行程序库了,你加入的这个也用click吧,就不要再用标准库的写法了,后面肯定还要加上命令补全的,按tab补全

谢谢建议,已经修改了~

…nes LINES]

            [--level LEVEL] [--pattern PATTERN]
            [message]

AstrBot CLI Client - 与 CLI Platform 通信的客户端工具

positional arguments:
  message               Message to send (if not provided, read from stdin)

options:
  -h, --help            show this help message and exit
  -s SOCKET, --socket SOCKET
                        Unix socket path (default: {temp_dir}/astrbot.sock)
  -t TIMEOUT, --timeout TIMEOUT
                        Timeout in seconds (default: 30.0)
  -j, --json            Output raw JSON response
  --log                 Get recent console logs (instead of sending a message)
  --lines LINES         Number of log lines to return (default: 100, max:
                        1000)
  --level LEVEL         Filter logs by level
                        (DEBUG/INFO/WARNING/ERROR/CRITICAL)
  --pattern PATTERN     Filter logs by pattern (substring match)

使用示例:

  发送消息:
    astr "你好"                    # 发送消息给 AstrBot
    astr "/help"                   # 查看内置帮助
    echo "你好" | astr             # 从标准输入读取

  获取日志:
    astr --log                     # 获取最近 100 行日志
    astr --log --lines 50          # 获取最近 50 行
    astr --log --level ERROR       # 只显示 ERROR 级别
    astr --log --pattern "CLI"     # 只显示包含 "CLI" 的日志
    astr --log --json              # 以 JSON 格式输出日志

  高级选项:
    astr -j "测试"                 # 输出原始 JSON 响应
    astr -t 60 "长时间任务"        # 设置超时时间为 60 秒

连接说明:
  - 自动从 data/.cli_connection 文件检测连接类型(Unix Socket 或 TCP)
  - Token 自动从 data/.cli_token 文件读取
  - 必须在 AstrBot 根目录下运行,或设置 ASTRBOT_ROOT 环境变量
         输出

- 添加中文说明
- 补充详细使用示例(发送消息、获取日志、高级选项)
- 添加连接说明
- / 不是 AstrBot 的命令前缀
- help 等内置命令不需要 / 前缀
- 添加内置命令使用示例说明
- 说明带 / 的消息会发给 LLM 处理
使用 argparse.REMAINDER 模式捕获所有剩余参数,
允许 /plugin ls 这类命令正常工作。
在接收 Socket 数据时,如果多字节字符被截断在缓冲区边界会导致解码失败。
使用 errors="replace" 参数来处理截断的 UTF-8 字符。
- 添加 format_response() 函数处理分段回复和图片占位符
- 图片显示为 [图片] 或 [N张图片] 占位符
- 更新 --help 说明,添加输出说明部分
- 修复 socket_handler.py 中 action 变量未定义的问题
- 修复 _get_logs() 方法返回格式问题
当日志文件不存在时,返回中文提示信息,
说明需要在配置中启用 log_file_enable。
- 添加 noqa 注释抑制必要的警告
- 格式化代码符合 ruff 规范
Git Bash (MSYS2) 会把 /plugin ls 转换为 C:/Program Files/Git/plugin ls
添加 fix_git_bash_path() 函数检测并还原原始命令
1. astrbot/cli/client/__main__.py: argparse → click (LIghtJUNction review)
   - 子命令结构: astr send / astr log
   - DefaultToSend兼容: astr 你好 等价于 astr send 你好
   - RawEpilogGroup保留帮助文本原始格式
   - 支持shell tab补全(Bash/Zsh/Fish)

2. tcp_socket_server.py: get_event_loop() → get_running_loop() (sourcery-ai review)
   - Python 3.10+弃用get_event_loop(),async函数中应使用get_running_loop()
- astr -j "你好" → 自动路由到 astr send -j "你好"
- astr -t 60 "你好" → 自动路由到 astr send -t 60 "你好"
- astr --log → 自动路由到 astr log
- 更新帮助文本,展示新旧两种用法
问题:
- `astr log --level ERROR` 无法筛选到 [ERRO] 日志
- 日志文件使用4字符缩写 [ERRO]/[WARN]/[INFO]
- 但用户输入的是完整名称 ERROR/WARN/INFO

修复:
- 添加 LEVEL_MAP 映射表,将完整名称映射到缩写
  ERROR -> ERRO, WARNING -> WARN, CRITICAL -> CRIT
- 使用正则表达式精确匹配 [级别] 格式
- 避免 ERROR 错误匹配到 ERROR_INFO 等字符串

测试:
- `astr log --level ERROR` ✅ 筛选 [ERRO] 日志
- `astr log --level WARN` ✅ 筛选 [WARN] 日志
- `astr log --level WARNING` ✅ 映射到 [WARN]
- 默认直接读取日志文件,无需 AstrBot 运行
- 添加 --socket 参数,保留通过 Socket 获取日志的功能
- 正则表达式在文件模式下完全支持,无转义问题
- 简化使用,提高可靠性

示例:
  astr log                    # 默认:直接读取文件
  astr log --level ERROR      # 筛选 ERROR 级别
  astr log --pattern "ERRO|WARN" --regex  # 正则匹配
  astr log --socket           # 通过 Socket 获取(需 AstrBot 运行)
- 修复 CLI 适配器插件指令执行后触发 LLM 的问题
- 新增 restart 命令,支持停止并重启 AstrBot 实例
- 添加 --window 选项,Windows 下可在新窗口启动
- Linux/macOS 默认在当前窗口运行(服务器环境)
- Windows 默认在当前窗口,可选 --window 在新窗口启动
- Windows 默认在新窗口启动,使用 --no-window 在当前窗口
- Linux/macOS 始终在当前窗口运行(服务器环境)
- 修改 run 和 restart 命令的行为
- CLIMessageEvent: 用 finalize() 替代 _delayed_response() 延迟机制,
  管道完成后统一返回响应,解决工具调用响应截断问题
- CLIMessageEvent: 添加 send_streaming() 支持,采用收集后一次性发送策略
- SocketClientHandler: 超时从30s增加到120s,处理 finalize 返回 None 的情况
- PipelineScheduler: 管道完成后调用 event.finalize()(鸭子类型)
- CLIAdapter: 添加 get_stats()/unified_webhook() 兼容 CLIConfig 数据类
- PlatformManager: 安全获取平台ID,兼容 dict 和 dataclass 类型
- PlatformRoute: 兼容 CLIConfig 的 webhook_uuid 获取方式
- MessageConverter: 补充 raw_message 字段
- get_astrbot_root: 支持环境变量 ASTRBOT_ROOT 和向上查找 .astrbot 标记
- run: 默认当前窗口运行,--new-window 才开新窗口
- restart: 保持默认新窗口行为不变
- get_astrbot_root() 优先通过 __file__ 定位源码目录,避免匹配到错误的 .astrbot 标记
- get_temp_path() 增加 __file__ 回退,硬编码 /tmp 改为 tempfile.gettempdir() 兼容 Windows
- launch_in_new_window() 用 CREATE_NEW_CONSOLE 替代 Start-Process powershell,修复环境变量传递
- taskkill 加 /T 参数杀进程树,避免子进程残留
- Extract connection.py, output.py from monolithic __main__.py (708→154 lines)
- Add command modules: conv, plugin, provider, debug, interactive, batch
- Add 16 top-level commands covering session mgmt, plugin mgmt, LLM config, debug tools
- Add interactive REPL mode (astr -i) with command history
- Maintain full backward compatibility (astr 你好, astr --log, pipe input)
- Redesign --help to show all commands/subcommands in one view
- Add 97 unit tests (connection, commands, interactive) + 36 E2E tests
将26文件3层嵌套结构合并为7文件扁平结构:
- cli_adapter.py 合并 config_loader/token_manager/session_manager
- cli_event.py 合并 converter/response_builder/image_processor
- socket_server.py 合并 socket_abstract/tcp/unix/factory/platform_detector
- socket_handler.py 合并 handlers/socket_handler/connection_info_writer
- tty_handler.py/file_handler.py 从 handlers/ 提升到根目录
- 删除 interfaces.py/decorators.py 等过度工程化模块
- 修复 flaky 测试 test_scenario_new_user_onboarding
通过 socket action 协议在 CLI 适配器中实现函数工具管理,
支持 astr tool ls/info/call 子命令,无需注册为全局指令。
…n CLI client

使用 PEP 562 __getattr__ 延迟初始化 logger,CLI 客户端不再触发
astrbot/core 全量初始化,同时移除 __main__.py 中冗余的日志抑制代码。
@YukiRa1n
Copy link
Contributor Author

rebase了一下~

通过 action-based socket 协议实现跨会话浏览,支持查看任意平台会话的对话列表和聊天记录。

- 新增 ConversationManager 全局单例访问
- socket_handler 添加 list_sessions/list_session_conversations/get_session_history action
- connection.py 添加对应客户端请求方法
- 新建 session.py 命令组 (ls/convs/history)
- 聊天记录以 You/AI 交替格式显示,图片用 [图片] 占位
删除交互模式相关代码,简化CLI适配器:
- 移除 tty_handler.py 和 interactive.py
- 移除 cli_adapter.py 中的 TTY 模式处理
- 移除客户端的 -i 快捷方式和 interactive 子命令
- 移除相关测试
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants