Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ configurations.configureEach {
exclude(module = "commons-logging")
}

val canonicalVersionCode = 436
val canonicalVersionName = "1.30.3"
val canonicalVersionCode = 437
val canonicalVersionName = "1.31.0"

val postFixSize = 10
val abiPostFix = mapOf(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package org.session.libsession.messaging.sending_receiving.pollers

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.util.AppVisibilityManager
import org.thoughtcrime.securesms.util.NetworkConnectivity
import kotlin.time.Clock
import kotlin.time.Instant

/**
* Base class for pollers that perform periodic polling operations. These poller will:
*
* 1. Run periodically when the app is in the foreground and there is network.
* 2. Adjust the polling interval based on success/failure of previous polls.
* 3. Expose the current polling state via [pollState]
* 4. Allow manual polling via [manualPollOnce]
*
* @param T The type of the result returned by the single polling.
*/
abstract class BasePoller<T>(
private val networkConnectivity: NetworkConnectivity,
appVisibilityManager: AppVisibilityManager,
private val scope: CoroutineScope,
) {
protected val logTag: String = this::class.java.simpleName
private val pollMutex = Mutex()

private val mutablePollState = MutableStateFlow<PollState<T>>(PollState.Idle)

/**
* The current state of the poller.
*/
val pollState: StateFlow<PollState<T>> get() = mutablePollState

init {
scope.launch {
var numConsecutiveFailures = 0

while (true) {
// Wait until the app is in the foreground and we have network connectivity
combine(
appVisibilityManager.isAppVisible.filter { visible ->
if (visible) {
true
} else {
Log.d(logTag, "Polling paused - app in background")
false
}
},
networkConnectivity.networkAvailable.filter { hasNetwork ->
if (hasNetwork) {
true
} else {
Log.d(logTag, "Polling paused - no network connectivity")
false
}
},
transform = { _, _ -> }
).first()

try {
pollOnce("routine")
numConsecutiveFailures = 0
} catch (e: CancellationException) {
throw e
} catch (_: Throwable) {
numConsecutiveFailures += 1
}

val nextPollSeconds = nextPollDelaySeconds(numConsecutiveFailures)
Log.d(logTag, "Next poll in ${nextPollSeconds}s")
delay(nextPollSeconds * 1000L)
}
}
}

protected open val successfulPollIntervalSeconds: Int get() = 2
protected open val maxRetryIntervalSeconds: Int get() = 10

/**
* Returns the delay until the next poll should be performed.
*
* @param numConsecutiveFailures The number of consecutive polling failures that have occurred.
* 0 indicates the last poll was successful.
*/
private fun nextPollDelaySeconds(
numConsecutiveFailures: Int,
): Int {
val delay = successfulPollIntervalSeconds * (numConsecutiveFailures + 1)
return delay.coerceAtMost(maxRetryIntervalSeconds)
}

/**
* Performs a single polling operation. A failed poll should throw an exception.
*
* @param isFirstPollSinceApoStarted True if this is the first poll since the app started.
* @return The result of the polling operation.
*/
protected abstract suspend fun doPollOnce(isFirstPollSinceApoStarted: Boolean): T

private suspend fun pollOnce(reason: String): T {
pollMutex.withLock {
val lastState = mutablePollState.value
mutablePollState.value =
PollState.Polling(reason, lastPolledResult = lastState.lastPolledResult)
Log.d(logTag, "Start $reason polling")
val result = runCatching {
doPollOnce(isFirstPollSinceApoStarted = lastState is PollState.Idle)
}

if (result.isSuccess) {
Log.d(logTag, "$reason polling succeeded")
} else if (result.exceptionOrNull() !is CancellationException) {
Log.e(logTag, "$reason polling failed", result.exceptionOrNull())
}

mutablePollState.value = PollState.Polled(
at = Clock.System.now(),
result = result,
)

return result.getOrThrow()
}
}

/**
* Manually triggers a single polling operation.
*
* Note:
* * If a polling operation is already in progress, this will wait for it to complete first.
* * This method does not check for app foreground/background state or network connectivity.
* * This method will throw if the polling operation fails.
*/
suspend fun manualPollOnce(): T {
val resultChannel = Channel<Result<T>>()

scope.launch {
resultChannel.trySend(runCatching {
pollOnce("manual")
})
}

return resultChannel.receive().getOrThrow()
}


sealed interface PollState<out T> {
val lastPolledResult: Result<T>?

object Idle : PollState<Nothing> {
override val lastPolledResult: Result<Nothing>?
get() = null
}

data class Polled<T>(
val at: Instant,
val result: Result<T>,
) : PollState<T> {
override val lastPolledResult: Result<T>
get() = result
}

data class Polling<T>(
val reason: String,
override val lastPolledResult: Result<T>?,
) : PollState<T>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,7 @@ import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
Expand All @@ -38,10 +30,9 @@ import org.session.libsession.utilities.withUserConfigs
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.database.CommunityDatabase
import org.thoughtcrime.securesms.util.AppVisibilityManager
import org.thoughtcrime.securesms.util.NetworkConnectivity
import javax.inject.Provider

private typealias PollRequestToken = Channel<Result<List<String>>>

/**
* A [OpenGroupPoller] is responsible for polling all communities on a particular server.
*
Expand All @@ -52,7 +43,6 @@ private typealias PollRequestToken = Channel<Result<List<String>>>
*/
class OpenGroupPoller @AssistedInject constructor(
private val storage: StorageProtocol,
private val appVisibilityManager: AppVisibilityManager,
private val configFactory: ConfigFactoryProtocol,
private val trimThreadJobFactory: TrimThreadJob.Factory,
private val openGroupDeleteJobFactory: OpenGroupDeleteJob.Factory,
Expand All @@ -63,76 +53,22 @@ class OpenGroupPoller @AssistedInject constructor(
private val getDirectMessageFactory: GetDirectMessagesApi.Factory,
private val pollRoomInfoFactory: PollRoomApi.Factory,
private val getCapsApi: Provider<GetCapsApi>,
networkConnectivity: NetworkConnectivity,
appVisibilityManager: AppVisibilityManager,
private val json: Json,
@Assisted private val server: String,
@Assisted private val scope: CoroutineScope,
@Assisted private val pollerSemaphore: Semaphore,
): BasePoller<Unit>(
networkConnectivity = networkConnectivity,
scope = scope,
appVisibilityManager = appVisibilityManager
) {
companion object {
private const val POLL_INTERVAL_MILLS: Long = 4000L
const val MAX_INACTIVITIY_PERIOD_MILLS = 14 * 24 * 60 * 60 * 1000L // 14 days

private const val TAG = "OpenGroupPoller"
}

private val pendingPollRequest = Channel<PollRequestToken>()
override val successfulPollIntervalSeconds: Int
get() = 4

@OptIn(ExperimentalCoroutinesApi::class)
val pollState: StateFlow<PollState> = flow {
val tokens = arrayListOf<PollRequestToken>()

while (true) {
// Wait for next request(s) to come in
tokens.clear()
tokens.add(pendingPollRequest.receive())
tokens.addAll(generateSequence { pendingPollRequest.tryReceive().getOrNull() })

Log.d(TAG, "Polling open group messages for server: $server")
emit(PollState.Polling)
val pollResult = runCatching {
pollerSemaphore.withPermit {
pollOnce()
}
}
tokens.forEach { it.trySend(pollResult) }
emit(PollState.Idle(pollResult))

pollResult.exceptionOrNull()?.let {
Log.e(TAG, "Error while polling open groups for $server", it)
}

}
}.stateIn(scope, SharingStarted.Eagerly, PollState.Idle(null))

init {
// Start a periodic polling request when the app becomes visible
scope.launch {
appVisibilityManager.isAppVisible
.collectLatest { visible ->
if (visible) {
while (true) {
val r = requestPollAndAwait()
if (r.isSuccess) {
delay(POLL_INTERVAL_MILLS)
} else {
delay(2000L)
}
}
}
}
}
}

/**
* Requests a poll and await for the result.
*
* The result will be a list of room tokens that were polled.
*/
suspend fun requestPollAndAwait(): Result<List<String>> {
val token: PollRequestToken = Channel()
pendingPollRequest.send(token)
return token.receive()
}
override val maxRetryIntervalSeconds: Int
get() = 30

private fun handleRoomPollInfo(
address: Address.Community,
Expand All @@ -147,7 +83,7 @@ class OpenGroupPoller @AssistedInject constructor(
*
* @return A list of rooms that were polled.
*/
private suspend fun pollOnce(): List<String> {
override suspend fun doPollOnce(isFirstPollSinceApoStarted: Boolean): Unit = pollerSemaphore.withPermit {
val allCommunities = configFactory.withUserConfigs { it.userGroups.allCommunityInfo() }

val rooms = allCommunities
Expand All @@ -158,7 +94,7 @@ class OpenGroupPoller @AssistedInject constructor(
}?.community?.pubKeyHex

if (rooms.isEmpty() || serverKey.isNullOrBlank()) {
return emptyList()
return
}

coroutineScope {
Expand Down Expand Up @@ -255,8 +191,6 @@ class OpenGroupPoller @AssistedInject constructor(
}
}
}

return rooms
}


Expand Down Expand Up @@ -285,7 +219,7 @@ class OpenGroupPoller @AssistedInject constructor(
)
} catch (e: Exception) {
Log.e(
TAG,
logTag,
"Error processing open group message ${msg.id} in ${threadAddress.debugString}",
e
)
Expand Down Expand Up @@ -317,7 +251,7 @@ class OpenGroupPoller @AssistedInject constructor(

val serverPubKeyHex = storage.getOpenGroupPublicKey(server)
?: run {
Log.e(TAG, "No community server public key cannot process inbox messages")
Log.e(logTag, "No community server public key cannot process inbox messages")
return
}

Expand All @@ -334,7 +268,7 @@ class OpenGroupPoller @AssistedInject constructor(
)

} catch (e: Exception) {
Log.e(TAG, "Error processing inbox message", e)
Log.e(logTag, "Error processing inbox message", e)
}
}
}
Expand All @@ -351,7 +285,7 @@ class OpenGroupPoller @AssistedInject constructor(

val serverPubKeyHex = storage.getOpenGroupPublicKey(server)
?: run {
Log.e(TAG, "No community server public key cannot process inbox messages")
Log.e(logTag, "No community server public key cannot process inbox messages")
return
}

Expand All @@ -368,18 +302,12 @@ class OpenGroupPoller @AssistedInject constructor(
)

} catch (e: Exception) {
Log.e(TAG, "Error processing outbox message", e)
Log.e(logTag, "Error processing outbox message", e)
}
}
}
}


sealed interface PollState {
data class Idle(val lastPolled: Result<List<String>>?) : PollState
data object Polling : PollState
}

@AssistedFactory
interface Factory {
fun create(
Expand Down
Loading
Loading