# Source code for oblako.services.rds

"""RDS / Aurora service: local Amazon RDS & Aurora.

Same pattern as Redshift — moto for the control plane + a real engine for the
data plane:
  * engine: a real PostgreSQL (default) or MySQL container (the actual database).
  * ``get_client()``: boto3 ``rds`` control plane (via moto) — create/describe
    **RDS instances** (``create_db_instance``) *and* **Aurora clusters**
    (``create_db_cluster``); returns endpoints, reader endpoints, members.
  * ``connect()``: a DB connection straight to the engine (psycopg2 / pymysql).
  * ``seed()``: re-create cluster/instance metadata (moto is in-memory) after a
    restart, idempotently.

The control-plane cluster/instance objects are moto metadata pointing at the one
local engine — real SQL behavior, simulated topology.
"""

from oblako import ports
from oblako import config
from .base import Service, PortMapping

# Container settings per supported engine: the image to run, the port the
# database listens on inside the container, the host port used when the caller
# does not pick one, the in-container data directory, and the volume name
# bound to that directory.
_POSTGRES_SPEC = {
    "image": "postgres:16",
    "container_port": 5432,
    "default_host_port": 5432,
    "data_dir": "/var/lib/postgresql/data",
    "volume": "oblako-rds-data",
}

_MYSQL_SPEC = {
    "image": "mysql:8.0",
    "container_port": 3306,
    "default_host_port": 3306,
    "data_dir": "/var/lib/mysql",
    "volume": "oblako-rds-mysql-data",
}

# Keyed by the ``engine`` name accepted by the service constructor.
_ENGINES = {
    "postgres": _POSTGRES_SPEC,
    "mysql": _MYSQL_SPEC,
}


