summaryrefslogtreecommitdiff
path: root/src/main/java/org/torproject/collector/cron/Scheduler.java
blob: 6bc90cab048e58784e345cf32b914c0b169b7570 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/* Copyright 2016 The Tor Project
 * See LICENSE for licensing information */

package org.torproject.collector.cron;

import org.torproject.collector.conf.Configuration;
import org.torproject.collector.conf.ConfigurationException;
import org.torproject.collector.conf.Key;
import org.torproject.collector.cron.CollecTorMain;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Scheduler that starts the modules configured in collector.properties.
 *
 * <p>A single instance exists per JVM and is obtained via
 * {@link #getInstance()}.  The instance owns a scheduled thread pool with a
 * core size of ten threads and serves as that pool's {@link ThreadFactory}
 * in order to give the worker threads recognizable names.</p>
 */
public class Scheduler implements ThreadFactory {

  /** Suffix of the configuration keys that switch a module on or off. */
  public static final String ACTIVATED = "Activated";

  /** Suffix of the configuration keys holding a module's period in minutes. */
  public static final String PERIODMIN = "PeriodMinutes";

  /** Suffix of the configuration keys holding a module's offset in minutes. */
  public static final String OFFSETMIN = "OffsetMinutes";

  private static final Logger log = LoggerFactory.getLogger(Scheduler.class);

  /** Creates the actual threads; {@link #newThread} only renames them. */
  private final ThreadFactory threads = Executors.defaultThreadFactory();

  /** Number of threads created so far; used to build thread names. */
  private int currentThreadNo = 0;

  private final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(10, this);

  /* Must be declared after 'log', because creating the instance may log. */
  private static final Scheduler instance = new Scheduler();

  private Scheduler() {}

  /**
   * Returns the only instance of this class.
   */
  public static Scheduler getInstance() {
    return instance;
  }

  /**
   * Schedule all classes given according to the parameters in the
   * configuration.
   *
   * <p>For each map entry whose key evaluates to {@code true} in the given
   * configuration, the module class is instantiated using its constructor
   * that accepts a {@link Configuration} and is then scheduled with the
   * offset and period found under the keys named like the entry's key, but
   * with {@link #ACTIVATED} replaced by {@link #OFFSETMIN} and
   * {@link #PERIODMIN} respectively.</p>
   *
   * <p>A module that cannot be scheduled is logged and skipped; the
   * remaining modules are still processed.</p>
   *
   * @param collecTorMains maps activation keys to module classes.
   * @param conf the configuration to read the settings from; it is also
   *     handed to each module's constructor.
   */
  public void scheduleModuleRuns(Map<Key,
      Class<? extends CollecTorMain>> collecTorMains, Configuration conf) {
    for ( Map.Entry<Key, Class<? extends CollecTorMain>> ctmEntry
        : collecTorMains.entrySet() ) {
      try {
        if ( conf.getBool(ctmEntry.getKey()) ) {
          String prefix = ctmEntry.getKey().name().replace(ACTIVATED, "");
          CollecTorMain ctm = ctmEntry.getValue()
              .getConstructor(Configuration.class).newInstance(conf);
          scheduleExecutions(ctm,
              conf.getInt(Key.valueOf(prefix + OFFSETMIN)),
              conf.getInt(Key.valueOf(prefix + PERIODMIN)));
        }
      } catch (ConfigurationException | IllegalAccessException
          | InstantiationException | InvocationTargetException
          | NoSuchMethodException | RejectedExecutionException
          | NullPointerException ex) {
        logSchedulingFailure(ctmEntry.getValue(), ex);
      } catch (IllegalArgumentException ex) {
        /* Thrown by scheduleExecutions for a non-positive period, and
         * presumably by Key.valueOf when there is no key of the derived
         * name.  One misconfigured module must not prevent the other
         * modules from being scheduled. */
        logSchedulingFailure(ctmEntry.getValue(), ex);
      }
    }
  }

  /** Logs that the given module class could not be scheduled and why. */
  private void logSchedulingFailure(Class<?> moduleClass, Exception ex) {
    String moduleName = null == moduleClass ? "null" : moduleClass.getName();
    log.error("Cannot schedule " + moduleName
        + ". Reason: " + ex.getMessage(), ex);
  }

  /**
   * Schedules the given module at a fixed rate of {@code period} minutes.
   *
   * <p>The delay before the first run is derived from the current
   * minute-of-hour such that the start minute is congruent to
   * {@code offset} modulo {@code period}.  Only whole minutes are
   * considered; the seconds of the current time are ignored.</p>
   *
   * @throws IllegalArgumentException if {@code period} is not positive.
   */
  private void scheduleExecutions(CollecTorMain ctm, int offset, int period) {
    if ( period <= 0 ) {
      /* Avoids a division by zero below and gives a clearer message than
       * the executor's own argument check. */
      throw new IllegalArgumentException("Period must be a positive number "
          + "of minutes, but is " + period + ".");
    }
    log.info("Periodic updater started for " + ctm.getClass().getName()
        +  "; offset=" + offset + ", period=" + period + ".");
    int currentMinute = Calendar.getInstance().get(Calendar.MINUTE);

    /* Java's % may yield a negative remainder (e.g., for negative offsets),
     * so normalize into [0, period).  Computed as long to rule out
     * overflow for extreme offsets. */
    long initialDelay = ((long) offset - currentMinute) % period;
    if ( initialDelay < 0L ) {
      initialDelay += period;
    }

    /* Run after initialDelay delay and then every period min. */
    log.info("Periodic updater will start every " + period + "th min "
        + "at minute " + ((currentMinute + initialDelay) % period) + "."
        + "  The first start will happen in " + initialDelay + " minute(s).");
    this.scheduler.scheduleAtFixedRate(ctm, initialDelay, period,
        TimeUnit.MINUTES);
  }

  /**
   * Try to shutdown smoothly, i.e., wait for running tasks to terminate.
   *
   * <p>Waits up to 20 minutes.  If the tasks did not terminate by then, or
   * if the waiting thread is interrupted, a forced shutdown is attempted.
   * In the latter case the interrupt status of the calling thread is
   * restored before returning.</p>
   */
  public void shutdownScheduler() {
    scheduler.shutdown();
    try {
      if ( scheduler.awaitTermination(20L, TimeUnit.MINUTES) ) {
        log.info("Shutdown of all scheduled tasks completed successfully.");
      } else {
        /* Timed out: tasks are still running, so don't report success. */
        forceShutdown();
      }
    } catch ( InterruptedException ie ) {
      forceShutdown();
      /* Preserve the interrupt for callers further up the stack. */
      Thread.currentThread().interrupt();
    }
  }

  /** Attempts to stop all tasks and logs those that never got to run. */
  private void forceShutdown() {
    List<Runnable> notTerminated = scheduler.shutdownNow();
    log.error("Regular shutdown failed for: " + notTerminated);
  }

  /**
   * Provide a nice name for debugging and log thread creation.
   *
   * @param runner the code the new thread is going to execute.
   * @return a thread from the default thread factory, renamed to
   *     {@code CollecTor-Scheduled-Thread-<n>} with a running number.
   */
  @Override
  public Thread newThread(Runnable runner) {
    Thread newThread = threads.newThread(runner);
    newThread.setName("CollecTor-Scheduled-Thread-" + ++currentThreadNo);
    log.info("New Thread created: " + newThread.getName());
    return newThread;
  }

}