Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpointing v2 #333

Merged
merged 28 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f469cca
Batch state updates in RocksDBPartitionTransaction
daniil-quix Apr 2, 2024
e7aeb57
New Checkpointing flow (part 1)
daniil-quix Apr 11, 2024
877ead9
Accept delivery callbacks in Producer.produce()
daniil-quix Apr 12, 2024
d4b5f37
Update RowProducer to track message delivery
daniil-quix Apr 19, 2024
0267e00
Separate changelog producing and flush in State
daniil-quix Apr 19, 2024
00df6c3
Separate exceptions for RowProducer and RowConsumer
daniil-quix Apr 23, 2024
f1a7b19
Fix failing Application tests
daniil-quix Apr 24, 2024
9ad15dd
Move checkpoint to a module and add tests
daniil-quix Apr 24, 2024
8a4ba03
Expose changelog name and partition on ChangelogProducer
daniil-quix Apr 24, 2024
a7cffe4
Add missing recovery_manager_factory
daniil-quix Apr 24, 2024
5c98975
Add more logs to store transaction
daniil-quix Apr 24, 2024
d4ed15c
Log elapsed time of the checkpoint commit
daniil-quix Apr 24, 2024
a1ff890
Fix recovery test
daniil-quix Apr 24, 2024
f46d202
Don't commit the checkpoint if application fails
daniil-quix Apr 24, 2024
4a8d2ec
Add source topic-partition-offset to changelog messages
daniil-quix Apr 26, 2024
cf6236c
Pass latest committed offset to the store partition for recovery, ref…
daniil-quix Apr 29, 2024
d784d31
Remove ApplicationStatus enum
daniil-quix Apr 30, 2024
847dfaa
Implement consistent recovery
daniil-quix Apr 30, 2024
d8e95e4
Enable idempotence for internal RowProducer
daniil-quix May 1, 2024
db2523e
Remove topic and partition values from the changelog messages
daniil-quix May 2, 2024
b328a7d
Update Checkpoint.commit docstring
daniil-quix May 7, 2024
c3723e9
Add commit_interval to Application docstring
daniil-quix May 7, 2024
21918f0
Update quixstreams/state/rocksdb/transaction.py
daniil-quix May 7, 2024
8873f78
Update quixstreams/app.py
daniil-quix May 7, 2024
0b2b5ee
Update quixstreams/state/rocksdb/transaction.py
daniil-quix May 7, 2024
2b22515
Rename _should_skip_changelog -> _should_apply_changelog
daniil-quix May 7, 2024
3787271
Remove source_topic_name from changelog classes
daniil-quix May 7, 2024
b640699
Re-generate API docs
daniil-quix May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions docs/api-reference/application.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class Application()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L55)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L59)

The main Application class.

Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__(broker_address: Optional[str] = None,
quix_sdk_token: Optional[str] = None,
consumer_group: Optional[str] = None,
auto_offset_reset: AutoOffsetReset = "latest",
auto_commit_enable: bool = True,
commit_interval: float = 5.0,
partitioner: Partitioner = "murmur2",
consumer_extra_config: Optional[dict] = None,
producer_extra_config: Optional[dict] = None,
Expand All @@ -81,7 +81,7 @@ def __init__(broker_address: Optional[str] = None,
topic_manager: Optional[TopicManager] = None)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L93)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L97)


<br>
Expand All @@ -102,9 +102,9 @@ Passed as `group.id` to `confluent_kafka.Consumer`.
Linked Environment Variable: `Quix__Consumer__Group`.
Default - "quixstreams-default" (set during init)
>***NOTE:*** Quix Applications will prefix it with the Quix workspace id.
- `commit_interval`: How often to commit the processed messages in seconds.
Default - 5.0.
- `auto_offset_reset`: Consumer `auto.offset.reset` setting
- `auto_commit_enable`: If true, periodically commit offset of
the last message handed to the application. Default - `True`.
- `partitioner`: A function to be used to determine the outgoing message
partition.
- `consumer_extra_config`: A dictionary with additional options that
Expand Down Expand Up @@ -157,7 +157,6 @@ instead of the default one.
def Quix(cls,
consumer_group: Optional[str] = None,
auto_offset_reset: AutoOffsetReset = "latest",
auto_commit_enable: bool = True,
partitioner: Partitioner = "murmur2",
consumer_extra_config: Optional[dict] = None,
producer_extra_config: Optional[dict] = None,
Expand All @@ -176,7 +175,7 @@ def Quix(cls,
topic_manager: Optional[QuixTopicManager] = None) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L296)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L303)