class RdsService(Service):
    """Local Amazon RDS and Aurora service backed by a real PostgreSQL or MySQL engine.

    The container started by this service is the actual database. The boto3
    ``rds`` client from :meth:`get_client` talks to a separate control-plane
    endpoint on ``control_port``, and the ``rds-data`` client from
    :meth:`get_data_client` talks to an in-process server on ``data_port``.
    """

    def __init__(
        self,
        engine: str = "postgres",
        host_port: int | None = None,
        user: str = "oblako",
        password: str = "oblako",
        database: str = "oblako",
        control_port: int = ports.MOTO,
        data_port: int = ports.RDS_DATA,
        region: str | None = None,
    ):
        """Initialize the RDS service for the specified engine (postgres or mysql).

        Args:
            engine: Key into ``_ENGINES``; selects image, ports and data volume.
            host_port: Host port published for the database. A falsy value
                (``None`` or ``0``) selects the engine's default host port.
            user: Database user created in the container and used by
                :meth:`connect`.
            password: Password for ``user``. For MySQL it is also used as the
                root password.
            database: Database created in the container and used by
                :meth:`connect`.
            control_port: Local port of the ``rds`` control-plane endpoint.
            data_port: Local port of the ``rds-data`` server.
            region: Region handed to the boto3 clients; falls back to
                ``config.region()`` when falsy.

        Raises:
            ValueError: If ``engine`` is not a key of ``_ENGINES``.
        """
        if engine not in _ENGINES:
            raise ValueError(
                f"engine must be one of {sorted(_ENGINES)}, got {engine!r}"
            )
        spec = _ENGINES[engine]
        host_port = host_port or spec["default_host_port"]

        if engine == "postgres":
            environment = {
                "POSTGRES_USER": user,
                "POSTGRES_PASSWORD": password,
                "POSTGRES_DB": database,
            }
        else:  # mysql
            environment = {
                "MYSQL_ROOT_PASSWORD": password,
                "MYSQL_DATABASE": database,
                "MYSQL_USER": user,
                "MYSQL_PASSWORD": password,
            }

        super().__init__(
            name="rds",
            image=spec["image"],
            ports=[
                PortMapping(container_port=spec["container_port"], host_port=host_port)
            ],
            environment=environment,
            volumes={spec["volume"]: {"bind": spec["data_dir"], "mode": "rw"}},
        )
        self.engine = engine
        self.host_port = host_port
        self.user = user
        self.password = password
        self.database = database
        self.control_port = control_port
        self.data_port = data_port
        self.region = region or config.region()

    def connect(self):
        """Return a DB connection to the engine (psycopg2 for postgres, pymysql for mysql).

        The driver is imported lazily so only the one for the selected engine
        needs to be installed.

        Raises:
            ImportError: If the driver for the selected engine is not installed.
        """
        if self.engine == "mysql":
            try:
                import pymysql
            except ImportError as e:
                raise ImportError(
                    "The MySQL engine needs pymysql: pip install 'oblako[mysql]'"
                ) from e
            return pymysql.connect(
                host="localhost",
                port=self.host_port,
                user=self.user,
                password=self.password,
                database=self.database,
            )

        import psycopg2

        return psycopg2.connect(
            host="localhost",
            port=self.host_port,
            user=self.user,
            password=self.password,
            dbname=self.database,
        )

    def get_client(self):
        """boto3 ``rds`` control-plane client (RDS instances + Aurora clusters, via moto)."""
        from . import boto

        return boto.client(
            "rds",
            f"http://localhost:{self.control_port}",
            region=self.region,
        )

    # -------------------------------------------------------------------------------
    # Declarative seed (moto metadata is in-memory; recreate after restart)
    # -------------------------------------------------------------------------------

    @staticmethod
    def _create_ignoring_exists(fn, **kwargs) -> bool:
        """Call ``fn(**kwargs)``; return True if created, False if it already existed.

        "Already exists" is recognised in two ways: from the exception class
        name (botocore's modelled exceptions such as
        ``DBInstanceAlreadyExistsFault``), or from the error code carried in
        ``e.response["Error"]["Code"]``. The second form covers botocore
        raising a plain ``ClientError`` because the returned code does not map
        to a modelled exception class. Any other exception is re-raised.
        """
        try:
            fn(**kwargs)
        except Exception as e:  # noqa: BLE001
            if "AlreadyExists" in type(e).__name__:
                return False
            # Generic ClientError: fall back to the service error code.
            response = getattr(e, "response", None)
            error = response.get("Error") if isinstance(response, dict) else None
            code = error.get("Code") if isinstance(error, dict) else None
            if isinstance(code, str) and "AlreadyExists" in code:
                return False
            raise
        return True

    def seed(
        self, instances: list[dict] | None = None, clusters: list[dict] | None = None
    ) -> dict:
        """Idempotently (re)create RDS instances and Aurora clusters from a spec.

        ``clusters`` entries may carry an ``"instances"`` list of member specs.
        Each dict is passed through as kwargs to ``create_db_cluster`` /
        ``create_db_instance``. Safe to re-run after a moto restart.

        Args:
            instances: Standalone instance specs; each needs a
                ``"DBInstanceIdentifier"`` key.
            clusters: Cluster specs; each needs a ``"DBClusterIdentifier"``
                key. Member specs under ``"instances"`` are created with that
                cluster identifier added to their kwargs.

        Returns:
            ``{"clusters": [...], "instances": [...]}`` listing the identifiers
            that were newly created by this call (already-existing ones are
            omitted).
        """
        rds = self.get_client()
        created: dict[str, list[str]] = {"clusters": [], "instances": []}

        for spec in clusters or []:
            # Copy so popping "instances" does not mutate the caller's dict.
            spec = dict(spec)
            members = spec.pop("instances", [])
            cid = spec["DBClusterIdentifier"]
            if self._create_ignoring_exists(rds.create_db_cluster, **spec):
                created["clusters"].append(cid)
            # Members are attempted even when the cluster already existed.
            for member in members:
                if self._create_ignoring_exists(
                    rds.create_db_instance, DBClusterIdentifier=cid, **member
                ):
                    created["instances"].append(member["DBInstanceIdentifier"])

        for spec in instances or []:
            if self._create_ignoring_exists(rds.create_db_instance, **spec):
                created["instances"].append(spec["DBInstanceIdentifier"])

        return created

    # -------------------------------------------------------------------------------
    # RDS Data API (postgres or mysql engine)
    # -------------------------------------------------------------------------------

    def start_data_server(self):
        """Start the rds-data server in-process (idempotent). Returns its URL.

        The server is given an executor configured with this service's
        connection settings and engine, and listens on ``data_port``.
        """
        from oblako.engines.rds_data import RdsDataExecutor, start_in_thread

        executor = RdsDataExecutor(
            host="localhost",
            port=self.host_port,
            user=self.user,
            password=self.password,
            database=self.database,
            engine=self.engine,
        )
        return start_in_thread(port=self.data_port, executor=executor)

    def get_data_client(self, autostart: bool = True):
        """boto3 ``rds-data`` client executing real SQL against the engine.

        Args:
            autostart: When true, start the rds-data server first if
                ``rds_data.is_running(self.data_port)`` reports it is not
                running.
        """
        from oblako.engines import rds_data
        from . import boto

        if autostart and not rds_data.is_running(self.data_port):
            self.start_data_server()
        return boto.client(
            "rds-data",
            f"http://localhost:{self.data_port}",
            region=self.region,
        )

    def _health_check(self) -> bool:
        """Return True if a connection to the engine can be opened and closed."""
        try:
            conn = self.connect()
            conn.close()
            return True
        except Exception:  # noqa: BLE001 - driver-specific connection errors
            return False