Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Implement DynamoDB backend #937

Open
iongion opened this issue Dec 13, 2024 · 1 comment
Open

Feature: Implement DynamoDB backend #937

iongion opened this issue Dec 13, 2024 · 1 comment

Comments

@iongion
Copy link

iongion commented Dec 13, 2024

If would be great if this backend could be added to the list.

Here is an attempt:

import datetime
import zlib
from typing import Any

import boto3
import msgspec
from aiocache.base import BaseCache
from aiocache.serializers import BaseSerializer
from aiocache.serializers.serializers import _NOT_SET
from boto3.dynamodb.types import Binary



class MsgSpecSerializer(BaseSerializer):
    """Transform data to bytes using msgspec.dumps and msgspec.loads to retrieve it back.

    You need to have ``msgspec`` installed in order to be able to use this serializer.

    :param encoding: str. Can be used to change encoding param for ``msg.loads`` method.
        Default is utf-8.
    :param use_list: bool. Can be used to change use_list param for ``msgspec.loads`` method.
        Default is True.
    """

    def __init__(self, encoding=_NOT_SET, use_list=True, compress=False):
        if not msgspec:
            raise RuntimeError("msgspec not installed, MsgSpecSerializer unavailable")
        self.encoding = self.DEFAULT_ENCODING if encoding is _NOT_SET else encoding
        self.use_list = use_list
        self.compress = compress

    def dumps(self, value) -> bytes:
        """Serialize the received value using ``msgpack.dumps``.

        :param value: obj
        :returns: bytes
        """
        output = msgspec.msgpack.encode(value)
        if self.compress:
            output = zlib.compress(output)
        return output

    def loads(self, value: bytes) -> Any:
        """Deserialize value using ``msgpack.loads``.

        :param value: bytes
        :returns: obj
        """
        if value is None:
            return None
        if isinstance(value, Binary):
            value = value.value
        if isinstance(value, memoryview):
            value = bytes(value)
        decoded = msgspec.msgpack.decode(value)
        if self.compress:
            decoded = zlib.decompress(decoded)
        return decoded


class DynamoDBBackend(BaseCache):
    def __init__(self, table_name="cache_table", region_name="us-east-1", key_column="CacheKey", val_column="CacheValue", ttl_attribute_name="CacheTTL", **kwargs):
        super().__init__(**kwargs)
        self.key_column = key_column
        self.val_column = val_column
        self.ttl_attribute_name = ttl_attribute_name
        self.table_name = table_name
        self.region_name = region_name
        self.resource = boto3.resource("dynamodb", region_name=region_name)
        self.table = self.resource.Table(table_name)

    async def _get(self, key, encoding="utf-8", _conn=None):
        response = self.table.get_item(Key={self.key_column: key})
        item = response.get("Item")
        if not item:
            return None
        value = item[self.val_column]
        return self.serializer.loads(value)

    async def _set(self, key, value, ttl=0, _cas_token=None, _conn=None):
        serialized_value = self.serializer.dumps(value)
        item = {self.key_column: key, self.val_column: serialized_value}
        if ttl > 0:
            expires_at = datetime.datetime.now() + datetime.timedelta(seconds=ttl)
            item[self.ttl_attribute_name] = int(expires_at.timestamp())
        self.table.put_item(Item=item)
        return True

    async def _delete(self, key, _conn=None):
        self.table.delete_item(Key={self.key_column: key})
        return True

    async def _exists(self, key, _conn=None):
        # Assume a keys exists as dynamodb does not have a direct way to check if a key exists
        return True

    async def _clear(self, namespace=None, _conn=None):
        raise NotImplementedError("Clearing all items is not supported for DynamoDB.")

    async def _close(self, *args, _conn=None, **kwargs):
        pass  # No specific close logic required for boto3

    def build_key(self, key: str, namespace: str | None = None) -> str:
        return self._str_build_key(key, namespace)


class DynamoDBCache(DynamoDBBackend):
    """DynamoDB cache implementation.

    With the following components as defaults:
        - serializer: :class:`BaseSerializer`
        - plugins: []

    Config options are:

    :param serializer: obj derived from :class:`aiocache.serializers.BaseSerializer`.
    :param plugins: list of :class:`aiocache.plugins.BasePlugin` derived classes.
    :param namespace: string to use as default prefix for the key used in all operations of
        the backend. Default is an empty string, "".
    :param timeout: int or float in seconds specifying maximum timeout for the operations to last.
        By default its 5.
    :param table_name: str with the name of the DynamoDB table. Default is "cache_table".
    :param region_name: str with the AWS region for DynamoDB. Default is "us-east-1".
    """

    NAME = "dynamodb"

    def __init__(self, serializer=None, compress=False, **kwargs):
        super().__init__(serializer=serializer or MsgSpecSerializer(compress=compress), **kwargs)

    @classmethod
    def parse_uri_path(cls, path):
        return {}

    def __repr__(self):
        return f"DynamoDBCache (table_name={self.table_name}, region_name={self.region_name})"

It is important to create a dynamodb table with a TTL column - the default table columns are

  • key_column: CacheKey
  • val_column: CacheValue
  • ttl_attribute_name: CacheTTL
@Dreamsorcerer
Copy link
Member

Dreamsorcerer commented Dec 13, 2024

As I've mentioned elsewhere, it's probably best to create additional backends in separate libraries, rather than us trying to maintain dozens of backends. If you do this, then open an issue/PR to link to your backend from the README, then other users will be able to find it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants