1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
from scrapy.conf import settings
from scrapy.exceptions import DropItem
from twisted.enterprise import adbapi
import json
class DoubanmeinvPipeline(object):
    """Scrapy item pipeline that stores feeds and their authors via Twisted adbapi.

    Each item is expected to expose the keys listed in ``feed_key`` plus a
    ``userInfo`` mapping exposing the keys listed in ``user_key``.  Rows are
    written to the ``MeiziFeed`` and ``MeiziUser`` tables.  Ids already known
    are cached in ``feed_ids_seen`` / ``user_ids_seen`` so duplicate feeds can
    be dropped without a database round trip.
    """

    # Column names for each table; the same names are used as item keys.
    feed_key = ['feedId', 'userId', 'createOn', 'title', 'thumbUrl', 'href',
                'description', 'pics']
    user_key = ['userId', 'name', 'avatar']

    # Insert templates: first %s is the column list, second the placeholders.
    insertFeed_sql = '''insert into MeiziFeed (%s) values (%s)'''
    insertUser_sql = '''insert into MeiziUser (%s) values (%s)'''

    # Existence checks for a single feed / user.
    feed_query_sql = "select * from MeiziFeed where feedId = %s"
    user_query_sql = "select * from MeiziUser where userId = %s"

    # Queries used to pre-load the caches of known ids.
    feed_seen_sql = "select feedId from MeiziFeed"
    user_seen_sql = "select userId from MeiziUser"

    # After this many dropped duplicates the spider is flagged via
    # ``spider.close_down`` (presumably the spider polls that flag to stop).
    max_dropcount = 50
    current_dropcount = 0

    def __init__(self):
        """Open the connection pool and start loading the known ids."""
        # Create the caches up front: the loads below complete asynchronously
        # (or may fail), and process_item must never see a missing attribute.
        self.feed_ids_seen = set()
        self.user_ids_seen = set()

        dbargs = settings.get('DB_CONNECT')
        db_server = settings.get('DB_SERVER')
        self.dbpool = adbapi.ConnectionPool(db_server, **dbargs)

        d = self.dbpool.runInteraction(self.update_feed_seen_ids)
        d.addErrback(self._database_error)
        u = self.dbpool.runInteraction(self.update_user_seen_ids)
        u.addErrback(self._database_error)

    def __del__(self):
        """Close the connection pool, if it was ever created."""
        # __init__ may have raised before self.dbpool was assigned.
        dbpool = getattr(self, 'dbpool', None)
        if dbpool is not None:
            dbpool.close()

    def update_feed_seen_ids(self, tx):
        """Replace ``feed_ids_seen`` with every feedId currently stored.

        :param tx: DB-API cursor-like object supplied by ``runInteraction``.
        """
        tx.execute(self.feed_seen_sql)
        rows = tx.fetchall()
        self.feed_ids_seen = set(int(row[0]) for row in rows or ())

    def update_user_seen_ids(self, tx):
        """Replace ``user_ids_seen`` with every userId currently stored.

        :param tx: DB-API cursor-like object supplied by ``runInteraction``.
        """
        tx.execute(self.user_seen_sql)
        rows = tx.fetchall()
        self.user_ids_seen = set(int(row[0]) for row in rows or ())

    def process_item(self, item, spider):
        """Drop feeds that were already seen; schedule storage for new ones.

        :param item: scraped item; must provide ``feedId``.
        :param spider: the running spider; ``close_down`` is set to True on
            it once ``max_dropcount`` duplicates have been dropped.
        :returns: ``item`` unchanged when it is not a known duplicate.
        :raises DropItem: when ``feedId`` is already in ``feed_ids_seen``.
        """
        # Check for duplicates BEFORE scheduling the interaction: the
        # interaction itself adds the id to feed_ids_seen, so scheduling it
        # first could make a brand-new item look like a duplicate.
        if int(item['feedId']) in self.feed_ids_seen:
            self.current_dropcount += 1
            if self.current_dropcount >= self.max_dropcount:
                spider.close_down = True
            raise DropItem("重复的数据:%s" % item['feedId'])

        query = self.dbpool.runInteraction(self._conditional_insert, item)
        query.addErrback(self._database_error, item)
        return item

    def _conditional_insert(self, tx, item):
        """Insert the feed and its user unless they already exist.

        Runs as a single ``runInteraction`` callback, so every statement here
        goes through ``tx`` and shares one transaction.

        :param tx: DB-API cursor-like object supplied by ``runInteraction``.
        :param item: scraped item providing ``feed_key`` keys and ``userInfo``.
        """
        feed_id = item['feedId']
        # Parameters must be a sequence: note the trailing comma.
        tx.execute(self.feed_query_sql, (feed_id,))
        if tx.fetchone() is None:
            self.insert_data(item, self.insertFeed_sql, self.feed_key, tx)
        else:
            print("该feed已存在数据库中:%s" % feed_id)

        user = item['userInfo']
        user_id = user['userId']
        tx.execute(self.user_query_sql, (user_id,))
        if tx.fetchone() is None:
            self.insert_data(user, self.insertUser_sql, self.user_key, tx)
        else:
            print("该用户已存在数据库:%s" % user_id)

        # Update the caches only after every statement succeeded, so a failed
        # interaction does not leave ids marked as stored.
        self.feed_ids_seen.add(int(feed_id))
        self.user_ids_seen.add(int(user_id))

    def insert_data(self, item, insert, sql_key, tx=None):
        """Insert one row built from ``item``.

        :param item: mapping providing a value for every name in ``sql_key``.
        :param insert: insert template with two ``%s`` slots (columns, values).
        :param sql_key: ordered column names, also used as keys into ``item``.
        :param tx: optional cursor of the current interaction.  When given,
            the statement is executed on it (same transaction) and None is
            returned.  When omitted, the statement is handed to
            ``dbpool.runOperation`` and its Deferred is returned.
        """
        fields = u','.join(sql_key)
        placeholders = u','.join([u'%s'] * len(sql_key))
        sql = insert % (fields, placeholders)
        data = [item[k] for k in sql_key]
        if tx is not None:
            tx.execute(sql, data)
            return None
        return self.dbpool.runOperation(sql, data)

    def _database_error(self, e, item=None):
        """Errback: report a database failure.

        :param e: the failure passed in by the Deferred.
        :param item: optional item being stored when the failure happened;
            process_item registers this errback with the item as extra arg.
        """
        feed_id = None
        if item is not None:
            try:
                feed_id = item['feedId']
            except (KeyError, TypeError):
                feed_id = None
        if feed_id is None:
            print("Database error: %s" % (e,))
        else:
            print("Database error: %s (feedId=%s)" % (e, feed_id))
|