# Source code for oblako.services.backends

"""Pluggable container backend so oblako isn't hard-wired to Docker.

``Service`` (see base.py) drives containers through a ``ContainerBackend`` instead
of calling docker-py directly. Docker, Podman, and Colima all speak the Docker
Engine API, so they share ``DockerBackend`` — the only difference is which socket
it talks to (honoured via ``DOCKER_HOST`` or auto-detected per runtime).
Kubernetes is a genuinely different control plane, so it gets its own backend, as
does Apple's ``container`` (macOS 26+), which has no Docker-API socket.

Select with ``OBLAKO_CONTAINER_BACKEND`` = docker (default) | podman | colima |
kubernetes | apple. ``DOCKER_HOST`` always wins for the Docker-API runtimes.
"""

from __future__ import annotations

import json
import os
import re
import shlex
import shutil
import signal
import subprocess

# Normalised container states that every backend's ``status()`` returns.
RUNNING, STOPPED, ABSENT = "running", "stopped", "absent"


class PortInUseError(RuntimeError):
    """Raised when a host port required by a service is already taken.

    The holder may be another container or an unrelated host process.
    """


# Kubernetes backend settings. The namespace comes from OBLAKO_K8S_NAMESPACE
# (unset or empty -> "oblako"). _port_forwards maps a service name to the live
# `kubectl port-forward` loop processes that expose it on localhost:host_port.
K8S_NAMESPACE = os.getenv("OBLAKO_K8S_NAMESPACE") or "oblako"
_port_forwards: dict[str, list] = {}

# Docker-API socket paths probed in order, per runtime, when DOCKER_HOST is
# unset. The runtime's own `start` command usually creates them. `~` and
# `$VARS` are expanded before checking. DOCKER_HOST overrides every entry.
_CANDIDATE_SOCKETS = dict(
    colima=["~/.colima/default/docker.sock"],
    podman=[
        "$XDG_RUNTIME_DIR/podman/podman.sock",
        "~/.local/share/containers/podman/machine/podman.sock",
        "/run/podman/podman.sock",
    ],
)


class ContainerBackend:
    """Abstract lifecycle contract that every container runtime adapter fulfils.

    Subclasses override each operation. The base implementations just raise
    ``NotImplementedError``, so a missing override fails loudly.
    """

    name = "container"

    def ensure_image(self, image: str) -> None:
        """Make ``image`` available locally, pulling it when absent."""
        raise NotImplementedError

    def build_image(self, image: str, context_dir: str) -> None:
        """Produce the oblako-owned ``image`` from the Dockerfile context ``context_dir``."""
        raise NotImplementedError

    def run(
        self,
        *,
        name,
        image,
        ports,
        environment,
        volumes,
        extra_hosts,
        command,
        working_dir,
        user,
    ) -> None:
        """Launch a detached container called ``name`` from ``image`` per the given spec."""
        raise NotImplementedError

    def status(self, name: str) -> str:
        """Report RUNNING, STOPPED, or ABSENT for the container called ``name``."""
        raise NotImplementedError

    def remove(self, name: str) -> None:
        """Forcefully delete the named container when it exists."""
        raise NotImplementedError

    def stop(self, name: str) -> None:
        """Halt and delete the named container; do nothing if it is absent."""
        raise NotImplementedError

    def logs(self, name: str, tail: int = 50) -> str:
        """Fetch the container's recent output (up to ``tail`` lines)."""
        raise NotImplementedError
