changeset 86:b5fef78cbb26

working for -t 2 -c 2
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Mon, 15 Mar 2021 14:26:42 +0000
parents e5d5958bf3fe
children b6a5999d8e06
files bin/cdx_segment.sh lib/python/cdx_segment.py
diffstat 2 files changed, 76 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/cdx_segment.sh	Mon Mar 15 14:26:42 2021 +0000
@@ -0,0 +1,16 @@
+#!/bin/bash
+# Invoke this as e.g. sbatch -n 30 -c 10 masterJob.sh cdx_segment
+# Runs cdx_segment.py under GNU parallel, $c jobs at a time, once per line of cdx_segment/$proc.txt
+n=$SLURM_NTASKS
+c=${SLURM_CPUS_PER_TASK:-1}  # only set by SLURM when -c is given; an empty -j would swallow the next argument
+node=$SLURMD_NODENAME
+local=$SLURM_LOCALID
+proc=$SLURM_PROCID
+# $start was never set, so the original logged an empty field; log the literal marker instead
+echo $(date) $node:$proc start
+module load gnu-parallel
+
+parallel --will-cite -j "$c" lib/python/cdx_segment.py 2019-35 15 '{}' < "cdx_segment/$proc.txt"
+
+echo $(date) $proc end
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/python/cdx_segment.py	Mon Mar 15 14:26:42 2021 +0000
@@ -0,0 +1,60 @@
+#!/usr/bin/python3
+'''Split out an alphabetical cdx file by segment
+Usage: cdx_segment.py archive segment-prefix idx_in
+ archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35 has sub-directories for
+  cdx/warc
+  [all segments, all and only those paths matching segment-prefix*.{0..99}]
+ idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc
+'''
+import gzip
+from os import listdir, makedirs
+from datetime import datetime
+import sys,re
+
+archive="CC-MAIN-%s"%sys.argv[1]
+adir="/beegfs/common_crawl/%s"%archive
+apref="crawl-data/%s"%archive
+pref=sys.argv[2]
+
+afn=sys.argv[3]
+
+SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref)
+IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref)
+
+segdirs=[d for d in listdir(adir) if SPAT.match(d)]
+ss={}
+n={}
+for r in ("warc","robotstxt","crawldiagnostics"):
+  ss[r]=rd=dict()
+  n[r]=0
+  for s in segdirs:
+    rdir="%s/%s/orig/cdx/%s"%(adir,s,r)
+    makedirs(rdir,0o755,exist_ok=True)
+    rd[s]=open("%s/cdx"%rdir,'at')
+
+idir="%s/cdx/warc"%adir
+
+e=0
+
+st=datetime.now()
+print(st,"starting",afn,file=sys.stderr)
+
+with gzip.open("%s/%s"%(idir,afn),'rt') as f:
+  for l in f:
+    m=IPAT.search(l)
+    if m:
+      r=m[2]
+      ss[r][m[1]].write(l)
+      n[r]+=1
+    else:
+      sys.stderr.write("bogus: ",afn,l)
+      e+=1
+
+for gg in ss.values():
+  for g in gg.values():
+    g.close()
+
+et=datetime.now()
+print(et,"finished",afn,"%s ok, %d bogus, %d seconds elapsed"%(':'.join(map(str,n.values())),
+                                                               e,(et-st).seconds),file=sys.stderr)
+