>***NOTE:*** DEPRECATED: use Application with `quix_sdk_token` argument instead.

Expand Down Expand Up @@ -224,8 +223,6 @@ Linked Environment Variable: `Quix__Consumer__Group`.
Default - "quixstreams-default" (set during init).
>***NOTE:*** Quix Applications will prefix it with the Quix workspace id.
- `auto_offset_reset`: Consumer `auto.offset.reset` setting
- `auto_commit_enable`: If true, periodically commit offset of
the last message handed to the application. Default - `True`.
- `partitioner`: A function to be used to determine the outgoing message
partition.
- `consumer_extra_config`: A dictionary with additional options that
Expand Down Expand Up @@ -288,7 +285,7 @@ def topic(name: str,
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L436)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L439)

Create a topic definition.

Expand Down Expand Up @@ -369,7 +366,7 @@ topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)
def dataframe(topic: Topic) -> StreamingDataFrame
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L516)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L519)

A simple helper method that generates a `StreamingDataFrame`, which is used

Expand Down Expand Up @@ -416,10 +413,10 @@ to be used as an input topic.
#### Application.stop

```python
def stop()
def stop(fail: bool = False)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L552)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L556)

Stop the internal poll loop and the message processing.

Expand All @@ -429,6 +426,13 @@ likely through some sort of threading).
To otherwise stop an application, either send a `SIGTERM` to the process
(like Kubernetes does) or perform a typical `KeyboardInterrupt` (`Ctrl+C`).


<br>
***Arguments:***

- `fail`: if True, signals that application is stopped due
to unhandled exception, and it shouldn't commit the current checkpoint.

<a id="quixstreams.app.Application.get_producer"></a>

<br><br>
Expand All @@ -439,7 +443,7 @@ To otherwise stop an application, either send a `SIGTERM` to the process
def get_producer() -> Producer
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L566)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L579)

Create and return a pre-configured Producer instance.
The Producer is initialized with params passed to Application.
Expand Down Expand Up @@ -471,10 +475,10 @@ with app.get_producer() as producer:
#### Application.get\_consumer

```python
def get_consumer() -> Consumer
def get_consumer(auto_commit_enable: bool = True) -> Consumer
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L597)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L610)

Create and return a pre-configured Consumer instance.
The Consumer is initialized with params passed to Application.
Expand Down Expand Up @@ -519,7 +523,7 @@ with app.get_consumer() as consumer:
def clear_state()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L641)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L654)

Clear the state of the application.

Expand All @@ -533,11 +537,11 @@ Clear the state of the application.
def run(dataframe: StreamingDataFrame)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/app.py#L719)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/app.py#L660)

Start processing data from Kafka using provided `StreamingDataFrame`

One started, can be safely terminated with a `SIGTERM` signal
Once started, it can be safely terminated with a `SIGTERM` signal
(like Kubernetes does) or a typical `KeyboardInterrupt` (`Ctrl+C`).


Expand Down
6 changes: 3 additions & 3 deletions docs/api-reference/context.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
def set_message_context(context: Optional[MessageContext])
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/context.py#L21)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/context.py#L21)

Set a MessageContext for the current message in the given `contextvars.Context`

Expand Down Expand Up @@ -55,7 +55,7 @@ sdf = sdf.update(lambda value: alter_context(value))
def message_context() -> MessageContext
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/context.py#L52)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/context.py#L52)

Get a MessageContext for the current message, which houses most of the message

Expand Down Expand Up @@ -96,7 +96,7 @@ instance of `MessageContext`
def message_key() -> Any
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/context.py#L83)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/context.py#L83)

Get the current message's key.

Expand Down
44 changes: 22 additions & 22 deletions docs/api-reference/dataframe.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class StreamingDataFrame(BaseStreaming)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L32)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L31)

`StreamingDataFrame` is the main object you will use for ETL work.

Expand Down Expand Up @@ -74,7 +74,7 @@ def apply(func: Union[DataFrameFunc, DataFrameStatefulFunc],
expand: bool = False) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L109)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L108)

Apply a function to transform the value and return a new value.

Expand Down Expand Up @@ -122,7 +122,7 @@ def update(func: Union[DataFrameFunc, DataFrameStatefulFunc],
stateful: bool = False) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L152)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L151)

