pyppeteer 简单使用

API文档

初始化环境

1
pip install pyppeteer

下载依赖

启动docker容器

1
docker run -d -p 3000:3000 browserless/chrome

访问 localhost:3000 打开页面API调用

API

简单链接测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
from pyppeteer import connect


async def main():
    """Attach to the remote Chrome instance, open a test page and screenshot it."""
    # Port 3000 is the one published by `docker run -d -p 3000:3000 browserless/chrome`.
    remote_browser_api = "ws://localhost:3000"
    # connect() attaches to an already-running browser through its WebSocket
    # endpoint, so it is the function that has to be imported here.
    browser = await connect(
        {"browserWSEndpoint": remote_browser_api, "ignoreHTTPSErrors": True}
    )
    page = await browser.newPage()
    url = 'https://intoli.com/blog/not-possible-to-block-chrome-headless/chrome-headless-test.html'
    await page.goto(url)
    # Take a screenshot of the loaded page.
    await page.screenshot({'path': 'chrome-headless-test.png'})
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())

过检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
from pyppeteer import connect


async def main():
    """Connect to the remote browser, install anti-detection patches, then probe a bot-test page.

    Prints the result of evaluating ``!window.chrome`` in the page and saves
    a screenshot to ``chrome-headless-test.png``.
    """
    ws_endpoint = "ws://localhost:3000"
    options = {
        "browserWSEndpoint": ws_endpoint,
        "ignoreHTTPSErrors": True,
        "args": ['--no-sandbox'],
        "headless": True,
    }
    browser = await connect(options)
    page = await browser.newPage()
    await page.setUserAgent(
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.39 Safari/537.36'
    )

    # Scripts injected into every new document, in this order:
    #   1. navigator.webdriver getter returns false
    #   2. window.navigator.chrome gets a stub object with a `runtime` key
    #   3. permissions.query answers 'notifications' from Notification.permission
    #   4. navigator.plugins getter returns a five-element array
    patches = (
        '() =>{ Object.defineProperties(navigator, { webdriver:{ get: () => false } }) }',
        """() => {window.navigator.chrome = { runtime: {},}}""",
        """() => {
const originalQuery = window.navigator.permissions.query;
return window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
)}""",
        """() => {
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
});}""",
    )
    for patch in patches:
        await page.evaluateOnNewDocument(patch)

    await page.goto('https://bot.sannysoft.com/')
    chrome_missing = await page.evaluate("!window.chrome")
    print(chrome_missing)
    # Take a screenshot of the test results.
    await page.screenshot({'path': 'chrome-headless-test.png'})
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())

或者

1
pip install pyppeteer_stealth
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
from pyppeteer import launch
from pyppeteer_stealth import stealth


async def main():
    """Launch headless Chrome, apply pyppeteer_stealth to a tab and screenshot the bot-test page."""
    browser = await launch(headless=True)
    tab = await browser.newPage()

    # The stealth patches are applied to the tab before any navigation happens.
    await stealth(tab)

    target = "https://bot.sannysoft.com/"
    await tab.goto(target)
    await tab.screenshot({'path': 'chrome-headless-test.png'})
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())

一些常用的方法

设置代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
browser = await launch({'headless': True, 'timeout': 500, 'args': ['--disable-extensions',
'--hide-scrollbars',
'--disable-bundled-ppapi-flash',
'--mute-audio',
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-gpu',
'--proxy-server=localhost:1080',
], })

# 设置代理ip验证
await page.authenticate({
'username': '用户名',
'password': '密码'
})

# 设置User-Agent
page = await browser.newPage()
await page.setUserAgent("Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36")

执行JS

1
2
3
4
5
6
7
8
9
# 在网页上执行js 脚本
dimensions = await page.evaluate(pageFunction='''() => {
return {
width: document.documentElement.clientWidth, // 页面宽度
height: document.documentElement.clientHeight, // 页面高度
deviceScaleFactor: window.devicePixelRatio, // 像素比 1.0000000149011612
}
}''', force_expr=False) # force_expr=False 执行的是函数
print(dimensions)

设置页面视图大小

1
await page.setViewport(viewport={'width': 1280, 'height': 800})

超时时间 1000 毫秒

1
2
3
res = await page.goto('https://www.toutiao.com/', options={'timeout': 1000})
resp_headers = res.headers # 响应头
resp_status = res.status # 响应状态

等待

1
2
3
await asyncio.sleep(2)
# 第二种方法,在while循环里强行查询某元素进行等待
await page.querySelector('.t')

滚动到页面底部

1
await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')

截图 保存图片

1
await page.screenshot({'path': 'toutiao.png'})

打印页面cookies

1
print(await page.cookies())

获取所有 html 内容

1
print(await page.content())

监听请求和响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
page.on('request', intercept_response)
page.on("response", request_check)

# 请求处理函数
async def request_check(req):
'''请求过滤'''
if req.resourceType in ['image', 'media', 'eventsource', 'websocket']:
await req.abort()
else:
await req.continue_()
# 响应处理函数
async def intercept_response(res):
resourceType = res.request.resourceType
if resourceType in ['image', 'media']:
resp = await res.text()
print(resp)

