爬虫工具_应用程序market

爬虫工具_应用程序market一个简单的异步爬虫.私信太多,统一回答一下:关于异步函数的:1.真正派发任务的是consumer这个coroutine,所以也在内部做了并发控制.2.process_content用于获取html及保存到mysql.关于异步相关(asyncio)的:1.await相当于yieldfrom.2.await后面是一个coroutine,…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

一个简单的异步爬虫. 

私信太多,统一回答一下:

关于异步函数的:

1. 真正派发任务的是  consumer 这个coroutine,所以也在内部做了并发控制. 

2. process_content 用于获取html及保存到mysql.

关于异步相关(asyncio)的 :

1.await 相当于 yield from . 

 

2.await 后面是一个coroutine, 普通函数不是coroutine,普通函数也不是通过加一个 async   / asyncio.coroutine,就能真正

成为coroutine的,就算没报错,如果内部加了阻塞函数(time.sleep / read /write ) 还是一个阻塞函数;因此,往往我们自己的

coroutine只是一个中间层的东西,所以需要aiohttp , aiomysql等这个模块来提供支持,就跟tornado的异步框架一样,如果你

在get()/post() 中加了阻塞函数调用,tornado 还是阻塞的.

 

3. 这个问题问的有点多 . 

1. await 后面可以是 Task,Future,async def xxx() ( 自定义函数) ,因此在加入loop 时,将自动封装我们自定义的coroutine成为一个 Task / Future 对象.

* 2. 无论是 await 还是  asyncio.ensure_future/async , 都将把coroutine加入loop中,但是两者有一个差别:

      await 是等待这个coroutine 运行完成, ensure_future/async 不会等待直接仍入循环体中运行. 

      这个就是下面代码中一会用await 一会用ensure_future/async .同样的也是很多人问,

      为什么在控制 coroutine 的时候: async with sem 后面用ensure_future 控制不了的原因

 

3.额外说明一下Future对象. 下面代码中没有自行创建Future对象, 其实也可以用, Future对象与Twisted中的

Defer对象极其类似. 只不过Future 对象一般都是 await Future, 当然也可以使用回调: Future.add_done_callback.

而Defer对象是使用2个回调链的方式.具体可参考我写的:Twisted

这2个对象都在Future.set_result  /  Defered.callbacks  “返回”执行 await Future之后的代码 / Deferred 中的无穷无尽的callbacks.

关于Future对象的运行流程可看官方文档或Tornado中的 AsyncHTTPClient.fetch.

下面给一个Tornado中使用Future对象的案例 :  注意,没有使用过Tornado协程的别看了, Tornado旧版本的协程有些误导 

1.第一个使用Future对象的Tornado案例

class cor(RequestHandler):
   @gen.coroutine  #tornado 装饰器
   def get(self):
        #创建一个异步客户端
        clt = AsyncHTTPClient()
        #fetch 将返回一个Future对象 . yield 这个Future,直到set_result被调用
        res = yield clt.fetch('http://www.baidu.com')
        print('fetch : ' , res)
        self.write(res.body)

Jetbrains全家桶1年46,售后保障稳定

2.这个案例比较容易理解, 使用了新的语法,与下面爬虫的协程语法一致

class cor(RequestHandler):
   async def get(self): #用了新的语法
        clt = AsyncHTTPClient()

        #使用了await Future,直到 Future.set_result 被调用
        res = await clt.fetch('http://www.baidu.com')
        print('fetch : ' , res)
        self.write(res.body)

 

 

分割线:

 

import asyncio
import aiohttp
from lxml import etree
import aiomysql
import re


