skills/building-identity-governance-lifecycle-process/SKILL.md
构建全面的身份治理(Identity Governance)与生命周期管理流程,包括入职-调岗-离职(Joiner-Mover-Leaver)自动化、角色挖掘、访问申请工作流、定期重新认证以及孤立账户修复,使用 IGA 平台实现。 适用于身份生命周期管理、JML 流程、基于角色的访问配置或身份治理程序设计等相关请求。
npx skillsauth add killvxk/cybersecurity-skills-zh building-identity-governance-lifecycle-processInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
不适用于单一应用程序的用户管理;身份治理面向跨系统的生命周期管理,需要将权威 HR 来源与下游应用程序配置进行关联。
绘制从入职到离职的身份生命周期图:
"""
身份生命周期状态机
定义所有身份状态及有效转换,以及自动化动作。
"""
IDENTITY_LIFECYCLE = {
"states": {
"PRE_HIRE": {
"description": "在入职日期前从 HR 数据源创建的身份",
"automated_actions": [
"在 IGA 平台创建身份记录",
"生成唯一员工 ID",
"创建邮箱预留",
"基于职位代码分配天赋角色",
"启动背景调查工作流"
],
"valid_transitions": ["ACTIVE", "CANCELLED"]
},
"ACTIVE": {
"description": "员工已入职,完整访问权限已配置",
"automated_actions": [
"创建 Active Directory 账户",
"创建电子邮件邮箱",
"配置天赋应用程序访问权限",
"分配部门特定角色",
"添加到分发组",
"颁发 MFA 令牌/安全密钥",
"为远程工作者创建 VPN 账户"
],
"valid_transitions": ["ROLE_CHANGE", "LEAVE_OF_ABSENCE", "TERMINATED"]
},
"ROLE_CHANGE": {
"description": "员工调岗、晋升或更换部门",
"automated_actions": [
"根据新职位代码重新计算角色分配",
"移除对前部门应用程序的访问权限",
"配置对新部门应用程序的访问权限",
"更新组成员身份",
"在目录中转移经理关系",
"对保留的权限触发访问审查",
"通知新经理关于继承的访问权限"
],
"valid_transitions": ["ACTIVE", "LEAVE_OF_ABSENCE", "TERMINATED"]
},
"LEAVE_OF_ABSENCE": {
"description": "员工处于长期休假(医疗、育儿、学术休假)",
"automated_actions": [
"禁用交互式登录(保留账户)",
"暂停 VPN 访问",
"设置外出自动回复",
"将邮箱委托给经理",
"保留所有角色分配以便返回",
"从 HR 数据源设置重新激活日期"
],
"valid_transitions": ["ACTIVE", "TERMINATED"]
},
"TERMINATED": {
"description": "员工已离开组织",
"automated_actions": [
"立即禁用 AD 账户",
"撤销所有应用程序访问权限",
"撤销 VPN 和远程访问权限",
"将邮箱转为共享邮箱(经理可访问 90 天)",
"将 OneDrive 文件转移给经理",
"从所有安全和分发组中移除",
"撤销 OAuth 令牌和 API 密钥",
"从移动设备清除企业数据",
"归档身份记录",
"在保留期后安排账户删除"
],
"valid_transitions": ["REHIRE", "DELETED"]
},
"REHIRE": {
"description": "之前离职的员工重新入职",
"automated_actions": [
"重新激活现有身份记录",
"重置凭据并要求重新注册 MFA",
"根据新职位代码配置权限(不使用之前的访问权限)",
"在前 30 天内标记为增强访问审查"
],
"valid_transitions": ["ACTIVE"]
},
"DELETED": {
"description": "在保留期后永久删除账户",
"automated_actions": [
"删除 AD 账户",
"删除电子邮件邮箱存档",
"从 IGA 中移除身份记录",
"生成删除审计日志"
],
"valid_transitions": []
}
},
"retention_periods": {
"terminated_to_deleted": "90 天(默认)",
"mailbox_retention": "90 天作为共享邮箱",
"onedrive_retention": "经理访问 30 天,然后归档",
"audit_log_retention": "合规要求保留 7 年"
}
}
将 HR 系统连接为身份数据的单一真相来源:
"""
HR 来源集成 - Workday 到 IGA 平台连接器
轮询 Workday 获取员工生命周期事件并触发配置。
"""
import requests
from datetime import datetime, timedelta
import logging
class WorkdayIdentityConnector:
def __init__(self, config):
self.base_url = config["workday_api_url"]
self.tenant = config["tenant"]
self.client_id = config["client_id"]
self.client_secret = config["client_secret"]
self.session = requests.Session()
self.logger = logging.getLogger("workday_connector")
def get_access_token(self):
"""向 Workday REST API 进行认证。"""
token_url = f"{self.base_url}/ccx/oauth2/{self.tenant}/token"
response = self.session.post(token_url, data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
})
response.raise_for_status()
return response.json()["access_token"]
def fetch_worker_changes(self, since_datetime):
"""获取自上次同步以来的所有员工生命周期事件。"""
headers = {"Authorization": f"Bearer {self.get_access_token()}"}
params = {
"Updated_From": since_datetime.isoformat(),
"Updated_Through": datetime.utcnow().isoformat(),
"Count": 100
}
workers = []
url = f"{self.base_url}/ccx/api/v1/{self.tenant}/workers"
while url:
response = self.session.get(url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
workers.extend(data.get("data", []))
url = data.get("next", None)
params = {}
return workers
def map_lifecycle_event(self, worker):
"""将 Workday 员工数据映射到身份生命周期事件。"""
worker_data = worker.get("workerData", {})
employment = worker_data.get("employmentData", {})
personal = worker_data.get("personalData", {})
event = {
"employee_id": worker.get("id"),
"first_name": personal.get("legalName", {}).get("firstName"),
"last_name": personal.get("legalName", {}).get("lastName"),
"email": worker_data.get("emailAddress"),
"job_code": employment.get("jobProfile", {}).get("id"),
"job_title": employment.get("jobProfile", {}).get("name"),
"department": employment.get("organization", {}).get("name"),
"department_code": employment.get("organization", {}).get("id"),
"manager_id": employment.get("managerId"),
"location": employment.get("location", {}).get("name"),
"cost_center": employment.get("costCenter", {}).get("id"),
"hire_date": employment.get("hireDate"),
"termination_date": employment.get("terminationDate"),
"status": employment.get("status"),
"worker_type": employment.get("workerType"),
}
# 确定生命周期转换
if event["status"] == "Active" and event["hire_date"]:
hire_date = datetime.fromisoformat(event["hire_date"])
if hire_date > datetime.utcnow():
event["lifecycle_event"] = "PRE_HIRE"
else:
event["lifecycle_event"] = "JOINER"
elif event["status"] == "Active":
event["lifecycle_event"] = "MOVER" # 部门或角色变更
elif event["status"] == "Terminated":
event["lifecycle_event"] = "LEAVER"
elif event["status"] == "On Leave":
event["lifecycle_event"] = "LEAVE_OF_ABSENCE"
return event
def process_lifecycle_events(self, since_datetime):
"""身份生命周期事件的主处理循环。"""
workers = self.fetch_worker_changes(since_datetime)
events = []
for worker in workers:
event = self.map_lifecycle_event(worker)
events.append(event)
self.logger.info(
f"生命周期事件:{event['lifecycle_event']} - "
f"{event['first_name']} {event['last_name']} "
f"(员工ID:{event['employee_id']})"
)
return events
根据职能定义角色,用于自动化配置:
"""
角色挖掘引擎
分析现有访问模式,推导角色定义
用于天赋(自动)配置。
"""
import pandas as pd
from collections import Counter
from itertools import combinations
class RoleMiningEngine:
def __init__(self, access_data):
"""
access_data:DataFrame,包含列
[employee_id, job_code, department, application, entitlement]
"""
self.access_data = access_data
def mine_birthright_roles(self, min_assignment_pct=0.8):
"""
识别应根据职位代码自动分配的权限。
若同一职位代码中 80% 以上的用户拥有某权限,
则该权限成为天赋访问权限。
"""
birthright_roles = {}
for job_code, group in self.access_data.groupby("job_code"):
total_users = group["employee_id"].nunique()
entitlement_counts = group.groupby(
["application", "entitlement"]
)["employee_id"].nunique()
birthright_entitlements = []
for (app, ent), count in entitlement_counts.items():
pct = count / total_users
if pct >= min_assignment_pct:
birthright_entitlements.append({
"application": app,
"entitlement": ent,
"assignment_percentage": round(pct * 100, 1),
"user_count": count
})
if birthright_entitlements:
birthright_roles[job_code] = {
"job_code": job_code,
"total_users": total_users,
"birthright_entitlements": birthright_entitlements
}
return birthright_roles
def detect_role_explosion(self):
"""识别重叠度过高的角色,说明需要整合。"""
roles = self.access_data.groupby("job_code").apply(
lambda x: set(zip(x["application"], x["entitlement"]))
)
overlap_report = []
for (role1, ents1), (role2, ents2) in combinations(roles.items(), 2):
if len(ents1) == 0 or len(ents2) == 0:
continue
overlap = len(ents1 & ents2)
max_size = max(len(ents1), len(ents2))
overlap_pct = overlap / max_size * 100
if overlap_pct > 70:
overlap_report.append({
"role_1": role1,
"role_2": role2,
"role_1_entitlements": len(ents1),
"role_2_entitlements": len(ents2),
"overlapping_entitlements": overlap,
"overlap_percentage": round(overlap_pct, 1),
"recommendation": "CONSOLIDATE" if overlap_pct > 90 else "REVIEW"
})
return sorted(overlap_report, key=lambda x: x["overlap_percentage"], reverse=True)
def find_orphaned_access(self):
"""
找出不再与任何角色定义对齐的权限。
这些是随着时间积累的例外情况。
"""
# 获取天赋权限定义
birthright = self.mine_birthright_roles(min_assignment_pct=0.5)
orphaned = []
for _, row in self.access_data.iterrows():
job_birthright = birthright.get(row["job_code"], {})
expected_ents = set()
for ent in job_birthright.get("birthright_entitlements", []):
expected_ents.add((ent["application"], ent["entitlement"]))
current_ent = (row["application"], row["entitlement"])
if current_ent not in expected_ents:
orphaned.append({
"employee_id": row["employee_id"],
"job_code": row["job_code"],
"application": row["application"],
"entitlement": row["entitlement"],
"recommendation": "审查是否撤销"
})
return pd.DataFrame(orphaned)
实施基于风险的审批自助访问申请:
"""
访问申请工作流引擎
处理自助访问申请,根据所申请权限的风险分类
进行多级审批。
"""
ACCESS_REQUEST_WORKFLOW = {
"risk_levels": {
"LOW": {
"description": "标准业务应用程序",
"examples": ["电子邮件分发组", "SharePoint 团队站点", "标准 SaaS 应用"],
"approval_chain": ["manager"],
"sla_hours": 4,
"auto_approve_if_birthright": True
},
"MEDIUM": {
"description": "敏感数据访问或提升权限",
"examples": ["CRM 管理员", "财务报告", "HR 系统"],
"approval_chain": ["manager", "application_owner"],
"sla_hours": 24,
"auto_approve_if_birthright": False
},
"HIGH": {
"description": "特权访问或受监管数据",
"examples": ["数据库管理员", "云管理员", "PAM 保险库访问"],
"approval_chain": ["manager", "application_owner", "security_team"],
"sla_hours": 48,
"auto_approve_if_birthright": False,
"require_justification": True,
"require_time_limit": True
},
"CRITICAL": {
"description": "域管理员、root 访问或生产数据修改",
"examples": ["Domain Admin", "AWS root", "生产数据库写入"],
"approval_chain": ["manager", "application_owner", "security_team", "ciso"],
"sla_hours": 72,
"auto_approve_if_birthright": False,
"require_justification": True,
"require_time_limit": True,
"require_sod_check": True,
"max_duration_days": 90
}
}
}
class AccessRequestEngine:
def __init__(self, iga_client, risk_catalog):
self.iga = iga_client
self.risk_catalog = risk_catalog
def submit_request(self, requester_id, entitlement_id, justification, duration_days=None):
"""提交访问申请,自动进行风险分类。"""
# 对所申请权限进行风险级别分类
risk_level = self.risk_catalog.get_risk_level(entitlement_id)
workflow = ACCESS_REQUEST_WORKFLOW["risk_levels"][risk_level]
# 检查权限是否为申请者角色的天赋权限
requester = self.iga.get_identity(requester_id)
is_birthright = self.iga.is_birthright_for_role(
entitlement_id, requester["job_code"]
)
if is_birthright and workflow.get("auto_approve_if_birthright"):
return self._auto_approve(requester_id, entitlement_id, "天赋访问权限")
# 如需要,执行职责分离(SOD)检查
if workflow.get("require_sod_check"):
sod_violations = self.iga.check_sod(requester_id, entitlement_id)
if sod_violations:
return {
"status": "SOD_VIOLATION",
"violations": sod_violations,
"action": "申请需要补偿性控制审批"
}
# 创建审批链
request = {
"requester": requester_id,
"entitlement": entitlement_id,
"risk_level": risk_level,
"justification": justification,
"duration_days": duration_days or workflow.get("max_duration_days"),
"approval_chain": self._build_approval_chain(
requester, workflow["approval_chain"]
),
"sla_deadline": workflow["sla_hours"],
"status": "PENDING_APPROVAL"
}
return self.iga.create_request(request)
def _build_approval_chain(self, requester, approver_types):
"""将审批链解析为实际的审批者身份。"""
chain = []
for approver_type in approver_types:
if approver_type == "manager":
chain.append({
"type": "manager",
"identity": requester["manager_id"],
"fallback": requester.get("skip_manager_id")
})
elif approver_type == "application_owner":
chain.append({
"type": "application_owner",
"identity": "resolved_at_runtime",
"fallback": "it-governance-team"
})
elif approver_type == "security_team":
chain.append({
"type": "group",
"identity": "security-governance-team",
"required_approvals": 1
})
elif approver_type == "ciso":
chain.append({
"type": "role",
"identity": "CISO",
"fallback": "deputy-ciso"
})
return chain
识别并修复没有活跃身份关联的账户:
"""
孤立账户检测
识别目标系统中在权威 HR 来源中没有对应活跃身份的账户。
"""
class OrphanedAccountDetector:
def __init__(self, hr_connector, app_connectors):
self.hr = hr_connector
self.apps = app_connectors
def detect_orphaned_accounts(self):
"""对照 HR 活跃员工比对应用程序账户。"""
active_employees = set(self.hr.get_active_employee_ids())
orphaned_accounts = []
for app_name, connector in self.apps.items():
app_accounts = connector.get_all_accounts()
for account in app_accounts:
correlated_id = account.get("employee_id") or account.get("correlation_id")
if correlated_id and correlated_id not in active_employees:
# 检查是否最近离职(在宽限期内)
termination_info = self.hr.get_termination_info(correlated_id)
orphaned_accounts.append({
"application": app_name,
"account_name": account["username"],
"correlated_employee_id": correlated_id,
"account_status": account.get("status", "unknown"),
"last_login": account.get("last_login"),
"termination_date": termination_info.get("date") if termination_info else None,
"days_since_termination": (
(datetime.utcnow() - termination_info["date"]).days
if termination_info and termination_info.get("date") else None
),
"risk_level": self._assess_orphan_risk(account, termination_info)
})
elif not correlated_id:
# 未关联账户 - 没有与任何员工的关联
orphaned_accounts.append({
"application": app_name,
"account_name": account["username"],
"correlated_employee_id": None,
"account_status": account.get("status", "unknown"),
"last_login": account.get("last_login"),
"risk_level": "HIGH",
"reason": "未关联 - 无员工身份关联"
})
return orphaned_accounts
def _assess_orphan_risk(self, account, termination_info):
"""评估孤立账户的风险级别。"""
if account.get("is_privileged"):
return "CRITICAL"
if termination_info and termination_info.get("involuntary"):
return "HIGH"
if account.get("status") == "active":
return "HIGH"
return "MEDIUM"
def generate_remediation_plan(self, orphaned_accounts):
"""为孤立账户创建修复行动计划。"""
plan = []
for account in orphaned_accounts:
if account["risk_level"] == "CRITICAL":
action = "DISABLE_IMMEDIATELY"
sla = "4 小时"
elif account["risk_level"] == "HIGH":
action = "DISABLE_WITHIN_24H"
sla = "24 小时"
else:
action = "REVIEW_AND_DISABLE"
sla = "7 天"
plan.append({
**account,
"remediation_action": action,
"sla": sla,
"assigned_to": "identity-governance-team"
})
return sorted(plan, key=lambda x: ["CRITICAL", "HIGH", "MEDIUM", "LOW"].index(x["risk_level"]))
| 术语 | 定义 | |------|------------| | 入职-调岗-离职(Joiner-Mover-Leaver,JML) | 核心身份生命周期转换,包括员工入职(Joiner)、角色/部门变更(Mover)和离职(Leaver) | | 天赋访问权限(Birthright Access) | 根据职位代码、部门或地点自动配置的基线权限,无需提交访问申请 | | 角色挖掘(Role Mining) | 通过识别相似职能中的常见权限分组,分析现有访问模式以推导角色定义 | | 孤立账户(Orphaned Account) | 在权威 HR 来源中不再有对应活跃身份的应用账户,代表安全风险 | | 权威来源(Authoritative Source) | 记录系统(通常是 HR),作为身份属性和就业状态的单一真相来源 | | 访问申请工作流(Access Request Workflow) | 使用户能够申请额外权限的自助服务流程,带有基于风险的审批路由 |
场景背景:快速成长的公司没有自动化身份生命周期。IT 手动创建账户,新员工需要 3-5 天才能就绪。离职员工保留访问权限长达数周。审计发现 45 个应用程序中存在 2,300 个孤立账户。
方法:
常见陷阱:
身份治理生命周期报告
=======================================
权威来源: Workday
IGA 平台: SailPoint IdentityIQ
身份总数: 10,247
在职员工: 9,834
承包商: 413
生命周期自动化
入职(预入职)SLA: 目标:0 天 | 实际:平均 0.2 天
调岗处理 SLA: 目标:1 天 | 实际:平均 0.8 天
离职禁用 SLA: 目标:1 小时 | 实际:平均 0.5 小时
配置指标(过去 30 天)
新入职员工: 187
自动配置: 174(93.0%)
人工干预: 13(7.0%)
角色变更处理: 89
离职处理: 43
1 小时 SLA 内: 41(95.3%)
角色治理
已定义角色: 127
天赋角色: 48
每角色平均权限数: 12.3
重叠度 > 70%: 8 对(建议整合)
孤立账户
检测到: 23
严重: 2(特权账户)
高: 8
中: 13
已修复(30 天): 19
未处理: 4
访问申请
提交: 342
自动审批(天赋): 87(25.4%)
已批准: 231(67.5%)
已拒绝: 24(7.0%)
平均审批时间: 6.2 小时
SOD 违规标记: 12
testing
设计并执行社会工程学渗透测试,包括钓鱼、语音钓鱼、短信钓鱼和物理借口活动,以衡量人员安全韧性并识别培训差距。
testing
主持结构化的事件后审查,以识别根本原因、记录有效和无效的措施,并提出可操作的改进建议以提升未来的事件响应能力。
testing
通过分析举报的邮件、提取指标、评估凭据受攻陷情况、在全组织范围隔离恶意邮件并修复受影响账号来响应网络钓鱼事件。涵盖邮件头分析、URL/附件沙箱检测和邮箱范围清除操作。适用于网络钓鱼响应、邮件事件、凭据钓鱼、鱼叉式网络钓鱼调查或钓鱼修复相关请求。
tools
票据传递(Pass-the-Ticket,PtT)是一种横向移动技术,使用窃取的 Kerberos 票据(TGT 或 TGS)在不知道用户密码的情况下向服务进行认证。通过从已控制的主机内存中提取 Kerberos 票据,攻击者可以将这些票据注入自己的会话以模拟票据所有者。