# Source code for oblako.services.iam

"""Local IAM + STS over moto's control plane, with oblako's policy evaluator.

moto stores users, roles, and policies (real boto3 ``iam``/``sts`` behaviour);
oblako adds what moto does not evaluate: role trust-policy checks on AssumeRole
and identity-policy authorization simulation, both cross-account aware. The
account is identity metadata (single backend) — cross-account is expressed in the
ARNs and trust policies and decided by the evaluator.
"""

from __future__ import annotations

import json

from oblako.engines.iam import evaluator
from .boto import BotoService
from .moto import MotoService


@BotoService("iam", "sts")
class IamService:
    """IAM/STS control plane (moto) plus oblako's trust + authorization evaluator.

    Entities (users, roles, policies) are created and read through boto3
    clients; policy decisions are delegated to ``oblako.engines.iam.evaluator``.

    Client access goes through ``self.get_client``, which this class does not
    define itself -- presumably it is injected by the ``BotoService``
    decorator (confirm in ``.boto``). Called without arguments it is used here
    as the IAM client; ``get_client("sts")`` is used as the STS client.
    """

    name = "iam"

    def __init__(self, moto: MotoService | None = None):
        """Initialize against a moto control plane (shared instance if provided).

        Args:
            moto: An existing ``MotoService`` to reuse. When omitted (or
                falsy) a new ``MotoService`` is constructed.
        """
        self._moto = moto or MotoService()

    @property
    def endpoint_url(self) -> str:
        """Return the moto endpoint that backs the IAM/STS control plane."""
        return self._moto.endpoint_url

    def create_user(self, name: str) -> dict:
        """Create an IAM user and return the ``User`` part of the response."""
        return self.get_client().create_user(UserName=name)["User"]

    def create_role(self, name: str, trust_policy: dict) -> dict:
        """Create an IAM role and return the ``Role`` part of the response.

        Args:
            name: Role name.
            trust_policy: Trust (assume-role) policy document; it is
                serialized with ``json.dumps`` before being sent.
        """
        return self.get_client().create_role(
            RoleName=name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
        )["Role"]

    def create_policy(self, name: str, document: dict) -> dict:
        """Create a customer-managed policy and return the ``Policy`` part.

        Args:
            name: Policy name.
            document: Policy document; serialized with ``json.dumps``.
        """
        return self.get_client().create_policy(
            PolicyName=name,
            PolicyDocument=json.dumps(document),
        )["Policy"]

    def attach_role_policy(self, role_name: str, policy_arn: str) -> None:
        """Attach a managed policy to a role."""
        self.get_client().attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    def attach_user_policy(self, user_name: str, policy_arn: str) -> None:
        """Attach a managed policy to a user."""
        self.get_client().attach_user_policy(UserName=user_name, PolicyArn=policy_arn)

    @staticmethod
    def _statements(document) -> list:
        """Return the ``Statement`` entries of a policy document as a list.

        A single (non-list) ``Statement`` value is wrapped in a one-element
        list; a document without ``Statement`` yields ``[]``. Anything that is
        not a ``dict`` also yields ``[]``.
        """
        if not isinstance(document, dict):
            return []
        stmts = document.get("Statement", [])
        return stmts if isinstance(stmts, list) else [stmts]

    @staticmethod
    def _resolve_principal(principal_arn: str) -> tuple[bool, str]:
        """Classify a principal ARN as role or user and extract the entity name.

        Returns:
            ``(is_role, name)``. For STS session ARNs of the form
            ``...:assumed-role/<role-name>/<session-name>`` -- the shape that
            ``assume_role`` hands back as ``assumedRoleArn`` -- the underlying
            role name is returned. Every other ARN is classified exactly as
            before: a role when it contains ``":role/"``, otherwise a user,
            with the name being the text after the last ``"/"``.
        """
        session_marker = ":assumed-role/"
        if session_marker in principal_arn:
            # Without this branch the session name would be looked up as an
            # IAM *user* name, because ":role/" does not occur in such ARNs.
            role_and_session = principal_arn.split(session_marker, 1)[1]
            return True, role_and_session.split("/", 1)[0]
        return ":role/" in principal_arn, principal_arn.split("/")[-1]

    def gather_statements(self, principal_arn: str) -> list:
        """Collect all identity-policy statements (managed + inline) for a principal.

        Args:
            principal_arn: ARN of an IAM user, an IAM role, or an STS
                assumed-role session (resolved to its role).

        Returns:
            Statements of the default version of every attached managed
            policy, followed by the statements of every inline policy.

        Errors raised by the underlying client calls (for example for an
        unknown entity) are not caught here.
        """
        iam = self.get_client()
        is_role, name = self._resolve_principal(principal_arn)

        if is_role:
            attached = iam.list_attached_role_policies(RoleName=name)["AttachedPolicies"]
            inline_names = iam.list_role_policies(RoleName=name)["PolicyNames"]
            inline = [
                iam.get_role_policy(RoleName=name, PolicyName=p)["PolicyDocument"]
                for p in inline_names
            ]
        else:
            attached = iam.list_attached_user_policies(UserName=name)["AttachedPolicies"]
            inline_names = iam.list_user_policies(UserName=name)["PolicyNames"]
            inline = [
                iam.get_user_policy(UserName=name, PolicyName=p)["PolicyDocument"]
                for p in inline_names
            ]

        statements: list = []
        for ap in attached:
            policy_arn = ap["PolicyArn"]
            # Only the policy's default version is read.
            pol = iam.get_policy(PolicyArn=policy_arn)["Policy"]
            version = iam.get_policy_version(
                PolicyArn=policy_arn,
                VersionId=pol["DefaultVersionId"],
            )["PolicyVersion"]
            statements.extend(self._statements(version["Document"]))
        for doc in inline:
            statements.extend(self._statements(doc))
        return statements

    def authorize(self, principal_arn: str, action: str, resource: str) -> str:
        """Return Allow/Deny/ImplicitDeny for the principal doing action on resource.

        The decision is whatever ``evaluator.evaluate`` returns for the
        principal's gathered identity-policy statements.
        """
        return evaluator.evaluate(
            self.gather_statements(principal_arn), action, resource
        )

    def assume_role(
        self, role_arn: str, principal_arn: str, session_name: str = "oblako-session"
    ) -> dict:
        """Evaluate the role's trust policy, then sts:AssumeRole if the principal is allowed.

        Args:
            role_arn: ARN of the role to assume; the role name is taken as the
                text after the last ``"/"``.
            principal_arn: ARN of the caller, checked against the trust policy
                with ``evaluator.can_assume``.
            session_name: Passed to STS as ``RoleSessionName``.

        Returns:
            On refusal ``{"allowed": False, "reason": ...}`` (STS is not
            called). On success a dict with ``allowed``, ``assumedRoleArn``,
            ``accessKeyId`` and ``sessionToken``; the token is cut to its
            first 28 characters plus an ellipsis, and the secret access key
            is not included.
        """
        iam = self.get_client()
        role_name = role_arn.split("/")[-1]
        trust = iam.get_role(RoleName=role_name)["Role"]["AssumeRolePolicyDocument"]

        if not evaluator.can_assume(trust, principal_arn):
            return {
                "allowed": False,
                "reason": f"{principal_arn} is not permitted by the role's trust policy",
            }

        resp = self.get_client("sts").assume_role(
            RoleArn=role_arn, RoleSessionName=session_name
        )
        creds = resp["Credentials"]
        return {
            "allowed": True,
            "assumedRoleArn": resp["AssumedRoleUser"]["Arn"],
            "accessKeyId": creds["AccessKeyId"],
            "sessionToken": creds["SessionToken"][:28] + "…",
        }