Mercurial > hg > cc > cirrus_home
view bin/track.py @ 93:4d870a7ec871
support a command to receive each result,
remove use of X-Crawler-Content-Length
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Thu, 15 Apr 2021 10:59:25 +0000 |
parents | fcb390b3ea55 |
children |
line wrap: on
line source
#!/lustre/sw/miniconda3/bin/python3
"""Track a list of URIs through nutch results

Usage: track.py year-nn segmentid [file]

For one crawl segment this reads the seed list (bu.txt), the CDX indexes
under cdx/warc and cdx/crawldiagnostics, warc/warc/truncated.txt and
hadoop.log, and records what is known about every URI mentioned in any of
them.  A summary goes to stderr; when stdout is not a terminal, one type
string per URI (see URI.typeString) is written to stdout.
"""
import contextlib
import glob
import gzip
import json
import re
import sys
import urllib.parse

USAGE = "Usage: track.py year-nn segmentid [file]"

# One CDX index line: reversed-domain ")" path, a middle field, JSON properties.
CDX = re.compile(r"(.*)\)(.*) (.*) (\{.*\})$")

# Common prefix of the fetcher log lines of interest: a 23-character
# leading field, the level, and the logger name.  One *or more* spaces are
# accepted after the level so the patterns also match layouts that pad the
# level name -- NOTE(review): confirm against a real hadoop.log.
_FETCHER = r"....................... INFO +fetcher\.FetcherThread - "
FAIL1a = re.compile(_FETCHER + r"FetcherThread [0-9]* fetch of (?P<u>.*)"
                    r" failed with: (?P<m>Http code=[^,]*)")
FAIL1b = re.compile(_FETCHER + r"FetcherThread [0-9]* fetch of (?P<u>.*)"
                    r" failed with: (?P<m>[^ ]*):")
FAIL2 = re.compile(_FETCHER + r"(?P<m>Crawl-Delay) for (?P<u>.*) too long")
FAIL3 = re.compile(_FETCHER + r"(?P<m>Denied) by robots.txt: (?P<u>.*)$")

# Not referenced anywhere in this file; kept in case other code expects them.
BU = {}
U = {}

# Marker stored in URI.reloc when a 3xx record has no usable redirect target.
BORKED = "borked"


class URI:
    """Everything known about one URI string.

    Instances register themselves in the class-level ``uu`` dict, keyed by
    the URI string, so ``URI.get`` can return the shared instance.
    """
    uu = {}    # URI string -> URI instance, shared by all instances
    depth = 0  # not referenced elsewhere in this file

    def __init__(self, s, seed=False):
        self.s = s
        self.d_props = None  # from cdx/crawldiagnostics
        self.w_props = None  # from cdx/warc
        self.trunc = self.fail = None
        self._host = self._path = self._scheme = None
        self.seed = seed
        self.uu[s] = self

    def _parse(self):
        """Fill the cached scheme/host/path from a single urlparse call."""
        (self._scheme, self._host, self._path,
         _, _, _) = urllib.parse.urlparse(self.s)

    @property
    def host(self):
        if self._host is None:
            self._parse()
        return self._host

    @property
    def path(self):
        if self._path is None:
            self._parse()
        return self._path

    @property
    def scheme(self):
        if self._scheme is None:
            self._parse()
        return self._scheme

    @classmethod
    def get(cls, s):
        """Return the registered URI for *s*, creating it if necessary."""
        try:
            return cls.uu[s]
        except KeyError:
            return cls(s)

    def _status_lead(self):
        """First character of ``status``, or '' when no status is set yet."""
        status = getattr(self, 'status', None)
        return status[:1] if status else ''

    def __repr__(self):
        prefix = "%s://%s%s" % (self.scheme, self.host, self.path[:20])
        plen = len(prefix)
        extra = len(self.s) - plen
        # '<=' rather than '<': with extra == 0 the slice s[-0:] is the
        # whole string, which would print the URI twice.
        suffix = '' if extra <= 0 else '...' + self.s[-min(extra, 10):]
        lead = self._status_lead()
        return "<U%s>%s[%s%s]" % (self.typeString(),
                                  '' if lead == '2' else lead,
                                  prefix, suffix)

    def typeString(self):
        """Compact code summarising what happened to this URI.

        Concatenation of: 's' if seed; 'r' if it has a warc record, else
        'd' + first status character if it has a diagnostics record;
        'l' + dot-joined sorted depths if it was a redirect target; the
        first character of the truncation reason; 'F' + a failure code
        (the character after "Http code=" for messages starting with 'H',
        otherwise the message's first character).
        """
        parts = ['s' if self.seed else '']
        if self.w_props is not None:
            parts.append('r')
        elif self.d_props is not None:
            parts.append('d' + self._status_lead())
        if hasattr(self, 'sources'):
            depths = sorted(src[1] for src in self.sources)
            parts.append('l' + '.'.join(str(d) for d in depths))
        if self.trunc is not None:
            parts.append(self.trunc[:1])
        if self.fail is not None:
            # Slices, not indexes, so a short message cannot raise.
            code = (self.fail[10:11] if self.fail[:1] == 'H'
                    else self.fail[:1])
            parts.append('F' + code)
        return ''.join(parts)