class DockerBackend(ContainerBackend):
    """Docker Engine API backend — also serves Podman and Colima via their sockets."""

    name = "docker"

    def __init__(self, base_url: str | None = None):
        """Initialize against an explicit socket URL, else the ambient Docker environment.

        Args:
            base_url: Docker-API endpoint such as ``unix:///path/docker.sock``.
                When ``None``, the client is built with ``docker.from_env()``.
        """
        self._base_url = base_url
        self._client = None

    @property
    def client(self):
        """Lazily create the docker-py client (explicit base_url, else ``docker.from_env()``)."""
        if self._client is None:
            # Imported here, presumably so docker-py is only needed when this
            # backend is actually used.
            import docker

            self._client = (
                docker.DockerClient(base_url=self._base_url)
                if self._base_url
                else docker.from_env()
            )
        return self._client

    def ensure_image(self, image: str) -> None:
        """Pull ``image`` unless it is already present locally."""
        from docker.errors import NotFound

        try:
            self.client.images.get(image)
        except NotFound:
            print(f"Pulling {image}...")
            self.client.images.pull(image)

    def build_image(self, image: str, context_dir: str) -> None:
        """Build ``image`` from the Dockerfile context ``context_dir`` (skips if present)."""
        from docker.errors import NotFound

        try:
            self.client.images.get(image)
            return
        except NotFound:
            pass
        print(f"Building {image} from {context_dir} (first run, ~1-2 min)...")
        self.client.images.build(path=context_dir, tag=image, rm=True, pull=True)

    def run(
        self,
        *,
        name,
        image,
        ports,
        environment,
        volumes,
        extra_hosts,
        command,
        working_dir,
        user,
    ) -> None:
        """Create and start a detached container.

        Raises:
            PortInUseError: the daemon could not bind one of the published host ports.
            docker.errors.APIError: any other daemon-side failure (re-raised as-is).
        """
        from docker.errors import APIError

        try:
            self.client.containers.run(
                image,
                name=name,
                detach=True,
                ports=ports,
                environment=environment,
                volumes=volumes,
                extra_hosts=extra_hosts,
                command=command,
                working_dir=working_dir,
                user=user,
            )
        except APIError as e:
            msg = str(e)
            lowered = msg.lower()
            # Match both phrasings case-insensitively; daemons word a bind
            # conflict differently.
            if (
                "port is already allocated" in lowered
                or "address already in use" in lowered
            ):
                m = re.search(r":(\d+) failed", msg) or re.search(
                    r"0\.0\.0\.0:(\d+)", msg
                )
                port = m.group(1) if m else "?"
                raise PortInUseError(
                    f"Can't start '{name}': host port {port} is already in use. "
                    f"Another container or process holds it — find it with "
                    f"`docker ps --filter publish={port}` (often a stale oblako "
                    f"container) and stop/remove it, then retry."
                ) from e
            raise

    def status(self, name: str) -> str:
        """Return RUNNING/STOPPED/ABSENT for the container."""
        from docker.errors import NotFound

        try:
            container = self.client.containers.get(name)
        except NotFound:
            return ABSENT
        return RUNNING if container.status == "running" else STOPPED

    def remove(self, name: str) -> None:
        """Force-remove the container if present."""
        from docker.errors import NotFound

        try:
            self.client.containers.get(name).remove(force=True)
        except NotFound:
            pass

    def stop(self, name: str) -> None:
        """Stop (10 s grace period) and remove the container; no-op if absent."""
        from docker.errors import NotFound

        try:
            container = self.client.containers.get(name)
        except NotFound:
            return
        try:
            container.stop(timeout=10)
            container.remove()
        except NotFound:
            # The container vanished between lookup and stop/remove (e.g. it
            # was removed concurrently). The desired end state is reached.
            pass

    def logs(self, name: str, tail: int = 50) -> str:
        """Return the last ``tail`` log lines, or empty string if the container is gone."""
        from docker.errors import NotFound

        try:
            raw = self.client.containers.get(name).logs(tail=tail)
        except NotFound:
            return ""
        # Container output is arbitrary bytes; one invalid UTF-8 sequence must
        # not make the whole log unreadable.
        return raw.decode("utf-8", errors="replace")
