# Source code for oblako.services.glue

"""Local AWS Glue: PySpark jobs in the official ``amazon/aws-glue-libs:5`` image.

Submit a Glue / PySpark script and the service runs a per-job container that has
Spark + Glue libs + Iceberg already on the classpath. The container is wired so
Spark can reach oblako's Iceberg REST catalog and S3Proxy on the host
(``host.docker.internal``), so a Glue job can read/write the same Iceberg tables
pyiceberg sees.

The Glue 5 image is ~5 GB — pulled lazily by :meth:`ensure_image` on first use.
"""

from __future__ import annotations

import tempfile
from pathlib import Path

from oblako import config

# Docker image used for every container this module starts.
IMAGE_TAG = "amazon/aws-glue-libs:5"

# Glue's bundled AWS SDK v2 trips over gaps in oblako's S3Proxy backend: it
# 501s on the `x-amz-optional-object-attributes` header the SDK adds to
# listObjectsV2, and it omits `<Size>` for the zero-byte directory markers Spark
# writes (so hadoop-aws NPEs on S3Object.size() at commit). A tiny SDK
# ExecutionInterceptor works around these (strip the request header; default
# null list sizes to 0) and, per the note in _SPARK_CONFS below, also papers
# over S3Proxy's 500 on single directory-marker DELETEs. It is registered on
# every job via the S3A config in _SPARK_CONFS.
_INTERCEPTOR_CLASS = "io.oblako.glue.S3ProxyCompatInterceptor"
# Spark/Hadoop config every job needs for S3A to work against the S3Proxy backend.
# Each entry is a `key=value` string passed to spark-submit as `--conf <entry>`.
_SPARK_CONFS = (
    # the compat interceptor: strips the 501 header, fills null marker fields,
    # and turns S3Proxy's 500 on a single directory-marker DELETE into success
    f"spark.hadoop.fs.s3a.audit.execution.interceptors={_INTERCEPTOR_CLASS}",
    # use single-object DELETEs (not the bulk POST ?delete S3Proxy 500s on, and
    # which the interceptor can't cleanly fix up — its response carries a body),
    # so marker deletes go through the DELETE-500 -> 204 path above.
    "spark.hadoop.fs.s3a.multiobjectdelete.enable=false",
)


