Db interface example

See the CRUD and Full CRUD examples to understand which parameters may be received as *args and **kwargs.

Note

This example is for representational purposes only. Actual implementation details and functionality may vary. It serves as a template and may not accurately reflect the current state of the code or your implementation.

from motor import motor_asyncio

from py_ocpi.core.enums import ModuleID

# Module-level Motor client; "db_url" is a placeholder for the real MongoDB URI.
client = motor_asyncio.AsyncIOMotorClient("db_url")
# Handle to the "ocpi_database" database (item access is equivalent to
# attribute access on a Motor client).
db = client["ocpi_database"]


class DbInterface:
    MODULE_MAP = {
        ModuleID.credentials_and_registration: "credentials_table",
        ModuleID.locations: "locations_table",
        ModuleID.cdrs: "cdrs_table",
        ModuleID.tokens: "tokens_table",
        ModuleID.tariffs: "tariffs_table",
        ModuleID.sessions: "sessions_table",
        ModuleID.commands: "commands_table",
        "integration": "integration_table",
        ModuleID.hub_client_info: "hubclientinfo_table",
        ModuleID.charging_profile: "chargingprofile_table",
    }

    @classmethod
    async def get(cls, module, id, *args, **kwargs) -> dict | None:
        collection = cls.MODULE_MAP[module]

        match module:
            case ModuleID.commands:
                # TODO: implement query using your identification for commands
                command_data = kwargs["comand_data"]
                query = {}
            case ModuleID.tokens:
                query = {"uid": id}
            case "integration":
                query = {"credentials_id": id}
            case _:
                query = {"id": id}
        return await db[collection].find_one(query)

    @classmethod
    async def get_all(cls, module, filters, *args, **kwargs) -> tuple[list[dict], int, bool]:
        data_list = await cls.list(module, filters, *args, **kwargs)
        total = await cls.count(module, filters, *args, **kwargs)
        is_last_page = await cls.is_last_page(
            module, filters, total, *args, **kwargs
        )
        return data_list, total, is_last_page

    @classmethod
    async def list(cls, module, filters, *args, **kwargs) -> list[dict]:
        collection = cls.MODULE_MAP[module]

        offset = await cls._get_offset_filter(filters)
        limit = await cls._get_limit_filter(filters)

        query = await cls._get_date_from_query(filters)
        query |= await cls._get_date_to_query(filters)

        return (
            await db[collection]
            .find(
                query,
            )
            .sort("_id")
            .skip(offset)
            .limit(limit)
            .to_list(None)
        )

    @classmethod
    async def create(cls, module, data, *args, **kwargs) -> dict:
        collection = cls.MODULE_MAP[module]

        return await db[collection].insert_one(data)

    @classmethod
    async def update(cls, module, data, id, *args, **kwargs) -> dict:
        collection = cls.MODULE_MAP[module]

        match module:
            case ModuleID.tokens:
                token_type = kwargs.get("token_type")
                query = {"uid": id}
                if token_type:
                    query |= {"token_type": token_type}
            case "integration":
                query = {"credentials_id": id}
            case ModuleID.credentials_and_registration:
                query = {"token": id}
            case _:
                query = {"id": id}

        return await db[collection].update_one(query, {"$set": data})

    @classmethod
    async def delete(cls, module, id, *args, **kwargs) -> None:
        collection = cls.MODULE_MAP[module]
        if module == ModuleID.credentials_and_registration:
            query = {"token": id}
        else:
            query = {"id": id}
        await db[collection].delete_one(query)

    @classmethod
    async def count(cls, module, filters, *args, **kwargs) -> int:
        collection = cls.MODULE_MAP[module]

        query = await cls._get_date_from_query(filters)
        query |= await cls._get_date_to_query(filters)

        total = db[collection].count_documents(query)
        return total

    @classmethod
    async def is_last_page(
        cls, module, filters, total, *args, **kwargs
    ) -> bool:
        offset = await cls._get_offset_filter(filters)
        limit = await cls._get_limit_filter(filters)
        return offset + limit >= total if limit else True

    @classmethod
    async def _get_offset_filter(cls, filters: dict) -> int:
        return filters.get("offset", 0)

    @classmethod
    async def _get_limit_filter(cls, filters: dict) -> int:
        return filters.get("limit", 0)

    @classmethod
    async def _get_date_from_query(cls, filters: dict) -> int:
        query = {}
        date_to = filters.get("date_to")
        if date_to:
            query.setdefault("last_updated", {}).update(
                {"$lte": date_to.isoformat()}
            )
        return query

    @classmethod
    async def _get_date_to_query(cls, filters: dict) -> int:
        query = {}
        date_from = filters.get("date_from")
        if date_from:
            query.setdefault("last_updated", {}).update(
                {"$gte": date_from.isoformat()}
            )
        return query