def _k8s_name(value: str) -> str: """Sanitize a string to a DNS-1123 label for use as a k8s resource/volume name.""" out = "".join(c if (c.isalnum() or c == "-") else "-" for c in value.lower()) return out.strip("-") or "vol" def _args_list(command) -> list | None: """Map a docker command (str or list) to k8s container args.""" if command is None: return None return command if isinstance(command, list) else command.split() def _run_as_user(user) -> int | None: """Map a docker user (e.g. 'root' or a uid) to a k8s runAsUser, else None.""" if user is None: return None if user == "root": return 0 try: return int(user) except (TypeError, ValueError): return None def build_k8s_manifests( *, name, image, ports, environment, volumes, extra_hosts, command, working_dir, user, namespace, ) -> dict: """Build a Deployment + Service ``List`` manifest for a Service (pure, no cluster).""" container_ports = [int(str(spec).split("/")[0]) for spec in ports] mounts, volume_defs = [], [] for i, (vol, mount) in enumerate(volumes.items()): vname = f"{_k8s_name(vol)}-{i}" mounts.append({"name": vname, "mountPath": mount["bind"]}) volume_defs.append({"name": vname, "emptyDir": {}}) container = { "name": name, "image": image, "imagePullPolicy": "IfNotPresent", "env": [{"name": k, "value": str(v)} for k, v in environment.items()], "ports": [{"containerPort": cp} for cp in container_ports], } args = _args_list(command) if args: container["args"] = args if working_dir: container["workingDir"] = working_dir if mounts: container["volumeMounts"] = mounts uid = _run_as_user(user) if uid is not None: container["securityContext"] = {"runAsUser": uid} pod_spec = {"containers": [container]} if volume_defs: pod_spec["volumes"] = volume_defs # Concrete host IPs become hostAliases; docker's "host-gateway" has no k8s # equivalent here (use host.minikube.internal in the manifest if needed). 
aliases = [ {"ip": ip, "hostnames": [host]} for host, ip in (extra_hosts or {}).items() if ip and ip != "host-gateway" ] if aliases: pod_spec["hostAliases"] = aliases deployment = { "apiVersion": "apps/v1", "kind": "Deployment", "metadata": {"name": name, "namespace": namespace, "labels": {"app": name}}, "spec": { "replicas": 1, "selector": {"matchLabels": {"app": name}}, "template": {"metadata": {"labels": {"app": name}}, "spec": pod_spec}, }, } service = { "apiVersion": "v1", "kind": "Service", "metadata": {"name": name, "namespace": namespace}, "spec": { "selector": {"app": name}, "ports": [ {"name": f"p{cp}", "port": cp, "targetPort": cp} for cp in container_ports ], }, } return {"apiVersion": "v1", "kind": "List", "items": [deployment, service]}
class KubernetesBackend(ContainerBackend):
    """Run oblako services as Kubernetes workloads (e.g. on minikube), via kubectl.

    Each Service maps to a Deployment + Service. ``kubectl port-forward`` binds
    localhost:host_port to the in-cluster Service, so oblako's boto3 clients
    work unchanged.

    Port-forwards are tracked in an in-process registry, so only the oblako
    process that started them can stop them. After a restart, re-run start (or
    restart) on a service to re-establish them.

    NOTE(review): the forwarding loops run in their own session, and nothing
    here stops them when oblako exits. Confirm whether they should be cleaned
    up at exit.
    """

    name = "kubernetes"

    def __init__(self, namespace: str | None = None, kubectl: str = "kubectl"):
        """Initialize for the given namespace (default OBLAKO_K8S_NAMESPACE / 'oblako').

        Args:
            namespace: target namespace; falls back to ``K8S_NAMESPACE``.
            kubectl: kubectl executable name or path.
        """
        self.namespace = namespace or K8S_NAMESPACE
        self.kubectl = kubectl

    def _run(self, *args, stdin: str | None = None) -> subprocess.CompletedProcess:
        """Run kubectl with ``args``, capturing text output.

        A non-zero exit does not raise; callers inspect ``returncode``.
        """
        return subprocess.run(
            [self.kubectl, *args], input=stdin, text=True, capture_output=True
        )

    def _ns(self, *args) -> subprocess.CompletedProcess:
        """Run kubectl scoped to this backend's namespace."""
        return self._run("-n", self.namespace, *args)

    def _require_kubectl(self) -> None:
        """Raise RuntimeError if the kubectl executable cannot be found on PATH."""
        if shutil.which(self.kubectl) is None:
            raise RuntimeError(
                "kubectl not found — install it and point it at a cluster (e.g. minikube start)."
            )

    def ensure_image(self, image: str) -> None:
        """No image pre-pull: the cluster pulls it (use `minikube image load` for local images)."""
        self._require_kubectl()

    def _ensure_namespace(self) -> None:
        """Create the namespace if ``kubectl get namespace`` reports it missing."""
        if self._run("get", "namespace", self.namespace).returncode != 0:
            self._run("create", "namespace", self.namespace)

    def run(
        self,
        *,
        name,
        image,
        ports,
        environment,
        volumes,
        extra_hosts,
        command,
        working_dir,
        user,
    ) -> None:
        """Apply a Deployment + Service, wait for it, and port-forward each port.

        Raises:
            RuntimeError: kubectl is missing or ``kubectl apply`` failed.
        """
        self._require_kubectl()
        self._ensure_namespace()
        manifest = build_k8s_manifests(
            name=name,
            image=image,
            ports=ports,
            environment=environment,
            volumes=volumes,
            extra_hosts=extra_hosts,
            command=command,
            working_dir=working_dir,
            user=user,
            namespace=self.namespace,
        )
        applied = self._run("apply", "-f", "-", stdin=json.dumps(manifest))
        if applied.returncode != 0:
            raise RuntimeError(f"kubectl apply failed: {applied.stderr.strip()}")
        # The wait result is not checked: a timeout here is not treated as fatal.
        self._ns(
            "wait", "--for=condition=available", f"deployment/{name}", "--timeout=180s"
        )
        self._start_port_forward(name, ports)

    def _start_port_forward(self, name: str, ports: dict) -> None:
        """(Re)start one self-healing ``kubectl port-forward`` loop per port.

        Loops already registered for ``name`` are stopped first. Each loop is
        started in a new session so its whole process group (shell + kubectl)
        can be signalled together.
        """
        self._stop_port_forward(name)
        procs = []
        for spec, host_port in ports.items():
            container_port = int(str(spec).split("/")[0])
            # Quote every argument: the kubectl path and the namespace (which
            # can come from OBLAKO_K8S_NAMESPACE) are interpolated into a shell
            # script.
            forward = " ".join(
                shlex.quote(str(part))
                for part in (
                    self.kubectl,
                    "-n",
                    self.namespace,
                    "port-forward",
                    f"service/{name}",
                    f"{host_port}:{container_port}",
                )
            )
            # Self-healing: a Deployment reports "available" before the pod is
            # actually serving, and a port-forward that races the Service's
            # endpoints just exits. A shell loop restarts it until it sticks; the
            # wait_ready health check then succeeds once the pod serves.
            loop = f"while true; do {forward} >/dev/null 2>&1; sleep 1; done"
            procs.append(subprocess.Popen(["sh", "-c", loop], start_new_session=True))
        _port_forwards[name] = procs

    @staticmethod
    def _signal_group(proc: subprocess.Popen, sig: int) -> None:
        """Send ``sig`` to ``proc``'s process group, ignoring already-gone processes."""
        try:
            os.killpg(os.getpgid(proc.pid), sig)  # kill the loop + kubectl
        except (ProcessLookupError, PermissionError):
            pass

    def _stop_port_forward(self, name: str) -> None:
        """Terminate and reap every port-forward loop registered for ``name``."""
        for proc in _port_forwards.pop(name, []):
            self._signal_group(proc, signal.SIGTERM)
            try:
                # Reap the shell so it doesn't linger as a zombie.
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Escalate once, but never block indefinitely: cleanup is
                # best-effort.
                self._signal_group(proc, signal.SIGKILL)
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    pass

    def status(self, name: str) -> str:
        """Return RUNNING/STOPPED/ABSENT from the deployment's ready replicas."""
        result = self._ns(
            "get", "deployment", name, "-o", "jsonpath={.status.readyReplicas}"
        )
        if result.returncode != 0:
            return ABSENT
        ready = (result.stdout or "").strip()
        return RUNNING if ready and int(ready) >= 1 else STOPPED

    def remove(self, name: str) -> None:
        """Delete the Deployment + Service and stop its port-forwards."""
        self._stop_port_forward(name)
        self._ns("delete", "deployment,service", name, "--ignore-not-found")

    def stop(self, name: str) -> None:
        """Stop the service (delete its workload + port-forwards)."""
        self.remove(name)

    def logs(self, name: str, tail: int = 50) -> str:
        """Return recent logs from the deployment's pods, or "" if kubectl fails."""
        result = self._ns(
            "logs", f"deployment/{name}", f"--tail={tail}", "--all-containers"
        )
        return result.stdout if result.returncode == 0 else ""