[docs] class GlueService: """Per-job Glue 5 runner. Submit a PySpark script, get back logs + exit code.""" name = "glue" def __init__(self): """Initialize a Glue runner (no persistent container — jobs run on demand).""" self._client = None @property def client(self): """Return the docker-py client for the configured backend (docker/podman/colima).""" if self._client is None: from .backends import docker_client self._client = docker_client() return self._client
[docs] def ensure_image(self) -> None: """Pull the Glue 5 image (~5 GB) if it isn't present locally.""" from docker.errors import ImageNotFound try: self.client.images.get(IMAGE_TAG) except ImageNotFound: print(f"Pulling {IMAGE_TAG} (~5 GB) — this can take a few minutes...") self.client.images.pull(IMAGE_TAG)
def _ensure_interceptor_jar(self) -> Path: """Build (once, cached) the S3Proxy-compat interceptor jar. Compiled in the Glue image itself (it ships javac 17 + the AWS SDK), so no host Java toolchain is needed. The cache filename includes a hash of the sources, so editing the interceptor transparently triggers a rebuild. Returns the host path to the jar, which ``submit_job`` mounts onto the Spark classpath. """ import hashlib src_dir = Path(__file__).parent / "glue_assets" sources = sorted(src_dir.glob("*.java")) digest = hashlib.sha256(b"".join(s.read_bytes() for s in sources)).hexdigest() jar = ( Path(tempfile.gettempdir()) / "oblako-glue" / f"oblako-glue-s3a-{digest[:12]}.jar" ) if jar.exists(): return jar jar.parent.mkdir(parents=True, exist_ok=True) self.ensure_image() build = ( 'set -e; BUNDLE=$(find /root/.m2 -name "bundle-*.jar" | head -1); ' "mkdir -p /tmp/c && " 'javac -cp "$BUNDLE" -d /tmp/c /src/*.java && ' f"(cd /tmp/c && jar cf /out/{jar.name} io)" ) container = self.client.containers.run( IMAGE_TAG, entrypoint=["bash", "-lc"], command=[build], detach=True, volumes={ str(src_dir): {"bind": "/src", "mode": "ro"}, str(jar.parent): {"bind": "/out", "mode": "rw"}, }, ) try: result = container.wait(timeout=180) if result["StatusCode"] != 0 or not jar.exists(): logs = container.logs().decode("utf-8", errors="replace") raise RuntimeError(f"interceptor jar build failed:\n{logs[-2000:]}") finally: container.remove(force=True) return jar
[docs] def submit_job( self, script: str, *, args: list[str] | None = None, env: dict[str, str] | None = None, timeout: int = 1200, ) -> dict: """Run a PySpark script in a Glue 5 container; return ``{exit_code, logs}``. The script is written to a tempdir mounted at ``/scripts/job.py``. The container is given creds + endpoints to reach oblako's S3Proxy and Iceberg REST catalog at ``host.docker.internal``, and an S3A interceptor that strips the one header S3Proxy can't handle (see ``_INTERCEPTOR_CLASS``). """ self.ensure_image() interceptor_jar = self._ensure_interceptor_jar() full_env = { "AWS_ACCESS_KEY_ID": "test", "AWS_SECRET_ACCESS_KEY": "test", "AWS_DEFAULT_REGION": config.region(), # S3Proxy doesn't speak the new flexible checksums. "AWS_REQUEST_CHECKSUM_CALCULATION": "when_required", "AWS_RESPONSE_CHECKSUM_VALIDATION": "when_required", **(env or {}), } with tempfile.TemporaryDirectory() as scripts_dir: (Path(scripts_dir) / "job.py").write_text(script) container = self.client.containers.run( IMAGE_TAG, command=[ "spark-submit", *[arg for conf in _SPARK_CONFS for arg in ("--conf", conf)], "/scripts/job.py", *(args or []), ], detach=True, volumes={ scripts_dir: {"bind": "/scripts", "mode": "ro"}, # drop the interceptor into a classpath-glob dir so S3A loads it str(interceptor_jar): { "bind": "/opt/spark/jars/oblako-glue-s3a.jar", "mode": "ro", }, }, environment=full_env, extra_hosts={"host.docker.internal": "host-gateway"}, ) try: result = container.wait(timeout=timeout) logs = container.logs().decode("utf-8", errors="replace") return {"exit_code": result["StatusCode"], "logs": logs} finally: container.remove(force=True)
[docs] def run_workflow(self, name: str, steps: list[dict], *, timeout: int = 600) -> dict: """Run a Glue workflow: a sequential pipeline of PySpark job steps. Each step is ``{name, script}``. Steps run in order, and a step runs only if every prior step SUCCEEDED — mirroring a Glue CONDITIONAL trigger gated on predecessor success. The first failure stops the pipeline; the rest are marked SKIPPED. Returns ``{name, status, steps:[{name, status, exitCode, logs}]}``. This covers the common sequential-ETL workflow. Full DAGs (parallel branches, crawlers, scheduled/event triggers) aren't modelled yet. """ results: list[dict] = [] failed = False for i, step in enumerate(steps): sname = step.get("name") or f"step{i + 1}" script = (step.get("script") or "").strip() if failed: results.append( {"name": sname, "status": "SKIPPED", "exitCode": None, "logs": ""} ) continue if not script: results.append( { "name": sname, "status": "FAILED", "exitCode": None, "logs": "empty script", } ) failed = True continue try: r = self.submit_job(script, timeout=timeout) except Exception as e: # noqa: BLE001 — surface the failure in the step results.append( { "name": sname, "status": "FAILED", "exitCode": None, "logs": str(e), } ) failed = True continue ok = r["exit_code"] == 0 results.append( { "name": sname, "status": "SUCCEEDED" if ok else "FAILED", "exitCode": r["exit_code"], "logs": r["logs"][-8000:], } ) if not ok: failed = True return { "name": name, "status": "FAILED" if failed else "SUCCEEDED", "steps": results, }