使用 Scrapy 爬取内容,使用 Pipeline 将处理后内容 POST 到远程。
使用request
是同步的会有阻塞。
使用scrapy.FormRequest
受限于全局DOWNLOAD_DELAY
限制,每次 POST 都会有延迟。
使用 treq ,不确定如何获取内容
async def process_post(self, url, data):
req = treq.post(url, data=data)
res = await deferred_to_future(req)
return res
如何获取 res 的内容,打印是<treq.response._Response 200 'text/html; charset=UTF-8' unknown size>
1
encro 2023-12-16 13:46:47 +08:00 1
不看源码?
看 treq.response._Response 的源码啊!!! |
3
kekeones OP |
4
ayugesheng 2023-12-27 17:56:19 +08:00
@kekeones 既然都 async 了,推荐直接 aiomysql + aiohttp ,给出一个 aiohttp 的 pipeline 示例:
import asyncio import aiohttp from scrapy.utils.defer import deferred_from_coro class DemoPipeline: def __init__(self) -> None: # 一些参数初始化 pass def open_spider(self, spider): # 这里可以写入一些非 async 的预备操作,把比如默认参数设置和日志配置等 return deferred_from_coro(self._open_spider(spider)) async def _open_spider(self, spider): # 这里一般是连接池,async 连接等预备操作 await asyncio.sleep(0.1) async def process_item(self, item, spider): # 这里可以使用一些 async 存储库来实现存储逻辑 ... # 看你想 post 到 data 还是 form # post_data = json.dumps('{"content": "test"}') post_data = {"content": "test"} async with aiohttp.ClientSession() as session: async with session.post( "http://httpbin.org/post", data=post_data ) as additional_response: # 获取响应内容 additional_data = await additional_response.text() print("additional_data:", additional_data) return item async def _close_spider(self): # 这里一般是 async 连接或连接池关闭逻辑 await asyncio.sleep(0.1) def close_spider(self, spider): return deferred_from_coro(self._close_spider()) 注意: 使用以上代码时,需要在 settings.py 中或者 custom_settings 中配置 "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor" |
5
ayugesheng 2023-12-27 17:58:27 +08:00
@kekeones 推荐直接 aiomysql + aiohttp ,给出一个 aiohttp 的 pipeline 示例:
``` import asyncio import aiohttp from scrapy.utils.defer import deferred_from_coro class DemoPipeline: def __init__(self) -> None: # 一些参数初始化 pass def open_spider(self, spider): # 这里可以写入一些非 async 的预备操作,把比如默认参数设置和日志配置等 return deferred_from_coro(self._open_spider(spider)) async def _open_spider(self, spider): # 这里一般是连接池,async 连接等预备操作 await asyncio.sleep(0.1) async def process_item(self, item, spider): # 这里可以使用一些 async 存储库来实现存储逻辑 ... # 看你想 post 到 data 还是 form # post_data = json.dumps('{"content": "test"}') post_data = {"content": "test"} async with aiohttp.ClientSession() as session: async with session.post( "http://httpbin.org/post", data=post_data ) as additional_response: # 获取响应内容 additional_data = await additional_response.text() print("additional_data:", additional_data) return item async def _close_spider(self): # 这里一般是 async 连接或连接池关闭逻辑 await asyncio.sleep(0.1) def close_spider(self, spider): return deferred_from_coro(self._close_spider()) ``` 注意: 使用以上代码时,需要在 settings.py 中或者 custom_settings 中配置 "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor" 以上代码乱了,无语,重发一次。 |
6
ayugesheng 2023-12-27 18:00:06 +08:00
不好意思,v2ex 是不支持 markdown 吗,不怎么在论坛发东西。
|
7
kekeones OP 好的,谢谢了哈,后面用了 treq 来处理了。
|