使用 xvfb 做有屏幕的显示

Docker地址

本地安装

1
2
sudo apt-get update
sudo apt-get install xvfb
1
2
3
4
5
6
7
8
import time
from selenium.webdriver import Chrome

driver = Chrome('./chromedriver')
try:
    driver.get('https://bot.sannysoft.com/')
    # Fixed pause before the screenshot is taken.
    time.sleep(5)
    driver.save_screenshot('screenshot.png')
finally:
    # quit() ends the whole driver session even if a step above failed;
    # close() would only close the current window.
    driver.quit()
print('运行完成')
1
# Options for xvfb-run must come before the command; the X server arguments go in one quoted string after -s.
xvfb-run -s "-screen 0 1920x1080x16" python3 test.py

DemoFunc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# coding:utf8
import re
import asyncio

import pyppeteer
from pyppeteer import launcher


# Compare the version numerically: a plain string comparison is lexicographic
# and would, for example, order "0.0.3" after "0.0.25".
_LEGACY_PYPPETEER = tuple(int(part) for part in pyppeteer.version.split(".")) <= (0, 0, 25)

if _LEGACY_PYPPETEER:
    # Hook: drop the automation flag so pages cannot use it to detect webdriver.
    launcher.AUTOMATION_ARGS.remove("--enable-automation")

from pyppeteer import launch

from pyppeteer.network_manager import Request, Response
from pyppeteer.dialog import Dialog


# Proxy used by the request handlers; kept global so it can be switched at will.
proxy = "http://127.0.0.1:1080"


# Chromium command-line switches passed to launch().
args = [
    "--start-maximized",
    "--no-sandbox",
    "--ignore-certificate-errors",
    "--log-level=3",
    "--enable-extensions",
    "--window-size=1920,1080",
    # "--proxy-server={}".format(proxy),
    "--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36",
]

if _LEGACY_PYPPETEER:
    args.append("--disable-infobars")
else:
    args.append("--disable-blink-features=AutomationControlled")

# Keyword arguments for launch(): visible browser window, no automatic close
# on exit, and browser process output piped to this process.
launch_args = {
    "headless": False,
    "args": args,
    "autoClose": False,
    "dumpio": True,
}

if not _LEGACY_PYPPETEER:
    # Newer versions: ask pyppeteer to leave these default switches out.
    launch_args.update(
        {"ignoreDefaultArgs": ["--enable-automation", "--disable-extensions"]}
    )


async def modify_url(request: Request):
    """
    Rewrite a request for the Baidu home page into a Baidu search URL.

    Any other request continues unchanged. Usage:
        # enable interception first
        await page.setRequestInterception(True)
        page.on("request", modify_url)

    :param request: the intercepted request
    :return: None
    """
    if request.url != "https://www.baidu.com/":
        await request.continue_()
        return
    overrides = {"url": "https://www.baidu.com/s?wd=ip&ie=utf-8"}
    await request.continue_(overrides)


async def get_content(response: Response):
    """
    Print the <title> of the Baidu home page response.

    Request interception does not need to be enabled for this handler:
        page.on("response", get_content)

    :param response: the response delivered by the "response" event
    :return: None
    """
    if response.url != "https://www.baidu.com/":
        return
    content = await response.text()
    # response.text() yields str, so the pattern must be str as well; a bytes
    # pattern applied to a str raises TypeError.
    title = re.search(r"<title>(.*?)</title>", content)
    # Pages without a <title> element are skipped instead of failing on None.
    if title is not None:
        print(title.group(1))


async def handle_dialog(dialog: Dialog):
    """
    Dismiss a dialog raised by the page.

    Usage:
        page.on("dialog", handle_dialog)

    :param dialog: the dialog delivered by the "dialog" event
    :return: None
    """
    await dialog.dismiss()


import aiohttp

# HTTP client session shared by the proxy request handlers in this file.
# It is created at import time and bound to the loop returned by
# asyncio.get_event_loop(); no close() call is visible in this section.
# NOTE(review): newer aiohttp releases may expect the session to be created
# inside a coroutine and without `loop=` — confirm against the installed version.
aiohttp_session = aiohttp.ClientSession(loop=asyncio.get_event_loop())


async def use_proxy_base(request: Request):
    """
    Fetch the intercepted request through the proxy and answer the browser with the result.

    If the fetch raises, the request is aborted. Usage:
        # enable interception first
        await page.setRequestInterception(True)
        page.on("request", use_proxy_base)

    :param request: the intercepted request
    :return: None
    """
    # Replay the browser's request via aiohttp. `proxy` is read from the
    # module global on every call, so it can be switched at any time.
    fetch_kwargs = dict(
        headers=request.headers,
        data=request.postData,
        proxy=proxy,
        timeout=5,
        ssl=False,
    )
    try:
        async with aiohttp_session.request(
            method=request.method, url=request.url, **fetch_kwargs
        ) as upstream:
            payload = await upstream.read()
    except Exception:
        await request.abort()
        return

    # Hand the fetched data back to the browser.
    await request.respond(
        {"body": payload, "headers": upstream.headers, "status": upstream.status}
    )


