aiohttp
介绍
- aiohttp 强调的是异步并发。它提供了对 async/await 的支持,可以在单线程中实现并发 IO 操作。
安装
pip install aiohttp
使用方法
1. 普通发请求
import asyncio

import aiohttp


async def fetch():
    """Send a plain GET request and print the response body as text."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.csdn.net/') as response:
            print(await response.text())


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+, so it is not used here.
    await asyncio.gather(fetch())


# asyncio.run() creates and closes the event loop, replacing the deprecated
# get_event_loop() / run_until_complete() pattern.
asyncio.run(main())
2. 添加请求参数
import asyncio

import aiohttp

# Query-string parameters appended to the request URL.
params = {'name': 'zhangsan', 'age': 10}


async def fetch():
    """Send a GET request with query parameters and print the final URL."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.baidu.com/s', params=params) as response:
            # response.url shows the URL with the encoded query string.
            print(response.url)


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+, so it is not used here.
    await asyncio.gather(fetch())


asyncio.run(main())
- 代码运行结果:

3. 请求头中自定义User-Agent
import asyncio

import aiohttp

# Custom request headers; overrides aiohttp's default User-Agent.
headers = {"User-Agent": "my-user-agent"}


async def fetch():
    """Send a GET request with a custom User-Agent and print the response body."""
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/user-agent', headers=headers) as response:
            print(await response.text())


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+, so it is not used here.
    await asyncio.gather(fetch())


asyncio.run(main())
- 代码运行结果:

4. 请求头中自定义cookies
import asyncio

import aiohttp

url = 'http://httpbin.org/cookies'
# Cookies sent along with the request.
cookies = {'cookies_name': 'test_cookies'}


async def fetch():
    """Send a GET request carrying custom cookies and print the response body."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url, cookies=cookies) as response:
            print(await response.text())


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+, so it is not used here.
    await asyncio.gather(fetch())


asyncio.run(main())
5. post字符串
import asyncio

import aiohttp

# NOTE(review): the original posted to the httpbin root, which is expected to
# reject POST (405); /post is httpbin's echo endpoint for POST — confirm.
url = 'http://httpbin.org/post'
# Form fields; a dict passed as data= is sent form-encoded.
payload = {'username': 'zhang', 'password': ''}


async def fetch():
    """POST form data and print the response body."""
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload) as response:
            print(await response.text())


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+, so it is not used here.
    await asyncio.gather(fetch())


asyncio.run(main())
6. post文件
import asyncio

import aiohttp

# NOTE(review): the original posted to the httpbin root, which is expected to
# reject POST (405); /post is httpbin's echo endpoint for POST — confirm.
url = 'http://httpbin.org/post'


async def fetch():
    """Upload test.txt as a multipart form field named 'file' and print the response body."""
    # Open the file inside the coroutine with a context manager so the handle
    # is always released, even if the request fails before the body is sent.
    with open('test.txt', 'rb') as upload:
        files = {'file': upload}
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=files) as response:
                print(await response.text())


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+, so it is not used here.
    await asyncio.gather(fetch())


asyncio.run(main())
7. 设置代理
import asyncio

import aiohttp

url = "http://python.org"


async def fetch():
    """Send a GET request through an HTTP proxy and print the status code."""
    async with aiohttp.ClientSession() as session:
        # "http://some.proxy.com" is a placeholder; substitute a real proxy URL.
        async with session.get(url, proxy="http://some.proxy.com") as response:
            print(response.status)


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+, so it is not used here.
    await asyncio.gather(fetch())


asyncio.run(main())
8. 设置认证代理
import asyncio

import aiohttp

url = "http://python.org"


async def fetch():
    """Send a GET request through a proxy, authenticating with BasicAuth."""
    async with aiohttp.ClientSession() as session:
        # Credentials are passed separately from the proxy URL.
        proxy_auth = aiohttp.BasicAuth('user', 'pass')
        async with session.get(url, proxy="http://some.proxy.com", proxy_auth=proxy_auth) as response:
            print(response.status)


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+, so it is not used here.
    await asyncio.gather(fetch())


asyncio.run(main())


# The following approach also works: embed the credentials in the proxy URL.
url = "http://python.org"


async def fetch():
    """Send a GET request through a proxy, with credentials embedded in the proxy URL."""
    async with aiohttp.ClientSession() as session:
        # The original URL was truncated to "http://user:"; restored to the
        # user:pass@host form that matches the BasicAuth variant above.
        async with session.get(url, proxy="http://user:pass@some.proxy.com") as response:
            print(response.status)


async def main():
    """Run the demo request(s) concurrently and wait for all of them."""
    await asyncio.gather(fetch())


asyncio.run(main())
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/218488.html
原文链接:https://javaforall.net
