Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 107 additions & 34 deletions app/jobs/channels/fetch_posts_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,93 @@ class Channels::FetchPostsJob < ApplicationJob
retry_on StandardError, wait: 1.minute, attempts: 3

def perform(channel_id)
with_error_context(channel_id: channel_id, action: 'fetch_posts') do
channel = Channel.find(channel_id)

Rails.logger.info "Fetching posts for channel #{channel.username}"

# Получаем Telegram ID канала
chat_id = channel.telegram_id.to_s

# Инициализируем клиент
bot = Telegram.bots[:default]
fetcher = TelegramClient::ChannelFetcher.new(bot)

# Проверяем доступность канала
unless fetcher.channel_available?(channel.username)
Rails.logger.warn "Channel #{channel.username} is not available, deactivating"
channel.deactivate!
return
end

# Получаем последние посты
posts_data = fetcher.get_channel_posts(channel.username, limit: 20)

Rails.logger.info "Fetched #{posts_data.count} posts from channel #{channel.username}"

# Обновляем время последнего поста если есть посты
if posts_data.any?
channel.update_last_post!
start_time = Time.current
channel = nil
posts_count = 0
new_posts_count = 0
errors = []

begin
with_error_context(channel_id: channel_id, action: 'fetch_posts') do
channel = Channel.find(channel_id)

Rails.logger.info "Fetching posts for channel #{channel.username}"

# Получаем Telegram ID канала
chat_id = channel.telegram_id.to_s

# Инициализируем клиент
bot = Telegram.bots[:default]
fetcher = TelegramClient::ChannelFetcher.new(bot)

# Проверяем доступность канала
unless fetcher.channel_available?(channel.username)
Rails.logger.warn "Channel #{channel.username} is not available, deactivating"
channel.deactivate!

log_fetch_results(
channel: channel,
job_id: job_id,
execution_time_ms: ((Time.current - start_time) * 1000).to_i,
posts_count: 0,
new_posts_count: 0,
status: 'warning',
message: "Channel #{channel.username} is not available and was deactivated",
errors: ['Channel not available']
)
return
end

# Получаем последние посты
posts_data = fetcher.get_channel_posts(channel.username, limit: 20)
posts_count = posts_data.count

Rails.logger.info "Fetched #{posts_data.count} posts from channel #{channel.username}"

# Обновляем время последнего поста если есть посты
if posts_data.any?
channel.update_last_post!
end

# Обрабатываем каждый пост
posts_data.each do |post_data|
begin
process_single_post(channel, post_data)
new_posts_count += 1
rescue StandardError => e
errors << "Failed to process post #{post_data[:telegram_message_id]}: #{e.message}"
Rails.logger.error "Error processing post #{post_data[:telegram_message_id]}: #{e.message}"
end
end

Rails.logger.info "Completed fetching posts for channel #{channel.username}"

# Логируем успешное завершение
log_fetch_results(
channel: channel,
job_id: job_id,
execution_time_ms: ((Time.current - start_time) * 1000).to_i,
posts_count: posts_count,
new_posts_count: new_posts_count,
status: errors.empty? ? 'success' : 'warning',
message: "Processed #{posts_count} posts, #{new_posts_count} new posts from #{channel.username}",
errors: errors
)
end

# Обрабатываем каждый пост
posts_data.each do |post_data|
process_single_post(channel, post_data)
end

Rails.logger.info "Completed fetching posts for channel #{channel.username}"
rescue StandardError => e
# Логируем ошибку выполнения всего job
log_fetch_results(
channel: channel,
job_id: job_id,
execution_time_ms: ((Time.current - start_time) * 1000).to_i,
posts_count: posts_count,
new_posts_count: new_posts_count,
status: 'error',
message: "Failed to fetch posts from channel #{channel&.username || channel_id}: #{e.message}",
errors: [e.message]
)

raise e
end
end

Expand Down Expand Up @@ -72,4 +124,25 @@ def process_single_post(channel, post_data)
severity: :warn,
reraise: false)
end

def log_fetch_results(channel:, job_id:, execution_time_ms:, posts_count:, new_posts_count:, status:, message:, errors: [])
data = {
posts_processed: posts_count,
new_posts: new_posts_count,
errors: errors
}

ChannelUpdateLog.create!(
source: 'FetchPostsJob',
message: message,
status: status,
channel: channel,
job_id: job_id,
execution_time_ms: execution_time_ms,
data: data
)
rescue StandardError => e
# Не даем ошибкам логирования прервать основную операцию
Rails.logger.error "Failed to log fetch results: #{e.message}"
end
end
90 changes: 75 additions & 15 deletions app/jobs/channels/monitor_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,88 @@ class Channels::MonitorJob < ApplicationJob
RUN_INTERVAL = 5.minutes

