"""Local Trino — Athena-style SQL over the Iceberg REST catalog (= S3 Tables).
AWS Athena runs Trino/Presto-derived engines over S3. We expose the same
experience locally: a Trino container whose Iceberg connector is pre-wired to
oblako's Iceberg REST catalog and S3Proxy, so a query such as
``SELECT * FROM iceberg.credit.applicants`` works with no extra setup.
The catalog config is generated at ``~/.oblako/trino/catalog/iceberg.properties``
and bind-mounted read-only into the container.
"""
from __future__ import annotations
from oblako import ports
import time
from pathlib import Path
import httpx
from .base import Service, PortMapping
# Host-side directory for generated Trino catalog files.
TRINO_CATALOG_DIR = Path.home().joinpath(".oblako", "trino", "catalog")

# Contents of ``iceberg.properties``: one ``key=value`` per line, each line
# newline-terminated. The connector talks to a REST catalog on port 8181 and to
# S3 (native client, path-style addressing, static ``test``/``test`` keys) on
# port 9000, both addressed through ``host.docker.internal``.
_ICEBERG_PROPERTIES = "".join(
    f"{key}={value}\n"
    for key, value in (
        ("connector.name", "iceberg"),
        ("iceberg.catalog.type", "rest"),
        ("iceberg.rest-catalog.uri", "http://host.docker.internal:8181"),
        ("iceberg.rest-catalog.warehouse", "s3://oblako-iceberg/"),
        ("iceberg.rest-catalog.security", "NONE"),
        ("fs.native-s3.enabled", "true"),
        ("s3.endpoint", "http://host.docker.internal:9000"),
        ("s3.region", "us-east-1"),
        ("s3.path-style-access", "true"),
        ("s3.aws-access-key", "test"),
        ("s3.aws-secret-key", "test"),
    )
)
class TrinoService(Service):
    """Local Trino, pre-wired with the Iceberg connector to oblako's catalog + S3."""

    # Statuses Trino's client protocol treats as transient: retry after a short pause.
    _RETRYABLE_STATUS = frozenset({502, 503, 504})
    # Pause between retries of a transient status (the protocol suggests 50-100 ms).
    _RETRY_DELAY_S = 0.1

    def __init__(self, host_port: int = ports.TRINO):
        """Initialize on ``host_port`` (8485; Trino's internal port is 8080).

        Side effect: creates ``TRINO_CATALOG_DIR`` if needed and (re)writes
        ``iceberg.properties`` into it on every construction.
        """
        TRINO_CATALOG_DIR.mkdir(parents=True, exist_ok=True)
        (TRINO_CATALOG_DIR / "iceberg.properties").write_text(_ICEBERG_PROPERTIES)
        super().__init__(
            name="trino",
            image="trinodb/trino:latest",
            ports=[PortMapping(container_port=8080, host_port=host_port)],
            # The generated catalog directory, mounted read-only.
            volumes={
                str(TRINO_CATALOG_DIR): {"bind": "/etc/trino/catalog", "mode": "ro"}
            },
            environment={
                # S3Proxy doesn't implement aws-chunked CRC32 — make the AWS SDK v2
                # used by Trino's S3 client skip the new flexible checksums.
                "AWS_REQUEST_CHECKSUM_CALCULATION": "when_required",
                "AWS_RESPONSE_CHECKSUM_VALIDATION": "when_required",
            },
            # The catalog properties reach the REST catalog (8181) and S3Proxy
            # (9000) via host.docker.internal; map that name to the host gateway.
            extra_hosts={"host.docker.internal": "host-gateway"},
        )
        self.host_port = host_port

    @property
    def endpoint_url(self) -> str:
        """Trino HTTP endpoint on the host (clients submit SQL via ``/v1/statement``)."""
        return f"http://localhost:{self.host_port}"

    def query(self, sql: str, *, timeout: float = 60.0) -> dict:
        """Run a SQL query through Trino's REST API.

        Submits ``sql`` to ``/v1/statement`` and follows the ``nextUri`` chain,
        taking column names from the first page that carries them and rows
        from every page.

        Args:
            sql: Statement text, sent as the request body.
            timeout: Seconds. Used as the per-HTTP-request timeout and as the
                overall deadline, which is checked between pages and between
                retries of transient (502/503/504) responses.

        Returns:
            ``{"columns": [...], "rows": [...]}`` on success; otherwise
            ``{"error": {...}}`` holding Trino's error object, or a
            ``{"message": ...}`` dict for a client-side timeout or an
            unexpected HTTP status.

        Raises:
            httpx.HTTPError: on transport failures (connection refused, read
                timeout, ...), which are not converted to ``{"error"}``.
        """
        headers = {"X-Trino-User": "oblako", "Content-Type": "text/plain"}
        # Monotonic clock: a wall-clock jump must not shorten or extend the budget.
        # Started before the POST so submission time counts against the deadline.
        deadline = time.monotonic() + timeout
        result = self._send(
            "POST",
            f"{self.endpoint_url}/v1/statement",
            headers,
            deadline=deadline,
            timeout=timeout,
            content=sql,
        )
        rows: list = []
        columns: list[str] | None = None
        while True:
            # Only a non-empty ``error`` value ends the query.
            if result.get("error"):
                return {"error": result["error"]}
            if columns is None and result.get("columns"):
                columns = [c["name"] for c in result["columns"]]
            rows.extend(result.get("data") or [])
            next_uri = result.get("nextUri")
            if not next_uri:
                break
            if time.monotonic() > deadline:
                # Don't leave the abandoned query running on the server.
                self._cancel(next_uri, headers)
                return {"error": {"message": "query timeout"}}
            result = self._send(
                "GET", next_uri, headers, deadline=deadline, timeout=timeout
            )
        return {"columns": columns or [], "rows": rows}

    def _send(
        self,
        method: str,
        url: str,
        headers: dict,
        *,
        deadline: float,
        timeout: float,
        content: str | None = None,
    ) -> dict:
        """Issue one client-protocol request and return its decoded JSON body.

        Retries 502/503/504 responses until ``deadline`` (after which a
        ``query timeout`` error dict is returned). Any other non-2xx status is
        returned as ``{"error": {"message": ...}}`` rather than failing while
        decoding a non-JSON body.
        """
        while True:
            resp = httpx.request(
                method, url, content=content, headers=headers, timeout=timeout
            )
            if resp.status_code not in self._RETRYABLE_STATUS:
                break
            if time.monotonic() > deadline:
                return {"error": {"message": "query timeout"}}
            time.sleep(self._RETRY_DELAY_S)
        if not resp.is_success:
            return {"error": {"message": f"HTTP {resp.status_code}: {resp.text}"}}
        return resp.json()

    @staticmethod
    def _cancel(next_uri: str, headers: dict) -> None:
        """Best-effort ``DELETE`` of ``next_uri``, asking Trino to cancel the query."""
        try:
            httpx.delete(next_uri, headers=headers, timeout=3.0)
        except httpx.HTTPError:
            # Advisory only: the caller reports the timeout either way.
            pass

    def _health_check(self) -> bool:
        """Return True once ``/v1/info`` answers 200 with ``starting`` false."""
        try:
            resp = httpx.get(f"{self.endpoint_url}/v1/info", timeout=3.0)
            # A missing "starting" key counts as still starting.
            return resp.status_code == 200 and not resp.json().get("starting", True)
        except (httpx.HTTPError, ValueError):
            # Transport errors (incl. accept-then-reset) and non-JSON bodies
            # both just mean "not ready yet".
            return False