diff options
| author | Karsten Loesing <karsten.loesing@gmx.net> | 2019-12-11 12:22:40 +0100 |
|---|---|---|
| committer | Karsten Loesing <karsten.loesing@gmx.net> | 2019-12-13 10:47:57 +0100 |
| commit | d7117f8c8ee946748eea4d2f2741195d4dbfe056 (patch) | |
| tree | 4642fa2496210b709792f716759605008ec7aa62 | |
| parent | d5ca95a2bb74410004f5c4c93270f3fd90475068 (diff) | |
Avoid reprocessing webstats files.
Web servers typically provide us with the last 14 days of request
logs. We shouldn't process the whole 14 days over and over. Instead we
should only process new log files and any other log files containing
log lines from newly written dates.
In some cases web servers stop serving a given virtual host or stop
acting as a web server at all. However, in these cases we're left with
14 days of logs per virtual host. Ideally, these logs would get
cleaned up, but until that's the case, we should at least not
reprocess these files over and over.
In order to avoid reprocessing webstats files, we need a new state
file that records the log dates contained in each input file. We use that state
file to determine which of the previously processed webstats files to
re-process, so that we can write complete daily logs.
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java | 17 | ||||
| -rw-r--r-- | src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java | 202 |
3 files changed, 181 insertions, 39 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 73abdea..fe7937c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ * Medium changes - Give up on periodically checking the configuration file for updates and reloading it in case of changes. + - Avoid reprocessing webstats files. * Minor changes - Remove dependency on metrics-lib's internal package. diff --git a/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java b/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java index 879e8d7..b30c13a 100644 --- a/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java +++ b/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java @@ -81,5 +81,22 @@ public class LogMetadata { } return Optional.ofNullable(metadata); } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + LogMetadata that = (LogMetadata) other; + return path.toString().equals(that.path.toString()); + } + + @Override + public int hashCode() { + return path.hashCode(); + } } diff --git a/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java index fc7c64f..019fe66 100644 --- a/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java +++ b/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java @@ -17,28 +17,30 @@ import org.torproject.metrics.collector.conf.Key; import org.torproject.metrics.collector.conf.SourceType; import org.torproject.metrics.collector.cron.CollecTorMain; import org.torproject.metrics.collector.persist.PersistenceUtils; -import org.torproject.metrics.collector.persist.WebServerAccessLogPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; -import java.io.File; +import 
java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedSet; -import java.util.StringJoiner; import java.util.TreeMap; import java.util.TreeSet; import java.util.function.UnaryOperator; @@ -56,8 +58,10 @@ public class SanitizeWeblogs extends CollecTorMain { private static final int LIMIT = 2; private static final String WEBSTATS = "webstats"; - private String outputPathName; - private String recentPathName; + private Path outputDirectory; + private Path recentDirectory; + private Path processedWebstatsFile; + private boolean limits; /** @@ -84,14 +88,24 @@ public class SanitizeWeblogs extends CollecTorMain { try { Files.createDirectories(this.config.getPath(Key.OutputPath)); Files.createDirectories(this.config.getPath(Key.RecentPath)); - this.outputPathName = this.config.getPath(Key.OutputPath).toString(); - this.recentPathName = this.config.getPath(Key.RecentPath).toString(); + Files.createDirectories(this.config.getPath(Key.StatsPath)); + this.outputDirectory = this.config.getPath(Key.OutputPath) + .resolve(WEBSTATS); + this.recentDirectory = this.config.getPath(Key.RecentPath) + .resolve(WEBSTATS); + this.processedWebstatsFile = this.config.getPath(Key.StatsPath) + .resolve("processed-webstats"); this.limits = this.config.getBool(Key.WebstatsLimits); Set<SourceType> sources = this.config.getSourceTypeSet( Key.WebstatsSources); if (sources.contains(SourceType.Local)) { log.info("Processing logs using batch value {}.", BATCH); - findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins)); + Map<LogMetadata, 
Set<LocalDate>> previouslyProcessedWebstats + = this.readProcessedWebstats(); + Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats + = this.findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins), + previouslyProcessedWebstats); + this.writeProcessedWebstats(newlyProcessedWebstats); long cutOffMillis = System.currentTimeMillis() - 3L * 24L * 60L * 60L * 1000L; PersistenceUtils.cleanDirectory(this.config.getPath(Key.RecentPath), @@ -103,7 +117,32 @@ public class SanitizeWeblogs extends CollecTorMain { } } - private void findCleanWrite(Path dir) { + private Map<LogMetadata, Set<LocalDate>> readProcessedWebstats() { + Map<LogMetadata, Set<LocalDate>> processedWebstats = new HashMap<>(); + if (Files.exists(this.processedWebstatsFile)) { + try { + for (String line : Files.readAllLines(this.processedWebstatsFile)) { + String[] lineParts = line.split(",", 2); + Optional<LogMetadata> logMetadata + = LogMetadata.create(Paths.get(lineParts[1])); + if (logMetadata.isPresent()) { + processedWebstats.putIfAbsent(logMetadata.get(), new HashSet<>()); + LocalDate containedLogDate = LocalDate.parse(lineParts[0]); + processedWebstats.get(logMetadata.get()).add(containedLogDate); + } + } + } catch (IOException e) { + log.error("Cannot read state file {}.", this.processedWebstatsFile, e); + } + log.debug("Read state file containing {} log files.", + processedWebstats.size()); + } + return processedWebstats; + } + + private Map<LogMetadata, Set<LocalDate>> findCleanWrite(Path dir, + Map<LogMetadata, Set<LocalDate>> previouslyProcessedWebstats) { + Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats = new HashMap<>(); LogFileMap fileMapIn = new LogFileMap(dir); log.info("Found log files for {} virtual hosts.", fileMapIn.size()); for (Map.Entry<String,TreeMap<String,TreeMap<LocalDate,LogMetadata>>> @@ -113,48 +152,113 @@ public class SanitizeWeblogs extends CollecTorMain { : virtualEntry.getValue().entrySet()) { String physicalHost = physicalEntry.getKey(); 
log.info("Processing logs for {} on {}.", virtualHost, physicalHost); - Map<LocalDate, Map<String, Long>> linesByDate - = physicalEntry.getValue().values().stream().parallel() - .flatMap(metadata -> sanitzedLineStream(metadata).entrySet() - .stream()) - .collect(groupingBy(Map.Entry::getKey, - reducing(Collections.emptyMap(), Map.Entry::getValue, - (e1, e2) -> Stream.concat(e1.entrySet().stream(), e2.entrySet() - .stream()) - .collect(groupingByConcurrent(Map.Entry::getKey, - summingLong(Map.Entry::getValue)))))); - LocalDate[] interval = determineInterval(linesByDate.keySet()); - linesByDate.entrySet().stream() - .filter((entry) -> entry.getKey().isAfter(interval[0]) - && entry.getKey().isBefore(interval[1])).parallel() + /* Go through current input log files for given virtual and physical + * host, and either look up contained log dates from the last execution, + * or parse files to memory now. */ + Map<LocalDate, Map<String, Long>> sanitizedLinesByDate + = new HashMap<>(); + Set<LogMetadata> previouslyReadFiles = new HashSet<>(); + for (LogMetadata logMetadata : physicalEntry.getValue().values()) { + Set<LocalDate> containedLogDates; + if (previouslyProcessedWebstats.containsKey(logMetadata)) { + containedLogDates = previouslyProcessedWebstats.get(logMetadata); + for (LocalDate date : containedLogDates) { + sanitizedLinesByDate.putIfAbsent(date, new TreeMap<>()); + } + previouslyReadFiles.add(logMetadata); + } else { + containedLogDates = sanitizeWebstatsLog(sanitizedLinesByDate, + logMetadata); + } + newlyProcessedWebstats.put(logMetadata, containedLogDates); + } + /* Determine log dates that are safe to be written to disk now and that + * we didn't write to disk before. 
*/ + Set<LocalDate> storeDates = new HashSet<>(); + LocalDate[] interval = determineInterval(sanitizedLinesByDate.keySet()); + for (LocalDate newDate : sanitizedLinesByDate.keySet()) { + if (newDate.isAfter(interval[0]) && newDate.isBefore(interval[1])) { + Path outputPath = this.calculateOutputPath(virtualHost, + physicalHost, newDate); + if (!Files.exists(outputPath)) { + storeDates.add(newDate); + } + } + } + /* Reprocess previously read files containing log dates that we're going + * to write to disk below. */ + for (LogMetadata previouslyReadFile : previouslyReadFiles) { + if (!Collections.disjoint(storeDates, + newlyProcessedWebstats.get(previouslyReadFile))) { + sanitizeWebstatsLog(sanitizedLinesByDate, previouslyReadFile); + } + } + /* Write sanitized log files to disk. */ + sanitizedLinesByDate.entrySet().stream() + .filter((entry) -> storeDates.contains(entry.getKey())).parallel() .forEach((entry) -> storeSortedAndForget(virtualHost, physicalHost, entry.getKey(), entry.getValue())); } } + return newlyProcessedWebstats; + } + + private Set<LocalDate> sanitizeWebstatsLog( + Map<LocalDate, Map<String, Long>> sanitizedLinesByDate, + LogMetadata logFile) { + Map<LocalDate, Map<String, Long>> newlySanitizedLinesByDate + = sanitzedLineStream(logFile); + for (Map.Entry<LocalDate, Map<String, Long>> e + : newlySanitizedLinesByDate.entrySet()) { + sanitizedLinesByDate.putIfAbsent(e.getKey(), new TreeMap<>()); + Map<String, Long> newlySanitizedLines + = sanitizedLinesByDate.get(e.getKey()); + for (Map.Entry<String, Long> e1 : e.getValue().entrySet()) { + newlySanitizedLines.put(e1.getKey(), + newlySanitizedLines.getOrDefault(e1.getKey(), 0L) + e1.getValue()); + } + } + return newlySanitizedLinesByDate.keySet(); + } + + private static DateTimeFormatter yearPattern + = DateTimeFormatter.ofPattern("yyyy"); + private static DateTimeFormatter monthPattern + = DateTimeFormatter.ofPattern("MM"); + private static DateTimeFormatter dayPattern + = 
DateTimeFormatter.ofPattern("dd"); + private static DateTimeFormatter datePattern + = DateTimeFormatter.BASIC_ISO_DATE; + + private Path calculateOutputPath(String virtualHost, String physicalHost, + LocalDate logDate) { + return this.outputDirectory.resolve(Paths.get(virtualHost, + logDate.format(yearPattern), logDate.format(monthPattern), + logDate.format(dayPattern), String.format("%s_%s_access.log_%s.xz", + virtualHost, physicalHost, logDate.format(datePattern)))); + } + + private Path calculateRecentPath(String virtualHost, String physicalHost, + LocalDate logDate) { + return this.recentDirectory.resolve( + Paths.get(String.format("%s_%s_access.log_%s.xz", virtualHost, + physicalHost, logDate.format(datePattern)))); } private void storeSortedAndForget(String virtualHost, String physicalHost, LocalDate date, Map<String, Long> lineCounts) { - String name = new StringJoiner(WebServerAccessLogImpl.SEP) - .add(virtualHost).add(physicalHost) - .add(WebServerAccessLogImpl.MARKER) - .add(date.format(DateTimeFormatter.BASIC_ISO_DATE)) - .toString() + "." 
+ FileType.XZ.name().toLowerCase(); - log.debug("Storing {}.", name); Map<String, Long> retainedLines = new TreeMap<>(lineCounts); lineCounts.clear(); // not needed anymore try { - WebServerAccessLogPersistence walp - = new WebServerAccessLogPersistence( - new WebServerAccessLogImpl(toCompressedBytes(retainedLines), - new File(name), name)); - log.debug("Storing {}.", name); - walp.storeOut(this.outputPathName); - walp.storeRecent(this.recentPathName); - } catch (DescriptorParseException dpe) { - log.error("Cannot store log desriptor {}.", name, dpe); + byte[] compressedBytes = toCompressedBytes(retainedLines); + PersistenceUtils.storeToFileSystem(new byte[0], compressedBytes, + calculateOutputPath(virtualHost, physicalHost, date), + StandardOpenOption.CREATE_NEW); + PersistenceUtils.storeToFileSystem(new byte[0], compressedBytes, + calculateRecentPath(virtualHost, physicalHost, date), + StandardOpenOption.CREATE_NEW); } catch (Throwable th) { // catch all else - log.error("Serious problem. Cannot store log desriptor {}.", name, th); + log.error("Cannot store log descriptor.", th); } } @@ -279,5 +383,25 @@ public class SanitizeWeblogs extends CollecTorMain { return Collections.emptyMap(); } + private void writeProcessedWebstats( + Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats) { + try { + if (!Files.exists(this.processedWebstatsFile.getParent())) { + Files.createDirectories(this.processedWebstatsFile.getParent()); + } + List<String> lines = new ArrayList<>(); + for (Map.Entry<LogMetadata, Set<LocalDate>> e + : newlyProcessedWebstats.entrySet()) { + for (LocalDate logLineDate : e.getValue()) { + lines.add(String.format("%s,%s", logLineDate, e.getKey().path)); + } + } + Files.write(this.processedWebstatsFile, lines, StandardOpenOption.CREATE); + } catch (IOException e) { + log.error("Cannot write state file {}.", this.processedWebstatsFile, e); + } + log.debug("Wrote state file containing {} log files.", + newlyProcessedWebstats.size()); + } } |