# 静态资源缓存
static_cache = {}


async def use_proxy_and_cache(request: Request):
    """
    Like a proxied fetch, but serve JavaScript/CSS replies from an in-memory cache.

    Replies are cached in ``static_cache`` under the request URL when their
    Content-Type contains "javascript" or "/css". Usage:
        # enable interception first
        await page.setRequestInterception(True)
        page.on("request", use_proxy_and_cache)

    :param request: the intercepted request
    :return: None
    """
    # Cache hit: answer straight away without touching the network.
    if request.url in static_cache:
        await request.respond(static_cache[request.url])
        return

    # Cache miss: replay the request through the proxy. `proxy` is read from
    # the module global on every call, so it can be switched at any time.
    fetch_kwargs = dict(
        headers=request.headers,
        data=request.postData,
        proxy=proxy,
        timeout=5,
        ssl=False,
    )
    try:
        async with aiohttp_session.request(
            method=request.method, url=request.url, **fetch_kwargs
        ) as upstream:
            payload = await upstream.read()
    except Exception:
        await request.abort()
        return

    reply = {"body": payload, "headers": upstream.headers, "status": upstream.status}

    # Only static assets (by Content-Type) are remembered for later requests.
    content_type = upstream.headers.get("Content-Type")
    is_static = bool(content_type) and (
        "javascript" in content_type or "/css" in content_type
    )
    if is_static:
        static_cache[request.url] = reply

    await request.respond(reply)


async def pass_webdriver(request: Request):
    """
    Fetch the request through the proxy and inject a script into the Baidu home page.

    For "https://www.baidu.com/" the contents of ``pass_webdriver.js`` are
    inserted in a <script> tag right before <title>; every other reply is
    passed through untouched. If the fetch raises, the request is aborted.
    Usage:
        # enable interception first
        await page.setRequestInterception(True)
        page.on("request", pass_webdriver)

    :param request: the intercepted request
    :return: None
    """
    # Replay the request through the proxy. `proxy` is read from the module
    # global on every call, so it can be switched at any time.
    fetch_kwargs = dict(
        headers=request.headers,
        data=request.postData,
        proxy=proxy,
        timeout=5,
        ssl=False,
    )
    try:
        async with aiohttp_session.request(
            method=request.method, url=request.url, **fetch_kwargs
        ) as upstream:
            payload = await upstream.read()
    except Exception:
        await request.abort()
        return

    if request.url == "https://www.baidu.com/":
        with open("pass_webdriver.js") as script_file:
            injected = script_file.read().encode()
        # Put the script at the top of the HTML source. Per the original
        # comment it modifies navigator properties; the file is not shown here.
        payload = payload.replace(b"<title>", b"<script>%s</script><title>" % injected)

    # Hand the (possibly patched) data back to the browser.
    await request.respond(
        {"body": payload, "headers": upstream.headers, "status": upstream.status}
    )


async def interception_test():
    """
    Launch a browser, optionally install one of the demo handlers, and open Baidu.

    The three scenarios are switched off by default; set the matching flag to
    True to try one. The browser is always closed at the end, even when a step
    fails, because ``launch_args`` sets ``autoClose`` to False.

    :return: None
    """
    # Scenario switches (all disabled by default).
    rewrite_url = False      # 1. modify the request URL
    read_response = False    # 2. read the response content
    route_via_proxy = False  # 3. fetch through the proxy

    # Compare the version numerically: a plain string comparison is
    # lexicographic and would, for example, order "0.0.3" after "0.0.25".
    legacy = tuple(int(part) for part in pyppeteer.version.split(".")) <= (0, 0, 25)

    # Start the browser.
    browser = await launch(**launch_args)
    try:
        # Open a new tab.
        page = await browser.newPage()
        # Navigation timeout for the page.
        page.setDefaultNavigationTimeout(10 * 1000)
        # Viewport size.
        await page.setViewport({"width": 1920, "height": 1080})

        def subscribe(event, handler):
            """Attach a coroutine handler in the form this pyppeteer version takes."""
            if legacy:
                # Legacy versions are given the coroutine function directly.
                page.on(event, handler)
            else:
                # Newer versions get a wrapper that schedules the coroutine.
                page.on(event, lambda arg: asyncio.ensure_future(handler(arg)))

        if rewrite_url:
            # Interception has to be enabled for request handlers.
            await page.setRequestInterception(True)
            subscribe("request", modify_url)

        if read_response:
            # No need to enable interception for response handlers.
            subscribe("response", get_content)

        if route_via_proxy:
            await page.setRequestInterception(True)
            # use_proxy_base or use_proxy_and_cache can be subscribed here instead.
            subscribe("request", pass_webdriver)

        await page.goto("http://www.baidu.com")

        await asyncio.sleep(10)

        await page.close()
    finally:
        # Close the browser.
        await browser.close()
    return


if __name__ == "__main__":
    # Run the demo to completion on the loop returned by asyncio.get_event_loop().
    asyncio.get_event_loop().run_until_complete(interception_test())