#一些全局变量
start_url = "http://www.jobbole.com/"
waitting_urls = None
visited_urls = set()
failed_urls = set()
stop_fetch = False
USER_AGENT_LIST = [
    'MSIE (MSIE 6.0; X11; Linux; i686) Opera 7.23',
    'Opera/9.20 (Macintosh; Intel Mac OS X; U; en)',
    'Opera/9.0 (Macintosh; PPC Mac OS X; U; en)',
    'iTunes/9.0.3 (Macintosh; U; Intel Mac OS X 10_6_2; en-ca)',
    'Mozilla/4.76 [en_jp] (X11; U; SunOS 5.8 sun4u)',
    'iTunes/4.2 (Macintosh; U; PPC Mac OS X 10.2)',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:5.0) Gecko/20100101 Firefox/5.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0) Gecko/20100101 Firefox/9.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:16.0) Gecko/20120813 Firefox/16.0',
    'Mozilla/4.77 [en] (X11; I; IRIX;64 6.5 IP30)',
    'Mozilla/4.8 [en] (X11; U; SunOS; 5.7 sun4u)',
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36"
]

import random

#控制corroutine数量
sem = asyncio.Semaphore(4)

#获取html
async def fetch(session, url):
    headers = {'User-Agent' : random.choice(USER_AGENT_LIST)}
    try:
        #单一一个站点用一个会话就行了
        async with session.get(url,headers=headers) as r:
            
            #已经访问过的url
            visited_urls.add(url)
            print("url:{} status:{}".format(url,r.status))
            
            #获取结果,用 await / yield from (如果用asyncio.coroutine)
            text = await r.text(encoding='utf8')
            return text
    except Exception as e:
        print('url:{} failed')
        failed_urls.add(url)
        return None


#获取mysql pool . 自己看文档把,文档里写的很清楚
async def get_mysql_pool(**settings):
    pool = await aiomysql.create_pool(**settings)
    return pool


#获取href属性
def extract_urls(html):
    htmlobj = etree.HTML(html)
    return set(htmlobj.xpath('//a[contains(@href,"http")]/@href'))

#获取标题
def get_title(html):
    htmlobj = etree.HTML(html)
    title = ''.join(htmlobj.xpath('//div[@class="entry-header"]/h1/text()'))
    return title

#处理数据,保存到mysql
async  def process_content(session,url,pool):
    
    #获取html
    html = await fetch(session, url)

    #深度控制,我这就获取一层
    # urls = extract_urls(html) 深度控制,这里不做了
    sql = 'insert into jobbole(title) VALUES(%s)'
    title = get_title(html)

    #获取pool中的连接
    async with pool.acquire() as conn:
        #拿游标
        async with conn.cursor() as cur:
            try:
                await cur.execute(sql,(title))
                await conn.commit()
            except Exception as e:
                await conn.rollback()
                print(e)


#额,这个名字随意取了一个,相当于消费者.申明啊,这个函数我就随意一写,没调式过.
async def consumer(session,pool):

    #只要waitting_urls不为空,就拿出来,去获取内容
    while not stop_fetch:
        while len(waitting_urls) == 0 and not stop_fetch:

            #如果空了,休息一下
            await asyncio.sleep(1)
            continue

        
        if len(waitting_urls) > 0 :
            url = waitting_urls.pop()

            #这里就拿个标题,只要符合的就ok.
            if re.match('https?://.*?jobbole.com/\d+/',url) and url not in visited_urls:

                #做一下控制并发
                async with sem:
        
                    #这行注释掉,否则就别用控制并发了
                    # asyncio.ensure_future(process_content(session,url,pool))
                    await process_content(session,url,pool)

#一开始的准备工作, 我这里只从首页拿所有的url
async def prepare_urls(session):
    global waitting_urls
    html = await fetch(session, start_url)
    urls = extract_urls(html)
    waitting_urls = {url for url in urls}
    print('waitinglist len:' , len(waitting_urls))


#一个检测函数
async def check_tasks():
    while True:
        print('当前运行corotine数量 :' , len(asyncio.Task.all_tasks()))
        await asyncio.sleep(0.5)