class AppleContainerBackend(ContainerBackend):
    """Apple `container` backend (macOS 26+): Linux containers in lightweight VMs.

    Shells out to the `container` CLI (there is no Python SDK). Each container runs
    in its own VM with an IP on a vmnet subnet, but `-p host:container` publishes
    to localhost exactly like Docker — so oblako's fixed-endpoint contract holds
    unchanged. Two adaptations vs Docker: named volumes are mapped to host dirs
    (the CLI bind-mounts host paths), and since there is no `host.docker.internal`
    / `--add-host`, env values referencing it are rewritten to the vmnet gateway.
    """

    name = "apple"

    # The vmnet gateway is the host, as seen from inside a container. There is no
    # host.docker.internal alias and no --add-host, so we substitute this for any
    # env value that points at it (e.g. the Redshift proxy's backend host).
    HOST_GATEWAY = "192.168.64.1"

    def __init__(self):
        """Locate the `container` binary (PATH, then the Homebrew locations).

        Raises:
            RuntimeError: the CLI is not found in any of those places.
        """
        self._bin = shutil.which("container") or next(
            (
                p
                for p in (
                    "/opt/homebrew/bin/container",
                    "/opt/homebrew/opt/container/bin/container",
                )
                if os.path.exists(p)
            ),
            None,
        )
        if not self._bin:
            raise RuntimeError(
                "Apple `container` CLI not found — install it with "
                "`brew install container` (needs macOS 26+)."
            )

    def _cli(self, *args, check=True):
        """Run the `container` CLI, capturing text output.

        With ``check=True`` a non-zero exit raises ``subprocess.CalledProcessError``.
        """
        return subprocess.run(
            [self._bin, *args], check=check, text=True, capture_output=True
        )

    @staticmethod
    def _qualify(image: str) -> str:
        """Fully-qualify a Docker Hub short name (the CLI needs an explicit ref).

        As in Docker's image-reference rules, the first path component names a
        registry only if it contains ``.`` or ``:`` or is ``localhost``.
        """
        first, sep, _ = image.partition("/")
        if not sep:  # e.g. python:3.12-slim -> docker.io/library/...
            return f"docker.io/library/{image}"
        if "." in first or ":" in first or first == "localhost":
            # Already names a registry, e.g. ghcr.io/... or myreg:5000/...
            return image
        return f"docker.io/{image}"  # andrewgaul/s3proxy -> docker.io/...

    @staticmethod
    def _volume_source(src: str) -> str:
        """Bind path as-is; map a named volume to a stable host dir (created if needed)."""
        if os.path.isabs(src):
            return src
        path = os.path.expanduser(f"~/.oblako/apple-volumes/{src}")
        os.makedirs(path, exist_ok=True)
        return path

    def ensure_image(self, image: str) -> None:
        """Pre-pull the image (best effort; `container run` also auto-pulls)."""
        self._cli("image", "pull", self._qualify(image), check=False)

    def build_image(self, image: str, context_dir: str) -> None:
        """Build an image from a Dockerfile context via `container build` (skips if present).

        Raises:
            RuntimeError: `container build` exited non-zero.
        """
        inspect = self._cli("image", "inspect", image, check=False)
        if inspect.returncode == 0:
            return
        print(f"Building {image} from {context_dir} (first run, ~1-2 min)...")
        result = self._cli("build", "-t", image, context_dir, check=False)
        if result.returncode != 0:
            raise RuntimeError(
                f"`container build` failed for {image}:\n{result.stderr or result.stdout}"
            )

    def run(
        self,
        *,
        name,
        image,
        ports,
        environment,
        volumes,
        extra_hosts,
        command,
        working_dir,
        user,
    ) -> None:
        """Create and start a detached container via `container run`.

        ``extra_hosts`` is accepted for interface compatibility but not passed
        on; string env values have ``host.docker.internal`` rewritten to
        ``HOST_GATEWAY`` instead.

        Raises:
            PortInUseError: the CLI error output mentions "in use" / "already allocated".
            RuntimeError: any other `container run` failure.
        """
        args = ["run", "--detach", "--name", name]
        for spec, host_port in (ports or {}).items():
            # Port keys may be ints as well as "port/proto" strings.
            container_port, _, proto = str(spec).partition("/")
            mapping = f"{host_port}:{container_port}"
            args += ["-p", mapping + (f"/{proto}" if proto and proto != "tcp" else "")]
        for key, value in (environment or {}).items():
            if isinstance(value, str):
                value = value.replace("host.docker.internal", self.HOST_GATEWAY)
            args += ["-e", f"{key}={value}"]
        for src, opt in (volumes or {}).items():
            suffix = ":ro" if opt.get("mode") == "ro" else ""
            args += ["-v", f"{self._volume_source(src)}:{opt['bind']}{suffix}"]
        if working_dir:
            args += ["-w", working_dir]
        if user is not None and user != "":
            # Test explicitly: uid 0 is falsy but still means "run as root".
            args += ["-u", str(user)]
        args.append(self._qualify(image))
        if command:
            args += shlex.split(command) if isinstance(command, str) else list(command)
        try:
            self._cli(*args)
        except subprocess.CalledProcessError as e:
            err = (e.stderr or "") + (e.stdout or "")
            if "in use" in err.lower() or "already allocated" in err.lower():
                raise PortInUseError(
                    f"Can't start '{name}': a host port it needs is already in use.\n{err}"
                ) from e
            raise RuntimeError(f"`container run` failed for '{name}':\n{err}") from e

    def _items(self) -> list:
        """Return the dict entries of `container ls --all --format json`; [] on any failure."""
        result = self._cli("ls", "--all", "--format", "json", check=False)
        if result.returncode != 0 or not result.stdout.strip():
            return []
        try:
            parsed = json.loads(result.stdout)
        except json.JSONDecodeError:
            return []
        if not isinstance(parsed, list):
            return []
        return [item for item in parsed if isinstance(item, dict)]

    def status(self, name: str) -> str:
        """Return RUNNING/STOPPED/ABSENT from `container ls --format json`."""
        for item in self._items():
            # NOTE(review): the JSON schema isn't pinned down here. Accept a
            # top-level "id" or one nested under "configuration", and a status
            # that is either {"state": ...} or a plain string.
            config = item.get("configuration")
            item_id = item.get("id") or (
                config.get("id") if isinstance(config, dict) else None
            )
            if item_id != name:
                continue
            raw = item.get("status")
            state = raw.get("state", "") if isinstance(raw, dict) else (raw or "")
            return RUNNING if state == "running" else STOPPED
        return ABSENT

    def remove(self, name: str) -> None:
        """Force-remove the container if present."""
        self._cli("rm", "-f", name, check=False)

    def stop(self, name: str) -> None:
        """Stop and remove the container (no-op if absent)."""
        self._cli("stop", name, check=False)
        self.remove(name)  # force-remove; a plain `rm` can race the stop

    def logs(self, name: str, tail: int = 50) -> str:
        """Return recent container logs, or empty string if unavailable."""
        result = self._cli("logs", "-n", str(tail), name, check=False)
        return result.stdout if result.returncode == 0 else ""
