Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion src/components/Indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ import { BlockchainRegistry } from '../BlockchainRegistry/index.js'
import { CommandStatus, JobStatus } from '../../@types/commands.js'
import { buildJobIdentifier, getDeployedContractBlock } from './utils.js'
import { create256Hash } from '../../utils/crypt.js'
import { isReachableConnection } from '../../utils/database.js'
import { getDatabase, isReachableConnection } from '../../utils/database.js'
import { sleep } from '../../utils/util.js'
import { isReindexingNeeded } from './version.js'
import { DB_EVENTS, ES_CONNECTION_EVENTS } from '../database/ElasticsearchConfigHelper.js'

/**
* Event emitter for DDO (Data Descriptor Object) events
Expand Down Expand Up @@ -82,6 +83,8 @@ export class OceanIndexer {
private supportedChains: string[]
private indexers: Map<number, ChainIndexer> = new Map()
private MIN_REQUIRED_VERSION = '0.2.2'
private isDbConnected: boolean = true
private reconnectTimer: NodeJS.Timeout | null = null

constructor(
db: Database,
Expand All @@ -93,9 +96,64 @@ export class OceanIndexer {
this.blockchainRegistry = blockchainRegistry
this.supportedChains = Object.keys(supportedNetworks)
INDEXING_QUEUE = []
this.setupDbConnectionListeners()
this.startAllChainIndexers()
}

/**
* Listen for Elasticsearch connection events.
*
* CONNECTION_LOST → cancel any pending restart, stop all indexers once.
* CONNECTION_RESTORED → debounce restart by 5 s so rapid LOST/RESTORED cycles are a single restart.
*/
private setupDbConnectionListeners(): void {
ES_CONNECTION_EVENTS.on(DB_EVENTS.CONNECTION_LOST, async () => {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}

if (!this.isDbConnected) {
return
}

this.isDbConnected = false
INDEXER_LOGGER.error(
'Database connection lost - stopping all chain indexers until DB is back'
)
await this.stopAllChainIndexers()
})

ES_CONNECTION_EVENTS.on(DB_EVENTS.CONNECTION_RESTORED, () => {
if (this.isDbConnected) {
return
}

if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
}

this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null
if (this.isDbConnected) {
return
}

this.isDbConnected = true
numCrawlAttempts = 0
INDEXER_LOGGER.info(
'Database connection stable - reinitialising DB and restarting all chain indexers'
)
const freshDb = await getDatabase(true)
if (freshDb) {
this.db = freshDb
}

await this.startAllChainIndexers()
}, 5000)
})
}

public getSupportedNetworks(): RPCS {
return this.networks
}
Expand Down
174 changes: 78 additions & 96 deletions src/components/database/ElasticsearchConfigHelper.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import EventEmitter from 'node:events'
import { Client } from '@elastic/elasticsearch'
import { OceanNodeDBConfig } from '../../@types'
import { DATABASE_LOGGER } from '../../utils/logging/common.js'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { DB_TYPES } from '../../utils/constants.js'

export const DB_EVENTS = {
CONNECTION_LOST: 'db:connection:lost',
CONNECTION_RESTORED: 'db:connection:restored'
} as const
export const ES_CONNECTION_EVENTS = new EventEmitter()

export interface ElasticsearchRetryConfig {
requestTimeout?: number
pingTimeout?: number
Expand All @@ -16,20 +23,20 @@
}

