Browser API
接口认证及接口形式
签名
请求所需前置信息
- X-API-KEY: 由我侧分配,为唯一uuid,会对这个ID进行数据范围限定。
例子:
curl --location --request PATCH 'http://domain:port/v2/env/open_env' \
--header 'X-API-KEY: API-KEY' \
--header 'User-Agent: Apifox/1.0.0 (https://apifox.com)' \
--header 'Accept: */*' \
--header 'Host: domain:port' \
--header 'Connection: keep-alive'
最新域名
http://domain:port
- 测试环境可用key
API-KEY
接口形式
- 响应基本结构:
| 字段 | 类型 | 描述 |
|---|---|---|
| code | Integer | 响应返回码,返回码枚举如下:0: 成功其它: 错误码 |
| msg | String | 异常时返回错误的提示信息 |
| data | - | 数据对象/数据列表,根据不同的接口而定义 |
| next | String | 当且仅当接口为列表型接口时可能存在,表示下一页的起始值(不包含),当为null时表示没有下一页(分页的最后一页或者没有数据) |
// data对象示例
{
"code": 0,
"msg": "成功",
"data": {}
}
// data数组示例
{
"code": 0,
"msg": "成功",
"data": []
}
// 异常示例
{
"code": 400009,
"msg": "参数异常",
"data": null
}
接口
保存环境配置
URL: /v2/env
请求方式: POST
参数:
| 名称 | 类型 | 必选 | 说明 |
|---|---|---|---|
| random_ua | bool | 是 | User-Agent进行随机生成,true:每次打开重新生成 false: 使用第一次生成的UA |
| random_fingerprint | bool | 是 | 随机指纹:true: 每次打开重新生成指纹,false: 使用第一次生成的指纹 |
| proxy_update_type | string | 是 | 代理账号数据更新模式: COVER: 覆盖 APPEND: 追加 |
| proxy_way | string | 是 | 代理方式: NON_USE : 不使用代理 (默认新加坡) RANDOM:随机选择配置代理账号USE_ONE:配置代理账号只使用一次 |
| proxys | [proxy] | 是 | 代理账号信息 |
- proxy
| 名称 | 类型 | 必选 | 说明 |
|---|---|---|---|
| type | string | 是 | 代理类型 (NON_USE:不使用 HTTP HTTPS SSH SOCKS5 ) |
| host | string | 是 | 代理主机 |
| port | string | 是 | 代理端口 |
| user_name | string | 是 | 代理账号 |
| passwd | string | 是 | 代理密码 |
- 请求示例
- 全部数据修改
{
"proxy_update_type" : "COVER",
"proxy_way": "RANDOM",
"proxys": [
{"type": "SOCKS5","host":"ep.test.com","port":"6616","user_name":"test","passwd":"test"},
{"type": "SOCKS5","host":"ep.test.com","port":"6616","user_name":"test","passwd":"test"},
],
"random_ua": true,
"random_fingerprint": true
}
- 修改代理数据
{
"proxy_update_type" : "COVER",
"proxys": [
{"type": "SOCKS5","host":"ep.test.com","port":"6616","user_name":"test","passwd":"test"},
{"type": "SOCKS5","host":"ep.test.com","port":"6616","user_name":"test","passwd":"test"},
]
}
- 修改代理方式
{
"proxy_way": "USE_ONE"
}
- 修改UA数据
{
"random_ua": true
"ua": ""
}
- 修改随机指纹
{
"random_fingerprint": true
}
- 响应示例
{
"code": 0,
"msg": "成功",
"data": null
}
打开环境
URL: /v2/env/open_env
请求方式: PATCH
响应参数:
| 名称 | 类型 | 必选 | 说明 |
|---|---|---|---|
| url | string | 是 | cdp控制连接 |
| session_id | string | 是 | 当前cdp会话id |
异常字典:
| 错误码 | 信息 | 解释 |
|---|---|---|
| 300104 | 代理配置已耗尽,请更新代理配置 | proxy_way为USE_ONE 时,账号所配置的代理信息已使用完 |
| 300105 | 浏览器实例数量启动过多,请关闭部分实例后重试 | 开启运行的实例过多,需要关闭之前不使用的实例 |
| 300106 | 云浏览器异常 | 云浏览器启动异常,请查看信息判断(常见于代理信息异常,无法启动) |
| 300000 | 业务异常 | 系统异常,请查看信息判断 |
- 响应示例
{
"code": 0,
"msg": "成功",
"data": {
"url": "ws://8.222.226.165:8081/cdp/c0d7fb01933d472687c04bdb47337024",
"session_id": "c0d7fb01933d472687c04bdb47337024"
}
}
关闭环境
URL: /v2/env/close_env
请求方式: PATCH
- 响应示例
{
"code": 0,
"msg": "成功",
"data": null
}
测试代码
import asyncio
import logging
import time
import requests
from playwright.async_api import async_playwright, Page
import os
from typing import Optional, Tuple
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('browser_control.log', encoding='utf-8'), # 写入文件
logging.StreamHandler() # 输出到控制台
]
)
logger = logging.getLogger(__name__)
# API端点和密钥
OPEN_ENV_URL = "http://domain:port/v2/env/open_env"
CLOSE_ENV_URL = "http://domain:port/v2/env/close_env"
X_API_KEY = "API-KEY" # 请替换为您的实际密钥
# 创建截图保存目录(如果不存在)
SCREENSHOT_DIR = "screenshots"
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
# 定义自定义异常类,用于区分不同类型的错误
class BrowserAPIError(Exception):
"""自定义异常类,用于表示与浏览器API交互时的错误"""
def __init__(self, code: int, message: str):
self.code = code
self.message = message
super().__init__(f"API Error [{code}]: {message}")
async def open_browser_session() -> Tuple[Optional[str], Optional[str]]:
"""
调用 /open_env 接口获取新的浏览器会话ID 和 CDP URL。
成功时返回 (session_id, cdp_url)。
失败时记录错误日志并返回 (None, None),或抛出自定义异常。
"""
headers = {
"X-API-KEY": X_API_KEY,
"Content-Type": "application/json"
}
try:
# 发送 PATCH 请求
response = requests.patch(OPEN_ENV_URL, headers=headers, timeout=30) # 添加超时
response.raise_for_status() # 检查 HTTP 状态码
# 解析 JSON 响应
try:
data = response.json()
except requests.exceptions.JSONDecodeError:
logger.error(f"API 响应非JSON格式: {response.text}")
return None, None
# 获取业务状态码和消息
code = data.get('code')
message = data.get('msg', '未知错误')
# 检查业务逻辑是否成功
if code == 0:
session_id = data['data']['session_id']
cdp_url = data['data']['url']
logger.info(f"成功获取新会话: session_id={session_id}, CDP URL={cdp_url}")
return session_id, cdp_url
else:
# 处理已知的错误码
error_messages = {
300104: "代理配置已耗尽,请更新代理配置。proxy_way为USE_ONE时,账号所配置的代理信息已使用完",
300105: "浏览器实例数量启动过多,请关闭部分实例后重试。开启运行的实例过多,需要关闭之前不使用的实例",
300106: "云浏览器异常,云浏览器启动异常,请查看信息判断(常见于代理信息异常,无法启动)",
300000: "业务异常,系统异常,请查看信息判断"
}
# 获取详细错误信息或使用默认信息
logger.error(f"打开接口响应: {message}")
detailed_message = error_messages.get(code, f"未知错误: {message}")
logger.error(f"获取会话失败: [{code}] {detailed_message}")
# 可以选择抛出异常以便调用者处理特定错误,或者简单返回 None
# 这里我们记录日志并返回 None
return None, None
except requests.exceptions.Timeout:
logger.error("请求超时,请检查网络连接或API服务状态")
return None, None
except requests.exceptions.ConnectionError:
logger.error("网络连接错误,请检查网络或API地址")
return None, None
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求异常: {e}")
return None, None
except KeyError as e:
logger.error(f"API响应数据结构异常,缺少必要字段: {e}")
return None, None
except Exception as e:
logger.error(f"获取会话时发生未预期的错误: {e}")
return None, None
async def close_browser_session(session_id: str) -> bool:
"""
调用 /close_env 接口关闭指定的浏览器会话
:param session_id: 要关闭的会话ID
:return: 成功返回 True,失败返回 False
"""
if not session_id:
logger.warning("尝试关闭空的会话ID")
return False
headers = {
"X-API-KEY": X_API_KEY,
"Content-Type": "application/json"
}
try:
# 注意:通常关闭会话需要在请求体或参数中传递 session_id
# 这里假设 session_id 通过 URL 参数或请求体传递,你需要根据实际API文档调整
# 示例:如果通过请求体传递
payload = {"session_id": session_id}
response = requests.patch(CLOSE_ENV_URL, headers=headers, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
if data.get('code') == 0:
logger.info(f"成功关闭会话: session_id={session_id}")
return True
else:
logger.error(f"关闭会话失败: [{data.get('code')}] {data.get('msg')}")
return False
except Exception as e:
logger.error(f"关闭会话失败: {e}")
return False
async def take_screenshot_with_playwright(cdp_url: str, screenshot_path: str):
"""
使用 Playwright 连接到 CDP URL,打开网页并截图
:param cdp_url: CDP WebSocket URL
:param screenshot_path: 截图要保存的完整路径和文件名
"""
async with async_playwright() as p:
browser = None
page = None
try:
# 创建浏览器实例,使用 CDP 连接
browser = await p.chromium.connect_over_cdp(cdp_url)
page = await browser.new_page()
# 打开目标网站 (注意 URL 末尾的空格,已修正)
await page.goto("https://ip111.cn/")
logger.info("已导航至 https://ip111.cn/")
# 等待页面加载完成(可选:增加等待时间确保内容渲染)
await page.wait_for_timeout(2000) # 或使用 await page.wait_for_load_state("networkidle")
# 截图
await page.screenshot(path=screenshot_path, full_page=True) # 建议截图整个页面
logger.info(f"截图已保存至: {screenshot_path}")
except Exception as e:
logger.error(f"Playwright 操作失败: {e}")
raise
finally:
# 确保资源被正确释放
if page:
await page.close()
if browser:
await browser.close()
async def run_cycle(cycle_number: int) -> bool:
"""
执行一次完整的循环:打开会话 -> 截图 -> 关闭会话
:param cycle_number: 当前循环的序号 (1-6)
:return: 成功返回 True,失败返回 False
"""
logger.info(f"开始第 {cycle_number} 轮操作...")
session_id = None
cdp_url = None
try:
# 1. 获取新的浏览器会话
session_id, cdp_url = await open_browser_session()
if not session_id or not cdp_url:
logger.error("无法获取浏览器会话,跳过本次循环")
return False # 可以根据需要决定是返回 False 还是抛出异常
# 生成唯一的截图文件名
screenshot_filename = f"screenshot_{cycle_number:02d}.png" # 例如: screenshot_01.png
screenshot_path = os.path.join(SCREENSHOT_DIR, screenshot_filename)
# 2. 使用 Playwright 控制浏览器并截图,传入特定的文件名
await take_screenshot_with_playwright(cdp_url, screenshot_path)
logger.info(f"第 {cycle_number} 轮操作成功完成")
return True
except Exception as e:
logger.error(f"第 {cycle_number} 轮操作发生异常: {e}")
return False
finally:
# 3. 确保无论成功与否都尝试关闭浏览器会话以避免资源泄漏
# 注意:只有在成功获取到 session_id 后才尝试关闭
if session_id:
try:
await close_browser_session(session_id)
except Exception as close_error:
logger.error(f"尝试关闭会话 {session_id} 时发生错误: {close_error}")
async def main():
"""
主函数:循环执行指定次数
"""
total_cycles = 1 # 你可以根据需要调整循环次数
successful_cycles = 0
for i in range(1, total_cycles + 1):
# 执行一轮操作
success = await run_cycle(i)
if success:
successful_cycles += 1
logger.info(f"第 {i} 次循环成功")
else:
logger.error(f"第 {i} 次循环失败")
# 可选:在循环之间添加延迟
if i < total_cycles:
delay_seconds = 2 # 增加延迟时间
logger.info(f"等待 {delay_seconds} 秒后进行下一次循环...")
await asyncio.sleep(delay_seconds)
logger.info(f"所有循环完成。成功: {successful_cycles}/{total_cycles}")
if __name__ == "__main__":
asyncio.run(main())