Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mypy: make quixstreams.platforms.* pass type checks #678

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ ignore_errors = true
module = [
"quixstreams.core.*",
"quixstreams.dataframe.*",
"quixstreams.platforms.*",
"quixstreams.rowproducer.*"
]
ignore_errors = true
4 changes: 2 additions & 2 deletions quixstreams/models/topics/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class TopicManager:
"""

# Default topic params
default_num_partitions = 1
default_replication_factor = 1
default_num_partitions: Optional[int] = 1
default_replication_factor: Optional[int] = 1
default_extra_config: dict[str, str] = {}

# Max topic name length for the new topics
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/models/topics/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class TopicConfig:
Generally used by Topic and any topic creation procedures.
"""

num_partitions: int
replication_factor: int
num_partitions: Optional[int]
replication_factor: Optional[int]
extra_config: dict[str, str] = dataclasses.field(default_factory=dict)

def as_dict(self):
Expand Down
9 changes: 5 additions & 4 deletions quixstreams/platforms/quix/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,21 @@ class QuixPortalApiService:

def __init__(
self,
auth_token: Optional[str] = None,
auth_token: str,
portal_api: Optional[str] = None,
api_version: Optional[str] = None,
default_workspace_id: Optional[str] = None,
):
self._portal_api = (
portal_api or QUIX_ENVIRONMENT.portal_api or DEFAULT_PORTAL_API_URL
)
self._auth_token = auth_token or QUIX_ENVIRONMENT.sdk_token
if not self._auth_token:
if not auth_token:
raise MissingConnectionRequirements(
f"A Quix Cloud auth token (SDK or PAT) is required; "
f"set with environment variable {QUIX_ENVIRONMENT.SDK_TOKEN}"
)
self._auth_token = auth_token

self._default_workspace_id = (
default_workspace_id or QUIX_ENVIRONMENT.workspace_id
)
Expand Down Expand Up @@ -133,7 +134,7 @@ def get_workspace_certificate(
f"/workspaces/{workspace_id}/certificates", timeout=timeout
).content
if not content:
return
return None

with ZipFile(BytesIO(content)) as z:
with z.open("ca.cert") as f:
Expand Down
57 changes: 36 additions & 21 deletions quixstreams/platforms/quix/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import time
from copy import deepcopy
from typing import List, Optional
from typing import Any, List, Optional

from requests import HTTPError

Expand Down Expand Up @@ -122,17 +122,17 @@ def __init__(
try:
self._workspace_id = workspace_id or self.api.default_workspace_id
except UndefinedQuixWorkspaceId:
self._workspace_id = None
self._workspace_id = ""
logger.warning(
"'workspace_id' argument was not provided nor set with "
"'Quix__Workspace__Id' environment; if only one Workspace ID for the "
"provided auth token exists (often true with SDK tokens), "
"then that ID will be used. Otherwise, provide a known topic name to "
"method 'get_workspace_info(topic)' to obtain desired Workspace ID."
)
self._librdkafka_connect_config = None
self._quix_broker_settings = None
self._workspace_meta = None
self._librdkafka_connect_config: Optional[ConnectionConfig] = None
self._quix_broker_settings: dict[str, Any] = {}
self._workspace_meta: dict[str, Any] = {}
self._timeout = timeout
self._topic_create_timeout = topic_create_timeout

Expand Down Expand Up @@ -175,18 +175,22 @@ def librdkafka_extra_config(self) -> dict:
}

@classmethod
def convert_topic_response(cls, api_response: dict) -> Topic:
def convert_topic_response(
cls, api_response: dict, extra_config: Optional[dict] = None
) -> Topic:
"""
Converts a GET or POST ("create") topic API response to a Topic object

:param api_response: the dict response from a get or create topic call
:return: a corresponding Topic object
"""
if extra_config is None:
extra_config = {}

topic_config = api_response["configuration"]
extra_config = {
"retention.ms": topic_config["retentionInMinutes"] * 60 * 1000,
"retention.bytes": topic_config["retentionInBytes"],
}
extra_config["retention.ms"] = topic_config["retentionInMinutes"] * 60 * 1000
extra_config["retention.bytes"] = topic_config["retentionInBytes"]

