changeset 187:9805323d9969

add lastmod to cdx lines, start writing test case
author Henry S. Thompson <>
date Mon, 23 Sep 2024 16:35:22 +0100 (5 months ago)
parents fc1b16130961
children 0c5422df3a67
files src/nutch-cc/src/java/org/commoncrawl/util/ src/nutch-cc/src/java/org/commoncrawl/util/ src/nutch-cc/src/test/org/commoncrawl/util/
diffstat 3 files changed, 1357 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/nutch-cc/src/java/org/commoncrawl/util/	Mon Sep 23 16:35:22 2024 +0100
@@ -0,0 +1,218 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.commoncrawl.util;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+import org.archive.url.WaybackURLKeyMaker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.json.JsonWriteFeature;
+import com.fasterxml.jackson.core.TSFBuilder;
+import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+public class WarcCdxWriter extends WarcWriter {
+  public static Logger LOG = LoggerFactory.getLogger(WarcCdxWriter.class);
+  private static final Charset UTF_8 = StandardCharsets.UTF_8;
+  protected CountingOutputStream countingOut;
+  protected OutputStream cdxOut;
+  protected String warcFilename;
+  private SimpleDateFormat timestampFormat;
+  private ObjectWriter jsonWriter;
+  private WaybackURLKeyMaker surtKeyMaker = new WaybackURLKeyMaker(true);
+  /**
+   * JSON indentation same as by Python WayBack
+   * (
+   */
+  @SuppressWarnings("serial")
+  public static class JsonIndenter extends MinimalPrettyPrinter {
+    // @Override
+    public void writeObjectFieldValueSeparator(JsonGenerator jg)
+        throws IOException, JsonGenerationException {
+      jg.writeRaw(": ");
+    }
+    // @Override
+    public void writeObjectEntrySeparator(JsonGenerator jg)
+        throws IOException, JsonGenerationException {
+      jg.writeRaw(", ");
+    }
+  }
+  public WarcCdxWriter(OutputStream warcOut, OutputStream cdxOut,
+      Path warcFilePath) {
+    super(new CountingOutputStream(warcOut));
+    countingOut = (CountingOutputStream) this.out;
+    this.cdxOut = cdxOut;
+    timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+    timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    warcFilename = warcFilePath.toUri().getPath().replaceFirst("^/", "");
+    ObjectMapper jsonMapper = new ObjectMapper();
+    jsonMapper.getFactory().configure(JsonWriteFeature.ESCAPE_NON_ASCII.mappedFeature(),
+				      true);
+    jsonWriter = jsonMapper.writer(new JsonIndenter());
+  }
+  public URI writeWarcRevisitRecord(final URI targetUri, final String ip,
+      final int httpStatusCode, final Date date, final URI warcinfoId,
+      final URI relatedId, final String warcProfile, final Date refersToDate,
+      final String payloadDigest, final String blockDigest, byte[] block,
+      Content content) throws IOException {
+    long offset = countingOut.getByteCount();
+    URI recordId = super.writeWarcRevisitRecord(targetUri, ip, httpStatusCode,
+        date, warcinfoId, relatedId, warcProfile, refersToDate, payloadDigest,
+        blockDigest, block, content);
+    long length = (countingOut.getByteCount() - offset);
+    writeCdxLine(targetUri, date, offset, length, payloadDigest, content, true,
+		 null, null, null);
+    return recordId;
+  }
+  public URI writeWarcResponseRecord(final URI targetUri, final String ip,
+      final int httpStatusCode, final Date date, final URI warcinfoId,
+      final URI relatedId, final String payloadDigest, final String blockDigest,
+      final String truncated, final String lastMod,
+      final byte[] block, Content content)
+      throws IOException {
+    long offset = countingOut.getByteCount();
+    URI recordId = super.writeWarcResponseRecord(targetUri, ip, httpStatusCode,
+        date, warcinfoId, relatedId, payloadDigest, blockDigest, truncated,
+	lastMod, // will be ignored
+        block, content);
+    long length = (countingOut.getByteCount() - offset);
+    String redirectLocation = null;
+    if (isRedirect(httpStatusCode)) {
+      redirectLocation = content.getMetadata().get("Location");
+      if (redirectLocation != null) {
+        try {
+          redirectLocation = new URL(targetUri.toURL(), redirectLocation)
+              .toURI().toString();
+        } catch (URISyntaxException | MalformedURLException e) {
+          redirectLocation = null;
+        }
+      }
+    }
+    writeCdxLine(targetUri, date, offset, length, payloadDigest, content, false,
+		 redirectLocation, truncated, lastMod);
+    return recordId;
+  }
+  public void writeCdxLine(final URI targetUri, final Date date, long offset,
+      long length, String payloadDigest, Content content, boolean revisit,
+      String redirectLocation, String truncated, String lastMod) throws IOException {
+    String url = targetUri.toString();
+    String surt = url;
+    Metadata meta = content.getMetadata();
+    try {
+      surt = surtKeyMaker.makeKey(url);
+    } catch (URISyntaxException e) {
+      LOG.error("Failed to make SURT for {}: {}", url,
+          StringUtils.stringifyException(e));
+      return;
+    }
+    if (payloadDigest == null) {
+      // no content, e.g., revisit record
+    } else if (payloadDigest.startsWith("sha1:")) {
+      payloadDigest = payloadDigest.substring(5);
+    }
+    cdxOut.write(surt.getBytes(UTF_8));
+    cdxOut.write(' ');
+    cdxOut.write(timestampFormat.format(date).getBytes(UTF_8));
+    cdxOut.write(' ');
+    Map<String, String> data = new LinkedHashMap<String, String>();
+    data.put("url", url);
+    if (revisit) {
+      data.put("mime", "warc/revisit");
+    } else {
+      data.put("mime", cleanMimeType(meta.get("Content-Type")));
+      data.put("mime-detected", content.getContentType());
+    }
+    data.put("status", meta.get("HTTP-Status-Code"));
+    if (payloadDigest != null) {
+      data.put("digest", payloadDigest);
+    }
+    data.put("length", String.format("%d", length));
+    data.put("offset", String.format("%d", offset));
+    data.put("filename", warcFilename);
+    String val = meta.get("Detected-Charset");
+    if (val != null) {
+      data.put("charset", val);
+    }
+    val = meta.get("Detected-Language");
+    if (val != null) {
+      data.put("languages", val);
+    }
+    if (truncated != null) {
+      data.put("truncated", truncated);
+    }
+    if (lastMod != null) {
+      data.put("lastmod", lastMod);
+    }
+    if (redirectLocation != null) {
+      data.put("redirect", redirectLocation);
+    }
+    cdxOut.write(jsonWriter.writeValueAsBytes(data));
+    cdxOut.write('\n');
+  }
+  protected static String cleanMimeType(String mime) {
+    if (mime == null)
+      return "unk";
+    final char[] delimiters = { ';', ' ' };
+    for (char delim : delimiters) {
+      int pos = mime.indexOf(delim);
+      if (pos > -1)
+        mime = mime.substring(0, pos);
+    }
+    if (mime.isEmpty())
+      return "unk";
+    return mime;
+  }
+  protected static boolean isRedirect(int httpStatusCode) {
+    return httpStatusCode >= 300 && httpStatusCode < 400
+        && httpStatusCode != 304;
+  }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/nutch-cc/src/java/org/commoncrawl/util/	Mon Sep 23 16:35:22 2024 +0100
@@ -0,0 +1,773 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.commoncrawl.util;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Pattern;
+import org.apache.commons.codec.binary.Base32;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+class WarcRecordWriter extends RecordWriter<Text, WarcCapture> {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(MethodHandles.lookup().lookupClass());
+  /**
+   * Holds duration of fetch in metadata, see
+   * {@link org.apache.nutch.protocol.http.api.HttpBase#RESPONSE_TIME}
+   */
+  protected static final Text FETCH_DURATION = new Text("_rs_");
+  public static final String CRLF = "\r\n";
+  public static final String COLONSP = ": ";
+  protected static final Pattern PROBLEMATIC_HEADERS = Pattern
+      .compile("(?i)(?:Content-(?:Encoding|Length)|Transfer-Encoding)");
+  protected static final String X_HIDE_HEADER = "X-Crawler-";
+  public static final String WARC_WRITER_COUNTER_GROUP = "WARC-Writer";
+  private TaskAttemptContext context;
+  private DataOutputStream warcOut;
+  private WarcWriter warcWriter;
+  private DataOutputStream crawlDiagnosticsWarcOut;
+  private WarcWriter crawlDiagnosticsWarcWriter;
+  private DataOutputStream robotsTxtWarcOut;
+  private WarcWriter robotsTxtWarcWriter;
+  private DataOutputStream cdxOut;
+  private DataOutputStream crawlDiagnosticsCdxOut;
+  private DataOutputStream robotsTxtCdxOut;
+  private URI warcinfoId;
+  private URI crawlDiagnosticsWarcinfoId;
+  private URI robotsTxtWarcinfoId;
+  private MessageDigest sha1 = null;
+  private Base32 base32 = new Base32();
+  private LanguageDetector langDetect;
+  private boolean generateCrawlDiagnostics;
+  private boolean generateRobotsTxt;
+  private boolean generateCdx;
+  private boolean deduplicate;
+  private boolean detectLanguage;
+  private String lastURL = ""; // for deduplication
+  public WarcRecordWriter(Configuration conf, Path outputPath, int partition,
+      TaskAttemptContext context) throws IOException {
+    this.context = context;
+    FileSystem fs = outputPath.getFileSystem(conf);
+    SimpleDateFormat fileDate = new SimpleDateFormat("yyyyMMddHHmmss",
+        Locale.US);
+    fileDate.setTimeZone(TimeZone.getTimeZone("GMT"));
+    String prefix = conf.get("warc.export.prefix", "NUTCH-CRAWL");
+    /*
+     * WARC-Date : "The timestamp shall represent the instant that data capture
+     * for record creation began."
+     * (
+     * warc-1.1/#warc-date-mandatory)
+     */
+    String date = conf.get("", fileDate.format(new Date()));
+    String endDate = conf.get("", date);
+    Date captureStartDate = new Date();
+    try {
+      captureStartDate = fileDate.parse(date);
+    } catch (ParseException e) {
+      LOG.error("Failed to parse {}: {}", date,
+          e.getMessage());
+    }
+    String hostname = conf.get("warc.export.hostname", getHostname());
+    String filename = getFileName(prefix, date, endDate, hostname, partition);
+    String publisher = conf.get("warc.export.publisher", null);
+    String operator = conf.get("warc.export.operator", null);
+    String software = conf.get("", "Apache Nutch");
+    String isPartOf = conf.get("warc.export.isPartOf", null);
+    String description = conf.get("warc.export.description", null);
+    generateCrawlDiagnostics = conf.getBoolean("warc.export.crawldiagnostics",
+        false);
+    generateRobotsTxt = conf.getBoolean("warc.export.robotstxt", false);
+    generateCdx = conf.getBoolean("warc.export.cdx", false);
+    deduplicate = conf.getBoolean("warc.deduplicate", false);
+    detectLanguage = conf.getBoolean("warc.detect.language", false);
+    Path warcPath = new Path(new Path(outputPath, "warc"), filename);
+    warcOut = fs.create(warcPath);
+    Path cdxPath = null;
+    if (generateCdx) {
+      cdxPath = new Path(
+          conf.get("warc.export.cdx.path", outputPath.toString()));
+      cdxOut = openCdxOutputStream(new Path(cdxPath, "warc"), filename, conf);
+    }
+    warcWriter = openWarcWriter(warcPath, warcOut, cdxOut);
+    warcinfoId = warcWriter.writeWarcinfoRecord(filename, hostname, publisher,
+        operator, software, isPartOf, description, captureStartDate);
+    if (generateCrawlDiagnostics) {
+      Path crawlDiagnosticsWarcPath = new Path(
+          new Path(outputPath, "crawldiagnostics"), filename);
+      crawlDiagnosticsWarcOut = fs.create(crawlDiagnosticsWarcPath);
+      if (generateCdx) {
+        crawlDiagnosticsCdxOut = openCdxOutputStream(
+            new Path(cdxPath, "crawldiagnostics"), filename, conf);
+      }
+      crawlDiagnosticsWarcWriter = openWarcWriter(crawlDiagnosticsWarcPath, crawlDiagnosticsWarcOut,crawlDiagnosticsCdxOut);
+      crawlDiagnosticsWarcinfoId = crawlDiagnosticsWarcWriter
+          .writeWarcinfoRecord(filename, hostname, publisher, operator,
+              software, isPartOf, description, captureStartDate);
+    }
+    if (generateRobotsTxt) {
+      Path robotsTxtWarcPath = new Path(new Path(outputPath, "robotstxt"),
+          filename);
+      robotsTxtWarcOut = fs.create(robotsTxtWarcPath);
+      if (generateCdx) {
+        robotsTxtCdxOut = openCdxOutputStream(new Path(cdxPath, "robotstxt"),
+            filename, conf);
+      }
+      robotsTxtWarcWriter = openWarcWriter(robotsTxtWarcPath, robotsTxtWarcOut,
+          robotsTxtCdxOut);
+      robotsTxtWarcinfoId = robotsTxtWarcWriter.writeWarcinfoRecord(filename,
+          hostname, publisher, operator, software, isPartOf, description,
+          captureStartDate);
+    }
+    try {
+      sha1 = MessageDigest.getInstance("SHA1");
+    } catch (NoSuchAlgorithmException e) {
+      LOG.error("Unable to instantiate SHA1 MessageDigest object");
+      throw new RuntimeException(e);
+    }
+    if (detectLanguage) {
+      boolean bestEffort = conf
+          .getBoolean("warc.detect.language.cld2.besteffort", false);
+      langDetect = new LanguageDetector();
+      langDetect.setBestEffort(bestEffort);
+    }
+  }
+  /**
+   * Compose a unique WARC file name.
+   * 
+   * The WARC specification recommends:
+   * <code>Prefix-Timestamp-Serial-Crawlhost.warc.gz</code> (<a href=
+   * "
+   * warc-1.1/#annex-c-informative-warc-file-size-and-name-recommendations">WARC
+   * 1.1, Annex C</a>)
+   * 
+   * @param prefix
+   *          WARC file name prefix
+   * @param startDate
+   *          capture start date
+   * @param endDate
+   *          capture end date
+   * @param hostname
+   *          name of the crawling host
+   * @param partition
+   *          MapReduce partition
+   * @return (unique) WARC file name
+   */
+  protected String getFileName(String prefix, String startDate, String endDate,
+      String host, int partition) {
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+    return prefix + "-" + startDate + "-" + endDate + "-"
+        + numberFormat.format(partition) + ".warc.gz";
+  }
+  protected String getSha1DigestWithAlg(byte[] bytes) {
+    sha1.reset();
+    return "sha1:" + base32.encodeAsString(sha1.digest(bytes));
+  }
+  protected static String getStatusLine(String httpHeader) {
+    int eol = httpHeader.indexOf('\n');
+    if (eol == -1) {
+      return httpHeader;
+    }
+    if (eol > 0 && httpHeader.charAt(eol - 1) == '\r') {
+      eol--;
+    }
+    return httpHeader.substring(0, eol);
+  }
+  protected static int getStatusCode(String statusLine) {
+    int start = statusLine.indexOf(" ");
+    int end = statusLine.indexOf(" ", start + 1);
+    if (end == -1)
+      end = statusLine.length();
+    int code = 200;
+    try {
+      code = Integer.parseInt(statusLine.substring(start + 1, end));
+    } catch (NumberFormatException e) {
+    }
+    return code;
+  }
+  /** Format status line and pair-wise list of headers as string */
+  public static String formatHttpHeaders(String statusLine, List<String> headers) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(statusLine).append(CRLF);
+    Iterator<String> it = headers.iterator();
+    while (it.hasNext()) {
+      String name =;
+      if (!it.hasNext()) {
+        // no value for name
+        break;
+      }
+      String value =;
+      sb.append(name).append(COLONSP).append(value).append(CRLF);
+    }
+    sb.append(CRLF);
+    return sb.toString();
+  }
+  /**
+   * Modify verbatim HTTP response headers: fix, remove or replace headers
+   * <code>Content-Length</code>, <code>Content-Encoding</code> and
+   * <code>Transfer-Encoding</code> which may confuse WARC readers. Ensure that
+   * returned header end with a single empty line (<code>\r\n\r\n</code>).
+   * 
+   * @param headers
+   *          HTTP 1.1 or 1.0 response header string, CR-LF-separated lines,
+   *          first line is status line
+   * @return safe HTTP response header
+   */
+  public static String fixHttpHeaders(String headers, int contentLength) {
+    int start = 0, lineEnd = 0, last = 0, trailingCrLf= 0;
+    boolean hasContentLength = false;
+    StringBuilder replace = new StringBuilder();
+    while (start < headers.length()) {
+      lineEnd = headers.indexOf(CRLF, start);
+      trailingCrLf = 1;
+      if (lineEnd == -1) {
+        lineEnd = headers.length();
+        trailingCrLf = 0;
+      }
+      int colonPos = -1;
+      for (int i = start; i < lineEnd; i++) {
+        if (headers.charAt(i) == ':') {
+          colonPos = i;
+          break;
+        }
+      }
+      if (colonPos == -1) {
+        boolean valid = true;
+        if (start == 0) {
+          // status line (without colon)
+          // TODO: http/2
+        } else if ((lineEnd + 4) == headers.length()
+            && headers.endsWith(CRLF + CRLF)) {
+          // ok, trailing empty line
+          trailingCrLf = 2;
+        } else if (start == lineEnd) {
+          // skip/remove empty line
+          LOG.debug("Skipping empty header line");
+          valid = false;
+        } else {
+          LOG.warn("Invalid header line: {}",
+              headers.substring(start, lineEnd));
+          valid = false;
+        }
+        if (!valid) {
+          if (last < start) {
+            replace.append(headers.substring(last, start));
+          }
+          last = lineEnd + 2 * trailingCrLf;
+        }
+        start = lineEnd + 2 * trailingCrLf;
+        /*
+         * skip over invalid header line, no further check for problematic
+         * headers required
+         */
+        continue;
+      }
+      String name = headers.substring(start, colonPos);
+      if (PROBLEMATIC_HEADERS.matcher(name).matches()) {
+        boolean needsFix = true;
+        if (name.equalsIgnoreCase("content-length")) {
+          hasContentLength = true;
+          String value = headers.substring(colonPos + 1, lineEnd).trim();
+          try {
+            int l = Integer.parseInt(value);
+            if (l == contentLength) {
+              needsFix = false;
+            }
+          } catch (NumberFormatException e) {
+            // needs to be fixed
+          }
+        }
+        if (needsFix) {
+          if (last < start) {
+            replace.append(headers.substring(last, start));
+          }
+          last = lineEnd + 2 * trailingCrLf;
+          replace.append(X_HIDE_HEADER)
+              .append(headers.substring(start, lineEnd + 2 * trailingCrLf));
+          if (trailingCrLf == 0) {
+            replace.append(CRLF);
+            trailingCrLf = 1;
+          }
+          if (name.equalsIgnoreCase("content-length")) {
+            // add effective uncompressed and unchunked length of content
+            replace.append("Content-Length").append(COLONSP)
+                .append(contentLength).append(CRLF);
+          }
+        }
+      }
+      start = lineEnd + 2 * trailingCrLf;
+    }
+    if (last > 0 || trailingCrLf != 2 || !hasContentLength) {
+      if (last < headers.length()) {
+        // append trailing headers
+        replace.append(headers.substring(last));
+      }
+      if (!hasContentLength) {
+        replace.append("Content-Length").append(COLONSP).append(contentLength)
+            .append(CRLF);
+      }
+      while (trailingCrLf < 2) {
+        replace.append(CRLF);
+        trailingCrLf++;
+      }
+      return replace.toString();
+    }
+    return headers;
+  }
+  protected static String getHostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.warn("Failed to get hostname: {}", e.getMessage());
+    }
+    return "localhost";
+  }
+  private WarcWriter openWarcWriter(Path warcPath, DataOutputStream warcOut,
+      DataOutputStream cdxOut) {
+    if (cdxOut != null) {
+      return new WarcCdxWriter(warcOut, cdxOut, warcPath);
+    }
+    return new WarcWriter(warcOut);
+  }
+  protected static DataOutputStream openCdxOutputStream(Path cdxPath,
+      String warcFilename, Configuration conf) throws IOException {
+    String cdxFilename = warcFilename.replaceFirst("\\.warc\\.gz$", ".cdx.gz");
+    Path cdxFile = new Path(cdxPath, cdxFilename);
+    FileSystem fs = cdxPath.getFileSystem(conf);
+    return new DataOutputStream(new GZIPOutputStream(fs.create(cdxFile)));
+  }
+  @Override
+  public synchronized void write(Text key, WarcCapture value)
+      throws IOException {
+    URI targetUri;
+    String url = value.url.toString();
+    try {
+      targetUri = new URI(url);
+    } catch (URISyntaxException e) {
+      LOG.error("Cannot write WARC record, invalid URI: {}", url);
+      return;
+    }
+    if (value.content == null) {
+      String reason = "";
+      if (value.datum != null) {
+        ProtocolStatus pstatus = (ProtocolStatus) value.datum.getMetaData()
+            .get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+        if (pstatus != null) {
+          reason = ": " + pstatus.getName() + " - " + pstatus.getMessage();
+        }
+      }
+      LOG.warn("Cannot write WARC record, no content for {}{}", value.url,
+          reason);
+      return;
+    }
+    if (deduplicate) {
+      if (lastURL.equals(url)) {
+"Skipping duplicate record: {}", value.url);
+        return;
+      }
+      lastURL = url;
+    }
+    String ip = "";
+    Date date = null;
+    boolean notModified = false;
+    Date lastModifiedDate = null;
+    String verbatimResponseHeaders = null;
+    String verbatimRequestHeaders = null;
+    List<String> headers = new ArrayList<>();
+    String responseHeaders = null;
+    String statusLine = "";
+    int httpStatusCode = 200;
+    String fetchDuration = null;
+    String truncatedReason = null;
+    String lastModifiedHeader = null;
+    if (value.datum != null) {
+      date = new Date(value.datum.getFetchTime());
+      // This is for older crawl dbs that don't include the verbatim status
+      // line in the metadata
+      ProtocolStatus pstatus = (ProtocolStatus) value.datum.getMetaData()
+          .get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+      if (pstatus == null) {
+        LOG.warn("Cannot write WARC record, no protocol status for {}",
+            value.url);
+        return;
+      }
+      switch (pstatus.getCode()) {
+      case ProtocolStatus.SUCCESS:
+        statusLine = "HTTP/1.1 200 OK";
+        httpStatusCode = 200;
+        break;
+      case ProtocolStatus.TEMP_MOVED:
+        statusLine = "HTTP/1.1 302 Found";
+        httpStatusCode = 302;
+        break;
+      case ProtocolStatus.MOVED:
+        statusLine = "HTTP/1.1 301 Moved Permanently";
+        httpStatusCode = 301;
+        break;
+      case ProtocolStatus.NOTMODIFIED:
+        statusLine = "HTTP/1.1 304 Not Modified";
+        httpStatusCode = 304;
+        notModified = true;
+        long modifiedTime = value.datum.getModifiedTime();
+        if (modifiedTime > 0) {
+          lastModifiedDate = new Date(modifiedTime);
+        }
+        break;
+      default:
+        if (value.content.getMetadata()
+            .get(Response.RESPONSE_HEADERS) == null) {
+          LOG.warn("Unknown or ambiguous protocol status: {}", pstatus);
+          return;
+        }
+      }
+      String httpStatusCodeVal = value.datum.getMetaData()
+          .get(Nutch.PROTOCOL_STATUS_CODE_KEY).toString();
+      if (httpStatusCodeVal != null) {
+        try {
+          httpStatusCode = Integer.parseInt(httpStatusCodeVal);
+        } catch (NumberFormatException e) {
+        }
+      }
+      if (value.datum.getMetaData().get(FETCH_DURATION) != null) {
+        fetchDuration = value.datum.getMetaData().get(FETCH_DURATION)
+            .toString();
+      }
+    } else {
+      // robots.txt, no CrawlDatum available
+      String fetchTime = value.content.getMetadata().get(Nutch.FETCH_TIME_KEY);
+      if (fetchTime != null) {
+        try {
+          date = new Date(Long.parseLong(fetchTime));
+        } catch (NumberFormatException e) {
+          LOG.error("Invalid fetch time '{}' in content metadata of {}",
+              fetchTime, value.url.toString());
+        }
+      }
+      if (date == null) {
+        String httpDate = value.content.getMetadata().get("Date");
+        if (httpDate != null) {
+          try {
+            date = HttpDateFormat.toDate(httpDate);
+          } catch (ParseException e) {
+            LOG.warn("Failed to parse HTTP Date {} for {}", httpDate,
+                targetUri);
+            date = new Date();
+          }
+        } else {
+          LOG.warn("No HTTP Date for {}", targetUri);
+          date = new Date();
+        }
+      }
+      // status is taken from header
+    }
+    boolean useVerbatimResponseHeaders = false;
+    for (String name : value.content.getMetadata().names()) {
+      String val = value.content.getMetadata().get(name);
+      switch (name) {
+      case Response.IP_ADDRESS:
+        ip = val;
+        break;
+      case Response.REQUEST:
+        verbatimRequestHeaders = val;
+        break;
+      case Response.RESPONSE_HEADERS:
+        verbatimResponseHeaders = val;
+        if (verbatimResponseHeaders.contains(CRLF)) {
+          useVerbatimResponseHeaders = true;
+        }
+        statusLine = getStatusLine(verbatimResponseHeaders);
+        httpStatusCode = getStatusCode(statusLine);
+        break;
+      case Response.TRUNCATED_CONTENT_REASON:
+        truncatedReason = val;
+        break;
+      case Response.LAST_MODIFIED:
+	lastModifiedHeader = val;
+	break;
+      case Nutch.SEGMENT_NAME_KEY:
+      case Nutch.FETCH_STATUS_KEY:
+      case Nutch.SCORE_KEY:
+      case Nutch.SIGNATURE_KEY:
+      case Response.FETCH_TIME:
+        break; // ignore, not required for WARC record
+      default:
+        // We have to fix up a few headers because we don't have the raw
+        // responses to avoid that WARC readers try to read the content
+        // as chunked or gzip-compressed.
+        if (name.equalsIgnoreCase(Response.CONTENT_LENGTH)) {
+          int origContentLength = -1;
+          try {
+            origContentLength = Integer.parseInt(val);
+          } catch (NumberFormatException e) {
+            // ignore
+          }
+          headers.add(Response.CONTENT_LENGTH);
+          if (origContentLength != value.content.getContent().length) {
+            headers.add("" + value.content.getContent().length);
+            headers.add(X_HIDE_HEADER + Response.CONTENT_LENGTH);
+          }
+        } else if (name.equalsIgnoreCase(Response.CONTENT_ENCODING)) {
+          if (val.equalsIgnoreCase("identity")) {
+            headers.add(name);
+          } else {
+            headers.add(X_HIDE_HEADER + Response.CONTENT_ENCODING);
+          }
+        } else if (name.equalsIgnoreCase(Response.TRANSFER_ENCODING)) {
+          if (val.equalsIgnoreCase("identity")) {
+            headers.add(name);
+          } else {
+            headers.add(X_HIDE_HEADER + Response.TRANSFER_ENCODING);
+          }
+        } else {
+          headers.add(name);
+        }
+        headers.add(val);
+      }
+    }
+    if (verbatimRequestHeaders == null) {
+      LOG.error("No request headers for {}", url);
+    }
+    if (useVerbatimResponseHeaders && verbatimResponseHeaders != null) {
+      responseHeaders = fixHttpHeaders(verbatimResponseHeaders, value.content.getContent().length);
+    } else {
+      responseHeaders = formatHttpHeaders(statusLine, headers);
+    }
+    WarcWriter writer = warcWriter;
+    URI infoId = this.warcinfoId;
+    if (value.datum == null) {
+      // no CrawlDatum: must be a robots.txt
+      if (!generateRobotsTxt)
+        return;
+      writer = robotsTxtWarcWriter;
+      infoId = robotsTxtWarcinfoId;
+    } else if (value.datum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
+      if (!generateCrawlDiagnostics)
+        return;
+      writer = crawlDiagnosticsWarcWriter;
+      infoId = crawlDiagnosticsWarcinfoId;
+    }
+"WARC {} record {}", (notModified ? "revisit" : "response"),
+        targetUri);
+    URI requestId = null;
+    if (verbatimRequestHeaders != null) {
+      requestId = writer.writeWarcRequestRecord(targetUri, ip, date, infoId,
+          verbatimRequestHeaders.getBytes(StandardCharsets.UTF_8));
+    }
+    if (generateCdx) {
+      value.content.getMetadata().add("HTTP-Status-Code",
+          String.format("%d", httpStatusCode));
+    }
+    LanguageDetector.Result ldres = null;
+    if (detectLanguage && writer == warcWriter) {
+      // detect language only for successfully fetched primary documents
+      ldres = langDetect.detectLanguage(targetUri, value.content);
+      if (ldres.errorReason != null) {
+        context.getCounter(WARC_WRITER_COUNTER_GROUP,
+            "language detection: " +;
+      } else if (ldres.languages == null) {
+        context.getCounter(WARC_WRITER_COUNTER_GROUP,
+            "language detection: no result").increment(1);
+      } else if (ldres.languages.isReliable()) {
+        context.getCounter(WARC_WRITER_COUNTER_GROUP,
+            "language detection: reliable").increment(1);
+      } else {
+        context.getCounter(WARC_WRITER_COUNTER_GROUP,
+            "language detection: not reliable").increment(1);
+      }
+      if (generateCdx) {
+        if (ldres.charset != null) {
+          value.content.getMetadata().add("Detected-Charset",
+    ;
+        }
+        org.commoncrawl.langdetect.cld2.Result lr = ldres.languages;
+        if (lr != null) {
+          String codes = lr.getLanguageCodesISO639_3(",", true);
+          if (codes != null && !codes.isEmpty()) {
+            value.content.getMetadata().add("Detected-Language", codes);
+          }
+        }
+      }
+    }
+    if (notModified) {
+      /*
+       * revisit record of profile WarcWriter.PROFILE_REVISIT_NOT_MODIFIED
+       * 
+       * Note: "revisits" identified by signature comparison
+       * (WarcWriter.PROFILE_REVISIT_IDENTICAL_DIGEST) are stored as response
+       * records.
+       * 
+       * The modified date of the CrawlDatum is the date of the last successful
+       * fetch with content (status 200). It is uses for the
+       * WARC-Refers-To-Date.
+       */
+      byte[] responseHeaderBytes = responseHeaders
+          .getBytes(StandardCharsets.UTF_8);
+      String blockDigest = getSha1DigestWithAlg(responseHeaderBytes);
+      /*
+       * HTTP 304 not-modified responses do not have a payload, should not add a
+       * digest for it according to the WARC specification:
+       * "The WARC-Payload-Digest field ... shall not be used on records without
+       * a well-defined payload."
+       */
+      String payloadDigest = null;
+      writer.writeWarcRevisitRecord(targetUri, ip, httpStatusCode, date, infoId,
+          requestId, WarcWriter.PROFILE_REVISIT_NOT_MODIFIED, lastModifiedDate,
+          payloadDigest, blockDigest, responseHeaderBytes, value.content);
+    } else {
+      StringBuilder responsesb = new StringBuilder(4096);
+      responsesb.append(responseHeaders);
+      byte[] responseHeaderBytes = responsesb.toString()
+          .getBytes(StandardCharsets.UTF_8);
+      byte[] responseBytes = new byte[responseHeaderBytes.length
+          + value.content.getContent().length];
+      System.arraycopy(responseHeaderBytes, 0, responseBytes, 0,
+          responseHeaderBytes.length);
+      System.arraycopy(value.content.getContent(), 0, responseBytes,
+          responseHeaderBytes.length, value.content.getContent().length);
+      String payloadDigest = getSha1DigestWithAlg(value.content.getContent());
+      String blockDigest = getSha1DigestWithAlg(responseBytes);
+      URI responseId = writer.writeWarcResponseRecord(targetUri, ip,
+          httpStatusCode, date, infoId, requestId, payloadDigest, blockDigest,
+	  truncatedReason, lastModifiedHeader, responseBytes, value.content);
+      // Write metadata record
+      StringBuilder metadatasb = new StringBuilder(4096);
+      Map<String, String> metadata = new LinkedHashMap<String, String>();
+      if (fetchDuration != null) {
+        metadata.put("fetchTimeMs", fetchDuration);
+      }
+      if (ldres != null) {
+        if (ldres.charset != null) {
+          metadata.put("charset-detected",;
+        }
+        if (ldres.languages != null) {
+          metadata.put("languages-cld2", ldres.languages.toJSON());
+        }
+      }
+      if (metadata.size() > 0) {
+        for (Map.Entry<String, String> entry : metadata.entrySet()) {
+          metadatasb.append(entry.getKey()).append(COLONSP)
+              .append(entry.getValue()).append(CRLF);
+        }
+        metadatasb.append(CRLF);
+        writer.writeWarcMetadataRecord(targetUri, date, infoId, responseId,
+            null, metadatasb.toString().getBytes(StandardCharsets.UTF_8));
+      }
+    }
+  }
+  @Override
+  public synchronized void close(TaskAttemptContext context)
+      throws IOException {
+    context.setStatus("closing WARC output writers");
+    warcOut.close();
+    if (generateCrawlDiagnostics) {
+      crawlDiagnosticsWarcOut.close();
+    }
+    if (generateRobotsTxt) {
+      robotsTxtWarcOut.close();
+    }
+    if (generateCdx) {
+      cdxOut.close();
+      if (generateCrawlDiagnostics) {
+        crawlDiagnosticsCdxOut.close();
+      }
+      if (generateRobotsTxt) {
+        robotsTxtCdxOut.close();
+      }
+    }
+  }
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/nutch-cc/src/test/org/commoncrawl/util/	Mon Sep 23 16:35:22 2024 +0100
@@ -0,0 +1,366 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.commoncrawl.util;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDbReducer;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+public class TestWarcCdxWriter {
+  private static CrawlDbReducer reducer = new CrawlDbReducer();
+  private static class DummyContext extends Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context {
+    private Configuration conf;
+    private DummyContext() {
+      reducer.super();
+      conf = new Configuration();
+    }
+    private List<CrawlDatum> values = new ArrayList<CrawlDatum>();
+    @Override
+    public void write(Text key, CrawlDatum value) throws IOException, InterruptedException {
+      values.add(value);
+    }
+    /** collected values as List */
+    public List<CrawlDatum> getValues() {
+      return values;
+    }
+    /** Obtain current collected value from List */
+    @Override
+    public CrawlDatum getCurrentValue() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context");
+    }
+    /** Obtain current collected key from List */
+    @Override
+    public Text getCurrentKey() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context with no keys");
+    }
+    private Counters dummyCounters = new Counters();
+    public void progress() {
+    }
+    public Counter getCounter(Enum<?> arg0) {
+      return dummyCounters.getGroup("dummy").getCounterForName("dummy");
+    }
+    public Counter getCounter(String arg0, String arg1) {
+      return dummyCounters.getGroup("dummy").getCounterForName("dummy");
+    }
+    public void setStatus(String arg0) throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context with no status");
+    }
+    @Override
+    public String getStatus() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context with no status");
+    }
+    public float getProgress() {
+      return 1f;
+    }
+    public OutputCommitter getOutputCommitter() {
+      throw new UnsupportedOperationException("Dummy context without committer");
+    }
+    public boolean nextKey(){
+      return false;
+    }
+    @Override
+    public boolean nextKeyValue(){
+      return false;
+    }
+    @Override
+    public TaskAttemptID getTaskAttemptID() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context without TaskAttemptID");
+    }
+    @Override
+    public Path[] getArchiveClassPaths() {
+      return null;
+    }
+    @Override
+    public String[] getArchiveTimestamps() {
+      return null;
+    }
+    @Override
+    public URI[] getCacheArchives() throws IOException {
+      return null;
+    }
+    @Override
+    public URI[] getCacheFiles() throws IOException {
+      return null;
+    }
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
+      return null;
+    }
+    @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return null;
+    }
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+    @Override
+    public Credentials getCredentials() {
+      return null;
+    }
+    @Override
+    public Path[] getFileClassPaths() {
+      return null;
+    }
+    @Override
+    public String[] getFileTimestamps() {
+      return null;
+    }
+    @Override
+    public RawComparator<?> getGroupingComparator() {
+      return null;
+    }
+    @Override
+    public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
+      return null;
+    }
+    @Override
+    public String getJar() {
+      return null;
+    }
+    @Override
+    public JobID getJobID() {
+      return null;
+    }
+    @Override
+    public String getJobName() {
+      return null;
+    }
+    @Override
+    public boolean getJobSetupCleanupNeeded() {
+      return false;
+    }
+    @Override
+    @Deprecated
+    public Path[] getLocalCacheArchives() throws IOException {
+      return null;
+    }
+    @Override
+    @Deprecated
+    public Path[] getLocalCacheFiles() throws IOException {
+      return null;
+    }
+    @Override
+    public Class<?> getMapOutputKeyClass() {
+      return null;
+    }
+    @Override
+    public Class<?> getMapOutputValueClass() {
+      return null;
+    }
+    @Override
+    public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
+      return null;
+    }
+    @Override
+    public int getMaxMapAttempts() {
+      return 0;
+    }
+    @Override
+    public int getMaxReduceAttempts() {
+      return 0;
+    }
+    @Override
+    public int getNumReduceTasks() {
+      return 0;
+    }
+    @Override
+    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
+      return null;
+    }
+    @Override
+    public Class<?> getOutputKeyClass() {
+      return null;
+    }
+    @Override
+    public Class<?> getOutputValueClass() {
+      return null;
+    }
+    @Override
+    public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
+      return null;
+    }
+    @Override
+    public boolean getProfileEnabled() {
+      return false;
+    }
+    @Override
+    public String getProfileParams() {
+      return null;
+    }
+    @Override
+    public IntegerRanges getProfileTaskRange(boolean arg0) {
+      return null;
+    }
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
+      return null;
+    }
+    @Override
+    public RawComparator<?> getSortComparator() {
+      return null;
+    }
+    @Override
+    @Deprecated
+    public boolean getSymlink() {
+      return false;
+    }
+    @Override
+    public boolean getTaskCleanupNeeded() {
+      return false;
+    }
+   @Override
+    public String getUser() {
+      return null;
+    }
+    @Override
+    public Path getWorkingDirectory() throws IOException {
+      return null;
+    }
+  }
+  public final static String statusLine1 = "HTTP/1.1 200 OK";
+  public final static String testHeaders1[] = { //
+      "Content-Type", "text/html", //
+      "Accept-Ranges", "bytes", //
+      "Content-Encoding", "gzip", //
+      "Vary", "Accept-Encoding", "Server",
+      "Apache/2.0.63 (Unix) PHP/4.4.7 mod_ssl/2.0.63 OpenSSL/0.9.7e mod_fastcgi/2.4.2 DAV/2 SVN/1.4.2",
+      "Last-Modified", "Thu, 15 Jan 2009 00:02:29 GMT", "ETag",
+      "\"1262d9e-3ffa-2c19af40\"", //
+      "Date", "Mon, 26 Jan 2009 10:00:40 GMT", //
+      "Content-Length", "16378", //
+      "Connection", "close" };
+  public final static String testHeaderString1;
+  static {
+    StringBuilder headers = new StringBuilder();
+    headers.append(statusLine1).append(WarcRecordWriter.CRLF);
+    for (int i = 0; i < testHeaders1.length; i += 2) {
+      headers.append(testHeaders1[i]).append(WarcRecordWriter.COLONSP);
+      headers.append(testHeaders1[i+1]).append(WarcRecordWriter.CRLF);
+    }
+    headers.append(WarcRecordWriter.CRLF);
+    testHeaderString1 = headers.toString();
+  }
+  @Test
+  public void testLastMod() throws IOException {
+    Configuration config = NutchConfiguration.create();
+    Job job = NutchJob.getInstance(config);
+    TaskAttemptContext context = new DummyContext();
+    config = job.getConfiguration();
+    OutputStream devnull = OutputStream.nullOutputStream();
+    config.setBoolean("warc.export.cdx", true);
+    WarcRecordWriter wrw = new WarcRecordWriter(config, new Path("/tmp"),
+						123, context); 
+    //ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    //WarcCdxWriter cdx = new WarcCdxWriter(devnull, baos, new Path("/tmp"));
+  }