Checkpointing v2 #333
Conversation
Do not write updates to the WriteBatch immediately; write them once at the end instead
- Added new Checkpoint class to sync state updates and Kafka commits
- Updated state transactions to span across multiple offsets
- Added new ProcessingContext class to share dependencies and checkpoints between Application and SDF
- Save the latest produced TPs and offsets using delivery callbacks
- If a delivery callback returns an error, raise it on the next produce() or flush()
- Move KafkaMessageError outside `rowconsumer.py` and rename it to KafkaException
- Make flushing state to disk and producing changelogs separate operations
- Rename "maybe_flush" -> "flush"
- Move changelog producing code from Partition to StateTransaction
- Make "processed_offset" a required param in "prepare()"
- Pass the source topic info to the ChangelogProducer and add it to the changelog messages
Force-pushed from edfa3bd to 847dfaa
Really nice work.
Nothing major (and some things you already addressed).
For documentation reasons: Daniil and I also discussed potentially implementing a caching class for the RDB transactions, and probably some sort of batching for recovery as well. He also has some other outstanding refactoring he'd like to do, but it's out of scope for these changes.
Also, don't forget to build/update any docs as necessary (or maybe we just do that prior to a release?) =)
Fix typo Co-authored-by: Tim Sawicki <[email protected]>
Woo, let's merge!
New checkpointing
Why?
The existing commit & recovery implementation has the following issues:
Goals
What's changed
Introduced a new class Checkpoint responsible for flushing the Producer, committing topic offsets, and flushing the state in sync.
It first flushes the Producer and ensures that all outgoing messages are delivered, then synchronously commits the topic offsets, and only then flushes the state to disk.
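The ordering above is the core of the design. As a minimal sketch (the Producer/Consumer/StateStore objects here are stand-ins, not the actual Quix Streams API), a checkpoint commit might look like:

```python
class Checkpoint:
    """Illustrative checkpoint: flush the producer, commit offsets,
    then flush state — strictly in that order."""

    def __init__(self, producer, consumer, state_store):
        self._producer = producer
        self._consumer = consumer
        self._state_store = state_store

    def commit(self, offsets):
        # 1) Ensure all outgoing messages (including changelogs) are delivered
        self._producer.flush()
        # 2) Synchronously commit the source topic offsets
        self._consumer.commit(offsets=offsets, asynchronous=False)
        # 3) Only then persist the batched state updates to disk
        self._state_store.flush()
```

Because offsets are committed before the state flush, a crash between the two steps re-applies at most the uncommitted messages, never loses them.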
The Checkpoint commits on a schedule defined by the commit_interval Application setting.
The default interval is 5 seconds, the same as the Kafka Consumer's default autocommit interval.
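The interval check itself can be sketched with a simple monotonic-clock timer (commit_interval is the real setting name from this PR; the scheduler class below is purely illustrative):

```python
import time


class CheckpointScheduler:
    """Illustrative timer deciding when the next checkpoint commit is due."""

    def __init__(self, commit_interval: float = 5.0):
        self.commit_interval = commit_interval
        self._started_at = time.monotonic()

    def expired(self) -> bool:
        # True once commit_interval seconds have passed since the last reset
        return (time.monotonic() - self._started_at) >= self.commit_interval

    def reset(self):
        # Called right after a successful checkpoint commit
        self._started_at = time.monotonic()
```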
Kafka Consumer autocommit is now always disabled.
The state updates are batched in-memory before the checkpoint is committed.
It improves state performance because the disk is only touched once every few seconds, although it increases memory usage.
The batches are kept per key, so the more keys are processed during the checkpoint interval, the more memory is required to store them.
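The per-key batching can be sketched as a dict keyed by state key (names hypothetical): repeated updates to the same key overwrite each other, so memory grows with the number of distinct keys touched, not the number of updates.

```python
class StateUpdateCache:
    """Illustrative in-memory batch of state updates, flushed per checkpoint."""

    def __init__(self):
        self._updates = {}  # key -> latest value

    def set(self, key, value):
        # Later updates to the same key replace earlier ones in memory
        self._updates[key] = value

    def flush(self, store: dict):
        # Write the batched updates to the backing store in one pass,
        # so "disk" is touched only once per checkpoint
        store.update(self._updates)
        self._updates.clear()
```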
To use less memory, users may reduce the commit_interval value.
The changelog messages now have the source topic-partition-offset info attached in the headers, and recovery checks whether a changelog message belongs to a committed source topic message.
It ensures that state changes are not applied for messages that are not yet committed.
The app should always* recover state consistently, minimizing the chance of double-counting in the at-least-once setting.
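The recovery-side check can be sketched as follows. The header names are illustrative, not necessarily the ones used in the PR; the comparison relies on the Kafka convention that a committed offset is the *next* offset to consume, so any source offset strictly below it is committed.

```python
def should_apply_changelog(headers: dict, committed_offsets: dict) -> bool:
    """Illustrative check: apply a changelog message only if its source
    topic message is already committed."""
    source_tp = (headers["source_topic"], headers["source_partition"])
    committed = committed_offsets.get(source_tp)
    # Committed offset is the next offset to consume, so strictly-below means
    # the source message was committed; unknown partitions are skipped
    return committed is not None and headers["source_offset"] < committed
```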
The store changelog offsets are now taken from the delivery callbacks (no manual calculation anymore).
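A sketch of this offset tracking, assuming a delivery-callback mechanism like confluent-kafka's (the tracker class and callback signature here are stand-ins for the RowProducer internals):

```python
class ChangelogOffsetTracker:
    """Illustrative tracker recording delivered changelog offsets
    via producer delivery callbacks."""

    def __init__(self):
        self.offsets = {}  # (topic, partition) -> last delivered offset
        self.error = None

    def on_delivery(self, err, topic, partition, offset):
        # Invoked once the broker acknowledges (or rejects) the message
        if err is not None:
            self.error = err  # re-raised on the next produce()/flush()
            return
        self.offsets[(topic, partition)] = offset
```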
The internal RowProducer now has enable.idempotence = True by default to provide stronger delivery guarantees.
This setting can be disabled by passing producer_extra_config = {'enable.idempotence': False} to the Application class.
Lots of refactoring and tests.
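The default-plus-override behavior for the idempotence setting can be sketched as a simple config merge ('enable.idempotence' is the real librdkafka key; the builder function below is illustrative, not the actual Application internals):

```python
# Default producer config applied by the library (illustrative)
DEFAULT_PRODUCER_CONFIG = {"enable.idempotence": True}


def build_producer_config(producer_extra_config=None):
    """Merge user-supplied overrides on top of the library defaults."""
    config = dict(DEFAULT_PRODUCER_CONFIG)
    config.update(producer_extra_config or {})
    return config
```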
Caveats
Example: since the changelogs are already produced, during recovery from scratch the app will apply them to the state.
To mitigate this, exactly-once semantics (EOS) should be used, which we will implement in the future.
Docs
I'll update the docs after the code review.