1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
|
"""
Service providing hostname resolution via reverse DNS lookups. This provides
both resolution via a thread pool (looking up several addresses at a time) and
caching of the results. If used, it's advisable that this service is stopped
when it's no longer needed. All calls are both non-blocking and thread safe.
Be aware that this relies on querying the system's DNS servers, possibly
leaking the requested addresses to third parties.
"""
# The only points of concern in terms of concurrent calls are the RESOLVER and
# RESOLVER.resolvedCache. This services provides (mostly) non-locking thread
# safety via the following invariants:
# - Resolver and cache instances are non-destructible
# Nothing can be removed or invalidated. Rather, halting resolvers and
# trimming the cache are done via reassignment (pointing the RESOLVER or
# RESOLVER.resolvedCache to another copy).
# - Functions create and use local references to the resolver and its cache
# This is for consistency (ie, all operations are done on the same resolver
# or cache instance regardless of concurrent assignments). Usually it's
# assigned to a local variable called 'resolverRef' or 'cacheRef'.
# - Locks aren't necessary, but used to help in the following cases:
# - When assigning to the RESOLVER (to avoid orphaned instances with
# running thread pools).
# - When adding/removing from the cache (prevents workers from updating
# an outdated cache reference).
import time
import socket
import threading
import itertools
import Queue
import distutils.sysconfig
from util import log, sysTools
RESOLVER = None # hostname resolver (service is stopped if None)
RESOLVER_LOCK = threading.RLock() # regulates assignment to the RESOLVER
RESOLVER_COUNTER = itertools.count() # atomic counter, providing the age for new entries (for trimming)
DNS_ERROR_CODES = ("1(FORMERR)", "2(SERVFAIL)", "3(NXDOMAIN)", "4(NOTIMP)", "5(REFUSED)", "6(YXDOMAIN)",
"7(YXRRSET)", "8(NXRRSET)", "9(NOTAUTH)", "10(NOTZONE)", "16(BADVERS)")
CONFIG = {"queries.hostnames.poolSize": 5,
"queries.hostnames.useSocketModule": False,
"cache.hostnames.size": 700000,
"cache.hostnames.trimSize": 200000,
"log.hostnameCacheTrimmed": log.INFO}
def loadConfig(config):
config.update(CONFIG, {
"queries.hostnames.poolSize": 1,
"cache.hostnames.size": 100,
"cache.hostnames.trimSize": 10})
CONFIG["cache.hostnames.trimSize"] = min(CONFIG["cache.hostnames.trimSize"], CONFIG["cache.hostnames.size"] / 2)
def start():
"""
Primes the service to start resolving addresses. Calling this explicitly is
not necessary since resolving any address will start the service if it isn't
already running.
"""
global RESOLVER
RESOLVER_LOCK.acquire()
if not isRunning(): RESOLVER = _Resolver()
RESOLVER_LOCK.release()
def stop():
"""
Halts further resolutions and stops the service. This joins on the resolver's
thread pool and clears its lookup cache.
"""
global RESOLVER
RESOLVER_LOCK.acquire()
if isRunning():
# Releases resolver instance. This is done first so concurrent calls to the
# service won't try to use it. However, using a halted instance is fine and
# all calls currently in progress can still proceed on the RESOLVER's local
# references.
resolverRef, RESOLVER = RESOLVER, None
# joins on its worker thread pool
resolverRef.stop()
for t in resolverRef.threadPool: t.join()
RESOLVER_LOCK.release()
def setPaused(isPause):
"""
Allows or prevents further hostname resolutions (resolutions still make use of
cached entries if available). This starts the service if it isn't already
running.
Arguments:
isPause - puts a freeze on further resolutions if true, allows them to
continue otherwise
"""
# makes sure a running resolver is set with the pausing setting
RESOLVER_LOCK.acquire()
start()
RESOLVER.isPaused = isPause
RESOLVER_LOCK.release()
def isRunning():
"""
Returns True if the service is currently running, False otherwise.
"""
return bool(RESOLVER)
def isPaused():
"""
Returns True if the resolver is paused, False otherwise.
"""
resolverRef = RESOLVER
if resolverRef: return resolverRef.isPaused
else: return False
def isResolving():
"""
Returns True if addresses are currently waiting to be resolved, False
otherwise.
"""
resolverRef = RESOLVER
if resolverRef: return not resolverRef.unresolvedQueue.empty()
else: return False
def resolve(ipAddr, timeout = 0, suppressIOExc = True):
"""
Provides the hostname associated with a given IP address. By default this is
a non-blocking call, fetching cached results if available and queuing the
lookup if not. This provides None if the lookup fails (with a suppressed
exception) or timeout is reached without resolution. This starts the service
if it isn't already running.
If paused this simply returns the cached reply (no request is queued and
returns immediately regardless of the timeout argument).
Requests may raise the following exceptions:
- ValueError - address was unresolvable (includes the DNS error response)
- IOError - lookup failed due to os or network issues (suppressed by default)
Arguments:
ipAddr - ip address to be resolved
timeout - maximum duration to wait for a resolution (blocks to
completion if None)
suppressIOExc - suppresses lookup errors and re-runs failed calls if true,
raises otherwise
"""
# starts the service if it isn't already running (making sure we have an
# instance in a thread safe fashion before continuing)
resolverRef = RESOLVER
if resolverRef == None:
RESOLVER_LOCK.acquire()
start()
resolverRef = RESOLVER
RESOLVER_LOCK.release()
if resolverRef.isPaused:
# get cache entry, raising if an exception and returning if a hostname
cacheRef = resolverRef.resolvedCache
if ipAddr in cacheRef:
entry = cacheRef[ipAddr][0]
if suppressIOExc and type(entry) == IOError: return None
elif isinstance(entry, Exception): raise entry
else: return entry
else: return None
elif suppressIOExc:
# if resolver has cached an IOError then flush the entry (this defaults to
# suppression since these error may be transient)
cacheRef = resolverRef.resolvedCache
flush = ipAddr in cacheRef and type(cacheRef[ipAddr]) == IOError
try: return resolverRef.getHostname(ipAddr, timeout, flush)
except IOError: return None
else: return resolverRef.getHostname(ipAddr, timeout)
def getPendingCount():
"""
Provides an approximate count of the number of addresses still pending
resolution.
"""
resolverRef = RESOLVER
if resolverRef: return resolverRef.unresolvedQueue.qsize()
else: return 0
def getRequestCount():
"""
Provides the number of resolutions requested since starting the service.
"""
resolverRef = RESOLVER
if resolverRef: return resolverRef.totalResolves
else: return 0
def _resolveViaSocket(ipAddr):
"""
Performs hostname lookup via the socket module's gethostbyaddr function. This
raises an IOError if the lookup fails (network issue) and a ValueError in
case of DNS errors (address unresolvable).
Arguments:
ipAddr - ip address to be resolved
"""
try:
# provides tuple like: ('localhost', [], ['127.0.0.1'])
return socket.gethostbyaddr(ipAddr)[0]
except socket.herror, exc:
if exc[0] == 2: raise IOError(exc[1]) # "Host name lookup failure"
else: raise ValueError(exc[1]) # usually "Unknown host"
except socket.error, exc: raise ValueError(exc[1])
def _resolveViaHost(ipAddr):
"""
Performs a host lookup for the given IP, returning the resolved hostname.
This raises an IOError if the lookup fails (os or network issue), and a
ValueError in the case of DNS errors (address is unresolvable).
Arguments:
ipAddr - ip address to be resolved
"""
hostname = sysTools.call("host %s" % ipAddr)[0].split()[-1:][0]
if hostname == "reached":
# got message: ";; connection timed out; no servers could be reached"
raise IOError("lookup timed out")
elif hostname in DNS_ERROR_CODES:
# got error response (can't do resolution on address)
raise ValueError("address is unresolvable: %s" % hostname)
else:
# strips off ending period and returns hostname
return hostname[:-1]
class _Resolver():
"""
Performs reverse DNS resolutions. Lookups are a network bound operation so
this spawns a pool of worker threads to do several at a time in parallel.
"""
def __init__(self):
# IP Address => (hostname/error, age), resolution failures result in a
# ValueError with the lookup's status
self.resolvedCache = {}
self.resolvedLock = threading.RLock() # governs concurrent access when modifying resolvedCache
self.unresolvedQueue = Queue.Queue() # unprocessed lookup requests
self.recentQueries = [] # recent resolution requests to prevent duplicate requests
self.threadPool = [] # worker threads that process requests
self.totalResolves = 0 # counter for the total number of addresses queried to be resolved
self.isPaused = False # prevents further resolutions if true
self.halt = False # if true, tells workers to stop
self.cond = threading.Condition() # used for pausing threads
# Determines if resolutions are made using os 'host' calls or python's
# 'socket.gethostbyaddr'. The following checks if the system has the
# gethostbyname_r function, which determines if python resolutions can be
# done in parallel or not. If so, this is preferable.
isSocketResolutionParallel = distutils.sysconfig.get_config_var("HAVE_GETHOSTBYNAME_R")
self.useSocketResolution = CONFIG["queries.hostnames.useSocketModule"] and isSocketResolutionParallel
for _ in range(CONFIG["queries.hostnames.poolSize"]):
t = threading.Thread(target = self._workerLoop)
t.setDaemon(True)
t.start()
self.threadPool.append(t)
def getHostname(self, ipAddr, timeout, flushCache = False):
"""
Provides the hostname, queuing the request and returning None if the
timeout is reached before resolution. If a problem's encountered then this
either raises an IOError (for os and network issues) or ValueError (for DNS
resolution errors).
Arguments:
ipAddr - ip address to be resolved
timeout - maximum duration to wait for a resolution (blocks to
completion if None)
flushCache - if true the cache is skipped and address re-resolved
"""
# if outstanding requests are done then clear recentQueries to allow
# entries removed from the cache to be re-run
if self.unresolvedQueue.empty(): self.recentQueries = []
# copies reference cache (this is important in case the cache is trimmed
# during this call)
cacheRef = self.resolvedCache
if not flushCache and ipAddr in cacheRef:
# cached response is available - raise if an error, return if a hostname
response = cacheRef[ipAddr][0]
if isinstance(response, Exception): raise response
else: return response
elif flushCache or ipAddr not in self.recentQueries:
# new request - queue for resolution
self.totalResolves += 1
self.recentQueries.append(ipAddr)
self.unresolvedQueue.put(ipAddr)
# periodically check cache if requester is willing to wait
if timeout == None or timeout > 0:
startTime = time.time()
while timeout == None or time.time() - startTime < timeout:
if ipAddr in cacheRef:
# address was resolved - raise if an error, return if a hostname
response = cacheRef[ipAddr][0]
if isinstance(response, Exception): raise response
else: return response
else: time.sleep(0.1)
return None # timeout reached without resolution
def stop(self):
"""
Halts further resolutions and terminates the thread.
"""
self.cond.acquire()
self.halt = True
self.cond.notifyAll()
self.cond.release()
def _workerLoop(self):
"""
Simple producer-consumer loop followed by worker threads. This takes
addresses from the unresolvedQueue, attempts to look up its hostname, and
adds its results or the error to the resolved cache. Resolver reference
provides shared resources used by the thread pool.
"""
while not self.halt:
# if resolver is paused then put a hold on further resolutions
if self.isPaused:
self.cond.acquire()
if not self.halt: self.cond.wait(1)
self.cond.release()
continue
# snags next available ip, timeout is because queue can't be woken up
# when 'halt' is set
try: ipAddr = self.unresolvedQueue.get_nowait()
except Queue.Empty:
# no elements ready, wait a little while and try again
self.cond.acquire()
if not self.halt: self.cond.wait(1)
self.cond.release()
continue
if self.halt: break
try:
if self.useSocketResolution: result = _resolveViaSocket(ipAddr)
else: result = _resolveViaHost(ipAddr)
except IOError, exc: result = exc # lookup failed
except ValueError, exc: result = exc # dns error
self.resolvedLock.acquire()
self.resolvedCache[ipAddr] = (result, RESOLVER_COUNTER.next())
# trim cache if excessively large (clearing out oldest entries)
if len(self.resolvedCache) > CONFIG["cache.hostnames.size"]:
# Providing for concurrent, non-blocking calls require that entries are
# never removed from the cache, so this creates a new, trimmed version
# instead.
# determines minimum age of entries to be kept
currentCount = RESOLVER_COUNTER.next()
newCacheSize = CONFIG["cache.hostnames.size"] - CONFIG["cache.hostnames.trimSize"]
threshold = currentCount - newCacheSize
newCache = {}
msg = "trimming hostname cache from %i entries to %i" % (len(self.resolvedCache), newCacheSize)
log.log(CONFIG["log.hostnameCacheTrimmed"], msg)
# checks age of each entry, adding to toDelete if too old
for ipAddr, entry in self.resolvedCache.iteritems():
if entry[1] >= threshold: newCache[ipAddr] = entry
self.resolvedCache = newCache
self.resolvedLock.release()
|