Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpointing v2 #333

Merged
merged 28 commits into from
May 7, 2024
Merged

Checkpointing v2 #333

merged 28 commits into from
May 7, 2024

Conversation

daniil-quix
Copy link
Collaborator

@daniil-quix daniil-quix commented Apr 11, 2024

New checkpointing

Why?

Existing commit & recovery implementation has the following issues:

  1. The delivery of produced messages (both for changelogs and output) is not guaranteed and the library doesn't check the delivery callbacks
  2. The changelog offsets are calculated on the fly before they're produced, which leaves room for errors
  3. Local state stores are updated for each processed message which puts more pressure on the disks

Goals

  1. Provide better state consistency guarantees in the At-Least-Once setting.
  2. Optimize the state performance.
  3. Build a foundation for EOS processing

What's changed

  1. Introduced a new class Checkpoint responsible for flushing the Producer, committing topic offsets, and flushing the state in sync.
    Now it first flushes the Producer and ensures that all outgoing messages are delivered, then synchronously commits the topic offsets, and only then does it flush the state to the disk.

  2. The Checkpoint commits on a schedule provided by the commit_interval Application setting.
    The default interval is 5 seconds, the same as Kafka Consumer's default autocommit interval.
    Kafka Consumer autocommit is now always disabled.

  3. The state updates are batched in-memory before the checkpoint is committed.
    It improves the state performance because the disks are only touched once in several seconds, although it increases memory usage.
    The batches are kept per key, so the more keys are processed during the checkpoint interval, the more memory it will require to store.
    To use less memory, users may reduce the commit_interval value.

  4. The changelog messages now have the source topic-partition-offset info attached in the headers, and the recovery checks if the changelog belongs to the committed source topic message.
    It ensures that the state changes are not applied for the messages that are not yet committed.
    The app should always* recover state consistently minimizing the chance for double-counting in the At-Least-Once setting.

  5. The store changelog offsets are now taken from the delivery callbacks (no manual calculation anymore).

  6. The internal RowProducer now has enable.idempotence = True by default to provide stronger delivery guarantees.
    This setting can be disabled by providing producer_extra_config = {'enable.idempotence': False} to the Application class.

  7. Lots of refactoring and tests

Caveats

  1. Although the stateful processing performance increases, the apps will require more memory to batch the state updates.
  2. In the At-Least-Once setting, it is still possible that unwanted changelog changes get applied.
    Example:
  • Checkpoint successfully produces changelog updates
  • Checkpoint fails to commit the source topic offsets
  • The user changes the application code and some of the input messages get filtered during re-processing.
    Since the changelogs are already produced, during recovery from scratch the app will apply them to the state.
    To mitigate this, the EOS should be used, which we will implement in the future

Docs

I'll update the docs after the code review

@daniil-quix daniil-quix marked this pull request as draft April 11, 2024 14:50
@daniil-quix daniil-quix marked this pull request as ready for review April 30, 2024 17:09
daniil-quix added 18 commits May 1, 2024 11:17
Do not write updates to Writebatch but do it once in the end instead
- Added new Checkpoint class to sync state updates and Kafka commits
- Updated state transactions to span across multiple offsets
- Added new ProcessingContext class to share dependencies and checkpoints between Application and SDF
- Save the latest produced TPs and offsets using delivery callbacks
- If delivery callback returns an error, raise it on next produce() or flush()
- Move KafkaMessageError outside `rowconsumer.py` and rename it to KafkaException
- Make flushing state to the disk and producing changelogs separate operations
- Rename "maybe_flush" -> "flush"
- Move changelog producing code from Partition to StateTransaction
- Make "processed_offset" a required param in "prepare()"
- Pass the source topic info to the ChangelogProducer and add it to the changelog messages
@daniil-quix daniil-quix force-pushed the feature/checkpointing-v2 branch from edfa3bd to 847dfaa Compare May 1, 2024 09:31
@daniil-quix daniil-quix changed the title Feature/checkpointing v2 Checkpointing v2 May 1, 2024
Copy link
Contributor

@tim-quix tim-quix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice work.

Nothing major (and some things you already addressed).

For documentation reasons, Daniil and I also discussed potentially implementing a caching class for the RDB transactions, and probably some sort of batching for recovery as well. He also has some other outstanding refactoring he'd like to do, but out of scope of these changes.

Also, don't forget to build/update any docs as necc (or maybe we just do that prior to a release?) =)

quixstreams/checkpointing/checkpoint.py Show resolved Hide resolved
quixstreams/checkpointing/checkpoint.py Show resolved Hide resolved
quixstreams/app.py Show resolved Hide resolved
quixstreams/state/rocksdb/partition.py Outdated Show resolved Hide resolved
quixstreams/state/rocksdb/partition.py Show resolved Hide resolved
quixstreams/processing_context.py Show resolved Hide resolved
quixstreams/state/recovery.py Show resolved Hide resolved
quixstreams/app.py Outdated Show resolved Hide resolved
quixstreams/state/rocksdb/transaction.py Outdated Show resolved Hide resolved
quixstreams/state/rocksdb/transaction.py Outdated Show resolved Hide resolved
@tim-quix tim-quix self-requested a review May 6, 2024 20:47
Copy link
Contributor

@tim-quix tim-quix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woo, lets merge!

@daniil-quix daniil-quix merged commit 207d3f0 into main May 7, 2024
4 checks passed
@daniil-quix daniil-quix deleted the feature/checkpointing-v2 branch May 7, 2024 15:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants