获取新浪微博1000w用户及最近50条微博的Python脚本
2012-09-12 09:22:47 来源:我爱运维网 评论:0 点击:
功能：获取新浪微博 1000 万用户的基本信息，以及每个被爬取用户最近发表的 50 条微博。脚本使用 Python 编写，采用多进程爬取，数据存储在 MongoDB 中。
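功能：获取新浪微博 1000 万用户的基本信息，以及每个被爬取用户最近发表的 50 条微博。脚本使用 Python 编写，采用多进程爬取，数据存储在 MongoDB 中。运行前需准备日志配置文件 logging.conf，以及 test.txt：每行以空格分隔 consumer_key、consumer_secret、access_token、access_secret 和起始用户 ID，每一行启动一个爬虫进程。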
[代码] [Python]代码
#!/usr/bin/python
#-*-coding:utf8-*-

import logging
import logging.config
import os
import pickle
import sys
import time
from multiprocessing import Process
from pprint import pprint

from pymongo import Connection
from weibopy.api import API
from weibopy.auth import OAuthHandler
from weibopy.binder import bind_api
from weibopy.error import WeibopError

# MongoDB connection settings used by every crawler instance.
mongo_addr, mongo_port, db_name = (
    'localhost',  # MongoDB host
    27017,        # MongoDB port
    'weibo',      # database holding the userprofile / statuses collections
)

class Sina_reptile(object):
    """
    Crawl Sina Weibo data (user profiles and statuses) into MongoDB.

    Each instance opens its own MongoDB connection and writes to the
    ``userprofile`` and ``statuses`` collections of ``db_name``.
    ``auth()`` (interactive) or ``setToken()`` must be called first:
    they are the only places that create ``self.api``.
    """

    # User attributes copied from a weibopy user object into a profile dict.
    _USER_FIELDS = (
        "screen_name", "name", "province", "city", "location",
        "description", "url", "profile_image_url", "domain", "gender",
        "followers_count", "friends_count", "statuses_count",
        "favourites_count", "created_at", "following",
        "allow_all_act_msg", "geo_enabled", "verified",
    )

    # Status attributes copied unchanged from a weibopy status object.
    _STATUS_FIELDS = (
        "created_at", "text", "source", "favorited", "truncated",
        "in_reply_to_status_id", "in_reply_to_user_id",
        "in_reply_to_screen_name", "thumbnail_pic", "bmiddle_pic",
        "original_pic",
    )

    def __init__(self, consumer_key, consumer_secret):
        self.consumer_key, self.consumer_secret = consumer_key, consumer_secret
        self.connection = Connection(mongo_addr, mongo_port)
        self.db = self.connection[db_name]
        self.collection_userprofile = self.db['userprofile']
        self.collection_statuses = self.db['statuses']

    @staticmethod
    def _logger():
        """
        Return the crawler logger, looked up by name at call time.

        The script only binds a global ``log`` under ``__main__``; looking
        the logger up here keeps error paths working in child processes
        that re-import this module instead of forking.
        """
        return logging.getLogger('logger_sina_reptile')

    def getAtt(self, key):
        """Return attribute ``key`` of ``self.obj``, or '' if it cannot be read."""
        return self.getAttValue(getattr(self, 'obj', None), key)

    def getAttValue(self, obj, key):
        """
        Return attribute ``key`` of ``obj``.

        Any error is printed and '' is returned instead, so a single
        missing field does not abort the whole record.
        """
        try:
            return getattr(obj, key)
        except Exception as e:
            print(e)
            return ''

    def auth(self):
        """
        Interactively obtain a Sina Weibo access token and secret via OAuth.

        Prints an authorization URL, reads the PIN from stdin and builds
        ``self.api``. Returns None early if the consumer key or consumer
        secret is empty.

        NOTE: this assigns the OAuthHandler to ``self.auth``, which shadows
        this method on the instance (``setToken`` does the same).
        """
        if not self.consumer_key:
            print("Please set consumer_key")
            return

        if not self.consumer_secret:
            print("Please set consumer_secret")
            return

        self.auth = OAuthHandler(self.consumer_key, self.consumer_secret)
        auth_url = self.auth.get_authorization_url()
        print('Please authorize: ' + auth_url)
        verifier = raw_input('PIN: ').strip()
        self.auth.get_access_token(verifier)
        self.api = API(self.auth)

    def setToken(self, token, tokenSecret):
        """Build ``self.api`` from an existing OAuth access token and secret."""
        self.auth = OAuthHandler(self.consumer_key, self.consumer_secret)
        self.auth.setToken(token, tokenSecret)
        self.api = API(self.auth)

    def get_userprofile(self, id):
        """
        Fetch the basic profile of user ``id``.

        Returns a dict with ``id`` plus every name in ``_USER_FIELDS``
        (unreadable fields become ''). Returns None if the API raises
        WeibopError; the error is printed and logged.
        """
        try:
            user = self.api.get_user(id)
        except WeibopError as e:  # e carries the detailed reason for the failure
            print("error occured when access userprofile use user_id: {0}".format(id))
            print("Error: {0}".format(e))
            self._logger().error(
                "Error occured when access userprofile use user_id:{0}\nError:{1}".format(id, e),
                exc_info=True)
            return None

        self.obj = user
        userprofile = {'id': id}
        for field in self._USER_FIELDS:
            userprofile[field] = self.getAtt(field)
        return userprofile

    def get_specific_weibo(self, id):
        """
        Fetch a single status by its id.

        Binds the ``/statuses/show/{id}.json`` endpoint on the fly and
        returns a dict with ``id``, the ``_STATUS_FIELDS`` values, plus
        ``geo``, ``mid`` and ``retweeted_status`` as raw SDK values. Returns
        the string "**绑定错误**" ("binding error") if the endpoint cannot
        be bound. API errors from the request itself propagate.
        """
        try:
            # Re-bind get_status to the single-status endpoint.
            get_status = bind_api(path='/statuses/show/{id}.json',
                                  payload_type='status',
                                  allowed_param=['id'])
        except Exception:
            return "**绑定错误**"
        status = get_status(self.api, id)
        self.obj = status
        statusprofile = {'id': id}
        for field in self._STATUS_FIELDS:
            statusprofile[field] = self.getAtt(field)
        statusprofile['geo'] = self.getAtt("geo")
        statusprofile['mid'] = self.getAtt("mid")
        statusprofile['retweeted_status'] = self.getAtt("retweeted_status")
        return statusprofile

    def get_latest_weibo(self, user_id, count):
        """
        Fetch the ``count`` most recent statuses of ``user_id``.

        Returns a list with one NEW dict per status, containing ``usr_id``,
        ``id``, the ``_STATUS_FIELDS`` values, ``mid``, and ``geo`` /
        ``retweeted_status`` stored as ``repr()`` of their pickle
        (presumably so the SDK objects fit in MongoDB; NOTE(review): only
        unpickle these back for trusted data). Returns None if the
        timeline request raises; the error is printed and logged.
        """
        try:  # errors can be raised inside the SDK
            timeline = self.api.user_timeline(count=count, user_id=user_id)
        except Exception as e:
            print("error occured when access status use user_id: {0}".format(user_id))
            print("Error: {0}".format(e))
            self._logger().error(
                "Error occured when access status use user_id:{0}\nError:{1}".format(user_id, e),
                exc_info=True)
            return None

        statuses = []
        for status in timeline:
            self.obj = status
            # A fresh dict per status: reusing one dict would make every
            # list entry alias the last status and break the bulk insert.
            statusprofile = {'usr_id': user_id, 'id': self.getAtt("id")}
            for field in self._STATUS_FIELDS:
                statusprofile[field] = self.getAtt(field)
            statusprofile['geo'] = repr(pickle.dumps(self.getAtt("geo"),
                                                     pickle.HIGHEST_PROTOCOL))
            statusprofile['mid'] = self.getAtt("mid")
            statusprofile['retweeted_status'] = repr(pickle.dumps(
                self.getAtt("retweeted_status"), pickle.HIGHEST_PROTOCOL))
            statuses.append(statusprofile)
        return statuses

    def friends_ids(self, id):
        """
        Return the ids of all users that user ``id`` follows.

        Pages through the API starting at cursor 0 and stops when
        ``next_cursor`` is 0 or cannot be read (falsy). Stopping on a falsy
        cursor avoids re-requesting the same page forever.
        """
        ids = []
        cursor = 0
        while True:
            page = self.api.friends_ids(user_id=id, cursor=cursor)
            self.obj = page
            ids.extend(self.getAtt("ids") or [])
            cursor = self.getAtt("next_cursor")
            if not cursor:
                break
        return ids

    def manage_access(self):
        """
        Throttle API usage by sleeping according to the rate-limit status.

        While hits remain, sleeps reset_time_in_seconds / remaining_hits
        (rounded to 2 decimals). Otherwise it sleeps the full
        reset_time_in_seconds. A 1.5 s margin is always added.
        """
        info = self.api.rate_limit_status()
        self.obj = info
        remaining_hits = self.getAtt("remaining_hits")
        reset_seconds = self.getAtt("reset_time_in_seconds")
        if remaining_hits:
            # Spread the remaining hits evenly over the time left.
            sleep_time = round(float(reset_seconds) / remaining_hits, 2)
        else:
            # No hits left (or value unreadable): wait for the window reset.
            sleep_time = reset_seconds
        print("{0} {1} {2} {3}".format(remaining_hits, reset_seconds,
                                       self.getAtt("hourly_limit"),
                                       self.getAtt("reset_time")))
        print("sleep time: {0} pid: {1}".format(sleep_time, os.getpid()))
        time.sleep(sleep_time + 1.5)

    def save_data(self, userprofile, statuses):
        """
        Insert a user's statuses and profile into MongoDB.

        An empty ``statuses`` list is skipped because pymongo rejects an
        empty bulk insert. Without this check, the profile of a user with
        no statuses would be lost too.
        """
        if statuses:
            self.collection_statuses.insert(statuses)
        self.collection_userprofile.insert(userprofile)

def reptile(sina_reptile, userid, max_users=10000000):
    """
    Breadth-first crawl of the follow graph, starting from ``userid``.

    For each user in the current frontier:
      1. Throttle with ``sina_reptile.manage_access()``.
      2. Fetch the ids the user follows, the user's profile and the latest
         50 statuses.
      3. Save them with ``sina_reptile.save_data``.
    Followed ids not seen before form the next frontier.

    :param sina_reptile: an authenticated ``Sina_reptile``-like object.
    :param userid: id of the user the crawl starts from.
    :param max_users: stop after this many users have been saved
        (default 10,000,000).

    The crawl ends when ``max_users`` users have been saved or no unseen
    user is left. Any exception for one user is logged, followed by a
    60 s back-off, and that user is skipped.
    """
    log = logging.getLogger('logger_sina_reptile')
    # NOTE(review): the seed id comes from test.txt as a string while the
    # API presumably returns integer ids, so the seed may be visited twice.
    seen = set([userid])  # every id ever queued, so cycles are not re-crawled
    frontier = [userid]
    saved = 0
    while frontier and saved < max_users:
        next_frontier = []
        for uid in frontier:
            try:
                sina_reptile.manage_access()
                friend_ids = sina_reptile.friends_ids(uid)
                userprofile = sina_reptile.get_userprofile(uid)
                statuses = sina_reptile.get_latest_weibo(count=50, user_id=uid)
                if statuses is None or userprofile is None:
                    # A fetch failed; skip this user without expanding it.
                    continue
                sina_reptile.save_data(userprofile, statuses)
            except Exception as e:
                log.error("Error occured in reptile,id:{0}\nError:{1}".format(uid, e),
                          exc_info=True)
                time.sleep(60)  # back off before trying the next user
                continue
            saved += 1
            print(saved)
            if saved >= max_users:
                return
            for fid in friend_ids:
                if fid not in seen:
                    seen.add(fid)
                    next_frontier.append(fid)
        frontier = next_frontier

def run_crawler(consumer_key, consumer_secret, key, secret, userid):
    """
    Process entry point: crawl from ``userid`` using one set of credentials.

    Builds a ``Sina_reptile``, authenticates it with the access token
    ``key`` / ``secret`` and runs ``reptile``. Any exception is printed and
    logged with the process id. The MongoDB connection is closed in every
    case.
    """
    log = logging.getLogger('logger_sina_reptile')
    sina_reptile = None
    try:
        sina_reptile = Sina_reptile(consumer_key, consumer_secret)
        sina_reptile.setToken(key, secret)
        reptile(sina_reptile, userid)
    except Exception as e:
        print(e)
        # Placeholders must be {0}/{1}: only two arguments are supplied.
        log.error("Error occured in run_crawler,pid:{0}\nError:{1}".format(os.getpid(), e),
                  exc_info=True)
    finally:
        if sina_reptile is not None:
            sina_reptile.connection.close()

if __name__ == "__main__":
    logging.config.fileConfig("logging.conf")
    # Module-level logger, also available as the global name ``log``.
    log = logging.getLogger('logger_sina_reptile')
    # Each line of test.txt starts one crawler process, fields separated by
    # whitespace: consumer_key consumer_secret access_token access_secret user_id
    workers = []
    with open('test.txt') as f:
        for line in f:
            fields = line.split()
            if not fields:
                continue  # ignore blank lines
            if len(fields) < 5:
                print("skip malformed line: {0!r}".format(line))
                continue
            p = Process(target=run_crawler, args=tuple(fields[:5]))
            p.start()
            workers.append(p)
    for p in workers:
        p.join()
上一篇:用python程序连接hive
下一篇:数据需求统计常用shell命令
分享到:
收藏
评论排行
- ·Windows(Win7)下用Xming...(92)
- ·使用jmx client监控activemq(20)
- ·Hive查询OOM分析(14)
- ·复杂网络架构导致的诡异...(8)
- ·使用 OpenStack 实现云...(7)
- ·影响Java EE性能的十大问题(6)
- ·云计算平台管理的三大利...(6)
- ·Mysql数据库复制延时分析(5)
- ·OpenStack Nova开发与测...(4)
- ·LTPP一键安装包1.2 发布(4)
- ·Linux下系统或服务排障的...(4)
- ·PHP发布5.4.4 和 5.3.1...(4)
- ·RSYSLOG搭建集中日志管理服务(4)
- ·转换程序源码的编码格式[...(3)
- ·Linux 的木马程式 Wirenet 出现(3)
- ·Nginx 发布1.2.1稳定版...(3)
- ·zend framework文件读取漏洞分析(3)
- ·Percona Playback 0.3 development release(3)
- ·运维业务与CMDB集成关系一例(3)
- ·应该知道的Linux技巧(3)