async def main(**settings):

    #创建sess
    session = aiohttp.ClientSession()
    
    #创建mysql pool
    pool = await get_mysql_pool(**settings)
    
    #开启检测
    asyncio.ensure_future(check_tasks())

    #准备首页的所有url
    await prepare_urls(session)
    
    #开始运行这个coroutine ,真正干活的
    asyncio.ensure_future(consumer(session,pool))




#关闭用的,这里没用到, 可以自行修改添加
async def close(session,pool):
    await session.close()
    pool.close()
    await pool.wait_closed()

#拿主循环
lp = asyncio.get_event_loop()

#mysql配置
db_mysql_settings = {
    'db' : 'testdb',
    'host' : '127.0.0.1',
    'port' : 3306,
    'user' : 'root',
    'password' : 'fuck',
    'loop' : lp,
    'charset' : 'utf8',
    'maxsize':50
}

#开始运行
asyncio.ensure_future(main(**db_mysql_settings))
lp.run_forever()

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/210149.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 分布式系统常见问题总结[通俗易懂]

    分布式系统常见问题总结[通俗易懂]秒杀系统架构优化思路一、秒杀业务为什么难做1)im系统,例如qq或者微博,每个人都读自己的数据(好友列表、群列表、个人信息);2)微博系统,每个人读你关注的人的数据,一个人读多个人的数据;3)秒杀系统,库存只有一份,所有人会在集中的时间读和写这些数据,多个人读一个数据。例如:小米手机每周二的秒杀,可能手机只有1万部,但瞬时进入的流量可能是几百几千万。又例如:12306抢票,…

    2022年5月20日
    38
  • 决策树原理及其应用[通俗易懂]

    决策树原理及其应用[通俗易懂]决策树原理及其应用决策树的原理我们先构造一颗简单的决策树来玩一玩。举一个不恰当的例子:小明过年回家,老妈催着他结婚,帮着张罗相亲对象。有三个女孩的资料(简称A、B、C)。关于A:小明问:”身材好吗?”,妈妈说:“好!”,小明说:“见一面”关于B:小明问:”身材好吗?”,妈妈说:“不好!”,小明又问:“漂亮吗?”,妈妈说:“漂亮!”,小明说:“见一面”关于C:

    2022年9月7日
    3
  • 使用google earth engine(GEE)提取2000年到2019年长江下游水体(河流、湖泊)数据[通俗易懂]

    使用google earth engine(GEE)提取2000年到2019年长江下游水体(河流、湖泊)数据[通俗易懂]我最近想通过GEE直接统计长时间序列下长江下游流域的水体面积变化情况,如果通过传统做法很复杂,于是想到了使用GEE。下面是提取水体的效果图,时间是2000年的,你也可以设置显示2000-2019年中任意一年的水体数据。代码链接:https://code.earthengine.google.com/2440b9511ba0c1cefaf926c7c47e5ea2读者可以先通过代码看看效果,下面我说一下最主要的思路:1.数据源的选择;2.数据的时间、地点筛选;3.水体的标准设置;4

    2022年9月2日
    3
  • 画廊效果的ViewPager实现(附带无限自动轮播)[通俗易懂]

    画廊效果的ViewPager实现(附带无限自动轮播)[通俗易懂]废话不多说,先上效果图根据效果所示,第一步实现适配器,完成无限循环首先做数据上的处理publicstaticclassLoopViewPagerAdapterextendsPagerAdapter{ …..LoopViewPagerAdapter(Contextcontext,ArrayList<Integer>imgIds){…

    2022年6月12日
    34
  • 小数乘法计算题100道_leetcode题库c语言

    小数乘法计算题100道_leetcode题库c语言LeetCode算法题-Binary Tree Level Order Traversal II(Java实现)

    2022年4月20日
    69
  • navicat激活码【2021.7最新】

    (navicat激活码)本文适用于JetBrains家族所有ide,包括IntelliJidea,phpstorm,webstorm,pycharm,datagrip等。IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.net/ide…

    2022年3月22日
    80

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号