Source code for oblako.services.iceberg

"""Local Iceberg REST catalog — Iceberg tables on S3Proxy.

On AWS this is **S3 Tables** (``boto3.client("s3tables")``): a managed Iceberg
catalog over S3. Same shape locally — Iceberg REST + S3Proxy underneath; the
platform aliases this service as ``oblako.s3tables`` to mirror the AWS API.

The ``tabulario/iceberg-rest`` reference image speaks the Iceberg REST API on
:8181, with a built-in JDBC catalog. Configured here so the *warehouse* is
``s3://oblako-iceberg/`` on S3Proxy, with path-style addressing and the same
creds the rest of oblako uses.

pyiceberg clients should connect via ``FsspecFileIO`` (s3fs/boto3). It honours
``AWS_REQUEST_CHECKSUM_CALCULATION=when_required``, which S3Proxy needs;
pyarrow's S3FS does not, so its multipart upload fails with HTTP 400. Example:

    import os
    os.environ.update(AWS_REQUEST_CHECKSUM_CALCULATION="when_required",
                      AWS_RESPONSE_CHECKSUM_VALIDATION="when_required")
    from pyiceberg.catalog import load_catalog
    cat = load_catalog("oblako",
                       uri="http://localhost:8181",
                       warehouse="s3://oblako-iceberg/",
                       **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
                          "s3.endpoint": "http://localhost:9000",
                          "s3.access-key-id": "test",
                          "s3.secret-access-key": "test",
                          "s3.path-style-access": "true"})
"""

from __future__ import annotations

from oblako import ports
import httpx

from oblako import config
from .base import Service, PortMapping

DEFAULT_WAREHOUSE = "s3://oblako-iceberg/"


class IcebergCatalogService(Service):
    """Iceberg REST catalog container whose warehouse lives on S3Proxy."""

    def __init__(
        self,
        host_port: int = ports.ICEBERG,
        s3_endpoint: str = "http://host.docker.internal:9000",
        warehouse: str = DEFAULT_WAREHOUSE,
    ):
        """Configure the catalog container and remember how to reach it.

        Args:
            host_port: Host port mapped onto the container's port 8181.
            s3_endpoint: S3 endpoint handed to the container as
                ``CATALOG_S3_ENDPOINT``.
            warehouse: Warehouse location handed to the container as
                ``CATALOG_WAREHOUSE``.
        """
        # The container always listens on 8181; only the host side varies.
        rest_port = PortMapping(container_port=8181, host_port=host_port)

        # Settings passed into the container: AWS-style credentials/region
        # plus the CATALOG_* keys (warehouse, S3FileIO, endpoint, path-style).
        container_env = {
            "AWS_ACCESS_KEY_ID": "test",
            "AWS_SECRET_ACCESS_KEY": "test",
            "AWS_REGION": config.region(),
            "CATALOG_WAREHOUSE": warehouse,
            "CATALOG_IO__IMPL": "org.apache.iceberg.aws.s3.S3FileIO",
            "CATALOG_S3_ENDPOINT": s3_endpoint,
            "CATALOG_S3_PATH-STYLE-ACCESS": "true",
        }

        super().__init__(
            name="iceberg",
            image="tabulario/iceberg-rest:latest",
            ports=[rest_port],
            environment=container_env,
            # Lets the default s3_endpoint (host.docker.internal) resolve
            # from inside the container.
            extra_hosts={"host.docker.internal": "host-gateway"},
        )
        self.host_port = host_port
        self.warehouse = warehouse

    @property
    def endpoint_url(self) -> str:
        """Base URL of the REST catalog for pyiceberg / Spark Iceberg clients."""
        return f"http://localhost:{self.host_port}"

    def _health_check(self) -> bool:
        """Return True only when ``GET /v1/config`` answers with HTTP 200."""
        config_url = f"{self.endpoint_url}/v1/config"
        try:
            reply = httpx.get(config_url, timeout=3.0)
        except httpx.HTTPError:
            # Any httpx error (incl. accept-then-reset) means "not ready".
            return False
        return reply.status_code == 200