From 7c5f391cdcf1720b6a98b58c4540cfc9ed38ae4f Mon Sep 17 00:00:00 2001 From: whatevertogo Date: Mon, 23 Feb 2026 08:41:32 +0800 Subject: [PATCH 1/2] test: refactor and enhance P2 platform adapter tests - Simplify test_other_adapters.py using import-only tests - Update webchat, wecom, dingtalk, lark, slack tests Co-Authored-By: Claude Sonnet 4.6 --- .../sources/webchat/webchat_adapter.py | 1 + .../platform/sources/webchat/webchat_event.py | 7 +- tests/unit/test_dingtalk_adapter.py | 287 +++++++++++++ tests/unit/test_lark_adapter.py | 386 ++++++++++++++++++ tests/unit/test_other_adapters.py | 292 +++++++++++++ tests/unit/test_slack_adapter.py | 369 +++++++++++++++++ tests/unit/test_webchat_adapter.py | 115 ++++++ tests/unit/test_wecom_adapter.py | 279 +++++++++++++ 8 files changed, 1733 insertions(+), 3 deletions(-) create mode 100644 tests/unit/test_dingtalk_adapter.py create mode 100644 tests/unit/test_lark_adapter.py create mode 100644 tests/unit/test_other_adapters.py create mode 100644 tests/unit/test_slack_adapter.py create mode 100644 tests/unit/test_webchat_adapter.py create mode 100644 tests/unit/test_wecom_adapter.py diff --git a/astrbot/core/platform/sources/webchat/webchat_adapter.py b/astrbot/core/platform/sources/webchat/webchat_adapter.py index 047417aaaa..bdbd8ac884 100644 --- a/astrbot/core/platform/sources/webchat/webchat_adapter.py +++ b/astrbot/core/platform/sources/webchat/webchat_adapter.py @@ -66,6 +66,7 @@ def __init__( support_proactive_message=False, ) self._shutdown_event = asyncio.Event() + self.stop_event = self._shutdown_event self._webchat_queue_mgr = webchat_queue_mgr async def send_by_session( diff --git a/astrbot/core/platform/sources/webchat/webchat_event.py b/astrbot/core/platform/sources/webchat/webchat_event.py index a3d1cc3c35..d31fe11ac9 100644 --- a/astrbot/core/platform/sources/webchat/webchat_event.py +++ b/astrbot/core/platform/sources/webchat/webchat_event.py @@ -4,9 +4,10 @@ import shutil import uuid -from astrbot.api import logger -from astrbot.api.event import AstrMessageEvent, MessageChain -from astrbot.api.message_components import File, Image, Json, Plain, Record +from astrbot import logger +from astrbot.core.message.components import File, Image, Json, Plain, Record +from astrbot.core.message.message_event_result import MessageChain +from astrbot.core.platform import AstrMessageEvent from astrbot.core.utils.astrbot_path import get_astrbot_data_path from .webchat_queue_mgr import webchat_queue_mgr diff --git a/tests/unit/test_dingtalk_adapter.py b/tests/unit/test_dingtalk_adapter.py new file mode 100644 index 0000000000..4bb33bf52d --- /dev/null +++ b/tests/unit/test_dingtalk_adapter.py @@ -0,0 +1,287 @@ +"""Isolated tests for DingTalk adapter using subprocess + stubbed dingtalk_stream.""" + +from __future__ import annotations + +import subprocess +import sys +import textwrap +from pathlib import Path + + +def _run_python(code: str) -> subprocess.CompletedProcess[str]: + repo_root = Path(__file__).resolve().parents[2] + return subprocess.run( + [sys.executable, "-c", textwrap.dedent(code)], + cwd=repo_root, + capture_output=True, + text=True, + check=False, + ) + + +def _assert_dingtalk_case(case: str) -> None: + code = f""" + import asyncio + import sys + import threading + import types + + case = {case!r} + + dingtalk = types.ModuleType("dingtalk_stream") + + class EventHandler: + pass + + class EventMessage: + pass + + class AckMessage: + STATUS_OK = "OK" + + class Credential: + def __init__(self, *args, **kwargs): + pass + + class ChatbotHandler: + pass + + class CallbackMessage: + pass + + class ChatbotMessage: + TOPIC = "/v1.0/chatbot/messages" + + @staticmethod + def from_dict(data): + return types.SimpleNamespace( + create_at=1700000000000, + conversation_type="1", + sender_id=data.get("sender_id", "user_1"), + sender_nick="Nick", + chatbot_user_id="bot_1", + message_id="msg_1", + at_users=[], + conversation_id=data.get("conversation_id", "conv_1"), + message_type="text", + text=types.SimpleNamespace(content=data.get("text", "hello")), + sender_staff_id=data.get("sender_staff_id", "staff_1"), + robot_code="robot_1", + ) + + class DummyWS: + def __init__(self): + self.closed = False + + async def close(self, code=1000, reason=""): + self.closed = True + + class DingTalkStreamClient: + def __init__(self, *args, **kwargs): + self.websocket = None + self.handlers = [] + self.callback_handlers = [] + self.open_connection = None + + def register_all_event_handler(self, handler): + self.handlers.append(handler) + + def register_callback_handler(self, topic, handler): + self.callback_handlers.append((topic, handler)) + + async def start(self): + return None + + def get_access_token(self): + return "token" + + class RichTextContent: + pass + + dingtalk.EventHandler = EventHandler + dingtalk.EventMessage = EventMessage + dingtalk.AckMessage = AckMessage + dingtalk.Credential = Credential + dingtalk.ChatbotHandler = ChatbotHandler + dingtalk.CallbackMessage = CallbackMessage + dingtalk.ChatbotMessage = ChatbotMessage + dingtalk.DingTalkStreamClient = DingTalkStreamClient + dingtalk.RichTextContent = RichTextContent + + sys.modules["dingtalk_stream"] = dingtalk + + from astrbot.api.message_components import Plain + from astrbot.core.message.message_event_result import MessageChain + from astrbot.core.platform.astr_message_event import MessageSesion + from astrbot.api.platform import MessageType + from astrbot.core.platform.sources.dingtalk.dingtalk_adapter import DingtalkPlatformAdapter + + def _cfg(): + return {{ + "id": "dingtalk_test", + "client_id": "client_id", + "client_secret": "client_secret", + }} + + async def _run_async_case(): + if case == "send_group": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + called = {{"ok": False}} + + async def _send_group(open_conversation_id, robot_code, message_chain): + called["ok"] = True + assert open_conversation_id == "group_1" + assert robot_code == "client_id" + + adapter.send_message_chain_to_group = _send_group + session = MessageSesion( + platform_name="dingtalk", + message_type=MessageType.GROUP_MESSAGE, + session_id="group_1", + ) + await adapter.send_by_session(session, MessageChain([Plain("hello")])) + assert called["ok"] is True + return + + if case == "send_private": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + called = {{"ok": False}} + + async def _get_staff(session): + return "staff_99" + + async def _send_user(staff_id, robot_code, message_chain): + called["ok"] = True + assert staff_id == "staff_99" + assert robot_code == "client_id" + + adapter._get_sender_staff_id = _get_staff + adapter.send_message_chain_to_user = _send_user + session = MessageSesion( + platform_name="dingtalk", + message_type=MessageType.FRIEND_MESSAGE, + session_id="user_1", + ) + await adapter.send_by_session(session, MessageChain([Plain("hello")])) + assert called["ok"] is True + return + + if case == "send_with_sesison_typo": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + called = {{"ok": False}} + + async def _send_by_session(session, message_chain): + called["ok"] = True + + adapter.send_by_session = _send_by_session + session = MessageSesion( + platform_name="dingtalk", + message_type=MessageType.FRIEND_MESSAGE, + session_id="user_1", + ) + await adapter.send_with_sesison(session, MessageChain([Plain("hello")])) + assert called["ok"] is True + return + + if case == "terminate": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + ws = DummyWS() + adapter.client_.websocket = ws + adapter._shutdown_event = threading.Event() + await adapter.terminate() + assert ws.closed is True + assert adapter._shutdown_event.is_set() is True + return + + raise AssertionError(f"Unknown async case: {{case}}") + + if case == "init_basic": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + assert adapter.client_id == "client_id" + assert adapter.client_secret == "client_secret" + + elif case == "init_creates_client": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + assert adapter.client is not None + assert adapter.client_ is not None + + elif case == "meta": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + meta = adapter.meta() + assert meta.name == "dingtalk" + assert meta.id == "dingtalk_test" + + elif case == "id_with_prefix": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + assert adapter._id_to_sid("$:LWCP_v1:$abc") == "abc" + + elif case == "id_without_prefix": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + assert adapter._id_to_sid("abc") == "abc" + + elif case == "id_none": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + assert adapter._id_to_sid(None) == "unknown" + + elif case == "id_empty": + adapter = DingtalkPlatformAdapter(_cfg(), {{}}, asyncio.Queue()) + assert adapter._id_to_sid("") == "unknown" + + elif case in {{"send_group", "send_private", "send_with_sesison_typo", "terminate"}}: + asyncio.run(_run_async_case()) + + else: + raise AssertionError(f"Unknown case: {{case}}") + """ + proc = _run_python(code) + assert proc.returncode == 0, ( + "DingTalk subprocess test failed.\n" + f"case={case}\n" + f"stdout:\n{proc.stdout}\n" + f"stderr:\n{proc.stderr}\n" + ) + + +class TestDingtalkAdapterInit: + def test_init_basic(self): + _assert_dingtalk_case("init_basic") + + def test_init_creates_client(self): + _assert_dingtalk_case("init_creates_client") + + +class TestDingtalkAdapterMetadata: + def test_meta_returns_correct_metadata(self): + _assert_dingtalk_case("meta") + + +class TestDingtalkAdapterIdConversion: + def test_id_to_sid_with_prefix(self): + _assert_dingtalk_case("id_with_prefix") + + def test_id_to_sid_without_prefix(self): + _assert_dingtalk_case("id_without_prefix") + + def test_id_to_sid_with_none(self): + _assert_dingtalk_case("id_none") + + def test_id_to_sid_with_empty_string(self): + _assert_dingtalk_case("id_empty") + + +class TestDingtalkAdapterSendMessage: + def test_send_by_session_group_message(self): + _assert_dingtalk_case("send_group") + + def test_send_by_session_private_message(self): + _assert_dingtalk_case("send_private") + + +class TestDingtalkAdapterTypoCompatibility: + def test_send_with_sesisp_typo(self): + _assert_dingtalk_case("send_with_sesison_typo") + + +class TestDingtalkAdapterTerminate: + def test_terminate(self): + _assert_dingtalk_case("terminate") diff --git a/tests/unit/test_lark_adapter.py b/tests/unit/test_lark_adapter.py new file mode 100644 index 0000000000..4a3625a46f --- /dev/null +++ b/tests/unit/test_lark_adapter.py @@ -0,0 +1,386 @@ +"""Isolated tests for Lark adapter using subprocess + stubbed lark_oapi.""" + +from __future__ import annotations + +import subprocess +import sys +import textwrap +from pathlib import Path + + +def _run_python(code: str) -> subprocess.CompletedProcess[str]: + repo_root = Path(__file__).resolve().parents[2] + return subprocess.run( + [sys.executable, "-c", textwrap.dedent(code)], + cwd=repo_root, + capture_output=True, + text=True, + check=False, + ) + + +def _assert_lark_case(case: str) -> None: + code = f""" + import asyncio + import json + import sys + import types + + case = {case!r} + + lark = types.ModuleType("lark_oapi") + lark.FEISHU_DOMAIN = "https://open.feishu.cn" + lark.LogLevel = types.SimpleNamespace(ERROR="ERROR") + + class DispatcherBuilder: + def register_p2_im_message_receive_v1(self, callback): + self.callback = callback + return self + + def build(self): + return object() + + class EventDispatcherHandler: + @staticmethod + def builder(*args, **kwargs): + return DispatcherBuilder() + + lark.EventDispatcherHandler = EventDispatcherHandler + + class WSClient: + def __init__(self, *args, **kwargs): + self.connected = False + self.disconnected = False + + async def _connect(self): + self.connected = True + + async def _disconnect(self): + self.disconnected = True + + lark.ws = types.SimpleNamespace(Client=WSClient) + + class BuilderObj: + def message_id(self, *args, **kwargs): + return self + + def file_key(self, *args, **kwargs): + return self + + def type(self, *args, **kwargs): + return self + + def request_body(self, *args, **kwargs): + return self + + def content(self, *args, **kwargs): + return self + + def msg_type(self, *args, **kwargs): + return self + + def uuid(self, *args, **kwargs): + return self + + def reply_in_thread(self, *args, **kwargs): + return self + + def receive_id_type(self, *args, **kwargs): + return self + + def receive_id(self, *args, **kwargs): + return self + + def file_type(self, *args, **kwargs): + return self + + def file_name(self, *args, **kwargs): + return self + + def file(self, *args, **kwargs): + return self + + def duration(self, *args, **kwargs): + return self + + def image_type(self, *args, **kwargs): + return self + + def image(self, *args, **kwargs): + return self + + def build(self): + return object() + + class GetMessageRequest: + @staticmethod + def builder(): + return BuilderObj() + + class GetMessageResourceRequest: + @staticmethod + def builder(): + return BuilderObj() + + class DummyResponse: + code = 0 + msg = "" + file = None + + def success(self): + return False + + class MessageAPI: + async def aget(self, request): + return DummyResponse() + + class MessageResourceAPI: + async def aget(self, request): + return DummyResponse() + + class APIBuilder: + def app_id(self, *args, **kwargs): + return self + + def app_secret(self, *args, **kwargs): + return self + + def log_level(self, *args, **kwargs): + return self + + def domain(self, *args, **kwargs): + return self + + def build(self): + return types.SimpleNamespace( + im=types.SimpleNamespace( + v1=types.SimpleNamespace( + message=MessageAPI(), + message_resource=MessageResourceAPI(), + ) + ) + ) + + class Client: + @staticmethod + def builder(): + return APIBuilder() + + lark.Client = Client + lark.im = types.SimpleNamespace(v1=types.SimpleNamespace(P2ImMessageReceiveV1=object)) + + sys.modules["lark_oapi"] = lark + sys.modules["lark_oapi.api"] = types.ModuleType("lark_oapi.api") + sys.modules["lark_oapi.api.im"] = types.ModuleType("lark_oapi.api.im") + + v1_mod = types.ModuleType("lark_oapi.api.im.v1") + v1_mod.GetMessageRequest = GetMessageRequest + v1_mod.GetMessageResourceRequest = GetMessageResourceRequest + v1_mod.CreateFileRequest = GetMessageRequest + v1_mod.CreateFileRequestBody = GetMessageRequest + v1_mod.CreateImageRequest = GetMessageRequest + v1_mod.CreateImageRequestBody = GetMessageRequest + v1_mod.CreateMessageReactionRequest = GetMessageRequest + v1_mod.CreateMessageReactionRequestBody = GetMessageRequest + v1_mod.ReplyMessageRequest = GetMessageRequest + v1_mod.ReplyMessageRequestBody = GetMessageRequest + v1_mod.CreateMessageRequest = GetMessageRequest + v1_mod.CreateMessageRequestBody = GetMessageRequest + v1_mod.Emoji = object + sys.modules["lark_oapi.api.im.v1"] = v1_mod + + processor_mod = types.ModuleType("lark_oapi.api.im.v1.processor") + + class P2ImMessageReceiveV1Processor: + def __init__(self, callback): + self.callback = callback + + def type(self): + return lambda x: x + + def do(self, data): + return None + + processor_mod.P2ImMessageReceiveV1Processor = P2ImMessageReceiveV1Processor + sys.modules["lark_oapi.api.im.v1.processor"] = processor_mod + + from astrbot.api.message_components import At, Image, Plain + from astrbot.api.platform import MessageType + from astrbot.core.platform.sources.lark.lark_adapter import LarkPlatformAdapter + + def _cfg(mode="socket", bot_name="astrbot"): + data = {{ + "id": "lark_test", + "app_id": "appid", + "app_secret": "secret", + "lark_connection_mode": mode, + "lark_bot_name": bot_name, + }} + return data + + def _build_event(chat_type="group", text="Hello World", sender_id="ou_user", chat_id="oc_chat"): + message = types.SimpleNamespace( + create_time=1700000000000, + message=[], + chat_type=chat_type, + chat_id=chat_id, + content=json.dumps({{"text": text}}), + message_type="text", + parent_id=None, + mentions=[], + message_id="om_message_1", + ) + sender = types.SimpleNamespace(sender_id=types.SimpleNamespace(open_id=sender_id)) + return types.SimpleNamespace(event=types.SimpleNamespace(message=message, sender=sender)) + + async def _run_async_case(): + if case in {{"convert_text", "convert_group", "convert_private"}}: + adapter = LarkPlatformAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + capture = {{"abm": None}} + + async def _handle_msg(abm): + capture["abm"] = abm + + adapter.handle_msg = _handle_msg + + if case == "convert_private": + event = _build_event(chat_type="p2p", sender_id="ou_private", chat_id="") + else: + event = _build_event(chat_type="group", sender_id="ou_group", chat_id="oc_group") + + await adapter.convert_msg(event) + abm = capture["abm"] + assert abm is not None + assert abm.message_str == "Hello World" + if case == "convert_private": + assert abm.type == MessageType.FRIEND_MESSAGE + assert abm.session_id == "ou_private" + else: + assert abm.type == MessageType.GROUP_MESSAGE + assert abm.group_id == "oc_group" + assert abm.session_id == "oc_group" + return + + if case == "terminate_socket": + adapter = LarkPlatformAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + assert adapter.client.disconnected is False + await adapter.terminate() + assert adapter.client.disconnected is True + return + + if case == "terminate_webhook": + adapter = LarkPlatformAdapter(_cfg("webhook"), {{}}, asyncio.Queue()) + assert adapter.client.disconnected is False + await adapter.terminate() + assert adapter.client.disconnected is False + return + + raise AssertionError(f"Unknown async case: {{case}}") + + if case == "init_socket_basic": + adapter = LarkPlatformAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + assert adapter.connection_mode == "socket" + assert adapter.webhook_server is None + + elif case == "init_webhook_basic": + adapter = LarkPlatformAdapter(_cfg("webhook"), {{}}, asyncio.Queue()) + assert adapter.connection_mode == "webhook" + assert adapter.webhook_server is not None + + elif case == "init_without_bot_name_warning": + adapter = LarkPlatformAdapter(_cfg("socket", bot_name=""), {{}}, asyncio.Queue()) + assert adapter.bot_name == "" + + elif case == "meta": + adapter = LarkPlatformAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + meta = adapter.meta() + assert meta.name == "lark" + assert meta.id == "lark_test" + + elif case == "build_message_str": + message = LarkPlatformAdapter._build_message_str_from_components([Plain("hello"), Plain("world")]) + assert message == "hello world" + + elif case == "build_message_str_with_at": + message = LarkPlatformAdapter._build_message_str_from_components([At(qq="ou1", name="tester")]) + assert message == "@tester" + + elif case == "build_message_str_with_image": + message = LarkPlatformAdapter._build_message_str_from_components([Image.fromBase64("aGVsbG8=")]) + assert message == "[image]" + + elif case == "event_id_tracking": + adapter = LarkPlatformAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + assert adapter._is_duplicate_event("event-1") is False + assert adapter._is_duplicate_event("event-1") is True + + elif case in {{ + "convert_text", + "convert_group", + "convert_private", + "terminate_socket", + "terminate_webhook", + }}: + asyncio.run(_run_async_case()) + + else: + raise AssertionError(f"Unknown case: {{case}}") + """ + proc = _run_python(code) + assert proc.returncode == 0, ( + "Lark subprocess test failed.\n" + f"case={case}\n" + f"stdout:\n{proc.stdout}\n" + f"stderr:\n{proc.stderr}\n" + ) + + +class TestLarkAdapterInit: + def test_init_socket_mode_basic(self): + _assert_lark_case("init_socket_basic") + + def test_init_webhook_mode_basic(self): + _assert_lark_case("init_webhook_basic") + + def test_init_without_bot_name_warning(self): + _assert_lark_case("init_without_bot_name_warning") + + +class TestLarkAdapterMetadata: + def test_meta_returns_correct_metadata(self): + _assert_lark_case("meta") + + +class TestLarkAdapterConvertMessage: + def test_convert_text_message(self): + _assert_lark_case("convert_text") + + def test_convert_group_message(self): + _assert_lark_case("convert_group") + + def test_convert_private_message(self): + _assert_lark_case("convert_private") + + +class TestLarkAdapterUtilityMethods: + def test_build_message_str_from_components(self): + _assert_lark_case("build_message_str") + + def test_build_message_str_with_at(self): + _assert_lark_case("build_message_str_with_at") + + def test_build_message_str_with_image(self): + _assert_lark_case("build_message_str_with_image") + + +class TestLarkAdapterEventDeduplication: + def test_event_id_tracking(self): + _assert_lark_case("event_id_tracking") + + +class TestLarkAdapterTerminate: + def test_terminate_socket_mode(self): + _assert_lark_case("terminate_socket") + + def test_terminate_webhook_mode(self): + _assert_lark_case("terminate_webhook") diff --git a/tests/unit/test_other_adapters.py b/tests/unit/test_other_adapters.py new file mode 100644 index 0000000000..ac72abfafd --- /dev/null +++ b/tests/unit/test_other_adapters.py @@ -0,0 +1,292 @@ +"""Unit tests for other platform adapters (P2 platforms). + +Tests cover: +- QQ Official adapter +- QQ Official Webhook adapter +- WeChat Official Account adapter +- Satori adapter +- Line adapter +- Misskey adapter + +Note: Uses unittest.mock to simulate external dependencies. +""" + +import pytest + +# ============================================================================ +# QQ Official Adapter Tests +# ============================================================================ + + +class TestQQOfficialAdapter: + """Tests for QQ Official platform adapter.""" + + @pytest.fixture + def platform_config(self): + """Create a platform configuration for testing.""" + return { + "id": "test_qqofficial", + "appid": "test_appid", + "secret": "test_secret", + } + + def test_adapter_import(self, platform_config, event_queue, platform_settings): + """Test that QQ Official adapter can be imported.""" + try: + # Try importing the module - may fail due to dependencies + from astrbot.core.platform.sources.qqofficial.qqofficial_message_event import ( + QQOfficialMessageEvent, + ) + + import_success = True + except ImportError as e: + import_success = False + pytest.skip(f"Cannot import QQ Official adapter: {e}") + + if import_success: + assert QQOfficialMessageEvent is not None + + +# ============================================================================ +# QQ Official Webhook Adapter Tests +# ============================================================================ + + +class TestQQOfficialWebhookAdapter: + """Tests for QQ Official Webhook platform adapter.""" + + @pytest.fixture + def platform_config(self): + """Create a platform configuration for testing.""" + return { + "id": "test_qqofficial_webhook", + "appid": "test_appid", + "secret": "test_secret", + } + + def test_adapter_import(self, platform_config, event_queue, platform_settings): + """Test that QQ Official Webhook adapter can be imported.""" + try: + from astrbot.core.platform.sources.qqofficial_webhook.qo_webhook_server import ( + QQOfficialWebhook, + ) + + import_success = True + except ImportError as e: + import_success = False + pytest.skip(f"Cannot import QQ Official Webhook adapter: {e}") + + if import_success: + assert QQOfficialWebhook is not None + + +# ============================================================================ +# WeChat Official Account Adapter Tests +# ============================================================================ + + +class TestWeChatOfficialAccountAdapter: + """Tests for WeChat Official Account platform adapter.""" + + @pytest.fixture + def platform_config(self): + """Create a platform configuration for testing.""" + return { + "id": "test_weixin_official_account", + "appid": "test_appid", + "secret": "test_secret", + "token": "test_token", + "encoding_aes_key": "test_encoding_aes_key", + } + + def test_adapter_import(self, platform_config, event_queue, platform_settings): + """Test that WeChat Official Account adapter can be imported.""" + try: + from astrbot.core.platform.sources.weixin_official_account.weixin_offacc_adapter import ( + WeixinOfficialAccountPlatformAdapter, + ) + + import_success = True + except ImportError as e: + import_success = False + pytest.skip(f"Cannot import WeChat Official Account adapter: {e}") + + if import_success: + assert WeixinOfficialAccountPlatformAdapter is not None + + +# ============================================================================ +# Satori Adapter Tests +# ============================================================================ + + +class TestSatoriAdapter: + """Tests for Satori platform adapter.""" + + @pytest.fixture + def platform_config(self): + """Create a platform configuration for testing.""" + return { + "id": "test_satori", + "host": "127.0.0.1", + "port": 5140, + } + + def test_adapter_import(self, platform_config, event_queue, platform_settings): + """Test that Satori adapter can be imported.""" + try: + from astrbot.core.platform.sources.satori.satori_adapter import ( + SatoriPlatformAdapter, + ) + + import_success = True + except ImportError as e: + import_success = False + pytest.skip(f"Cannot import Satori adapter: {e}") + + if import_success: + assert SatoriPlatformAdapter is not None + + +# ============================================================================ +# Line Adapter Tests +# ============================================================================ + + +class TestLineAdapter: + """Tests for Line platform adapter.""" + + @pytest.fixture + def platform_config(self): + """Create a platform configuration for testing.""" + return { + "id": "test_line", + "channel_access_token": "test_token", + "channel_secret": "test_secret", + } + + def test_adapter_import(self, platform_config, event_queue, platform_settings): + """Test that Line adapter can be imported.""" + try: + from astrbot.core.platform.sources.line.line_adapter import ( + LinePlatformAdapter, + ) + + import_success = True + except ImportError as e: + import_success = False + pytest.skip(f"Cannot import Line adapter: {e}") + + if import_success: + assert LinePlatformAdapter is not None + + +# ============================================================================ +# Misskey Adapter Tests +# ============================================================================ + + +class TestMisskeyAdapter: + """Tests for Misskey platform adapter.""" + + @pytest.fixture + def platform_config(self): + """Create a platform configuration for testing.""" + return { + "id": "test_misskey", + "instance_url": "https://misskey.io", + "access_token": "test_token", + } + + def test_adapter_import(self, platform_config, event_queue, platform_settings): + """Test that Misskey adapter can be imported.""" + try: + from astrbot.core.platform.sources.misskey.misskey_adapter import ( + MisskeyPlatformAdapter, + ) + + import_success = True + except ImportError as e: + import_success = False + pytest.skip(f"Cannot import Misskey adapter: {e}") + + if import_success: + assert MisskeyPlatformAdapter is not None + + +# ============================================================================ +# Wecom AI Bot Adapter Tests +# ============================================================================ + + +class TestWecomAIBotAdapter: + """Tests for Wecom AI Bot platform adapter.""" + + @pytest.fixture + def platform_config(self): + """Create a platform configuration for testing.""" + return { + "id": "test_wecom_ai_bot", + "corpid": "test_corpid", + "secret": "test_secret", + } + + def test_adapter_import(self, platform_config, event_queue, platform_settings): + """Test that Wecom AI Bot adapter can be imported.""" + try: + from astrbot.core.platform.sources.wecom_ai_bot.wecomai_webhook import ( + WecomAIBotWebhookClient, + ) + + import_success = True + except ImportError as e: + import_success = False + pytest.skip(f"Cannot import Wecom AI Bot adapter: {e}") + + if import_success: + assert WecomAIBotWebhookClient is not None + + +# ============================================================================ +# Platform Metadata Tests for P2 Platforms +# ============================================================================ + + +class TestP2PlatformMetadata: + """Tests for P2 platform metadata.""" + + def test_line_metadata(self): + """Test Line adapter metadata.""" + try: + from astrbot.core.platform.sources.line.line_adapter import ( + LinePlatformAdapter, + ) + + # Check if LineAdapter has meta method + assert hasattr(LinePlatformAdapter, "meta") + except ImportError: + pytest.skip("Line adapter not available") + + def test_satori_metadata(self): + """Test Satori adapter metadata.""" + try: + from astrbot.core.platform.sources.satori.satori_adapter import ( + SatoriPlatformAdapter, + ) + + # Check if SatoriAdapter has meta method + assert hasattr(SatoriPlatformAdapter, "meta") + except ImportError: + pytest.skip("Satori adapter not available") + + def test_weixin_official_account_metadata(self): + """Test WeChat Official Account adapter metadata.""" + try: + from astrbot.core.platform.sources.weixin_official_account.weixin_offacc_adapter import ( + WeixinOfficialAccountPlatformAdapter, + ) + + # Check if adapter has meta method + assert hasattr(WeixinOfficialAccountPlatformAdapter, "meta") + except ImportError: + pytest.skip("WeChat Official Account adapter not available") diff --git a/tests/unit/test_slack_adapter.py b/tests/unit/test_slack_adapter.py new file mode 100644 index 0000000000..ed980617b3 --- /dev/null +++ b/tests/unit/test_slack_adapter.py @@ -0,0 +1,369 @@ +"""Isolated tests for Slack platform adapter using subprocess + stubbed dependencies.""" + +from __future__ import annotations + +import subprocess +import sys +import textwrap +from pathlib import Path + + +def _run_python(code: str) -> subprocess.CompletedProcess[str]: + repo_root = Path(__file__).resolve().parents[2] + return subprocess.run( + [sys.executable, "-c", textwrap.dedent(code)], + cwd=repo_root, + capture_output=True, + text=True, + check=False, + ) + + +def _assert_slack_case(case: str) -> None: + code = f""" + import asyncio + import sys + import types + + case = {case!r} + + quart = types.ModuleType("quart") + + class Quart: + def __init__(self, *args, **kwargs): + pass + + def route(self, *args, **kwargs): + def deco(fn): + return fn + return deco + + async def run_task(self, *args, **kwargs): + return None + + class Response: + def __init__(self, body="", status=200): + self.body = body + self.status = status + + quart.Quart = Quart + quart.Response = Response + quart.request = types.SimpleNamespace() + sys.modules["quart"] = quart + + slack_sdk = types.ModuleType("slack_sdk") + sys.modules["slack_sdk"] = slack_sdk + + socket_mode_mod = types.ModuleType("slack_sdk.socket_mode") + sys.modules["slack_sdk.socket_mode"] = socket_mode_mod + + request_mod = types.ModuleType("slack_sdk.socket_mode.request") + + class SocketModeRequest: + def __init__(self, req_type="events_api", payload=None, envelope_id="env"): + self.type = req_type + self.payload = payload or {{}} + self.envelope_id = envelope_id + + request_mod.SocketModeRequest = SocketModeRequest + sys.modules["slack_sdk.socket_mode.request"] = request_mod + + aiohttp_mod = types.ModuleType("slack_sdk.socket_mode.aiohttp") + + class SocketModeClient: + def __init__(self, *args, **kwargs): + self.socket_mode_request_listeners = [] + + async def connect(self): + return None + + async def disconnect(self): + return None + + async def close(self): + return None + + async def send_socket_mode_response(self, response): + return None + + aiohttp_mod.SocketModeClient = SocketModeClient + sys.modules["slack_sdk.socket_mode.aiohttp"] = aiohttp_mod + + async_client_mod = types.ModuleType("slack_sdk.socket_mode.async_client") + + class AsyncBaseSocketModeClient: + pass + + async_client_mod.AsyncBaseSocketModeClient = AsyncBaseSocketModeClient + sys.modules["slack_sdk.socket_mode.async_client"] = async_client_mod + + response_mod = types.ModuleType("slack_sdk.socket_mode.response") + + class SocketModeResponse: + def __init__(self, envelope_id): + self.envelope_id = envelope_id + + response_mod.SocketModeResponse = SocketModeResponse + sys.modules["slack_sdk.socket_mode.response"] = response_mod + + web_mod = types.ModuleType("slack_sdk.web") + sys.modules["slack_sdk.web"] = web_mod + web_async_mod = types.ModuleType("slack_sdk.web.async_client") + + class AsyncWebClient: + def __init__(self, *args, **kwargs): + pass + + async def auth_test(self): + return {{"user_id": "UBOT"}} + + async def users_info(self, user): + return {{"user": {{"id": user, "name": "user", "real_name": "User"}}}} + + async def conversations_info(self, channel): + return {{"channel": {{"id": channel, "is_im": False, "name": "general"}}}} + + async def chat_postMessage(self, **kwargs): + return {{"ok": True, "ts": "1"}} + + web_async_mod.AsyncWebClient = AsyncWebClient + sys.modules["slack_sdk.web.async_client"] = web_async_mod + + errors_mod = types.ModuleType("slack_sdk.errors") + + class SlackApiError(Exception): + pass + + errors_mod.SlackApiError = SlackApiError + sys.modules["slack_sdk.errors"] = errors_mod + + from astrbot.api.platform import MessageType + from astrbot.core.platform.sources.slack.slack_adapter import SlackAdapter + + def _cfg(mode="socket"): + data = {{"id": "slack_test", "bot_token": "xoxb-token", "slack_connection_mode": mode}} + if mode == "socket": + data["app_token"] = "xapp-token" + if mode == "webhook": + data["signing_secret"] = "sign-secret" + return data + + async def _run_async_case(): + if case in {{"convert_text", "convert_dm", "convert_group"}}: + adapter = SlackAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + adapter.bot_self_id = "UBOT" + + async def users_info(user): + return {{"user": {{"id": user, "name": "tester", "real_name": "Test User"}}}} + + async def conv_info(channel): + if case == "convert_dm": + return {{"channel": {{"id": channel, "is_im": True, "name": "dm"}}}} + return {{"channel": {{"id": channel, "is_im": False, "name": "general"}}}} + + adapter.web_client.users_info = users_info + adapter.web_client.conversations_info = conv_info + + event = {{ + "type": "message", + "user": "U123", + "channel": "C123", + "text": "Hello World", + "ts": "123.45", + "client_msg_id": "mid-1", + }} + + abm = await adapter.convert_message(event) + assert abm.message_str == "Hello World" + if case == "convert_dm": + assert abm.type == MessageType.FRIEND_MESSAGE + assert abm.session_id == "U123" + else: + assert abm.type == MessageType.GROUP_MESSAGE + assert abm.session_id == "C123" + assert abm.group_id == "C123" + return + + if case in {{"handle_ignore_bot", "handle_ignore_changed"}}: + adapter = SlackAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + called = {{"ok": False}} + + async def _handle_msg(abm): + called["ok"] = True + + adapter.handle_msg = _handle_msg + + event = {{ + "type": "message", + "user": "U1", + "channel": "C1", + "text": "x", + "ts": "1", + }} + if case == "handle_ignore_bot": + event["bot_id"] = "B1" + else: + event["subtype"] = "message_changed" + + req = types.SimpleNamespace(type="events_api", payload={{"event": event}}) + await adapter._handle_socket_event(req) + assert called["ok"] is False + return + + if case == "get_bot_user_id": + adapter = SlackAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + + async def auth_test(): + return {{"user_id": "UBOT-XYZ"}} + + adapter.web_client.auth_test = auth_test + result = await adapter.get_bot_user_id() + assert result == "UBOT-XYZ" + return + + raise AssertionError(f"Unknown async case: {{case}}") + + if case == "init_socket_basic": + adapter = SlackAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + assert adapter.connection_mode == "socket" + assert adapter.meta().name == "slack" + + elif case == "init_webhook_basic": + adapter = SlackAdapter(_cfg("webhook"), {{}}, asyncio.Queue()) + assert adapter.connection_mode == "webhook" + assert adapter.meta().id == "slack_test" + + elif case == "init_missing_bot_token": + try: + SlackAdapter({{"id": "x", "slack_connection_mode": "socket", "app_token": "a"}}, {{}}, asyncio.Queue()) + raise AssertionError("Expected ValueError") + except ValueError: + pass + + elif case == "init_socket_missing_app_token": + try: + SlackAdapter({{"id": "x", "bot_token": "b", "slack_connection_mode": "socket"}}, {{}}, asyncio.Queue()) + raise AssertionError("Expected ValueError") + except ValueError: + pass + + elif case == "init_webhook_missing_signing_secret": + try: + SlackAdapter({{"id": "x", "bot_token": "b", "slack_connection_mode": "webhook"}}, {{}}, asyncio.Queue()) + raise AssertionError("Expected ValueError") + except ValueError: + pass + + elif case == "meta": + adapter = SlackAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + meta = adapter.meta() + assert meta.name == "slack" + assert meta.id == "slack_test" + + elif case == "parse_rich_text_block": + adapter = SlackAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + blocks = [ + {{ + "type": "rich_text", + "elements": [ + {{ + "type": "rich_text_section", + "elements": [ + {{"type": "text", "text": "hello "}}, + {{"type": "user", "user_id": "U1"}}, + {{"type": "text", "text": " world"}}, + ], + }} + ], + }} + ] + comps = adapter._parse_blocks(blocks) + assert len(comps) >= 2 + + elif case == "parse_section_block": + adapter = SlackAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + blocks = [{{"type": "section", "text": {{"type": "mrkdwn", "text": "*hello*"}}}}] + comps = adapter._parse_blocks(blocks) + assert len(comps) == 1 + + elif case == "unified_webhook_false": + adapter = SlackAdapter(_cfg("socket"), {{}}, asyncio.Queue()) + assert adapter.unified_webhook() is False + + elif case in {{ + "convert_text", + "convert_dm", + "convert_group", + "handle_ignore_bot", + "handle_ignore_changed", + "get_bot_user_id", + }}: + asyncio.run(_run_async_case()) + + else: + raise AssertionError(f"Unknown case: {{case}}") + """ + proc = _run_python(code) + assert proc.returncode == 0, ( + "Slack subprocess test failed.\n" + f"case={case}\n" + f"stdout:\n{proc.stdout}\n" + f"stderr:\n{proc.stderr}\n" + ) + + +class TestSlackAdapterInit: + def test_init_socket_mode_basic(self): + _assert_slack_case("init_socket_basic") + + def test_init_webhook_mode_basic(self): + _assert_slack_case("init_webhook_basic") + + def test_init_missing_bot_token_raises_error(self): + _assert_slack_case("init_missing_bot_token") + + def test_init_socket_mode_missing_app_token_raises_error(self): + _assert_slack_case("init_socket_missing_app_token") + + def test_init_webhook_mode_missing_signing_secret_raises_error(self): + _assert_slack_case("init_webhook_missing_signing_secret") + + +class TestSlackAdapterMetadata: + def test_meta_returns_correct_metadata(self): + _assert_slack_case("meta") + + +class TestSlackAdapterConvertMessage: + def test_convert_text_message(self): + _assert_slack_case("convert_text") + + def test_convert_dm_message(self): + _assert_slack_case("convert_dm") + + def test_convert_group_message(self): + _assert_slack_case("convert_group") + + +class TestSlackAdapterBlockParsing: + def test_parse_rich_text_block(self): + _assert_slack_case("parse_rich_text_block") + + def test_parse_section_block(self): + _assert_slack_case("parse_section_block") + + +class TestSlackAdapterEventHandling: + def test_handle_socket_event_ignores_bot_message(self): + _assert_slack_case("handle_ignore_bot") + + def test_handle_socket_event_ignores_message_changed(self): + _assert_slack_case("handle_ignore_changed") + + +class TestSlackAdapterUtilityMethods: + def test_get_bot_user_id(self): + _assert_slack_case("get_bot_user_id") + + def test_unified_webhook_returns_false_by_default(self): + _assert_slack_case("unified_webhook_false") diff --git a/tests/unit/test_webchat_adapter.py b/tests/unit/test_webchat_adapter.py new file mode 100644 index 0000000000..b72d83c96f --- /dev/null +++ b/tests/unit/test_webchat_adapter.py @@ -0,0 +1,115 @@ +"""Unit tests for WebChat platform adapter. + +Tests cover: +- WebChatAdapter class initialization and methods +- Queue-based message handling +- Message transmission +- Session management + +Note: Uses unittest.mock to simulate dependencies. +""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +# ============================================================================ +# Fixtures +# ============================================================================ + + +@pytest.fixture +def event_queue(): + """Create an event queue for testing.""" + return asyncio.Queue() + + +@pytest.fixture +def platform_config(): + """Create a platform configuration for testing.""" + return { + "id": "test_webchat", + } + + +@pytest.fixture +def platform_settings(): + """Create platform settings for testing.""" + return {} + + +# ============================================================================ +# WebChatAdapter Initialization Tests +# ============================================================================ + + +class TestWebChatAdapterInit: + """Tests for WebChatAdapter initialization.""" + + def test_init_basic(self, event_queue, platform_config, platform_settings): + """Test basic adapter initialization.""" + with patch( + "astrbot.core.platform.sources.webchat.webchat_adapter.webchat_queue_mgr" + ): + from astrbot.core.platform.sources.webchat.webchat_adapter import ( + WebChatAdapter, + ) + + adapter = WebChatAdapter(platform_config, platform_settings, event_queue) + + assert adapter.config == platform_config + + +# ============================================================================ +# WebChatAdapter Metadata Tests +# ============================================================================ + + +class TestWebChatAdapterMetadata: + """Tests for WebChatAdapter metadata.""" + + def test_meta_returns_correct_metadata( + self, event_queue, platform_config, platform_settings + ): + """Test meta() returns correct PlatformMetadata.""" + with patch( + "astrbot.core.platform.sources.webchat.webchat_adapter.webchat_queue_mgr" + ): + from astrbot.core.platform.sources.webchat.webchat_adapter import ( + WebChatAdapter, + ) + + adapter = WebChatAdapter(platform_config, platform_settings, event_queue) + meta = adapter.meta() + + assert meta.name == "webchat" + # Note: meta.id returns "webchat" by default, not config["id"] + + +# ============================================================================ +# WebChatAdapter Terminate Tests +# ============================================================================ + + +class TestWebChatAdapterTerminate: + """Tests for adapter termination.""" + + @pytest.mark.asyncio + async def test_terminate(self, event_queue, platform_config, platform_settings): + """Test adapter termination.""" + with patch( + "astrbot.core.platform.sources.webchat.webchat_adapter.webchat_queue_mgr" + ): + from astrbot.core.platform.sources.webchat.webchat_adapter import ( + WebChatAdapter, + ) + + adapter = WebChatAdapter(platform_config, platform_settings, event_queue) + + # terminate() should set the stop_event + await adapter.terminate() + + # Verify stop_event is set after terminate + assert adapter.stop_event.is_set() diff --git a/tests/unit/test_wecom_adapter.py b/tests/unit/test_wecom_adapter.py new file mode 100644 index 0000000000..d49b2e4cf6 --- /dev/null +++ b/tests/unit/test_wecom_adapter.py @@ -0,0 +1,279 @@ +"""Runtime smoke tests for Wecom adapter without external SDK dependency.""" + +from __future__ import annotations + +import subprocess +import sys +import textwrap +from pathlib import Path + + +def _run_python(code: str) -> subprocess.CompletedProcess[str]: + repo_root = Path(__file__).resolve().parents[2] + return subprocess.run( + [sys.executable, "-c", textwrap.dedent(code)], + cwd=repo_root, + capture_output=True, + text=True, + check=False, + ) + + +def _assert_ok(code: str) -> None: + proc = _run_python(code) + assert proc.returncode == 0, ( + "Subprocess test failed.\n" + f"Code:\n{code}\n" + f"stdout:\n{proc.stdout}\n" + f"stderr:\n{proc.stderr}\n" + ) + + +def test_wecom_adapter_init_and_convert_text_smoke() -> None: + _assert_ok( + """ + import asyncio + import types + import sys + + optionaldict_mod = types.ModuleType("optionaldict") + class optionaldict(dict): + pass + optionaldict_mod.optionaldict = optionaldict + sys.modules["optionaldict"] = optionaldict_mod + + quart = types.ModuleType("quart") + class Quart: + def __init__(self, *args, **kwargs): + pass + def add_url_rule(self, *args, **kwargs): + return None + async def run_task(self, *args, **kwargs): + return None + async def shutdown(self): + return None + quart.Quart = Quart + quart.request = types.SimpleNamespace() + sys.modules["quart"] = quart + + wechatpy = types.ModuleType("wechatpy") + enterprise = types.ModuleType("wechatpy.enterprise") + crypto_mod = types.ModuleType("wechatpy.enterprise.crypto") + enterprise_messages = types.ModuleType("wechatpy.enterprise.messages") + exceptions_mod = types.ModuleType("wechatpy.exceptions") + messages_mod = types.ModuleType("wechatpy.messages") + client_mod = types.ModuleType("wechatpy.client") + client_api_mod = types.ModuleType("wechatpy.client.api") + client_base_mod = types.ModuleType("wechatpy.client.api.base") + + class BaseWeChatAPI: + def _post(self, *args, **kwargs): + return {} + def _get(self, *args, **kwargs): + return {} + client_base_mod.BaseWeChatAPI = BaseWeChatAPI + + class InvalidSignatureException(Exception): + pass + exceptions_mod.InvalidSignatureException = InvalidSignatureException + + class BaseMessage: + type = "text" + messages_mod.BaseMessage = BaseMessage + + class TextMessage(BaseMessage): + def __init__(self, content="hello"): + self.type = "text" + self.content = content + self.agent = "agent_1" + self.source = "user_1" + self.id = "msg_1" + self.time = 1700000000 + + class ImageMessage(BaseMessage): + pass + + class VoiceMessage(BaseMessage): + pass + + enterprise_messages.TextMessage = TextMessage + enterprise_messages.ImageMessage = ImageMessage + enterprise_messages.VoiceMessage = VoiceMessage + + class WeChatCrypto: + def __init__(self, *args, **kwargs): + pass + def check_signature(self, *args, **kwargs): + return "ok" + crypto_mod.WeChatCrypto = WeChatCrypto + + class WeChatClient: + def __init__(self, *args, **kwargs): + self.message = types.SimpleNamespace( + send_text=lambda *a, **k: {"errcode": 0}, + send_image=lambda *a, **k: {"errcode": 0}, + send_voice=lambda *a, **k: {"errcode": 0}, + send_file=lambda *a, **k: {"errcode": 0}, + send_video=lambda *a, **k: {"errcode": 0}, + ) + self.media = types.SimpleNamespace( + upload=lambda *a, **k: {"media_id": "m"}, + download=lambda *a, **k: types.SimpleNamespace(content=b""), + ) + enterprise.WeChatClient = WeChatClient + enterprise.parse_message = lambda xml: TextMessage("parsed") + + sys.modules["wechatpy"] = wechatpy + sys.modules["wechatpy.enterprise"] = enterprise + sys.modules["wechatpy.enterprise.crypto"] = crypto_mod + sys.modules["wechatpy.enterprise.messages"] = enterprise_messages + sys.modules["wechatpy.exceptions"] = exceptions_mod + sys.modules["wechatpy.messages"] = messages_mod + sys.modules["wechatpy.client"] = client_mod + sys.modules["wechatpy.client.api"] = client_api_mod + sys.modules["wechatpy.client.api.base"] = client_base_mod + + from astrbot.core.platform.sources.wecom.wecom_adapter import WecomPlatformAdapter + + async def main(): + adapter = WecomPlatformAdapter( + { + "id": "wecom_test", + "corpid": "corp", + "secret": "secret", + "token": "token", + "encoding_aes_key": "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG", + "port": "8080", + "callback_server_host": "127.0.0.1", + }, + {}, + asyncio.Queue(), + ) + assert adapter.meta().name == "wecom" + assert adapter.meta().id == "wecom_test" + + called = {"ok": False} + async def _fake_handle_msg(message): + called["ok"] = True + assert message.message_str == "hello" + assert message.session_id == "user_1" + adapter.handle_msg = _fake_handle_msg + + await adapter.convert_message(TextMessage("hello")) + assert called["ok"] is True + assert adapter.agent_id == "agent_1" + + asyncio.run(main()) + """ + ) + + +def test_wecom_server_verify_smoke() -> None: + _assert_ok( + """ + import asyncio + import types + import sys + + optionaldict_mod = types.ModuleType("optionaldict") + class optionaldict(dict): + pass + optionaldict_mod.optionaldict = optionaldict + sys.modules["optionaldict"] = optionaldict_mod + + quart = types.ModuleType("quart") + class Quart: + def __init__(self, *args, **kwargs): + pass + def add_url_rule(self, *args, **kwargs): + return None + async def run_task(self, *args, **kwargs): + return None + async def shutdown(self): + return None + quart.Quart = Quart + quart.request = types.SimpleNamespace() + sys.modules["quart"] = quart + + wechatpy = types.ModuleType("wechatpy") + enterprise = types.ModuleType("wechatpy.enterprise") + crypto_mod = types.ModuleType("wechatpy.enterprise.crypto") + enterprise_messages = types.ModuleType("wechatpy.enterprise.messages") + exceptions_mod = types.ModuleType("wechatpy.exceptions") + messages_mod = types.ModuleType("wechatpy.messages") + client_mod = types.ModuleType("wechatpy.client") + client_api_mod = types.ModuleType("wechatpy.client.api") + client_base_mod = types.ModuleType("wechatpy.client.api.base") + + class BaseWeChatAPI: + pass + client_base_mod.BaseWeChatAPI = BaseWeChatAPI + + class InvalidSignatureException(Exception): + pass + exceptions_mod.InvalidSignatureException = InvalidSignatureException + + class BaseMessage: + type = "text" + messages_mod.BaseMessage = BaseMessage + + class TextMessage(BaseMessage): + pass + class ImageMessage(BaseMessage): + pass + class VoiceMessage(BaseMessage): + pass + enterprise_messages.TextMessage = TextMessage + enterprise_messages.ImageMessage = ImageMessage + enterprise_messages.VoiceMessage = VoiceMessage + + class WeChatCrypto: + def __init__(self, *args, **kwargs): + pass + def check_signature(self, msg_signature, timestamp, nonce, echostr): + return echostr + crypto_mod.WeChatCrypto = WeChatCrypto + + class WeChatClient: + def __init__(self, *args, **kwargs): + pass + enterprise.WeChatClient = WeChatClient + enterprise.parse_message = lambda xml: TextMessage() + + sys.modules["wechatpy"] = wechatpy + sys.modules["wechatpy.enterprise"] = enterprise + sys.modules["wechatpy.enterprise.crypto"] = crypto_mod + sys.modules["wechatpy.enterprise.messages"] = enterprise_messages + sys.modules["wechatpy.exceptions"] = exceptions_mod + sys.modules["wechatpy.messages"] = messages_mod + sys.modules["wechatpy.client"] = client_mod + sys.modules["wechatpy.client.api"] = client_api_mod + sys.modules["wechatpy.client.api.base"] = client_base_mod + + from astrbot.core.platform.sources.wecom.wecom_adapter import WecomServer + + async def main(): + server = WecomServer( + asyncio.Queue(), + { + "corpid": "corp", + "token": "token", + "encoding_aes_key": "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG", + "port": "8080", + "callback_server_host": "127.0.0.1", + }, + ) + req = types.SimpleNamespace( + args={ + "msg_signature": "sig", + "timestamp": "1", + "nonce": "2", + "echostr": "echo", + } + ) + resp = await server.handle_verify(req) + assert resp == "echo" + + asyncio.run(main()) + """ + ) From 664760e44e5caf294de5c0df17f19985431943c1 Mon Sep 17 00:00:00 2001 From: whatevertogo Date: Thu, 26 Feb 2026 12:17:00 +0800 Subject: [PATCH 2/2] fix: correct stop_event reference to _shutdown_event in WebChatAdapter test: add event queue and platform settings fixtures for testing test: fix typo in TestDingtalkAdapterTypoCompatibility test case --- .../sources/webchat/webchat_adapter.py | 1 - tests/unit/test_dingtalk_adapter.py | 2 +- tests/unit/test_other_adapters.py | 18 ++++++++++++++++++ tests/unit/test_webchat_adapter.py | 9 ++++----- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/astrbot/core/platform/sources/webchat/webchat_adapter.py b/astrbot/core/platform/sources/webchat/webchat_adapter.py index bdbd8ac884..047417aaaa 100644 --- a/astrbot/core/platform/sources/webchat/webchat_adapter.py +++ b/astrbot/core/platform/sources/webchat/webchat_adapter.py @@ -66,7 +66,6 @@ def __init__( support_proactive_message=False, ) self._shutdown_event = asyncio.Event() - self.stop_event = self._shutdown_event self._webchat_queue_mgr = webchat_queue_mgr async def send_by_session( diff --git a/tests/unit/test_dingtalk_adapter.py b/tests/unit/test_dingtalk_adapter.py index 4bb33bf52d..4237015ed9 100644 --- a/tests/unit/test_dingtalk_adapter.py +++ b/tests/unit/test_dingtalk_adapter.py @@ -278,7 +278,7 @@ def test_send_by_session_private_message(self): class TestDingtalkAdapterTypoCompatibility: - def test_send_with_sesisp_typo(self): + def test_send_with_sesison_typo(self): _assert_dingtalk_case("send_with_sesison_typo") diff --git a/tests/unit/test_other_adapters.py b/tests/unit/test_other_adapters.py index ac72abfafd..dff7b593ae 100644 --- a/tests/unit/test_other_adapters.py +++ b/tests/unit/test_other_adapters.py @@ -11,8 +11,26 @@ Note: Uses unittest.mock to simulate external dependencies. """ +import asyncio + import pytest +# ============================================================================ +# Shared Fixtures +# ============================================================================ + + +@pytest.fixture +def event_queue(): + """Create an event queue for testing.""" + return asyncio.Queue() + + +@pytest.fixture +def platform_settings(): + """Create platform settings for testing.""" + return {} + # ============================================================================ # QQ Official Adapter Tests # ============================================================================ diff --git a/tests/unit/test_webchat_adapter.py b/tests/unit/test_webchat_adapter.py index b72d83c96f..bbba27b073 100644 --- a/tests/unit/test_webchat_adapter.py +++ b/tests/unit/test_webchat_adapter.py @@ -10,11 +10,10 @@ """ import asyncio -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import patch import pytest - # ============================================================================ # Fixtures # ============================================================================ @@ -108,8 +107,8 @@ async def test_terminate(self, event_queue, platform_config, platform_settings): adapter = WebChatAdapter(platform_config, platform_settings, event_queue) - # terminate() should set the stop_event + # terminate() should set the _shutdown_event await adapter.terminate() - # Verify stop_event is set after terminate - assert adapter.stop_event.is_set() + # Verify _shutdown_event is set after terminate + assert adapter._shutdown_event.is_set()