view bin/track.py @ 93:4d870a7ec871

support a command to receive each result, remove use of X-Crawler-Content-Length
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Thu, 15 Apr 2021 10:59:25 +0000
parents fcb390b3ea55
children
line wrap: on
line source

#!/lustre/sw/miniconda3/bin/python3
'''Track a list of URIs through nutch results'''
# Usage: track.py year-nn segmentid [file]

import re,sys,glob,gzip,json,urllib.parse

# Regular expressions for the two line formats parsed below.  They are raw
# strings so that backslash escapes such as \) and \{ reach the re module
# exactly as written (in an ordinary string literal they are invalid escape
# sequences, which Python warns about).

# A CDX index line: everything up to a ')', then three space-separated
# fields, the last of which is a {...} object running to the end of the line.
CDX=re.compile(r"(.*)\)(.*) (.*) (\{.*\})$")
# hadoop.log lines from fetcher.FetcherThread.  The leading dots skip a
# fixed-width prefix; group 'u' captures the URI, group 'm' the reason.
# FAIL1a: fetch failed with an "Http code=..." message (up to the next comma)
FAIL1a=re.compile(r"....................... INFO  fetcher\.FetcherThread - FetcherThread [0-9]* fetch of (?P<u>.*) failed with: (?P<m>Http code=[^,]*)")
# FAIL1b: fetch failed with some other space-free token followed by ':'
FAIL1b=re.compile(r"....................... INFO  fetcher\.FetcherThread - FetcherThread [0-9]* fetch of (?P<u>.*) failed with: (?P<m>[^ ]*):")
# FAIL2: Crawl-Delay too long
FAIL2=re.compile(r"....................... INFO  fetcher\.FetcherThread - (?P<m>Crawl-Delay) for (?P<u>.*) too long")
# FAIL3: denied by robots.txt (the dot is escaped so only a literal '.' matches)
FAIL3=re.compile(r"....................... INFO  fetcher\.FetcherThread - (?P<m>Denied) by robots\.txt: (?P<u>.*)$")
# Locate and open the inputs for the requested crawl segment.
# Everything that depends on the command line lives inside the try, so that
# a missing argument also produces the usage line rather than only a bare
# IndexError.  The original exception is always re-raised afterwards.
try:
    cc="/beegfs/common_crawl/CC-MAIN-%s"%sys.argv[1]
    seg=sys.argv[2]
    # Optional third argument names a file; otherwise standard input is used
    if len(sys.argv)==4:
        uuf=open(sys.argv[3])
    else:
        uuf=sys.stdin
    wwfp="%s/%s/cdx/warc/CC*.gz"%(cc,seg)
    ddfp="%s/%s/cdx/crawldiagnostics/CC*.gz"%(cc,seg)
    wwff=glob.glob(wwfp)
    ddff=glob.glob(ddfp)
    wtf=open("%s/%s/warc/warc/truncated.txt"%(cc,seg))
    buf=open("%s/%s/bu.txt"%(cc,seg))
    log=open("%s/%s/hadoop.log"%(cc,seg))
    # Explicit checks rather than assert, so they still run under python -O
    if not wwff:
        raise FileNotFoundError("no files match %s"%wwfp)
    if not ddff:
        raise FileNotFoundError("no files match %s"%ddfp)
except Exception:
    print("Usage: track.py year-nn segmentid [file]",file=sys.stderr)
    raise

# NOTE(review): neither of these is referenced in the visible code
BU={}
U={}

