diff --git a/docs/api-reference/quixstreams.md b/docs/api-reference/quixstreams.md index 2ff360b81..1b69f0436 100644 --- a/docs/api-reference/quixstreams.md +++ b/docs/api-reference/quixstreams.md @@ -11051,7 +11051,7 @@ with {_key: str, _value: dict, _timestamp: int}. class Format(ABC) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L14) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L13) Base class for reading files serialized by the Quix Streams File Sink Connector. @@ -11069,7 +11069,7 @@ Also handles different compression types. def __init__(compression: Optional[CompressionName] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L25) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L24) Call super().__init__() for a usable init. @@ -11082,7 +11082,7 @@ Call super().__init__() for a usable init. def deserialize(filestream: BinaryIO) -> Iterable[dict] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L35) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/formats/base.py#L33) Parse a filelike byte stream into a collection of records @@ -11114,10 +11114,14 @@ The iterable should output dicts with the following data/naming structure: class FileSource(Source) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L17) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L19) -Ingest a set of local files into kafka by iterating through the provided folder and -processing all nested files within it. 
+Ingest a set of files from a desired origin into Kafka by iterating through the +provided folder and processing all nested files within it. + +Origins include a local filestore, AWS S3, or Microsoft Azure. + +FileSource defaults to a local filestore (LocalOrigin) + JSON format. Expects folder and file structures as generated by the related FileSink connector: @@ -11146,14 +11150,24 @@ Example Usage: ```python from quixstreams import Application from quixstreams.sources.community.file import FileSource +from quixstreams.sources.community.file.origins import S3Origin app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") + +origin = S3Origin( + bucket="", + aws_access_key_id="", + aws_secret_access_key="", + region_name="", +) source = FileSource( - filepath="/path/to/my/topic_folder", - file_format="json", - file_compression="gzip", + directory="path/to/your/topic_folder/", + origin=origin, + format="json", + compression="gzip", ) sdf = app.dataframe(source=source).print(metadata=True) +# YOUR LOGIC HERE! 
if __name__ == "__main__": app.run() @@ -11164,26 +11178,29 @@ if __name__ == "__main__": #### FileSource.\_\_init\_\_ ```python -def __init__(filepath: Union[str, Path], - file_format: Union[Format, FormatName], - file_compression: Optional[CompressionName] = None, - as_replay: bool = True, +def __init__(directory: Union[str, Path], + format: Union[Format, FormatName] = "json", + origin: Origin = LocalOrigin(), + compression: Optional[CompressionName] = None, + replay_speed: float = 1.0, name: Optional[str] = None, shutdown_timeout: float = 10) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L63) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L79) **Arguments**: -- `filepath`: a filepath to recursively read through; it is recommended to +- `directory`: a directory to recursively read through; it is recommended to provide the path to a given topic folder (ex: `/path/to/topic_a`). -- `file_format`: what format the message files are in (ex: json, parquet). -Optionally, can provide a `Format` instance if more than file_compression -is necessary to define (file_compression will then be ignored). -- `file_compression`: what compression is used on the given files, if any. -- `as_replay`: Produce the messages with the original time delay between them. -Otherwise, produce the messages as fast as possible. +- `format`: what format the message files are in (ex: json, parquet). +Optionally, can provide a `Format` instance if settings other than +compression are necessary to define (compression will then be ignored). +- `origin`: an Origin type (defaults to reading local files). +- `compression`: what compression is used on the given files, if any. +- `replay_speed`: Produce the messages with this speed multiplier, which +scales the original time delay between the produced messages. 
+Use any float >= 0, where 0 is no delay, and 1 is the original speed. NOTE: Time delay will only be accurate per partition, NOT overall. - `name`: The name of the Source application (Default: last folder name). - `shutdown_timeout`: Time in seconds the application waits for the source @@ -11197,7 +11214,7 @@ to gracefully shutdown def default_topic() -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L110) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L152) Uses the file structure to generate the desired partition count for the @@ -11207,6 +11224,76 @@ internal topic. the original default topic, with updated partition count + + +## quixstreams.sources.community.file.origins.local + + + +## quixstreams.sources.community.file.origins + + + +## quixstreams.sources.community.file.origins.s3 + + + +### S3Origin + +```python +class S3Origin(Origin) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/s3.py#L23) + + + +#### S3Origin.\_\_init\_\_ + +```python +def __init__( + bucket: str, + region_name: Optional[str] = getenv("AWS_REGION"), + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"), + endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_S3")) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/s3.py#L24) + +Configure the S3 origin to read files from an S3 bucket. + +**Arguments**: + +- `bucket`: The S3 bucket name only (ex: 'your-bucket'). +- `region_name`: The AWS region. +NOTE: can alternatively set the AWS_REGION environment variable +- `aws_access_key_id`: the AWS access key ID. +NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable +- `aws_secret_access_key`: the AWS secret access key. 
+NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable +- `endpoint_url`: the endpoint URL to use; only required for connecting +to a locally hosted S3. +NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable + + + +## quixstreams.sources.community.file.origins.base + + + +### Origin + +```python +class Origin(ABC) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/base.py#L8) + +An interface for interacting with a file-based client. + +Provides methods for navigating folders and retrieving/opening raw files. + ## quixstreams.sources.community.file.compressions.gzip diff --git a/docs/api-reference/sources.md b/docs/api-reference/sources.md index 23ec55113..7eb102d85 100644 --- a/docs/api-reference/sources.md +++ b/docs/api-reference/sources.md @@ -830,10 +830,14 @@ For other parameters See `quixstreams.sources.kafka.KafkaReplicatorSource` class FileSource(Source) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L17) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L19) -Ingest a set of local files into kafka by iterating through the provided folder and -processing all nested files within it. +Ingest a set of files from a desired origin into Kafka by iterating through the +provided folder and processing all nested files within it. + +Origins include a local filestore, AWS S3, or Microsoft Azure. + +FileSource defaults to a local filestore (LocalOrigin) + JSON format. 
Expects folder and file structures as generated by the related FileSink connector: @@ -862,14 +866,24 @@ Example Usage: ```python from quixstreams import Application from quixstreams.sources.community.file import FileSource +from quixstreams.sources.community.file.origins import S3Origin app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") + +origin = S3Origin( + bucket="", + aws_access_key_id="", + aws_secret_access_key="", + region_name="", +) source = FileSource( - filepath="/path/to/my/topic_folder", - file_format="json", - file_compression="gzip", + directory="path/to/your/topic_folder/", + origin=origin, + format="json", + compression="gzip", ) sdf = app.dataframe(source=source).print(metadata=True) +# YOUR LOGIC HERE! if __name__ == "__main__": app.run() @@ -882,28 +896,31 @@ if __name__ == "__main__": #### FileSource.\_\_init\_\_ ```python -def __init__(filepath: Union[str, Path], - file_format: Union[Format, FormatName], - file_compression: Optional[CompressionName] = None, - as_replay: bool = True, +def __init__(directory: Union[str, Path], + format: Union[Format, FormatName] = "json", + origin: Origin = LocalOrigin(), + compression: Optional[CompressionName] = None, + replay_speed: float = 1.0, name: Optional[str] = None, shutdown_timeout: float = 10) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L63) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L79)
***Arguments:*** -- `filepath`: a filepath to recursively read through; it is recommended to +- `directory`: a directory to recursively read through; it is recommended to provide the path to a given topic folder (ex: `/path/to/topic_a`). -- `file_format`: what format the message files are in (ex: json, parquet). -Optionally, can provide a `Format` instance if more than file_compression -is necessary to define (file_compression will then be ignored). -- `file_compression`: what compression is used on the given files, if any. -- `as_replay`: Produce the messages with the original time delay between them. -Otherwise, produce the messages as fast as possible. +- `format`: what format the message files are in (ex: json, parquet). +Optionally, can provide a `Format` instance if settings other than +compression are necessary to define (compression will then be ignored). +- `origin`: an Origin type (defaults to reading local files). +- `compression`: what compression is used on the given files, if any. +- `replay_speed`: Produce the messages with this speed multiplier, which +scales the original time delay between the produced messages. +Use any float >= 0, where 0 is no delay, and 1 is the original speed. NOTE: Time delay will only be accurate per partition, NOT overall. - `name`: The name of the Source application (Default: last folder name). 
- `shutdown_timeout`: Time in seconds the application waits for the source @@ -919,7 +936,7 @@ to gracefully shutdown def default_topic() -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L110) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L152) Uses the file structure to generate the desired partition count for the @@ -935,6 +952,58 @@ the original default topic, with updated partition count ## quixstreams.sources.community.file.compressions.gzip + + +## quixstreams.sources.community.file.origins.local + + + +## quixstreams.sources.community.file.origins.s3 + + + +### S3Origin + +```python +class S3Origin(Origin) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/s3.py#L23) + + + +

+ +#### S3Origin.\_\_init\_\_ + +```python +def __init__( + bucket: str, + region_name: Optional[str] = getenv("AWS_REGION"), + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"), + endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_S3")) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/origins/s3.py#L24) + +Configure the S3 origin to read files from an S3 bucket. + + +
+***Arguments:*** + +- `bucket`: The S3 bucket name only (ex: 'your-bucket'). +- `region_name`: The AWS region. +NOTE: can alternatively set the AWS_REGION environment variable +- `aws_access_key_id`: the AWS access key ID. +NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable +- `aws_secret_access_key`: the AWS secret access key. +NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable +- `endpoint_url`: the endpoint URL to use; only required for connecting +to a locally hosted S3. +NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable + ## quixstreams.sources.community.file.formats.json
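The `replay_speed` pacing described in the `FileSource.__init__` docs above can be sketched as a small helper. This is an illustration only, not the quixstreams implementation: `replay_delay` is a hypothetical name, and it assumes the multiplier simply scales the gap between consecutive message timestamps within a partition.

```python
def replay_delay(prev_ts_ms: int, curr_ts_ms: int, replay_speed: float = 1.0) -> float:
    """Seconds to wait before producing the next message.

    Per the documented semantics: 0 means no delay, 1 reproduces the
    original pacing, and e.g. 0.5 replays twice as fast.
    """
    if replay_speed < 0:
        raise ValueError("replay_speed must be >= 0")
    # Clamp negative gaps (out-of-order timestamps) to zero delay.
    gap_ms = max(curr_ts_ms - prev_ts_ms, 0)
    return gap_ms * replay_speed / 1000.0
```

Because gaps are measured between consecutive messages of the same partition, pacing computed this way is only accurate per partition, matching the NOTE in the argument description.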