LoFP

In-cluster automation may produce the same pattern. Before tuning or allowlisting, validate `Esql.user_name_values`, confirm workload ownership, and check whether `Esql.source_ip_values` and `Esql.source_as_organization_name_values` match expected egress.
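The validation step can be made repeatable with a small triage helper. The Python below is a minimal sketch, not part of the rule: the `EXPECTED_EGRESS` ranges and `KNOWN_SERVICE_ACCOUNTS` set are hypothetical placeholders, and the alert is assumed to be a dict carrying the aggregated `Esql.*` fields that the query emits.

```python
import ipaddress

# Hypothetical allowlists for illustration only; replace with your own inventory.
EXPECTED_EGRESS = [ipaddress.ip_network("203.0.113.0/24")]  # e.g. a cluster NAT range
KNOWN_SERVICE_ACCOUNTS = {"system:serviceaccount:ci:deployer"}


def as_list(value):
    """Aggregated fields may hold a single value or a list; normalize to a list."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def triage(alert):
    """Return the alert values that fall outside the expected baseline."""
    unexpected_ips = [
        ip
        for ip in as_list(alert.get("Esql.source_ip_values"))
        if not any(ipaddress.ip_address(ip) in net for net in EXPECTED_EGRESS)
    ]
    unknown_users = [
        name
        for name in as_list(alert.get("Esql.user_name_values"))
        if name.startswith("system:serviceaccount:")
        and name not in KNOWN_SERVICE_ACCOUNTS
    ]
    return {"unexpected_ips": unexpected_ips, "unknown_users": unknown_users}
```

An alert for which both lists come back empty is a candidate for tuning. Anything left in either list deserves a look before allowlisting.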

Techniques

Sample rules

AWS Lateral Movement from Kubernetes SA via AssumeRoleWithWebIdentity

Description

Detects when credentials issued through AssumeRoleWithWebIdentity for a Kubernetes service account identity are later used for several distinct AWS control-plane actions on the same session access key. Workloads that use EKS IAM Roles for Service Accounts routinely exchange a projected service-account token for short-lived IAM credentials; this rule highlights sessions where that exchange is followed by a spread of sensitive APIs—reconnaissance, secrets and parameter access, IAM changes, or compute creation—beyond what routine pod traffic usually shows. High-volume S3 object reads and writes are excluded from the correlation set to reduce noise from normal data-plane work.
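The correlation described above can be illustrated outside ES|QL. The Python below is a minimal sketch under stated assumptions, not the rule's implementation: events are simplified dicts with hypothetical keys `action`, `access_key`, and `user`; `SENSITIVE_ACTIONS` is a small subset chosen for the example; and the default threshold of three distinct follow-on actions is illustrative and may differ from the rule's effective threshold.

```python
from collections import defaultdict

# Illustrative subset of sensitive control-plane actions; not the rule's full list.
# S3 object reads and writes are deliberately absent, mirroring the exclusion above.
SENSITIVE_ACTIONS = {"ListRoles", "GetSecretValue", "CreateAccessKey", "DescribeInstances"}


def suspicious_sessions(events, min_distinct=3):
    """Group events by session access key and return the keys where a
    web-identity exchange by a Kubernetes service account is accompanied
    by at least `min_distinct` distinct sensitive actions."""
    exchanged = set()            # keys issued to a service-account identity
    actions = defaultdict(set)   # key -> distinct sensitive actions seen
    for event in events:
        key = event.get("access_key")
        if key is None:
            continue
        if event["action"] == "AssumeRoleWithWebIdentity":
            if event.get("user", "").startswith("system:serviceaccount:"):
                exchanged.add(key)
        elif event["action"] in SENSITIVE_ACTIONS:
            actions[key].add(event["action"])
    return sorted(k for k in exchanged if len(actions[k]) >= min_distinct)
```

The sketch does not compare timestamps. A session key can only be used after it has been issued, so grouping by key already ties the follow-on calls to the exchange.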

Detection logic

FROM logs-aws.cloudtrail-*
| WHERE (event.action == "AssumeRoleWithWebIdentity" AND user.name LIKE "system:serviceaccount:*")
  // S3 PutObject/GetObject are excluded: they are too common in legitimate pod service-account activity
  OR (event.action IN ("ListBuckets", "DescribeInstances", "GetCallerIdentity",
    "ListUsers", "ListRoles", "ListAttachedRolePolicies", "GetRolePolicy",
    "GetSecretValue", "ListSecrets",
    "GetParameters", "DescribeParameters", "ListKeys", "Decrypt",
    "ListFunctions", "GetAuthorizationToken",
    "SendCommand", "StartSession",
    "CreateUser", "CreateAccessKey", "AttachRolePolicy", "CreateRole",
    "PutRolePolicy", "UpdateAssumeRolePolicy",
    "UpdateFunctionCode", "UpdateFunctionConfiguration", "ModifyInstanceAttribute",
    "StopLogging", "DeleteTrail")
    AND aws.cloudtrail.user_identity.type == "AssumedRole")
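// The AssumeRoleWithWebIdentity response carries the newly issued key; later calls carry that key on the request identity
| GROK aws.cloudtrail.response_elements "accessKeyId=%{NOTSPACE:issued_key_id},"
| EVAL access_key = COALESCE(issued_key_id, aws.cloudtrail.user_identity.access_key_id)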
| EVAL is_assume = CASE(event.action == "AssumeRoleWithWebIdentity", 1, 0)
| EVAL is_post_exploit = CASE(event.action != "AssumeRoleWithWebIdentity", 1, 0)
| EVAL phase = CASE(
    event.action == "AssumeRoleWithWebIdentity", "initial_access",
    event.action IN ("ListBuckets", "DescribeInstances", "ListUsers", "ListRoles",
      "GetCallerIdentity", "ListAttachedRolePolicies", "GetRolePolicy",
      "ListFunctions"), "recon",
    event.action IN ("GetSecretValue", "ListSecrets", "GetParameters",
      "GetAuthorizationToken", "Decrypt"), "credential_access",
    event.action IN ("SendCommand", "StartSession"), "lateral_movement",
    event.action IN ("CreateUser", "CreateAccessKey", "AttachRolePolicy",
      "CreateRole", "PutRolePolicy", "UpdateAssumeRolePolicy",
      "UpdateFunctionCode", "UpdateFunctionConfiguration",
      "ModifyInstanceAttribute"), "persistence",
    event.action IN ("StopLogging", "DeleteTrail"), "defense_evasion"
  )
| STATS 
    Esql.assume_count = SUM(is_assume),
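    // The distinct count includes AssumeRoleWithWebIdentity itself, so a value of 3 means the exchange plus two other distinct actions
    Esql.post_exploit_count = COUNT_DISTINCT(event.action),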
    Esql.attack_phases = VALUES(phase),
    Esql.event_action_values = VALUES(event.action),
    Esql.source_ip_values = VALUES(source.ip),
    Esql.source_as_organization_name_values = VALUES(source.as.organization.name),
    Esql.user_name_values = VALUES(user.name),
    Esql.user_agent_original_values = VALUES(user_agent.original),
    Esql.cloud_account_id_values = VALUES(cloud.account.id),
    Esql.data_stream_namespace_values = VALUES(data_stream.namespace),
    Esql.first_seen = MIN(@timestamp),
    Esql.last_seen = MAX(@timestamp),
    Esql.total_calls = COUNT(*)
  BY access_key
| WHERE access_key IS NOT NULL AND Esql.assume_count >= 1 AND Esql.post_exploit_count >= 3
| EVAL aws.cloudtrail.user_identity.access_key_id = MV_FIRST(access_key)
| KEEP aws.cloudtrail.user_identity.access_key_id, Esql.*
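The `GROK` and `COALESCE` steps are what let the query join the token exchange to the calls that follow it. The Python below is a hedged sketch of that key-stitching logic only. The event keys `response_elements` and `access_key_id` are simplified stand-ins for the CloudTrail fields, and the sample response string in the usage note is a hypothetical shape, not captured log data.

```python
import re

# Python equivalent of the GROK pattern "accessKeyId=%{NOTSPACE:issued_key_id},"
ISSUED_KEY = re.compile(r"accessKeyId=(\S+),")


def session_key(event):
    """Mirror the GROK + COALESCE steps: prefer the key issued in the response,
    otherwise fall back to the key that signed the request."""
    match = ISSUED_KEY.search(event.get("response_elements") or "")
    if match:
        return match.group(1)
    return event.get("access_key_id")
```

For example, an exchange event whose `response_elements` is `"{credentials={accessKeyId=ASIAEXAMPLEKEY, sessionToken=REDACTED, expiration=soon}}"` and a later call whose `access_key_id` is `ASIAEXAMPLEKEY` both resolve to the same session key, so they land in the same `BY access_key` group.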