fix(execute_class): add async lock to prevent double deploy#274
fix(execute_class): add async lock to prevent double deploy#274
Conversation
runpod-Henrik
left a comment
There was a problem hiding this comment.
1. The fix — correct
Double-checked locking is the right pattern here. Fast path avoids lock overhead on every method call after initialization; slow path acquires the lock and re-checks before deploying.
if self._initialized: # fast path — no lock after init
return
async with self._init_lock:
if self._initialized: # re-check after acquiring lock
return
...
self._initialized = True # set only after stub is readyThree details that are all correct:
asyncio.Lock()in__init__— safe on Python ≥3.10 (Flash's minimum). In 3.10+, locks bind lazily to the running loop on first await, not at construction time. NoDeprecationWarningorRuntimeError._initialized = Trueafterstub_resource()— if stub creation raises,_initializedstays False and the lock is released byasync with. Retry works correctly. This is tested bytest_deploy_failure_releases_lock_and_allows_retry._init_locknot accessible via__getattr__—__getattr__only fires on missing attributes. Since_init_lockis set in__init__it's found by normal attribute lookup before__getattr__is called. No interaction.
2. Question: asyncio.sleep(0.05) timing assumption in concurrency test
test_concurrent_calls_deploy_only_once creates two tasks and sleeps 50ms to let both reach the gate:
task1 = asyncio.create_task(wrapper_instance._ensure_initialized())
task2 = asyncio.create_task(wrapper_instance._ensure_initialized())
await asyncio.sleep(0.05) # hope both tasks reached gate.wait() by now
gate.set()If the host is slow (loaded CI runner), task2 may not have reached await gate.wait() before gate.set() fires — task2 then starts after _initialized is already True and the test still passes, but it no longer proves the lock works. The test becomes a timing-sensitive no-op rather than a race proof.
A more reliable pattern uses a counter to confirm both tasks are in-flight before releasing:
arrived = 0
all_arrived = asyncio.Event()
async def slow_deploy(config):
nonlocal deploy_call_count, arrived
deploy_call_count += 1
arrived += 1
if arrived >= 2:
all_arrived.set()
await gate.wait()
return MagicMock()
# After creating tasks, wait until both have called deploy before releasing
await all_arrived.wait()
gate.set()
await asyncio.gather(task1, task2)Not blocking — the current test catches the bug reliably on any reasonable machine — but worth knowing for CI robustness.
3. Gap: lock is per-instance, not per resource
If the same resource config is passed to two separate create_remote_class() calls, two RemoteClassWrapper instances are created with two independent _init_lock instances. Concurrent initialization of those two wrappers could still double-deploy at the ResourceManager level. That's out of scope for this PR — but worth confirming: does ResourceManager.get_or_deploy_resource guard against concurrent deploys for the same resource config from different wrapper instances? If not, that's a separate ticket.
4. Tests — solid
Four tests covering concurrent deploy-once, flag set correctly, idempotency, and failure-path lock release. The failure-path test (added in the second commit) is the most important correctness guarantee and it's well structured.
Verdict
PASS. The fix is correct, uses the right asyncio primitives for Python ≥3.10, and the flag ordering is right. Two asks: (1) acknowledge the timing assumption in the concurrency test or use the counter pattern above, and (2) confirm whether ResourceManager provides the cross-instance guarantee, or file a follow-up ticket if it doesn't.
🤖 Reviewed by Henrik's AI-Powered Bug Finder
There was a problem hiding this comment.
Pull request overview
This PR addresses AE-2370 by preventing concurrent _ensure_initialized() calls on the same RemoteClassWrapper instance from triggering multiple deployments, using an asyncio.Lock with double-checked locking.
Changes:
- Add an
asyncio.Lock(self._init_lock) toRemoteClassWrapperand guard_ensure_initialized()with double-checked locking. - Add bug-probe tests validating single-deploy behavior under concurrent calls and retry behavior after transient deploy failures.
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
src/runpod_flash/execute_class.py |
Adds an async init lock and wraps initialization to prevent double deployment under concurrency. |
tests/bug_probes/test_class_execution.py |
Adds async race-condition regression tests for _ensure_initialized() (including retry-on-failure). |
tests/bug_probes/__init__.py |
Initializes the new bug_probes test package (empty file). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| """AE-2370: _ensure_initialized has no async lock — concurrent calls cause double deploy. | ||
|
|
||
| Without a lock, two concurrent calls to _ensure_initialized both pass | ||
| the `if not self._initialized` check and both call get_or_deploy_resource, | ||
| causing a double deploy and orphaning one stub. | ||
| """ |
| task1 = asyncio.create_task(wrapper_instance._ensure_initialized()) | ||
| task2 = asyncio.create_task(wrapper_instance._ensure_initialized()) | ||
|
|
||
| await asyncio.sleep(0.05) | ||
| gate.set() | ||
|
|
||
| await asyncio.gather(task1, task2) |
| @pytest.mark.asyncio | ||
| async def test_deploy_failure_releases_lock_and_allows_retry( | ||
| self, wrapper_instance | ||
| ): | ||
| """If deploy fails, the lock must be released and a subsequent call must retry.""" | ||
| call_count = 0 |
c5ca393 to
b076846
Compare
runpod-Henrik
left a comment
There was a problem hiding this comment.
Follow-up on prior review
The four tests are solid — flag-set, idempotency, concurrent deploy-once, and the failure-path retry are all covered.
Two open items from the prior review remain, both non-blocking:
-
asyncio.sleep(0.05)timing — counter pattern not adopted. The sleep-based approach works in practice but can silently degrade to a no-op on a loaded CI runner where task2 doesn't reachgate.wait()beforegate.set()fires. Low risk, but worth knowing. -
Cross-instance double-deploy — if two separate
create_remote_class()calls produce twoRemoteClassWrapperinstances for the same resource config, they each have their own_init_lockand could still race at theResourceManagerlevel. Confirming whetherResourceManager.get_or_deploy_resourceguards this case, or filing a follow-up ticket if it doesn't, would close the loop.
Verdict: PASS — fix is correct, lock ordering is right.
🤖 Reviewed by Henrik's AI-Powered Bug Finder
…double deploy Without a lock, concurrent calls to _ensure_initialized both pass the check and both call get_or_deploy_resource, wasting resources and orphaning one stub. Uses double-checked locking: fast-path check before lock acquisition, second check inside the lock. Closes AE-2370
- Replace misleading carried-over comment with accurate description - Add inline comments explaining double-checked locking pattern - Add failure-path test: deploy exception releases lock, allows retry
b076846 to
1d97119
Compare
Summary
asyncio.LocktoRemoteClassWrapper._ensure_initialized()to prevent concurrent calls from both deploying resources (AE-2370)if self._initializedcheck before lock acquisition, second check inside the lockTestNEW1_EnsureInitializedRacewith 3 tests validating the fixWhat was happening
Two concurrent requests both pass the
if not self._initializedcheck, both callget_or_deploy_resource, both deploy — wasting resources and orphaning one stub. The second assignment silently overwrites the first.Changes
execute_class.pyimport asyncio,self._init_lockin__init__, wrap_ensure_initializedwith double-checked locktests/bug_probes/test_class_execution.pyTest plan
make quality-checkpasses (85.50% coverage)TestNEW1_EnsureInitializedRacevalidates concurrent calls deploy exactly onceCloses AE-2370