diff options
| author | iwakeh <iwakeh@torproject.org> | 2017-06-17 06:55:24 +0000 |
|---|---|---|
| committer | iwakeh <iwakeh@torproject.org> | 2017-06-17 06:55:25 +0000 |
| commit | 15cf44d70278e55a472ebf701f98f06d79c78988 (patch) | |
| tree | 16aea252d1e70b8b1f3ef2cc7a1ccb48f085dde9 | |
| parent | 7ef5c4b135c7975273b4a92d4539cfc703d08ccf (diff) | |
Adapts CollecTor to metrics-lib 1.9.0, which removed DescriptorFile.
Build.xml still pointing to 1.8.2-dev
Implements task-22652.
5 files changed, 111 insertions, 112 deletions
@@ -11,7 +11,7 @@ <property name="release.version" value="1.1.2-dev" /> <property name="project-main-class" value="org.torproject.collector.Main" /> <property name="name" value="collector"/> - <property name="descriptorversion" value="1.8.2" /> + <property name="descriptorversion" value="1.8.2-dev" /> <property name="jarincludes" value="collector.properties logback.xml" /> <patternset id="runtime" > diff --git a/src/main/java/org/torproject/collector/relaydescs/ReferenceChecker.java b/src/main/java/org/torproject/collector/relaydescs/ReferenceChecker.java index 59db27f..843156c 100644 --- a/src/main/java/org/torproject/collector/relaydescs/ReferenceChecker.java +++ b/src/main/java/org/torproject/collector/relaydescs/ReferenceChecker.java @@ -4,7 +4,6 @@ package org.torproject.collector.relaydescs; import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.DescriptorFile; import org.torproject.descriptor.DescriptorReader; import org.torproject.descriptor.DescriptorSourceFactory; import org.torproject.descriptor.DirSourceEntry; @@ -167,36 +166,33 @@ public class ReferenceChecker { private void readNewDescriptors() { DescriptorReader descriptorReader = DescriptorSourceFactory.createDescriptorReader(); - descriptorReader.addDirectory(this.descriptorsDir); descriptorReader.setHistoryFile(this.historyFile); - Iterator<DescriptorFile> descriptorFiles = - descriptorReader.readDescriptors(); - while (descriptorFiles.hasNext()) { - DescriptorFile descriptorFile = descriptorFiles.next(); - for (Descriptor descriptor : descriptorFile.getDescriptors()) { - if (descriptor instanceof RelayNetworkStatusConsensus) { - RelayNetworkStatusConsensus consensus = - (RelayNetworkStatusConsensus) descriptor; - String consensusFlavor = consensus.getConsensusFlavor(); - if (consensusFlavor == null) { - this.readRelayNetworkStatusConsensusUnflavored(consensus); - } else if (consensusFlavor.equals("microdesc")) { - this.readRelayNetworkStatusConsensusMicrodesc(consensus); - } else { - /* Ignore unknown consensus flavors. */ - } - } else if (descriptor instanceof RelayNetworkStatusVote) { - this.readRelayNetworkStatusVote( - (RelayNetworkStatusVote) descriptor); - } else if (descriptor instanceof ServerDescriptor) { - this.readServerDescriptor((ServerDescriptor) descriptor); - } else if (descriptor instanceof ExtraInfoDescriptor) { - this.readExtraInfoDescriptor((ExtraInfoDescriptor) descriptor); - } else if (descriptor instanceof Microdescriptor) { - readMicrodescriptor((Microdescriptor) descriptor); + Iterator<Descriptor> descriptors + = descriptorReader.readDescriptors(this.descriptorsDir).iterator(); + while (descriptors.hasNext()) { + Descriptor descriptor = descriptors.next(); + if (descriptor instanceof RelayNetworkStatusConsensus) { + RelayNetworkStatusConsensus consensus = + (RelayNetworkStatusConsensus) descriptor; + String consensusFlavor = consensus.getConsensusFlavor(); + if (consensusFlavor == null) { + this.readRelayNetworkStatusConsensusUnflavored(consensus); + } else if (consensusFlavor.equals("microdesc")) { + this.readRelayNetworkStatusConsensusMicrodesc(consensus); } else { - /* Ignore unknown descriptors. */ + /* Ignore unknown consensus flavors. */ } + } else if (descriptor instanceof RelayNetworkStatusVote) { + this.readRelayNetworkStatusVote( + (RelayNetworkStatusVote) descriptor); + } else if (descriptor instanceof ServerDescriptor) { + this.readServerDescriptor((ServerDescriptor) descriptor); + } else if (descriptor instanceof ExtraInfoDescriptor) { + this.readExtraInfoDescriptor((ExtraInfoDescriptor) descriptor); + } else if (descriptor instanceof Microdescriptor) { + readMicrodescriptor((Microdescriptor) descriptor); + } else { + /* Ignore unknown descriptors. */ } } descriptorReader.saveHistoryFile(this.historyFile); diff --git a/src/main/java/org/torproject/collector/sync/ProcessCriterium.java b/src/main/java/org/torproject/collector/sync/ProcessCriterium.java index 1fa05aa..c128e14 100644 --- a/src/main/java/org/torproject/collector/sync/ProcessCriterium.java +++ b/src/main/java/org/torproject/collector/sync/ProcessCriterium.java @@ -4,10 +4,9 @@ package org.torproject.collector.sync; import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.DescriptorFile; /** Should a descriptor file be processed during sync. */ -public class ProcessCriterium implements Criterium<DescriptorFile> { +public class ProcessCriterium implements Criterium<Descriptor> { private final Class<? extends Descriptor> wantedType; @@ -17,13 +16,8 @@ public class ProcessCriterium implements Criterium<DescriptorFile> { /** Only process descriptors with the appropriate type. */ @Override - public boolean applies(DescriptorFile file) { - for (Descriptor desc : file.getDescriptors()) { - if (!this.wantedType.isInstance(desc)) { - return false; - } - } - return true; + public boolean applies(Descriptor desc) { + return this.wantedType.isInstance(desc); } } diff --git a/src/main/java/org/torproject/collector/sync/SyncManager.java b/src/main/java/org/torproject/collector/sync/SyncManager.java index d8d2708..369f3ff 100644 --- a/src/main/java/org/torproject/collector/sync/SyncManager.java +++ b/src/main/java/org/torproject/collector/sync/SyncManager.java @@ -9,9 +9,9 @@ import org.torproject.collector.conf.Key; import org.torproject.descriptor.Descriptor; import org.torproject.descriptor.DescriptorCollector; -import org.torproject.descriptor.DescriptorFile; import org.torproject.descriptor.DescriptorReader; import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.UnparseableDescriptor; import org.torproject.descriptor.index.DescriptorIndexCollector; import org.slf4j.Logger; @@ -70,15 +70,17 @@ public class SyncManager { String marker, Configuration conf) throws ConfigurationException { Path basePath = conf.getPath(Key.SyncPath); SyncPersistence persist = new SyncPersistence(conf); + Criterium<Descriptor> unparseable + = new ProcessCriterium(UnparseableDescriptor.class); for (URL source : sources) { File base = new File(basePath.toFile(), marker + "-" + source.getHost()); log.info("Merging {} from {} into storage ...", marker, source.getHost()); for (Map.Entry<String, Class<? extends Descriptor>> entry : mapPathDesc.entrySet()) { + File descFile = new File(base, entry.getKey()); DescriptorReader descriptorReader = DescriptorSourceFactory.createDescriptorReader(); - descriptorReader.addDirectory(new File(base, entry.getKey())); String histFileEnding = entry.getValue().getSimpleName() + (entry.getKey().contains("consensus-microdesc") ? "-micro" : ""); @@ -87,27 +89,26 @@ public class SyncManager { + histFileEnding); descriptorReader.setHistoryFile(historyFile); log.info("Reading {} of type {} ... ", marker, histFileEnding); - Iterator<DescriptorFile> descriptorFiles - = descriptorReader.readDescriptors(); + Iterator<Descriptor> descriptors + = descriptorReader.readDescriptors(descFile).iterator(); log.info("Done reading {} of type {}.", marker, histFileEnding); - Criterium crit = new ProcessCriterium(entry.getValue()); - while (descriptorFiles.hasNext()) { - DescriptorFile descFile = descriptorFiles.next(); - log.debug("Operating on desc-file containing {} descs.", - descFile.getDescriptors().size()); - if (!crit.applies(descFile)) { - log.warn("Not processing {} in {}.", descFile.getFileName(), - descFile.getDirectory()); + Criterium<Descriptor> crit = new ProcessCriterium(entry.getValue()); + while (descriptors.hasNext()) { + Descriptor desc = descriptors.next(); + if (unparseable.applies(desc)) { + Exception ex + = ((UnparseableDescriptor)desc).getDescriptorParseException(); + log.warn("Parsing of {} caused Exception(s). Processing anyway.", + descFile, ex); + } + if (!crit.applies(desc)) { + log.warn("Not processing {} in {}.", desc.getClass().getName(), + descFile); continue; } - Exception ex = descFile.getException(); - if (null != ex) { - log.warn("Parsing of {} caused Exception(s). Processing anyway.", - descFile.getDirectory() + "/" + descFile.getFileName(), ex); - } - persist.storeDescs(descFile.getDescriptors(), - descFile.getFile().getName(), collectionDate.getTime()); + persist.storeDesc(desc, + descFile.getName(), collectionDate.getTime()); } descriptorReader.saveHistoryFile(historyFile); } diff --git a/src/main/java/org/torproject/collector/sync/SyncPersistence.java b/src/main/java/org/torproject/collector/sync/SyncPersistence.java index 5f18f63..e764344 100644 --- a/src/main/java/org/torproject/collector/sync/SyncPersistence.java +++ b/src/main/java/org/torproject/collector/sync/SyncPersistence.java @@ -62,61 +62,7 @@ public class SyncPersistence { public void storeDescs(List<Descriptor> descs, String filename, long received) { for (Descriptor desc : descs) { - boolean recognizedAndWritten = false; - for (Class clazz : desc.getClass().getInterfaces()) { - DescriptorPersistence descPersist = null; - switch (clazz.getSimpleName()) { - case "RelayNetworkStatusVote": - descPersist - = new VotePersistence((RelayNetworkStatusVote) desc, received); - break; - case "RelayNetworkStatusConsensus": - RelayNetworkStatusConsensus cons = - (RelayNetworkStatusConsensus) desc; - if (null == cons.getConsensusFlavor()) { - descPersist = new ConsensusPersistence(cons, received); - } else if ("microdesc".equals(cons.getConsensusFlavor())) { - descPersist = new MicroConsensusPersistence(cons, received); - } - break; - case "RelayServerDescriptor": - descPersist = new ServerDescriptorPersistence( - (RelayServerDescriptor) desc, received); - break; - case "BridgeExtraInfoDescriptor": - descPersist = new BridgeExtraInfoPersistence( - (BridgeExtraInfoDescriptor) desc, received); - break; - case "RelayExtraInfoDescriptor": - descPersist = new ExtraInfoPersistence( - (RelayExtraInfoDescriptor) desc, received); - break; - case "BridgeNetworkStatus": // need to infer authId from filename - descPersist = new StatusPersistence( - (BridgeNetworkStatus) desc, filename.split(DASH)[2], received); - break; - case "BridgeServerDescriptor": - descPersist = new BridgeServerDescriptorPersistence( - (BridgeServerDescriptor) desc, received); - break; - case "ExitList": // downloaded is part of desc, which to use? - descPersist = new ExitlistPersistence((ExitList) desc, received); - break; - default: - log.trace("Invalid descriptor type {} for sync-merge.", - clazz.getName()); - continue; - } - if (null != descPersist) { - descPersist.storeAll(recentPathName, outputPathName); - recognizedAndWritten = true; - } - break; - } - if (!recognizedAndWritten) { - log.error("Unknown descriptor type {} implementing {}.", - desc.getClass().getSimpleName(), desc.getClass().getInterfaces()); - } + storeDesc(desc, filename, received); } try { PersistenceUtils.cleanDirectory(recentPath); @@ -125,4 +71,66 @@ public class SyncPersistence { } } + /** + * Stores a descriptor in main storage and recent. + * The storage locations are taken from <code>collector.properties</code>' + * options <code>OutputPath</code> and <code>RecentPath</code>. + */ + public void storeDesc(Descriptor desc, String filename, long received) { + boolean recognizedAndWritten = false; + for (Class clazz : desc.getClass().getInterfaces()) { + DescriptorPersistence descPersist = null; + switch (clazz.getSimpleName()) { + case "RelayNetworkStatusVote": + descPersist + = new VotePersistence((RelayNetworkStatusVote) desc, received); + break; + case "RelayNetworkStatusConsensus": + RelayNetworkStatusConsensus cons = + (RelayNetworkStatusConsensus) desc; + if (null == cons.getConsensusFlavor()) { + descPersist = new ConsensusPersistence(cons, received); + } else if ("microdesc".equals(cons.getConsensusFlavor())) { + descPersist = new MicroConsensusPersistence(cons, received); + } + break; + case "RelayServerDescriptor": + descPersist = new ServerDescriptorPersistence( + (RelayServerDescriptor) desc, received); + break; + case "BridgeExtraInfoDescriptor": + descPersist = new BridgeExtraInfoPersistence( + (BridgeExtraInfoDescriptor) desc, received); + break; + case "RelayExtraInfoDescriptor": + descPersist = new ExtraInfoPersistence( + (RelayExtraInfoDescriptor) desc, received); + break; + case "BridgeNetworkStatus": // need to infer authId from filename + descPersist = new StatusPersistence( + (BridgeNetworkStatus) desc, filename.split(DASH)[2], received); + break; + case "BridgeServerDescriptor": + descPersist = new BridgeServerDescriptorPersistence( + (BridgeServerDescriptor) desc, received); + break; + case "ExitList": // downloaded is part of desc, which to use? + descPersist = new ExitlistPersistence((ExitList) desc, received); + break; + default: + log.trace("Invalid descriptor type {} for sync-merge.", + clazz.getName()); + continue; + } + if (null != descPersist) { + descPersist.storeAll(recentPathName, outputPathName); + recognizedAndWritten = true; + } + break; + } + if (!recognizedAndWritten) { + log.error("Unknown descriptor type {} implementing {}.", + desc.getClass().getSimpleName(), desc.getClass().getInterfaces()); + } + } } |