class URI:
    '''One URI from the crawl, plus whatever has been learned about it.

    Each new instance registers itself in the class-wide table uu under its
    string form, so that URI.get(s) finds it again.  The status and sources
    attributes read by __repr__ and typeString are not set here; they are
    expected to be attached to instances from outside the class.
    '''
    uu={}   # registry shared by all instances: URI string -> URI object
    depth=0 # NOTE(review): not used within this class
    def __init__(self,s,seed=False):
        '''Create the URI for string s and register it, replacing any
        earlier entry for s.  seed is stored as given and shows up as
        the 's' flag in typeString.'''
        self.s=s
        self.d_props=None # from cdx/crawldiagnostics
        self.w_props=None # from cdx/warc
        self.trunc=self.fail=None
        self._host=self._path=self._scheme=None # filled in lazily by _parse
        self.seed=seed
        self.uu[s]=self

    def _parse(self):
        '''Split self.s with urlparse and cache scheme, host and path'''
        (self._scheme,self._host,self._path,
         _,_,_)=urllib.parse.urlparse(self.s)

    @property
    def host(self):
        '''The network-location part of the URI, parsed on first use'''
        if self._host is None:
            self._parse()
        return self._host

    @property
    def path(self):
        '''The path part of the URI, parsed on first use'''
        if self._path is None:
            self._parse()
        return self._path

    @property
    def scheme(self):
        '''The scheme part of the URI, parsed on first use'''
        if self._scheme is None:
            self._parse()
        return self._scheme

    @classmethod
    def get(cls,s):
        '''Return the registered URI for s, creating (and registering) one
        if there is none yet'''
        try:
            return cls.uu[s]
        except KeyError:
            return cls(s)

    def _statusChar(self):
        '''First character of status; 'u' (for 'unknown') if no status has
        been attached to this URI yet'''
        return getattr(self,'status','unknown')[:1]

    def __repr__(self):
        '''Abbreviated form: <U + type string + >, then the first status
        character unless it is '2', then [scheme://host + at most 20
        characters of path, and '...' + at most the last 10 characters of
        whatever lies beyond that]'''
        prefix="%s://%s%s"%(self.scheme,self.host,self.path[:20])
        extra=len(self.s)-len(prefix)
        # Only add a tail when something beyond the prefix exists.  A
        # zero-length tail has to be excluded explicitly, because s[-0:]
        # is the whole of s, not the empty string.
        suffix=('...'+self.s[-min(extra,10):]) if extra>0 else ''
        sc=self._statusChar()
        return "<U%s>%s[%s%s]"%(self.typeString(),
                                 '' if sc=='2' else sc,
                                 prefix,suffix)

    def typeString(self):
        '''Return a compact code for what is known about this URI, made by
        concatenating, in this order:
          s      if seed is true
          r      if w_props is set, otherwise
          d<c>   if d_props is set, <c> being the first status character
          l<ds>  if sources exists: the depths of the sources, ascending
                 and joined with '.'
          <c>    first character of trunc, if set
          F<c>   if fail is set: the character at index 10 when fail begins
                 with 'H' (the first one after "Http code="), otherwise the
                 first character of fail
        Slices are used instead of indexing so that an empty or short string
        contributes nothing rather than raising IndexError.'''
        seeded='s' if self.seed else ''
        if self.w_props is not None:
            got='r'
        elif self.d_props is not None:
            got='d'+self._statusChar()
        else:
            got=''
        if hasattr(self,'sources'):
            linked='l'+'.'.join(str(d) for d in
                                sorted(src[1] for src in self.sources))
        else:
            linked=''
        cut='' if self.trunc is None else self.trunc[:1]
        if self.fail is None:
            failed=''
        elif self.fail[:1]=='H':
            failed='F'+self.fail[10:11]
        else:
            failed='F'+self.fail[:1]
        return "%s%s%s%s%s"%(seeded,got,linked,cut,failed)

def readCDX(files,where,status=None):
    '''Read gzipped CDX index files and attach each entry to its URI.

    files   names of the gzipped, UTF-8 index files to read
    where   name of the URI attribute that receives the decoded properties
    status  if not None, stored as the status of every URI read

    Returns a dict from URI string to URI object for the entries read.
    If anything goes wrong while reading a file, the file name, the number
    of lines completed in it and the most recent line are printed to stderr
    and the exception is re-raised.
    '''
    line=None
    byURI={}
    # Ref. https://github.com/ikreymer/webarchive-indexing
    for fileName in files:
        with gzip.open(fileName,mode='rt',encoding='utf-8') as cdxFile:
            done=0
            try:
                for line in cdxFile:
                    (rdom,path,seg,props)=CDX.match(line).groups()
                    entry=json.loads(props)
                    key=entry["url"]
                    u=URI.get(key)
                    # store directly in the instance dict under the
                    # requested attribute name
                    vars(u)[where]=entry
                    u.rdomstr=rdom # domain, reverse order, comma separated
                    u.lcpath=path # path, lower-cased, maybe %decoded?
                    u.seg=seg # partial warc identifier?
                    if status is not None:
                        u.status=status
                    byURI[key]=u
                    done+=1
            except BaseException:
                # report where we were, then let the error propagate
                print(fileName,done,line,file=sys.stderr)
                raise
    return byURI

