view lib/python/cdx_segment.py @ 172:72b0420167dc

generalised sbatch front-end to cdx2tsv.py
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Thu, 28 Jul 2022 17:24:29 +0100
parents 464d2dfb99c9
children
line wrap: on
line source

#!/usr/bin/python3
'''Split out an alphabetical cdx file by segment
Usage: cdx_segment.py archive segment-prefix idx_in
 archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35 has sub-directories for
  cdx/warc
  [all segments, all and only those paths matching segment-prefix*.{0..99}]
 idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc
'''
from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system
from datetime import datetime
from random import sample
from lock import AtomicOpen
import sys,re,gzip

# Command line: archive segment-prefix idx_in (see the module docstring)
if len(sys.argv)<4:
  # Fail with a usage line rather than an IndexError traceback
  sys.exit("Usage: cdx_segment.py archive segment-prefix idx_in")

archive="CC-MAIN-%s"%sys.argv[1]
# Archive root directory
adir="/beegfs/common_crawl/%s"%archive
# Leading part of the "filename" value looked for in each index line
apref="crawl-data/%s"%archive
pref=sys.argv[2]

afn=sys.argv[3]
# Input name up to its first '.'
ifn=afn.split('.')[0]

# Segment directory names: pref, optional further digits, '.', 1 or 2 digits
SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref)
# Group 1 is the segment name, group 2 the record-type directory
IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref)

# Entries of the archive directory whose names match the segment pattern
segdirs=list(filter(SPAT.match,listdir(adir)))
# Record types; each gets its own cdx output per segment
rr=("warc","robotstxt","crawldiagnostics")
# n: number of lines written so far, per record type
n=dict.fromkeys(rr,0)
# ss: record type -> {segment name -> open output file}
ss={r:{} for r in rr}
for r,rd in ss.items():
  for s in segdirs:
    rdir="%s/%s/orig/cdx/%s"%(ifn,s,r)
    # makedirs raises if rdir is already there, e.g. left by an earlier run
    makedirs(rdir,0o755)
    rd[s]=open("%s/cdx"%rdir,'w+')

# Directory holding the gzipped index fragments
idir="%s/cdx/warc"%adir

# Number of input lines in which IPAT found no "filename" field
e=0

st=datetime.now()
print(st,"starting",afn,file=sys.stderr)

# Route every index line to the output file for its segment and record type
with gzip.open("%s/%s"%(idir,afn),'rt') as f:
  for l in f:
    m=IPAT.search(l)
    if m:
      r=m[2]
      # A KeyError here means a record type or segment with no output open
      ss[r][m[1]].write(l)
      n[r]+=1
    else:
      # stderr.write takes exactly one string (passing three arguments, as
      #  this line used to, is a TypeError); l normally still carries its
      #  own line ending, so none is added
      sys.stderr.write("bogus: %s %s"%(afn,l))
      e+=1

if True:
  # Live path: just close every per-segment output file
  # See note below, will have to copy entire result to /beegfs at shell level
  # (loop names chosen so as not to rebind the record-type tuple rr)
  for rd in ss.values():
    for of in rd.values():
      of.close()
else:
  # Disabled path: append each local result to its counterpart under adir
  # The following fails, in that there are occasional small gaps in the result
  #  I've given up trying to figure out why...
  # NOTE(review): the byte count returned by os.write used to be ignored here,
  #  so a short write would silently drop data -- possibly related, unconfirmed
  # Randomise to try to avoid contention
  mt=datetime.now()
  print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())),
                                                               e,(mt-st).seconds),file=sys.stderr)

  # Visit every segment once, in random order, however many there are
  for s in sample(segdirs,len(segdirs)):
    for r in rr:
      of=ss[r][s]
      # Push everything buffered out to disk before reading it back by fd
      of.flush()
      o=of.fileno()
      fsync(o)
      opos=lseek(o,0,SEEK_SET)
      # NOTE(review): AtomicOpen comes from the local lock module; presumably
      #  it holds a lock on the destination for the duration -- confirm
      with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df:
        d=df.fileno()
        dpos=lseek(d,0,SEEK_END)
        print(of.name,opos,df.name,dpos,file=sys.stderr)
        while True:
          data = read(o,131072)
          if data == b'':  # end of file reached
            break
          while data:
            # write may accept fewer bytes than offered: resend the remainder
            data=data[write(d,data):]
      of.close()

  res=0 #system("rm -r %s"%ifn)

# Closing summary on stderr: per-record-type line counts joined with ':'
#  (in the order the counters in n were created), the bogus-line count and
#  the seconds component of the elapsed time
et=datetime.now()
counts=':'.join(str(c) for c in n.values())
print(et,"finished",ifn,
      "%s ok, %d bogus, %d seconds total"%(counts,e,(et-st).seconds),
      file=sys.stderr)