Add coalesce service and improve error handling #483
Pull request overview
This PR introduces a new CoalesceService to deduplicate concurrent background jobs (by key) and refactors existing background-job code to use it instead of explicit locking, alongside a dedicated async executor for coalesced work.
Changes:
- Added `CoalesceService` for keyed coalescing of concurrent task submissions.
- Added a dedicated `coalesceExecutor` thread pool for running coalesced tasks.
- Refactored `CacheService` and `RelayDetailsUpdateService` to submit work through `CoalesceService`.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| backend/src/main/kotlin/org/tormap/service/CoalesceService.kt | New service to coalesce concurrent executions per key and rerun once if requests arrive while running. |
| backend/src/main/kotlin/org/tormap/config/AyncConfig.kt | Adds a coalesceExecutor thread pool for coalesced async work. |
| backend/src/main/kotlin/org/tormap/service/CacheService.kt | Replaces manual lock/retry logic with coalesced task submission for cache rebuild jobs. |
| backend/src/main/kotlin/org/tormap/service/RelayDetailsUpdateService.kt | Replaces explicit locking/try-catch concurrency handling with coalesced task submission per month. |
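The "coalesce per key and rerun once" behavior described in the table can be sketched roughly as follows. This is a minimal illustration, not the PR's `CoalesceService`: the class name `SimpleCoalescer`, its `State` fields, and the fire-and-forget `submit` signature are assumptions made for the example, and a failed run is simply ignored.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executor

// Hypothetical sketch; names and structure are assumptions, not the PR's code.
class SimpleCoalescer(private val executor: Executor) {
    private class State {
        var running = false // a worker is currently executing the task for this key
        var pending = false // at least one submission arrived while the worker was running
    }

    private val states = ConcurrentHashMap<String, State>()

    /** Runs [task] for [key]; submissions made during an active run collapse into one rerun. */
    fun submit(key: String, task: () -> Unit) {
        val state = states.computeIfAbsent(key) { State() }
        synchronized(state) {
            if (state.running) {
                state.pending = true // coalesce: only remember that a rerun was requested
                return
            }
            state.running = true
        }
        executor.execute {
            while (true) {
                try {
                    task() // the rerun reuses the task of the submission that started the worker
                } catch (ex: Exception) {
                    // Sketch only: failures are ignored here.
                }
                synchronized(state) {
                    if (state.pending) {
                        state.pending = false // consume all queued requests with a single rerun
                    } else {
                        state.running = false
                        return@execute
                    }
                }
            }
        }
    }
}
```

Any number of submissions that arrive during one run result in exactly one additional run, which is what makes this cheaper than queueing every request.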
Comments suppressed due to low confidence (1)
backend/src/main/kotlin/org/tormap/service/CacheService.kt:51
`cacheRelayLocationsPerDay()` now schedules work via `coalesceService.submit(...)` and then returns `CompletableFuture.completedFuture(null)` immediately, so callers that wait on the returned future will proceed before caches are populated (`CacheServiceTest` uses `.get()` and will observe empty caches intermittently/consistently). Return a future that completes when all submitted month jobs complete (e.g., `CompletableFuture.allOf(...)` over the returned futures), or remove this method’s `@Async` and make the coalesced work happen within the awaited call.
    fun cacheRelayLocationsPerDay(months: Set<String>): CompletableFuture<Void> {
        logger.info("Caching relay locations for each day of months: {}", months.joinToString(", "))
        months.forEach { month ->
            coalesceService.submit("cacheRelayLocationsPerDay-$month") {
                val yearMonth = YearMonth.parse(month)
                yearMonth.atDay(1).datesUntil(yearMonth.plusMonths(1).atDay(1)).forEach {
                    val day = it.toString()
                    val relayLocations = relayLocationRepositoryImpl.findAllUsingDay(LocalDate.parse(day))
                    if (relayLocations.isNotEmpty()) {
                        cacheManager.getCache(CacheConfig.RELAY_LOCATIONS_PER_DAY)?.put(
                            day,
                            relayLocations
                        )
                    }
                }
            }
        }
        return CompletableFuture.completedFuture(null)
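The suggested fix (returning a future that covers every month job) could look roughly like the sketch below. The function `awaitAllMonths` and its `submitMonth` parameter are hypothetical stand-ins for whatever call returns one future per month; only `CompletableFuture.allOf` is taken from the comment itself.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical helper: `submitMonth` stands in for a call that returns one future per month job.
fun awaitAllMonths(
    months: Set<String>,
    submitMonth: (String) -> CompletableFuture<Void>
): CompletableFuture<Void> {
    val futures = months.map { month -> submitMonth(month) }
    // allOf completes only once every month job has completed, normally or exceptionally.
    return CompletableFuture.allOf(*futures.toTypedArray())
}
```

A caller that invokes `.get()` on the result then blocks until the slowest month job has finished, instead of returning right after scheduling.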
@copilot analyze failing tests and apply a fix if necessary. Add new tests for the new CoalesceService to test if the service is bug free.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Agent-Logs-Url: https://github.com/TorMap/tormap/sessions/64594795-80c6-4518-b53d-33f357564940 Co-authored-by: JuliusHenke <23460202+JuliusHenke@users.noreply.github.com>
…mantics Agent-Logs-Url: https://github.com/TorMap/tormap/sessions/64594795-80c6-4518-b53d-33f357564940 Co-authored-by: JuliusHenke <23460202+JuliusHenke@users.noreply.github.com>
Implemented and pushed fixes in …
Warning: Firewall rules blocked me from connecting to one or more addresses.
…eature/coalesce-service # Conflicts: # backend/src/main/kotlin/org/tormap/service/CacheService.kt # backend/src/main/kotlin/org/tormap/service/CoalesceService.kt
…eature/coalesce-service
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
backend/src/main/kotlin/org/tormap/service/CoalesceService.kt:31
`submitAsync` returns a `completedFuture` when the key is already running. That means callers may observe completion even though their submission only set `pending=true` and a rerun will execute later, which breaks expected semantics for returned futures (e.g., `allOf(...).get()` can return before the pending rerun finished). Consider returning a future that completes when the current coalesced cycle (including any pending rerun) finishes, even for submissions made while running.
    synchronized(state.monitor) {
        if (state.running) {
            state.pending = true
            return CompletableFuture.completedFuture(null)
        }
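One way to give queued submissions a meaningful future is to park their futures in the per-key state and complete them only after the rerun that follows their submission. The sketch below illustrates that idea under assumptions: `CycleCoalescer`, `State`, and the `waiters` list are hypothetical names, and failures are deliberately ignored to keep the focus on completion timing.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executor

// Hypothetical sketch; not the PR's implementation.
class CycleCoalescer(private val executor: Executor) {
    private class State {
        var running = false
        // Futures of submissions that arrived during the current run.
        var waiters = mutableListOf<CompletableFuture<Void>>()
    }

    private val states = ConcurrentHashMap<String, State>()

    fun submitAsync(key: String, task: () -> Unit): CompletableFuture<Void> {
        val state = states.computeIfAbsent(key) { State() }
        val result = CompletableFuture<Void>()
        synchronized(state) {
            if (state.running) {
                state.waiters.add(result) // completed only after the rerun has executed
                return result
            }
            state.running = true
        }
        executor.execute {
            var current: List<CompletableFuture<Void>> = listOf(result)
            while (true) {
                try {
                    task()
                } catch (ex: Exception) {
                    // Sketch only: failure propagation is not handled here.
                }
                current.forEach { it.complete(null) }
                synchronized(state) {
                    if (state.waiters.isEmpty()) {
                        state.running = false
                        return@execute
                    }
                    // Everything queued so far is served by the next run.
                    current = state.waiters
                    state.waiters = mutableListOf()
                }
            }
        }
        return result
    }
}
```

With this shape, `allOf(...)` over the returned futures cannot complete before the rerun that serves a queued submission has run.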
backend/src/main/kotlin/org/tormap/service/CoalesceService.kt:45
Exceptions thrown by `task()` are caught and only logged, then the loop breaks. Because the exception is swallowed, the returned `CompletableFuture` will complete normally even though the task failed, making it hard for callers to detect failures (regression vs typical `CompletableFuture.runAsync` behavior). Consider completing the returned future exceptionally (and potentially keeping the state consistent) so callers awaiting `.get()` can react to errors.
    try {
        task()
    } catch (ex: Exception) {
        // Keep existing behavior: stop reruns for this cycle after a failed execution.
        logger.error("Coalesced task failed for key={}", key, ex)
        break
    }
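The difference between swallowing and propagating a failure can be shown in isolation. The helper below, `runReporting`, is a hypothetical name used only for this sketch; it leaves out the coalescing logic and shows just how a caught exception can still reach the caller through `completeExceptionally`.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

// Hypothetical helper: runs `task` on `executor` and reports its outcome through the future.
fun runReporting(executor: Executor, task: () -> Unit): CompletableFuture<Void> {
    val result = CompletableFuture<Void>()
    executor.execute {
        try {
            task()
            result.complete(null)
        } catch (ex: Exception) {
            // Logging could still happen here; the point is that the caller also sees the failure.
            result.completeExceptionally(ex)
        }
    }
    return result
}
```

A caller that invokes `.get()` on a failed future receives an `ExecutionException` whose `cause` is the original exception, which matches what callers of `CompletableFuture.runAsync` would see.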
          logger.info("Caching relay locations for each day of months: {}", months.joinToString(", "))
    +     val futures = months.map { month ->
    +         coalesceService.submitAsync("cacheRelayLocationsPerDay-$month") {
                  val yearMonth = YearMonth.parse(month)
                  yearMonth.atDay(1).datesUntil(yearMonth.plusMonths(1).atDay(1)).forEach {
                      val day = it.toString()
                      val relayLocations = relayLocationRepositoryImpl.findAllUsingDay(LocalDate.parse(day))
                      if (relayLocations.isNotEmpty()) {
                          cacheManager.getCache(CacheConfig.RELAY_LOCATIONS_PER_DAY)?.put(
                              day,
                              relayLocations
                          )
                      }
                  }
    -         } finally {
    -             lockRelayLocationsPerDay.unlock()
    -         }
    -     } else {
    -         logger.debug("Cache update of relay location per day already in progress. Waiting 1 second...")
    -         Thread.sleep(1000)
    -         cacheRelayLocationsPerDay(months)
    -     }
    -     return CompletableFuture.completedFuture(null)
    +     return CompletableFuture.allOf(*futures.toTypedArray())
      }
@copilot apply changes based on this feedback
Addressed in c4213cb: queued submissions now return a dedicated future that completes only after the coalesced rerun drain finishes, so cacheRelayLocationsPerDay(...).get() no longer completes early when the same month key is already running.
    private val states = ConcurrentHashMap<String, CoalesceState>()

    fun submitAsync(key: String, task: () -> Unit): CompletableFuture<Void> {
        val state = states.computeIfAbsent(key) { CoalesceState() }

        synchronized(state.monitor) {
            if (state.running) {
                state.pending = true
                return CompletableFuture.completedFuture(null)
            }
            state.running = true
        }
@copilot apply changes based on this feedback
Applied in 3dc9e77: CoalesceService now removes per-key state when work finishes (both normal and exception exit paths), with stale-state revalidation in submitAsync to avoid remove/recreate races. I also added a regression test to assert completed keys are cleaned up (removes key state after task completion). Targeted and full backend tests pass.
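The cleanup with stale-state revalidation described in this reply might be structured along these lines. This is a sketch under assumptions, not the code from 3dc9e77: `SelfCleaningCoalescer`, the `removed` flag, and `trackedKeys()` are hypothetical, and failed runs are ignored for brevity.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executor

// Hypothetical sketch of per-key state cleanup with stale-state revalidation.
class SelfCleaningCoalescer(private val executor: Executor) {
    private class State {
        var running = false
        var pending = false
        var removed = false // set once the worker has dropped this state from the map
    }

    private val states = ConcurrentHashMap<String, State>()

    /** Number of keys that currently hold state; exposed only so a test can observe cleanup. */
    fun trackedKeys(): Int = states.size

    fun submit(key: String, task: () -> Unit) {
        while (true) {
            val state = states.computeIfAbsent(key) { State() }
            // null = state is stale, look it up again; true = start a worker; false = queued.
            val start: Boolean? = synchronized(state) {
                when {
                    state.removed -> null
                    state.running -> { state.pending = true; false }
                    else -> { state.running = true; true }
                }
            }
            if (start == null) continue
            if (start) executor.execute { drain(key, state, task) }
            return
        }
    }

    private fun drain(key: String, state: State, task: () -> Unit) {
        while (true) {
            try {
                task()
            } catch (ex: Exception) {
                // Sketch only: a failed run is ignored here.
            }
            synchronized(state) {
                if (state.pending) {
                    state.pending = false
                } else {
                    state.running = false
                    state.removed = true
                    states.remove(key, state) // removes the entry only if it still maps to this instance
                    return
                }
            }
        }
    }
}
```

The `removed` flag is what closes the remove/recreate race: a submitter that fetched the state just before it was removed notices the flag under the lock and retries against a fresh entry instead of setting `pending` on an object no worker will ever read again.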
    val first = coalesceService.submitAsync("coalesce-rerun", task)

    started.await(5, TimeUnit.SECONDS) shouldBe true
    val second = coalesceService.submitAsync("coalesce-rerun", task)
    release.countDown()

    first.get(5, TimeUnit.SECONDS)
    second.get(5, TimeUnit.SECONDS)
    runs.get() shouldBe 2
@copilot apply changes based on this feedback
Addressed in c4213cb: I added an assertion in CoalesceServiceTest that a submission made during an active run returns a future that is not completed immediately (second.isDone shouldBe false) and only completes after the rerun executes.
Agent-Logs-Url: https://github.com/TorMap/tormap/sessions/c5300178-d1e5-42e8-89a2-87d99fbb0b06 Co-authored-by: JuliusHenke <23460202+JuliusHenke@users.noreply.github.com>
Just as a heads up, I was blocked by some firewall rules while working on your feedback.
Agent-Logs-Url: https://github.com/TorMap/tormap/sessions/53e205bf-1a91-4e56-9b41-7177d7a45a44 Co-authored-by: JuliusHenke <23460202+JuliusHenke@users.noreply.github.com>
This pull request introduces a new `CoalesceService` for managing and deduplicating concurrent background tasks, and refactors several services to use this mechanism instead of explicit locking. The main goals are to improve concurrency handling, simplify code, and prevent redundant executions of the same background job.

Concurrency improvements and code simplification:
- Added `CoalesceService`, which coalesces concurrent executions of the same task (identified by a key) and ensures only one runs at a time, with pending requests rerun after completion (backend/src/main/kotlin/org/tormap/service/CoalesceService.kt).
- Added `coalesceExecutor` for running coalesced tasks, with a configurable thread pool (backend/src/main/kotlin/org/tormap/config/AyncConfig.kt).

Refactoring of background jobs:
- Refactored `CacheService` to remove manual locking and instead use `CoalesceService` for both `cacheRelayLocationDistinctDays` and `cacheRelayLocationsPerDay`, ensuring only one job per key runs at a time (backend/src/main/kotlin/org/tormap/service/CacheService.kt).
- Refactored `RelayDetailsUpdateService` to use `CoalesceService` for the `lookupMissingAutonomousSystems` and `computeFamilies` methods, removing explicit locks and error handling for concurrency (backend/src/main/kotlin/org/tormap/service/RelayDetailsUpdateService.kt).

These changes make background task execution safer, more robust, and easier to maintain by centralizing concurrency control.