"""EC2 service: AWS EC2 control plane (moto) + real container-backed instances.

moto owns the control plane — RunInstances, DescribeInstances, tags, VPCs,
security groups, AMIs — with full describe fidelity. On top of that, oblako backs
each instance with a **real Docker container** and a **Docker named volume as its
EBS root** (``real behavior, simulated topology``: an instance is a container,
EBS is a volume, the VM/Nitro boundary is simulated). This mirrors how Lambda
runs locally (a container per invoke) — EC2 just keeps its container long-lived,
matching a durable instance as opposed to Lambda's ephemeral microVM.

AWS EC2 is full VMs (Nitro); Lambda is Firecracker microVMs. Neither boots on a
Mac, so both map to a container locally — the faithful distinction is lifecycle:
EC2 containers persist until TerminateInstances; Lambda's vanish after the invoke.
A Firecracker backend stays a future bare-metal-Linux fidelity option.
"""
from __future__ import annotations
import os
from .boto import BotoService
from .moto import MotoService
# A generic instance is a long-running container off a base Linux image. AMI ids
# are metadata only (moto), so the image is fixed (override via OBLAKO_EC2_IMAGE,
# e.g. a tiny image for tests).
# Fallback image returned by _image_for() when OBLAKO_EC2_IMAGE is unset or empty.
DEFAULT_INSTANCE_IMAGE = "amazonlinux:2023"
# Path inside the backing container where the instance's named volume is bound
# (read-write) by start_instance_container().
EBS_MOUNT = "/ebs"  # the instance's EBS volume is mounted here
# Label key whose value is the instance id; start_instance_container() puts it on
# both the backing container and the volume it creates.
INSTANCE_LABEL = "oblako.ec2.instance-id"
def _docker():
    """Return a container-engine client obtained from the ``backends`` module.

    The import happens at call time rather than at module import
    (presumably so this module can be imported without the container
    tooling being loaded — confirm against ``backends``).
    """
    from .backends import docker_client as make_client

    # honours OBLAKO_CONTAINER_BACKEND (docker/podman/colima)
    return make_client()
def _image_for(image_id: str | None) -> str:
    """Pick the container image that backs an instance.

    ``image_id`` (the AMI id) is accepted for interface symmetry but is not
    consulted: AMI ids are metadata only. A non-empty ``OBLAKO_EC2_IMAGE``
    environment variable wins; otherwise ``DEFAULT_INSTANCE_IMAGE`` is used.
    """
    override = os.environ.get("OBLAKO_EC2_IMAGE")
    if override:
        return override
    # Unset *or* empty override both land here.
    return DEFAULT_INSTANCE_IMAGE
def _container_name(instance_id: str) -> str:
    """Return the name of the container that backs ``instance_id``."""
    prefix = "oblako-ec2-"
    return f"{prefix}{instance_id}"
def _volume_name(instance_id: str) -> str:
    """Return the name of the named volume acting as the instance's EBS volume.

    The result is the same string the container-name helper produces for the
    same id; containers and volumes are looked up in separate collections.
    """
    prefix = "oblako-ec2-"
    return f"{prefix}{instance_id}"
def start_instance_container(
    instance_id: str,
    image_id: str | None = None,
    image: str | None = None,
    command=None,
    published_ports: dict | None = None,
) -> str:
    """Ensure the instance has a running backing container and return its id.

    Shared by ``Ec2Service`` and the CloudFormation ``AWS::EC2::Instance``
    provider so a stack-provisioned instance is just as real as a
    service-launched one.

    Behaviour:

    * If a container with the instance's name already exists it is started
      (unless its status is already ``"running"``) and its id is returned.
      In that case ``image_id``, ``image``, ``command`` and
      ``published_ports`` are not used.
    * Otherwise the image is pulled if it is not available locally, the
      instance's named volume is created if missing, and a detached container
      is launched with that volume mounted read-write at ``EBS_MOUNT``.

    Args:
        instance_id: EC2 instance id; determines container and volume names.
        image_id: AMI id, forwarded to ``_image_for`` when ``image`` is falsy.
        image: Explicit backing image (e.g. a Jupyter image for a SageMaker
            notebook instance); takes precedence over ``image_id``.
        command: Container command; defaults to ``["sleep", "infinity"]``.
        published_ports: Container-port -> host-port mapping handed to the
            engine as ``ports`` (e.g. a notebook instance publishing
            JupyterLab); an empty/None mapping publishes nothing.

    Returns:
        The id of the existing or newly created container.
    """
    import docker

    api = _docker()
    container_name = _container_name(instance_id)

    # Fast path: the instance is already backed. NotFound raised by either the
    # lookup or the start call falls through to the creation path below.
    try:
        backing = api.containers.get(container_name)
        if backing.status != "running":
            backing.start()
        return backing.id
    except docker.errors.NotFound:
        pass

    # Resolve the image and make sure it is present locally.
    backing_image = image or _image_for(image_id)
    try:
        api.images.get(backing_image)
    except docker.errors.ImageNotFound:
        api.images.pull(backing_image)

    # The named volume plays the role of the EBS root; reuse it if it exists.
    volume_name = _volume_name(instance_id)
    try:
        api.volumes.get(volume_name)
    except docker.errors.NotFound:
        api.volumes.create(volume_name, labels={INSTANCE_LABEL: instance_id})

    run_options = dict(
        command=command or ["sleep", "infinity"],
        detach=True,
        name=container_name,
        volumes={volume_name: {"bind": EBS_MOUNT, "mode": "rw"}},
        ports=published_ports or None,
        # lets the container reach oblako on the host
        extra_hosts={"host.docker.internal": "host-gateway"},
        labels={INSTANCE_LABEL: instance_id, "oblako.service": "ec2"},
    )
    return api.containers.run(backing_image, **run_options).id
def terminate_instance_container(instance_id: str) -> None:
    """Force-remove the instance's backing container, then its EBS volume.

    Either resource may already be gone: a ``NotFound`` from the lookup or the
    removal is ignored, so calling this repeatedly is harmless. The container
    is handled first, then the volume.
    """
    import docker

    api = _docker()
    targets = (
        (api.containers, _container_name(instance_id)),
        (api.volumes, _volume_name(instance_id)),
    )
    for collection, resource_name in targets:
        try:
            collection.get(resource_name).remove(force=True)
        except docker.errors.NotFound:
            continue  # already absent — nothing to clean up
@BotoService("ec2")
class Ec2Service:
    """AWS EC2 — moto for the control plane, one container per instance for compute.

    Instance metadata lives in moto; every backed instance additionally gets a
    long-lived container and a named volume managed through the module-level
    container helpers.

    NOTE(review): ``get_client()`` is not defined in this class; presumably it
    is supplied by the ``BotoService`` decorator — confirm.
    """

    name = "ec2"

    def __init__(self, moto: MotoService):
        """Remember the shared moto service (moto owns EC2 metadata)."""
        self.moto = moto

    @property
    def endpoint_url(self) -> str:
        """EC2 is served from moto's endpoint, so just report that URL."""
        return self.moto.endpoint_url

    # ---- Control plane + real compute ----

    def run_instance(
        self,
        image_id: str = "ami-0abcdef1234567890",
        instance_type: str = "t3.micro",
        *,
        backed: bool = True,
        **kwargs,
    ) -> str:
        """Launch exactly one instance and return its InstanceId.

        The instance is first recorded through the EC2 client
        (``MinCount=MaxCount=1``; extra ``kwargs`` are forwarded to
        ``run_instances``). With ``backed=True`` a backing container is then
        started for it; ``backed=False`` stops after the control-plane call,
        which is useful when Docker isn't available.
        """
        response = self.get_client().run_instances(
            ImageId=image_id,
            InstanceType=instance_type,
            MinCount=1,
            MaxCount=1,
            **kwargs,
        )
        instance_id = response["Instances"][0]["InstanceId"]
        if not backed:
            return instance_id
        start_instance_container(instance_id, image_id=image_id)
        return instance_id

    def stop_instance(self, instance_id: str) -> None:
        """Stop the instance: control plane first, then the container if any.

        Only the container is stopped; its volume is not touched.
        """
        self.get_client().stop_instances(InstanceIds=[instance_id])
        self._with_container(instance_id, lambda container: container.stop())

    def start_instance(self, instance_id: str) -> None:
        """Start the instance in the control plane, then ensure its container runs."""
        self.get_client().start_instances(InstanceIds=[instance_id])
        start_instance_container(instance_id)

    def terminate_instance(self, instance_id: str) -> None:
        """Terminate the instance and remove its container and EBS volume."""
        self.get_client().terminate_instances(InstanceIds=[instance_id])
        terminate_instance_container(instance_id)

    def instance_container(self, instance_id: str):
        """Look up the backing container; ``None`` when the instance has none."""
        import docker

        wanted = _container_name(instance_id)
        try:
            found = _docker().containers.get(wanted)
        except docker.errors.NotFound:
            return None
        return found

    def list_instance_containers(self) -> list:
        """List every container labelled ``oblako.service=ec2``, stopped ones included."""
        label_filter = {"label": "oblako.service=ec2"}
        return _docker().containers.list(all=True, filters=label_filter)

    def _with_container(self, instance_id: str, fn) -> None:
        """Call ``fn(container)`` when the instance is backed; otherwise do nothing."""
        container = self.instance_container(instance_id)
        if container is None:
            return
        fn(container)

    # ---- Lifecycle (moto-backed; instance containers are managed per-id,
    # ---- not as one service)

    def start(self) -> None:
        """Start moto unless it reports ready within two seconds.

        This service has no container of its own to bring up.
        """
        if not self.moto.wait_ready(timeout=2):
            self.moto.start()

    def stop(self) -> None:
        """Do nothing — moto owns the control plane; instances are managed per-id."""

    def wait_ready(self, timeout: float = 5.0) -> bool:
        """Report readiness by delegating to moto with the same timeout."""
        return self.moto.wait_ready(timeout=timeout)

    def status(self):
        """Report status by delegating to moto."""
        return self.moto.status()