Skip to content

Conversation

@ZeroIntensity
Copy link
Member

@ZeroIntensityZeroIntensity commented Sep 23, 2024

Copy link
Contributor

@graingertgraingert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're using private methods of TaskGroup and starting tasks on the loop rather than the TaskGroup

@ZeroIntensity
Copy link
MemberAuthor

I think I'm just going to refactor this to not use TaskGroup. It's causing more problems than solutions.

@graingert
Copy link
Contributor

I think it's worth persevering with TaskGroup, you just need to write it without using add_done_callback or private attributes

@ZeroIntensity
Copy link
MemberAuthor

I'll try it, but I'm worried that it isn't possible when considering an eager task factory. The previous implementation used a variation of a task group (a list containing tasks, since it predated asyncio.TaskGroup) but broke due to the reliance of tasks not starting before the event loop is called.

While we're here, staggered_race is undocumented -- might that be something worth addressing in this PR?

@graingert
Copy link
Contributor

A demo of what I mean wrt TaskGroup:

"""Support for running coroutines in parallel with staggered start times."""__all__='staggered_race', from . importlocksfrom . importtasksfrom . importtaskgroupsasyncdefstaggered_race(coro_fns, delay, *, loop=None): """Run coroutines with staggered start times and take the first to finish. This method takes an iterable of coroutine functions. The first one is started immediately. From then on, whenever the immediately preceding one fails (raises an exception), or when *delay* seconds has passed, the next coroutine is started. This continues until one of the coroutines complete successfully, in which case all others are cancelled, or until all coroutines fail. The coroutines provided should be well-behaved in the following way: * They should only ``return`` if completed successfully. * They should always raise an exception if they did not complete successfully. In particular, if they handle cancellation, they should probably reraise, like this:: try: # do work except asyncio.CancelledError: # undo partially completed work raise Args: coro_fns: an iterable of coroutine functions, i.e. callables that return a coroutine object when called. Use ``functools.partial`` or lambdas to pass arguments. delay: amount of time, in seconds, between starting coroutines. If ``None``, the coroutines will run sequentially. loop: the event loop to use. Returns: tuple *(winner_result, winner_index, exceptions)* where - *winner_result*: the result of the winning coroutine, or ``None`` if no coroutines won. - *winner_index*: the index of the winning coroutine in ``coro_fns``, or ``None`` if no coroutines won. If the winning coroutine may return None on success, *winner_index* can be used to definitively determine whether any coroutine won. - *exceptions*: list of exceptions returned by the coroutines. ``len(exceptions)`` is equal to the number of coroutines actually started, and the order is the same as in ``coro_fns``. The winning coroutine's entry is ``None``. """# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.winner_result=Nonewinner_index=Noneexceptions= [] class_Done(Exception): passasyncdefrun_one_coro(this_index, coro_fn, this_failed): try: result=awaitcoro_fn() except (SystemExit, KeyboardInterrupt): raiseexceptBaseExceptionase: exceptions[this_index] =ethis_failed.set() # Kickstart the next coroutineelse: # Store winner's resultsnonlocalwinner_index, winner_result# There could be more than one winnerwinner_index=this_indexwinner_result=resultraise_Donetry: asyncwithtaskgroups.TaskGroup() astg: forthis_index, coro_fninenumerate(coro_fns): this_failed=locks.Event() exceptions.append(None) tg.create_task(run_one_coro(this_index, coro_fn, this_failed)) try: awaittasks.wait_for(this_failed.wait(), delay) exceptTimeoutError: pass except* _Done: passreturnwinner_result, winner_index, exceptions

Copy link
Contributor

@willingcwillingc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miss-islington-app
Copy link

Thanks @ZeroIntensity for the PR, and @kumaraditya303 for merging it 🌮🎉.. I'm working now to backport this PR to: 3.12, 3.13.
🐍🍒⛏🤖

miss-islington pushed a commit to miss-islington/cpython that referenced this pull request Sep 26, 2024
…port eager task factories (pythonGH-124390) (cherry picked from commit de929f3) Co-authored-by: Peter Bierma <[email protected]> Co-authored-by: Thomas Grainger <[email protected]> Co-authored-by: Jelle Zijlstra <[email protected]> Co-authored-by: Carol Willing <[email protected]> Co-authored-by: Kumar Aditya <[email protected]>
@miss-islington-app
Copy link

Sorry, @ZeroIntensity and @kumaraditya303, I could not cleanly backport this to 3.12 due to a conflict.
Please backport using cherry_picker on command line.

cherry_picker de929f353c413459834a2a37b2d9b0240673d874 3.12 

@bedevere-app
Copy link

GH-124573 is a backport of this pull request to the 3.13 branch.

@bedevere-app
Copy link

GH-124574 is a backport of this pull request to the 3.12 branch.

@bedevere-appbedevere-appbot removed the needs backport to 3.12 only security fixes label Sep 26, 2024
kumaraditya303 added a commit that referenced this pull request Sep 26, 2024
…pport e… (#124574) gh-124309: Modernize the `staggered_race` implementation to support eager task factories (#124390) Co-authored-by: Thomas Grainger <[email protected]> Co-authored-by: Jelle Zijlstra <[email protected]> Co-authored-by: Carol Willing <[email protected]> Co-authored-by: Kumar Aditya <[email protected]> (cherry picked from commit de929f3) Co-authored-by: Peter Bierma <[email protected]>
@ZeroIntensityZeroIntensity deleted the fix-staggered-race-eager-task-factory branch September 26, 2024 10:05
ZeroIntensity added a commit to ZeroIntensity/cpython that referenced this pull request Sep 30, 2024
…n to support eager task factories (python#124390)" This reverts commit de929f3.
Yhg1s pushed a commit that referenced this pull request Oct 1, 2024
…am (#124810) * Revert "GH-124639: add back loop param to staggered_race (#124700)" This reverts commit e0a41a5. * Revert "gh-124309: Modernize the `staggered_race` implementation to support eager task factories (#124390)" This reverts commit de929f3.
miss-islington pushed a commit to miss-islington/cpython that referenced this pull request Oct 1, 2024
…wnstream (pythonGH-124810) * Revert "pythonGH-124639: add back loop param to staggered_race (pythonGH-124700)" This reverts commit e0a41a5. * Revert "pythongh-124309: Modernize the `staggered_race` implementation to support eager task factories (pythonGH-124390)" This reverts commit de929f3. (cherry picked from commit 133e929) Co-authored-by: Peter Bierma <[email protected]>
Yhg1s pushed a commit that referenced this pull request Oct 1, 2024
…ownstream (GH-124810) (#124817) gh-124309: Revert eager task factory fix to prevent breaking downstream (GH-124810) * Revert "GH-124639: add back loop param to staggered_race (GH-124700)" This reverts commit e0a41a5. * Revert "gh-124309: Modernize the `staggered_race` implementation to support eager task factories (GH-124390)" This reverts commit de929f3. (cherry picked from commit 133e929) Co-authored-by: Peter Bierma <[email protected]>
Sign up for freeto join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants

@ZeroIntensity@graingert@JelleZijlstra@willingc@kumaraditya303