"""Step Functions Local service."""
from oblako import ports
import json
from pathlib import Path
import httpx
from oblako import config
from .base import Service, PortMapping
from .boto import BotoService
# Default role ARN used by StepFunctionsService.create_state_machine when the
# caller does not pass ``role_arn``. Built via config.arn with
# region_scoped=False (presumably because IAM ARNs carry no region — confirm
# against oblako.config.arn).
DUMMY_ROLE = config.arn("iam", "role/DummyRole", region_scoped=False)
# Container-side path at which the host mock-config directory is bind-mounted
# (read-only), so executions can run the bundled ML templates in SFN Local mock
# mode (canned task results). The mock config file inside it is hot-reloaded on
# each run.
MOCK_CONFIG_MOUNT = "/oblako/sfn"
@BotoService("stepfunctions")
class StepFunctionsService(Service):
    """Step Functions Local service backed by the official Amazon image.

    The service runs ``amazon/aws-stepfunctions-local`` with a host directory
    (``~/.oblako/sfn``) bind-mounted read-only at ``MOCK_CONFIG_MOUNT`` so the
    container can read ``MockConfigFile.json`` for mocked task results.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(
        self,
        host_port: int = ports.STEPFUNCTIONS,
        lambda_endpoint: str = "http://host.docker.internal:3001",
    ) -> None:
        """Initialize the Step Functions service.

        Args:
            host_port: Host port mapped to the container's port 8083.
            lambda_endpoint: URL exported to the container as
                ``LAMBDA_ENDPOINT``.
        """
        mock_dir = Path.home() / ".oblako" / "sfn"
        super().__init__(
            name="stepfunctions",
            image="amazon/aws-stepfunctions-local:latest",
            ports=[PortMapping(container_port=8083, host_port=host_port)],
            environment={
                "LAMBDA_ENDPOINT": lambda_endpoint,
                "SFN_MOCK_CONFIG": f"{MOCK_CONFIG_MOUNT}/MockConfigFile.json",
            },
            # The whole directory (not the single file) is mounted, which is
            # what lets write_mock_config swap the file atomically.
            volumes={str(mock_dir): {"bind": MOCK_CONFIG_MOUNT, "mode": "ro"}},
            # Reach the host lambda shim (oblako.lambda_shim on :3001) for live
            # lambda:invoke tasks — e.g. the Bedrock prompt-chain -> local model.
            extra_hosts={"host.docker.internal": "host-gateway"},
        )
        # Host port the service is published on; used to build endpoint_url.
        self.host_port = host_port
        # Host-side location of the mock config file the container reads.
        self.mock_config_path = mock_dir / "MockConfigFile.json"

    def write_mock_config(self, config: dict) -> None:
        """Write the SFN Local mock configuration to the mounted file.

        The file is hot-reloaded by SFN Local, so it is replaced atomically:
        the JSON is written to a sibling temporary file and then renamed over
        the target, so a reader never observes a partially written document.

        Args:
            config: JSON-serializable mock configuration. (The name shadows
                the module-level ``config`` import inside this method only; it
                is kept for backward compatibility with keyword callers.)

        Raises:
            TypeError: If ``config`` is not JSON-serializable (raised before
                any file is touched).
            OSError: If the directory or file cannot be written.
        """
        # Serialize first so a bad payload never disturbs the existing file.
        payload = json.dumps(config, indent=2)
        target = self.mock_config_path
        target.parent.mkdir(parents=True, exist_ok=True)
        # Same directory as the target so the rename stays on one filesystem.
        staging = target.with_name(target.name + ".tmp")
        staging.write_text(payload, encoding="utf-8")
        staging.replace(target)

    def start(self) -> None:
        """Seed the mock config from the bundled ML templates, then start the container."""
        # Imported lazily, as in the original; NOTE(review): presumably to
        # avoid an import cycle or import-time cost — confirm.
        from . import sfn_templates

        # The config must exist on the host before the container mounts it.
        self.write_mock_config(sfn_templates.build_mock_config())
        super().start()

    @property
    def endpoint_url(self) -> str:
        """Return the Step Functions Local endpoint URL."""
        return f"http://localhost:{self.host_port}"

    def create_state_machine(
        self, name: str, definition: dict, role_arn: str = DUMMY_ROLE
    ) -> str:
        """Create a state machine and return its ARN.

        Args:
            name: State machine name.
            definition: State machine definition; serialized with
                ``json.dumps`` before being sent.
            role_arn: Role ARN for the state machine; defaults to
                ``DUMMY_ROLE``.

        Returns:
            The ``stateMachineArn`` field of the service response.
        """
        # get_client is not defined in this class; presumably supplied by
        # Service or the BotoService decorator.
        sfn = self.get_client()
        resp = sfn.create_state_machine(
            name=name,
            definition=json.dumps(definition),
            roleArn=role_arn,
        )
        return resp["stateMachineArn"]

    def execute(self, state_machine_arn: str, input_data: dict) -> str:
        """Start an execution and return the execution ARN.

        Args:
            state_machine_arn: ARN of the state machine to run.
            input_data: Execution input; serialized with ``json.dumps``.

        Returns:
            The ``executionArn`` field of the service response.
        """
        sfn = self.get_client()
        resp = sfn.start_execution(
            stateMachineArn=state_machine_arn,
            input=json.dumps(input_data),
        )
        return resp["executionArn"]

    def _health_check(self) -> bool:
        """Return True if a ListStateMachines request to the endpoint returns HTTP 200.

        Any ``httpx.HTTPError`` is treated as "not ready" and yields False.
        """
        try:
            resp = httpx.post(
                self.endpoint_url,
                headers={
                    "Content-Type": "application/x-amz-json-1.0",
                    "X-Amz-Target": "AWSStepFunctions.ListStateMachines",
                },
                content="{}",
                timeout=3.0,
            )
            return resp.status_code == 200
        except httpx.HTTPError:
            # a starting SFN Local may accept then reset the connection
            # (RemoteProtocolError/ReadError), not just refuse it — any transport
            # error means not-ready, never crash wait_ready
            return False