Source code for oblako.services.opensearch

"""OpenSearch service: vector search for Knowledge Bases / RAG."""

from oblako import ports
import httpx

from .base import Service, PortMapping


[docs] class OpenSearchService(Service): """OpenSearch service for vector search and RAG Knowledge Bases.""" def __init__(self, host_port: int = ports.OPENSEARCH): """Initialize the OpenSearch service on the given host port.""" super().__init__( name="opensearch", image="opensearchproject/opensearch:2", ports=[PortMapping(container_port=9200, host_port=host_port)], environment={ "discovery.type": "single-node", "DISABLE_SECURITY_PLUGIN": "true", "OPENSEARCH_INITIAL_ADMIN_PASSWORD": "admin", }, volumes={ "oblako-opensearch-data": { "bind": "/usr/share/opensearch/data", "mode": "rw", } }, ) self.host_port = host_port @property def url(self) -> str: """Return the OpenSearch base URL.""" return f"http://localhost:{self.host_port}" def _health_check(self) -> bool: try: resp = httpx.get(f"{self.url}/_cluster/health", timeout=3.0) return resp.status_code == 200 except httpx.HTTPError: # a starting OpenSearch may accept then reset the connection # (httpx.ReadError), not just refuse it — any transport error = not ready return False
[docs] def create_knn_index(self, index_name: str, dimension: int = 1536) -> dict: """Create a k-NN vector index (for RAG / Knowledge Bases).""" resp = httpx.put( f"{self.url}/{index_name}", json={ "settings": {"index": {"knn": True}}, "mappings": { "properties": { "embedding": { "type": "knn_vector", "dimension": dimension, "method": {"name": "hnsw", "engine": "faiss"}, }, "text": {"type": "text"}, "metadata": {"type": "object"}, } }, }, timeout=10.0, ) resp.raise_for_status() return resp.json()