export const DEFAULT_ELASTICSEARCH_CONFIG: Required<ElasticsearchRetryConfig> = {
requestTimeout: parseInt(process.env.ELASTICSEARCH_REQUEST_TIMEOUT || '60000'),
pingTimeout: parseInt(process.env.ELASTICSEARCH_PING_TIMEOUT || '5000'),
requestTimeout: parseInt(process.env.ELASTICSEARCH_REQUEST_TIMEOUT || '30000'),
pingTimeout: parseInt(process.env.ELASTICSEARCH_PING_TIMEOUT || '3000'),
resurrectStrategy:
(process.env.ELASTICSEARCH_RESURRECT_STRATEGY as 'ping' | 'optimistic' | 'none') ||
'ping',
maxRetries: parseInt(process.env.ELASTICSEARCH_MAX_RETRIES || '5'),
maxRetries: parseInt(process.env.ELASTICSEARCH_MAX_RETRIES || '3'),
sniffOnStart: process.env.ELASTICSEARCH_SNIFF_ON_START !== 'false',
sniffInterval:
process.env.ELASTICSEARCH_SNIFF_INTERVAL === 'false'
? false
: parseInt(process.env.ELASTICSEARCH_SNIFF_INTERVAL || '30000'),
: parseInt(process.env.ELASTICSEARCH_SNIFF_INTERVAL || '30000', 10) || 30000,
sniffOnConnectionFault: process.env.ELASTICSEARCH_SNIFF_ON_CONNECTION_FAULT !== 'false',
healthCheckInterval: parseInt(
process.env.ELASTICSEARCH_HEALTH_CHECK_INTERVAL || '60000'
process.env.ELASTICSEARCH_HEALTH_CHECK_INTERVAL || '15000'
)
}

Expand All @@ -42,8 +49,9 @@
private isRetrying: boolean = false
private healthCheckTimer: NodeJS.Timeout | null = null
private isMonitoring: boolean = false
private connectionLostEmitted: boolean = false

private constructor() {}

Check warning on line 54 in src/components/database/ElasticsearchConfigHelper.ts

View workflow job for this annotation

GitHub Actions / lint

Useless constructor

public static getInstance(): ElasticsearchClientSingleton {
if (!ElasticsearchClientSingleton.instance) {
Expand Down Expand Up @@ -73,21 +81,27 @@
}

if (this.client && this.config) {
// Skip the extra ping here: 5 DB-class constructors all call getClient()
// during reconnect reinit, and concurrent pings cause false errors that trigger another LOST/RESTORED cycle.
if (this.isMonitoring) {
return this.client
}

const isHealthy = await this.checkConnectionHealth()
if (isHealthy) {
this.startHealthMonitoring(config, customConfig)
return this.client
} else {
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch connection interrupted or failed to ${this.maskUrl(
`Elasticsearch connection unhealthy at ${this.maskUrl(
this.config.url
)} - starting retry phase`,
)} - health monitoring will handle reconnection`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_WARN
)
this.closeConnectionSync()
return this.startRetryConnection(config, customConfig)
this.startHealthMonitoring(config, customConfig)
throw new Error('Elasticsearch connection is not healthy')
}
}

Expand All @@ -109,33 +123,69 @@

this.isMonitoring = true
DATABASE_LOGGER.logMessageWithEmoji(
`Starting Elasticsearch connection monitoring (health check every ${finalConfig.healthCheckInterval}ms)`,
`Starting Elasticsearch health monitoring (interval: ${finalConfig.healthCheckInterval}ms)`,
true,
GENERIC_EMOJIS.EMOJI_OCEAN_WAVE,
LOG_LEVELS_STR.LEVEL_DEBUG
)

this.healthCheckTimer = setInterval(async () => {
if (this.client && !this.isRetrying) {
const isHealthy = await this.checkConnectionHealth()
if (!isHealthy) {
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch connection lost during monitoring - triggering automatic reconnection`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_WARN
)
this.closeConnectionSync()
if (this.isRetrying) {
return
}

const isHealthy = await this.checkConnectionHealth()
if (!isHealthy) {
if (this.client) {
try {
await this.startRetryConnection(config, customConfig)
} catch (error) {
this.client.close()
} catch (err) {
DATABASE_LOGGER.logMessageWithEmoji(
`Automatic reconnection failed: ${error.message}`,
`Error closing Elasticsearch client during health check: ${err instanceof Error ? err.message : String(err)}`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
LOG_LEVELS_STR.LEVEL_DEBUG
)
}
this.client = null
this.config = null
}

// Emit CONNECTION_LOST
if (!this.connectionLostEmitted) {
this.connectionLostEmitted = true
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch connection lost to ${this.maskUrl(
config.url
)} - starting reconnection attempts every ${finalConfig.healthCheckInterval}ms`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_WARN
)
ES_CONNECTION_EVENTS.emit(DB_EVENTS.CONNECTION_LOST)
}

// Single reconnection attempt
this.isRetrying = true
try {
DATABASE_LOGGER.logMessageWithEmoji(
`Attempting Elasticsearch reconnection to ${this.maskUrl(config.url)}`,
true,
GENERIC_EMOJIS.EMOJI_OCEAN_WAVE,
LOG_LEVELS_STR.LEVEL_INFO
)
await this.createNewConnection(config, customConfig)
this.isRetrying = false
this.connectionLostEmitted = false
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch connection restored to ${this.maskUrl(config.url)}`,
true,
GENERIC_EMOJIS.EMOJI_CHECK_MARK,
LOG_LEVELS_STR.LEVEL_INFO
)
ES_CONNECTION_EVENTS.emit(DB_EVENTS.CONNECTION_RESTORED)
} catch {
this.isRetrying = false
}
}
}, finalConfig.healthCheckInterval)
Expand All @@ -155,71 +205,6 @@
}
}

