changeset 187:9805323d9969
add lastmod to cdx lines,
start writing test case
author      Henry S. Thompson <ht@inf.ed.ac.uk>
date        Mon, 23 Sep 2024 16:35:22 +0100
parents     fc1b16130961
children    0c5422df3a67
files       src/nutch-cc/src/java/org/commoncrawl/util/WarcCdxWriter.java
            src/nutch-cc/src/java/org/commoncrawl/util/WarcRecordWriter.java
            src/nutch-cc/src/test/org/commoncrawl/util/TestWarcCdxWriter.java
diffstat    3 files changed, 1357 insertions(+), 0 deletions(-)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/nutch-cc/src/java/org/commoncrawl/util/WarcCdxWriter.java	Mon Sep 23 16:35:22 2024 +0100
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.commoncrawl.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+import org.archive.url.WaybackURLKeyMaker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.json.JsonWriteFeature;
+import com.fasterxml.jackson.core.TSFBuilder;
+import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+
+public class WarcCdxWriter extends WarcWriter {
+
+  public static Logger LOG = LoggerFactory.getLogger(WarcCdxWriter.class);
+
+  private static final Charset UTF_8 = StandardCharsets.UTF_8;
+
+  protected CountingOutputStream countingOut;
+  protected OutputStream cdxOut;
+  protected String warcFilename;
+
+  private SimpleDateFormat timestampFormat;
+  private ObjectWriter jsonWriter;
+  private WaybackURLKeyMaker surtKeyMaker = new WaybackURLKeyMaker(true);
+
+  /**
+   * JSON indentation same as by Python WayBack
+   * (https://github.com/ikreymer/pywb)
+   */
+  @SuppressWarnings("serial")
+  public static class JsonIndenter extends MinimalPrettyPrinter {
+
+    // @Override
+    public void writeObjectFieldValueSeparator(JsonGenerator jg)
+        throws IOException, JsonGenerationException {
+      jg.writeRaw(": ");
+    }
+
+    // @Override
+    public void writeObjectEntrySeparator(JsonGenerator jg)
+        throws IOException, JsonGenerationException {
+      jg.writeRaw(", ");
+    }
+  }
+
+  public WarcCdxWriter(OutputStream warcOut, OutputStream cdxOut,
+      Path warcFilePath) {
+    super(new CountingOutputStream(warcOut));
+    countingOut = (CountingOutputStream) this.out;
+    this.cdxOut = cdxOut;
+    timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+    timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    warcFilename = warcFilePath.toUri().getPath().replaceFirst("^/", "");
+    ObjectMapper jsonMapper = new ObjectMapper();
+    jsonMapper.getFactory().configure(
+        JsonWriteFeature.ESCAPE_NON_ASCII.mappedFeature(), true);
+    jsonWriter = jsonMapper.writer(new JsonIndenter());
+  }
+
+  public URI writeWarcRevisitRecord(final URI targetUri, final String ip,
+      final int httpStatusCode, final Date date, final URI warcinfoId,
+      final URI relatedId, final String warcProfile, final Date refersToDate,
+      final String payloadDigest, final String blockDigest, byte[] block,
+      Content content) throws IOException {
+    long offset = countingOut.getByteCount();
+    URI recordId = super.writeWarcRevisitRecord(targetUri, ip, httpStatusCode,
+        date, warcinfoId, relatedId, warcProfile, refersToDate, payloadDigest,
+        blockDigest, block, content);
+    long length = (countingOut.getByteCount() - offset);
+    writeCdxLine(targetUri, date, offset, length, payloadDigest, content, true,
+        null, null, null);
+    return recordId;
+  }
+
+  public URI writeWarcResponseRecord(final URI targetUri, final String ip,
+      final int httpStatusCode, final Date date, final URI warcinfoId,
+      final URI relatedId, final String payloadDigest,
+      final String blockDigest, final String truncated, final String lastMod,
+      final byte[] block, Content content) throws IOException {
+    long offset = countingOut.getByteCount();
+    URI recordId = super.writeWarcResponseRecord(targetUri, ip, httpStatusCode,
+        date, warcinfoId, relatedId, payloadDigest, blockDigest, truncated,
+        lastMod, // will be ignored
+        block, content);
+    long length = (countingOut.getByteCount() - offset);
+    String redirectLocation = null;
+    if (isRedirect(httpStatusCode)) {
+      redirectLocation = content.getMetadata().get("Location");
+      if (redirectLocation != null) {
+        try {
+          redirectLocation = new URL(targetUri.toURL(), redirectLocation)
+              .toURI().toString();
+        } catch (URISyntaxException | MalformedURLException e) {
+          redirectLocation = null;
+        }
+      }
+    }
+    writeCdxLine(targetUri, date, offset, length, payloadDigest, content,
+        false, redirectLocation, truncated, lastMod);
+    return recordId;
+  }
+
+  public void writeCdxLine(final URI targetUri, final Date date, long offset,
+      long length, String payloadDigest, Content content, boolean revisit,
+      String redirectLocation, String truncated, String lastMod)
+      throws IOException {
+    String url = targetUri.toString();
+    String surt = url;
+    Metadata meta = content.getMetadata();
+    try {
+      surt = surtKeyMaker.makeKey(url);
+    } catch (URISyntaxException e) {
+      LOG.error("Failed to make SURT for {}: {}", url,
+          StringUtils.stringifyException(e));
+      return;
+    }
+    if (payloadDigest == null) {
+      // no content, e.g., revisit record
+    } else if (payloadDigest.startsWith("sha1:")) {
+      payloadDigest = payloadDigest.substring(5);
+    }
+    cdxOut.write(surt.getBytes(UTF_8));
+    cdxOut.write(' ');
+    cdxOut.write(timestampFormat.format(date).getBytes(UTF_8));
+    cdxOut.write(' ');
+    Map<String, String> data = new LinkedHashMap<String, String>();
+    data.put("url", url);
+    if (revisit) {
+      data.put("mime", "warc/revisit");
+    } else {
+      data.put("mime", cleanMimeType(meta.get("Content-Type")));
+      data.put("mime-detected", content.getContentType());
+    }
+    data.put("status", meta.get("HTTP-Status-Code"));
+    if (payloadDigest != null) {
+      data.put("digest", payloadDigest);
+    }
+    data.put("length", String.format("%d", length));
+    data.put("offset", String.format("%d", offset));
+    data.put("filename", warcFilename);
+    String val = meta.get("Detected-Charset");
+    if (val != null) {
+      data.put("charset", val);
+    }
+    val = meta.get("Detected-Language");
+    if (val != null) {
+      data.put("languages", val);
+    }
+    if (truncated != null) {
+      data.put("truncated", truncated);
+    }
+    if (lastMod != null) {
+      data.put("lastmod", lastMod);
+    }
+    if (redirectLocation != null) {
+      data.put("redirect", redirectLocation);
+    }
+    cdxOut.write(jsonWriter.writeValueAsBytes(data));
+    cdxOut.write('\n');
+  }
+
+  protected static String cleanMimeType(String mime) {
+    if (mime == null)
+      return "unk";
+    final char[] delimiters = { ';', ' ' };
+    for (char delim : delimiters) {
+      int pos = mime.indexOf(delim);
+      if (pos > -1)
+        mime = mime.substring(0, pos);
+    }
+    if (mime.isEmpty())
+      return "unk";
+    return mime;
+  }
+
+  protected static boolean isRedirect(int httpStatusCode) {
+    return httpStatusCode >= 300 && httpStatusCode < 400
+        && httpStatusCode != 304;
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/nutch-cc/src/java/org/commoncrawl/util/WarcRecordWriter.java	Mon Sep 23 16:35:22 2024 +0100
@@ -0,0 +1,773 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.commoncrawl.util;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.codec.binary.Base32;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.protocols.HttpDateFormat;
+import org.apache.nutch.net.protocols.Response;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class WarcRecordWriter extends RecordWriter<Text, WarcCapture> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Holds duration of fetch in metadata, see
+   * {@link org.apache.nutch.protocol.http.api.HttpBase#RESPONSE_TIME}
+   */
+  protected static final Text FETCH_DURATION = new Text("_rs_");
+  public static final String CRLF = "\r\n";
+  public static final String COLONSP = ": ";
+  protected static final Pattern PROBLEMATIC_HEADERS = Pattern
+      .compile("(?i)(?:Content-(?:Encoding|Length)|Transfer-Encoding)");
+  protected static final String X_HIDE_HEADER = "X-Crawler-";
+  public static final String WARC_WRITER_COUNTER_GROUP = "WARC-Writer";
+
+  private TaskAttemptContext context;
+  private DataOutputStream warcOut;
+  private WarcWriter warcWriter;
+  private DataOutputStream crawlDiagnosticsWarcOut;
+  private WarcWriter crawlDiagnosticsWarcWriter;
+  private DataOutputStream robotsTxtWarcOut;
+  private WarcWriter robotsTxtWarcWriter;
+  private DataOutputStream cdxOut;
+  private DataOutputStream crawlDiagnosticsCdxOut;
+  private DataOutputStream robotsTxtCdxOut;
+  private URI warcinfoId;
+  private URI crawlDiagnosticsWarcinfoId;
+  private URI robotsTxtWarcinfoId;
+  private MessageDigest sha1 = null;
+  private Base32 base32 = new Base32();
+  private LanguageDetector langDetect;
+  private boolean generateCrawlDiagnostics;
+  private boolean generateRobotsTxt;
+  private boolean generateCdx;
+  private boolean deduplicate;
+  private boolean detectLanguage;
+  private String lastURL = ""; // for deduplication
+
+  public WarcRecordWriter(Configuration conf, Path outputPath, int partition,
+      TaskAttemptContext context) throws IOException {
+
+    this.context = context;
+
+    FileSystem fs = outputPath.getFileSystem(conf);
+
+    SimpleDateFormat fileDate = new SimpleDateFormat("yyyyMMddHHmmss",
+        Locale.US);
+    fileDate.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+    String prefix = conf.get("warc.export.prefix", "NUTCH-CRAWL");
+
+    /*
+     * WARC-Date : "The timestamp shall represent the instant that data
+     * capture for record creation began."
+     * (http://iipc.github.io/warc-specifications/specifications/warc-format/
+     * warc-1.1/#warc-date-mandatory)
+     */
+    String date = conf.get("warc.export.date", fileDate.format(new Date()));
+    String endDate = conf.get("warc.export.date.end", date);
+    Date captureStartDate = new Date();
+    try {
+      captureStartDate = fileDate.parse(date);
+    } catch (ParseException e) {
+      LOG.error("Failed to parse warc.export.date {}: {}", date,
+          e.getMessage());
+    }
+
+    String hostname = conf.get("warc.export.hostname", getHostname());
+    String filename = getFileName(prefix, date, endDate, hostname, partition);
+
+    String publisher = conf.get("warc.export.publisher", null);
+    String operator = conf.get("warc.export.operator", null);
+    String software = conf.get("warc.export.software", "Apache Nutch");
+    String isPartOf = conf.get("warc.export.isPartOf", null);
+    String description = conf.get("warc.export.description", null);
+    generateCrawlDiagnostics = conf.getBoolean("warc.export.crawldiagnostics",
+        false);
+    generateRobotsTxt = conf.getBoolean("warc.export.robotstxt", false);
+    generateCdx = conf.getBoolean("warc.export.cdx", false);
+    deduplicate = conf.getBoolean("warc.deduplicate", false);
+    detectLanguage = conf.getBoolean("warc.detect.language", false);
+
+    Path warcPath = new Path(new Path(outputPath, "warc"), filename);
+    warcOut = fs.create(warcPath);
+    Path cdxPath = null;
+    if (generateCdx) {
+      cdxPath = new Path(
+          conf.get("warc.export.cdx.path", outputPath.toString()));
+      cdxOut = openCdxOutputStream(new Path(cdxPath, "warc"), filename, conf);
+    }
+    warcWriter = openWarcWriter(warcPath, warcOut, cdxOut);
+    warcinfoId = warcWriter.writeWarcinfoRecord(filename, hostname, publisher,
+        operator, software, isPartOf, description, captureStartDate);
+
+    if (generateCrawlDiagnostics) {
+      Path crawlDiagnosticsWarcPath = new Path(
+          new Path(outputPath, "crawldiagnostics"), filename);
+      crawlDiagnosticsWarcOut = fs.create(crawlDiagnosticsWarcPath);
+      if (generateCdx) {
+        crawlDiagnosticsCdxOut = openCdxOutputStream(
+            new Path(cdxPath, "crawldiagnostics"), filename, conf);
+      }
+      crawlDiagnosticsWarcWriter = openWarcWriter(crawlDiagnosticsWarcPath,
+          crawlDiagnosticsWarcOut, crawlDiagnosticsCdxOut);
+      crawlDiagnosticsWarcinfoId = crawlDiagnosticsWarcWriter
+          .writeWarcinfoRecord(filename, hostname, publisher, operator,
+              software, isPartOf, description, captureStartDate);
+    }
+
+    if (generateRobotsTxt) {
+      Path robotsTxtWarcPath = new Path(new Path(outputPath, "robotstxt"),
+          filename);
+      robotsTxtWarcOut = fs.create(robotsTxtWarcPath);
+      if (generateCdx) {
+        robotsTxtCdxOut = openCdxOutputStream(new Path(cdxPath, "robotstxt"),
+            filename, conf);
+      }
+      robotsTxtWarcWriter = openWarcWriter(robotsTxtWarcPath,
+          robotsTxtWarcOut, robotsTxtCdxOut);
+      robotsTxtWarcinfoId = robotsTxtWarcWriter.writeWarcinfoRecord(filename,
+          hostname, publisher, operator, software, isPartOf, description,
+          captureStartDate);
+    }
+
+    try {
+      sha1 = MessageDigest.getInstance("SHA1");
+    } catch (NoSuchAlgorithmException e) {
+      LOG.error("Unable to instantiate SHA1 MessageDigest object");
+      throw new RuntimeException(e);
+    }
+
+    if (detectLanguage) {
+      boolean bestEffort = conf
+          .getBoolean("warc.detect.language.cld2.besteffort", false);
+      langDetect = new LanguageDetector();
+      langDetect.setBestEffort(bestEffort);
+    }
+  }
+
+  /**
+   * Compose a unique WARC file name.
+   *
+   * The WARC specification recommends:
+   * <code>Prefix-Timestamp-Serial-Crawlhost.warc.gz</code> (<a href=
+   * "http://iipc.github.io/warc-specifications/specifications/warc-format/
+   * warc-1.1/#annex-c-informative-warc-file-size-and-name-recommendations">WARC
+   * 1.1, Annex C</a>)
+   *
+   * @param prefix
+   *          WARC file name prefix
+   * @param startDate
+   *          capture start date
+   * @param endDate
+   *          capture end date
+   * @param host
+   *          name of the crawling host
+   * @param partition
+   *          MapReduce partition
+   * @return (unique) WARC file name
+   */
+  protected String getFileName(String prefix, String startDate,
+      String endDate, String host, int partition) {
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+    return prefix + "-" + startDate + "-" + endDate + "-"
+        + numberFormat.format(partition) + ".warc.gz";
+  }
+
+  protected String getSha1DigestWithAlg(byte[] bytes) {
+    sha1.reset();
+    return "sha1:" + base32.encodeAsString(sha1.digest(bytes));
+  }
+
+  protected static String getStatusLine(String httpHeader) {
+    int eol = httpHeader.indexOf('\n');
+    if (eol == -1) {
+      return httpHeader;
+    }
+    if (eol > 0 && httpHeader.charAt(eol - 1) == '\r') {
+      eol--;
+    }
+    return httpHeader.substring(0, eol);
+  }
+
+  protected static int getStatusCode(String statusLine) {
+    int start = statusLine.indexOf(" ");
+    int end = statusLine.indexOf(" ", start + 1);
+    if (end == -1)
+      end = statusLine.length();
+    int code = 200;
+    try {
+      code = Integer.parseInt(statusLine.substring(start + 1, end));
+    } catch (NumberFormatException e) {
+    }
+    return code;
+  }
+
+  /** Format status line and pair-wise list of headers as string */
+  public static String formatHttpHeaders(String statusLine,
+      List<String> headers) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(statusLine).append(CRLF);
+    Iterator<String> it = headers.iterator();
+    while (it.hasNext()) {
+      String name = it.next();
+      if (!it.hasNext()) {
+        // no value for name
+        break;
+      }
+      String value = it.next();
+      sb.append(name).append(COLONSP).append(value).append(CRLF);
+    }
+    sb.append(CRLF);
+    return sb.toString();
+  }
+
+  /**
+   * Modify verbatim HTTP response headers: fix, remove or replace the headers
+   * <code>Content-Length</code>, <code>Content-Encoding</code> and
+   * <code>Transfer-Encoding</code> which may confuse WARC readers. Ensure
+   * that the returned headers end with a single empty line
+   * (<code>\r\n\r\n</code>).
+   *
+   * @param headers
+   *          HTTP 1.1 or 1.0 response header string, CR-LF-separated lines,
+   *          first line is status line
+   * @return safe HTTP response header
+   */
+  public static String fixHttpHeaders(String headers, int contentLength) {
+    int start = 0, lineEnd = 0, last = 0, trailingCrLf = 0;
+    boolean hasContentLength = false;
+    StringBuilder replace = new StringBuilder();
+    while (start < headers.length()) {
+      lineEnd = headers.indexOf(CRLF, start);
+      trailingCrLf = 1;
+      if (lineEnd == -1) {
+        lineEnd = headers.length();
+        trailingCrLf = 0;
+      }
+      int colonPos = -1;
+      for (int i = start; i < lineEnd; i++) {
+        if (headers.charAt(i) == ':') {
+          colonPos = i;
+          break;
+        }
+      }
+      if (colonPos == -1) {
+        boolean valid = true;
+        if (start == 0) {
+          // status line (without colon)
+          // TODO: http/2
+        } else if ((lineEnd + 4) == headers.length()
+            && headers.endsWith(CRLF + CRLF)) {
+          // ok, trailing empty line
+          trailingCrLf = 2;
+        } else if (start == lineEnd) {
+          // skip/remove empty line
+          LOG.debug("Skipping empty header line");
+          valid = false;
+        } else {
+          LOG.warn("Invalid header line: {}",
+              headers.substring(start, lineEnd));
+          valid = false;
+        }
+        if (!valid) {
+          if (last < start) {
+            replace.append(headers.substring(last, start));
+          }
+          last = lineEnd + 2 * trailingCrLf;
+        }
+        start = lineEnd + 2 * trailingCrLf;
+        /*
+         * skip over invalid header line, no further check for problematic
+         * headers required
+         */
+        continue;
+      }
+      String name = headers.substring(start, colonPos);
+      if (PROBLEMATIC_HEADERS.matcher(name).matches()) {
+        boolean needsFix = true;
+        if (name.equalsIgnoreCase("content-length")) {
+          hasContentLength = true;
+          String value = headers.substring(colonPos + 1, lineEnd).trim();
+          try {
+            int l = Integer.parseInt(value);
+            if (l == contentLength) {
+              needsFix = false;
+            }
+          } catch (NumberFormatException e) {
+            // needs to be fixed
+          }
+        }
+        if (needsFix) {
+          if (last < start) {
+            replace.append(headers.substring(last, start));
+          }
+          last = lineEnd + 2 * trailingCrLf;
+          replace.append(X_HIDE_HEADER)
+              .append(headers.substring(start, lineEnd + 2 * trailingCrLf));
+          if (trailingCrLf == 0) {
+            replace.append(CRLF);
+            trailingCrLf = 1;
+          }
+          if (name.equalsIgnoreCase("content-length")) {
+            // add effective uncompressed and unchunked length of content
+            replace.append("Content-Length").append(COLONSP)
+                .append(contentLength).append(CRLF);
+          }
+        }
+      }
+      start = lineEnd + 2 * trailingCrLf;
+    }
+    if (last > 0 || trailingCrLf != 2 || !hasContentLength) {
+      if (last < headers.length()) {
+        // append trailing headers
+        replace.append(headers.substring(last));
+      }
+      if (!hasContentLength) {
+        replace.append("Content-Length").append(COLONSP).append(contentLength)
+            .append(CRLF);
+      }
+      while (trailingCrLf < 2) {
+        replace.append(CRLF);
+        trailingCrLf++;
+      }
+      return replace.toString();
+    }
+    return headers;
+  }
+
+  protected static String getHostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.warn("Failed to get hostname: {}", e.getMessage());
+    }
+    return "localhost";
+  }
+
+  private WarcWriter openWarcWriter(Path warcPath, DataOutputStream warcOut,
+      DataOutputStream cdxOut) {
+    if (cdxOut != null) {
+      return new WarcCdxWriter(warcOut, cdxOut, warcPath);
+    }
+    return new WarcWriter(warcOut);
+  }
+
+  protected static DataOutputStream openCdxOutputStream(Path cdxPath,
+      String warcFilename, Configuration conf) throws IOException {
+    String cdxFilename = warcFilename.replaceFirst("\\.warc\\.gz$",
+        ".cdx.gz");
+    Path cdxFile = new Path(cdxPath, cdxFilename);
+    FileSystem fs = cdxPath.getFileSystem(conf);
+    return new DataOutputStream(new GZIPOutputStream(fs.create(cdxFile)));
+  }
+
+  @Override
+  public synchronized void write(Text key, WarcCapture value)
+      throws IOException {
+    URI targetUri;
+
+    String url = value.url.toString();
+    try {
+      targetUri = new URI(url);
+    } catch (URISyntaxException e) {
+      LOG.error("Cannot write WARC record, invalid URI: {}", url);
+      return;
+    }
+
+    if (value.content == null) {
+      String reason = "";
+      if (value.datum != null) {
+        ProtocolStatus pstatus = (ProtocolStatus) value.datum.getMetaData()
+            .get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+        if (pstatus != null) {
+          reason = ": " + pstatus.getName() + " - " + pstatus.getMessage();
+        }
+      }
+      LOG.warn("Cannot write WARC record, no content for {}{}", value.url,
+          reason);
+      return;
+    }
+
+    if (deduplicate) {
+      if (lastURL.equals(url)) {
+        LOG.info("Skipping duplicate record: {}", value.url);
+        return;
+      }
+      lastURL = url;
+    }
+
+    String ip = "0.0.0.0";
+    Date date = null;
+    boolean notModified = false;
+    Date lastModifiedDate = null;
+    String verbatimResponseHeaders = null;
+    String verbatimRequestHeaders = null;
+    List<String> headers = new ArrayList<>();
+    String responseHeaders = null;
+    String statusLine = "";
+    int httpStatusCode = 200;
+    String fetchDuration = null;
+    String truncatedReason = null;
+    String lastModifiedHeader = null;
+
+    if (value.datum != null) {
+      date = new Date(value.datum.getFetchTime());
+      // This is for older crawl dbs that don't include the verbatim status
+      // line in the metadata
+      ProtocolStatus pstatus = (ProtocolStatus) value.datum.getMetaData()
+          .get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+      if (pstatus == null) {
+        LOG.warn("Cannot write WARC record, no protocol status for {}",
+            value.url);
+        return;
+      }
+      switch (pstatus.getCode()) {
+      case ProtocolStatus.SUCCESS:
+        statusLine = "HTTP/1.1 200 OK";
+        httpStatusCode = 200;
+        break;
+      case ProtocolStatus.TEMP_MOVED:
+        statusLine = "HTTP/1.1 302 Found";
+        httpStatusCode = 302;
+        break;
+      case ProtocolStatus.MOVED:
+        statusLine = "HTTP/1.1 301 Moved Permanently";
+        httpStatusCode = 301;
+        break;
+      case ProtocolStatus.NOTMODIFIED:
+        statusLine = "HTTP/1.1 304 Not Modified";
+        httpStatusCode = 304;
+        notModified = true;
+        long modifiedTime = value.datum.getModifiedTime();
+        if (modifiedTime > 0) {
+          lastModifiedDate = new Date(modifiedTime);
+        }
+        break;
+      default:
+        if (value.content.getMetadata()
+            .get(Response.RESPONSE_HEADERS) == null) {
+          LOG.warn("Unknown or ambiguous protocol status: {}", pstatus);
+          return;
+        }
+      }
+      String httpStatusCodeVal = value.datum.getMetaData()
+          .get(Nutch.PROTOCOL_STATUS_CODE_KEY).toString();
+      if (httpStatusCodeVal != null) {
+        try {
+          httpStatusCode = Integer.parseInt(httpStatusCodeVal);
+        } catch (NumberFormatException e) {
+        }
+      }
+      if (value.datum.getMetaData().get(FETCH_DURATION) != null) {
+        fetchDuration = value.datum.getMetaData().get(FETCH_DURATION)
+            .toString();
+      }
+    } else {
+      // robots.txt, no CrawlDatum available
+      String fetchTime = value.content.getMetadata()
+          .get(Nutch.FETCH_TIME_KEY);
+      if (fetchTime != null) {
+        try {
+          date = new Date(Long.parseLong(fetchTime));
+        } catch (NumberFormatException e) {
+          LOG.error("Invalid fetch time '{}' in content metadata of {}",
+              fetchTime, value.url.toString());
+        }
+      }
+      if (date == null) {
+        String httpDate = value.content.getMetadata().get("Date");
+        if (httpDate != null) {
+          try {
+            date = HttpDateFormat.toDate(httpDate);
+          } catch (ParseException e) {
+            LOG.warn("Failed to parse HTTP Date {} for {}", httpDate,
+                targetUri);
+            date = new Date();
+          }
+        } else {
+          LOG.warn("No HTTP Date for {}", targetUri);
+          date = new Date();
+        }
+      }
+      // status is taken from header
+    }
+
+    boolean useVerbatimResponseHeaders = false;
+
+    for (String name : value.content.getMetadata().names()) {
+      String val = value.content.getMetadata().get(name);
+      switch (name) {
+      case Response.IP_ADDRESS:
+        ip = val;
+        break;
+      case Response.REQUEST:
+        verbatimRequestHeaders = val;
+        break;
+      case Response.RESPONSE_HEADERS:
+        verbatimResponseHeaders = val;
+        if (verbatimResponseHeaders.contains(CRLF)) {
+          useVerbatimResponseHeaders = true;
+        }
+        statusLine = getStatusLine(verbatimResponseHeaders);
+        httpStatusCode = getStatusCode(statusLine);
+        break;
+      case Response.TRUNCATED_CONTENT_REASON:
+        truncatedReason = val;
+        break;
+      case Response.LAST_MODIFIED:
+        lastModifiedHeader = val;
+        break;
+      case Nutch.SEGMENT_NAME_KEY:
+      case Nutch.FETCH_STATUS_KEY:
+      case Nutch.SCORE_KEY:
+      case Nutch.SIGNATURE_KEY:
+      case Response.FETCH_TIME:
+        break; // ignore, not required for WARC record
+      default:
+        // We have to fix up a few headers, because we do not store the raw
+        // responses, to avoid WARC readers trying to read the content as
+        // chunked or gzip-compressed.
+        if (name.equalsIgnoreCase(Response.CONTENT_LENGTH)) {
+          int origContentLength = -1;
+          try {
+            origContentLength = Integer.parseInt(val);
+          } catch (NumberFormatException e) {
+            // ignore
+          }
+          headers.add(Response.CONTENT_LENGTH);
+          if (origContentLength != value.content.getContent().length) {
+            headers.add("" + value.content.getContent().length);
+            headers.add(X_HIDE_HEADER + Response.CONTENT_LENGTH);
+          }
+        } else if (name.equalsIgnoreCase(Response.CONTENT_ENCODING)) {
+          if (val.equalsIgnoreCase("identity")) {
+            headers.add(name);
+          } else {
+            headers.add(X_HIDE_HEADER + Response.CONTENT_ENCODING);
+          }
+        } else if (name.equalsIgnoreCase(Response.TRANSFER_ENCODING)) {
+          if (val.equalsIgnoreCase("identity")) {
+            headers.add(name);
+          } else {
+            headers.add(X_HIDE_HEADER + Response.TRANSFER_ENCODING);
+          }
+        } else {
+          headers.add(name);
+        }
+        headers.add(val);
+      }
+    }
+
+    if (verbatimRequestHeaders == null) {
+      LOG.error("No request headers for {}", url);
+    }
+
+    if (useVerbatimResponseHeaders && verbatimResponseHeaders != null) {
+      responseHeaders = fixHttpHeaders(verbatimResponseHeaders,
+          value.content.getContent().length);
+    } else {
+      responseHeaders = formatHttpHeaders(statusLine, headers);
+    }
+
+    WarcWriter writer = warcWriter;
+    URI infoId = this.warcinfoId;
+    if (value.datum == null) {
+      // no CrawlDatum: must be a robots.txt
+      if (!generateRobotsTxt)
+        return;
+      writer = robotsTxtWarcWriter;
+      infoId = robotsTxtWarcinfoId;
+    } else if (value.datum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
+      if (!generateCrawlDiagnostics)
+        return;
+      writer = crawlDiagnosticsWarcWriter;
+      infoId = crawlDiagnosticsWarcinfoId;
+    }
+
+    LOG.info("WARC {} record {}", (notModified ? "revisit" : "response"),
+        targetUri);
+
+    URI requestId = null;
+    if (verbatimRequestHeaders != null) {
+      requestId = writer.writeWarcRequestRecord(targetUri, ip, date, infoId,
+          verbatimRequestHeaders.getBytes(StandardCharsets.UTF_8));
+    }
+
+    if (generateCdx) {
+      value.content.getMetadata().add("HTTP-Status-Code",
+          String.format("%d", httpStatusCode));
+    }
+
+    LanguageDetector.Result ldres = null;
+    if (detectLanguage && writer == warcWriter) {
+      // detect language only for successfully fetched primary documents
+      ldres = langDetect.detectLanguage(targetUri, value.content);
+      if (ldres.errorReason != null) {
+        context.getCounter(WARC_WRITER_COUNTER_GROUP,
+            "language detection: " + ldres.errorStatus.name).increment(1);
+      } else if (ldres.languages == null) {
+        context.getCounter(WARC_WRITER_COUNTER_GROUP,
+            "language detection: no result").increment(1);
+      } else if (ldres.languages.isReliable()) {
+        context.getCounter(WARC_WRITER_COUNTER_GROUP,
+            "language detection: reliable").increment(1);
+      } else {
+        context.getCounter(WARC_WRITER_COUNTER_GROUP,
+            "language detection: not reliable").increment(1);
+      }
+      if (generateCdx) {
+        if (ldres.charset != null) {
+          value.content.getMetadata().add("Detected-Charset",
+              ldres.charset.name());
+        }
+        org.commoncrawl.langdetect.cld2.Result lr = ldres.languages;
+        if (lr != null) {
+          String codes = lr.getLanguageCodesISO639_3(",", true);
+          if (codes != null && !codes.isEmpty()) {
+            value.content.getMetadata().add("Detected-Language", codes);
+          }
+        }
+      }
+    }
+
+    if (notModified) {
+      /*
+       * revisit record of profile WarcWriter.PROFILE_REVISIT_NOT_MODIFIED
+       *
+       * Note: "revisits" identified by signature comparison
+       * (WarcWriter.PROFILE_REVISIT_IDENTICAL_DIGEST) are stored as response
+       * records.
+       *
+       * The modified date of the CrawlDatum is the date of the last
+       * successful fetch with content (status 200). It is used for the
+       * WARC-Refers-To-Date.
+       */
+      byte[] responseHeaderBytes = responseHeaders
+          .getBytes(StandardCharsets.UTF_8);
+      String blockDigest = getSha1DigestWithAlg(responseHeaderBytes);
+      /*
+       * HTTP 304 not-modified responses do not have a payload, and we should
+       * not add a payload digest, according to the WARC specification: "The
+       * WARC-Payload-Digest field ... shall not be used on records without a
+       * well-defined payload."
+       */
+      String payloadDigest = null;
+      writer.writeWarcRevisitRecord(targetUri, ip, httpStatusCode, date,
+          infoId, requestId, WarcWriter.PROFILE_REVISIT_NOT_MODIFIED,
+          lastModifiedDate, payloadDigest, blockDigest, responseHeaderBytes,
+          value.content);
+    } else {
+      StringBuilder responsesb = new StringBuilder(4096);
+      responsesb.append(responseHeaders);
+
+      byte[] responseHeaderBytes = responsesb.toString()
+          .getBytes(StandardCharsets.UTF_8);
+      byte[] responseBytes = new byte[responseHeaderBytes.length
+          + value.content.getContent().length];
+      System.arraycopy(responseHeaderBytes, 0, responseBytes, 0,
+          responseHeaderBytes.length);
+      System.arraycopy(value.content.getContent(), 0, responseBytes,
+          responseHeaderBytes.length, value.content.getContent().length);
+
+      String payloadDigest = getSha1DigestWithAlg(value.content.getContent());
+      String blockDigest = getSha1DigestWithAlg(responseBytes);
+      URI responseId = writer.writeWarcResponseRecord(targetUri, ip,
+          httpStatusCode, date, infoId, requestId, payloadDigest, blockDigest,
+          truncatedReason, lastModifiedHeader, responseBytes, value.content);
+
+      // Write metadata record
+      StringBuilder metadatasb = new StringBuilder(4096);
+      Map<String, String> metadata = new LinkedHashMap<String, String>();
+
+      if (fetchDuration != null) {
+        metadata.put("fetchTimeMs", fetchDuration);
+      }
+      if (ldres != null) {
+        if (ldres.charset != null) {
+          metadata.put("charset-detected", ldres.charset.name());
+        }
+        if (ldres.languages != null) {
+          metadata.put("languages-cld2", ldres.languages.toJSON());
+        }
+      }
+      if (metadata.size() > 0) {
+        for (Map.Entry<String, String> entry : metadata.entrySet()) {
+          metadatasb.append(entry.getKey()).append(COLONSP)
+              .append(entry.getValue()).append(CRLF);
+        }
+        metadatasb.append(CRLF);
+
+        writer.writeWarcMetadataRecord(targetUri, date, infoId, responseId,
+            null, metadatasb.toString().getBytes(StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  @Override
+  public synchronized void close(TaskAttemptContext context)
+      throws IOException {
+    context.setStatus("closing WARC output writers");
+    warcOut.close();
+    if (generateCrawlDiagnostics) {
+      crawlDiagnosticsWarcOut.close();
+    }
+    if (generateRobotsTxt) {
+      robotsTxtWarcOut.close();
+    }
+    if (generateCdx) {
+      cdxOut.close();
+      if (generateCrawlDiagnostics) {
+        crawlDiagnosticsCdxOut.close();
+      }
+      if (generateRobotsTxt) {
+        robotsTxtCdxOut.close();
+      }
+    }
+  }
+}
\ No newline at end of file
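
To make the header rewriting concrete, here is a minimal sketch (not part of the patch; all values invented) of what fixHttpHeaders does to a response whose body was decompressed during fetching, so that the stored Content-Encoding and Content-Length no longer describe the stored bytes:

    // Hypothetical illustration of WarcRecordWriter.fixHttpHeaders(...)
    String raw = "HTTP/1.1 200 OK\r\n"
        + "Content-Encoding: gzip\r\n"    // problematic: body is stored uncompressed
        + "Content-Length: 1234\r\n\r\n"; // compressed length, no longer correct
    String fixed = WarcRecordWriter.fixHttpHeaders(raw, 5678);
    // fixed now reads (each line CRLF-terminated, ending with one empty line):
    //   HTTP/1.1 200 OK
    //   X-Crawler-Content-Encoding: gzip
    //   X-Crawler-Content-Length: 1234
    //   Content-Length: 5678

The original problematic headers are preserved under the X-Crawler- prefix, and a Content-Length matching the actual stored payload is added, so WARC readers never attempt gzip or chunked decoding of the record block.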
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/nutch-cc/src/test/org/commoncrawl/util/TestWarcCdxWriter.java	Mon Sep 23 16:35:22 2024 +0100
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.commoncrawl.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDbReducer;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+public class TestWarcCdxWriter {
+
+  private static CrawlDbReducer reducer = new CrawlDbReducer();
+
+  private static class DummyContext
+      extends Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context {
+
+    private Configuration conf;
+
+    private DummyContext() {
+      reducer.super();
+      conf = new Configuration();
+    }
+
+    private List<CrawlDatum> values = new ArrayList<CrawlDatum>();
+
+    @Override
+    public void write(Text key, CrawlDatum value)
+        throws IOException, InterruptedException {
+      values.add(value);
+    }
+
+    /** collected values as List */
+    public List<CrawlDatum> getValues() {
+      return values;
+    }
+
+    /** Obtain current collected value from List */
+    @Override
+    public CrawlDatum getCurrentValue() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context");
+    }
+
+    /** Obtain current collected key from List */
+    @Override
+    public Text getCurrentKey() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context with no keys");
+    }
+
+    private Counters dummyCounters = new Counters();
+
+    public void progress() {
+    }
+
+    public Counter getCounter(Enum<?> arg0) {
+      return dummyCounters.getGroup("dummy").getCounterForName("dummy");
+    }
+
+    public Counter getCounter(String arg0, String arg1) {
+      return dummyCounters.getGroup("dummy").getCounterForName("dummy");
+    }
+
+    public void setStatus(String arg0) throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context with no status");
+    }
+
+    @Override
+    public String getStatus() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("Dummy context with no status");
+    }
+
+    public float getProgress() {
+      return 1f;
+    }
+
+    public OutputCommitter getOutputCommitter() {
+      throw new UnsupportedOperationException(
+          "Dummy context without committer");
+    }
+
+    public boolean nextKey() {
+      return false;
+    }
+
+    @Override
+    public boolean nextKeyValue() {
+      return false;
+    }
+
+    @Override
+    public TaskAttemptID getTaskAttemptID()
+        throws UnsupportedOperationException {
+      throw new UnsupportedOperationException(
+          "Dummy context without TaskAttemptID");
+    }
+
+    @Override
+    public Path[] getArchiveClassPaths() {
+      return null;
+    }
+
+    @Override
+    public String[] getArchiveTimestamps() {
+      return null;
+    }
+
+    @Override
+    public URI[] getCacheArchives() throws IOException {
+      return null;
+    }
+
+    @Override
+    public URI[] getCacheFiles() throws IOException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return null;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public Credentials getCredentials() {
+      return null;
+    }
+
+    @Override
+    public Path[] getFileClassPaths() {
+      return null;
+    }
+
+    @Override
+    public String[] getFileTimestamps() {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getGroupingComparator() {
+      return null;
+    }
+
+    @Override
+    public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public String getJar() {
+      return null;
+    }
+
+    @Override
+    public JobID getJobID() {
+      return null;
+    }
+
+    @Override
+    public String getJobName() {
+      return null;
+    }
+
+    @Override
+    public boolean getJobSetupCleanupNeeded() {
+      return false;
+    }
+
+    @Override
+    @Deprecated
+    public Path[] getLocalCacheArchives() throws IOException {
+      return null;
+    }
+
+    @Override
+    @Deprecated
+    public Path[] getLocalCacheFiles() throws IOException {
+      return null;
+    }
+
+    @Override
+    public Class<?> getMapOutputKeyClass() {
+      return null;
+    }
+
+    @Override
+    public Class<?> getMapOutputValueClass() {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public int getMaxMapAttempts() {
+      return 0;
+    }
+
+    @Override
+    public int getMaxReduceAttempts() {
+      return 0;
+    }
+
+    @Override
+    public int getNumReduceTasks() {
+      return 0;
+    }
+
+    @Override
+    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<?> getOutputKeyClass() {
+      return null;
+    }
+
+    @Override
+    public Class<?> getOutputValueClass() {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public boolean getProfileEnabled() {
+      return false;
+    }
+
+    @Override
+    public String getProfileParams() {
+      return null;
+    }
+
+    @Override
+    public IntegerRanges getProfileTaskRange(boolean arg0) {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getSortComparator() {
+      return null;
+    }
+
+    @Override
+    @Deprecated
+    public boolean getSymlink() {
+      return false;
+    }
+
+    @Override
+    public boolean getTaskCleanupNeeded() {
+      return false;
+    }
+
+    @Override
+    public String getUser() {
+      return null;
+    }
+
+    @Override
+    public Path getWorkingDirectory() throws IOException {
+      return null;
+    }
+
+  }
+
+  public final static String statusLine1 = "HTTP/1.1 200 OK";
+  public final static String testHeaders1[] = { //
+      "Content-Type", "text/html", //
+      "Accept-Ranges", "bytes", //
+      "Content-Encoding", "gzip", //
+      "Vary", "Accept-Encoding", //
+      "Server",
+      "Apache/2.0.63 (Unix) PHP/4.4.7 mod_ssl/2.0.63 OpenSSL/0.9.7e mod_fastcgi/2.4.2 DAV/2 SVN/1.4.2", //
+      "Last-Modified", "Thu, 15 Jan 2009 00:02:29 GMT", //
+      "ETag", "\"1262d9e-3ffa-2c19af40\"", //
+      "Date", "Mon, 26 Jan 2009 10:00:40 GMT", //
+      "Content-Length", "16378", //
+      "Connection", "close" };
+  public final static String testHeaderString1;
+  static {
+    StringBuilder headers = new StringBuilder();
+    headers.append(statusLine1).append(WarcRecordWriter.CRLF);
+    for (int i = 0; i < testHeaders1.length; i += 2) {
+      headers.append(testHeaders1[i]).append(WarcRecordWriter.COLONSP);
+      headers.append(testHeaders1[i + 1]).append(WarcRecordWriter.CRLF);
+    }
+    headers.append(WarcRecordWriter.CRLF);
+    testHeaderString1 = headers.toString();
+  }
+
+  @Test
+  public void testLastMod() throws IOException {
+    Configuration config = NutchConfiguration.create();
+    Job job = NutchJob.getInstance(config);
+    TaskAttemptContext context = new DummyContext();
+    config = job.getConfiguration();
+    OutputStream devnull = OutputStream.nullOutputStream();
+    config.setBoolean("warc.export.cdx", true);
+    WarcRecordWriter wrw = new WarcRecordWriter(config, new Path("/tmp"),
+        123, context);
+
+    // ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // WarcCdxWriter cdx = new WarcCdxWriter(devnull, baos, new Path("/tmp"));
+
+  }
+}
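
Per the commit message, the test case is only started here. A plausible continuation of testLastMod, exercising the new lastmod field, might look like the following sketch. Everything in it is hypothetical and not part of this changeset: the URL, digest and dates are invented, it assumes the Nutch Content(String, String, byte[], String, Metadata, Configuration) constructor, and it would need additional imports (org.apache.nutch.metadata.Metadata, org.apache.nutch.protocol.Content, java.util.Date, java.nio.charset.StandardCharsets) plus throws clauses for the checked exceptions:

    // Hypothetical continuation of testLastMod() (not in this changeset):
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    WarcCdxWriter cdx = new WarcCdxWriter(devnull, baos,
        new Path("/tmp/test.warc.gz"));
    Metadata meta = new Metadata();
    meta.add("HTTP-Status-Code", "200");
    Content content = new Content("https://example.org/",
        "https://example.org/", new byte[0], "text/html", meta, config);
    // Pass a Last-Modified header value as the lastMod argument ...
    cdx.writeCdxLine(URI.create("https://example.org/"), new Date(), 0L, 0L,
        "sha1:TESTDIGESTTESTDIGESTTESTDIGESTTE", content, false, null, null,
        "Thu, 15 Jan 2009 00:02:29 GMT");
    // ... and assert that it surfaces as "lastmod" in the CDX JSON block.
    String cdxLine = new String(baos.toByteArray(), StandardCharsets.UTF_8);
    assertTrue(cdxLine.contains(
        "\"lastmod\": \"Thu, 15 Jan 2009 00:02:29 GMT\""));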