目录:
传统的 RPA(Robotic Process Automation)工具通常需要手动编写脚本或使用可视化编排工具,学习成本高且维护困难。随着多模态 AI 的发展,一种新的范式正在兴起:通过录制用户的桌面操作,让 AI 自动理解并复现这些操作。这种方式大大降低了自动化的门槛,让业务人员也能快速构建自动化流程。
为什么选择录屏驱动的 RPA?
传统 RPA 的痛点
- 高技术门槛:需要编写代码或学习复杂的可视化工具
- 维护成本高:UI 变化时脚本经常失效
- 场景覆盖有限:难以处理动态内容和异常情况
- 缺乏智能:无法理解操作的语义,只是机械重复
录屏 + AI 的优势
- 零代码上手:只需录制一次操作,AI 自动学习
- 语义理解:AI 理解操作意图,而非死记硬背坐标
- 智能适应:界面变化时能自动调整
- 异常处理:可以识别并处理意外情况
整体架构设计
┌──────────────────────────────────────────────────────────────────┐
│ Screen Recording RPA System │
├──────────────────────────────────────────────────────────────────┤
│ Layer 1: 录制层 │
│ ├── 屏幕录制模块 (视频 + 音频) │
│ ├── 鼠标/键盘事件捕获 │
│ └── 时间戳同步 │
├──────────────────────────────────────────────────────────────────┤
│ Layer 2: 理解层 │
│ ├── 视频帧提取与关键帧检测 │
│ ├── 多模态 LLM 分析 (GPT-4V / Claude Vision / Gemini) │
│ └── 操作序列抽象 │
├──────────────────────────────────────────────────────────────────┤
│ Layer 3: 执行层 │
│ ├── 元素定位引擎 (视觉 + 语义) │
│ ├── 动作执行器 (pyautogui / playwright) │
│ └── 验证与重试机制 │
├──────────────────────────────────────────────────────────────────┤
│ Layer 4: 编排层 │
│ ├── 流程定义与管理 │
│ ├── 调度与触发器 │
│ └── 监控与日志 │
└──────────────────────────────────────────────────────────────────┘
Step 1: 构建录制模块
1.1 屏幕录制
我们需要同时录制屏幕画面和用户的输入事件。
import cv2
import numpy as np
import pyautogui
from pynput import mouse, keyboard
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from datetime import datetime
import threading
import json
import time
@dataclass
class InputEvent:
"""表示一个输入事件"""
timestamp: float # 相对于录制开始的时间(秒)
event_type: str # "click", "scroll", "key_press", "key_release", "move"
position: Optional[Tuple[int, int]] = None # 鼠标位置
button: Optional[str] = None # 鼠标按钮
key: Optional[str] = None # 按键
data: dict = field(default_factory=dict) # 额外数据
@dataclass
class RecordingSession:
"""录制会话"""
video_path: str
events: List[InputEvent]
start_time: float
end_time: float
screen_size: Tuple[int, int]
fps: int = 10
class ScreenRecorder:
"""
屏幕录制器
同时捕获:
1. 屏幕视频
2. 鼠标点击、移动、滚动
3. 键盘输入
"""
def __init__(self, output_dir: str = "./recordings", fps: int = 10):
self.output_dir = output_dir
self.fps = fps
self.is_recording = False
self.events: List[InputEvent] = []
self.start_time: float = 0
self.screen_size = pyautogui.size()
# 录制线程
self._video_thread: Optional[threading.Thread] = None
self._video_writer: Optional[cv2.VideoWriter] = None
# 输入监听器
self._mouse_listener: Optional[mouse.Listener] = None
self._keyboard_listener: Optional[keyboard.Listener] = None
def _on_mouse_click(self, x: int, y: int, button, pressed: bool):
"""鼠标点击回调"""
if not self.is_recording:
return
event = InputEvent(
timestamp=time.time() - self.start_time,
event_type="click" if pressed else "release",
position=(x, y),
button=str(button),
data={"pressed": pressed}
)
self.events.append(event)
def _on_mouse_scroll(self, x: int, y: int, dx: int, dy: int):
"""鼠标滚动回调"""
if not self.is_recording:
return
event = InputEvent(
timestamp=time.time() - self.start_time,
event_type="scroll",
position=(x, y),
data={"dx": dx, "dy": dy}
)
self.events.append(event)
def _on_key_press(self, key):
"""键盘按下回调"""
if not self.is_recording:
return
try:
key_str = key.char if hasattr(key, 'char') else str(key)
except AttributeError:
key_str = str(key)
event = InputEvent(
timestamp=time.time() - self.start_time,
event_type="key_press",
key=key_str
)
self.events.append(event)
def _record_video(self, output_path: str):
"""视频录制线程"""
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self._video_writer = cv2.VideoWriter(
output_path,
fourcc,
self.fps,
self.screen_size
)
interval = 1.0 / self.fps
while self.is_recording:
frame_start = time.time()
# 截取屏幕
screenshot = pyautogui.screenshot()
frame = np.array(screenshot)
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# 在帧上绘制鼠标位置(用于调试)
mouse_pos = pyautogui.position()
cv2.circle(frame, mouse_pos, 10, (0, 0, 255), -1)
self._video_writer.write(frame)
# 控制帧率
elapsed = time.time() - frame_start
if elapsed < interval:
time.sleep(interval - elapsed)
self._video_writer.release()
def start_recording(self, session_name: str = None) -> str:
"""
开始录制
Args:
session_name: 会话名称,用于生成文件名
Returns:
录制文件路径
"""
if self.is_recording:
raise RuntimeError("Already recording")
import os
os.makedirs(self.output_dir, exist_ok=True)
if session_name is None:
session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
video_path = f"{self.output_dir}/{session_name}.mp4"
self.events = []
self.start_time = time.time()
self.is_recording = True
# 启动视频录制线程
self._video_thread = threading.Thread(
target=self._record_video,
args=(video_path,)
)
self._video_thread.start()
# 启动输入监听
self._mouse_listener = mouse.Listener(
on_click=self._on_mouse_click,
on_scroll=self._on_mouse_scroll
)
self._keyboard_listener = keyboard.Listener(
on_press=self._on_key_press
)
self._mouse_listener.start()
self._keyboard_listener.start()
print(f"Recording started: {video_path}")
return video_path
def stop_recording(self) -> RecordingSession:
"""
停止录制
Returns:
RecordingSession 对象
"""
if not self.is_recording:
raise RuntimeError("Not recording")
self.is_recording = False
end_time = time.time()
# 停止监听器
if self._mouse_listener:
self._mouse_listener.stop()
if self._keyboard_listener:
self._keyboard_listener.stop()
# 等待视频线程结束
if self._video_thread:
self._video_thread.join()
# 获取视频路径
video_path = self._video_writer.getfilename() if self._video_writer else ""
session = RecordingSession(
video_path=video_path,
events=self.events.copy(),
start_time=self.start_time,
end_time=end_time,
screen_size=self.screen_size,
fps=self.fps
)
# 保存事件数据
events_path = video_path.replace('.mp4', '_events.json')
self._save_events(events_path)
print(f"Recording stopped. Duration: {end_time - self.start_time:.1f}s")
print(f"Captured {len(self.events)} input events")
return session
def _save_events(self, path: str):
"""保存事件到 JSON 文件"""
events_data = [
{
"timestamp": e.timestamp,
"event_type": e.event_type,
"position": e.position,
"button": e.button,
"key": e.key,
"data": e.data
}
for e in self.events
]
with open(path, 'w') as f:
json.dump(events_data, f, indent=2)
# generated by hugo's coding agent
1.2 关键帧提取
录制完成后,我们需要从视频中提取关键帧,用于后续的 AI 分析。
import cv2
import numpy as np
from typing import List, Tuple
from dataclasses import dataclass
@dataclass
class KeyFrame:
"""关键帧"""
frame_index: int
timestamp: float
image: np.ndarray
associated_events: List[InputEvent]
change_score: float # 与前一帧的变化程度
class KeyFrameExtractor:
"""
关键帧提取器
策略:
1. 基于画面变化检测
2. 基于输入事件(点击、按键)
3. 固定间隔采样
"""
def __init__(self,
change_threshold: float = 0.1,
min_interval: float = 0.5,
event_window: float = 0.2):
"""
Args:
change_threshold: 画面变化阈值(0-1)
min_interval: 关键帧最小间隔(秒)
event_window: 事件关联时间窗口(秒)
"""
self.change_threshold = change_threshold
self.min_interval = min_interval
self.event_window = event_window
def extract(self,
video_path: str,
events: List[InputEvent]) -> List[KeyFrame]:
"""
从视频中提取关键帧
Args:
video_path: 视频文件路径
events: 输入事件列表
Returns:
关键帧列表
"""
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
keyframes = []
prev_frame = None
prev_keyframe_time = -float('inf')
frame_index = 0
while True:
ret, frame = cap.read()
if not ret:
break
timestamp = frame_index / fps
# 计算与前一帧的变化
change_score = 0.0
if prev_frame is not None:
change_score = self._compute_change(prev_frame, frame)
# 判断是否为关键帧
is_keyframe = False
# 条件1:画面变化超过阈值
if change_score > self.change_threshold:
is_keyframe = True
# 条件2:有关联的点击事件
associated = self._find_associated_events(timestamp, events)
click_events = [e for e in associated if e.event_type == "click"]
if click_events:
is_keyframe = True
# 条件3:满足最小间隔
if is_keyframe and (timestamp - prev_keyframe_time) >= self.min_interval:
keyframe = KeyFrame(
frame_index=frame_index,
timestamp=timestamp,
image=frame.copy(),
associated_events=associated,
change_score=change_score
)
keyframes.append(keyframe)
prev_keyframe_time = timestamp
prev_frame = frame
frame_index += 1
cap.release()
print(f"Extracted {len(keyframes)} keyframes from {frame_index} frames")
return keyframes
def _compute_change(self, frame1: np.ndarray, frame2: np.ndarray) -> float:
"""计算两帧之间的变化程度"""
# 转换为灰度图
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
# 计算结构相似性
diff = cv2.absdiff(gray1, gray2)
change_ratio = np.mean(diff) / 255.0
return change_ratio
def _find_associated_events(self,
timestamp: float,
events: List[InputEvent]) -> List[InputEvent]:
"""找到时间窗口内的关联事件"""
return [
e for e in events
if abs(e.timestamp - timestamp) <= self.event_window
]
# generated by hugo's coding agent
Step 2: 使用多模态 AI 理解操作
2.1 构建分析 Prompt
import base64
import io
from PIL import Image
from typing import List, Dict, Any
class OperationAnalyzer:
"""
使用多模态 LLM 分析操作序列
支持的模型:
- OpenAI GPT-4 Vision
- Anthropic Claude 3
- Google Gemini Pro Vision
"""
ANALYSIS_PROMPT = """你是一个 RPA 自动化专家。请分析以下屏幕截图序列,理解用户正在执行的操作。
对于每个截图:
1. 描述当前屏幕的状态
2. 识别用户点击/操作的元素
3. 推断用户的意图
最后,请将整个操作序列抽象为一个可重复执行的自动化流程。
输出格式(JSON):
{
"workflow_name": "流程名称",
"description": "流程描述",
"steps": [
{
"step_id": 1,
"action": "click|type|scroll|wait|verify",
"target": {
"description": "目标元素的描述",
"visual_features": ["视觉特征1", "特征2"],
"text_content": "元素包含的文本(如有)",
"relative_position": "相对位置描述"
},
"parameters": {
"text": "要输入的文本(type 操作)",
"direction": "滚动方向(scroll 操作)",
"condition": "验证条件(verify 操作)"
},
"expected_result": "执行后的预期结果"
}
],
"preconditions": ["前置条件1", "前置条件2"],
"error_handling": {
"timeout": "超时处理策略",
"element_not_found": "元素未找到处理策略"
}
}
"""
def __init__(self, model_provider: str = "openai", api_key: str = None):
self.model_provider = model_provider
self.api_key = api_key
def _encode_image(self, image: np.ndarray) -> str:
"""将图像编码为 base64"""
# 转换为 PIL Image
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(image_rgb)
# 压缩并编码
buffer = io.BytesIO()
pil_image.save(buffer, format="JPEG", quality=85)
return base64.b64encode(buffer.getvalue()).decode()
def _build_messages(self, keyframes: List[KeyFrame]) -> List[Dict]:
"""构建多模态消息"""
messages = [
{"role": "system", "content": self.ANALYSIS_PROMPT}
]
# 构建用户消息,包含所有关键帧
content = []
for i, kf in enumerate(keyframes):
# 添加图像
content.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{self._encode_image(kf.image)}"
}
})
# 添加事件信息
events_desc = self._describe_events(kf.associated_events)
content.append({
"type": "text",
"text": f"[截图 {i+1}] 时间: {kf.timestamp:.1f}s\n用户操作: {events_desc}"
})
messages.append({"role": "user", "content": content})
return messages
def _describe_events(self, events: List[InputEvent]) -> str:
"""描述事件列表"""
if not events:
return "无操作"
descriptions = []
for e in events:
if e.event_type == "click":
descriptions.append(f"点击 ({e.position[0]}, {e.position[1]})")
elif e.event_type == "key_press":
descriptions.append(f"按键 '{e.key}'")
elif e.event_type == "scroll":
dy = e.data.get("dy", 0)
direction = "向上" if dy > 0 else "向下"
descriptions.append(f"滚动 {direction}")
return "; ".join(descriptions)
def analyze(self, keyframes: List[KeyFrame]) -> Dict[str, Any]:
"""
分析关键帧序列,生成自动化流程
Args:
keyframes: 关键帧列表
Returns:
解析后的流程定义
"""
messages = self._build_messages(keyframes)
if self.model_provider == "openai":
return self._call_openai(messages)
elif self.model_provider == "anthropic":
return self._call_anthropic(messages)
else:
raise ValueError(f"Unsupported provider: {self.model_provider}")
def _call_openai(self, messages: List[Dict]) -> Dict[str, Any]:
"""调用 OpenAI API"""
from openai import OpenAI
client = OpenAI(api_key=self.api_key)
response = client.chat.completions.create(
model="gpt-4-vision-preview",
messages=messages,
max_tokens=4096,
response_format={"type": "json_object"}
)
return json.loads(response.choices[0].message.content)
def _call_anthropic(self, messages: List[Dict]) -> Dict[str, Any]:
"""调用 Anthropic API"""
import anthropic
client = anthropic.Anthropic(api_key=self.api_key)
# 转换消息格式
claude_messages = self._convert_to_claude_format(messages)
response = client.messages.create(
model="claude-3-opus-20240229",
max_tokens=4096,
messages=claude_messages
)
# 提取 JSON
content = response.content[0].text
# 尝试解析 JSON
import re
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
return json.loads(json_match.group())
return {"error": "Failed to parse response"}
def _convert_to_claude_format(self, messages: List[Dict]) -> List[Dict]:
"""转换为 Claude 消息格式"""
claude_messages = []
for msg in messages:
if msg["role"] == "system":
# Claude 使用 system 参数而非消息
continue
elif msg["role"] == "user":
content = []
for item in msg["content"]:
if item["type"] == "text":
content.append({"type": "text", "text": item["text"]})
elif item["type"] == "image_url":
# 提取 base64 数据
url = item["image_url"]["url"]
data = url.split(",")[1]
content.append({
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": data
}
})
claude_messages.append({"role": "user", "content": content})
return claude_messages
# generated by hugo's coding agent
2.2 流程优化与验证
@dataclass
class WorkflowStep:
"""工作流步骤"""
step_id: int
action: str # click, type, scroll, wait, verify
target: Dict[str, Any]
parameters: Dict[str, Any]
expected_result: str
confidence: float = 1.0
@dataclass
class Workflow:
"""自动化工作流"""
name: str
description: str
steps: List[WorkflowStep]
preconditions: List[str]
error_handling: Dict[str, str]
class WorkflowOptimizer:
"""
工作流优化器
功能:
1. 合并冗余步骤
2. 添加智能等待
3. 验证流程完整性
"""
def optimize(self, raw_workflow: Dict[str, Any]) -> Workflow:
"""优化原始工作流"""
steps = []
for step_data in raw_workflow.get("steps", []):
step = WorkflowStep(
step_id=step_data["step_id"],
action=step_data["action"],
target=step_data.get("target", {}),
parameters=step_data.get("parameters", {}),
expected_result=step_data.get("expected_result", "")
)
steps.append(step)
# 优化步骤
steps = self._merge_typing_steps(steps)
steps = self._add_implicit_waits(steps)
steps = self._add_verification_steps(steps)
return Workflow(
name=raw_workflow.get("workflow_name", "Unnamed"),
description=raw_workflow.get("description", ""),
steps=steps,
preconditions=raw_workflow.get("preconditions", []),
error_handling=raw_workflow.get("error_handling", {})
)
def _merge_typing_steps(self, steps: List[WorkflowStep]) -> List[WorkflowStep]:
"""合并连续的打字步骤"""
merged = []
typing_buffer = []
for step in steps:
if step.action == "type" and len(step.parameters.get("text", "")) == 1:
typing_buffer.append(step.parameters["text"])
else:
if typing_buffer:
# 创建合并后的打字步骤
merged_step = WorkflowStep(
step_id=len(merged) + 1,
action="type",
target=steps[len(merged)].target if merged else {},
parameters={"text": "".join(typing_buffer)},
expected_result="Text entered"
)
merged.append(merged_step)
typing_buffer = []
merged.append(step)
if typing_buffer:
merged_step = WorkflowStep(
step_id=len(merged) + 1,
action="type",
target={},
parameters={"text": "".join(typing_buffer)},
expected_result="Text entered"
)
merged.append(merged_step)
return merged
def _add_implicit_waits(self, steps: List[WorkflowStep]) -> List[WorkflowStep]:
"""在可能导致页面变化的操作后添加等待"""
result = []
for step in steps:
result.append(step)
# 点击操作后可能需要等待
if step.action == "click":
wait_step = WorkflowStep(
step_id=len(result) + 1,
action="wait",
target={},
parameters={
"type": "smart", # 智能等待
"max_timeout": 5.0,
"condition": step.expected_result
},
expected_result="Page loaded"
)
result.append(wait_step)
return result
def _add_verification_steps(self, steps: List[WorkflowStep]) -> List[WorkflowStep]:
"""添加关键验证步骤"""
result = []
for i, step in enumerate(steps):
result.append(step)
# 在最后一步后添加验证
if i == len(steps) - 1:
verify_step = WorkflowStep(
step_id=len(result) + 1,
action="verify",
target={},
parameters={
"condition": "workflow_completed",
"expected_state": step.expected_result
},
expected_result="Workflow verified"
)
result.append(verify_step)
return result
# generated by hugo's coding agent
Step 3: 智能执行引擎
3.1 视觉元素定位
传统 RPA 依赖固定坐标或 DOM 结构,而我们使用视觉+语义的方式定位元素。
import cv2
import numpy as np
from typing import Optional, Tuple, List
class VisualElementLocator:
"""
视觉元素定位器
结合多种策略定位 UI 元素:
1. 模板匹配
2. OCR 文本定位
3. 特征点匹配
4. AI 语义定位
"""
def __init__(self,
ocr_model=None,
vision_model=None):
self.ocr_model = ocr_model
self.vision_model = vision_model
def locate(self,
screenshot: np.ndarray,
target: Dict[str, Any],
reference_image: Optional[np.ndarray] = None) -> Optional[Tuple[int, int]]:
"""
在屏幕截图中定位目标元素
Args:
screenshot: 当前屏幕截图
target: 目标描述(来自流程定义)
reference_image: 参考图像(录制时的截图)
Returns:
元素中心坐标 (x, y) 或 None
"""
strategies = [
("text", self._locate_by_text),
("template", self._locate_by_template),
("features", self._locate_by_features),
("ai", self._locate_by_ai)
]
for strategy_name, strategy_func in strategies:
try:
result = strategy_func(screenshot, target, reference_image)
if result:
print(f"Located element using {strategy_name}: {result}")
return result
except Exception as e:
print(f"Strategy {strategy_name} failed: {e}")
return None
def _locate_by_text(self,
screenshot: np.ndarray,
target: Dict[str, Any],
reference: Optional[np.ndarray]) -> Optional[Tuple[int, int]]:
"""通过 OCR 文本定位"""
text_content = target.get("text_content")
if not text_content:
return None
# 使用 OCR 识别屏幕文字
# 这里假设使用 PaddleOCR 或 EasyOCR
if self.ocr_model is None:
return None
results = self.ocr_model.ocr(screenshot)
for line in results:
for word_info in line:
box, (text, confidence) = word_info
if text_content.lower() in text.lower() and confidence > 0.8:
# 计算中心点
x = int((box[0][0] + box[2][0]) / 2)
y = int((box[0][1] + box[2][1]) / 2)
return (x, y)
return None
def _locate_by_template(self,
screenshot: np.ndarray,
target: Dict[str, Any],
reference: Optional[np.ndarray]) -> Optional[Tuple[int, int]]:
"""通过模板匹配定位"""
if reference is None:
return None
# 从参考图像中提取目标区域
# 这里简化处理,实际应该根据 target 信息提取
template = reference # 应该是裁剪后的目标区域
# 模板匹配
result = cv2.matchTemplate(screenshot, template, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
if max_val > 0.8: # 相似度阈值
h, w = template.shape[:2]
x = max_loc[0] + w // 2
y = max_loc[1] + h // 2
return (x, y)
return None
def _locate_by_features(self,
screenshot: np.ndarray,
target: Dict[str, Any],
reference: Optional[np.ndarray]) -> Optional[Tuple[int, int]]:
"""通过特征点匹配定位"""
if reference is None:
return None
# 使用 ORB 特征
orb = cv2.ORB_create()
kp1, des1 = orb.detectAndCompute(reference, None)
kp2, des2 = orb.detectAndCompute(screenshot, None)
if des1 is None or des2 is None:
return None
# 匹配特征点
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
matches = bf.match(des1, des2)
if len(matches) < 10:
return None
# 计算匹配区域的中心
good_matches = sorted(matches, key=lambda x: x.distance)[:20]
points = [kp2[m.trainIdx].pt for m in good_matches]
x = int(np.mean([p[0] for p in points]))
y = int(np.mean([p[1] for p in points]))
return (x, y)
def _locate_by_ai(self,
screenshot: np.ndarray,
target: Dict[str, Any],
reference: Optional[np.ndarray]) -> Optional[Tuple[int, int]]:
"""通过 AI 视觉模型定位"""
if self.vision_model is None:
return None
description = target.get("description", "")
visual_features = target.get("visual_features", [])
prompt = f"""在这张屏幕截图中,请找到以下元素并返回其坐标:
元素描述:{description}
视觉特征:{', '.join(visual_features)}
请返回元素中心点的坐标,格式:{{"x": 数字, "y": 数字}}
如果找不到,返回:{{"x": null, "y": null}}
"""
# 调用视觉模型(简化示例)
response = self.vision_model.analyze(screenshot, prompt)
if response.get("x") and response.get("y"):
return (int(response["x"]), int(response["y"]))
return None
# generated by hugo's coding agent
3.2 动作执行器
import pyautogui
import time
from typing import Optional, Dict, Any
class ActionExecutor:
"""
动作执行器
支持的动作类型:
- click: 点击
- double_click: 双击
- right_click: 右键点击
- type: 输入文本
- scroll: 滚动
- wait: 等待
- hotkey: 组合键
"""
def __init__(self,
locator: VisualElementLocator,
default_timeout: float = 10.0,
retry_interval: float = 0.5):
self.locator = locator
self.default_timeout = default_timeout
self.retry_interval = retry_interval
# 配置 pyautogui
pyautogui.PAUSE = 0.1 # 动作间隔
pyautogui.FAILSAFE = True # 启用故障安全
def execute(self,
step: WorkflowStep,
context: Dict[str, Any] = None) -> bool:
"""
执行单个工作流步骤
Args:
step: 工作流步骤
context: 执行上下文
Returns:
执行是否成功
"""
action = step.action
action_handlers = {
"click": self._execute_click,
"double_click": self._execute_double_click,
"right_click": self._execute_right_click,
"type": self._execute_type,
"scroll": self._execute_scroll,
"wait": self._execute_wait,
"hotkey": self._execute_hotkey,
"verify": self._execute_verify
}
handler = action_handlers.get(action)
if handler is None:
print(f"Unknown action: {action}")
return False
try:
return handler(step, context)
except Exception as e:
print(f"Action {action} failed: {e}")
return False
def _locate_with_retry(self,
target: Dict[str, Any],
timeout: float = None) -> Optional[Tuple[int, int]]:
"""带重试的元素定位"""
timeout = timeout or self.default_timeout
start_time = time.time()
while time.time() - start_time < timeout:
screenshot = pyautogui.screenshot()
screenshot = np.array(screenshot)
screenshot = cv2.cvtColor(screenshot, cv2.COLOR_RGB2BGR)
position = self.locator.locate(screenshot, target)
if position:
return position
time.sleep(self.retry_interval)
return None
def _execute_click(self, step: WorkflowStep, context: Dict) -> bool:
"""执行点击"""
position = self._locate_with_retry(step.target)
if position is None:
print(f"Cannot locate element for click: {step.target}")
return False
pyautogui.click(position[0], position[1])
return True
def _execute_double_click(self, step: WorkflowStep, context: Dict) -> bool:
"""执行双击"""
position = self._locate_with_retry(step.target)
if position is None:
return False
pyautogui.doubleClick(position[0], position[1])
return True
def _execute_right_click(self, step: WorkflowStep, context: Dict) -> bool:
"""执行右键点击"""
position = self._locate_with_retry(step.target)
if position is None:
return False
pyautogui.rightClick(position[0], position[1])
return True
def _execute_type(self, step: WorkflowStep, context: Dict) -> bool:
"""执行输入"""
text = step.parameters.get("text", "")
# 支持变量替换
if context:
for key, value in context.items():
text = text.replace(f"{{{key}}}", str(value))
# 如果有目标元素,先点击
if step.target:
position = self._locate_with_retry(step.target)
if position:
pyautogui.click(position[0], position[1])
time.sleep(0.2)
# 使用 pyperclip 处理中文
import pyperclip
pyperclip.copy(text)
pyautogui.hotkey('ctrl', 'v')
return True
def _execute_scroll(self, step: WorkflowStep, context: Dict) -> bool:
"""执行滚动"""
direction = step.parameters.get("direction", "down")
amount = step.parameters.get("amount", 3)
if direction == "up":
pyautogui.scroll(amount)
else:
pyautogui.scroll(-amount)
return True
def _execute_wait(self, step: WorkflowStep, context: Dict) -> bool:
"""执行等待"""
wait_type = step.parameters.get("type", "fixed")
if wait_type == "fixed":
duration = step.parameters.get("duration", 1.0)
time.sleep(duration)
return True
elif wait_type == "smart":
# 智能等待:等待页面稳定
max_timeout = step.parameters.get("max_timeout", 5.0)
return self._wait_for_stable(max_timeout)
elif wait_type == "element":
# 等待元素出现
target = step.parameters.get("target", step.target)
max_timeout = step.parameters.get("max_timeout", 10.0)
return self._locate_with_retry(target, max_timeout) is not None
return True
def _wait_for_stable(self, timeout: float) -> bool:
"""等待页面稳定(画面不再变化)"""
start_time = time.time()
prev_screenshot = None
stable_count = 0
while time.time() - start_time < timeout:
screenshot = np.array(pyautogui.screenshot())
if prev_screenshot is not None:
# 比较两帧
diff = np.abs(screenshot.astype(float) - prev_screenshot.astype(float))
change_ratio = np.mean(diff) / 255.0
if change_ratio < 0.01:
stable_count += 1
if stable_count >= 3: # 连续3次稳定
return True
else:
stable_count = 0
prev_screenshot = screenshot
time.sleep(0.2)
return True # 超时也返回 True,继续执行
def _execute_hotkey(self, step: WorkflowStep, context: Dict) -> bool:
"""执行组合键"""
keys = step.parameters.get("keys", [])
if keys:
pyautogui.hotkey(*keys)
return True
def _execute_verify(self, step: WorkflowStep, context: Dict) -> bool:
"""执行验证"""
condition = step.parameters.get("condition", "")
if condition == "element_exists":
target = step.parameters.get("target", step.target)
return self._locate_with_retry(target, timeout=3.0) is not None
elif condition == "text_visible":
expected_text = step.parameters.get("expected_text", "")
# 使用 OCR 验证文本
screenshot = np.array(pyautogui.screenshot())
# ... OCR 验证逻辑
return True
return True
# generated by hugo's coding agent
Step 4: 工作流编排与调度
4.1 工作流运行器
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Dict, Any, List
import logging
class ExecutionStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class ExecutionResult:
"""执行结果"""
workflow_name: str
status: ExecutionStatus
steps_completed: int
total_steps: int
error_message: Optional[str] = None
execution_time: float = 0.0
step_results: List[Dict] = field(default_factory=list)
class WorkflowRunner:
"""
工作流运行器
功能:
- 执行工作流
- 错误处理与重试
- 断点恢复
- 执行日志
"""
def __init__(self,
executor: ActionExecutor,
max_retries: int = 3,
screenshot_on_error: bool = True):
self.executor = executor
self.max_retries = max_retries
self.screenshot_on_error = screenshot_on_error
self.logger = logging.getLogger("WorkflowRunner")
self._callbacks: Dict[str, List[Callable]] = {
"on_step_start": [],
"on_step_complete": [],
"on_step_error": [],
"on_workflow_complete": []
}
def register_callback(self, event: str, callback: Callable):
"""注册回调函数"""
if event in self._callbacks:
self._callbacks[event].append(callback)
def _emit(self, event: str, *args, **kwargs):
"""触发事件"""
for callback in self._callbacks.get(event, []):
try:
callback(*args, **kwargs)
except Exception as e:
self.logger.error(f"Callback error: {e}")
def run(self,
workflow: Workflow,
context: Dict[str, Any] = None,
start_from: int = 0) -> ExecutionResult:
"""
运行工作流
Args:
workflow: 工作流定义
context: 执行上下文(变量)
start_from: 从第几步开始(用于断点恢复)
Returns:
执行结果
"""
context = context or {}
start_time = time.time()
step_results = []
self.logger.info(f"Starting workflow: {workflow.name}")
# 检查前置条件
if not self._check_preconditions(workflow.preconditions):
return ExecutionResult(
workflow_name=workflow.name,
status=ExecutionStatus.FAILED,
steps_completed=0,
total_steps=len(workflow.steps),
error_message="Preconditions not met"
)
# 执行步骤
for i, step in enumerate(workflow.steps[start_from:], start=start_from):
step_start = time.time()
self._emit("on_step_start", step, i)
success = False
error_msg = None
# 重试逻辑
for attempt in range(self.max_retries):
try:
success = self.executor.execute(step, context)
if success:
break
except Exception as e:
error_msg = str(e)
self.logger.warning(f"Step {i} attempt {attempt + 1} failed: {e}")
if attempt < self.max_retries - 1:
time.sleep(1.0) # 重试前等待
step_result = {
"step_id": step.step_id,
"action": step.action,
"success": success,
"duration": time.time() - step_start,
"error": error_msg
}
step_results.append(step_result)
if success:
self._emit("on_step_complete", step, i, step_result)
self.logger.info(f"Step {i} completed: {step.action}")
else:
self._emit("on_step_error", step, i, error_msg)
self.logger.error(f"Step {i} failed: {step.action}")
# 保存错误截图
if self.screenshot_on_error:
self._save_error_screenshot(workflow.name, i)
# 应用错误处理策略
should_continue = self._handle_error(workflow, step, error_msg)
if not should_continue:
return ExecutionResult(
workflow_name=workflow.name,
status=ExecutionStatus.FAILED,
steps_completed=i,
total_steps=len(workflow.steps),
error_message=error_msg,
execution_time=time.time() - start_time,
step_results=step_results
)
result = ExecutionResult(
workflow_name=workflow.name,
status=ExecutionStatus.SUCCESS,
steps_completed=len(workflow.steps),
total_steps=len(workflow.steps),
execution_time=time.time() - start_time,
step_results=step_results
)
self._emit("on_workflow_complete", workflow, result)
self.logger.info(f"Workflow completed: {workflow.name}")
return result
def _check_preconditions(self, preconditions: List[str]) -> bool:
"""检查前置条件"""
# 简化实现,实际应该有更复杂的条件检查
return True
def _handle_error(self,
workflow: Workflow,
step: WorkflowStep,
error: str) -> bool:
"""处理错误,返回是否继续执行"""
error_handling = workflow.error_handling
if "element_not_found" in error.lower():
strategy = error_handling.get("element_not_found", "stop")
elif "timeout" in error.lower():
strategy = error_handling.get("timeout", "stop")
else:
strategy = error_handling.get("default", "stop")
return strategy == "skip" or strategy == "continue"
def _save_error_screenshot(self, workflow_name: str, step_index: int):
"""保存错误截图"""
import os
os.makedirs("error_screenshots", exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
path = f"error_screenshots/{workflow_name}_{step_index}_{timestamp}.png"
screenshot = pyautogui.screenshot()
screenshot.save(path)
self.logger.info(f"Error screenshot saved: {path}")
# generated by hugo's coding agent
4.2 调度器
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from typing import Dict, Any
class WorkflowScheduler:
"""
工作流调度器
支持:
- Cron 表达式调度
- 间隔调度
- 事件触发
- 手动触发
"""
def __init__(self, runner: WorkflowRunner):
self.runner = runner
self.scheduler = BackgroundScheduler()
self.workflows: Dict[str, Workflow] = {}
def register_workflow(self, workflow: Workflow):
"""注册工作流"""
self.workflows[workflow.name] = workflow
def schedule_cron(self,
workflow_name: str,
cron_expression: str,
context: Dict[str, Any] = None):
"""
使用 Cron 表达式调度
Args:
workflow_name: 工作流名称
cron_expression: Cron 表达式(如 "0 9 * * *" 每天9点)
context: 执行上下文
"""
workflow = self.workflows.get(workflow_name)
if not workflow:
raise ValueError(f"Workflow not found: {workflow_name}")
parts = cron_expression.split()
trigger = CronTrigger(
minute=parts[0],
hour=parts[1],
day=parts[2],
month=parts[3],
day_of_week=parts[4]
)
self.scheduler.add_job(
func=self._run_workflow,
trigger=trigger,
args=[workflow_name, context],
id=f"cron_{workflow_name}",
replace_existing=True
)
def schedule_interval(self,
workflow_name: str,
hours: int = 0,
minutes: int = 0,
seconds: int = 0,
context: Dict[str, Any] = None):
"""
间隔调度
Args:
workflow_name: 工作流名称
hours, minutes, seconds: 间隔时间
context: 执行上下文
"""
workflow = self.workflows.get(workflow_name)
if not workflow:
raise ValueError(f"Workflow not found: {workflow_name}")
trigger = IntervalTrigger(
hours=hours,
minutes=minutes,
seconds=seconds
)
self.scheduler.add_job(
func=self._run_workflow,
trigger=trigger,
args=[workflow_name, context],
id=f"interval_{workflow_name}",
replace_existing=True
)
def _run_workflow(self, workflow_name: str, context: Dict[str, Any]):
"""执行工作流"""
workflow = self.workflows.get(workflow_name)
if workflow:
result = self.runner.run(workflow, context)
print(f"Workflow {workflow_name} completed: {result.status}")
def start(self):
"""启动调度器"""
self.scheduler.start()
print("Scheduler started")
def stop(self):
"""停止调度器"""
self.scheduler.shutdown()
print("Scheduler stopped")
def run_now(self, workflow_name: str, context: Dict[str, Any] = None):
"""立即运行工作流"""
return self._run_workflow(workflow_name, context)
# generated by hugo's coding agent
完整使用示例
def main():
"""完整的录屏 RPA 使用示例"""
# 1. 录制用户操作
print("=== 步骤 1: 录制操作 ===")
recorder = ScreenRecorder(output_dir="./recordings", fps=10)
input("按 Enter 开始录制,再次按 Enter 停止...")
video_path = recorder.start_recording("demo_workflow")
input("正在录制... 按 Enter 停止")
session = recorder.stop_recording()
# 2. 提取关键帧
print("\n=== 步骤 2: 提取关键帧 ===")
extractor = KeyFrameExtractor()
keyframes = extractor.extract(session.video_path, session.events)
# 3. AI 分析操作
print("\n=== 步骤 3: AI 分析 ===")
analyzer = OperationAnalyzer(
model_provider="openai",
api_key="your-api-key"
)
raw_workflow = analyzer.analyze(keyframes)
# 4. 优化工作流
print("\n=== 步骤 4: 优化工作流 ===")
optimizer = WorkflowOptimizer()
workflow = optimizer.optimize(raw_workflow)
print(f"工作流: {workflow.name}")
print(f"步骤数: {len(workflow.steps)}")
for step in workflow.steps:
print(f" {step.step_id}. {step.action}: {step.target.get('description', '')}")
# 5. 保存工作流
workflow_path = f"./workflows/{workflow.name}.json"
save_workflow(workflow, workflow_path)
# 6. 执行工作流
print("\n=== 步骤 5: 执行工作流 ===")
locator = VisualElementLocator()
executor = ActionExecutor(locator)
runner = WorkflowRunner(executor)
# 添加回调
runner.register_callback(
"on_step_complete",
lambda step, i, result: print(f" ✓ Step {i}: {step.action}")
)
result = runner.run(workflow)
print(f"\n执行结果: {result.status}")
print(f"完成步骤: {result.steps_completed}/{result.total_steps}")
print(f"耗时: {result.execution_time:.1f}s")
# 7. 设置定时任务(可选)
print("\n=== 步骤 6: 设置调度 ===")
scheduler = WorkflowScheduler(runner)
scheduler.register_workflow(workflow)
# 每天早上9点执行
scheduler.schedule_cron(workflow.name, "0 9 * * *")
scheduler.start()
print("调度已启动,按 Ctrl+C 退出")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.stop()
if __name__ == "__main__":
main()
# generated by hugo's coding agent
最佳实践总结
录制阶段
- 稳定的操作节奏:录制时保持适度的操作速度,避免过快
- 明确的等待:在页面加载后稍作停顿,让系统捕获完整状态
- 避免拖拽:拖拽操作识别困难,尽量使用点击替代
- 一致的窗口状态:保持窗口最大化,避免尺寸变化
分析阶段
- 提供业务上下文:让 AI 理解操作的业务意义
- 验证生成的流程:人工检查 AI 生成的步骤是否完整
- 添加异常处理:为可能失败的步骤定义降级策略
执行阶段
- 智能等待:使用页面稳定检测而非固定延时
- 多策略定位:结合文本、视觉、语义多种方式
- 错误恢复:实现断点续执行能力
- 日志记录:详细记录每步执行情况
运维阶段
- 监控告警:实时监控执行状态,失败时及时告警
- 版本管理:工作流定义纳入版本控制
- 定期校验:UI 变化后及时更新工作流
- 性能优化:分析执行日志,优化慢操作
结语
通过桌面录屏实现 RPA 自动化,本质上是将人类的操作经验转化为可执行的自动化流程。这种方式结合了多模态 AI 的理解能力和传统 RPA 的执行能力,让自动化变得更加智能和易用。
随着视觉语言模型的发展,未来的 RPA 系统将能够:
- 理解更复杂的操作意图
- 自动适应 UI 变化
- 处理更多异常情况
- 实现真正的"所见即所得"自动化
这不仅降低了自动化的技术门槛,也为企业数字化转型提供了新的可能。