private async startRetryConnection(
config: OceanNodeDBConfig,
customConfig: Partial<ElasticsearchRetryConfig> = {}
): Promise<Client> {
if (!this.isElasticsearchDatabase(config)) {
throw new Error(`Database type '${config.dbType}' is not Elasticsearch`)
}

this.isRetrying = true
const finalConfig = {
...DEFAULT_ELASTICSEARCH_CONFIG,
...customConfig
}

DATABASE_LOGGER.logMessageWithEmoji(
`Starting Elasticsearch retry connection phase to ${this.maskUrl(
config.url
)} (max retries: ${finalConfig.maxRetries})`,
true,
GENERIC_EMOJIS.EMOJI_OCEAN_WAVE,
LOG_LEVELS_STR.LEVEL_INFO
)

for (let attempt = 1; attempt <= finalConfig.maxRetries; attempt++) {
try {
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch reconnection attempt ${attempt}/${
finalConfig.maxRetries
} to ${this.maskUrl(config.url)}`,
true,
GENERIC_EMOJIS.EMOJI_OCEAN_WAVE,
LOG_LEVELS_STR.LEVEL_INFO
)

const client = await this.createNewConnection(config, customConfig)
this.isRetrying = false
return client
} catch (error) {
if (attempt === finalConfig.maxRetries) {
this.isRetrying = false
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch retry connection failed after ${
finalConfig.maxRetries
} attempts to ${this.maskUrl(config.url)}: ${error.message}`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
throw error
}

const delay = Math.min(1000 * Math.pow(2, attempt - 1), 30000)
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch retry attempt ${attempt}/${finalConfig.maxRetries} failed, waiting ${delay}ms before next attempt: ${error.message}`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_WARN
)
await new Promise((resolve) => setTimeout(resolve, delay))
}
}

throw new Error('Maximum retry attempts reached')
}

private async checkConnectionHealth(): Promise<boolean> {
if (!this.client) return false

Expand All @@ -228,7 +213,7 @@
return true
} catch (error) {
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch connection health check failed: ${error.message}`,
`Elasticsearch health check failed: ${error.message}`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_DEBUG
Expand Down Expand Up @@ -277,9 +262,7 @@
DATABASE_LOGGER.logMessageWithEmoji(
`Elasticsearch connection established successfully to ${this.maskUrl(
config.url
)} (attempt ${this.connectionAttempts}/${
finalConfig.maxRetries
}) last successful connection ${this.lastConnectionTime}`,
)} (attempt ${this.connectionAttempts}) last successful connection ${this.lastConnectionTime}`,
true,
GENERIC_EMOJIS.EMOJI_CHECK_MARK,
LOG_LEVELS_STR.LEVEL_INFO
Expand All @@ -290,9 +273,7 @@
DATABASE_LOGGER.logMessageWithEmoji(
`Failed to connect to Elasticsearch at ${this.maskUrl(config.url)} (attempt ${
this.connectionAttempts
}/${finalConfig.maxRetries}) last successful connection ${
this.lastConnectionTime
}: ${error.message}`,
}) last successful connection ${this.lastConnectionTime}: ${error.message}`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
Expand Down Expand Up @@ -323,6 +304,7 @@
this.client = null
this.config = null
}
this.connectionLostEmitted = false
}

public getConnectionStats(): {
Expand Down
Loading