# Map value returned by Quix API to Kafka Admin API format
if topic_config.get("cleanupPolicy"):
cleanup_policy = _quix_cleanup_policy_to_kafka(
Expand Down Expand Up @@ -254,6 +258,7 @@ def search_for_workspace(
for ws in ws_list:
if ws["name"] == workspace_name_or_id:
return ws
raise

def _set_workspace_info(self, workspace_data: dict):
ws_data = deepcopy(workspace_data)
Expand All @@ -280,14 +285,16 @@ def get_workspace_info(
:param timeout: response timeout (seconds); Default 30
"""
# TODO: more error handling with the wrong combo of ws_id and topic
ws_data: Optional[dict] = None
if self._workspace_id:
ws_data = self.search_for_workspace(
workspace_name_or_id=self._workspace_id, timeout=timeout
)
else:
elif known_workspace_topic:
ws_data = self.search_for_topic_workspace(
known_workspace_topic, timeout=timeout
)

if not ws_data:
raise NoWorkspaceFound(
"No workspace was found for the given workspace/auth-token/topic combo"
Expand Down Expand Up @@ -316,6 +323,8 @@ def search_workspace_for_topic(
if t["name"] == topic or t["id"] == topic:
return workspace_id

return None

def search_for_topic_workspace(
self, topic: str, timeout: Optional[float] = None
) -> Optional[dict]:
Expand Down Expand Up @@ -345,6 +354,8 @@ def search_for_topic_workspace(
):
return ws

return None

def create_topic(self, topic: Topic, timeout: Optional[float] = None):
"""
The actual API call to create the topic.
Expand All @@ -353,6 +364,8 @@ def create_topic(self, topic: Topic, timeout: Optional[float] = None):
:param timeout: response timeout (seconds); Default 30
"""
cfg = topic.config
if cfg is None:
raise RuntimeError("Topic config not set")

# settings that must be ints or Nones
ret_ms = cfg.extra_config.get("retention.ms")
Expand All @@ -366,7 +379,7 @@ def create_topic(self, topic: Topic, timeout: Optional[float] = None):
topic_rep_factor=cfg.replication_factor,
topic_ret_bytes=ret_bytes if ret_bytes is None else int(ret_bytes),
topic_ret_minutes=ret_ms if ret_ms is None else int(ret_ms) // 60000,
cleanup_policy=cfg.extra_config.get("cleanup.policy"),
cleanup_policy=cfg.extra_config.get("cleanup.policy"), # type: ignore[arg-type]
timeout=timeout if timeout is not None else self._timeout,
)
logger.debug(
Expand All @@ -391,15 +404,17 @@ def get_or_create_topic(
try:
return self.get_topic(topic_name=topic.name, timeout=timeout)
except QuixApiRequestFailure as e:
if e.status_code == 404:
# Topic likely does not exist (the API returns 404 for anything but success;
# we could inspect the error string, but that would depend on it never changing).
try:
return self.create_topic(topic, timeout=timeout)
except QuixApiRequestFailure:
# Multiple apps likely tried to create at the same time.
# If this fails, it raises with all previous API errors
return self.get_topic(topic_name=topic.name, timeout=timeout)
if e.status_code != 404:
raise

# Topic likely does not exist (the API returns 404 for anything but success;
# we could inspect the error string, but that would depend on it never changing).
try:
return self.create_topic(topic, timeout=timeout)
except QuixApiRequestFailure:
# Multiple apps likely tried to create at the same time.
# If this fails, it raises with all previous API errors
return self.get_topic(topic_name=topic.name, timeout=timeout)

def wait_for_topic_ready_statuses(
self,
Expand Down
23 changes: 11 additions & 12 deletions quixstreams/platforms/quix/topic_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Literal
from typing import List, Literal, Optional

from quixstreams.models.topics import Topic, TopicAdmin, TopicManager

Expand All @@ -22,8 +22,8 @@ class QuixTopicManager(TopicManager):

# Default topic params
# Set these to None to use defaults defined in Quix Cloud
default_num_partitions: None = None
default_replication_factor: None = None
default_num_partitions = None
default_replication_factor = None

# Max topic name length for the new topics
_max_topic_name_len = 249
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(
auto_create_topics=auto_create_topics,
)
self._quix_config_builder = quix_config_builder
self._topic_id_to_name = {}
self._topic_id_to_name: dict[str, str] = {}

def _finalize_topic(self, topic: Topic) -> Topic:
"""
Expand All @@ -64,12 +64,11 @@ def _finalize_topic(self, topic: Topic) -> Topic:
Additionally, sets the actual topic configuration since we now have it anyway.
"""
quix_topic_info = self._quix_config_builder.get_or_create_topic(topic)
quix_topic = self._quix_config_builder.convert_topic_response(quix_topic_info)
# allows us to include the configs not included in the API response
quix_topic.config.extra_config = {
**topic.config.extra_config,
**quix_topic.config.extra_config,
}
quix_topic = self._quix_config_builder.convert_topic_response(
quix_topic_info,
extra_config=topic.config.extra_config if topic.config else {},
)

topic_out = topic.__clone__(name=quix_topic.name, config=quix_topic.config)
self._topic_id_to_name[topic_out.name] = quix_topic_info["name"]
return super()._finalize_topic(topic_out)
Expand All @@ -86,7 +85,7 @@ def _create_topics(
def _internal_name(
self,
topic_type: Literal["changelog", "repartition"],
topic_name: str,
topic_name: Optional[str],
suffix: str,
):
"""
Expand All @@ -107,6 +106,6 @@ def _internal_name(
"""
return super()._internal_name(
topic_type,
self._topic_id_to_name[topic_name],
self._topic_id_to_name[topic_name] if topic_name else None,
suffix,
)
7 changes: 7 additions & 0 deletions quixstreams/sources/core/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ def _validate_topics(self) -> None:
target_topic_config,
)

# Should never happen; bail out early if either partition count is unset (None)
if (
source_topic_config.num_partitions is None
or target_topic_config.num_partitions is None
):
return

if source_topic_config.num_partitions > target_topic_config.num_partitions:
raise ValueError("Source topic has more partitions than destination topic")
elif source_topic_config.num_partitions < target_topic_config.num_partitions:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_quixstreams/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ def factory(workspace_id: Optional[str] = None):
strip_workspace_id_prefix(workspace_id, s) if workspace_id else s
)

cfg_builder.convert_topic_response.side_effect = (
lambda topic: QuixKafkaConfigsBuilder.convert_topic_response(topic)
cfg_builder.convert_topic_response = (
QuixKafkaConfigsBuilder.convert_topic_response
)

# Mock the create API call and return this response.
Expand Down
Loading