# Register every line of the seed file as a seed URI, counting as we go
seeds=0
for seedLine in buf:
    URI(seedLine.rstrip(),True)
    seeds+=1

print('post-seed',seeds,file=sys.stderr,flush=True)
# Entries from cdx/warc: properties go in w_props, status is set to "200"
fetches=readCDX(wwff,'w_props',"200")
print('post-fetch',len(URI.uu),file=sys.stderr,flush=True)
# Entries from cdx/crawldiagnostics: properties go in d_props
diags=readCDX(ddff,'d_props')
print('post-diag',len(URI.uu),file=sys.stderr,flush=True)

# Stored as a URI's reloc when its redirect target cannot be determined
BORKED="borked"

def maybeTrack(u,source=None,depth=0,_chain=None):
    '''Record how u was reached and follow its redirect, if it has one.

    u       the URI to process
    source  the URI whose redirect led to u, or None for a starting point
    depth   number of redirect hops from the starting point to u
    _chain  internal use only: the URIs already visited on this chain

    Effects on u:
      status   set, if not already present, from d_props["status"], or to
               'unknown' when there are no d_props
      sources  gets (source,depth) appended when source is given
      reloc    for a 3xx status, the URI named by d_props["redirect"], or
               BORKED when a KeyError occurs while following it
    The redirect target is then processed in turn, one level deeper, unless
    it is already on the current chain: a redirect loop (most simply a URI
    redirecting to itself) would otherwise recurse without end.
    '''
    if not hasattr(u,'status'):
        u.status='unknown' if u.d_props is None else u.d_props["status"]
    if source is not None:
        bptr=(source,depth)
        if hasattr(u,'sources'):
            u.sources.append(bptr)
        else:
            u.sources=[bptr]
    if u.status[0]=='3':
        if _chain is None:
            _chain=set()
        _chain.add(u)
        try:
            loc=u.d_props["redirect"]
            r=URI.get(loc)
            u.reloc=r
            if r not in _chain:
                maybeTrack(r,source=u,depth=depth+1,_chain=_chain)
        except KeyError:
            u.reloc=BORKED # something went wrong somewhere...

# Work out status and redirect links for every URI that has a
# crawldiagnostics entry
for u in diags.values():
    maybeTrack(u)

# Only lines starting 'WARC-' matter in the truncation file.  A
# WARC-Target-URI: line selects a URI (which must already be registered,
# else KeyError); a WARC-Truncated: line stores its value on the URI
# selected most recently.
truncs=0
for l in wtf:
    if not l.startswith('WARC-'):
        continue
    (k,rest)=l.split(' ',1)
    value=rest.rstrip()
    if k=='WARC-Target-URI:':
        uri=URI.uu[value] # better be there...
    elif k=='WARC-Truncated:':
        truncs+=1
        uri.trunc=value

# Try every failure pattern against every log line; each match is counted
# and its reason (group 'm') stored as the fail of the URI in group 'u',
# which is created if not yet registered.
fails=0
failPatterns=(FAIL1a,FAIL1b,FAIL2,FAIL3)
for l in log:
    for p in failPatterns:
        r=p.match(l)
        if r is None:
            continue
        fails+=1
        URI.get(r.group('u')).fail=r.group('m')

print('post-fail',len(URI.uu),file=sys.stderr,flush=True)

# Summary on stderr: one count per category, each followed by the letter(s)
# that stand for it in the type strings
redirected=sum(hasattr(known,'sources') for known in URI.uu.values())
summary="""For %s/%s:
 %4s requested s
 %4s retrieved r
 %4s diagnosed d
 %4s redirection-location l
 %4s failed F{j for java Exception, 1-5 for Http code=, C for robot crawl delay
              D for robot denied}
 %4s truncated r{d for disconnect, t for timeout}
"""
print(summary%(cc,seg,seeds,len(fetches),len(diags),redirected,fails,truncs),
      file=sys.stderr,flush=True)

# Unless stdout is a terminal, write the type string of every registered
# URI, one per line
if not sys.stdout.isatty():
    for known in URI.uu.values():
        print(known.typeString())