Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ CAPTCHA_ENABLED=false
# Default: 120 seconds (2 minutes)
CAPTCHA_TIMEOUT_SECONDS=120

# Path to groups.json for multi-group support (optional)
# If this file exists, per-group settings are loaded from it instead of the
# GROUP_ID/WARNING_TOPIC_ID/etc. fields above. See groups.json.example.
# GROUPS_CONFIG_PATH=groups.json

# Logfire Configuration (optional - for production logging)
# Get your token from https://logfire.pydantic.dev
LOGFIRE_TOKEN=your_logfire_token_here
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.env
.env.staging
groups.json
.venv/
__pycache__/
*.pyc
Expand Down
26 changes: 26 additions & 0 deletions groups.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
"group_id": -1001234567890,
"warning_topic_id": 123,
"restrict_failed_users": false,
"warning_threshold": 3,
"warning_time_threshold_minutes": 180,
"captcha_enabled": false,
"captcha_timeout_seconds": 120,
"new_user_probation_hours": 72,
"new_user_violation_threshold": 3,
"rules_link": "https://t.me/pythonID/290029/321799"
},
{
"group_id": -1009876543210,
"warning_topic_id": 456,
"restrict_failed_users": true,
"warning_threshold": 5,
"warning_time_threshold_minutes": 60,
"captcha_enabled": true,
"captcha_timeout_seconds": 180,
"new_user_probation_hours": 168,
"new_user_violation_threshold": 2,
"rules_link": "https://t.me/mygroup/rules"
}
]
1 change: 1 addition & 0 deletions src/bot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class Settings(BaseSettings):
captcha_timeout_seconds: int = 120
new_user_probation_hours: int = 72 # 3 days default
new_user_violation_threshold: int = 3 # restrict after this many violations
groups_config_path: str = "groups.json"
logfire_token: str | None = None
logfire_service_name: str = "pythonid-bot"
logfire_environment: str = "production"
Expand Down
26 changes: 26 additions & 0 deletions src/bot/database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,32 @@ def get_warnings_past_time_threshold(
)
return [record for record in records]

def get_warnings_past_time_threshold_for_group(
self, group_id: int, threshold: timedelta
) -> list[UserWarning]:
"""
Find active warnings for a specific group that exceeded the time threshold.

Args:
group_id: Telegram group ID to filter by.
threshold: Time duration since first warning to trigger restriction.

Returns:
list[UserWarning]: Warning records that should be auto-restricted.
"""
with Session(self._engine) as session:
cutoff_time = datetime.now(UTC) - threshold
statement = select(UserWarning).where(
UserWarning.group_id == group_id,
~UserWarning.is_restricted,
UserWarning.first_warned_at <= cutoff_time,
)
records = session.exec(statement).all()
logger.info(
f"Found {len(records)} warnings past {threshold} threshold for group {group_id}"
)
return [record for record in records]