def _backend_choice() -> str:
    """Return the normalised OBLAKO_CONTAINER_BACKEND value (default ``docker``).

    Surrounding whitespace and case are ignored; empty or blank means ``docker``.
    """
    raw = os.environ.get("OBLAKO_CONTAINER_BACKEND") or ""
    return raw.strip().lower() or "docker"


def _socket_for(runtime: str) -> str | None:
    """Return the Docker-API socket URL for a runtime (podman/colima), or None.

    Returns None when DOCKER_HOST is set (docker-py honours it itself) or when
    none of the runtime's candidate sockets exists.
    """
    if os.environ.get("DOCKER_HOST"):
        return None  # explicit DOCKER_HOST wins (docker-py honours it)
    for candidate in _CANDIDATE_SOCKETS.get(runtime, []):
        expanded = os.path.expandvars(candidate)
        if "$" in expanded:
            # A referenced variable (e.g. XDG_RUNTIME_DIR) is unset, so
            # expandvars left it literal. This is not a real path.
            continue
        path = os.path.expanduser(expanded)
        if os.path.exists(path):
            return f"unix://{path}"
    return None


def _docker_backend_for(runtime: str) -> DockerBackend:
    """Build a DockerBackend targeting ``runtime``'s socket (ambient env if none found)."""
    return DockerBackend(base_url=_socket_for(runtime))


