Practical Scrapy Demos

Building an IP proxy pool (simple version)

Two recommended Scrapy proxy projects:

The first is a free proxy plugin (no payment required); a settings sketch for it follows after these two links:
https://github.com/aivarsk/scrapy-proxies

The second is a paid proxy plugin:
https://github.com/scrapy-plugins/scrapy-crawlera
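
For the free scrapy-proxies plugin, the setup (as described in its README; the setting names below come from there and may change between versions) is to enable its RandomProxy downloader middleware and point it at a file of proxies:

# settings.py -- sketch based on the scrapy-proxies README; verify against the version you install
RETRY_TIMES = 10
RETRY_HTTP_CODES = [500, 503, 504, 400, 403, 404, 408]

DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 90,
    'scrapy_proxies.RandomProxy': 100,
    'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': 110,
}

PROXY_LIST = '/path/to/proxy/list.txt'  # one proxy per line
PROXY_MODE = 0  # 0 = pick a random proxy for every request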

A proxy-pool example picked up from a video tutorial

Fetch the proxy list from Xici Daili and store it in a MySQL database:

import requests
import MySQLdb
from scrapy import Selector

# Shared database connection used by crawl_xici() and the GetIP class below.
# Host / user / password / database name are placeholders -- use your own.
conn = MySQLdb.connect("127.0.0.1", "root", "root", "article_spider", charset="utf8", use_unicode=True)
cursor = conn.cursor()


def crawl_xici():
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}

    for i in range(3411):
        res = requests.get('http://www.xicidaili.com/nn/{}'.format(i), headers=headers)
        ip_list = []
        selector = Selector(text=res.text)
        all_trs = selector.css("#ip_list tr")

        for tr in all_trs[1:]:
            speed = 0.0
            speed_str = tr.css(".bar::attr(title)").extract()[0]
            if speed_str:
                # the title attribute looks like "0.123秒" (seconds); keep the number
                speed = float(speed_str.split("秒")[0])

            all_texts = tr.css("td::text").extract()

            ip = all_texts[0]
            port = all_texts[1]
            proxy_type = all_texts[5]
            ip_list.append((ip, port, proxy_type, speed))
        # print(ip_list)

        # An earlier version built the INSERT by string formatting;
        # the parameterized query below is safer (no SQL injection).
        for ip_info in ip_list:
            insert_sql = """
                insert into proxy_ip(ip, port, speed, proxy_type)
                VALUES (%s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE ip=VALUES(ip), port=VALUES(port), speed=VALUES(speed), proxy_type=VALUES(proxy_type)
            """
            params = (ip_info[0], ip_info[1], ip_info[3], ip_info[2])
            cursor.execute(insert_sql, params)
            conn.commit()
            # print("inserted")

Define a class with methods to fetch a random IP (and delete invalid proxies):

class GetIP(object):
    # Delete an IP from the database
    def delete_ip(self, ip):
        delete_sql = """
            delete from proxy_ip WHERE ip=%s
        """
        cursor.execute(delete_sql, (ip,))
        conn.commit()
        print("deleted")
        return True

    # Validate an IP by sending a request through it
    def judge_ip(self, ip, port):
        http_url = "http://www.baidu.com"
        proxy_url = "http://{0}:{1}".format(ip, port)
        try:
            proxy_dict = {
                "http": proxy_url
            }
            # timeout so that dead proxies fail fast instead of hanging
            res = requests.get(http_url, proxies=proxy_dict, timeout=10)
        except Exception as e:
            print("invalid ip and port")
            self.delete_ip(ip)
            return False
        else:
            code = res.status_code
            if 200 <= code < 300:
                print("effective ip")
                return True
            else:
                print("invalid ip and port")
                self.delete_ip(ip)
                return False

    # Get a random IP from the database
    def get_random_ip(self):
        select_sql = """
            SELECT ip, port from proxy_ip ORDER BY RAND() LIMIT 1
        """
        cursor.execute(select_sql)
        for ip_info in cursor.fetchall():
            ip = ip_info[0]
            port = ip_info[1]
            judge_re = self.judge_ip(ip, port)
            if judge_re:
                return "http://{0}:{1}".format(ip, port)
            else:
                return self.get_random_ip()


# crawl_xici()
if __name__ == '__main__':
    get_ip = GetIP()
    get_ip.get_random_ip()

Add it to the middlewares file:

# Remember to register RandomProxyMiddleware in DOWNLOADER_MIDDLEWARES in settings.py before use
from tools.crawl_xici_ip import GetIP

# Random IP proxy middleware
class RandomProxyMiddleware(object):
    def process_request(self, request, spider):
        get_ip = GetIP()
        request.meta['proxy'] = get_ip.get_random_ip()
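
Register it in settings.py, mirroring the User-Agent middleware registration shown later in this post (the ArticleSpider package name and the priority value 544 are placeholders):

# settings.py
DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.RandomProxyMiddleware': 544,
}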

Examples of custom pipelines

Pipeline storing JSON (custom JSON export)

import codecs
import json

class JsonWithEncodingPipeline(object):
    # Custom JSON file export
    def __init__(self):
        self.file = codecs.open('article.json', 'w', encoding="utf-8")

    def process_item(self, item, spider):
        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(lines)
        return item

    # Scrapy calls close_spider (not spider_closed) on pipeline components
    def close_spider(self, spider):
        self.file.close()

Pipeline storing JSON (using Scrapy's built-in exporter)

from scrapy.exporters import JsonItemExporter

class JsonExporterPipeline(object):
    # Use Scrapy's built-in JsonItemExporter to write the JSON file
    def __init__(self):
        self.file = open('articleexport.json', 'wb')
        self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
        self.exporter.start_exporting()

    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item
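
Either JSON pipeline is switched on through ITEM_PIPELINES in settings.py (the ArticleSpider.pipelines path and the priority 300 are placeholders):

# settings.py
ITEM_PIPELINES = {
    'ArticleSpider.pipelines.JsonExporterPipeline': 300,
    # or: 'ArticleSpider.pipelines.JsonWithEncodingPipeline': 300,
}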

Pipeline storing to MySQL (blocking, synchronous)

import MySQLdb
import MySQLdb.cursors

class MysqlPipeline(object):
    # Write to MySQL synchronously (each INSERT blocks the crawl)
    def __init__(self):
        self.conn = MySQLdb.connect('192.168.0.106', 'root', 'root', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
        self.conn.commit()
        return item
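
The pipeline assumes the jobbole_article table already exists; a minimal one-off sketch inferred from the INSERT columns (types are guesses, credentials are placeholders):

import MySQLdb

conn = MySQLdb.connect('192.168.0.106', 'root', 'root', 'article_spider', charset="utf8")
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS jobbole_article (
        id INT AUTO_INCREMENT PRIMARY KEY,
        title VARCHAR(200) NOT NULL,
        url VARCHAR(300) NOT NULL,
        create_date DATE,
        fav_nums INT DEFAULT 0
    )
""")
conn.commit()
conn.close()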

Pipeline storing to MySQL (asynchronous)

import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi

class MysqlTwistedPipeline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
        # Use twisted to run the MySQL insert asynchronously
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # handle exceptions
        return item

    def handle_error(self, failure, item, spider):
        # Handle exceptions raised by the asynchronous insert
        print(failure)

    def do_insert(self, cursor, item):
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))

    # To use the version below, the item class must define get_insert_sql()
    # def do_insert(self, cursor, item):
    #     # Run the actual insert
    #     # Build a different SQL statement for each item type and insert it into MySQL
    #     insert_sql, params = item.get_insert_sql()
    #     print(insert_sql, params)
    #     cursor.execute(insert_sql, params)
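
from_settings() reads the connection parameters out of settings.py, and the commented-out do_insert variant expects each item class to build its own SQL. A sketch of both (the JobBoleArticleItem name and its field list are placeholders, adapt them to your own items):

# settings.py -- read by from_settings()
MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"

# items.py -- per-item SQL for the alternative do_insert
import scrapy

class JobBoleArticleItem(scrapy.Item):
    title = scrapy.Field()
    url = scrapy.Field()
    create_date = scrapy.Field()
    fav_nums = scrapy.Field()

    def get_insert_sql(self):
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        params = (self["title"], self["url"], self["create_date"], self["fav_nums"])
        return insert_sql, params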

How to rotate the User-Agent randomly in Scrapy?

Random User-Agent downloader middleware (initial version)

settings file:

DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.RandomUserAgentMiddleware': 543,
}

middlewares file:

from fake_useragent import UserAgent

class RandomUserAgentMiddleware(object):
    def __init__(self, crawler):
        super(RandomUserAgentMiddleware, self).__init__()
        self.ua = UserAgent()
        self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_request(self, request, spider):
        def get_ua():
            # e.g. self.ua.random or self.ua.chrome, depending on RANDOM_UA_TYPE
            return getattr(self.ua, self.ua_type)
        request.headers.setdefault('User-Agent', get_ua())
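
Since the middleware only calls headers.setdefault, it is worth disabling Scrapy's built-in UserAgentMiddleware so it cannot set a User-Agent first. RANDOM_UA_TYPE selects which fake_useragent attribute is used (a sketch):

# settings.py
RANDOM_UA_TYPE = "random"   # or "chrome", "firefox", ... (any fake_useragent attribute)

DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.RandomUserAgentMiddleware': 543,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
}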

Data ended up in the wrong store, what now?

Importing data from Redis into MongoDB

import json
import redis
import pymongo

def main():
    # Redis connection info
    rediscli = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    # MongoDB connection info
    mongocli = pymongo.MongoClient(host='localhost', port=27017)
    # Database name
    db = mongocli['sina']
    # Collection name
    sheet = db['sina_items']
    offset = 0
    while True:
        # blpop pops from the head of the list (FIFO); use brpop for LIFO
        source, data = rediscli.blpop(["sinainfospider_redis:items"])
        item = json.loads(data.decode("utf-8"))
        # insert() was removed in pymongo 4.x; insert_one() works in 3.x and 4.x
        sheet.insert_one(item)
        offset += 1
        print(offset)
        print("Processing: %s" % item)

if __name__ == '__main__':
    main()
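
This script (and the MySQL variant below) assumes the spider pushed serialized items into the Redis list <spider_name>:items through scrapy-redis's RedisPipeline; a minimal settings sketch for that spider (host/port are placeholders):

# settings.py of the scrapy-redis spider
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
}
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379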

Storing Redis data into MySQL

import json
import time

import redis
from pymysql import connect

# Redis connection
redis_client = redis.StrictRedis(host="127.0.0.1", port=6379, db=0)
# MySQL connection
mysql_client = connect(host="127.0.0.1", user="root", password="mysql",
                       database="sina", port=3306, charset='utf8')
cursor = mysql_client.cursor()

i = 1
while True:
    print(i)
    time.sleep(1)
    source, data = redis_client.blpop(["sinainfospider_redis:items"])
    item = json.loads(data.decode())
    print("source===========", source)
    print("item===========", item)
    sql = "insert into sina_items(parent_url,parent_title,sub_title,sub_url,sub_file_name,son_url,head,content,crawled,spider) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    params = [item["parent_url"], item["parent_title"], item["sub_title"], item["sub_url"], item["sub_file_name"],
              item["son_url"], item["head"], item["content"], item["crawled"], item["spider"]]
    cursor.execute(sql, params)
    mysql_client.commit()
    i += 1