def readCDX(files, where, status=None):
    """Load gzipped CDX index files into URI objects.

    files  -- iterable of .gz paths
    where  -- attribute name ('w_props' or 'd_props') to receive each
              record's parsed JSON properties
    status -- if not None, assigned to every URI read as its ``status``

    Returns a dict mapping URI string to URI.  On any error the file name,
    the count of lines processed and the offending line are printed to
    stderr before the exception propagates.
    """
    c = None
    res = {}
    # Ref. https://github.com/ikreymer/webarchive-indexing
    for resFileName in files:
        with gzip.open(resFileName, mode='rt', encoding='utf-8') as rf:
            n = 0
            try:
                for c in rf:
                    r = CDX.match(c)
                    if r is None:
                        raise ValueError("not a CDX line")
                    (rdom, path, seg, props) = r.groups()
                    d = json.loads(props)
                    uri = d["url"]
                    u = URI.get(uri)
                    setattr(u, where, d)
                    u.rdomstr = rdom  # domain, reverse order, comma separated
                    u.lcpath = path   # path, lower-cased, maybe %decoded?
                    u.seg = seg       # partial warc identifier?
                    if status is not None:
                        u.status = status
                    res[uri] = u
                    n += 1
            except Exception:
                print(resFileName, n, c, file=sys.stderr)
                raise
    return res


def maybeTrack(u, source=None, depth=0, _chain=None):
    """Set u.status if missing and follow its redirect chain.

    When *source* is given, (source, depth) is appended to u.sources.  If
    u's status starts with '3', the target named by d_props["redirect"] is
    stored in u.reloc and tracked recursively at depth+1; a KeyError raised
    anywhere in that step sets u.reloc to BORKED instead.

    _chain is internal: the set of URIs already expanded on the current
    chain, so that a redirect loop (A -> B -> A, or A -> A) terminates
    instead of recursing without bound.
    """
    if not hasattr(u, 'status'):
        u.status = 'unknown' if u.d_props is None else u.d_props["status"]
    if source is not None:
        bptr = (source, depth)
        if hasattr(u, 'sources'):
            u.sources.append(bptr)
        else:
            u.sources = [bptr]
    if u.status[0] != '3':
        return
    chain = set() if _chain is None else _chain
    if u in chain:
        return  # back-pointer recorded above; do not go round the loop again
    chain.add(u)
    try:
        loc = u.d_props["redirect"]
        r = URI.get(loc)
        u.reloc = r
        maybeTrack(r, source=u, depth=depth + 1, _chain=chain)
    except KeyError:
        u.reloc = BORKED  # something went wrong somewhere...


