Update documentation (#667)
Co-authored-by: daniil-quix <[email protected]>
github-actions[bot] and daniil-quix authored Dec 3, 2024
1 parent 69bf48b commit c40c820
Showing 2 changed files with 197 additions and 41 deletions.
131 changes: 109 additions & 22 deletions docs/api-reference/quixstreams.md
@@ -11051,7 +11051,7 @@ with {_key: str, _value: dict, _timestamp: int}.
class Format(ABC)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L14)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L13)

Base class for reading files serialized by the Quix Streams File Sink
Connector.
@@ -11069,7 +11069,7 @@ Also handles different compression types.
def __init__(compression: Optional[CompressionName] = None)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L25)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L24)

Subclasses should call `super().__init__()` for a usable init.

@@ -11082,7 +11082,7 @@ super().__init__() this for a usable init.
def deserialize(filestream: BinaryIO) -> Iterable[dict]
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L35)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L33)

Parse a filelike byte stream into a collection of records

@@ -11114,10 +11114,14 @@ The iterable should output dicts with the following data/naming structure:
class FileSource(Source)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L17)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L19)

Ingest a set of local files into kafka by iterating through the provided folder and
processing all nested files within it.
Ingest a set of files from a desired origin into Kafka by iterating through the
provided folder and processing all nested files within it.

Origins include a local filestore, AWS S3, or Microsoft Azure.

FileSource defaults to a local filestore (LocalOrigin) + JSON format.

Expects folder and file structures as generated by the related FileSink connector:

@@ -11146,14 +11150,24 @@ Example Usage:
```python
from quixstreams import Application
from quixstreams.sources.community.file import FileSource
from quixstreams.sources.community.file.origins import S3Origin

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")

origin = S3Origin(
bucket="<YOUR BUCKET>",
aws_access_key_id="<YOUR KEY ID>",
aws_secret_access_key="<YOUR SECRET KEY>",
region_name="<YOUR REGION>",
)
source = FileSource(
filepath="/path/to/my/topic_folder",
file_format="json",
file_compression="gzip",
directory="path/to/your/topic_folder/",
origin=origin,
format="json",
compression="gzip",
)
sdf = app.dataframe(source=source).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
app.run()
@@ -11164,26 +11178,29 @@ if __name__ == "__main__":
#### FileSource.\_\_init\_\_

```python
def __init__(filepath: Union[str, Path],
file_format: Union[Format, FormatName],
file_compression: Optional[CompressionName] = None,
as_replay: bool = True,
def __init__(directory: Union[str, Path],
format: Union[Format, FormatName] = "json",
origin: Origin = LocalOrigin(),
compression: Optional[CompressionName] = None,
replay_speed: float = 1.0,
name: Optional[str] = None,
shutdown_timeout: float = 10)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L63)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L79)

**Arguments**:

- `filepath`: a filepath to recursively read through; it is recommended to
- `directory`: a directory to recursively read through; it is recommended to
provide the path to a given topic folder (ex: `/path/to/topic_a`).
- `file_format`: what format the message files are in (ex: json, parquet).
Optionally, can provide a `Format` instance if more than file_compression
is necessary to define (file_compression will then be ignored).
- `file_compression`: what compression is used on the given files, if any.
- `as_replay`: Produce the messages with the original time delay between them.
Otherwise, produce the messages as fast as possible.
- `format`: what format the message files are in (ex: json, parquet).
Optionally, can provide a `Format` instance if more than compression
is necessary to define (compression will then be ignored).
- `origin`: an Origin type (defaults to reading local files).
- `compression`: what compression is used on the given files, if any.
- `replay_speed`: a speed multiplier applied when producing messages, which
roughly preserves the time delay between the original messages.
Use any float >= 0, where 0 means no delay and 1 is the original speed.
NOTE: The time delay is only accurate per partition, NOT overall.
- `name`: The name of the Source application (Default: last folder name).
- `shutdown_timeout`: Time in seconds the application waits for the source
@@ -11197,7 +11214,7 @@ to gracefully shutdown
def default_topic() -> Topic
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L110)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L152)

Uses the file structure to generate the desired partition count for the

@@ -11207,6 +11224,76 @@ internal topic.

the original default topic, with updated partition count
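
As a rough illustration of how a partition count could be derived from a folder layout, the sketch below counts the immediate subfolders of a topic folder. It assumes one subfolder per partition (for example `0/`, `1/`), which is an assumption about the FileSink layout and is not stated in this diff; `count_partition_folders` is a hypothetical helper and not part of Quix Streams.

```python
import tempfile
from pathlib import Path


def count_partition_folders(topic_dir: Path) -> int:
    """Count immediate subfolders, assuming one folder per partition (hypothetical layout)."""
    return sum(1 for entry in topic_dir.iterdir() if entry.is_dir())


# Build a throwaway topic folder with three partition folders to try it out.
with tempfile.TemporaryDirectory() as tmp:
    topic_dir = Path(tmp) / "topic_a"
    for partition in ("0", "1", "2"):
        (topic_dir / partition).mkdir(parents=True)
    # A stray file at the top level is ignored because it is not a folder.
    (topic_dir / "README.txt").write_text("not a partition folder")
    partition_count = count_partition_folders(topic_dir)

print(partition_count)
```

Counting only directories keeps stray files at the topic level from inflating the result; whether the real implementation applies the same filter is not confirmed here.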

<a id="quixstreams.sources.community.file.origins.local"></a>

## quixstreams.sources.community.file.origins.local

<a id="quixstreams.sources.community.file.origins"></a>

## quixstreams.sources.community.file.origins

<a id="quixstreams.sources.community.file.origins.s3"></a>

## quixstreams.sources.community.file.origins.s3

<a id="quixstreams.sources.community.file.origins.s3.S3Origin"></a>

### S3Origin

```python
class S3Origin(Origin)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/s3.py#L23)

<a id="quixstreams.sources.community.file.origins.s3.S3Origin.__init__"></a>

#### S3Origin.\_\_init\_\_

```python
def __init__(
bucket: str,
region_name: Optional[str] = getenv("AWS_REGION"),
aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"),
endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_S3"))
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/s3.py#L24)

Configure S3Origin to read files from an AWS S3 bucket.

**Arguments**:

- `bucket`: The S3 bucket name only (ex: 'your-bucket').
- `region_name`: The AWS region.
NOTE: can alternatively set the AWS_REGION environment variable
- `aws_access_key_id`: the AWS access key ID.
NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
- `aws_secret_access_key`: the AWS secret access key.
NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
- `endpoint_url`: the endpoint URL to use; only required for connecting
to a locally hosted S3.
NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable
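
The signature above uses `getenv(...)` calls as default values. In Python, default argument values are evaluated once, when the function is defined, so if the signature is implemented literally as documented, the environment variables would need to be set before the module is imported. The sketch below demonstrates that language behavior with a hypothetical stand-in function `make_settings` and a made-up variable `EXAMPLE_REGION`; it does not call the real `S3Origin`.

```python
import os
from os import getenv
from typing import Optional

# Set the variable BEFORE the function is defined, so the default picks it up.
os.environ["EXAMPLE_REGION"] = "eu-west-1"


def make_settings(region_name: Optional[str] = getenv("EXAMPLE_REGION")) -> dict:
    """Hypothetical stand-in mirroring the getenv-as-default pattern."""
    return {"region_name": region_name}


# Changing the variable afterwards does not affect the already-evaluated default.
os.environ["EXAMPLE_REGION"] = "us-east-1"

from_default = make_settings()
explicit = make_settings(region_name="ap-southeast-2")
print(from_default, explicit)
```

Passing the value explicitly, as in the second call, avoids any dependence on when the environment variable was set.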

<a id="quixstreams.sources.community.file.origins.base"></a>

## quixstreams.sources.community.file.origins.base

<a id="quixstreams.sources.community.file.origins.base.Origin"></a>

### Origin

```python
class Origin(ABC)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/base.py#L8)

An interface for interacting with a file-based client.

Provides methods for navigating folders and retrieving/opening raw files.
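
To make "navigating folders and retrieving/opening raw files" more concrete, here is a minimal sketch of what such an interface might look like, with an in-memory implementation. `ExampleOrigin`, `InMemoryOrigin`, the method names `list_files` and `open_file`, and the file paths are all hypothetical and chosen for illustration; they are not the actual abstract methods of `Origin`.

```python
from abc import ABC, abstractmethod
from io import BytesIO
from typing import BinaryIO, Dict, List


class ExampleOrigin(ABC):
    """Hypothetical stand-in for a file-origin interface (not the library's Origin)."""

    @abstractmethod
    def list_files(self, directory: str) -> List[str]:
        """Return the sorted paths of all files nested under `directory`."""

    @abstractmethod
    def open_file(self, path: str) -> BinaryIO:
        """Return a binary stream with the raw contents of `path`."""


class InMemoryOrigin(ExampleOrigin):
    """Keeps 'files' in a dict so the example runs without a disk or network."""

    def __init__(self, files: Dict[str, bytes]):
        self._files = files

    def list_files(self, directory: str) -> List[str]:
        # Normalize the directory so "topic_a" does not also match "topic_ab/...".
        prefix = directory.rstrip("/") + "/"
        return sorted(p for p in self._files if p.startswith(prefix))

    def open_file(self, path: str) -> BinaryIO:
        return BytesIO(self._files[path])


origin = InMemoryOrigin(
    {
        "topic_a/0/0001.jsonl": b'{"_key": "a", "_value": {"x": 1}, "_timestamp": 1}\n',
        "topic_a/1/0001.jsonl": b'{"_key": "b", "_value": {"x": 2}, "_timestamp": 2}\n',
        "topic_b/0/0001.jsonl": b"{}\n",
    }
)
paths = origin.list_files("topic_a")
first_bytes = origin.open_file(paths[0]).read()
```

Separating "where the bytes come from" from "how the bytes are parsed" is what would let the same source read from a local disk or a bucket without changing the format handling.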

<a id="quixstreams.sources.community.file.compressions.gzip"></a>

## quixstreams.sources.community.file.compressions.gzip
107 changes: 88 additions & 19 deletions docs/api-reference/sources.md
@@ -830,10 +830,14 @@ For other parameters See `quixstreams.sources.kafka.KafkaReplicatorSource`
class FileSource(Source)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L17)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L19)

Ingest a set of local files into kafka by iterating through the provided folder and
processing all nested files within it.
Ingest a set of files from a desired origin into Kafka by iterating through the
provided folder and processing all nested files within it.

Origins include a local filestore, AWS S3, or Microsoft Azure.

FileSource defaults to a local filestore (LocalOrigin) + JSON format.

Expects folder and file structures as generated by the related FileSink connector:

@@ -862,14 +866,24 @@ Example Usage:
```python
from quixstreams import Application
from quixstreams.sources.community.file import FileSource
from quixstreams.sources.community.file.origins import S3Origin

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")

origin = S3Origin(
bucket="<YOUR BUCKET>",
aws_access_key_id="<YOUR KEY ID>",
aws_secret_access_key="<YOUR SECRET KEY>",
region_name="<YOUR REGION>",
)
source = FileSource(
filepath="/path/to/my/topic_folder",
file_format="json",
file_compression="gzip",
directory="path/to/your/topic_folder/",
origin=origin,
format="json",
compression="gzip",
)
sdf = app.dataframe(source=source).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
app.run()
@@ -882,28 +896,31 @@ if __name__ == "__main__":
#### FileSource.\_\_init\_\_

```python
def __init__(filepath: Union[str, Path],
file_format: Union[Format, FormatName],
file_compression: Optional[CompressionName] = None,
as_replay: bool = True,
def __init__(directory: Union[str, Path],
format: Union[Format, FormatName] = "json",
origin: Origin = LocalOrigin(),
compression: Optional[CompressionName] = None,
replay_speed: float = 1.0,
name: Optional[str] = None,
shutdown_timeout: float = 10)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L63)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L79)


<br>
***Arguments:***

- `filepath`: a filepath to recursively read through; it is recommended to
- `directory`: a directory to recursively read through; it is recommended to
provide the path to a given topic folder (ex: `/path/to/topic_a`).
- `file_format`: what format the message files are in (ex: json, parquet).
Optionally, can provide a `Format` instance if more than file_compression
is necessary to define (file_compression will then be ignored).
- `file_compression`: what compression is used on the given files, if any.
- `as_replay`: Produce the messages with the original time delay between them.
Otherwise, produce the messages as fast as possible.
- `format`: what format the message files are in (ex: json, parquet).
Optionally, can provide a `Format` instance if more than compression
is necessary to define (compression will then be ignored).
- `origin`: an Origin type (defaults to reading local files).
- `compression`: what compression is used on the given files, if any.
- `replay_speed`: a speed multiplier applied when producing messages, which
roughly preserves the time delay between the original messages.
Use any float >= 0, where 0 means no delay and 1 is the original speed.
NOTE: The time delay is only accurate per partition, NOT overall.
- `name`: The name of the Source application (Default: last folder name).
- `shutdown_timeout`: Time in seconds the application waits for the source
@@ -919,7 +936,7 @@ to gracefully shutdown
def default_topic() -> Topic
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L110)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L152)

Uses the file structure to generate the desired partition count for the

@@ -935,6 +952,58 @@ the original default topic, with updated partition count

## quixstreams.sources.community.file.compressions.gzip

<a id="quixstreams.sources.community.file.origins.local"></a>

## quixstreams.sources.community.file.origins.local

<a id="quixstreams.sources.community.file.origins.s3"></a>

## quixstreams.sources.community.file.origins.s3

<a id="quixstreams.sources.community.file.origins.s3.S3Origin"></a>

### S3Origin

```python
class S3Origin(Origin)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/s3.py#L23)

<a id="quixstreams.sources.community.file.origins.s3.S3Origin.__init__"></a>

<br><br>

#### S3Origin.\_\_init\_\_

```python
def __init__(
bucket: str,
region_name: Optional[str] = getenv("AWS_REGION"),
aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"),
endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_S3"))
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/s3.py#L24)

Configure S3Origin to read files from an AWS S3 bucket.


<br>
***Arguments:***

- `bucket`: The S3 bucket name only (ex: 'your-bucket').
- `region_name`: The AWS region.
NOTE: can alternatively set the AWS_REGION environment variable
- `aws_access_key_id`: the AWS access key ID.
NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
- `aws_secret_access_key`: the AWS secret access key.
NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
- `endpoint_url`: the endpoint URL to use; only required for connecting
to a locally hosted S3.
NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable

<a id="quixstreams.sources.community.file.formats.json"></a>

## quixstreams.sources.community.file.formats.json
