挖一个大坑,我是来观光的。
坑还没填完。
BOP 2016 SEMI Rank#11 团队 yaoyao 代码。
基于 Python 2.7 ,使用 flask 提供 Web API 终结点。665行的人生。
使用的库
- gevent :协程。
REST API 入口:cca
求解主入口:get_all_possible_ans
TOC
cca
[挖坑]
get_all_possible_ans
|
601 602 603 |
#@profile def get_all_possible_ans(Id1,Id2): entities = get_entity(expr='Or(Or(Or(Composite(AA.AuId=%s),Composite(AA.AuId=%s)),Id=%s),Id=%s)' % (Id1, Id2, Id1, Id2)) |
与 Elecky 类似,在查询 Id 类型的同时,将作者的所有论文全部下载下来,然后判断节点对的类型,分别使用 get_Id_Id 、 get_AuId_Id 、 get_Id_AuId 和 get_AuId_AuId 子过程解决问题。
get_entity
|
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
#@profile def get_entity(expr,attr='Id,AA.AuId,AA.AfId,J.JId,C.CId,RId,F.FId,CC',thl=[]): t_l = [] q = Queue.Queue() entities = [] pe_l = [] try: for i in xrange(2): t_l.append(threading.Thread(target=get_entity_one,args=(expr,q,i,attr))) t_l[i].start() gevent.joinall(thl) ans = q.get() entities = json.loads(ans)['entities'] except: para['expr'] = expr para['attributes'] = attr ans = get_ans_http(para) entities = json.loads(ans)['entities'] for en in entities: pe = Paper_Entity(en) pe_l.append(pe) return pe_l #@profile def get_entity_one(expr,q,i,attr='Id,AA.AuId,AA.AfId,J.JId,C.CId,RId,F.FId,CC'): para['expr'] = expr para['attributes'] = attr ans = get_ans(para,i) q.put(ans) |
#167:这就是“同一查询,两次请求”的实现。注意到 get_entity_one 会将下载下来的论文信息放到 q 中。
#170:只是为了等待其它的 greenlet 。目前使用的都是默认值(空)。
#171:注意到线程安全队列的 Queue.get 在队列为空时会阻塞调用方。此处十分巧妙地利用了这一特点,如果 t_l 中有任意一次请求先完成,那么阻塞就会解除, ans 马上被赋值。
加速的方法:3.一个语句同时查询两次(单次查询时间波动较大,两次查询同时进行,并且使用先返回的结果,会更稳定) [1]
随后就是 JSON 解析和返回了。
#176:可能作者也觉得强行 socket (参阅下文)有点丧心病狂,所以如果在用 socket 的时候出现任何问题,就回退到 get_ans_http ,使用 httplib.HTTPConnection (对应 Python 3 的 http.client.HTTPConnection )进行连接。
get_ans
|
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
def get_ans(para, i=0): global host, port, domain try: sock_c = s_q.get(block=False) except: sock_c = socket_class() sock = sock_c.get_sock() para = '&'.join(['='.join(item) for item in para.items()]) request_str = '''GET /%s?%s HTTP/1.1\r\nHost: %s\r\n\r\n''' %(domain, para, host) sock.send(request_str) total_data=[] #sock.settimeout(5) data = sock.recv(150) length = int(data.split('\n')[1].split(' ')[1]) header_len = len(data.split('\r\n\r\n')[0]) total_data.append(data[header_len+4:]) length = length-(len(data) - header_len - 4) while 1: if length>0: data = sock.recv(min(length,8192)) length -= len(data) total_data.append(data) else: break ans = ''.join(total_data) s_q.put(sock_c) return ans for i in range(total_sock): s_q.put(socket_class()) |
其中,para 是一个字典,包含了URL请求的查询属性和值;i 不知道是干什么的,可能是之前向控制台输出以区分 get_entity 的两次连接的。
#95:socket pool 不多说。
加速的方法: 1.socket 长连接 [1]
#99:将字典转换为完整的 URL 。
#101:好吧,强行 socket
requests > urllib2 >httplib > socket长连接 时间消耗依次减少 [1]
那么先把 HTTP 1.1 完整的请求和回复放在这里备用吧。例如
|
1 2 |
GET https://oxfordhk.azure-api.net/academic/v1.0/evaluate?expr=Or(Id=2147152072,Composite(AA.AuId=2147152072))&model=latest&count=2&offset=0&orderby=&attributes=Id,Y,AA.AuId,AA.AfId,F.FId,J.JId,C.CId,RId,CC&subscription-key=f7cc29509a8443c5b3a5e56b0e38b5a6 HTTP/1.1 Host: oxfordhk.azure-api.net |
|
1 2 3 4 5 6 7 8 9 10 11 |
HTTP/1.1 200 OK Content-Length: 683 Content-Type: application/json X-Powered-By: ASP.NET Date: Sat, 14 May 2016 07:32:12 GMT { "expr" : "Or(Id=2147152072,Composite(AA.AuId=2147152072))", "entities" : [{ "logprob":-15.440, "Id":2147152072, "Y":1990, "CC":5293, "RId":[2151561903, 1965061793, 2019911971, 2114804204, 2000215628, 2043343585, 134022301, 2158495995, 96199841, 1602667807, 35738896, 2041565863, 2072371179, 124362989, 2107827038, 1991631392, 71581993], "AA":[{"AuId":2033059188,"AfId":40347166}, {"AuId":676500258}, {"AuId":2019832499}, {"AuId":2114332599}, {"AuId":2004554093,"AfId":125749732}], "F":[{"FId":170133592}, {"FId":56666940}, {"FId":22789450}, {"FId":10879293}, {"FId":186311912}, {"FId":124246873}, {"FId":23123220}, {"FId":105795698}, {"FId":41008148}], "J":{"JId":135954941} }] } |
#105, #113:注意 socket.recv 是阻塞函数。不过前面的调用方基本上都是在工作线程调用此函数的,所以也就无谓了。
#106-#109:为了从回复中套出 Content-Length 标头的值。
#113:随后就可以开始愉快地接收数据了。缓冲区大小 8KB 。
数据收完之后就可以返回了。
#123:在程序一开始运行的时候,就向池中填满新鲜的 sockets 。
heart_beat
在此函数中生成了一个简单的请求,然后把 s_q socket 池中的 socket 取出来,一个一个地使用 get_ans 向外发送请求。函数保证了每一个 socket 在一分钟内会被使用一次。
|
145 |
threading.Thread(target=heart_beat)#.start() |
然而这里的 start 被注释掉了……什么状况……
get_Id_Id
|
622 623 624 625 626 627 628 629 |
if (Id1_en.AuId or Id1_en.entity.get('RId',[])) and (Id2_en.AuId or Id2_en.entity.get('RId',[])): print 'Id-Id' if Id2_en.entity.get('CC',0)>1000: print 'CC>1000' ans+=get_Id_Id(Id1, Id2, Id1_en, Id2_en) else: print 'CC<1000' ans += get_Id_Id_CC(Id1, Id2, Id1_en, Id2_en) |
与 Elecky 类似,如果 Id2 的被引次数大于1000,则使用 get_Id_Id 从左向右搜索;否则使用 get_Id_Id_CC 从右向左搜索。
|
257 258 259 |
#@profile def get_Id_Id(Id1, Id2, Id1_en, Id2_en): ans = [] |
这里 Id1_en 、 Id2_en 分别对应了 Id1 、 Id2 的论文详细信息。
对于从左向右搜索的情况
#266:先把 Id1 引用的所有文献(潜在的 Id3 )下载下来,存入 Id1_RId_en_l 中,然后按顺序考虑以下情况
Id1 - AuId/FId/CId/JId3 - Id4 - Id2 (分支):构造查询表达式
查找(包含 Id1.AuId/FId/JId/CId
或位于 Id3.RId 列表中)且引用了 Id2 的论文
很明显,这就是为了查找 Id4 。顺带一提,Or_expr_Id 实现了按照允许的最大表达式长度对查询进行拆分的功能。
#275:作者应该是 JId/CId 不分的,统统称为 CId 。
#286:然后把拆分后的查询表达式丢给 gevent.spawn ,在工作线程中进行下载。
Id - Id :这个很明显。
Id1 - Id3 - Id2 :在 Id_RId_Id 子过程中,由于此时已经把所有可能的 Id3 下载下来了,所以直接判断其 RId 中是否包含 Id2 即可。
Id1 - CId/JId/FId/AuId3 - Id2 :这个也很明显。
Id1 - Id3 - CId/JId/FId/AuID4 - Id2 :这个也很明显,考虑到所有的 Id3 都被下载下来了。
Id1 - AuId/FId/CId/JId3 - Id4 - Id2 (汇合):等待前面 spawn 出来的线程工作完毕,然后查一下返回的 Id4 论文的属性,就可以一条一条地返回了。
对于从右向左搜索的情况
[未完待续]