def add_pending_captcha(
self,
user_id: int,
Expand Down
246 changes: 246 additions & 0 deletions src/bot/group_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
"""
Multi-group configuration for the PythonID bot.

This module provides per-group settings via GroupConfig and a GroupRegistry
that allows a single bot instance to manage multiple Telegram groups.
Groups can be configured via a groups.json file or fall back to the
single-group .env configuration for backward compatibility.
"""

import json
import logging
from datetime import timedelta
from pathlib import Path

from pydantic import BaseModel, field_validator
from telegram import Update

logger = logging.getLogger(__name__)


class GroupConfig(BaseModel):
"""
Per-group configuration settings.

Each monitored group has its own set of feature flags and thresholds.
"""

group_id: int
warning_topic_id: int
restrict_failed_users: bool = False
warning_threshold: int = 3
warning_time_threshold_minutes: int = 180
captcha_enabled: bool = False
captcha_timeout_seconds: int = 120
new_user_probation_hours: int = 72
new_user_violation_threshold: int = 3
rules_link: str = "https://t.me/pythonID/290029/321799"

@field_validator("group_id")
@classmethod
def group_id_must_be_negative(cls, v: int) -> int:
if v >= 0:
raise ValueError("group_id must be negative (Telegram supergroup IDs are negative)")
return v

@field_validator("warning_threshold")
@classmethod
def warning_threshold_must_be_positive(cls, v: int) -> int:
if v <= 0:
raise ValueError("warning_threshold must be greater than 0")
return v

@field_validator("warning_time_threshold_minutes")
@classmethod
def warning_time_threshold_must_be_positive(cls, v: int) -> int:
if v <= 0:
raise ValueError("warning_time_threshold_minutes must be greater than 0")
return v

@field_validator("captcha_timeout_seconds")
@classmethod
def captcha_timeout_must_be_in_range(cls, v: int) -> int:
if not (10 <= v <= 600):
raise ValueError("captcha_timeout_seconds must be between 10 and 600 seconds")
return v

@field_validator("new_user_probation_hours")
@classmethod
def probation_hours_must_be_non_negative(cls, v: int) -> int:
if v < 0:
raise ValueError("new_user_probation_hours must be >= 0")
return v

@property
def probation_timedelta(self) -> timedelta:
return timedelta(hours=self.new_user_probation_hours)

@property
def warning_time_threshold_timedelta(self) -> timedelta:
return timedelta(minutes=self.warning_time_threshold_minutes)

@property
def captcha_timeout_timedelta(self) -> timedelta:
return timedelta(seconds=self.captcha_timeout_seconds)


class GroupRegistry:
"""
Registry of monitored groups.

Provides O(1) lookup by group_id and iteration over all groups.
"""

def __init__(self) -> None:
self._groups: dict[int, GroupConfig] = {}

def register(self, config: GroupConfig) -> None:
if config.group_id in self._groups:
raise ValueError(f"Duplicate group_id: {config.group_id}")
self._groups[config.group_id] = config
logger.info(f"Registered group {config.group_id} (warning_topic={config.warning_topic_id})")

def get(self, group_id: int) -> GroupConfig | None:
return self._groups.get(group_id)

def all_groups(self) -> list[GroupConfig]:
return list(self._groups.values())

def is_monitored(self, group_id: int) -> bool:
return group_id in self._groups


def load_groups_from_json(path: str) -> list[GroupConfig]:
"""
Parse a groups.json file into a list of GroupConfig objects.

Args:
path: Path to the JSON file.

Returns:
List of GroupConfig instances.

Raises:
FileNotFoundError: If the file doesn't exist.
json.JSONDecodeError: If the file is not valid JSON.
ValueError: If the JSON structure is invalid.
"""
with open(path) as f:
data = json.load(f)

if not isinstance(data, list):
raise ValueError("groups.json must contain a JSON array of group objects")

if not data:
raise ValueError("groups.json must contain at least one group")

configs = [GroupConfig(**item) for item in data]

# Check for duplicate group_ids
seen_ids: set[int] = set()
for config in configs:
if config.group_id in seen_ids:
raise ValueError(f"Duplicate group_id in groups.json: {config.group_id}")
seen_ids.add(config.group_id)

return configs


def build_group_registry(settings: object) -> GroupRegistry:
"""
Build a GroupRegistry from settings.

If groups.json exists at the configured path, loads from it.
Otherwise creates a single GroupConfig from .env fields (backward compatible).

Args:
settings: Application Settings instance.

Returns:
Populated GroupRegistry.
"""
registry = GroupRegistry()
groups_path = getattr(settings, "groups_config_path", "groups.json")

if Path(groups_path).exists():
logger.info(f"Loading group configuration from {groups_path}")
configs = load_groups_from_json(groups_path)
for config in configs:
registry.register(config)
logger.info(f"Loaded {len(configs)} group(s) from {groups_path}")
else:
logger.info("No groups.json found, using single-group config from .env")
config = GroupConfig(
group_id=settings.group_id,
warning_topic_id=settings.warning_topic_id,
restrict_failed_users=settings.restrict_failed_users,
warning_threshold=settings.warning_threshold,
warning_time_threshold_minutes=settings.warning_time_threshold_minutes,
captcha_enabled=settings.captcha_enabled,
captcha_timeout_seconds=settings.captcha_timeout_seconds,
new_user_probation_hours=settings.new_user_probation_hours,
new_user_violation_threshold=settings.new_user_violation_threshold,
rules_link=settings.rules_link,
)
registry.register(config)

return registry


def get_group_config_for_update(update: Update) -> GroupConfig | None:
"""
Get the GroupConfig for the group in the given Update.

Returns None if the update's chat is not a monitored group.

Args:
update: Telegram Update object.

Returns:
GroupConfig if the chat is monitored, None otherwise.
"""
if not update.effective_chat:
return None
return get_group_registry().get(update.effective_chat.id)


# Module-level singleton
_registry: GroupRegistry | None = None


def init_group_registry(settings: object) -> GroupRegistry:
"""
Initialize the global group registry singleton.

Must be called once at application startup.

Args:
settings: Application Settings instance.

Returns:
Initialized GroupRegistry.
"""
global _registry
_registry = build_group_registry(settings)
return _registry


def get_group_registry() -> GroupRegistry:
"""
Get the global group registry singleton.

Returns:
GroupRegistry instance.

Raises:
RuntimeError: If init_group_registry() hasn't been called.
"""
if _registry is None:
raise RuntimeError("Group registry not initialized. Call init_group_registry() first.")
return _registry


def reset_group_registry() -> None:
"""Reset the group registry singleton (for testing)."""
global _registry
_registry = None
Loading
Loading