def docker_client():
    """Return a docker-py client for the configured Docker-API backend.

    The per-task compute paths (Lambda exec, Glue jobs, SageMaker local, EC2
    instances, the Studio notebook) talk to Docker directly rather than through
    a Service's ``ContainerBackend``. Routing them through here means they
    target the SAME daemon the services use — so ``OBLAKO_CONTAINER_BACKEND=podman``/
    ``colima`` works for compute too, without having to also set ``DOCKER_HOST``.
    Any other backend (``kubernetes``, ``apple`` — neither exposes a Docker
    socket) falls back to the ambient Docker environment (``docker.from_env()``).
    """
    choice = _backend_choice()
    runtime = choice if choice in ("podman", "colima") else "docker"
    # Same client construction the services' DockerBackend uses; "docker" has
    # no candidate sockets, so it always resolves to docker.from_env().
    return _docker_backend_for(runtime).client


def get_backend() -> ContainerBackend:
    """Return the configured container backend (OBLAKO_CONTAINER_BACKEND, default docker).

    Accepted values: ``docker``, ``podman``, ``colima``, ``kubernetes``/``k8s``,
    ``apple``/``container``. Any other value falls back to ``DockerBackend``.
    """
    choice = _backend_choice()
    if choice in ("kubernetes", "k8s"):
        return KubernetesBackend()
    if choice in ("apple", "container"):
        return AppleContainerBackend()
    if choice in ("podman", "colima"):
        return _docker_backend_for(choice)
    return DockerBackend()