Add evalFlatten methods for stream of effects #2851
base: main
Conversation
/** Evaluates all inner effects concurrently, emitting the results in order. */
def parEvalFlattenUnbounded(implicit F: Concurrent[F]) = self.parEvalMapUnbounded(identity)
So, just to clarify something: compiling this stream would immediately start a background action that launches every action in the stream. Internally, the auxiliary parEvalMapAction function uses a queue and a semaphore to control how many items from the source are running, but if the concurrency is unbounded then neither of them limits progress. Also, nothing lets the user control, based on the outputs of the resulting stream, whether any actions from the source are delayed... Is that a desirable mode of operation?
Note that parJoinUnbounded relies on a single-chunk channel (a funnel) to stop streams from advancing before the consumer has pulled the latest chunk. So the semantics of that unbounded operation is to launch all source streams, but have them contend for that channel. This unbounded combinator, on the other hand, would pull all the F[O] actions from the stream and start all of them. Furthermore, without the consumer being able to control progress of the source, this stream offers no means of back-pressure. If the consumer is too slow, this operation will fill that unbounded buffer until it exhausts memory...
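To illustrate the concern, here is a minimal sketch (the fast source and deliberately slow consumer are assumptions for the example, not code from the PR):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

// A fast, effectively unbounded source of effects.
val actions: Stream[IO, IO[Int]] =
  Stream.iterate(0)(_ + 1).covary[IO].map(n => IO.pure(n))

// Flattened without a concurrency bound: every action is started, and
// results accumulate ahead of a consumer that only takes one per second.
val risky: Stream[IO, Int] =
  actions.parEvalMapUnbounded(identity).evalTap(_ => IO.sleep(1.second))

// A bounded variant keeps at most 16 effects in flight, so the
// consumer's pace limits how much sits in memory.
val bounded: Stream[IO, Int] =
  actions.parEvalMap(16)(identity).evalTap(_ => IO.sleep(1.second))
```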
I think this method is no more unsafe than the other *Unbounded methods. All the concurrent methods (parJoin, parEvalMap and the others) are effectively push-based, and the only thing that guards users from launching too many computations is a semaphore. In the case of *Unbounded the semaphore imposes no limit, so they are all equally unsafe in that regard.
However, I see you make a distinction between parJoinUnbounded and parEvalMapUnbounded in how they block on the Channel. I think this behaviour isn't intended as a protection and shouldn't be relied on.
Personally, when I use par*Unbounded methods I assume the stream won't be parallel enough to overwhelm the consumer, and I choose not to bother restricting the level of parallelism. Otherwise I would have to widen interfaces (functions or constructors) with a parallelism configuration, because when you really want to restrict parallelism, you should make that restriction configurable.
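The interface-widening mentioned above might look like the following sketch, where the caller supplies the concurrency limit instead of it being hard-coded (the names processAll, maxConcurrent and handle are illustrative, not fs2 API):

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical example: exposing the parallelism limit as a parameter
// rather than baking in a fixed bound or an unbounded variant.
def processAll[A, B](in: Stream[IO, A], maxConcurrent: Int)(
    handle: A => IO[B]
): Stream[IO, B] =
  in.parEvalMap(maxConcurrent)(handle) // at most maxConcurrent effects in flight
```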
Personally, when I use par*Unbounded methods I assume that the stream won't be parallel enough to overwhelm the consumer and choose not to bother restricting the level of parallelism
There are two mixed concerns here: parallelism, and buffering vs backpressuring.
In parJoinUnbounded you take an outer stream of data sources, and it launches all sources to pull, and push into the queue, in parallel. That outer stream may never end and may keep incorporating new sources, so parJoinUnbounded does not limit parallelism. However, what it does restrict, by means of that output queue, is how many items are pulled from each source into memory.
The problem with this new combinator is that it would prefetch and load all data from its source into local memory, without any feedback from the consumer to stop it. Thus, a short slowdown in the consumer would cause this combinator to accrue a lot of data, and thus crash the program. A basic reliability guideline is to avoid infinite buffers.
However, I see you make a distinction between parJoinUnbounded and parEvalMapUnbounded in how they block on Channel. I don't think this functionality is intended as a protection, nor that it should be relied on.
Given that back-pressure and laziness are an essential part of fs2 streams, building that check and balance into the pipeline seems to me a crucial part of this combinator.
As an aside, there are fs2 combinators that do fetch a lot of data, like prefetchAll, but those are legacy. Also, some parameter choices in other combinators can cause trouble, but those cannot be avoided.
Sorry for the radio silence on this!
Yes, the use case I have in mind for this is when backpressure is provided upstream. The case I have in mind (which occurs, for example, in fs2-kafka) is an operation that returns F[F[Result]], where the outer F performs a backpressured enqueue operation and then returns the inner F, which is a handle for waiting on the result, so there is no need for backpressure on the inner F.
This should definitely have documentation and an example though, so I'm going to remove parEvalFlattenUnbounded from this PR and maybe put it in a separate one later.
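A minimal sketch of that F[F[Result]] shape (the names produce, Result and the queue element type are illustrative, not fs2-kafka API): the outer IO backpressures on a bounded queue, while the inner IO merely awaits completion, so flattening the inner effects without a bound is safe.

```scala
import cats.effect.{Deferred, IO}
import cats.effect.std.Queue

final case class Result(value: Int) // hypothetical result type

// The outer IO blocks (backpressures) when the bounded queue is full;
// the inner IO it returns is just a handle that waits for the result.
def produce(
    queue: Queue[IO, (Int, Deferred[IO, Result])],
    record: Int
): IO[IO[Result]] =
  for {
    done <- Deferred[IO, Result]
    _    <- queue.offer((record, done)) // backpressured enqueue
  } yield done.get                      // inner effect: await the result
```

A stream of records could then be run as `stream.evalMap(produce(queue, _)).parEvalMapUnbounded(identity)`, with the bounded queue, not the stream, providing the backpressure.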
@@ -4117,6 +4114,21 @@ object Stream extends StreamLowPriority {
      parJoin(Int.MaxValue)
    }

  /** Provides syntax for a stream of `F` effects. */
  implicit class StreamFOps[F[_], O](private val self: Stream[F, F[O]]) {
For what it's worth, I've regretted using this shape of stream every time. You can run yourself out of memory really quickly if those inner F[A] are nontrivial and you have a lot of them.
I'm uncomfortable with encoding this in the library in a way that makes it easier to use, because I personally think this shape should be discouraged, especially when the implementation here is pretty trivial.
And as a library user, I think it's much clearer to the code reader to see an inline evalMap(identity) rather than yet another method they need to learn as part of the API.
So I'm a polite 👎 on the PR for those reasons.
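For reference, the inline spelling and the proposed method would be equivalent; a small sketch (evalFlatten is the proposed extension method, not existing fs2 API):

```scala
import cats.effect.IO
import fs2.Stream

val effects: Stream[IO, IO[Int]] =
  Stream(1, 2, 3).map(n => IO(n * 2))

// Inline spelling, available today: evaluate each inner effect in order.
val flattenedInline: Stream[IO, Int] = effects.evalMap(identity)

// The proposed extension method would read as:
// val flattened: Stream[IO, Int] = effects.evalFlatten
```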
Good to know @Daenyth, I'd never have thought it could blow up the memory. Would a suspend help resolve that (at least to reduce the overhead of the non-trivial ones)?
Probably not, because the issue is the memory used by holding a large Chunk[IO[A]].
It's also possible my codebase was just doing something really silly, and that's why that structure cost so much memory.
No description provided.