def _load_seeds(buf):
    """Register every line of *buf* as a seed URI; return the line count."""
    seeds = 0
    for line in buf:
        seeds += 1
        URI(line.rstrip(), True)
    return seeds


def _mark_truncated(wtf):
    """Attach WARC-Truncated reasons to URIs; return how many were seen.

    Each WARC-Truncated line applies to the most recent WARC-Target-URI
    line, whose URI must already be registered (KeyError otherwise).
    """
    truncs = 0
    target = None
    for line in wtf:
        if not line.startswith('WARC-'):
            continue
        (k, rest) = line.split(' ', 1)
        if k == 'WARC-Target-URI:':
            target = URI.uu[rest.rstrip()]  # better be there...
        elif k == 'WARC-Truncated:':
            if target is None:
                raise ValueError(
                    "WARC-Truncated before any WARC-Target-URI: %r" % line)
            truncs += 1
            target.trunc = rest.rstrip()
    return truncs


def _mark_failures(log):
    """Record fetch failures found in *log*; return the number of matches."""
    fails = 0
    for line in log:
        for pat in (FAIL1a, FAIL1b, FAIL2, FAIL3):
            r = pat.match(line)
            if r:
                fails += 1
                URI.get(r.group('u')).fail = r.group('m')
    return fails


def main(argv):
    """Run the tracker for ``argv = [prog, year-nn, segmentid, [file]]``."""
    with contextlib.ExitStack() as stack:
        try:
            cc = "/beegfs/common_crawl/CC-MAIN-%s" % argv[1]
            seg = argv[2]
            # NOTE(review): the optional URI list is opened but nothing in
            # this revision reads from it.
            if len(argv) == 4:
                uuf = stack.enter_context(open(argv[3]))
            else:
                uuf = sys.stdin
            wwfp = "%s/%s/cdx/warc/CC*.gz" % (cc, seg)
            ddfp = "%s/%s/cdx/crawldiagnostics/CC*.gz" % (cc, seg)
            wwff = glob.glob(wwfp)
            ddff = glob.glob(ddfp)
            wtf = stack.enter_context(
                open("%s/%s/warc/warc/truncated.txt" % (cc, seg)))
            buf = stack.enter_context(open("%s/%s/bu.txt" % (cc, seg)))
            log = stack.enter_context(open("%s/%s/hadoop.log" % (cc, seg)))
            # Explicit raises rather than assert, which -O would strip.
            if not wwff:
                raise FileNotFoundError(wwfp)
            if not ddff:
                raise FileNotFoundError(ddfp)
        except Exception:
            print(USAGE, file=sys.stderr)
            raise

        seeds = _load_seeds(buf)
        print('post-seed', seeds, file=sys.stderr)
        sys.stderr.flush()
        fetches = readCDX(wwff, 'w_props', "200")
        print('post-fetch', len(URI.uu), file=sys.stderr)
        sys.stderr.flush()
        diags = readCDX(ddff, 'd_props')
        print('post-diag', len(URI.uu), file=sys.stderr)
        sys.stderr.flush()

        for u in diags.values():
            maybeTrack(u)

        truncs = _mark_truncated(wtf)
        fails = _mark_failures(log)
        print('post-fail', len(URI.uu), file=sys.stderr)
        sys.stderr.flush()

    print("""For %s/%s:
 %4s requested s
 %4s retrieved r
 %4s diagnosed d
 %4s redirection-location l
 %4s failed F{j for java Exception, 1-5 for Http code=, C for robot crawl delay
              D for robot denied}
 %4s truncated r{d for disconnect, t for timeout}
""" % (cc, seg, seeds, len(fetches), len(diags),
       sum(1 for u in URI.uu.values() if hasattr(u, 'sources')),
       fails, truncs),
          file=sys.stderr)
    sys.stderr.flush()

    if not sys.stdout.isatty():
        for u in URI.uu.values():
            print(u.typeString())


if __name__ == "__main__":
    main(sys.argv)