Apply a function to mutate value in-place or to perform a side effect

Expand Down Expand Up @@ -170,7 +170,7 @@ def filter(func: Union[DataFrameFunc, DataFrameStatefulFunc],
stateful: bool = False) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L191)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L190)

Filter value using provided function.

Expand Down Expand Up @@ -218,7 +218,7 @@ of type `State` to perform stateful operations.
def contains(key: str) -> StreamingSeries
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L244)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L233)

Check if the key is present in the Row value.

Expand Down Expand Up @@ -258,7 +258,7 @@ def to_topic(topic: Topic,
key: Optional[Callable[[object], object]] = None) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L267)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L256)

Produce current value to a topic. You can optionally specify a new key.

Expand Down Expand Up @@ -306,7 +306,7 @@ By default, the current message key will be used.
def compose() -> StreamCallable
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L306)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L295)

Compose all functions of this StreamingDataFrame into one big closure.

Expand Down Expand Up @@ -349,7 +349,7 @@ and returns a result of StreamingDataFrame
def test(value: object, ctx: Optional[MessageContext] = None) -> Any
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L336)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L325)

A shorthand to test `StreamingDataFrame` with provided value

Expand Down Expand Up @@ -383,7 +383,7 @@ def tumbling_window(duration_ms: Union[int, timedelta],
name: Optional[str] = None) -> TumblingWindowDefinition
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L354)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L343)

Create a tumbling window transformation on this StreamingDataFrame.

Expand Down Expand Up @@ -468,7 +468,7 @@ def hopping_window(duration_ms: Union[int, timedelta],
name: Optional[str] = None) -> HoppingWindowDefinition
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/dataframe.py#L429)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/dataframe.py#L418)

Create a hopping window transformation on this StreamingDataFrame.

Expand Down Expand Up @@ -561,7 +561,7 @@ sdf = (
class StreamingSeries(BaseStreaming)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L17)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L40)

`StreamingSeries` are typically generated by `StreamingDataframes` when getting
elements from, or performing certain operations on, a `StreamingDataframe`,
Expand Down Expand Up @@ -627,7 +627,7 @@ sdf = sdf[["column_a"] & (sdf["new_sum_field"] >= 10)]
def from_func(cls, func: StreamCallable) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L77)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L100)

Create a StreamingSeries from a function.

Expand Down Expand Up @@ -655,7 +655,7 @@ instance of `StreamingSeries`
def apply(func: StreamCallable) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L91)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L114)

Add a callable to the execution list for this series.

Expand Down Expand Up @@ -708,7 +708,7 @@ def compose(allow_filters: bool = True,
allow_updates: bool = True) -> StreamCallable
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L125)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L148)

Compose all functions of this StreamingSeries into one big closure.

Expand Down Expand Up @@ -768,7 +768,7 @@ and returns a result of `StreamingSeries`
def test(value: Any, ctx: Optional[MessageContext] = None) -> Any
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L172)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L195)

A shorthand to test `StreamingSeries` with provided value

Expand Down Expand Up @@ -800,7 +800,7 @@ result of `StreamingSeries`
def isin(other: Container) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L208)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L231)

Check if series value is in "other".

Expand Down Expand Up @@ -845,7 +845,7 @@ new StreamingSeries
def contains(other: Union[Self, object]) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L235)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L258)

Check if series value contains "other"

Expand Down Expand Up @@ -890,7 +890,7 @@ new StreamingSeries
def is_(other: Union[Self, object]) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L260)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L283)

Check if series value refers to the same object as `other`

Expand Down Expand Up @@ -932,7 +932,7 @@ new StreamingSeries
def isnot(other: Union[Self, object]) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L283)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L306)

Check if series value does not refer to the same object as `other`

Expand Down Expand Up @@ -975,7 +975,7 @@ new StreamingSeries
def isnull() -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L307)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L330)

Check if series value is None.

Expand Down Expand Up @@ -1012,7 +1012,7 @@ new StreamingSeries
def notnull() -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L330)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L353)

Check if series value is not None.

Expand Down Expand Up @@ -1049,7 +1049,7 @@ new StreamingSeries
def abs() -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/356d83c8caf613065f333dcd470e004443c12544/quixstreams/dataframe/series.py#L353)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3787271060f892c91ec0aef934bfd09c55790e92/quixstreams/dataframe/series.py#L376)

Get absolute value of the series value.

Expand Down
Loading
Loading