概述
服务与自动化系统是Home Assistant的行为执行引擎,负责处理用户指令、设备控制、自动化规则等核心功能。本文档深入分析服务调用机制、自动化引擎、触发器系统和动作执行的完整实现。
1. 系统整体架构
1.1 服务与自动化架构概览
graph TB
subgraph "服务系统架构"
A[ServiceRegistry 服务注册表] --> B[Service 服务对象]
B --> C[ServiceCall 服务调用]
C --> D[实体服务调用]
C --> E[域服务调用]
F[服务发现] --> G[服务注册]
G --> H[服务验证]
H --> I[服务执行]
end
subgraph "自动化系统架构"
J[AutomationEntity 自动化实体] --> K[触发器系统]
K --> L[条件检查]
L --> M[动作执行]
N[Trigger 触发器] --> O[状态触发器]
N --> P[时间触发器]
N --> Q[事件触发器]
N --> R[设备触发器]
S[Action 动作] --> T[服务调用动作]
S --> U[延迟动作]
S --> V[条件动作]
S --> W[脚本动作]
end
subgraph "集成层"
X[用户界面] --> Y[REST API]
X --> Z[WebSocket API]
Y --> A
Z --> J
AA[配置文件] --> J
BB[蓝图系统] --> J
end
1.2 调用流程时序图
sequenceDiagram
participant User as 用户/UI
participant API as API层
participant ServiceReg as ServiceRegistry
participant Service as Service
participant Entity as Entity
participant Auto as AutomationEntity
participant Trigger as TriggerSystem
participant Action as ActionExecutor
User->>API: 调用服务
API->>ServiceReg: async_call()
ServiceReg->>Service: 验证和执行
Service->>Entity: 实体操作
Entity-->>Service: 执行结果
Service-->>ServiceReg: 返回结果
ServiceReg-->>API: 服务响应
Note over Trigger: 自动化触发
Trigger->>Auto: 状态变化触发
Auto->>Auto: 检查条件
Auto->>Action: 执行动作
Action->>ServiceReg: 调用服务
ServiceReg->>Service: 执行服务
Service-->>Action: 服务结果
2. 服务系统深度解析
2.1 服务注册与管理
"""服务系统的核心实现分析"""
# 在homeassistant/core.py中的ServiceRegistry类
class ServiceRegistry:
"""服务注册表 - Home Assistant服务系统的核心管理器
核心职责:
1. 服务的注册和注销管理
2. 服务调用的路由和分发
3. 服务权限和安全控制
4. 服务响应数据处理
5. 服务调用的并发管理
服务类型:
- 域服务:针对特定域的全局服务(如automation.reload)
- 实体服务:针对特定实体的服务(如light.turn_on)
- 系统服务:Home Assistant核心服务
- 平台服务:集成特定的服务
"""
def __init__(self, hass: HomeAssistant) -> None:
"""初始化服务注册表
参数:
hass: Home Assistant核心实例
初始化内容:
1. 创建服务存储字典
2. 设置Home Assistant引用
3. 初始化并发控制机制
"""
# 服务存储:{domain: {service_name: Service}}
self._services: dict[str, dict[str, Service]] = {}
self._hass = hass
# 服务调用统计
self._call_count = 0
self._last_call_time = 0.0
@callback
def async_register(
self,
domain: str,
service: str,
service_func: Callable[
[ServiceCall],
Coroutine[Any, Any, ServiceResponse | EntityServiceResponse]
| ServiceResponse
| EntityServiceResponse
| None,
],
schema: VolSchemaType | None = None,
supports_response: SupportsResponse = SupportsResponse.NONE,
job_type: HassJobType | None = None,
) -> None:
"""注册服务 - 服务注册的核心方法
参数:
domain: 服务域名(如'light'、'automation')
service: 服务名称(如'turn_on'、'reload')
service_func: 服务处理函数
schema: 数据验证模式
supports_response: 是否支持返回响应数据
job_type: 任务类型(用于性能优化)
注册流程:
1. 验证服务名称和域名格式
2. 检查服务是否已存在
3. 创建Service对象
4. 注册到服务字典
5. 发送服务注册事件
"""
# 验证输入参数
self._hass.verify_event_loop_thread("hass.services.async_register")
domain = domain.lower()
service = service.lower()
# 验证域名和服务名格式
if not cv.domain_key(domain) or not cv.service_name(service):
raise ValueError(f"Invalid domain '{domain}' or service '{service}'")
# 检查服务是否已存在
if domain in self._services and service in self._services[domain]:
_LOGGER.warning(
"Service %s.%s is already registered, overriding",
domain, service
)
# 创建Service对象
service_obj = Service(
service_func,
schema,
domain,
service,
supports_response=supports_response,
job_type=job_type,
)
# 注册服务
self._services.setdefault(domain, {})[service] = service_obj
# 发送服务注册事件
self._hass.bus.async_fire(
EVENT_SERVICE_REGISTERED,
{"domain": domain, "service": service}
)
_LOGGER.debug("Service %s.%s registered", domain, service)
async def async_call(
self,
domain: str,
service: str,
service_data: dict[str, Any] | None = None,
blocking: bool = False,
context: Context | None = None,
limit: float | None = DEFAULT_SERVICE_TIMEOUT,
target: dict[str, Any] | None = None,
return_response: bool = False,
) -> ServiceResponse | EntityServiceResponse | None:
"""异步调用服务 - 服务调用的核心入口点
参数:
domain: 服务域名
service: 服务名称
service_data: 服务数据参数
blocking: 是否阻塞等待完成
context: 调用上下文
limit: 超时限制
target: 目标选择器
return_response: 是否返回响应数据
返回值:
服务响应数据(如果请求返回响应)
调用流程:
1. 参数验证和标准化
2. 权限检查
3. 创建ServiceCall对象
4. 查找并执行服务
5. 处理响应数据
6. 错误处理和日志记录
"""
# 参数标准化
domain = cv.domain_key(domain)
service = cv.service_name(service)
service_data = service_data or {}
# 合并target到service_data
if target:
service_data.update(target)
# 创建调用上下文
if context is None:
context = Context()
# 权限检查
if context.user_id:
user = await self._hass.auth.async_get_user(context.user_id)
if user and not user.is_admin:
# 检查用户权限
if not user.permissions.check_service(domain, service):
raise Unauthorized(
f"User {user.name} not authorized to call service {domain}.{service}"
)
# 查找服务
service_handler = self._services.get(domain, {}).get(service)
if not service_handler:
raise ServiceNotFound(domain, service)
# 验证响应支持
if return_response and service_handler.supports_response is SupportsResponse.NONE:
raise HomeAssistantError(
f"Service {domain}.{service} does not support returning response data"
)
elif not return_response and service_handler.supports_response is SupportsResponse.ONLY:
raise HomeAssistantError(
f"Service {domain}.{service} requires return_response=True"
)
# 创建ServiceCall对象
service_call = ServiceCall(
domain=domain,
service=service,
data=service_data,
context=context,
return_response=return_response,
)
# 更新调用统计
self._call_count += 1
self._last_call_time = time.time()
try:
# 执行服务调用
_LOGGER.debug(
"Calling service %s.%s with data %s",
domain, service, service_data
)
if blocking:
# 阻塞调用:等待完成
response = await service_handler.async_call(service_call, limit)
else:
# 非阻塞调用:创建后台任务
task = self._hass.async_create_task(
service_handler.async_call(service_call, limit),
f"service call {domain}.{service}"
)
if return_response:
response = await task
else:
response = None
# 发送服务调用事件
self._hass.bus.async_fire(
EVENT_SERVICE_CALLED,
{
"domain": domain,
"service": service,
"service_data": service_data,
"context": context,
}
)
return response
except Exception as err:
_LOGGER.error(
"Error calling service %s.%s: %s",
domain, service, err
)
# 发送服务错误事件
self._hass.bus.async_fire(
EVENT_SERVICE_ERROR,
{
"domain": domain,
"service": service,
"error": str(err),
"context": context,
}
)
raise
def has_service(self, domain: str, service: str) -> bool:
"""检查服务是否存在 - 线程安全的服务查找
参数:
domain: 服务域名
service: 服务名称
返回值:
服务是否存在
"""
return service.lower() in self._services.get(domain.lower(), {})
@callback
def async_services(self) -> dict[str, dict[str, Service]]:
"""获取所有注册的服务 - 返回服务注册表的副本
返回值:
服务注册表的深度复制
注意:
此方法创建完整的副本,开销较大,
应谨慎使用,优先使用has_service进行检查
"""
return {
domain: services.copy()
for domain, services in self._services.items()
}
@callback
def async_remove(self, domain: str, service: str) -> bool:
"""移除已注册的服务
参数:
domain: 服务域名
service: 服务名称
返回值:
是否成功移除服务
用途:
- 集成卸载时清理服务
- 动态服务管理
- 服务更新时的重新注册
"""
domain = domain.lower()
service = service.lower()
if domain not in self._services:
return False
domain_services = self._services[domain]
if service not in domain_services:
return False
# 移除服务
del domain_services[service]
# 如果域下没有服务了,移除域
if not domain_services:
del self._services[domain]
# 发送服务移除事件
self._hass.bus.async_fire(
EVENT_SERVICE_REMOVED,
{"domain": domain, "service": service}
)
_LOGGER.debug("Service %s.%s removed", domain, service)
return True
2.2 Service服务对象
class Service:
"""服务对象 - 封装单个服务的完整信息和执行逻辑
核心属性:
- func: 服务处理函数
- schema: 数据验证模式
- supports_response: 响应支持类型
- job: HassJob执行对象
服务生命周期:
1. 创建:通过async_register注册
2. 验证:检查参数和权限
3. 执行:调用处理函数
4. 响应:返回执行结果
5. 清理:服务完成后的资源清理
"""
def __init__(
self,
func: Callable[
[ServiceCall],
Coroutine[Any, Any, ServiceResponse | EntityServiceResponse]
| ServiceResponse
| EntityServiceResponse
| None,
],
schema: vol.Schema | None = None,
domain: str | None = None,
service: str | None = None,
supports_response: SupportsResponse = SupportsResponse.NONE,
job_type: HassJobType | None = None,
) -> None:
"""初始化服务对象
参数:
func: 服务处理函数
schema: 数据验证模式
domain: 服务域名
service: 服务名称
supports_response: 响应支持级别
job_type: 任务执行类型
"""
self.func = func
self.schema = schema
self.domain = domain
self.service = service
self.supports_response = supports_response
# 创建HassJob用于优化执行
self.job = HassJob(func, job_type=job_type)
# 统计信息
self.call_count = 0
self.total_execution_time = 0.0
self.last_called = None
# 错误跟踪
self.error_count = 0
self.last_error = None
async def async_call(
self, call: ServiceCall, limit: float | None = None
) -> ServiceResponse | EntityServiceResponse | None:
"""异步执行服务调用 - 服务执行的核心逻辑
参数:
call: 服务调用对象
limit: 执行时间限制
返回值:
服务响应数据
执行流程:
1. 数据验证
2. 执行服务函数
3. 处理超时控制
4. 异常处理和日志
5. 统计信息更新
"""
start_time = time.time()
try:
# 数据验证
if self.schema:
try:
call.data = self.schema(call.data)
except vol.Invalid as err:
_LOGGER.error(
"Invalid service data for %s.%s: %s",
self.domain, self.service, err
)
raise ValueError(f"Invalid service data: {err}") from err
# 执行服务函数
_LOGGER.debug(
"Executing service %s.%s",
self.domain, self.service
)
if limit:
# 带超时限制的执行
response = await asyncio.wait_for(
call.context.async_run_job(self.job, call),
timeout=limit
)
else:
# 无超时限制执行
response = await call.context.async_run_job(self.job, call)
# 更新统计信息
execution_time = time.time() - start_time
self.call_count += 1
self.total_execution_time += execution_time
self.last_called = dt_util.utcnow()
# 检查慢服务调用
if execution_time > 1.0: # 超过1秒的调用
_LOGGER.warning(
"Service %s.%s took %.2f seconds to complete",
self.domain, self.service, execution_time
)
_LOGGER.debug(
"Service %s.%s completed in %.3f seconds",
self.domain, self.service, execution_time
)
return response
except asyncio.TimeoutError:
execution_time = time.time() - start_time
self.error_count += 1
self.last_error = dt_util.utcnow()
_LOGGER.error(
"Service %s.%s timed out after %.2f seconds",
self.domain, self.service, execution_time
)
raise HomeAssistantError(
f"Service {self.domain}.{self.service} timed out"
)
except Exception as err:
execution_time = time.time() - start_time
self.error_count += 1
self.last_error = dt_util.utcnow()
_LOGGER.error(
"Error executing service %s.%s after %.3f seconds: %s",
self.domain, self.service, execution_time, err
)
raise
def get_statistics(self) -> dict[str, Any]:
"""获取服务统计信息
返回值:
包含服务性能和使用情况的统计字典
"""
avg_execution_time = (
self.total_execution_time / self.call_count
if self.call_count > 0 else 0
)
return {
"domain": self.domain,
"service": self.service,
"call_count": self.call_count,
"error_count": self.error_count,
"total_execution_time": round(self.total_execution_time, 3),
"average_execution_time": round(avg_execution_time, 3),
"last_called": self.last_called.isoformat() if self.last_called else None,
"last_error": self.last_error.isoformat() if self.last_error else None,
"supports_response": self.supports_response.value,
"error_rate": (
self.error_count / self.call_count
if self.call_count > 0 else 0
),
}
2.3 实体服务调用系统
"""实体服务调用系统 - 处理针对实体的服务调用"""
async def entity_service_call(
hass: HomeAssistant,
registered_entities: dict[str, Entity] | Callable[[], dict[str, Entity]],
func: str | HassJob,
call: ServiceCall,
required_features: Iterable[int] | None = None,
) -> EntityServiceResponse | None:
"""处理实体服务调用 - 实体服务调用的核心调度器
参数:
hass: Home Assistant核心实例
registered_entities: 已注册的实体字典或获取函数
func: 要调用的实体方法名或HassJob对象
call: 服务调用对象
required_features: 必需的实体功能特性
返回值:
实体服务响应数据集合
处理流程:
1. 权限验证和用户身份检查
2. 目标实体选择和过滤
3. 功能特性验证
4. 并发执行实体操作
5. 响应数据聚合和返回
并发策略:
- 对所有目标实体并发执行操作
- 单个实体操作失败不影响其他实体
- 支持部分成功的结果处理
"""
# 权限和用户验证
entity_perms: Callable[[str, str], bool] | None = None
return_response = call.return_response
if call.context.user_id:
user = await hass.auth.async_get_user(call.context.user_id)
if user is None:
raise UnknownUser(context=call.context)
if not user.is_admin:
# 非管理员用户需要检查实体权限
entity_perms = user.permissions.check_entity
# 处理目标实体选择
target_all_entities = call.data.get(ATTR_ENTITY_ID) == ENTITY_MATCH_ALL
if target_all_entities:
referenced: target_helpers.SelectedEntities | None = None
all_referenced: set[str] | None = None
else:
# 解析目标选择器中的实体引用
selector_data = target_helpers.TargetSelectorData(call.data)
referenced = target_helpers.async_extract_referenced_entity_ids(
hass, selector_data, True
)
all_referenced = referenced.referenced | referenced.indirectly_referenced
# 准备服务调用数据
if isinstance(func, str):
# 字符串函数名:传递处理后的数据
data: dict | ServiceCall = remove_entity_service_fields(call)
else:
# HassJob对象:传递完整的ServiceCall
data = call
# 获取实体字典
if callable(registered_entities):
_registered_entities = registered_entities()
else:
_registered_entities = registered_entities
# 获取有权限的候选实体
entity_candidates = _get_permissible_entity_candidates(
call,
_registered_entities,
entity_perms,
target_all_entities,
all_referenced,
)
# 验证和过滤实体
if not target_all_entities:
assert referenced is not None
# 检查缺失的实体引用
missing_referenced = referenced.referenced - set(_registered_entities.keys())
if missing_referenced:
_LOGGER.warning(
"Service %s.%s called with missing entities: %s",
call.domain, call.service,
", ".join(sorted(missing_referenced))
)
# 特性验证:过滤不支持必需特性的实体
if required_features:
def feature_filter(entity: Entity) -> bool:
"""检查实体是否支持必需的特性"""
if not hasattr(entity, 'supported_features'):
return False
entity_features = entity.supported_features or 0
return all(
entity_features & feature for feature in required_features
)
# 分离支持和不支持的实体
supported_entities = []
unsupported_entities = []
for entity in entity_candidates:
if feature_filter(entity):
supported_entities.append(entity)
else:
unsupported_entities.append(entity)
# 记录不支持的实体
if unsupported_entities:
_LOGGER.warning(
"Service %s.%s called on entities that don't support required features %s: %s",
call.domain, call.service,
required_features,
", ".join(entity.entity_id for entity in unsupported_entities)
)
entity_candidates = supported_entities
if not entity_candidates:
# 没有有效的目标实体
_LOGGER.warning(
"Service %s.%s called with no valid target entities",
call.domain, call.service
)
return None
# 并发执行实体操作
_LOGGER.debug(
"Executing service %s.%s on %d entities",
call.domain, call.service, len(entity_candidates)
)
# 创建实体调用任务
tasks = [
hass.async_create_task(
_handle_entity_call(hass, entity, func, data, call.context),
f"service call {call.domain}.{call.service} on {entity.entity_id}"
)
for entity in entity_candidates
]
# 等待所有任务完成
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果和错误
response_data: list[dict[str, Any]] = []
error_count = 0
for entity, result in zip(entity_candidates, results):
if isinstance(result, Exception):
error_count += 1
_LOGGER.error(
"Error executing service %s.%s on %s: %s",
call.domain, call.service, entity.entity_id, result
)
elif return_response and result is not None:
# 收集响应数据
response_data.append({
"entity_id": entity.entity_id,
"response": result,
})
# 记录执行统计
success_count = len(entity_candidates) - error_count
_LOGGER.debug(
"Service %s.%s completed on %d/%d entities successfully",
call.domain, call.service, success_count, len(entity_candidates)
)
# 返回响应数据
return response_data if return_response and response_data else None
async def _handle_entity_call(
hass: HomeAssistant,
entity: Entity,
func: str | HassJob,
data: dict | ServiceCall,
context: Context,
) -> ServiceResponse:
"""处理单个实体的服务调用 - 实体服务执行的原子操作
参数:
hass: Home Assistant核心实例
entity: 目标实体对象
func: 要调用的方法名或HassJob
data: 服务调用数据
context: 调用上下文
返回值:
单个实体的服务响应
执行流程:
1. 设置实体上下文
2. 创建或使用现有的HassJob
3. 执行实体方法
4. 处理协程结果
5. 错误处理和日志
"""
# 设置实体调用上下文
entity.async_set_context(context)
# 执行实体方法
task: asyncio.Future[ServiceResponse] | None
if isinstance(func, str):
# 字符串方法名:动态调用实体方法
entity_method = getattr(entity, func)
# 创建优化的HassJob
job = HassJob(
partial(entity_method, **data),
job_type=entity.get_hassjob_type(func),
)
task = hass.async_run_hass_job(job)
else:
# HassJob对象:直接执行
task = hass.async_run_hass_job(func, entity, data)
# 处理任务结果
result: ServiceResponse = None
if task is not None:
result = await task
# 检查错误的协程返回
if asyncio.iscoroutine(result):
_LOGGER.error(
"Service %s for %s incorrectly returns a coroutine object. "
"Await result instead in service handler. "
"Report bug to integration author",
func, entity.entity_id,
)
# 尝试等待协程结果
result = await result
return result
def _get_permissible_entity_candidates(
call: ServiceCall,
entities: dict[str, Entity],
entity_perms: Callable[[str, str], bool] | None,
target_all_entities: bool,
all_referenced: set[str] | None,
) -> list[Entity]:
"""获取有权限访问的候选实体 - 实体权限过滤器
参数:
call: 服务调用对象
entities: 所有可用实体
entity_perms: 实体权限检查函数
target_all_entities: 是否目标所有实体
all_referenced: 引用的实体ID集合
返回值:
经过权限过滤的实体列表
权限检查:
1. 管理员用户:访问所有实体
2. 普通用户:检查每个实体的控制权限
3. 系统调用:跳过权限检查
"""
if target_all_entities:
# 目标所有实体的情况
if entity_perms is None:
# 管理员用户或系统调用
return list(entities.values())
else:
# 普通用户:过滤有权限的实体
return [
entity for entity in entities.values()
if entity_perms(entity.entity_id, POLICY_CONTROL)
]
if all_referenced is None:
# 没有指定目标实体
return []
# 获取引用的实体中存在且有权限的实体
permissible_entities = []
for entity_id in all_referenced:
if entity_id not in entities:
# 实体不存在,记录警告
_LOGGER.warning(
"Service %s.%s called with unknown entity %s",
call.domain, call.service, entity_id
)
continue
entity = entities[entity_id]
# 权限检查
if entity_perms and not entity_perms(entity_id, POLICY_CONTROL):
_LOGGER.warning(
"User not authorized to control entity %s in service %s.%s",
entity_id, call.domain, call.service
)
continue
permissible_entities.append(entity)
return permissible_entities
3. 自动化系统深度解析
3.1 自动化实体核心架构
"""自动化系统的核心实现"""
class BaseAutomationEntity(ToggleEntity):
"""自动化实体基类 - 自动化系统的核心抽象
核心职责:
1. 管理自动化的生命周期
2. 处理触发器的注册和监听
3. 执行条件检查和动作序列
4. 提供自动化状态和控制接口
5. 支持调试和追踪功能
自动化组件:
- 触发器(Triggers):定义何时执行自动化
- 条件(Conditions):决定是否继续执行
- 动作(Actions):实际执行的操作序列
- 变量(Variables):动态数据处理
"""
_attr_should_poll = False
def __init__(
self,
automation_id: str,
name: str,
triggers: list[ConfigType],
cond_func: Callable[[dict[str, Any]], bool] | None,
action_script: Script,
initial_state: bool | None,
variables: ScriptVariables | None,
trigger_variables: ScriptVariables | None,
raw_config: ConfigType | None,
blueprint_inputs: ConfigType | None,
trace_config: ConfigType,
) -> None:
"""初始化自动化实体
参数:
automation_id: 自动化唯一标识
name: 自动化名称
triggers: 触发器配置列表
cond_func: 条件检查函数
action_script: 动作脚本对象
initial_state: 初始启用状态
variables: 全局变量
trigger_variables: 触发器变量
raw_config: 原始配置
blueprint_inputs: 蓝图输入参数
trace_config: 追踪配置
"""
self._automation_id = automation_id
self._triggers = triggers
self._cond_func = cond_func
self._action_script = action_script
self._initial_state = initial_state
self._variables = variables
self._trigger_variables = trigger_variables
self.raw_config = raw_config
self._blueprint_inputs = blueprint_inputs
self._trace_config = trace_config
# 状态管理
self._is_enabled = initial_state
self._last_triggered: datetime | None = None
self._trigger_count = 0
# 内部状态
self._unsub_triggers: list[CALLBACK_TYPE] = []
self._logger = LOGGER.getChild(automation_id)
# 设置实体属性
self._attr_name = name
self._attr_unique_id = automation_id
self._attr_icon = "mdi:robot"
@cached_property
def extra_state_attributes(self) -> dict[str, Any]:
"""返回自动化的额外状态属性
返回值:
包含自动化详细信息的属性字典
"""
attrs = {
ATTR_ID: self._automation_id,
ATTR_LAST_TRIGGERED: self._last_triggered,
"trigger_count": self._trigger_count,
"mode": self._action_script.script_mode,
}
if self._action_script.supports_max:
attrs[ATTR_MAX] = self._action_script.max_runs
attrs[ATTR_CUR] = self._action_script.runs
return attrs
@property
def is_on(self) -> bool:
"""返回自动化是否启用"""
return self._is_enabled
async def async_added_to_hass(self) -> None:
"""自动化添加到Home Assistant时的初始化
初始化流程:
1. 恢复上次运行状态
2. 注册触发器监听
3. 设置初始状态
4. 启动追踪系统
"""
await super().async_added_to_hass()
# 恢复上次触发时间
if (last_state := await self.async_get_last_state()) is not None:
if last_triggered_str := last_state.attributes.get(ATTR_LAST_TRIGGERED):
try:
self._last_triggered = parse_datetime(last_triggered_str)
except (ValueError, TypeError):
self._logger.warning("Invalid last_triggered value in state")
# 恢复触发计数
self._trigger_count = last_state.attributes.get("trigger_count", 0)
# 注册触发器
if self._is_enabled:
await self._async_setup_triggers()
# 监听Home Assistant启动完成事件
@callback
def async_on_hass_started(_: Event) -> None:
"""Home Assistant启动完成后的处理"""
if self._is_enabled and self.hass.state is CoreState.running:
self.hass.async_create_task(
self._async_setup_triggers(),
f"automation {self._automation_id} setup triggers"
)
if self.hass.state is CoreState.starting:
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, async_on_hass_started
)
elif self.hass.state is CoreState.running and self._is_enabled:
await self._async_setup_triggers()
async def _async_setup_triggers(self) -> None:
"""设置触发器监听 - 自动化触发系统的核心
设置流程:
1. 清理现有触发器监听
2. 初始化所有触发器
3. 注册触发回调函数
4. 处理触发器设置错误
触发器类型:
- 状态触发器:实体状态变化
- 时间触发器:定时和时间模式
- 事件触发器:系统和自定义事件
- 设备触发器:设备操作和状态
"""
# 清理现有触发器
await self._async_cleanup_triggers()
if not self._triggers:
self._logger.warning("No triggers defined")
return
try:
# 初始化触发器系统
self._unsub_triggers = await async_initialize_triggers(
self.hass,
self._triggers,
self._async_automation_triggered,
DOMAIN,
self._automation_id,
self._logger,
start_automation=True, # 立即启动触发器
)
self._logger.debug(
"Initialized %d triggers for automation %s",
len(self._unsub_triggers), self._automation_id
)
except Exception as err:
self._logger.error("Error setting up triggers: %s", err)
# 如果触发器设置失败,禁用自动化
self._is_enabled = False
self.async_write_ha_state()
raise
async def _async_cleanup_triggers(self) -> None:
"""清理触发器监听"""
if self._unsub_triggers:
self._logger.debug(
"Cleaning up %d triggers for automation %s",
len(self._unsub_triggers), self._automation_id
)
# 并发清理所有触发器
cleanup_tasks = []
for unsub in self._unsub_triggers:
if asyncio.iscoroutinefunction(unsub):
cleanup_tasks.append(self.hass.async_create_task(unsub()))
else:
unsub()
if cleanup_tasks:
await asyncio.gather(*cleanup_tasks, return_exceptions=True)
self._unsub_triggers.clear()
async def _async_automation_triggered(
self,
trigger_data: dict[str, Any],
context: Context | None = None,
) -> None:
"""自动化触发回调 - 处理触发器事件的核心方法
参数:
trigger_data: 触发器数据
context: 触发上下文
触发流程:
1. 检查自动化启用状态
2. 处理触发器变量
3. 执行自动化逻辑
4. 更新状态和统计
5. 错误处理和日志
"""
if not self._is_enabled:
self._logger.debug("Automation is disabled, ignoring trigger")
return
# 更新触发统计
self._trigger_count += 1
self._last_triggered = dt_util.utcnow()
self._logger.info(
"Automation %s triggered by %s",
self._automation_id,
trigger_data.get("description", "unknown trigger")
)
try:
# 准备触发器变量
trigger_variables = {"trigger": trigger_data}
if self._trigger_variables:
try:
additional_vars = self._trigger_variables.async_render(
self.hass, trigger_variables
)
trigger_variables.update(additional_vars)
except TemplateError as err:
self._logger.error("Error rendering trigger variables: %s", err)
return
# 执行自动化
await self.async_trigger(
trigger_variables,
context=context,
skip_condition=False,
)
except Exception as err:
self._logger.error("Error in automation %s: %s", self._automation_id, err)
finally:
# 更新状态
self.async_write_ha_state()
async def async_trigger(
self,
run_variables: dict[str, Any],
context: Context | None = None,
skip_condition: bool = False,
) -> ScriptRunResult | None:
"""手动触发自动化执行 - 自动化执行的核心引擎
参数:
run_variables: 执行变量
context: 执行上下文
skip_condition: 是否跳过条件检查
返回值:
脚本执行结果
执行流程:
1. 创建执行上下文
2. 渲染全局变量
3. 执行条件检查
4. 运行动作脚本
5. 处理执行追踪
"""
if not self._is_enabled:
self._logger.warning("Cannot trigger disabled automation %s", self._automation_id)
return None
# 创建触发上下文
if context is None:
context = Context()
trigger_context = Context(parent_id=context.id if context else None)
# 开始执行追踪
with trace_automation(
self.hass,
self._automation_id,
self.raw_config,
self._blueprint_inputs,
trigger_context,
self._trace_config,
) as automation_trace:
# 准备执行变量
this = None
if state := self.hass.states.get(self.entity_id):
this = state.as_dict()
variables: dict[str, Any] = {
"this": this,
**(run_variables or {})
}
# 渲染全局变量
if self._variables:
try:
variables = self._variables.async_render(self.hass, variables)
except TemplateError as err:
self._logger.error("Error rendering variables: %s", err)
automation_trace.set_error(err)
return None
# 设置追踪信息
automation_trace.set_trace(trace_get())
# 设置触发描述
trigger_description = variables.get("trigger", {}).get("description")
automation_trace.set_trigger_description(trigger_description)
# 添加触发步骤到追踪
if "trigger" in variables and "idx" in variables["trigger"]:
trigger_path = f"trigger/{variables['trigger']['idx']}"
else:
trigger_path = "trigger"
trace_element = TraceElement(variables, trigger_path)
trace_append_element(trace_element)
# 条件检查
if (
not skip_condition
and self._cond_func is not None
and not self._cond_func(variables)
):
self._logger.debug(
"Conditions not met, aborting automation. Condition summary: %s",
trace_get(clear=False),
)
script_execution_set("failed_conditions")
return None
# 设置执行上下文
self.async_set_context(trigger_context)
# 发送自动化触发事件
event_data = {
ATTR_NAME: self.name,
ATTR_ENTITY_ID: self.entity_id,
ATTR_SOURCE: variables.get("trigger", {}).get("description"),
ATTR_VARIABLES: variables,
}
self.hass.bus.async_fire(
EVENT_AUTOMATION_TRIGGERED,
event_data,
context=trigger_context
)
# 执行动作脚本
self._logger.debug("Executing automation %s actions", self._automation_id)
try:
script_result = await self._action_script.async_run(
variables, trigger_context
)
self._logger.debug(
"Automation %s actions completed with result: %s",
self._automation_id, script_result.result
)
return script_result
except Exception as err:
self._logger.error("Error executing automation %s actions: %s", self._automation_id, err)
automation_trace.set_error(err)
return None
async def async_turn_on(self, **kwargs: Any) -> None:
"""启用自动化"""
if self._is_enabled:
return
self._logger.info("Enabling automation %s", self._automation_id)
self._is_enabled = True
# 设置触发器
await self._async_setup_triggers()
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
"""禁用自动化"""
if not self._is_enabled:
return
stop_actions = kwargs.get(CONF_STOP_ACTIONS, DEFAULT_STOP_ACTIONS)
self._logger.info("Disabling automation %s", self._automation_id)
self._is_enabled = False
# 清理触发器
await self._async_cleanup_triggers()
# 停止正在运行的动作(如果需要)
if stop_actions:
await self._action_script.async_stop()
self.async_write_ha_state()
async def async_will_remove_from_hass(self) -> None:
"""自动化从Home Assistant移除时的清理"""
await self._async_cleanup_triggers()
await self._action_script.async_stop()
await super().async_will_remove_from_hass()
3.2 触发器系统架构
graph TB
subgraph "触发器系统架构"
A[TriggerProtocol 触发器协议] --> B[状态触发器]
A --> C[时间触发器]
A --> D[事件触发器]
A --> E[设备触发器]
A --> F[模板触发器]
G[TriggerInfo 触发信息] --> H[触发数据]
G --> I[触发上下文]
G --> J[触发描述]
K[触发器注册] --> L[async_initialize_triggers]
L --> M[触发器验证]
M --> N[监听器注册]
N --> O[回调设置]
end
3.3 触发器实现示例
"""触发器系统实现示例"""
# 状态触发器实现
async def async_state_trigger_setup(
hass: HomeAssistant,
config: ConfigType,
action: TriggerActionType,
trigger_info: TriggerInfo,
) -> CALLBACK_TYPE:
"""状态触发器设置 - 监听实体状态变化
参数:
hass: Home Assistant实例
config: 触发器配置
action: 触发动作函数
trigger_info: 触发器信息
返回值:
取消监听的回调函数
配置示例:
trigger:
- platform: state
entity_id: sensor.temperature
from: "20"
to: "25"
for:
minutes: 5
"""
entity_ids = config.get(CONF_ENTITY_ID, [])
from_state = config.get(CONF_FROM)
to_state = config.get(CONF_TO)
time_delta = config.get(CONF_FOR)
attribute = config.get(CONF_ATTRIBUTE)
# 状态变化跟踪
state_trackers: dict[str, dict] = {}
@callback
def state_listener(event: Event[EventStateChangedData]) -> None:
"""状态变化监听器"""
entity_id = event.data["entity_id"]
old_state = event.data["old_state"]
new_state = event.data["new_state"]
# 检查状态变化是否匹配条件
if not _state_change_matches(
old_state, new_state, from_state, to_state, attribute
):
# 清理跟踪器(如果存在)
state_trackers.pop(entity_id, None)
return
# 处理延迟触发
if time_delta:
_handle_delayed_trigger(
hass, entity_id, new_state, time_delta,
action, trigger_info, state_trackers
)
else:
# 立即触发
_execute_state_trigger(
entity_id, old_state, new_state, action, trigger_info
)
# 注册状态变化监听器
unsub = hass.bus.async_listen(EVENT_STATE_CHANGED, state_listener)
return unsub
def _state_change_matches(
old_state: State | None,
new_state: State | None,
from_state: str | None,
to_state: str | None,
attribute: str | None,
) -> bool:
"""检查状态变化是否匹配触发条件
匹配逻辑:
1. 检查from状态条件
2. 检查to状态条件
3. 处理属性变化(如果指定)
4. 处理未知状态和不可用状态
"""
# 获取比较值
if attribute:
old_value = old_state.attributes.get(attribute) if old_state else None
new_value = new_state.attributes.get(attribute) if new_state else None
else:
old_value = old_state.state if old_state else None
new_value = new_state.state if new_state else None
# 检查from条件
if from_state is not None:
if old_value != from_state:
return False
# 检查to条件
if to_state is not None:
if new_value != to_state:
return False
# 确保状态确实发生了变化
return old_value != new_value
def _handle_delayed_trigger(
hass: HomeAssistant,
entity_id: str,
new_state: State,
time_delta: timedelta,
action: TriggerActionType,
trigger_info: TriggerInfo,
state_trackers: dict[str, dict],
) -> None:
"""处理延迟触发逻辑"""
@callback
def delayed_trigger_callback() -> None:
"""延迟触发回调"""
# 验证状态是否仍然满足条件
current_state = hass.states.get(entity_id)
if current_state and current_state.state == new_state.state:
_execute_state_trigger(
entity_id, None, current_state, action, trigger_info
)
# 清理跟踪器
state_trackers.pop(entity_id, None)
# 取消现有的延迟触发
if entity_id in state_trackers:
state_trackers[entity_id]["cancel"]()
# 设置新的延迟触发
cancel_handle = hass.loop.call_later(
time_delta.total_seconds(),
delayed_trigger_callback
)
state_trackers[entity_id] = {
"cancel": cancel_handle.cancel,
"trigger_time": dt_util.utcnow() + time_delta,
}
def _execute_state_trigger(
entity_id: str,
old_state: State | None,
new_state: State | None,
action: TriggerActionType,
trigger_info: TriggerInfo,
) -> None:
"""执行状态触发动作"""
# 构建触发数据
trigger_data = {
"platform": "state",
"entity_id": entity_id,
"from_state": old_state,
"to_state": new_state,
"for": trigger_info.get("for"),
"description": f"state of {entity_id}",
}
if old_state:
trigger_data["from"] = old_state.state
if new_state:
trigger_data["to"] = new_state.state
# 执行触发动作
hass.async_create_task(
action(trigger_data),
f"state trigger for {entity_id}"
)
# 时间触发器实现
async def async_time_trigger_setup(
hass: HomeAssistant,
config: ConfigType,
action: TriggerActionType,
trigger_info: TriggerInfo,
) -> CALLBACK_TYPE:
"""时间触发器设置 - 基于时间的自动化触发
支持的时间触发类型:
1. 固定时间:每天的特定时间
2. 时间模式:使用cron表达式
3. 间隔触发:固定时间间隔
4. 相对时间:相对于日出日落等
"""
trigger_type = config.get(CONF_PLATFORM)
if CONF_AT in config:
# 固定时间触发
return await _setup_time_at_trigger(hass, config, action, trigger_info)
elif CONF_HOURS in config or CONF_MINUTES in config or CONF_SECONDS in config:
# 间隔触发
return await _setup_interval_trigger(hass, config, action, trigger_info)
else:
raise ValueError(f"Invalid time trigger configuration: {config}")
async def _setup_time_at_trigger(
hass: HomeAssistant,
config: ConfigType,
action: TriggerActionType,
trigger_info: TriggerInfo,
) -> CALLBACK_TYPE:
"""设置固定时间触发器"""
at_time = config[CONF_AT]
@callback
def time_trigger_callback(now: datetime) -> None:
"""时间触发回调"""
trigger_data = {
"platform": "time",
"now": now,
"description": f"time trigger at {at_time}",
}
hass.async_create_task(
action(trigger_data),
f"time trigger at {at_time}"
)
# 注册时间监听器
unsub = async_track_time_change(
hass,
time_trigger_callback,
hour=at_time.hour,
minute=at_time.minute,
second=at_time.second,
)
return unsub
4. 动作执行系统
4.1 Script脚本引擎
"""Script脚本执行引擎 - 自动化动作序列的执行器"""
class Script:
"""脚本执行器 - 管理复杂动作序列的执行
核心功能:
1. 顺序执行动作列表
2. 支持条件分支和循环
3. 处理并发执行和排队
4. 提供执行追踪和调试
5. 实现错误处理和恢复
执行模式:
- single: 单实例执行(默认)
- restart: 重启模式,取消正在运行的实例
- queued: 排队模式,按顺序执行
- parallel: 并行模式,同时执行多个实例
"""
def __init__(
self,
hass: HomeAssistant,
sequence: list[ConfigType],
name: str | None = None,
domain: str | None = None,
*,
script_mode: str = DEFAULT_SCRIPT_MODE,
max_runs: int = DEFAULT_MAX_RUNS,
max_exceeded: str = DEFAULT_MAX_EXCEEDED,
logger: Logger | None = None,
top_level: bool = True,
variables: dict[str, Any] | None = None,
) -> None:
"""初始化脚本执行器
参数:
hass: Home Assistant实例
sequence: 动作序列配置
name: 脚本名称
domain: 脚本域
script_mode: 执行模式
max_runs: 最大并发运行数
max_exceeded: 超出限制时的处理方式
logger: 日志记录器
top_level: 是否为顶级脚本
variables: 默认变量
"""
self.hass = hass
self.sequence = sequence
self.name = name or "Script"
self.domain = domain
self.script_mode = script_mode
self.max_runs = max_runs
self.max_exceeded = max_exceeded
self.top_level = top_level
self.variables = variables or {}
# 日志配置
self.logger = logger or _LOGGER.getChild(name or "script")
# 执行状态
self._runs: list[ScriptRun] = []
self._is_stopping = False
# 性能统计
self.total_runs = 0
self.successful_runs = 0
self.failed_runs = 0
self.total_execution_time = 0.0
@property
def is_running(self) -> bool:
"""脚本是否正在运行"""
return bool(self._runs)
@property
def runs(self) -> int:
"""当前运行的实例数"""
return len(self._runs)
@property
def supports_max(self) -> bool:
"""是否支持最大运行数限制"""
return self.script_mode in (SCRIPT_MODE_QUEUED, SCRIPT_MODE_PARALLEL)
async def async_run(
self,
variables: dict[str, Any] | None = None,
context: Context | None = None,
) -> ScriptRunResult:
"""异步运行脚本 - 脚本执行的主要入口点
参数:
variables: 执行变量
context: 执行上下文
返回值:
脚本执行结果对象
执行流程:
1. 检查执行模式和限制
2. 创建脚本运行实例
3. 处理并发控制
4. 执行动作序列
5. 处理结果和清理
"""
if self._is_stopping:
self.logger.debug("Script %s is stopping, cannot start new run", self.name)
return ScriptRunResult(success=False, result="stopping")
# 合并变量
run_variables = {**self.variables}
if variables:
run_variables.update(variables)
# 创建执行上下文
if context is None:
context = Context()
# 处理执行模式
if self.script_mode == SCRIPT_MODE_SINGLE:
# 单实例模式:停止现有实例
if self._runs:
self.logger.debug(
"Script %s is already running in single mode, stopping previous run",
self.name
)
await self.async_stop()
elif self.script_mode == SCRIPT_MODE_RESTART:
# 重启模式:取消所有现有实例
if self._runs:
self.logger.debug(
"Script %s restarting, cancelling %d existing runs",
self.name, len(self._runs)
)
for run in self._runs:
run.async_cancel()
self._runs.clear()
elif self.script_mode in (SCRIPT_MODE_QUEUED, SCRIPT_MODE_PARALLEL):
# 队列/并行模式:检查运行数限制
if len(self._runs) >= self.max_runs:
if self.max_exceeded == MAX_EXCEEDED_SILENT:
self.logger.debug(
"Script %s max runs (%d) reached, silently ignoring",
self.name, self.max_runs
)
return ScriptRunResult(success=False, result="max_runs_reached")
elif self.max_exceeded == MAX_EXCEEDED_WARNING:
self.logger.warning(
"Script %s max runs (%d) reached, ignoring new request",
self.name, self.max_runs
)
return ScriptRunResult(success=False, result="max_runs_reached")
else: # MAX_EXCEEDED_ERROR
raise HomeAssistantError(
f"Script {self.name} maximum runs ({self.max_runs}) reached"
)
# 创建脚本运行实例
script_run = ScriptRun(
self,
run_variables,
context,
self.logger,
)
# 添加到运行列表
self._runs.append(script_run)
self.total_runs += 1
try:
# 执行脚本
self.logger.debug("Starting script %s run", self.name)
start_time = time.time()
result = await script_run.async_run()
# 更新统计
execution_time = time.time() - start_time
self.total_execution_time += execution_time
if result.success:
self.successful_runs += 1
self.logger.debug(
"Script %s completed successfully in %.3f seconds",
self.name, execution_time
)
else:
self.failed_runs += 1
self.logger.debug(
"Script %s failed after %.3f seconds: %s",
self.name, execution_time, result.result
)
return result
except Exception as err:
# 执行异常处理
self.failed_runs += 1
self.logger.error("Error running script %s: %s", self.name, err)
return ScriptRunResult(success=False, result=f"exception: {err}")
finally:
# 清理运行实例
if script_run in self._runs:
self._runs.remove(script_run)
async def async_stop(self) -> None:
"""停止所有正在运行的脚本实例
停止流程:
1. 设置停止标志
2. 取消所有运行实例
3. 等待实例完成清理
4. 重置内部状态
"""
if not self._runs:
return
self.logger.debug("Stopping script %s with %d running instances", self.name, len(self._runs))
self._is_stopping = True
try:
# 取消所有运行实例
cancel_tasks = []
for run in self._runs:
if run.is_running:
cancel_tasks.append(
self.hass.async_create_task(
run.async_stop(),
f"stop script run {run.run_id}"
)
)
# 等待所有实例停止
if cancel_tasks:
await asyncio.gather(*cancel_tasks, return_exceptions=True)
# 清理运行列表
self._runs.clear()
self.logger.debug("Script %s stopped successfully", self.name)
finally:
self._is_stopping = False
class ScriptRun:
"""脚本运行实例 - 管理单次脚本执行的完整生命周期
核心职责:
1. 执行动作序列中的每个步骤
2. 处理变量渲染和上下文管理
3. 实现错误处理和异常恢复
4. 提供执行追踪和调试信息
5. 管理子脚本和嵌套调用
"""
def __init__(
self,
script: Script,
variables: dict[str, Any],
context: Context,
logger: Logger,
) -> None:
"""初始化脚本运行实例"""
self.script = script
self.variables = variables
self.context = context
self.logger = logger
# 运行状态
self.run_id = str(uuid.uuid4())[:8]
self.is_running = False
self.is_cancelled = False
# 执行控制
self._current_action: asyncio.Task | None = None
self._stop_requested = False
async def async_run(self) -> ScriptRunResult:
"""执行脚本动作序列
返回值:
脚本执行结果
执行逻辑:
按顺序执行每个动作,处理条件判断、
循环控制、延迟等特殊动作类型
"""
self.is_running = True
current_variables = self.variables.copy()
try:
for idx, action_config in enumerate(self.script.sequence):
if self._stop_requested or self.is_cancelled:
return ScriptRunResult(success=False, result="cancelled")
self.logger.debug(
"Executing script %s action %d: %s",
self.script.name, idx, action_config.get("alias", "unnamed action")
)
# 执行单个动作
result = await self._async_execute_action(
action_config, current_variables, idx
)
if not result.success:
self.logger.error(
"Script %s action %d failed: %s",
self.script.name, idx, result.result
)
return result
# 更新变量(如果动作返回了新变量)
if result.variables:
current_variables.update(result.variables)
return ScriptRunResult(success=True, variables=current_variables)
except Exception as err:
self.logger.error("Script %s run %s failed: %s", self.script.name, self.run_id, err)
return ScriptRunResult(success=False, result=f"exception: {err}")
finally:
self.is_running = False
async def _async_execute_action(
self,
action_config: ConfigType,
variables: dict[str, Any],
action_idx: int,
) -> ScriptRunResult:
"""执行单个动作配置
支持的动作类型:
- service: 服务调用
- delay: 延迟等待
- wait_template: 模板等待
- condition: 条件检查
- repeat: 重复执行
- choose: 条件分支
- parallel: 并行执行
"""
action_type = _determine_action_type(action_config)
try:
if action_type == "service":
return await self._async_service_action(action_config, variables)
elif action_type == "delay":
return await self._async_delay_action(action_config, variables)
elif action_type == "condition":
return await self._async_condition_action(action_config, variables)
elif action_type == "repeat":
return await self._async_repeat_action(action_config, variables)
elif action_type == "choose":
return await self._async_choose_action(action_config, variables)
elif action_type == "parallel":
return await self._async_parallel_action(action_config, variables)
else:
raise ValueError(f"Unknown action type: {action_type}")
except Exception as err:
return ScriptRunResult(success=False, result=f"action_error: {err}")
async def _async_service_action(
self, action_config: ConfigType, variables: dict[str, Any]
) -> ScriptRunResult:
"""执行服务调用动作"""
service = action_config.get("service")
service_data = action_config.get("data", {})
target = action_config.get("target", {})
if not service:
return ScriptRunResult(success=False, result="missing_service")
# 渲染模板
try:
rendered_service = template.render_complex(service, variables)
rendered_data = template.render_complex(service_data, variables)
rendered_target = template.render_complex(target, variables)
except TemplateError as err:
return ScriptRunResult(success=False, result=f"template_error: {err}")
# 调用服务
try:
domain, service_name = rendered_service.split(".", 1)
await self.script.hass.services.async_call(
domain,
service_name,
rendered_data,
blocking=True,
context=self.context,
target=rendered_target,
)
return ScriptRunResult(success=True)
except Exception as err:
return ScriptRunResult(success=False, result=f"service_error: {err}")
async def async_stop(self) -> None:
"""停止脚本运行"""
self._stop_requested = True
if self._current_action:
self._current_action.cancel()
def async_cancel(self) -> None:
"""取消脚本运行"""
self.is_cancelled = True
self.async_stop()
@dataclass
class ScriptRunResult:
"""脚本运行结果"""
success: bool
result: str | None = None
variables: dict[str, Any] | None = None
5. 最佳实践和使用模式
5.1 服务开发最佳实践
"""服务开发最佳实践指南"""
# 1. 服务注册的最佳实践
async def async_setup_services(hass: HomeAssistant) -> None:
"""服务设置的最佳实践"""
# 使用适当的数据验证模式
SERVICE_SCHEMA = vol.Schema({
vol.Required("entity_id"): cv.entity_ids,
vol.Optional("brightness", default=255): vol.All(
vol.Coerce(int), vol.Range(min=1, max=255)
),
vol.Optional("color_name"): cv.string,
})
async def async_service_handler(call: ServiceCall) -> ServiceResponse:
"""服务处理函数的最佳实践"""
try:
# 1. 记录服务调用
_LOGGER.debug(
"Service %s.%s called with data: %s",
call.domain, call.service, call.data
)
# 2. 验证实体存在性
entity_ids = call.data["entity_id"]
existing_entities = []
for entity_id in entity_ids:
if hass.states.get(entity_id):
existing_entities.append(entity_id)
else:
_LOGGER.warning("Entity %s not found", entity_id)
if not existing_entities:
raise HomeAssistantError("No valid entities found")
# 3. 执行服务逻辑
results = []
for entity_id in existing_entities:
result = await _process_entity(hass, entity_id, call.data)
results.append(result)
# 4. 返回响应(如果需要)
if call.return_response:
return {"results": results}
return None
except Exception as err:
_LOGGER.error("Service error: %s", err)
raise
# 注册服务
hass.services.async_register(
"my_domain",
"my_service",
async_service_handler,
schema=SERVICE_SCHEMA,
supports_response=SupportsResponse.OPTIONAL,
)
# 2. 实体服务的最佳实践
class MyEntityPlatform:
"""实体服务注册的最佳实践"""
def __init__(self, hass: HomeAssistant):
self.hass = hass
self.entities: dict[str, MyEntity] = {}
async def async_setup_entity_services(self) -> None:
"""设置实体服务"""
# 定义服务模式
SET_VALUE_SCHEMA = vol.Schema({
vol.Required("value"): vol.All(
vol.Coerce(float),
vol.Range(min=0, max=100)
),
vol.Optional("transition", default=0): cv.positive_int,
})
# 注册实体服务
async_register_entity_service(
self.hass,
"my_domain",
"set_value",
entities=self.entities,
func="async_set_value",
schema=SET_VALUE_SCHEMA,
supports_response=SupportsResponse.NONE,
)
async def async_custom_entity_service(
self, entity: MyEntity, call: ServiceCall
) -> None:
"""自定义实体服务处理"""
value = call.data["value"]
transition = call.data["transition"]
await entity.async_set_value(value, transition)
# 3. 服务性能优化
class PerformantServiceHandler:
"""高性能服务处理器"""
def __init__(self, hass: HomeAssistant):
self.hass = hass
self._batch_operations: dict[str, list] = defaultdict(list)
self._batch_timer: asyncio.Handle | None = None
async def async_handle_service(self, call: ServiceCall) -> None:
"""带批处理优化的服务处理"""
operation_type = call.data.get("operation_type")
if operation_type == "batch_update":
# 批量操作:收集请求后批量处理
self._batch_operations[operation_type].append(call.data)
self._schedule_batch_processing()
else:
# 单个操作:立即处理
await self._process_single_operation(call)
def _schedule_batch_processing(self) -> None:
"""调度批量处理"""
if self._batch_timer:
self._batch_timer.cancel()
self._batch_timer = self.hass.loop.call_later(
0.1, # 100ms批处理延迟
lambda: self.hass.async_create_task(
self._process_batch_operations()
)
)
async def _process_batch_operations(self) -> None:
"""处理批量操作"""
for operation_type, operations in self._batch_operations.items():
if operations:
await self._execute_batch_operation(operation_type, operations)
self._batch_operations.clear()
self._batch_timer = None
5.2 自动化开发最佳实践
"""自动化开发最佳实践"""
# 1. 高效的触发器设计
EFFICIENT_AUTOMATION_CONFIG = {
"alias": "Efficient Motion Detection",
"trigger": [
{
"platform": "state",
"entity_id": "binary_sensor.motion",
"to": "on",
"for": {"seconds": 2}, # 防抖动
}
],
"condition": [
{
"condition": "time",
"after": "sunset",
"before": "sunrise",
},
{
"condition": "state",
"entity_id": "input_boolean.automation_enabled",
"state": "on",
}
],
"action": [
{
"service": "light.turn_on",
"target": {"area_id": "living_room"},
"data": {
"brightness_pct": "{{ states('input_number.brightness') | int }}",
"transition": 2,
}
}
],
"mode": "single", # 防止重复触发
}
# 2. 复杂自动化的模块化设计
class ModularAutomationManager:
"""模块化自动化管理器"""
def __init__(self, hass: HomeAssistant):
self.hass = hass
self.automation_modules: dict[str, dict] = {}
def register_automation_module(
self,
module_name: str,
triggers: list[dict],
conditions: list[dict],
actions: list[dict],
variables: dict | None = None,
) -> None:
"""注册自动化模块"""
self.automation_modules[module_name] = {
"triggers": triggers,
"conditions": conditions,
"actions": actions,
"variables": variables or {},
}
async def async_create_composite_automation(
self,
name: str,
module_sequence: list[str],
global_conditions: list[dict] | None = None,
) -> dict:
"""创建复合自动化"""
composite_config = {
"alias": name,
"trigger": [],
"condition": global_conditions or [],
"action": [],
"variables": {},
}
# 合并模块配置
for module_name in module_sequence:
if module_name not in self.automation_modules:
raise ValueError(f"Unknown module: {module_name}")
module = self.automation_modules[module_name]
composite_config["trigger"].extend(module["triggers"])
composite_config["condition"].extend(module["conditions"])
composite_config["action"].extend(module["actions"])
composite_config["variables"].update(module["variables"])
return composite_config
# 3. 自动化性能监控
class AutomationPerformanceMonitor:
"""自动化性能监控器"""
def __init__(self, hass: HomeAssistant):
self.hass = hass
self.execution_stats: dict[str, dict] = defaultdict(
lambda: {
"total_runs": 0,
"successful_runs": 0,
"failed_runs": 0,
"total_execution_time": 0.0,
"average_execution_time": 0.0,
"last_run": None,
"slowest_run": 0.0,
}
)
# 监听自动化事件
hass.bus.async_listen(
EVENT_AUTOMATION_TRIGGERED,
self._automation_triggered
)
@callback
def _automation_triggered(self, event: Event) -> None:
"""处理自动化触发事件"""
entity_id = event.data[ATTR_ENTITY_ID]
# 记录开始时间
start_time = time.time()
stats = self.execution_stats[entity_id]
stats["total_runs"] += 1
stats["last_run"] = dt_util.utcnow()
# 注册完成监听
def track_completion():
execution_time = time.time() - start_time
stats["total_execution_time"] += execution_time
stats["average_execution_time"] = (
stats["total_execution_time"] / stats["total_runs"]
)
if execution_time > stats["slowest_run"]:
stats["slowest_run"] = execution_time
# 检查慢自动化
if execution_time > 5.0: # 超过5秒
_LOGGER.warning(
"Slow automation detected: %s took %.2f seconds",
entity_id, execution_time
)
def get_performance_report(self) -> dict[str, dict]:
"""获取性能报告"""
return dict(self.execution_stats)
6. 总结
Home Assistant的服务与自动化系统提供了强大而灵活的行为执行引擎,通过以下核心特性实现了复杂的智能家居自动化:
6.1 核心优势
- 统一服务模型:所有功能通过服务调用统一访问
- 灵活的触发机制:支持多种触发器类型和组合
- 强大的脚本引擎:复杂动作序列的可靠执行
- 完善的权限控制:细粒度的安全访问管理
- 高性能并发:优化的异步执行和资源管理
6.2 系统特色
- 声明式配置:通过YAML配置实现复杂逻辑
- 模板系统集成:动态数据处理和条件判断
- 追踪和调试:完整的执行追踪和调试支持
- 蓝图系统:自动化的模板化和复用
6.3 实践价值
- 为复杂的智能家居场景提供了完整的自动化解决方案
- 展示了如何设计可扩展的事件驱动系统
- 提供了优秀的异步编程和并发控制范例
- 演示了声明式配置与命令式执行的有效结合
这个服务与自动化系统是Home Assistant智能化能力的核心,使用户能够创建复杂而可靠的家居自动化体验。
下一步分析
至此,我们已经完成了Home Assistant Core项目的核心模块深入分析。整个文档体系包括:
- 整体架构分析 - 项目结构和设计理念
- 核心模块分析 - HomeAssistant、EventBus、StateMachine等
- 组件系统分析 - 动态加载和管理机制
- 实体平台分析 - 设备抽象和生命周期
- API接口分析 - REST和WebSocket API
- 数据存储分析 - 持久化和配置管理
- 关键数据结构UML - 核心数据模型
- 实战经验与最佳实践 - 开发指南
- 服务自动化分析 - 行为执行引擎
这套完整的技术文档为深入理解和掌握Home Assistant Core提供了全面的指导。