Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make resulting effect cancellable again #192

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

cornerman
Copy link

Currently, any effect created by the async macro is not cancelable. This PR fixes that.

Probably a regression after the semantics of async_/async changed in regards to cancellability.

I have also fixed the test for "be cancellable" - which was not asserting correctly.

@cornerman
Copy link
Author

Somehow the "be cancellable" test is now flaky - I could make it pass reliably with this change: 99e8dbf.

But this looks suspicious to me, maybe there is something else going on?

@cornerman
Copy link
Author

I have added a potential fix, which makes the test pass without the above mentioned workaround. See: c4e98a8

There, I have introduced a stopSignal to cancel tasks running in the dispatcher, when the actual effect is cancelled. Please let me know, if this makes sense or whether there is a better fix. From what I can see, it does solve the issue at hand.

@@ -61,20 +62,20 @@ abstract class AsyncAwaitStateMachine[F[_]](
// as inspecting the Succeeded outcome using dispatcher is risky on algebraic sums,
// such as OptionT, EitherT, ...
var awaitedValue: Option[AnyRef] = None
F.uncancelable { poll =>
F.race(stopSignal, F.uncancelable { poll =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the race required here? Shouldn't Dispatcher itself shut things down?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same, but this comment indicates that maybe not : #192 (comment)

Copy link
Author

@cornerman cornerman Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, this is referencing the mentioned workaround. Looking at the commit before, there is a green checkmark, but actually the test is skipped (it is timing out!). See: https://github.com/typelevel/cats-effect-cps/actions/runs/7575437745/job/20632044148#step:10:64

The test gets stuck somewhere when waiting for the defer.get (in the test) - and my debugging journey led me to believe that the dispatcher is working on some task and is not shutting down.

This is not always happening, which is why I did not realize it initially when testing locally.

Copy link
Author

@cornerman cornerman Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is interesting though:

  • Why does this workaround (99e8dbf) in the test fix the issue as well?
  • When replacing the defer.get (in the test) with sleep of around 300 millis, it also reliably works in the test - so the dispatcher is normally stopping, but maybe something can deadlock here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering, whether it makes sense to first merge the change from F.async_ to a cancellable F.async. Then we can discuss the remaining issues in a separate issue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cornerman was the original test passing consistently after you changed the async call ? Because if that was the case, sure, please open a more atomic PR

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Baccata 👍 Yes, it was. I have now updated the PR to only contain one fix. Also leaving the existing test in place and instead adding a new passing test - though this one has a TODO that we should tackle another time.

@@ -99,7 +99,7 @@ object AsyncAwaitDsl {
}
${F}.flatten {
_root_.cats.effect.std.Dispatcher.sequential[$effect].use { dispatcher =>
${F}.async_[$name#FF[AnyRef]](cb => new $name(dispatcher, cb).start())
${F}.async[$name#FF[AnyRef]](cb => ${F}.delay { new $name(dispatcher, cb).start(); Some(${F}.unit) })
Copy link
Collaborator

@Baccata Baccata Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wary of the F.unit call here : I think what should happen instead is some mutation to the StateMachine that could/would be inspected in any subsequent step in the state machine to short circuit the computation

it'd look like :

${F}.delay {
  val stateMachine = $name(dispatcher, cb) 
  stateMachine.start()
  Some(${F}.delay(stateMachine.cancel()))
} 

Then in AsyncStateMachine, you'd have something like

private val isCancelled = new AtomicBoolean(false)

def cancel() : Unit = isCancelled.set(true) 

def onComplete(f: F[AnyRef]) : Unit = {
  // short-circuits the next call to dispatcher 
  if (isCancelled.get) {
    // surfaces the cancellation to the `async` invocation. 
    this(Left(F.canceled.asInstanceOf[F[AnyRef]]))
  } else dispatcher.unsafeRunAndForget(...)
} 

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djspiewak, mind giving a sanity to what I said above ? Also maybe an AtomicBoolean is overkill and a simple @volatile var boolean might do the trick.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is exactly correct. What's being revealed here is the fact that AsyncStateMachine at present has no support for cancelation, but the old properties of async_ caused that reality to be ignored, effectively leaking fibers (and potentially even creating thread starvation issues) when async/await blocks were canceled.

Also yes, a volatile should be mostly sufficient in this case. You're really just looking for a flag to make it stop. The one trick though is you'll need to make sure the finalizer itself asynchronously blocks until the state machine has been fully interrupted, which obviously might take a bit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants