Source code for oblako.services.kinesis

"""Local AWS Kinesis Data Streams: kinesalite via saidsef/aws-kinesis-local.

Point a boto3 ``kinesis`` client at this service and the usual API works
(create_stream, put_record, get_shard_iterator, get_records, list_streams, …).
State persists in a named volume so streams survive restarts.
"""

from __future__ import annotations

from oblako import ports
from .base import Service, PortMapping
from .boto import BotoService


[docs] @BotoService("kinesis") class KinesisService(Service): """Local Kinesis Data Streams (kinesalite-backed).""" def __init__(self, host_port: int = ports.KINESIS, shard_limit: int = 100): """Initialize on host_port (4567 default) with a configurable shard limit.""" super().__init__( name="kinesis", image="saidsef/aws-kinesis-local:latest", ports=[PortMapping(container_port=4567, host_port=host_port)], # The image's CMD is shell-mangled; override with a clean args list. command=[ "--port", "4567", "--path", "/data", "--shardLimit", str(shard_limit), ], volumes={"oblako-kinesis-data": {"bind": "/data", "mode": "rw"}}, ) self.host_port = host_port @property def endpoint_url(self) -> str: """Return the local Kinesis endpoint URL for boto3 clients.""" return f"http://localhost:{self.host_port}" def _health_check(self) -> bool: try: self.get_client().list_streams(Limit=1) return True except Exception: # noqa: BLE001 return False