changeset 65:e1f61f94b196

switch to curl->file, enable retries
author Henry S. Thompson <ht@markup.co.uk>
date Thu, 04 Jun 2020 12:08:29 +0000
parents b91e44355bbf
children b04870ab3035
files master/src/wecu/run_sac.sh
diffstat 1 files changed, 39 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/master/src/wecu/run_sac.sh	Wed Jun 03 22:08:01 2020 +0000
+++ b/master/src/wecu/run_sac.sh	Thu Jun 04 12:08:29 2020 +0000
@@ -30,9 +30,30 @@
  shift
 fi
 
+lrand () {
+# cheap bad little random number generator: prints an integer from 1 to $1 inclusive, derived from one byte of `openssl rand` output (so only roughly uniform)
+echo $(( 1 + ($(openssl rand 1 | od -d | head -1 | tr -s ' ' | cut -f 2 -d ' ') % $1)))
+}
+
+tryread () {
+  # usage: tryread URL FILE -- download URL into FILE with curl, making at most 5 attempts; returns 0 on success, 1 after giving up; progress goes to stderr
+  # pipefail is deliberately left as the caller set it: switching it off on return would also switch it off for the caller's subsequent pipelines
+  # NOTE(review): --insecure skips TLS certificate verification -- confirm that it is still required
+  local m=0 u=$1 f=$2  # local, so that a caller's own $m, $u or $f is never overwritten
+  until if [ $((m+=1)) -gt 5 ]; then echo " tried 5 times w/o success, giving up" 1>&2; return 1; fi && echo $(date) "Reading $u ..." 1>&2 && \
+   curl -s -S --max-time 60 --insecure -o "$f" "$u" &&
+        echo " done at " $(date) 1>&2
+   do
+    # a failed attempt lands here: report the exit status of the command that failed (normally curl)
+    echo \# ${PIPESTATUS[@]} 1>&2
+    # try to avoid lockstep retries: wait a random 1-10 seconds before the next attempt
+    sleep $(lrand 10)
+    echo \# $(date) retry number $m 1>&2
+  done
+}
+
 # Get quoting right...
 worker () {
-  set -e
   set -o pipefail
   mkdir -p logs
   mkdir -p res
@@ -51,19 +72,26 @@
   ff=$(echo $f | cut -f 4,6 -d / | sed 's/CC-MAIN-//;s/\.warc.*$//')
   echo $(date +%Y-%m-%d.%H:%M:%S) $me start $j $ff >>logs/${j}_log
   export PYTHONIOENCODING=utf-8
-  { IFS=$'\n' ; stderr=( $( { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
-   unpigz -dp 1 -c | $filter ./$mapper "$keyHandler" "$@" ; } 2>&1 1>res/${j}.tsv ; ) ) ; unset IFS ; }
-  { echo $(date +%Y-%m-%d.%H:%M:%S) $me finished $j $ff
-    printf '%s\n' "${stderr[@]}" ; } | sed '2,$s/^/ /' >>logs/${j}_log # hack to try to
-      # guarantee atomic entry in the log
+  { IFS=$'\n' ; stderr=( $( { set -e
+		  #curl -s -N -o ${j}.gz https://commoncrawl.s3.amazonaws.com/$f
+		  tryread "https://commoncrawl.s3.amazonaws.com/$f" "${j}.gz"
+		  unpigz -dp 1 -c ${j}.gz |\
+		      $filter ./$mapper "$keyHandler" "$@" 
+	      } 2>&1 1>res/${j}.tsv ) ) ; subres="$?" ; unset IFS ; }
+  rm "${j}.gz"
+  { echo $(date +%Y-%m-%d.%H:%M:%S) $me finished $j $ff $subres
+    printf '%s\n' "${stderr[@]}" ; } | sed '2,$s/^/ /' >>logs/${j}_log # hack
+         # to try to guarantee atomic entry in the log
+         # Probably not necessary with current sub-structure...
 }
 
-export -f worker
+export -f worker tryread lrand
 
 echo worker '{}' '{#}' "$mapper" "$filter" "$keyHandler" "$@" 1>&2
 
-date 1>&2
+echo starting... $(date) 1>&2
 parallel \
+    --joblog parlog.txt \
     --sshloginfile $hosts \
     --retries 3 \
     --transferfile $(which $mapper|sed 's/\(^.*\/\)/\1.\//') \
@@ -71,9 +99,10 @@
     --jobs $cores \
     --workdir $wd \
     -a input_paths \
-    --env worker \
+    --env worker --env tryread --env lrand \
     --return 'logs/{#}_log' --return 'res/{#}.tsv' --cleanup \
     worker '{}' '{#}' "$mapper" "$filter" "$keyHandler" "$@"
 res=$?
-echo $(date) $res
+echo reducing... $(date) pres=$res 1>&2
 cat res/*.tsv | sac_reducer.py $1 $numKeys
+echo done $(date) 1>&2