def perform(*args)
Rails.logger.info 'Starting channel monitoring'
start_time = Time.current
processed_channels = 0
skipped_channels = 0
skip_reasons = {}

# Получаем все активные каналы, за которыми есть подписки
channels = Channel.joins(:subscriptions)
.where(subscriptions: { active: true })
.where(active: true)
.needs_monitoring
begin
Rails.logger.info 'Starting channel monitoring'

Rails.logger.info "Found #{channels.count} channels to monitor"
# Получаем все активные каналы, за которыми есть подписки
channels = Channel.joins(:subscriptions)
.where(subscriptions: { active: true })
.where(active: true)
.needs_monitoring

channels.each do |channel|
with_error_context(channel_id: channel.id, channel_username: channel.username) do
# Запускаем задачу для получения постов из канала
Channels::FetchPostsJob.perform_later(channel.id)
Rails.logger.info "Found #{channels.count} channels to monitor"

# Обновляем время последней проверки
channel.mark_as_monitored!
channels.each do |channel|
with_error_context(channel_id: channel.id, channel_username: channel.username) do
# Запускаем задачу для получения постов из канала
Channels::FetchPostsJob.perform_later(channel.id)
processed_channels += 1

Rails.logger.debug "Scheduled fetch job for channel #{channel.username}"
# Обновляем время последней проверки
channel.mark_as_monitored!

Rails.logger.debug "Scheduled fetch job for channel #{channel.username}"
end
end

Rails.logger.info 'Channel monitoring completed'

# Логируем успешное завершение мониторинга
log_monitor_results(
job_id: job_id,
execution_time_ms: ((Time.current - start_time) * 1000).to_i,
total_channels: channels.count,
processed_channels: processed_channels,
skipped_channels: skipped_channels,
skip_reasons: skip_reasons,
status: 'success',
message: "Monitoring completed: #{processed_channels} channels processed, #{skipped_channels} channels skipped"
)

rescue StandardError => e
# Логируем ошибку выполнения мониторинга
log_monitor_results(
job_id: job_id,
execution_time_ms: ((Time.current - start_time) * 1000).to_i,
total_channels: channels&.count || 0,
processed_channels: processed_channels,
skipped_channels: skipped_channels,
skip_reasons: skip_reasons,
status: 'error',
message: "Monitoring failed: #{e.message}",
errors: [e.message]
)

raise e
end
end

private

def log_monitor_results(job_id:, execution_time_ms:, total_channels:, processed_channels:, skipped_channels:, skip_reasons:, status:, message:, errors: [])
data = {
total_channels: total_channels,
processed_channels: processed_channels,
skipped_channels: skipped_channels,
skip_reasons: skip_reasons,
errors: errors || []
}

Rails.logger.info 'Channel monitoring completed'
ChannelUpdateLog.create!(
source: 'MonitorJob',
message: message,
status: status,
job_id: job_id,
execution_time_ms: execution_time_ms,
data: data
)
rescue StandardError => e
# Не даем ошибкам логирования прервать основную операцию
Rails.logger.error "Failed to log monitor results: #{e.message}"
end
end
15 changes: 15 additions & 0 deletions app/models/channel_update_log.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class ChannelUpdateLog < ApplicationRecord
belongs_to :channel, optional: true

STATUS_TYPES = %w[info success warning error].freeze

validates :status, inclusion: { in: STATUS_TYPES }
validates :source, presence: true
validates :message, presence: true
validates :status, presence: true

scope :by_source, ->(source) { where(source: source) }
scope :by_status, ->(status) { where(status: status) }
scope :recent, ->(hours = 24) { where('created_at > ?', hours.hours.ago) }
scope :for_channel, ->(channel) { where(channel: channel) }
end
18 changes: 18 additions & 0 deletions db/migrate/20251011144317_create_channel_update_logs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class CreateChannelUpdateLogs < ActiveRecord::Migration[8.0]
def change
create_table :channel_update_logs do |t|
t.string :source, null: false, index: true
t.text :message, null: false
t.jsonb :data, default: {}, null: false
t.string :status, null: false, index: true
t.references :channel, null: true, foreign_key: true, index: true
t.string :job_id, index: true
t.integer :execution_time_ms

t.timestamps
end

add_index :channel_update_logs, [:source, :created_at]
add_index :channel_update_logs, [:channel_id, :created_at]
end
end
25 changes: 22 additions & 3 deletions db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading