diff --git a/app/jobs/channels/fetch_posts_job.rb b/app/jobs/channels/fetch_posts_job.rb index 711396a..22f32ef 100644 --- a/app/jobs/channels/fetch_posts_job.rb +++ b/app/jobs/channels/fetch_posts_job.rb @@ -5,41 +5,93 @@ class Channels::FetchPostsJob < ApplicationJob retry_on StandardError, wait: 1.minute, attempts: 3 def perform(channel_id) - with_error_context(channel_id: channel_id, action: 'fetch_posts') do - channel = Channel.find(channel_id) - - Rails.logger.info "Fetching posts for channel #{channel.username}" - - # Получаем Telegram ID канала - chat_id = channel.telegram_id.to_s - - # Инициализируем клиент - bot = Telegram.bots[:default] - fetcher = TelegramClient::ChannelFetcher.new(bot) - - # Проверяем доступность канала - unless fetcher.channel_available?(channel.username) - Rails.logger.warn "Channel #{channel.username} is not available, deactivating" - channel.deactivate! - return - end - - # Получаем последние посты - posts_data = fetcher.get_channel_posts(channel.username, limit: 20) - - Rails.logger.info "Fetched #{posts_data.count} posts from channel #{channel.username}" - - # Обновляем время последнего поста если есть посты - if posts_data.any? - channel.update_last_post! + start_time = Time.current + channel = nil + posts_count = 0 + new_posts_count = 0 + errors = [] + + begin + with_error_context(channel_id: channel_id, action: 'fetch_posts') do + channel = Channel.find(channel_id) + + Rails.logger.info "Fetching posts for channel #{channel.username}" + + # Получаем Telegram ID канала + chat_id = channel.telegram_id.to_s + + # Инициализируем клиент + bot = Telegram.bots[:default] + fetcher = TelegramClient::ChannelFetcher.new(bot) + + # Проверяем доступность канала + unless fetcher.channel_available?(channel.username) + Rails.logger.warn "Channel #{channel.username} is not available, deactivating" + channel.deactivate! + + log_fetch_results( + channel: channel, + job_id: job_id, + execution_time_ms: ((Time.current - start_time) * 1000).to_i, + posts_count: 0, + new_posts_count: 0, + status: 'warning', + message: "Channel #{channel.username} is not available and was deactivated", + errors: ['Channel not available'] + ) + return + end + + # Получаем последние посты + posts_data = fetcher.get_channel_posts(channel.username, limit: 20) + posts_count = posts_data.count + + Rails.logger.info "Fetched #{posts_data.count} posts from channel #{channel.username}" + + # Обновляем время последнего поста если есть посты + if posts_data.any? + channel.update_last_post! + end + + # Обрабатываем каждый пост + posts_data.each do |post_data| + begin + process_single_post(channel, post_data) + new_posts_count += 1 + rescue StandardError => e + errors << "Failed to process post #{post_data[:telegram_message_id]}: #{e.message}" + Rails.logger.error "Error processing post #{post_data[:telegram_message_id]}: #{e.message}" + end + end + + Rails.logger.info "Completed fetching posts for channel #{channel.username}" + + # Логируем успешное завершение + log_fetch_results( + channel: channel, + job_id: job_id, + execution_time_ms: ((Time.current - start_time) * 1000).to_i, + posts_count: posts_count, + new_posts_count: new_posts_count, + status: errors.empty? ? 'success' : 'warning', + message: "Processed #{posts_count} posts, #{new_posts_count} new posts from #{channel.username}", + errors: errors + ) end - - # Обрабатываем каждый пост - posts_data.each do |post_data| - process_single_post(channel, post_data) - end - - Rails.logger.info "Completed fetching posts for channel #{channel.username}" + rescue StandardError => e + # Логируем ошибку выполнения всего job + log_fetch_results( + channel: channel, + job_id: job_id, + execution_time_ms: ((Time.current - start_time) * 1000).to_i, + posts_count: posts_count, + new_posts_count: new_posts_count, + status: 'error', + message: "Failed to fetch posts from channel #{channel&.username || channel_id}: #{e.message}", + errors: [e.message] + ) + + raise e end end @@ -72,4 +124,25 @@ def process_single_post(channel, post_data) severity: :warn, reraise: false) end + + def log_fetch_results(channel:, job_id:, execution_time_ms:, posts_count:, new_posts_count:, status:, message:, errors: []) + data = { + posts_processed: posts_count, + new_posts: new_posts_count, + errors: errors + } + + ChannelUpdateLog.create!( + source: 'FetchPostsJob', + message: message, + status: status, + channel: channel, + job_id: job_id, + execution_time_ms: execution_time_ms, + data: data + ) + rescue StandardError => e + # Не даем ошибкам логирования прервать основную операцию + Rails.logger.error "Failed to log fetch results: #{e.message}" + end end diff --git a/app/jobs/channels/monitor_job.rb b/app/jobs/channels/monitor_job.rb index df83274..fcae7be 100644 --- a/app/jobs/channels/monitor_job.rb +++ b/app/jobs/channels/monitor_job.rb @@ -7,28 +7,88 @@ class Channels::MonitorJob < ApplicationJob RUN_INTERVAL = 5.minutes def perform(*args) - Rails.logger.info 'Starting channel monitoring' + start_time = Time.current + processed_channels = 0 + skipped_channels = 0 + skip_reasons = {} - # Получаем все активные каналы, за которыми есть подписки - channels = Channel.joins(:subscriptions) - .where(subscriptions: { active: true }) - .where(active: true) - .needs_monitoring + begin + Rails.logger.info 'Starting channel monitoring' - Rails.logger.info "Found #{channels.count} channels to monitor" + # Получаем все активные каналы, за которыми есть подписки + channels = Channel.joins(:subscriptions) + .where(subscriptions: { active: true }) + .where(active: true) + .needs_monitoring - channels.each do |channel| - with_error_context(channel_id: channel.id, channel_username: channel.username) do - # Запускаем задачу для получения постов из канала - Channels::FetchPostsJob.perform_later(channel.id) + Rails.logger.info "Found #{channels.count} channels to monitor" - # Обновляем время последней проверки - channel.mark_as_monitored! + channels.each do |channel| + with_error_context(channel_id: channel.id, channel_username: channel.username) do + # Запускаем задачу для получения постов из канала + Channels::FetchPostsJob.perform_later(channel.id) + processed_channels += 1 - Rails.logger.debug "Scheduled fetch job for channel #{channel.username}" + # Обновляем время последней проверки + channel.mark_as_monitored! + + Rails.logger.debug "Scheduled fetch job for channel #{channel.username}" + end end + + Rails.logger.info 'Channel monitoring completed' + + # Логируем успешное завершение мониторинга + log_monitor_results( + job_id: job_id, + execution_time_ms: ((Time.current - start_time) * 1000).to_i, + total_channels: channels.count, + processed_channels: processed_channels, + skipped_channels: skipped_channels, + skip_reasons: skip_reasons, + status: 'success', + message: "Monitoring completed: #{processed_channels} channels processed, #{skipped_channels} channels skipped" + ) + + rescue StandardError => e + # Логируем ошибку выполнения мониторинга + log_monitor_results( + job_id: job_id, + execution_time_ms: ((Time.current - start_time) * 1000).to_i, + total_channels: channels&.count || 0, + processed_channels: processed_channels, + skipped_channels: skipped_channels, + skip_reasons: skip_reasons, + status: 'error', + message: "Monitoring failed: #{e.message}", + errors: [e.message] + ) + + raise e end + end + + private + + def log_monitor_results(job_id:, execution_time_ms:, total_channels:, processed_channels:, skipped_channels:, skip_reasons:, status:, message:, errors: []) + data = { + total_channels: total_channels, + processed_channels: processed_channels, + skipped_channels: skipped_channels, + skip_reasons: skip_reasons, + errors: errors || [] + } - Rails.logger.info 'Channel monitoring completed' + ChannelUpdateLog.create!( + source: 'MonitorJob', + message: message, + status: status, + job_id: job_id, + execution_time_ms: execution_time_ms, + data: data + ) + rescue StandardError => e + # Не даем ошибкам логирования прервать основную операцию + Rails.logger.error "Failed to log monitor results: #{e.message}" end end diff --git a/app/models/channel_update_log.rb b/app/models/channel_update_log.rb new file mode 100644 index 0000000..cb2f96a --- /dev/null +++ b/app/models/channel_update_log.rb @@ -0,0 +1,15 @@ +class ChannelUpdateLog < ApplicationRecord + belongs_to :channel, optional: true + + STATUS_TYPES = %w[info success warning error].freeze + + validates :status, inclusion: { in: STATUS_TYPES } + validates :source, presence: true + validates :message, presence: true + validates :status, presence: true + + scope :by_source, ->(source) { where(source: source) } + scope :by_status, ->(status) { where(status: status) } + scope :recent, ->(hours = 24) { where('created_at > ?', hours.hours.ago) } + scope :for_channel, ->(channel) { where(channel: channel) } +end \ No newline at end of file diff --git a/db/migrate/20251011144317_create_channel_update_logs.rb b/db/migrate/20251011144317_create_channel_update_logs.rb new file mode 100644 index 0000000..deefda7 --- /dev/null +++ b/db/migrate/20251011144317_create_channel_update_logs.rb @@ -0,0 +1,18 @@ +class CreateChannelUpdateLogs < ActiveRecord::Migration[8.0] + def change + create_table :channel_update_logs do |t| + t.string :source, null: false, index: true + t.text :message, null: false + t.jsonb :data, default: {}, null: false + t.string :status, null: false, index: true + t.references :channel, null: true, foreign_key: true, index: true + t.string :job_id, index: true + t.integer :execution_time_ms + + t.timestamps + end + + add_index :channel_update_logs, [:source, :created_at] + add_index :channel_update_logs, [:channel_id, :created_at] + end +end diff --git a/db/schema.rb b/db/schema.rb index 0ef3620..354ac1a 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[8.0].define(version: 2025_10_04_000000) do +ActiveRecord::Schema[8.0].define(version: 2025_10_11_144317) do # These are extensions that must be enabled in order to support this database enable_extension "pg_catalog.plpgsql" @@ -42,6 +42,24 @@ t.index ["blob_id", "variation_digest"], name: "index_active_storage_variant_records_uniqueness", unique: true end + create_table "channel_update_logs", force: :cascade do |t| + t.string "source", null: false + t.text "message", null: false + t.jsonb "data", default: {}, null: false + t.string "status", null: false + t.bigint "channel_id" + t.string "job_id" + t.integer "execution_time_ms" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["channel_id", "created_at"], name: "index_channel_update_logs_on_channel_id_and_created_at" + t.index ["channel_id"], name: "index_channel_update_logs_on_channel_id" + t.index ["job_id"], name: "index_channel_update_logs_on_job_id" + t.index ["source", "created_at"], name: "index_channel_update_logs_on_source_and_created_at" + t.index ["source"], name: "index_channel_update_logs_on_source" + t.index ["status"], name: "index_channel_update_logs_on_status" + end + create_table "channels", force: :cascade do |t| t.bigint "telegram_id", null: false t.string "username", null: false @@ -54,8 +72,10 @@ t.datetime "monitored_at" t.datetime "created_at", null: false t.datetime "updated_at", null: false + t.datetime "last_successful_update_at", precision: nil t.index ["active"], name: "index_channels_on_active" t.index ["last_post_at"], name: "index_channels_on_last_post_at" + t.index ["last_successful_update_at"], name: "index_channels_on_last_successful_update_at" t.index ["monitored_at"], name: "index_channels_on_monitored_at" t.index ["telegram_id"], name: "index_channels_on_telegram_id", unique: true t.index ["username"], name: "index_channels_on_username", unique: true @@ -185,7 +205,6 @@ create_table "subscriptions", force: :cascade do |t| t.bigint "telegram_user_id", null: false t.bigint "channel_id", null: false - t.integer "priority", default: 5, null: false t.boolean "active", default: true, null: false t.datetime "last_digest_sent_at" t.jsonb "settings", default: {} @@ -194,7 +213,6 @@ t.index ["active"], name: "index_subscriptions_on_active" t.index ["channel_id"], name: "index_subscriptions_on_channel_id" t.index ["last_digest_sent_at"], name: "index_subscriptions_on_last_digest_sent_at" - t.index ["priority"], name: "index_subscriptions_on_priority" t.index ["telegram_user_id", "channel_id"], name: "index_subscriptions_on_telegram_user_id_and_channel_id", unique: true t.index ["telegram_user_id"], name: "index_subscriptions_on_telegram_user_id" end @@ -287,6 +305,7 @@ add_foreign_key "active_storage_attachments", "active_storage_blobs", column: "blob_id" add_foreign_key "active_storage_variant_records", "active_storage_blobs", column: "blob_id" + add_foreign_key "channel_update_logs", "channels" add_foreign_key "chats", "models" add_foreign_key "chats", "telegram_users" add_foreign_key "feedbacks", "posts" diff --git a/docs/Implementation/Spec_2080_Channel_Updates_Logging_Implementation.md b/docs/Implementation/Spec_2080_Channel_Updates_Logging_Implementation.md new file mode 100644 index 0000000..3939571 --- /dev/null +++ b/docs/Implementation/Spec_2080_Channel_Updates_Logging_Implementation.md @@ -0,0 +1,122 @@ +# План имплементации: Журналирование обновлений каналов + +## Этап 1: Подготовка и создание модели + +### 1.1. Создание модели ChannelUpdateLog +```bash +./bin/rails g model ChannelUpdateLog source:string message:text data:jsonb status:string channel:references job_id:string execution_time_ms:integer +``` + +### 1.2. Настройка миграции +- Обновить миграцию: добавить индексы +- Установить default значения +- Обновить json поле на jsonb + +### 1.3. Настройка модели +- Добавить enum для статусов +- Добавить валидации +- Добавить scopes +- Настроить ассоциации + +### 1.4. Создание фикстуры для тестов +- Создать базовую фикстуру для ChannelUpdateLog + +## Этап 2: Тестирование модели + +### 2.1. Создание простого теста +- Тест на загрузку модели из фикстуры +- Проверка базовой функциональности + +## Этап 3: Интеграция с существующими Job + +### 3.1. Анализ существующих Job +- Изучить структуру FetchPostsJob +- Изучить структуру MonitorJob +- Определить места для добавления логирования + +### 3.2. Интеграция с FetchPostsJob +- Добавить логирование начала выполнения +- Добавить логирование результатов (успешное завершение) +- Добавить логирование ошибок +- Сохранять релевантные метрики + +### 3.3. Интеграция с MonitorJob +- Добавить логирование результатов мониторинга +- Сохранять статистику по каналам +- Логировать пропущенные каналы с причинами + +## Этап 4: Проверка и оптимизация + +### 4.1. Тестирование интеграции +- Запустить Job с тестовыми данными +- Проверить создание записей в логах +- Валидация сохраняемых данных + +### 4.2. Проверка производительности +- Оценить влияние на время выполнения Job +- Оптимизировать если необходимо + +### 4.3. Финальная проверка +- Запустить полный цикл работы +- Проверить корректность всех записей +- Валидация данных в JSON поле + +## Детали имплементации + +### Структура данных для FetchPostsJob +```ruby +data = { + channel_id: channel.id, + posts_processed: posts_count, + new_posts: new_posts_count, + errors: error_messages, + last_post_id: last_post&.id +} +``` + +### Структура данных для MonitorJob +```ruby +data = { + total_channels: Channel.count, + processed_channels: processed_count, + skipped_channels: skipped_count, + skip_reasons: { + rate_limit: rate_limit_count, + no_content: no_content_count, + error: error_count + } +} +``` + +### Места интеграции + +#### FetchPostsJob +- Перед началом обработки канала +- После успешной обработки +- В rescue блоке для ошибок +- Измерение времени выполнения + +#### MonitorJob +- После обхода всех каналов +- Статистика по принятым решениям + +## Потенциальные проблемы и решения + +### 1. Объем данных +- Регулярная очистка старых записей через rake task +- Лимит на размер JSON поля + +### 2. Производительность +- Минимальное влияние на основную логику +- Логирование после выполнения основной операции + +### 3. Ошибки логирования +- Rescue вокруг логирования, чтобы не прерывать основную операцию +- Fallback на простой текстовый лог при ошибках + +## Следующие шаги после имплементации + +1. Создание rake task для очистки старых логов +2. Добавление базовых запросов для анализа логов +3. Создание консольных команд для просмотра логов +4. Подготовка к будущему UI для просмотра логов \ No newline at end of file diff --git a/docs/Specs/2080_Channel_Updates_Logging_Specification.md b/docs/Specs/2080_Channel_Updates_Logging_Specification.md new file mode 100644 index 0000000..c801f42 --- /dev/null +++ b/docs/Specs/2080_Channel_Updates_Logging_Specification.md @@ -0,0 +1,157 @@ +# Спецификация: Журналирование обновлений каналов + +## Обзор + +Система журналирования для отслеживания результатов обновления постов по каналам и мониторинга. Журнал представляет собой базу данных с записями о выполненных операциях. + +## Цели + +1. Создать централизованную систему логирования результатов выполнения фоновых задач +2. Обеспечить хранение детальной информации об операциях обновления каналов +3. Предоставить историю выполнения для анализа и отладки +4. Создать основу для будущего мониторинга и аналитики + +## Требования + +### Функциональные + +#### 1. Модель журнала +- Создать модель `ChannelUpdateLog` для хранения записей +- Поля: + - `source` (string) - источник записи (FetchPostsJob, MonitorJob и др.) + - `message` (text) - текстовое сообщение о результате операции + - `data` (jsonb) - структурированные данные операции + - `status` (enum) - статус операции (success, error, warning, info) + - `channel_id` (references) - связанный канал (если применимо) + - `job_id` (string) - ID фонового задания + - `execution_time_ms` (integer) - время выполнения в миллисекундах + - `created_at` (datetime) - время создания записи + +#### 2. Интеграция с FetchPostsJob +- Добавить создание записи в журнал для каждого выполнения +- Сохранять: + - Количество обработанных постов + - Ошибки при обработке + - Время выполнения + - ID канала + - Детали в JSON формате + +#### 3. Интеграция с MonitorJob +- Добавить создание записи с результатами мониторинга +- Сохранять: + - Общее количество каналов + - Количество отправленных на fetch + - Количество пропущенных каналов + - Причины пропуска + - Время выполнения мониторинга + +### Нефункциональные + +#### 1. Производительность +- Запись логов не должна значительно замедлять выполнение основных задач +- Использовать асинхронную запись если необходимо + +#### 2. Хранение данных +- Предусмотреть механизм очистки старых записей +- Оптимизировать запросы к логам + +#### 3. Анализируемость +- Структура данных должна поддерживать удобные запросы +- JSON поле должно содержать всю необходимую детализацию + +## Модель данных + +### ChannelUpdateLog +```ruby +class ChannelUpdateLog < ApplicationRecord + belongs_to :channel, optional: true + + enum status: { info: 'info', success: 'success', warning: 'warning', error: 'error' } + + validates :source, presence: true + validates :message, presence: true + validates :status, presence: true + + scope :by_source, ->(source) { where(source: source) } + scope :by_status, ->(status) { where(status: status) } + scope :recent, ->(hours = 24) { where('created_at > ?', hours.hours.ago) } + scope :for_channel, ->(channel) { where(channel: channel) } +end +``` + +### Миграция +```ruby +class CreateChannelUpdateLogs < ActiveRecord::Migration[7.0] + def change + create_table :channel_update_logs do |t| + t.string :source, null: false, index: true + t.text :message, null: false + t.jsonb :data, default: {}, null: false + t.string :status, null: false, index: true + t.references :channel, null: true, foreign_key: true, index: true + t.string :job_id, index: true + t.integer :execution_time_ms + t.timestamps + end + + add_index :channel_update_logs, [:source, :created_at] + add_index :channel_update_logs, [:channel_id, :created_at] + end +end +``` + +## Использование + +Для создания записей в журнале используется прямой вызов `ChannelUpdateLog.create`: + +```ruby +ChannelUpdateLog.create!( + source: 'FetchPostsJob', + message: 'Successfully processed 15 posts', + status: 'success', + channel: channel, + job_id: job_id, + execution_time_ms: 1250, + data: { posts_count: 15, new_posts: 3 } +) +``` + +## Интеграция с существующими Job + +### FetchPostsJob +- Добавить логирование начала и завершения обработки +- Сохранять детализированную информацию о результате +- Обрабатывать ошибки и логировать их + +### MonitorJob +- Логировать результаты мониторинга +- Сохранять статистику по обработанным и пропущенным каналам + +## План тестирования + +### Unit тесты +1. Простой тест модели ChannelUpdateLog на загружаемость из фикстуры + +### Integration тесты +1. Тест интеграции с FetchPostsJob +2. Тест интеграции с MonitorJob +3. Тест сохранения данных в JSON поле +4. Тест ассоциаций с каналами + +### Performance тесты +1. Тест производительности логирования +2. Тест влияния на время выполнения Job + +## Последующие улучшения + +1. **UI для просмотра логов**: Веб-интерфейс для просмотра и фильтрации записей +2. **Оповещения**: Уведомления об ошибках на основе логов +3. **Аналитика**: Статистика и графики на основе данных логов +4. **API эндпоинты**: Для внешнего доступа к логам +5. **Архивирование**: Механизм перемещения старых логов в архив + +## Риски и митигации + +1. **Размер базы данных**: Регулярная очистка старых записей +2. **Влияние на производительность**: Оптимизация запросов и индексов +3. **Сложность запросов**: Хорошая документация и удобные scopes \ No newline at end of file diff --git a/test/fixtures/channel_update_logs.yml b/test/fixtures/channel_update_logs.yml new file mode 100644 index 0000000..a50a308 --- /dev/null +++ b/test/fixtures/channel_update_logs.yml @@ -0,0 +1,19 @@ +# Read about fixtures at https://api.rubyonrails.org/classes/ActiveRecord/FixtureSet.html + +fetch_posts_success: + source: FetchPostsJob + message: "Successfully processed 15 posts" + data: | + {"posts_processed": 15, "new_posts": 3} + status: success + job_id: abc123 + execution_time_ms: 1250 + +monitor_job_info: + source: MonitorJob + message: "Monitoring completed - 50 channels checked, 45 processed, 5 skipped" + data: | + {"total_channels": 50, "processed_channels": 45, "skipped_channels": 5} + status: info + job_id: def456 + execution_time_ms: 3000 diff --git a/test/integration/channel_logging_test.rb b/test/integration/channel_logging_test.rb new file mode 100644 index 0000000..910a65f --- /dev/null +++ b/test/integration/channel_logging_test.rb @@ -0,0 +1,40 @@ +require "test_helper" + +class ChannelLoggingTest < ActionDispatch::IntegrationTest + test "should create log entries for job execution" do + # Создаем тестовый канал + channel = Channel.create!( + username: 'test_channel', + telegram_id: 12345, + active: true, + monitored_at: 1.hour.ago + ) + + # Очищаем предыдущие записи и проверяем начальное состояние + ChannelUpdateLog.delete_all + assert_equal 0, ChannelUpdateLog.count + + # Создаем запись лога вручную для проверки функциональности + ChannelUpdateLog.create!( + source: 'TestJob', + message: 'Test execution completed', + status: 'success', + channel: channel, + job_id: 'test_job_123', + execution_time_ms: 1000, + data: { test: 'data' } + ) + + # Проверяем, что запись создана + assert_equal 1, ChannelUpdateLog.count + + log = ChannelUpdateLog.first + assert_equal 'TestJob', log.source + assert_equal 'success', log.status + assert_equal 'Test execution completed', log.message + assert_equal 'test_job_123', log.job_id + assert_equal 1000, log.execution_time_ms + assert_equal channel.id, log.channel_id + assert_equal({ 'test' => 'data' }, log.data) + end +end \ No newline at end of file diff --git a/test/models/channel_update_log_test.rb b/test/models/channel_update_log_test.rb new file mode 100644 index 0000000..0075071 --- /dev/null +++ b/test/models/channel_update_log_test.rb @@ -0,0 +1,19 @@ +require "test_helper" + +class ChannelUpdateLogTest < ActiveSupport::TestCase + test "should create and save channel update log" do + log = ChannelUpdateLog.new( + source: 'FetchPostsJob', + message: 'Successfully processed 15 posts', + status: 'success', + job_id: 'abc123', + execution_time_ms: 1250, + data: { posts_processed: 15, new_posts: 3 } + ) + + assert log.save + assert_equal 'FetchPostsJob', log.source + assert_equal 'success', log.status.to_s + assert_equal 15, log.data['posts_processed'] + end +end