9771 - Keep Torflow happy
[pytorctl.git] / PathSupport.py
1 #!/usr/bin/python
2 # Copyright 2007-2010 Mike Perry. See LICENSE file.
3 """
4
5 Support classes for path construction
6
7 The PathSupport package builds on top of TorCtl.TorCtl. It provides a
8 number of interfaces that make path construction easier.
9
10 The inheritance diagram for event handling is as follows:
11 TorCtl.EventHandler <- TorCtl.ConsensusTracker <- PathBuilder
12 <- CircuitHandler <- StreamHandler.
13
14 Basically, EventHandler is what gets all the control port events
15 packaged in nice clean classes (see help(TorCtl) for information on
16 those).
17
18 ConsensusTracker tracks the NEWCONSENSUS and NEWDESC events to maintain
19 a view of the network that is consistent with the Tor client's current
20 consensus.
21
22 PathBuilder inherits from ConsensusTracker and is what builds all
23 circuits based on the requirements specified in the SelectionManager
24 instance passed to its constructor. It also handles attaching streams to
circuits. It only handles building one circuit at a time.
26
27 CircuitHandler optionally inherits from PathBuilder, and overrides its
28 circuit event handling to manage building a pool of circuits as opposed
29 to just one. It still uses the SelectionManager for path selection.
30
31 StreamHandler inherits from CircuitHandler, and is what governs the
32 attachment of an incoming stream on to one of the multiple circuits of
33 the circuit handler.
34
35 The SelectionManager is essentially a configuration wrapper around the
36 most elegant portions of TorFlow: NodeGenerators, NodeRestrictions, and
37 PathRestrictions. It extends from a BaseSelectionManager that provides
38 a basic example of using these mechanisms for custom implementations.
39
40 In the SelectionManager, a NodeGenerator is used to choose the nodes
41 probabilistically according to some distribution while obeying the
42 NodeRestrictions. These generators (one per hop) are handed off to the
43 PathSelector, which uses the generators to build a complete path that
44 satisfies the PathRestriction requirements.
45
46 Have a look at the class hierarchy directly below to get a feel for how
47 the restrictions fit together, and what options are available.
48
49 """
50
51 import TorCtl
52 import re
53 import struct
54 import random
55 import socket
56 import copy
57 import Queue
58 import time
59 import TorUtil
60 import traceback
61 import threading
62 from TorUtil import *
63
64 import sys
65 if sys.version_info < (2, 5):
66 from sets import Set as set
67
68 __all__ = ["NodeRestrictionList", "PathRestrictionList",
69 "PercentileRestriction", "OSRestriction", "ConserveExitsRestriction",
70 "FlagsRestriction", "MinBWRestriction", "VersionIncludeRestriction",
71 "VersionExcludeRestriction", "VersionRangeRestriction",
72 "ExitPolicyRestriction", "NodeRestriction", "PathRestriction",
73 "OrNodeRestriction", "MetaNodeRestriction", "AtLeastNNodeRestriction",
74 "NotNodeRestriction", "Subnet16Restriction", "UniqueRestriction",
75 "NodeGenerator", "UniformGenerator", "OrderedExitGenerator",
76 "BwWeightedGenerator", "PathSelector", "Connection", "NickRestriction",
77 "IdHexRestriction", "PathBuilder", "CircuitHandler", "StreamHandler",
78 "SelectionManager", "BaseSelectionManager", "CountryCodeRestriction",
79 "CountryRestriction", "UniqueCountryRestriction", "SingleCountryRestriction",
80 "ContinentRestriction", "ContinentJumperRestriction",
81 "UniqueContinentRestriction", "MetaPathRestriction", "RateLimitedRestriction",
82 "SmartSocket"]
83
84 #################### Path Support Interfaces #####################
85
class RestrictionError(Exception):
    """Base error for problems encountered while applying restrictions."""
89
class NoNodesRemain(RestrictionError):
    """RestrictionError for the case where no routers remain to choose from."""
93
class NodeRestriction:
    """Interface for policies that accept or reject a single router."""

    def r_is_ok(self, r):
        """Return True if Router 'r' is acceptable for this restriction.

        This base implementation accepts every router.
        """
        return True
99
class PathRestriction:
    """Interface for policies that accept or reject a whole path."""

    def path_is_ok(self, path):
        """Return True if the list of Routers in 'path' is acceptable.

        This base implementation accepts every path.
        """
        return True
105
106 # TODO: Or, Not, N of M
class MetaPathRestriction(PathRestriction):
    """MetaPathRestrictions are path restriction aggregators."""

    def add_restriction(self, rstr):
        """Add the PathRestriction 'rstr'. Subclasses must override this."""
        # NotImplemented is a comparison sentinel, not an exception class;
        # calling it raised a confusing TypeError instead.
        raise NotImplementedError()

    def del_restriction(self, RestrictionClass):
        """Remove restrictions of type 'RestrictionClass'. Subclasses must
        override this."""
        raise NotImplementedError()
111
class PathRestrictionList(MetaPathRestriction):
    """Aggregate that accepts a path only if every member restriction does."""

    def __init__(self, restrictions):
        """Constructor. 'restrictions' is a list of PathRestriction
        instances; the list is stored by reference."""
        self.restrictions = restrictions

    def path_is_ok(self, path):
        """Check the list of Routers in 'path' against each restriction,
        stopping at the first one that rejects it."""
        for restriction in self.restrictions:
            if restriction.path_is_ok(path):
                continue
            return False
        return True

    def add_restriction(self, rstr):
        """Append the PathRestriction 'rstr' to the list."""
        self.restrictions.append(rstr)

    def del_restriction(self, RestrictionClass):
        """Drop every PathRestriction that is an instance of
        'RestrictionClass' from the list."""
        kept = [rstr for rstr in self.restrictions
                if not isinstance(rstr, RestrictionClass)]
        self.restrictions = kept

    def __str__(self):
        members = [str(rstr) for rstr in self.restrictions]
        return self.__class__.__name__ + "(" + str(members) + ")"
137
class NodeGenerator:
    """Interface for node generation.

    Keeps three router lists: 'sorted_r' (everything handed in),
    'rstr_routers' (the subset accepted by 'rstr_list') and 'routers'
    (a working copy of 'rstr_routers' that shrinks as routers are marked
    chosen and is restored by rewind()).
    """

    def __init__(self, sorted_r, rstr_list):
        """Constructor. Takes a bandwidth-sorted list of Routers 'sorted_r'
        and a NodeRestrictionList 'rstr_list'.

        Raises NoNodesRemain if no router passes the restrictions.
        """
        self.rstr_list = rstr_list
        self.rebuild(sorted_r)

    def reset_restriction(self, rstr_list):
        """Replace the restriction list and re-filter the known routers."""
        self.rstr_list = rstr_list
        self.rebuild()

    def rewind(self):
        """Rewind the generator to the 'beginning' by restoring the working
        list from the restricted list.

        Raises NoNodesRemain if the restricted list is empty.
        """
        self.routers = copy.copy(self.rstr_routers)
        if not self.routers:
            plog("NOTICE", "No routers left after restrictions applied: "+str(self.rstr_list))
            raise NoNodesRemain(str(self.rstr_list))

    def rebuild(self, sorted_r=None):
        """Extra step to be performed when new routers are added or when
        the restrictions change.

        If 'sorted_r' is given it replaces the stored router list; otherwise
        the previously stored list is re-filtered. Raises NoNodesRemain if
        nothing passes the restrictions.
        """
        # Identity test: an empty list is a valid (if useless) new router list.
        if sorted_r is not None:
            self.sorted_r = sorted_r
        self.rstr_routers = [r for r in self.sorted_r
                             if self.rstr_list.r_is_ok(r)]
        if not self.rstr_routers:
            plog("NOTICE", "No routers left after restrictions applied: "+str(self.rstr_list))
            raise NoNodesRemain(str(self.rstr_list))

    def mark_chosen(self, r):
        """Mark a router as chosen: remove it from the list of routers
        that can be returned in the future."""
        self.routers.remove(r)

    def all_chosen(self):
        """Return true if all the routers have been marked as chosen."""
        return not self.routers

    def generate(self):
        """Return a python generator that yields routers according to the
        policy. Subclasses must override this."""
        # NotImplemented is not an exception; calling it raised TypeError.
        raise NotImplementedError()
180
class Connection(TorCtl.Connection):
    """Extended Connection class that provides a method for building
    circuits."""

    def __init__(self, sock):
        TorCtl.Connection.__init__(self, sock)

    def build_circuit(self, path):
        """Tell Tor to build a circuit through the Routers in 'path'.

        Returns a Circuit with 'path', 'exit' (the last hop of 'path') and
        'circ_id' (the result of extend_circuit) filled in.
        """
        circuit = Circuit()
        circuit.path = path
        last_hop = len(path) - 1
        circuit.exit = circuit.path[last_hop]
        circuit.circ_id = self.extend_circuit(0, circuit.id_path())
        return circuit
192
193 ######################## Node Restrictions ########################
194
195 # TODO: We still need more path support implementations
196 # - NodeRestrictions:
197 # - Uptime/LongLivedPorts (Does/should hibernation count?)
198 # - Published/Updated
199 # - Add a /8 restriction for ExitPolicy?
200 # - PathRestrictions:
201 # - NodeFamily
202 # - GeoIP:
203 # - Mathematical/empirical study of predecessor expectation
204 # - If middle node on the same continent as exit, exit learns nothing
205 # - else, exit has a bias on the continent of origin of user
206 # - Language and browser accept string determine this anyway
207 # - ContinentRestrictor (avoids doing more than N continent crossings)
208 # - EchelonPhobicRestrictor
209 # - Does not cross international boundaries for client->Entry or
210 # Exit->destination hops
211
class PercentileRestriction(NodeRestriction):
    """Restriction to cut out a percentile slice of the network."""

    def __init__(self, pct_skip, pct_fast, r_list):
        """Constructor. Routers whose 'list_rank' lies between the
        'pct_skip' and 'pct_fast' percentiles of the sorted list 'r_list'
        are accepted."""
        self.sorted_r = r_list
        self.pct_skip = pct_skip
        self.pct_fast = pct_fast

    def r_is_ok(self, r):
        """Returns true if r is in the percentile boundaries (by rank)."""
        if r.list_rank < len(self.sorted_r)*self.pct_skip/100:
            return False
        too_high = r.list_rank > len(self.sorted_r)*self.pct_fast/100
        if too_high:
            return False
        return True

    def __str__(self):
        bounds = str(self.pct_skip) + "," + str(self.pct_fast)
        return self.__class__.__name__ + "(" + bounds + ")"
231
class RatioPercentileRestriction(NodeRestriction):
    """Restriction to cut out a percentile slice of the network by ratio of
    consensus bw to descriptor bw."""

    def __init__(self, pct_skip, pct_fast, r_list):
        """Constructor. Routers whose 'ratio_rank' lies between the
        'pct_skip' and 'pct_fast' percentiles of the sorted list 'r_list'
        are accepted."""
        self.sorted_r = r_list
        self.pct_skip = pct_skip
        self.pct_fast = pct_fast

    def r_is_ok(self, r):
        """Returns true if r is in the percentile boundaries (by ratio
        rank)."""
        if r.ratio_rank < len(self.sorted_r)*self.pct_skip/100:
            return False
        too_high = r.ratio_rank > len(self.sorted_r)*self.pct_fast/100
        if too_high:
            return False
        return True

    def __str__(self):
        bounds = str(self.pct_skip) + "," + str(self.pct_fast)
        return self.__class__.__name__ + "(" + bounds + ")"
252
class UptimeRestriction(NodeRestriction):
    """Restriction to filter out routers with uptimes < min_uptime or
    > max_uptime"""

    def __init__(self, min_uptime=None, max_uptime=None):
        """Constructor. Either bound may be None to leave that side
        unrestricted."""
        self.min_uptime = min_uptime
        self.max_uptime = max_uptime

    def r_is_ok(self, r):
        """Returns true if r is in the uptime boundaries."""
        # Compare against None explicitly so that a bound of 0 is honoured
        # rather than being treated as "no bound".
        if self.min_uptime is not None and r.uptime < self.min_uptime:
            return False
        if self.max_uptime is not None and r.uptime > self.max_uptime:
            return False
        return True

    def __str__(self):
        return self.__class__.__name__+"("+str(self.min_uptime)+","+str(self.max_uptime)+")"
265
class RankRestriction(NodeRestriction):
    """Restriction to cut out a list-rank slice of the network."""

    def __init__(self, rank_skip, rank_stop):
        """Constructor. Routers with 'list_rank' from 'rank_skip' through
        'rank_stop' (both inclusive) are accepted."""
        self.rank_stop = rank_stop
        self.rank_skip = rank_skip

    def r_is_ok(self, r):
        """Returns true if r is in the boundaries (by rank)."""
        if r.list_rank < self.rank_skip:
            return False
        if r.list_rank > self.rank_stop:
            return False
        return True

    def __str__(self):
        bounds = str(self.rank_skip) + "," + str(self.rank_stop)
        return self.__class__.__name__ + "(" + bounds + ")"
281
class OSRestriction(NodeRestriction):
    """Restriction based on operating system"""

    def __init__(self, ok, bad=None):
        """Constructor. Accept router OSes that match regexes in 'ok',
        rejects those that match regexes in 'bad'.

        'bad' defaults to an empty list; a fresh list is created per
        instance so that instances never share a mutable default.
        """
        self.ok = ok
        if bad is None:
            bad = []
        self.bad = bad

    def r_is_ok(self, r):
        """Returns True if r.os matches a regex in 'ok', False if it
        matches a regex in 'bad' ('ok' is checked first).

        If nothing matches: returns False when 'ok' is non-empty (a
        whitelist is in force), otherwise True when 'bad' is non-empty.
        With both lists empty the router is rejected.
        """
        for pattern in self.ok:
            if re.search(pattern, r.os):
                return True
        for pattern in self.bad:
            if re.search(pattern, r.os):
                return False
        if self.ok:
            return False
        if self.bad:
            return True
        # Previously this fell off the end and returned None; keep the same
        # (falsy) outcome but make it an explicit boolean.
        return False

    def __str__(self):
        return self.__class__.__name__+"("+str(self.ok)+","+str(self.bad)+")"
303
class ConserveExitsRestriction(NodeRestriction):
    """Restriction to reject exits from selection"""

    def __init__(self, exit_ports=None):
        """Constructor. 'exit_ports' is an optional list of ports; when it
        is empty or None the router's "Exit" flag is used instead."""
        self.exit_ports = exit_ports

    def r_is_ok(self, r):
        """Reject 'r' if it will exit to any of 'exit_ports'; without
        ports, reject it if it carries the "Exit" flag."""
        if not self.exit_ports:
            return not "Exit" in r.flags
        for port in self.exit_ports:
            if r.will_exit_to("255.255.255.255", port):
                return False
        return True

    def __str__(self):
        return self.__class__.__name__ + "()"
319
class FlagsRestriction(NodeRestriction):
    """Restriction for mandatory and forbidden router flags"""

    def __init__(self, mandatory, forbidden=None):
        """Constructor. 'mandatory' and 'forbidden' are both lists of router
        flags as strings.

        'forbidden' defaults to an empty list; a fresh list is created per
        instance so that instances never share a mutable default.
        """
        self.mandatory = mandatory
        if forbidden is None:
            forbidden = []
        self.forbidden = forbidden

    def r_is_ok(self, router):
        """Returns true if 'router' has every mandatory flag and none of
        the forbidden ones."""
        for m in self.mandatory:
            if not m in router.flags:
                return False
        for f in self.forbidden:
            if f in router.flags:
                return False
        return True

    def __str__(self):
        return self.__class__.__name__+"("+str(self.mandatory)+","+str(self.forbidden)+")"
337
class NickRestriction(NodeRestriction):
    """Require that the node nickname is as specified"""

    def __init__(self, nickname):
        """Constructor. 'nickname' is compared for exact equality."""
        self.nickname = nickname

    def r_is_ok(self, router):
        """Returns true if the router's nickname equals the required one."""
        matches = router.nickname == self.nickname
        return matches

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.nickname) + ")"
348
class IdHexRestriction(NodeRestriction):
    """Require that the node idhash is as specified"""

    def __init__(self, idhex):
        """Constructor. 'idhex' may carry a leading '$'; it is stored
        without it and upper-cased."""
        fingerprint = idhex
        if fingerprint[0] == '$':
            fingerprint = fingerprint[1:]
        self.idhex = fingerprint.upper()

    def r_is_ok(self, router):
        """Returns true if the router's idhex equals the stored one."""
        matches = router.idhex == self.idhex
        return matches

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.idhex) + ")"
362
class MinBWRestriction(NodeRestriction):
    """Require a minimum bandwidth"""

    def __init__(self, minbw):
        """Constructor. 'minbw' is the smallest acceptable router.bw."""
        self.min_bw = minbw

    def r_is_ok(self, router):
        """Returns true if the router's bw is at least the minimum."""
        fast_enough = router.bw >= self.min_bw
        return fast_enough

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.min_bw) + ")"
372
class RateLimitedRestriction(NodeRestriction):
    """Require that a router's 'rate_limited' attribute equal 'limited'"""

    def __init__(self, limited=True):
        """Constructor. 'limited' is the required value of rate_limited."""
        self.limited = limited

    def r_is_ok(self, router):
        """Returns true if router.rate_limited equals the required value."""
        matches = router.rate_limited == self.limited
        return matches

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.limited) + ")"
381
class VersionIncludeRestriction(NodeRestriction):
    """Require that the version match one in the list"""

    def __init__(self, eq):
        """Constructor. 'eq' is a list of versions as strings; each is
        converted to a TorCtl.RouterVersion."""
        self.eq = [TorCtl.RouterVersion(v) for v in eq]

    def r_is_ok(self, router):
        """Returns true if the version of 'router' matches one of the
        specified versions."""
        for e in self.eq:
            if e == router.version:
                return True
        return False

    def __str__(self):
        # Stringify each version, as VersionExcludeRestriction does, rather
        # than embedding the list's element reprs.
        return self.__class__.__name__+"("+str([str(v) for v in self.eq])+")"
398
class VersionExcludeRestriction(NodeRestriction):
    """Require that the version not match one in the list"""

    def __init__(self, exclude):
        """Constructor. 'exclude' is a list of versions as strings; each
        is converted to a TorCtl.RouterVersion."""
        self.exclude = [TorCtl.RouterVersion(v) for v in exclude]

    def r_is_ok(self, router):
        """Returns false if the version of 'router' matches one of the
        specified versions."""
        for banned in self.exclude:
            if banned == router.version:
                return False
        return True

    def __str__(self):
        versions = [str(v) for v in self.exclude]
        return self.__class__.__name__ + "(" + str(versions) + ")"
415
class VersionRangeRestriction(NodeRestriction):
    """Require that the versions be inside a specified range"""

    def __init__(self, gr_eq, less_eq=None):
        """Constructor. 'gr_eq' is the lower bound and 'less_eq' the
        optional upper bound, both as version strings."""
        self.gr_eq = TorCtl.RouterVersion(gr_eq)
        self.less_eq = None
        if less_eq:
            self.less_eq = TorCtl.RouterVersion(less_eq)

    def r_is_ok(self, router):
        """Returns true if the router's version is within the configured
        bounds (both inclusive); an unset bound is not checked."""
        lower_ok = not self.gr_eq or router.version >= self.gr_eq
        if not lower_ok:
            return lower_ok
        return not self.less_eq or router.version <= self.less_eq

    def __str__(self):
        bounds = str(self.gr_eq) + "," + str(self.less_eq)
        return self.__class__.__name__ + "(" + bounds + ")"
429
class ExitPolicyRestriction(NodeRestriction):
    """Require that a router exit to an ip+port"""

    def __init__(self, to_ip, to_port):
        """Constructor. 'to_ip' and 'to_port' name the destination that is
        passed to the router's will_exit_to()."""
        self.to_port = to_port
        self.to_ip = to_ip

    def r_is_ok(self, r):
        """Returns what r.will_exit_to() reports for the destination."""
        verdict = r.will_exit_to(self.to_ip, self.to_port)
        return verdict

    def __str__(self):
        target = str(self.to_ip) + "," + str(self.to_port)
        return self.__class__.__name__ + "(" + target + ")"
440
class MetaNodeRestriction(NodeRestriction):
    """Interface for a NodeRestriction that is an expression consisting of
    multiple other NodeRestrictions"""

    # NotImplemented is a comparison sentinel, not an exception class;
    # calling it raised TypeError. NotImplementedError is what was meant.
    def add_restriction(self, rstr):
        """Add the NodeRestriction 'rstr'. Subclasses must override this."""
        raise NotImplementedError()

    # TODO: these should collapse the restriction and return a new
    # instance for re-insertion (or None)
    def next_rstr(self):
        """Subclasses must override this."""
        raise NotImplementedError()

    def del_restriction(self, RestrictionClass):
        """Remove restrictions of type 'RestrictionClass'. Subclasses must
        override this."""
        raise NotImplementedError()
449
class OrNodeRestriction(MetaNodeRestriction):
    """MetaNodeRestriction that is the boolean or of two or more
    NodeRestrictions"""

    def __init__(self, rs):
        """Constructor. 'rs' is a list of NodeRestrictions"""
        self.rstrs = rs

    def r_is_ok(self, r):
        """Returns true as soon as one member restriction accepts 'r'."""
        for member in self.rstrs:
            if not member.r_is_ok(r):
                continue
            return True
        return False

    def __str__(self):
        members = [str(member) for member in self.rstrs]
        return self.__class__.__name__ + "(" + str(members) + ")"
466
class NotNodeRestriction(MetaNodeRestriction):
    """Negates a single restriction"""

    def __init__(self, a):
        """Constructor. 'a' is the NodeRestriction to negate."""
        self.a = a

    def r_is_ok(self, r):
        """Returns true exactly when the wrapped restriction rejects 'r'."""
        inner_verdict = self.a.r_is_ok(r)
        return not inner_verdict

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.a) + ")"
476
class AtLeastNNodeRestriction(MetaNodeRestriction):
    """MetaNodeRestriction that is true if at least n member
    restrictions are true."""

    def __init__(self, rstrs, n):
        """Constructor. 'rstrs' is a list of NodeRestrictions and 'n' the
        number of them that must accept a router."""
        self.n = n
        self.rstrs = rstrs

    def r_is_ok(self, r):
        """Evaluate every member restriction on 'r' and return true if at
        least 'n' of them accept it."""
        accepted = len([member for member in self.rstrs if member.r_is_ok(r)])
        return not accepted < self.n

    def __str__(self):
        members = [str(member) for member in self.rstrs]
        return self.__class__.__name__ + "(" + str(members) + "," + str(self.n) + ")"
494
class NodeRestrictionList(MetaNodeRestriction):
    """Class to manage a list of NodeRestrictions"""

    def __init__(self, restrictions):
        """Constructor. 'restrictions' is a list of NodeRestriction
        instances; the list is stored by reference."""
        self.restrictions = restrictions

    def r_is_ok(self, r):
        """Returns true if Router 'r' passes all of the contained
        restrictions, stopping at the first one that rejects it."""
        for restriction in self.restrictions:
            if restriction.r_is_ok(r):
                continue
            return False
        return True

    def add_restriction(self, restr):
        """Append the NodeRestriction 'restr' to the list of restrictions"""
        self.restrictions.append(restr)

    # TODO: This does not collapse meta restrictions..
    def del_restriction(self, RestrictionClass):
        """Remove all restrictions of type RestrictionClass from the list.
        Does NOT inspect or collapse MetaNode Restrictions (though
        MetaRestrictions can be removed if RestrictionClass is
        MetaNodeRestriction)"""
        kept = [rstr for rstr in self.restrictions
                if not isinstance(rstr, RestrictionClass)]
        self.restrictions = kept

    def clear(self):
        """Remove all restrictions by installing a new empty list."""
        self.restrictions = []

    def __str__(self):
        members = [str(rstr) for rstr in self.restrictions]
        return self.__class__.__name__ + "(" + str(members) + ")"
527
528
529 #################### Path Restrictions #####################
530
class Subnet16Restriction(PathRestriction):
    """PathRestriction that mandates that no two nodes from the same
    /16 subnet be in the path"""

    def path_is_ok(self, path):
        """Return False if any two Routers in 'path' share a /16 prefix.

        Each router's 'ip' is masked with 255.255.0.0 as an integer. Every
        pair of hops is compared; the earlier code only compared the first
        hop with the rest, so two later hops in the same /16 were accepted.
        An empty path is accepted.
        """
        mask16 = struct.unpack(">I", socket.inet_aton("255.255.0.0"))[0]
        seen_prefixes = set()
        for r in path:
            prefix = r.ip & mask16
            if prefix in seen_prefixes:
                return False
            seen_prefixes.add(prefix)
        return True

    def __str__(self):
        return self.__class__.__name__+"()"
544
class UniqueRestriction(PathRestriction):
    """Path restriction that mandates that the same router can't appear more
    than once in a path"""

    def path_is_ok(self, path):
        """Return False if any hop also appears earlier in 'path'."""
        for position, hop in enumerate(path):
            earlier_hops = path[:position]
            if hop in earlier_hops:
                return False
        return True

    def __str__(self):
        return self.__class__.__name__ + "()"
556
557 #################### GeoIP Restrictions ###################
558
class CountryCodeRestriction(NodeRestriction):
    """ Ensure that the country_code is set """

    def r_is_ok(self, r):
        """Returns true if r.country_code is not None."""
        # Identity comparison is the reliable way to test for None.
        return r.country_code is not None

    def __str__(self):
        return self.__class__.__name__+"()"
566
class CountryRestriction(NodeRestriction):
    """ Only accept nodes that are in 'country_code' """

    def __init__(self, country_code):
        """Constructor. 'country_code' is compared for exact equality."""
        self.country_code = country_code

    def r_is_ok(self, r):
        """Returns true if r.country_code equals the required code."""
        matches = r.country_code == self.country_code
        return matches

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.country_code) + ")"
577
class ExcludeCountriesRestriction(NodeRestriction):
    """ Exclude a list of countries """

    def __init__(self, countries):
        """Constructor. 'countries' is the collection of country codes to
        reject."""
        self.countries = countries

    def r_is_ok(self, r):
        """Returns true if r.country_code is not among the excluded ones."""
        return r.country_code not in self.countries

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.countries) + ")"
588
class UniqueCountryRestriction(PathRestriction):
    """ Ensure every router to have a distinct country_code """

    def path_is_ok(self, path):
        """Return False if any two hops in 'path' share a country_code."""
        for position, first in enumerate(path):
            for second in path[position+1:]:
                if first.country_code == second.country_code:
                    return False
        return True

    def __str__(self):
        return self.__class__.__name__ + "()"
600
class SingleCountryRestriction(PathRestriction):
    """ Ensure every router to have the same country_code """

    def path_is_ok(self, path):
        """Return False if any hop's country_code differs from that of the
        first hop in 'path'."""
        expected = path[0].country_code
        for hop in path:
            if expected != hop.country_code:
                return False
        return True

    def __str__(self):
        return self.__class__.__name__ + "()"
612
class ContinentRestriction(PathRestriction):
    """ Do not more than n continent crossings """

    # TODO: Add src and dest
    def __init__(self, n, src=None, dest=None):
        """Constructor. 'n' is the maximum number of continent crossings;
        'src' and 'dest' are accepted but not used."""
        self.n = n

    def path_is_ok(self, path):
        """Count the consecutive hops whose 'continent' differs and return
        true if that count does not exceed 'n'."""
        crossings = 0
        last_hop = None
        for hop in path:
            # The first hop has no predecessor to compare against.
            if last_hop and hop.continent != last_hop.continent:
                crossings += 1
            last_hop = hop
        return not crossings > self.n

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.n) + ")"
634
class ContinentJumperRestriction(PathRestriction):
    """ Ensure continent crossings between all hops """

    def path_is_ok(self, path):
        """Return False if two consecutive hops share a 'continent'."""
        last_hop = None
        for hop in path:
            # The first hop has no predecessor to compare against.
            if last_hop and hop.continent == last_hop.continent:
                return False
            last_hop = hop
        return True

    def __str__(self):
        return self.__class__.__name__ + "()"
649
class UniqueContinentRestriction(PathRestriction):
    """ Ensure every hop to be on a different continent """

    def path_is_ok(self, path):
        """Return False if any two hops in 'path' share a 'continent'."""
        for position, first in enumerate(path):
            for second in path[position+1:]:
                if first.continent == second.continent:
                    return False
        return True

    def __str__(self):
        return self.__class__.__name__ + "()"
661
class OceanPhobicRestriction(PathRestriction):
    """ Not more than n ocean crossings """

    # TODO: Add src and dest
    def __init__(self, n, src=None, dest=None):
        """Constructor. 'n' is the maximum number of ocean crossings;
        'src' and 'dest' are accepted but not used."""
        self.n = n

    def path_is_ok(self, path):
        """Count the consecutive hops whose 'cont_group' differs and return
        true if that count does not exceed 'n'."""
        crossings = 0
        last_hop = None
        for hop in path:
            # The first hop has no predecessor to compare against.
            if last_hop and hop.cont_group != last_hop.cont_group:
                crossings += 1
            last_hop = hop
        return not crossings > self.n

    def __str__(self):
        return self.__class__.__name__ + "(" + str(self.n) + ")"
683
684 #################### Node Generators ######################
685
class UniformGenerator(NodeGenerator):
    """NodeGenerator that produces nodes in the uniform distribution"""

    def generate(self):
        """Yield a random router from the working list for as long as
        all_chosen() is false."""
        # XXX: hrmm.. this is not really the right thing to check
        while True:
            if self.all_chosen():
                return
            yield random.choice(self.routers)
692
class ExactUniformGenerator(NodeGenerator):
    """NodeGenerator that produces nodes randomly, yet strictly uniformly
    over time.

    Each router carries a list of counters in 'r._generated'; this
    generator reads and increments the counter at index 'position'."""
    def __init__(self, sorted_r, rstr_list, position=0):
        """Constructor. 'position' is the index into each router's
        '_generated' list used by this generator."""
        # Must be set first: NodeGenerator.__init__ calls rebuild(), which
        # reads self.position.
        self.position = position
        NodeGenerator.__init__(self, sorted_r, rstr_list)

    def generate(self):
        """Yield, in random order, the routers whose counter at 'position'
        equals the current minimum over the working list.

        Once those candidates are exhausted, a NOTICE is logged and the
        counter of every router still at that minimum is incremented."""
        min_gen = min(map(lambda r: r._generated[self.position], self.routers))
        choices = filter(lambda r: r._generated[self.position]==min_gen,
                           self.routers)
        while choices:
            r = random.choice(choices)
            yield r
            # Only reached if the consumer asks for another router.
            choices.remove(r)

        # Re-filter: routers marked chosen in the meantime have had their
        # counter bumped (and were removed from self.routers) by mark_chosen.
        choices = filter(lambda r: r._generated[self.position]==min_gen,
                           self.routers)
        plog("NOTICE", "Ran out of choices in ExactUniformGenerator. Incrementing nodes")
        for r in choices:
            r._generated[self.position] += 1

    def mark_chosen(self, r):
        """Increment the counter of 'r' at 'position', then remove it from
        the working list via NodeGenerator.mark_chosen."""
        r._generated[self.position] += 1
        NodeGenerator.mark_chosen(self, r)

    def rebuild(self, sorted_r=None):
        """Rebuild the restricted list, then make sure every router in it
        has a '_generated' list long enough to be indexed at 'position',
        padding with zeros where needed."""
        plog("DEBUG", "Rebuilding ExactUniformGenerator")
        NodeGenerator.rebuild(self, sorted_r)
        for r in self.rstr_routers:
            lgen = len(r._generated)
            if lgen < self.position+1:
                for i in xrange(lgen, self.position+1):
                    r._generated.append(0)
727
728
class OrderedExitGenerator(NodeGenerator):
    """NodeGenerator that produces exits in an ordered fashion for a
    specific port.

    'next_exit_by_port' maps a port to the index in 'self.routers' of the
    next router to yield for that port."""
    def __init__(self, to_port, sorted_r, rstr_list):
        """Constructor. 'to_port' is the port whose position is tracked;
        'sorted_r' and 'rstr_list' are passed on to NodeGenerator."""
        self.to_port = to_port
        self.next_exit_by_port = {}
        NodeGenerator.__init__(self, sorted_r, rstr_list)

    def rewind(self):
        """Restore the working router list and set 'last_idx', the index
        value at which generate() stops for the current port."""
        NodeGenerator.rewind(self)
        # No stored position for this port, or a position of 0: start at
        # index 0 and stop once the index reaches len(self.routers).
        if self.to_port not in self.next_exit_by_port or not self.next_exit_by_port[self.to_port]:
            self.next_exit_by_port[self.to_port] = 0
            self.last_idx = len(self.routers)
        else:
            # Otherwise stop when the index comes back to the stored position.
            self.last_idx = self.next_exit_by_port[self.to_port]

    def set_port(self, port):
        """Switch to tracking 'port' and rewind for it."""
        self.to_port = port
        self.rewind()

    def mark_chosen(self, r):
        """Advance the stored index for the current port by one. Unlike
        NodeGenerator.mark_chosen, 'r' is not removed from the router
        list."""
        self.next_exit_by_port[self.to_port] += 1

    def all_chosen(self):
        """Return true if the stored index for the current port equals
        'last_idx'."""
        return self.last_idx == self.next_exit_by_port[self.to_port]

    def generate(self):
        """Yield routers in list order starting at the stored index for
        the current port, wrapping to index 0 at the end of the list, until
        the index equals 'last_idx'."""
        while True: # A do..while would be real nice here..
            if self.next_exit_by_port[self.to_port] >= len(self.routers):
                self.next_exit_by_port[self.to_port] = 0
            yield self.routers[self.next_exit_by_port[self.to_port]]
            # Only reached if the consumer asks for another router.
            self.next_exit_by_port[self.to_port] += 1
            if self.last_idx == self.next_exit_by_port[self.to_port]:
                break
763
class BwWeightedGenerator(NodeGenerator):
    """

    This is a generator designed to match the Tor Path Selection
    algorithm. It will generate nodes weighted by their bandwidth,
    but take the appropriate weighting into account against guard
    nodes and exit nodes when they are chosen for positions other
    than guard/exit. For background see:
    routerlist.c::smartlist_choose_by_bandwidth(),
    http://archives.seul.org/or/dev/Jul-2007/msg00021.html,
    http://archives.seul.org/or/dev/Jul-2007/msg00056.html, and
    https://tor-svn.freehaven.net/svn/tor/trunk/doc/spec/path-spec.txt
    The formulas used are from the first or-dev link, but are proven
    optimal and equivalent to the ones now used in routerlist.c in the
    second or-dev link.

    """
    def __init__(self, sorted_r, rstr_list, pathlen, exit=False, guard=False):
        """ Pass exit=True to create a generator for exit-nodes and
        guard=True for one that picks guard nodes. 'pathlen' is the number
        of hops used to compute the per-hop bandwidth share. """
        self.max_bandwidth = 10000000
        # Out for an exit-node?
        self.exit = exit
        # Is this a guard node?
        self.guard = guard
        # Different sums of bandwidths
        self.total_bw = 0
        self.total_exit_bw = 0
        self.total_guard_bw = 0
        self.total_weighted_bw = 0
        self.pathlen = pathlen
        NodeGenerator.__init__(self, sorted_r, rstr_list)

    def _flag_weight(self, flag_bw, bw_per_hop):
        """Weight applied to routers carrying a flag whose summed bandwidth
        is 'flag_bw', when this generator does not pick for that flag."""
        if flag_bw < bw_per_hop:
            # Too scarce: don't use nodes with this flag at all
            return 0
        if flag_bw > 0:
            return (flag_bw-bw_per_hop)/flag_bw
        return 0

    def _weighted_bw(self, r):
        """Bandwidth of 'r' scaled by the exit and guard weights that apply
        to its flags."""
        bw = r.bw
        if "Exit" in r.flags:
            bw *= self.exit_weight
        if "Guard" in r.flags:
            bw *= self.guard_weight
        return bw

    def rebuild(self, sorted_r=None):
        """Re-filter the routers, rewind, and recompute the bandwidth
        totals, the exit/guard weights and the total weighted bandwidth.

        Raises NoNodesRemain (from NodeGenerator) if no router is left.
        """
        NodeGenerator.rebuild(self, sorted_r)
        NodeGenerator.rewind(self)
        self.total_exit_bw = 0
        self.total_guard_bw = 0
        self.total_bw = 0
        # Reset as well: this sum used to keep growing across rebuilds,
        # inflating the random range used by generate().
        self.total_weighted_bw = 0
        for r in self.routers:
            # TODO: Check max_bandwidth and cap...
            self.total_bw += r.bw
            if "Exit" in r.flags:
                self.total_exit_bw += r.bw
            if "Guard" in r.flags:
                self.total_guard_bw += r.bw

        bw_per_hop = (1.0*self.total_bw)/self.pathlen

        # Print some debugging info about bandwidth ratios
        if self.total_bw > 0:
            e_ratio = self.total_exit_bw/float(self.total_bw)
            g_ratio = self.total_guard_bw/float(self.total_bw)
        else:
            g_ratio = 0
            e_ratio = 0
        plog("DEBUG",
             "E = " + str(self.total_exit_bw) +
             ", G = " + str(self.total_guard_bw) +
             ", T = " + str(self.total_bw) +
             ", g_ratio = " + str(g_ratio) + ", e_ratio = " +str(e_ratio) +
             ", bw_per_hop = " + str(bw_per_hop))

        # A generator that picks for a flag uses those routers at full weight.
        if self.exit:
            self.exit_weight = 1.0
        else:
            self.exit_weight = self._flag_weight(self.total_exit_bw, bw_per_hop)

        if self.guard:
            self.guard_weight = 1.0
        else:
            self.guard_weight = self._flag_weight(self.total_guard_bw, bw_per_hop)

        for r in self.routers:
            self.total_weighted_bw += self._weighted_bw(r)

        self.total_weighted_bw = int(self.total_weighted_bw)
        plog("DEBUG", "Bw: "+str(self.total_weighted_bw)+"/"+str(self.total_bw)
             +". The exit-weight is: "+str(self.exit_weight)
             + ", guard weight is: "+str(self.guard_weight))

    def generate(self):
        """Yield routers with probability proportional to their weighted
        bandwidth. This generator never terminates on its own."""
        while True:
            # Choose a suitable random int
            i = random.randint(0, self.total_weighted_bw)

            # Go through the routers
            for r in self.routers:
                # Below zero here means next() -> choose a new random int+router
                if i < 0: break
                i -= self._weighted_bw(r)
                if i < 0:
                    plog("DEBUG", "Chosen router with a bandwidth of: " + str(r.bw))
                    yield r
882
883 ####################### Secret Sauce ###########################
884
class PathError(Exception):
    """Base error for path-related failures."""
887
class NoRouters(PathError):
    """PathError subclass; by its name, signals that no routers exist."""
890
class PathSelector:
    """Implementation of path selection policies. Builds a path according
    to entry, middle, and exit generators that satisfies the path
    restrictions."""
    def __init__(self, entry_gen, mid_gen, exit_gen, path_restrict):
        """Constructor. The first three arguments are NodeGenerators with
        their appropriate restrictions. The 'path_restrict' is a
        PathRestrictionList"""
        self.entry_gen = entry_gen
        self.mid_gen = mid_gen
        self.exit_gen = exit_gen
        self.path_restrict = path_restrict

    def rebuild_gens(self, sorted_r):
        """Rebuild the 3 generators with a new sorted router list"""
        self.entry_gen.rebuild(sorted_r)
        self.mid_gen.rebuild(sorted_r)
        self.exit_gen.rebuild(sorted_r)

    def _restart_generators(self):
        """Rewind all three NodeGenerators and return fresh
        (entry, mid, exit) python generators from them."""
        self.entry_gen.rewind()
        self.mid_gen.rewind()
        self.exit_gen.rewind()
        entry = self.entry_gen.generate()
        mid = self.mid_gen.generate()
        ext = self.exit_gen.generate()
        return entry, mid, ext

    def select_path(self, pathlen):
        """Creates a path of 'pathlen' hops, and returns it as a list of
        Router instances.

        Candidate paths are drawn until one passes 'path_restrict'; its
        routers are then marked chosen in the generators that produced
        them and their 'refcount' is incremented. If a generator runs dry,
        all generators are rewound and selection continues. A one-hop path
        consists of a single router from the exit generator.
        """
        entry, mid, ext = self._restart_generators()

        plog("DEBUG", "Selecting path..")

        while True:
            path = []
            plog("DEBUG", "Building path..")
            try:
                if pathlen == 1:
                    path = [ext.next()]
                else:
                    path.append(entry.next())
                    for i in range(1, pathlen-1):
                        path.append(mid.next())
                    path.append(ext.next())
                if self.path_restrict.path_is_ok(path):
                    # A one-hop path came from the exit generator only;
                    # marking it in entry_gen could raise ValueError when
                    # the router is not in the entry list.
                    if pathlen > 1:
                        self.entry_gen.mark_chosen(path[0])
                    for i in range(1, pathlen-1):
                        self.mid_gen.mark_chosen(path[i])
                    self.exit_gen.mark_chosen(path[pathlen-1])
                    plog("DEBUG", "Marked path.")
                    break
                else:
                    plog("DEBUG", "Path rejected by path restrictions.")
            except StopIteration:
                plog("NOTICE", "Ran out of routers during buildpath..")
                entry, mid, ext = self._restart_generators()
        for r in path:
            r.refcount += 1
            plog("DEBUG", "Circ refcount "+str(r.refcount)+" for "+r.idhex)
        return path
954
955 # TODO: Implement example manager.
class BaseSelectionManager:
    """
    Minimal node selection manager: every hook is a no-op.

    It is intended to be paired with a PathSelector made of an entry,
    a middle and an exit NodeGenerator, but none of those are required.
    A subclass may implement whatever selection mechanism it likes
    behind this interface and hand an instance to a PathBuilder
    implementation.
    """
    def __init__(self):
        self.consensus = None
        self.bad_restrictions = False

    def reconfigure(self, consensus=None):
        """
        Hook invoked after a significant configuration change. Currently
        that only happens via PathBuilder.__init__ and
        PathBuilder.schedule_selmgr().

        Implementations should NOT throw any exceptions.
        """

    def new_consensus(self, consensus):
        """
        Hook invoked whenever the consensus changes.

        Implementations should NOT throw any exceptions.
        """

    def set_exit(self, exit_name):
        """
        Hook announcing that a fixed exit is desired.

        Implementations should NOT throw any exceptions.
        """

    def set_target(self, host, port):
        """
        Hook announcing that a new target endpoint is desired.

        Implementations may throw a RestrictionError if the target is
        impossible to reach.
        """

    def select_path(self):
        """
        Implementations return a new path as a list() of Router
        instances and may throw a RestrictionError. This base version
        returns None.
        """
1014
1015 class SelectionManager(BaseSelectionManager):
1016 """Helper class to handle configuration updates
1017
1018 The methods are NOT threadsafe. They may ONLY be called from
1019 EventHandler's thread. This means that to update the selection
1020 manager, you must schedule a config update job using
1021 PathBuilder.schedule_selmgr() with a worker function to modify
1022 this object.
1023
1024 XXX: Warning. The constructor of this class is subject to change
1025 and may undergo reorganization in the near future. Watch for falling
1026 bits.
1027 """
1028 # XXX: Hrmm, consider simplifying this. It is confusing and unweildy.
1029 def __init__(self, pathlen, order_exits,
1030 percent_fast, percent_skip, min_bw, use_all_exits,
1031 uniform, use_exit, use_guards,geoip_config=None,
1032 restrict_guards=False, extra_node_rstr=None, exit_ports=None,
1033 order_by_ratio=False):
1034 BaseSelectionManager.__init__(self)
1035 self.__ordered_exit_gen = None
1036 self.pathlen = pathlen
1037 self.order_exits = order_exits
1038 self.percent_fast = percent_fast
1039 self.percent_skip = percent_skip
1040 self.min_bw = min_bw
1041 self.use_all_exits = use_all_exits
1042 self.uniform = uniform
1043 self.exit_id = use_exit
1044 self.use_guards = use_guards
1045 self.geoip_config = geoip_config
1046 self.restrict_guards_only = restrict_guards
1047 self.bad_restrictions = False
1048 self.consensus = None
1049 self.exit_ports = exit_ports
1050 self.extra_node_rstr=extra_node_rstr
1051 self.order_by_ratio = order_by_ratio
1052
1053 def reconfigure(self, consensus=None):
1054 try:
1055 self._reconfigure(consensus)
1056 self.bad_restrictions = False
1057 except NoNodesRemain:
1058 plog("WARN", "No nodes remain in selection manager")
1059 self.bad_restrictions = True
1060 return self.bad_restrictions
1061
1062 def _reconfigure(self, consensus=None):
1063 """This function is called after a configuration change,
1064 to rebuild the RestrictionLists."""
1065 if consensus:
1066 plog("DEBUG", "Reconfigure with consensus")
1067 self.consensus = consensus
1068 else:
1069 plog("DEBUG", "Reconfigure without consensus")
1070
1071 sorted_r = self.consensus.sorted_r
1072
1073 if self.use_all_exits:
1074 self.path_rstr = PathRestrictionList([UniqueRestriction()])
1075 else:
1076 self.path_rstr = PathRestrictionList(
1077 [Subnet16Restriction(), UniqueRestriction()])
1078
1079 if self.use_guards: entry_flags = ["Guard", "Running"]
1080 else: entry_flags = ["Running"]
1081
1082 if self.restrict_guards_only:
1083 nonentry_skip = 0
1084 nonentry_fast = 100
1085 else:
1086 nonentry_skip = self.percent_skip
1087 nonentry_fast = self.percent_fast
1088
1089 if self.order_by_ratio:
1090 PctRstr = RatioPercentileRestriction
1091 else:
1092 PctRstr = PercentileRestriction
1093
1094 # XXX: sometimes we want the ability to do uniform scans
1095 # without the conserve exit restrictions..
1096 entry_rstr = NodeRestrictionList(
1097 [PctRstr(self.percent_skip, self.percent_fast, sorted_r),
1098 OrNodeRestriction(
1099 [FlagsRestriction(["BadExit"]),
1100 ConserveExitsRestriction(self.exit_ports)]),
1101 FlagsRestriction(entry_flags, [])]
1102 )
1103 mid_rstr = NodeRestrictionList(
1104 [PctRstr(nonentry_skip, nonentry_fast, sorted_r),
1105 OrNodeRestriction(
1106 [FlagsRestriction(["BadExit"]),
1107 ConserveExitsRestriction(self.exit_ports)]),
1108 FlagsRestriction(["Running"], [])]
1109
1110 )
1111
1112 if self.exit_id:
1113 self._set_exit(self.exit_id)
1114 plog("DEBUG", "Applying Setexit: "+self.exit_id)
1115 self.exit_rstr = NodeRestrictionList([IdHexRestriction(self.exit_id)])
1116 elif self.use_all_exits:
1117 self.exit_rstr = NodeRestrictionList(
1118 [FlagsRestriction(["Running"], ["BadExit"])])
1119 else:
1120 self.exit_rstr = NodeRestrictionList(
1121 [PctRstr(nonentry_skip, nonentry_fast, sorted_r),
1122 FlagsRestriction(["Running"], ["BadExit"])])
1123
1124 if self.extra_node_rstr:
1125 entry_rstr.add_restriction(self.extra_node_rstr)
1126 mid_rstr.add_restriction(self.extra_node_rstr)
1127 self.exit_rstr.add_restriction(self.extra_node_rstr)
1128
1129 # GeoIP configuration
1130 if self.geoip_config:
1131 # Every node needs country_code
1132 entry_rstr.add_restriction(CountryCodeRestriction())
1133 mid_rstr.add_restriction(CountryCodeRestriction())
1134 self.exit_rstr.add_restriction(CountryCodeRestriction())
1135
1136 # Specified countries for different positions
1137 if self.geoip_config.entry_country:
1138 entry_rstr.add_restriction(CountryRestriction(self.geoip_config.entry_country))
1139 if self.geoip_config.middle_country:
1140 mid_rstr.add_restriction(CountryRestriction(self.geoip_config.middle_country))
1141 if self.geoip_config.exit_country:
1142 self.exit_rstr.add_restriction(CountryRestriction(self.geoip_config.exit_country))
1143
1144 # Excluded countries
1145 if self.geoip_config.excludes:
1146 plog("INFO", "Excluded countries: " + str(self.geoip_config.excludes))
1147 if len(self.geoip_config.excludes) > 0:
1148 entry_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
1149 mid_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
1150 self.exit_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
1151
1152 # Unique countries set? None --> pass
1153 if self.geoip_config.unique_countries != None:
1154 if self.geoip_config.unique_countries:
1155 # If True: unique countries
1156 self.path_rstr.add_restriction(UniqueCountryRestriction())
1157 else:
1158 # False: use the same country for all nodes in a path
1159 self.path_rstr.add_restriction(SingleCountryRestriction())
1160
1161 # Specify max number of continent crossings, None means UniqueContinents
1162 if self.geoip_config.continent_crossings == None:
1163 self.path_rstr.add_restriction(UniqueContinentRestriction())
1164 else: self.path_rstr.add_restriction(ContinentRestriction(self.geoip_config.continent_crossings))
1165 # Should even work in combination with continent crossings
1166 if self.geoip_config.ocean_crossings != None:
1167 self.path_rstr.add_restriction(OceanPhobicRestriction(self.geoip_config.ocean_crossings))
1168
1169 # This is kind of hokey..
1170 if self.order_exits:
1171 if self.__ordered_exit_gen:
1172 exitgen = self.__ordered_exit_gen
1173 exitgen.reset_restriction(self.exit_rstr)
1174 else:
1175 exitgen = self.__ordered_exit_gen = \
1176 OrderedExitGenerator(80, sorted_r, self.exit_rstr)
1177 elif self.uniform:
1178 exitgen = ExactUniformGenerator(sorted_r, self.exit_rstr)
1179 else:
1180 exitgen = BwWeightedGenerator(sorted_r, self.exit_rstr, self.pathlen, exit=True)
1181
1182 if self.uniform:
1183 self.path_selector = PathSelector(
1184 ExactUniformGenerator(sorted_r, entry_rstr),
1185 ExactUniformGenerator(sorted_r, mid_rstr),
1186 exitgen, self.path_rstr)
1187 else:
1188 # Remove ConserveExitsRestriction for entry and middle positions
1189 # by removing the OrNodeRestriction that contains it...
1190 # FIXME: This is a landmine for a poor soul to hit.
1191 # Then again, most of the rest of this function is, too.
1192 entry_rstr.del_restriction(OrNodeRestriction)
1193 mid_rstr.del_restriction(OrNodeRestriction)
1194 self.path_selector = PathSelector(
1195 BwWeightedGenerator(sorted_r, entry_rstr, self.pathlen,
1196 guard=self.use_guards),
1197 BwWeightedGenerator(sorted_r, mid_rstr, self.pathlen),
1198 exitgen, self.path_rstr)
1199 return
1200
1201 def _set_exit(self, exit_name):
1202 # sets an exit, if bad, sets bad_exit
1203 exit_id = None
1204 if exit_name:
1205 if exit_name[0] == '$':
1206 exit_id = exit_name
1207 elif exit_name in self.consensus.name_to_key:
1208 exit_id = self.consensus.name_to_key[exit_name]
1209 self.exit_id = exit_id
1210
1211 def set_exit(self, exit_name):
1212 self._set_exit(exit_name)
1213 self.exit_rstr.clear()
1214 if not self.exit_id:
1215 plog("NOTICE", "Requested null exit "+str(self.exit_id))
1216 self.bad_restrictions = True
1217 elif self.exit_id[1:] not in self.consensus.routers:
1218 plog("NOTICE", "Requested absent exit "+str(self.exit_id))
1219 self.bad_restrictions = True
1220 elif self.consensus.routers[self.exit_id[1:]].down:
1221 e = self.consensus.routers[self.exit_id[1:]]
1222 plog("NOTICE", "Requested downed exit "+str(self.exit_id)+" (bw: "+str(e.bw)+", flags: "+str(e.flags)+")")
1223 self.bad_restrictions = True
1224 elif self.consensus.routers[self.exit_id[1:]].deleted:
1225 e = self.consensus.routers[self.exit_id[1:]]
1226 plog("NOTICE", "Requested deleted exit "+str(self.exit_id)+" (bw: "+str(e.bw)+", flags: "+str(e.flags)+", Down: "+str(e.down)+", ref: "+str(e.refcount)+")")
1227 self.bad_restrictions = True
1228 else:
1229 self.exit_rstr.add_restriction(IdHexRestriction(self.exit_id))
1230 plog("DEBUG", "Added exit restriction for "+self.exit_id)
1231 try:
1232 self.path_selector.exit_gen.rebuild()
1233 self.bad_restrictions = False
1234 except RestrictionError, e:
1235 plog("WARN", "Restriction error "+str(e)+" after set_exit")
1236 self.bad_restrictions = True
1237 return self.bad_restrictions
1238
1239 def new_consensus(self, consensus):
1240 self.consensus = consensus
1241 try:
1242 self.path_selector.rebuild_gens(self.consensus.sorted_r)
1243 if self.exit_id:
1244 self.set_exit(self.exit_id)
1245 except NoNodesRemain:
1246 plog("NOTICE", "No viable nodes in consensus for restrictions.")
1247 # Punting + performing reconfigure..")
1248 #self.reconfigure(consensus)
1249
1250 def set_target(self, ip, port):
1251 # sets an exit policy, if bad, rasies exception..
1252 "Called to update the ExitPolicyRestrictions with a new ip and port"
1253 if self.bad_restrictions:
1254 plog("WARN", "Requested target with bad restrictions")
1255 raise RestrictionError()
1256 self.exit_rstr.del_restriction(ExitPolicyRestriction)
1257 self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
1258 if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
1259 # Try to choose an exit node in the destination country
1260 # needs an IP != 255.255.255.255
1261 if self.geoip_config and self.geoip_config.echelon:
1262 import GeoIPSupport
1263 c = GeoIPSupport.get_country(ip)
1264 if c:
1265 plog("INFO", "[Echelon] IP "+ip+" is in ["+c+"]")
1266 self.exit_rstr.del_restriction(CountryRestriction)
1267 self.exit_rstr.add_restriction(CountryRestriction(c))
1268 else:
1269 plog("INFO", "[Echelon] Could not determine destination country of IP "+ip)
1270 # Try to use a backup country
1271 if self.geoip_config.exit_country:
1272 self.exit_rstr.del_restriction(CountryRestriction)
1273 self.exit_rstr.add_restriction(CountryRestriction(self.geoip_config.exit_country))
1274 # Need to rebuild exit generator
1275 self.path_selector.exit_gen.rebuild()
1276
1277 def select_path(self):
1278 if self.bad_restrictions:
1279 plog("WARN", "Requested target with bad restrictions")
1280 raise RestrictionError()
1281 return self.path_selector.select_path(self.pathlen)
1282
class Circuit:
    "State record for a single circuit"
    def __init__(self):
        # Identity and route
        self.circ_id = 0
        self.path = []  # Router instances, in hop order
        self.exit = None

        # Lifecycle flags
        self.built = False
        self.failed = False
        self.dirty = False
        self.requested_closed = False
        self.detached_cnt = 0

        # Timing
        self.last_extended_at = time.time()
        self.extend_times = []      # List of all extend-durations
        self.setup_duration = None  # Sum of extend-times

        # Streams
        self.pending_streams = []   # Which stream IDs are pending us
        # XXX: Unused.. Need to use for refcounting because
        # sometimes circuit closed events come before the stream
        # close and we need to track those failures..
        self.carried_streams = []

    def id_path(self):
        "Returns the idhex key of every Router in the path, in order"
        return [router.idhex for router in self.path]
1306
class Stream:
    "State record for a single stream"
    def __init__(self, sid, host, port, kind):
        # Values supplied by the caller
        self.strm_id = sid
        self.host = host
        self.port = port
        self.kind = kind

        # Circuit association
        self.detached_from = []  # circ id #'s
        self.pending_circ = None
        self.circ = None

        # Counters and timestamps
        self.attached_at = 0
        self.bytes_read = 0
        self.bytes_written = 0

        # Status flags
        self.failed = False
        self.ignored = False  # Set if PURPOSE=DIR_*
        self.failed_reason = None  # Cheating a little.. Only used by StatsHandler

    def lifespan(self, now):
        "Returns 'now' minus the time the stream was attached"
        age = now - self.attached_at
        return age
1327
# Reference to socket.socket as it is when this module is loaded.
_origsocket = socket.socket
class _SocketWrapper(socket.socket):
    """ Empty subclass of socket.socket, used as the base of SmartSocket.
        Per the original author's note it works around Python's
        same_slots_added() check and the way socket handles __base__. """
1333
class SmartSocket(_SocketWrapper):
    """ A SmartSocket is a socket that tracks global socket creation
        for local ports. It has a member StreamSelector that can
        be used as a PathBuilder stream StreamSelector (see below).

        Most users will want to reset the base class of SocksiPy to
        use this class:
        __oldsocket = socket.socket
        socket.socket = PathSupport.SmartSocket
        import SocksiPy
        socket.socket = __oldsocket

        The local "host:port" of every connected SmartSocket is kept in
        the class-wide port_table set; all access to that set is
        serialized by _table_lock.
    """
    port_table = set()
    _table_lock = threading.Lock()

    def __init__(self, family=2, type=1, proto=0, _sock=None):
        """Create the socket; no local address is recorded until
        connect() or connect_ex() is called."""
        super(SmartSocket, self).__init__(family, type, proto, _sock)
        self.__local_addr = None
        plog("DEBUG", "New socket constructor")

    def _register_local_addr(self):
        """Record this socket's local "host:port" in port_table."""
        myaddr = self.getsockname()
        local_addr = myaddr[0]+":"+str(myaddr[1])
        self.__local_addr = local_addr
        SmartSocket._table_lock.acquire()
        try:
            # try/finally: a failing assert must not leave the lock held
            assert(local_addr not in SmartSocket.port_table)
            SmartSocket.port_table.add(local_addr)
        finally:
            SmartSocket._table_lock.release()
        plog("DEBUG", "Added "+local_addr+" to our local port list")

    def connect(self, args):
        """Connect, then register the local address. Returns whatever the
        base class connect() returns."""
        ret = super(SmartSocket, self).connect(args)
        self._register_local_addr()
        return ret

    def connect_ex(self, args):
        """Like connect(), but returns the base class's error indicator
        instead of raising.

        NOTE(review): the local address is registered whatever the
        returned code is, mirroring connect(); confirm whether failed
        attempts should be skipped."""
        ret = super(SmartSocket, self).connect_ex(args)
        # 'ret' is an error code, not a socket: the local address has
        # to be read from self.
        self._register_local_addr()
        return ret

    def __del__(self):
        if self.__local_addr:
            SmartSocket._table_lock.acquire()
            try:
                # discard(), not remove(): clear_port_table() may already
                # have dropped this entry.
                SmartSocket.port_table.discard(self.__local_addr)
            finally:
                SmartSocket._table_lock.release()
            plog("DEBUG", "Removed "+self.__local_addr+" from our local port list")
        else:
            plog("DEBUG", "Got a socket deletion with no address")

    # The three functions below take no 'self'; each is wrapped in
    # Callable right after its definition (presumably so it can be called
    # on the class itself -- confirm against Callable's definition).
    def table_size():
        """Return the number of local addresses currently recorded."""
        SmartSocket._table_lock.acquire()
        try:
            return len(SmartSocket.port_table)
        finally:
            SmartSocket._table_lock.release()
    table_size = Callable(table_size)

    def clear_port_table():
        """ WARNING: Calling this periodically is a *really good idea*.
            Relying on __del__ can expose you to race conditions on garbage
            collection between your processes. """
        SmartSocket._table_lock.acquire()
        try:
            for i in list(SmartSocket.port_table):
                plog("DEBUG", "Cleared "+i+" from our local port list")
                SmartSocket.port_table.remove(i)
        finally:
            SmartSocket._table_lock.release()
    clear_port_table = Callable(clear_port_table)

    def StreamSelector(host, port):
        """Return True if "host:port" is a local address recorded by a
        SmartSocket, i.e. the stream came from this process."""
        to_test = host+":"+str(port)
        SmartSocket._table_lock.acquire()
        try:
            return (to_test in SmartSocket.port_table)
        finally:
            SmartSocket._table_lock.release()
    StreamSelector = Callable(StreamSelector)
1411
1412
def StreamSelector(host, port):
    """ Placeholder StreamSelector: accepts every stream.

        A StreamSelector is a function taking a host and a port (parsed
        from Tor's SOURCE_ADDR field in STREAM NEW events) that decides
        whether the stream belongs to this process.

        If you supply your own, you MUST DO YOUR OWN LOCKING inside it,
        because it is called from the EventHandler thread.

        See PathSupport.SmartSocket.StreamSelector for an actual
        implementation.

    """
    accept_all = True
    return accept_all
1428
1429 # TODO: Make passive "PathWatcher" so people can get aggregate
1430 # node reliability stats for normal usage without us attaching streams
1431 # Can use __metaclass__ and type
1432
1433 class PathBuilder(TorCtl.ConsensusTracker):
1434 """
1435 PathBuilder implementation. Handles circuit construction, subject
1436 to the constraints of the SelectionManager selmgr.
1437
1438 Do not access this object from other threads. Instead, use the
1439 schedule_* functions to schedule work to be done in the thread
1440 of the EventHandler.
1441 """
def __init__(self, c, selmgr, RouterClass=TorCtl.Router,
             strm_selector=StreamSelector):
    """Constructor.

    'c' is a Connection and 'selmgr' is a SelectionManager; the
    selection manager is reconfigured with the current consensus right
    away. 'RouterClass' is a class that inherits from Router and is used
    to create annotated Routers. 'strm_selector' is called with a
    stream's source host and port; streams for which it returns a false
    value are ignored."""
    TorCtl.ConsensusTracker.__init__(self, c, RouterClass)

    # Circuit and stream bookkeeping
    self.circuits, self.streams = {}, {}
    self.num_circuits = 1
    self.resolve_port = 0
    self.new_nym = False
    self.last_exit = None

    # Path selection
    self.selmgr = selmgr
    self.selmgr.reconfigure(self.current_consensus())

    # Job scheduling
    self.imm_jobs, self.low_prio_jobs = Queue.Queue(), Queue.Queue()
    self.run_all_jobs = False
    self.do_reconfigure = False

    self.strm_selector = strm_selector
    plog("INFO", "Read %d/%d routers"
         % (len(self.sorted_r), len(self.ns_map)))
1462
def schedule_immediate(self, job):
    """
    Queue 'job' on the immediate queue, which is drained before the
    next event is processed. Asserts that the connection is live.
    """
    assert self.c.is_live()
    immediate_queue = self.imm_jobs
    immediate_queue.put(job)
1470
def schedule_low_prio(self, job):
    """
    Queue 'job' on the low priority queue, which is serviced when a
    non-time critical event arrives. Asserts that the connection is live.
    """
    assert self.c.is_live()
    background_queue = self.low_prio_jobs
    background_queue.put(job)
1477
def reset(self):
    """
    Resets accumulated state. At present that means zeroing every entry
    of each router's _generated list (the ExactUniformGenerator state).
    """
    plog("DEBUG", "Resetting _generated values for ExactUniformGenerator")
    for router in self.routers.itervalues():
        counters = router._generated
        for slot in xrange(len(counters)):
            counters[slot] = 0
1487
def is_urgent_event(event):
    # High priority events require immediate action: circuit
    # BUILT/FAILED/CLOSED and stream NEW/NEWRESOLVE/DETACHED.
    if isinstance(event, TorCtl.CircuitEvent):
        return event.status in ("BUILT", "FAILED", "CLOSED")
    if isinstance(event, TorCtl.StreamEvent):
        return event.status in ("NEW", "NEWRESOLVE", "DETACHED")
    return False
is_urgent_event = Callable(is_urgent_event)
1499
def schedule_selmgr(self, job):
    """
    Schedule an immediate job that calls 'job' with the selection
    manager and then flags the builder so that the selection manager is
    reconfigured afterwards. Asserts that the connection is live.
    """
    assert self.c.is_live()

    def run_then_flag(builder):
        job(builder.selmgr)
        builder.do_reconfigure = True

    self.schedule_immediate(run_then_flag)
1511
1512
def heartbeat_event(self, event):
    """Dispatch scheduled jobs. If you extend PathBuilder and want to
       implement this function for some reason, be sure to call the
       parent class.

       Order of work: all immediate jobs; a pending selection manager
       reconfigure; then either every low priority job (when
       run_all_jobs is set) or at most one of them, and none at all if
       'event' is urgent."""
    # 1. Immediate jobs always run, all of them
    while not self.imm_jobs.empty():
        self.imm_jobs.get_nowait()(self)

    # 2. Reconfigure requested through schedule_selmgr()
    if self.do_reconfigure:
        self.selmgr.reconfigure(self.current_consensus())
        self.do_reconfigure = False

    # 3. Drain mode: run low prio jobs until the queue is empty or a job
    #    clears run_all_jobs
    if self.run_all_jobs:
        while True:
            if self.low_prio_jobs.empty() or not self.run_all_jobs:
                break
            self.low_prio_jobs.get_nowait()(self)
        self.run_all_jobs = False
        return

    # If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
    # don't run low prio jobs.. No need to delay streams for them.
    if PathBuilder.is_urgent_event(event):
        return

    # Do the low prio jobs one at a time in case a
    # higher priority event is queued
    if self.low_prio_jobs.empty():
        return
    self.low_prio_jobs.get_nowait()(self)
1541
def build_path(self):
    """ Ask the SelectionManager for a path and return it. Useful e.g.
        for generating paths without actually creating any circuits """
    selection_manager = self.selmgr
    return selection_manager.select_path()
1546
1547 def close_all_streams(self, reason):
1548 """ Close all open streams """
1549 for strm in self.streams.itervalues():
1550 if not strm.ignored:
1551 try:
1552 self.c.close_stream(strm.strm_id, reason)
1553 except TorCtl.ErrorReply, e:
1554 # This can happen. Streams can timeout before this call.
1555 plog("NOTICE", "Error closing stream "+str(strm.strm_id)+": "+str(e))
1556
def close_all_circuits(self):
    """ Call close_circuit() for every circuit being tracked """
    for tracked in self.circuits.itervalues():
        circ_id = tracked.circ_id
        self.close_circuit(circ_id)
1561
1562 def close_circuit(self, id):
1563 """ Close a circuit with given id """
1564 # TODO: Pass streams to another circ before closing?
1565 plog("DEBUG", "Requesting close of circuit id: "+str(id))
1566 if self.circuits[id].requested_closed: return
1567 self.circuits[id].requested_closed = True
1568 try: self.c.close_circuit(id)
1569 except TorCtl.ErrorReply, e:
1570 plog("ERROR", "Failed closing circuit " + str(id) + ": " + str(e))
1571
def circuit_list(self):
    """ Return an iterator or a list of circuits prioritized for
        stream selection. This implementation simply iterates over the
        values of self.circuits. """
    tracked = self.circuits
    return tracked.itervalues()
1576
def attach_stream_any(self, stream, badcircs):
    """Attach a stream to a valid circuit, avoiding any in 'badcircs'

    The first circuit from circuit_list() that is built, not dirty, not
    requested closed, not in 'badcircs' and whose exit will carry
    stream.host:stream.port is used. If there is none, a new circuit is
    built through the selection manager and the stream is left pending
    on it.

    If self.new_nym is set, it is cleared, every circuit is marked dirty
    first, and the pending streams of the circuits that were not yet
    dirty are moved onto the new circuit together with 'stream'.

    If no circuit can be built, 'stream' is closed and nothing is
    returned.
    """
    # Newnym, and warn if not built plus pending
    unattached_streams = [stream]
    if self.new_nym:
        self.new_nym = False
        plog("DEBUG", "Obeying new nym")
        for key in self.circuits.keys():
            if (not self.circuits[key].dirty
                    and len(self.circuits[key].pending_streams)):
                plog("WARN", "New nym called, destroying circuit "+str(key)
                     +" with "+str(len(self.circuits[key].pending_streams))
                     +" pending streams")
                # Carry the orphaned streams over to the circuit built below
                unattached_streams.extend(self.circuits[key].pending_streams)
                self.circuits[key].pending_streams = []
            # FIXME: Consider actually closing circ if no streams.
            # Every circuit is marked dirty, whether or not it had streams
            self.circuits[key].dirty = True

    for circ in self.circuit_list():
        if circ.built and not circ.requested_closed and not circ.dirty \
                and circ.circ_id not in badcircs:
            # XXX: Fails for 'tor-resolve 530.19.6.80' -> NEWRESOLVE
            if circ.exit.will_exit_to(stream.host, stream.port):
                try:
                    self.c.attach_stream(stream.strm_id, circ.circ_id)
                    stream.pending_circ = circ # Only one possible here
                    circ.pending_streams.append(stream)
                except TorCtl.ErrorReply, e:
                    # No need to retry here. We should get the failed
                    # event for either the circ or stream next
                    plog("WARN", "Error attaching new stream: "+str(e.args))
                    return
                # Attached to an existing circuit: skip the else clause
                break
    # This else clause is executed when we go through the circuit
    # list without finding an entry (or it is empty).
    # http://docs.python.org/tutorial/controlflow.html#break-and-continue-statements-and-else-clauses-on-loops
    else:
        circ = None
        try:
            self.selmgr.set_target(stream.host, stream.port)
            circ = self.c.build_circuit(self.selmgr.select_path())
        except RestrictionError, e:
            # XXX: Dress this up a bit
            self.last_exit = None
            # Kill this stream
            plog("WARN", "Closing impossible stream "+str(stream.strm_id)+" ("+str(e)+")")
            try:
                self.c.close_stream(stream.strm_id, "4") # END_STREAM_REASON_EXITPOLICY
            except TorCtl.ErrorReply, e:
                plog("WARN", "Error closing stream: "+str(e))
            return
        except TorCtl.ErrorReply, e:
            plog("WARN", "Error building circ: "+str(e.args))
            self.last_exit = None
            # Kill this stream
            plog("NOTICE", "Closing stream "+str(stream.strm_id))
            try:
                self.c.close_stream(stream.strm_id, "5") # END_STREAM_REASON_DESTROY
            except TorCtl.ErrorReply, e:
                plog("WARN", "Error closing stream: "+str(e))
            return
        # The new circuit is not built yet: park every collected stream on
        # it. circ_status_event() attaches pending streams on BUILT.
        for u in unattached_streams:
            plog("DEBUG",
                 "Attaching "+str(u.strm_id)+" pending build of "+str(circ.circ_id))
            u.pending_circ = circ
        circ.pending_streams.extend(unattached_streams)
        self.circuits[circ.circ_id] = circ
        self.last_exit = circ.exit
        plog("DEBUG", "Set last exit to "+self.last_exit.idhex)
1646
1647 def circ_status_event(self, c):
1648 output = [str(time.time()-c.arrived_at), c.event_name, str(c.circ_id),
1649 c.status]
1650 if c.path: output.append(",".join(c.path))
1651 if c.reason: output.append("REASON=" + c.reason)
1652 if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
1653 plog("DEBUG", " ".join(output))
1654 # Circuits we don't control get built by Tor
1655 if c.circ_id not in self.circuits:
1656 plog("DEBUG", "Ignoring circ " + str(c.circ_id))
1657 return
1658 if c.status == "EXTENDED":
1659 self.circuits[c.circ_id].last_extended_at = c.arrived_at
1660 elif c.status == "FAILED" or c.status == "CLOSED":
1661 # XXX: Can still get a STREAM FAILED for this circ after this
1662 circ = self.circuits[c.circ_id]
1663 for r in circ.path:
1664 r.refcount -= 1
1665 plog("DEBUG", "Close refcount "+str(r.refcount)+" for "+r.idhex)
1666 if r.deleted and r.refcount == 0:
1667 # XXX: This shouldn't happen with StatsRouters..
1668 if r.__class__.__name__ == "StatsRouter":
1669 plog("WARN", "Purging expired StatsRouter "+r.idhex)
1670 else:
1671 plog("INFO", "Purging expired router "+r.idhex)
1672 del self.routers[r.idhex]
1673 self.selmgr.new_consensus(self.current_consensus())
1674 del self.circuits[c.circ_id]
1675 for stream in circ.pending_streams:
1676 # If it was built, let Tor decide to detach or fail the stream
1677 if not circ.built:
1678 plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
1679 self.attach_stream_any(stream, stream.detached_from)
1680 else:
1681 plog("NOTICE", "Waiting on Tor to hint about stream "+str(stream.strm_id)+" on closed circ "+str(circ.circ_id))
1682 elif c.status == "BUILT":
1683 self.circuits[c.circ_id].built = True
1684 try:
1685 for stream in self.circuits[c.circ_id].pending_streams:
1686 self.c.attach_stream(stream.strm_id, c.circ_id)
1687 except TorCtl.ErrorReply, e:
1688 # No need to retry here. We should get the failed
1689 # event for either the circ or stream in the next event
1690 plog("NOTICE", "Error attaching pending stream: "+str(e.args))
1691 return
1692
1693 def stream_status_event(self, s):
1694 output = [str(time.time()-s.arrived_at), s.event_name, str(s.strm_id),
1695 s.status, str(s.circ_id),
1696 s.target_host, str(s.target_port)]
1697 if s.reason: output.append("REASON=" + s.reason)
1698 if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
1699 if s.purpose: output.append("PURPOSE=" + s.purpose)
1700 if s.source_addr: output.append("SOURCE_ADDR="+s.source_addr)
1701 if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
1702 s.target_host = "255.255.255.255" # ignore DNS for exit policy check
1703
1704 # Hack to ignore Tor-handled streams
1705 if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
1706 if s.status == "CLOSED":
1707 plog("DEBUG", "Deleting ignored stream: " + str(s.strm_id))
1708 del self.streams[s.strm_id]
1709 else:
1710 plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
1711 return
1712
1713 plog("DEBUG", " ".join(output))
1714 # XXX: Copy s.circ_id==0 check+reset from StatsSupport here too?
1715
1716 if s.status == "NEW" or s.status == "NEWRESOLVE":
1717 if s.status == "NEWRESOLVE" and not s.target_port:
1718 s.target_port = self.resolve_port
1719 if s.circ_id == 0:
1720 self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
1721 elif s.strm_id not in self.streams:
1722 plog("NOTICE", "Got new stream "+str(s.strm_id)+" with circuit "
1723 +str(s.circ_id)+" already attached.")
1724 self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
1725 self.streams[s.strm_id].circ_id = s.circ_id
1726
1727 # Remember Tor-handled streams (Currently only directory streams)
1728
1729 if s.purpose and s.purpose.find("DIR_") == 0:
1730 self.streams[s.strm_id].ignored = True
1731 plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
1732 return
1733 elif s.source_addr:
1734 src_addr = s.source_addr.split(":")
1735 src_addr[1] = int(src_addr[1])
1736 if not self.strm_selector(*src_addr):
1737 self.streams[s.strm_id].ignored = True
1738 plog("INFO", "Ignoring foreign stream: " + str(s.strm_id))
1739 return
1740 if s.circ_id == 0:
1741 self.attach_stream_any(self.streams[s.strm_id],
1742 self.streams[s.strm_id].detached_from)
1743 elif s.status == "DETACHED":
1744 if s.strm_id not in self.streams:
1745 plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
1746 self.streams[s.strm_id] = Stream(s.strm_id, s.target_host,
1747 s.target_port, "NEW")
1748 # FIXME Stats (differentiate Resolved streams also..)
1749 if not s.circ_id:
1750 if s.reason == "TIMEOUT" or s.reason == "EXITPOLICY":
1751 plog("NOTICE", "Stream "+str(s.strm_id)+" detached with "+s.reason)
1752 else:
1753 plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit with reason: "+str(s.reason))
1754 else:
1755 self.streams[s.strm_id].detached_from.append(s.circ_id)
1756
1757 if self.streams[s.strm_id].pending_circ and \
1758 self.streams[s.strm_id] in \
1759 self.streams[s.strm_id].pending_circ.pending_streams:
1760 self.streams[s.strm_id].pending_circ.pending_streams.remove(
1761 self.streams[s.strm_id])
1762 self.streams[s.strm_id].pending_circ = None
1763 self.attach_stream_any(self.streams[s.strm_id],
1764 self.streams[s.strm_id].detached_from)
1765 elif s.status == "SUCCEEDED":
1766 if s.strm_id not in self.streams:
1767 plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
1768 return
1769 if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
1770 # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
1771 # in because I'm still not sure this is correct
1772 plog("WARN", "Mismatch of pending: "
1773 +str(self.streams[s.strm_id].pending_circ.circ_id)+" vs "
1774 +str(s.circ_id))
1775 # This can happen if the circuit existed before we started up
1776 if s.circ_id in self.circuits:
1777 self.streams[s.strm_id].circ = self.circuits[s.circ_id]
1778 else:
1779 plog("NOTICE", "Stream "+str(s.strm_id)+" has unknown circuit: "+str(s.circ_id))
1780 else:
1781 self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
1782 self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
1783 self.streams[s.strm_id].pending_circ = None
1784 self.streams[s.strm_id].attached_at = s.arrived_at
1785 elif s.status == "FAILED" or s.status == "CLOSED":
1786 # FIXME stats
1787 if s.strm_id not in self.streams:
1788 plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
1789 return
1790
1791 # XXX: Can happen on timeout
1792 if not s.circ_id:
1793 if s.reason == "TIMEOUT" or s.reason == "EXITPOLICY":
1794 plog("NOTICE", "Stream "+str(s.strm_id)+" "+s.status+" with "+s.reason)
1795 else:
1796 plog("WARN", "Stream "+str(s.strm_id)+" "+s.status+" from no circuit with reason: "+str(s.reason))
1797
1798 # We get failed and closed for each stream. OK to return
1799 # and let the closed do the cleanup
1800 if s.status == "FAILED":
1801 # Avoid busted circuits that will not resolve or carry
1802 # traffic.
1803 self.streams[s.strm_id].failed = True
1804 if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
1805 elif s.circ_id != 0:
1806 plog("WARN", "Failed stream "+str(s.strm_id)+" on unknown circ "+str(s.circ_id))
1807 return
1808
1809 if self.streams[s.strm_id].pending_circ:
1810 self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
1811 del self.streams[s.strm_id]
1812 elif s.status == "REMAP":
1813 if s.strm_id not in self.streams:
1814 plog("WARN", "Remap id "+str(s.strm_id)+" not found")
1815 else:
1816 if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
1817 s.target_host = "255.255.255.255"
1818 plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
1819 + s.target_host)
1820 self.streams[s.strm_id].host = s.target_host
1821 self.streams[s.strm_id].port = s.target_port
1822
def stream_bw_event(self, s):
    """ Handle a stream bandwidth event.

    's' is the event object; this handler reads its arrived_at,
    event_name, strm_id, bytes_written and bytes_read attributes.

    For a stream id that is not in self.streams the event is logged at
    DEBUG and a WARN is emitted.  For a known stream the byte counters
    on the tracked stream are incremented; the DEBUG line is only
    written when the stream is not flagged as ignored. """
    fields = [str(time.time() - s.arrived_at), s.event_name,
              str(s.strm_id), str(s.bytes_written), str(s.bytes_read)]

    # Unknown stream: nothing to account against, just report it.
    if s.strm_id not in self.streams:
        plog("DEBUG", " ".join(fields))
        plog("WARN", "BW event for unknown stream id: "+str(s.strm_id))
        return

    stream = self.streams[s.strm_id]
    if not stream.ignored:
        plog("DEBUG", " ".join(fields))
    # Counters are accumulated for ignored streams as well.
    stream.bytes_read += s.bytes_read
    stream.bytes_written += s.bytes_written
1835
def new_consensus_event(self, n):
    """ Handle a new-consensus event 'n'.

    Lets TorCtl.ConsensusTracker process the event first, then hands
    the result of self.current_consensus() to the selection manager. """
    TorCtl.ConsensusTracker.new_consensus_event(self, n)
    selmgr = self.selmgr
    selmgr.new_consensus(self.current_consensus())
1839
def new_desc_event(self, d):
    """ Handle a new-descriptor event 'd'.

    The event is passed to TorCtl.ConsensusTracker; only when that call
    returns a true value is the selection manager given the current
    consensus.  (Presumably a true value means the tracked router set
    changed -- confirm against ConsensusTracker.) """
    changed = TorCtl.ConsensusTracker.new_desc_event(self, d)
    if not changed:
        return
    selmgr = self.selmgr
    selmgr.new_consensus(self.current_consensus())
1843
def bandwidth_event(self, b):
    """ Deliberately empty handler for bandwidth event 'b'; the original
    note says these events are wanted for heartbeat only. """
    return None
1845
1846 ################### CircuitHandler #############################
1847
class CircuitHandler(PathBuilder):
    """ CircuitHandler that extends from PathBuilder to handle multiple
        circuits as opposed to just one. """

    def __init__(self, c, selmgr, num_circuits, RouterClass):
        """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
        'num_circuits' is the number of circuits to keep in the pool,
        and 'RouterClass' is a class that inherits from Router and is used
        to create annotated Routers."""
        PathBuilder.__init__(self, c, selmgr, RouterClass)
        # Set handler to the connection here to
        # not miss any circuit events on startup
        c.set_event_handler(self)
        self.num_circuits = num_circuits    # Size of the circuit pool
        self.check_circuit_pool()           # Bring up the pool of circs

    def check_circuit_pool(self):
        """ Init or check the status of the circuit-pool.

        Compares the number of entries in self.circuits with
        self.num_circuits, logs how many circuits are missing and calls
        build_circuit() once for each missing one. """
        n = len(self.circuits)
        missing = self.num_circuits - n
        if missing > 0:
            plog("INFO", "Checked pool of circuits: we need to build " +
                 str(missing) + " circuits")
        # Schedule (num_circuits-n) circuit-buildups.  A local counter is
        # used on purpose instead of re-reading len(self.circuits).
        while n < self.num_circuits:
            # TODO: Should mimic Tor's learning here
            self.build_circuit("255.255.255.255", 80)
            plog("DEBUG", "Scheduled circuit No. " + str(n+1))
            n += 1

    def build_circuit(self, host, port):
        """ Build a circuit towards 'host':'port' and return it.

        The target is set on the selection manager, a path is selected
        and handed to the connection; the resulting circuit is stored in
        self.circuits under its circ_id.

        RestrictionError and TorCtl.ErrorReply are logged (with a
        traceback) and the attempt is repeated.  NOTE: there is no retry
        limit, so a failure that never clears keeps this loop running. """
        circ = None
        while circ is None:
            try:
                self.selmgr.set_target(host, port)
                circ = self.c.build_circuit(self.selmgr.select_path())
                self.circuits[circ.circ_id] = circ
                return circ
            except RestrictionError as e:
                # XXX: Dress this up a bit
                traceback.print_exc()
                plog("ERROR", "Impossible restrictions: "+str(e))
            except TorCtl.ErrorReply as e:
                traceback.print_exc()
                plog("WARN", "Error building circuit: " + str(e.args))

    def circ_status_event(self, c):
        """ Handle circuit status events.

        Every event is logged at DEBUG.  Events for circuits that are
        not in self.circuits are ignored.  For the rest:
          EXTENDED       - record the time since the previous extension
          FAILED/CLOSED  - delegate to PathBuilder, then refill the pool
          BUILT          - delegate to PathBuilder, then store the summed
                           extend times as circ.setup_duration
        Any other status (e.g. LAUNCHED) needs no handling here. """
        output = [c.event_name, str(c.circ_id), c.status]
        if c.path:
            output.append(",".join(c.path))
        if c.reason:
            output.append("REASON=" + c.reason)
        if c.remote_reason:
            output.append("REMOTE_REASON=" + c.remote_reason)
        plog("DEBUG", " ".join(output))

        # Circuits we don't control get built by Tor
        if c.circ_id not in self.circuits:
            plog("DEBUG", "Ignoring circuit " + str(c.circ_id) +
                 " (controlled by Tor)")
            return

        if c.status == "EXTENDED":
            circ = self.circuits[c.circ_id]
            # Compute elapsed time since the previous hop was added
            extend_time = c.arrived_at - circ.last_extended_at
            circ.extend_times.append(extend_time)
            plog("INFO", "Circuit " + str(c.circ_id) + " extended in " +
                 str(extend_time) + " sec")
            circ.last_extended_at = c.arrived_at

        elif c.status in ("FAILED", "CLOSED"):
            PathBuilder.circ_status_event(self, c)
            # Check if there are enough circs
            self.check_circuit_pool()

        elif c.status == "BUILT":
            PathBuilder.circ_status_event(self, c)
            # Looked up only after the parent handler has run, as before.
            circ = self.circuits[c.circ_id]
            # Compute duration by summing up extend_times
            duration = sum(circ.extend_times, 0.0)
            plog("INFO", "Circuit " + str(c.circ_id) + " needed " +
                 str(duration) + " seconds to be built")
            # Save the duration to the circuit for later use
            circ.setup_duration = duration
1939
1940 ################### StreamHandler ##############################
1941
class StreamHandler(CircuitHandler):
    """ CircuitHandler subclass which, per its original description, is
        meant to handle attaching streams to an appropriate circuit in
        the pool.  The methods defined here are small control-port
        helpers and DEBUG loggers for events. """

    def __init__(self, c, selmgr, num_circs, RouterClass):
        """ Pass all arguments straight through to CircuitHandler. """
        CircuitHandler.__init__(self, c, selmgr, num_circs, RouterClass)

    def clear_dns_cache(self):
        """ Send signal CLEARDNSCACHE and log each reply message. """
        replies = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
        # Each reply is unpacked as a 3-tuple; only the middle element
        # (the message) is logged.
        for _code, text, _rest in replies:
            plog("DEBUG", "CLEARDNSCACHE: " + text)

    def close_stream(self, id, reason):
        """ Ask the connection to close stream 'id' with 'reason'. """
        self.c.close_stream(id, reason)

    def address_mapped_event(self, event):
        """ Log an ADDRMAP event at DEBUG.  It is necessary to listen to
        these events to be able to perform DNS lookups using Tor. """
        fields = [
            event.event_name,
            event.from_addr,
            event.to_addr,
            time.asctime(event.when),
        ]
        line = " ".join(fields)
        plog("DEBUG", line)

    def unknown_event(self, event):
        """ Log an event that has no dedicated handler at DEBUG. """
        message = "UNKNOWN EVENT '" + event.event_name + "':" + \
            event.event_string
        plog("DEBUG", message)
1969
1970 ########################## Unit tests ##########################
1971
def do_gen_unit(gen, r_list, weight_bw, num_print):
    """ Exercise a node generator and print how often each router came up.

    gen       -- generator under test; must provide rstr_list.r_is_ok(r),
                 rewind() and generate()
    r_list    -- routers to examine; each gets its 'chosen' attribute
                 reset and then incremented once per draw
    weight_bw -- callable (gen, r) -> weight; the summed weight of all
                 routers accepted by the restrictions, divided by 1024,
                 gives the number of trials
    num_print -- maximum number of per-router result lines to print

    Changes relative to the earlier revision of this helper:
      * exactly 'trials' draws are made (the old loop made trials-1
        although it announced 'trials');
      * at most num_print result lines are printed (was num_print+1);
      * TorUtil.loglevel is restored even if generation raises;
      * a generator that runs dry is reported instead of leaking
        StopIteration. """
    # Function-scope import keeps this block self-contained.
    import itertools

    total_weight = sum(weight_bw(gen, r) for r in r_list
                       if gen.rstr_list.r_is_ok(r))
    trials = int(total_weight/1024)

    print("Running "+str(trials)+" trials")

    # 0. Reset r.chosen = 0 for all routers
    for r in r_list:
        r.chosen = 0

    # 1. Generate 'trials' choices, bumping r.chosen for every pick.
    #    The log level is forced to INFO while drawing.
    saved_loglevel = TorUtil.loglevel
    TorUtil.loglevel = "INFO"
    drawn = 0
    try:
        gen.rewind()
        # islice draws lazily, so no huge index list is ever built.
        for r in itertools.islice(gen.generate(), max(trials, 0)):
            r.chosen += 1
            drawn += 1
    finally:
        TorUtil.loglevel = saved_loglevel

    if drawn < trials:
        print("WARN: Generator exhausted after "+str(drawn)+" of "
              +str(trials)+" trials")

    # 2. Print top num_print routers choices+bandwidth stats+flags.
    #    sorted() is stable, so ties keep their r_list order exactly as
    #    the previous cmp-based sort did.
    printed = 0
    for r in sorted(r_list, key=lambda rtr: rtr.chosen, reverse=True):
        ok = gen.rstr_list.r_is_ok(r)
        if r.chosen and not ok:
            print("WARN: Restriction fail at "+r.idhex)
        if not r.chosen and ok:
            print("WARN: Generation fail at "+r.idhex)
        if not ok:
            continue
        if printed >= num_print:
            break
        flag = ""
        bw = int(weight_bw(gen, r))
        if "Exit" in r.flags:
            flag += "E"
        if "Guard" in r.flags:
            flag += "G"
        print(str(r.list_rank)+". "+r.nickname+" "+str(r.bw/1024.0)+"/"
              +str(bw/1024.0)+": "+str(r.chosen)+", "+flag)
        printed += 1
2018
def do_unit(rst, r_list, plamb):
    """ Print every router in r_list that restriction 'rst' accepts.

    rst    -- restriction object providing r_is_ok(router)
    r_list -- routers to test; 'nickname' and 'bw' are read from each
    plamb  -- callable router -> str giving the property to show

    After the per-router lines, the count and summed bandwidth of the
    accepted routers above 400000 and at-or-below 400000 are printed.

    rst.__class__ is printed instead of the Python-2-only
    rst.r_is_ok.im_class (same class for a bound method), and the
    restriction is evaluated once per router instead of twice. """
    print("\n")
    print("-----------------------------------")
    print(rst.__class__)
    above_i = 0
    above_bw = 0
    below_i = 0
    below_bw = 0
    for r in r_list:
        ok = rst.r_is_ok(r)
        if not ok:
            continue
        print(r.nickname+" "+plamb(r)+"="+str(ok)+" "+str(r.bw))
        if r.bw > 400000:
            above_i += 1
            above_bw += r.bw
        else:
            below_i += 1
            below_bw += r.bw

    print("Routers above: " + str(above_i) + " bw: " + str(above_bw))
    print("Routers below: " + str(below_i) + " bw: " + str(below_bw))
2039
2040 # TODO: Tests:
2041 # - Test each NodeRestriction and print in/out lines for it
2042 # - Test NodeGenerator and reapply NodeRestrictions
2043 # - Same for PathSelector and PathRestrictions
2044 # - Also Reapply each restriction by hand to path. Verify returns true
2045
if __name__ == '__main__':
    # Ad-hoc self-test.  It talks to a live control port at
    # TorUtil.control_host:TorUtil.control_port and prints its findings.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((TorUtil.control_host, TorUtil.control_port))
    c = Connection(s)
    # open() instead of the Python-2-only file() builtin.
    c.debug(open("control.log", "w"))
    c.authenticate(TorUtil.control_pass)
    # Fetch the network status once and reuse it (it used to be requested
    # twice, with the first result never used).
    nslist = c.get_network_status()
    sorted_rlist = c.read_routers(nslist)

    # Highest bandwidth first; list_rank is the position in that order.
    # The sort is stable, so ties keep their order as with the old
    # cmp-based sort.
    sorted_rlist.sort(key=lambda rtr: rtr.bw, reverse=True)
    for rank, router in enumerate(sorted_rlist):
        router.list_rank = rank

    def flag_weighting(bwgen, r):
        """ Router bandwidth scaled by the generator's exit/guard weight
        for each of the Exit/Guard flags the router carries. """
        bw = r.bw
        if "Exit" in r.flags:
            bw *= bwgen.exit_weight
        if "Guard" in r.flags:
            bw *= bwgen.guard_weight
        return bw

    def uniform_weighting(bwgen, r):
        """ Same constant weight for every router. """
        return 10240000

    # XXX: Test OrderedexitGenerators
    do_gen_unit(
        UniformGenerator(sorted_rlist,
                         NodeRestrictionList(
                             [PercentileRestriction(20, 30, sorted_rlist),
                              FlagsRestriction(["Valid"])])),
        sorted_rlist, uniform_weighting, 1500)

    do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Exit"]),
                                    3, exit=True),
                sorted_rlist, flag_weighting, 500)

    do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Guard"]),
                                    3, guard=True),
                sorted_rlist, flag_weighting, 500)

    do_gen_unit(
        BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Valid"]), 3),
        sorted_rlist, flag_weighting, 500)

    for r in sorted_rlist:
        if r.will_exit_to("211.11.21.22", 465):
            print(r.nickname+" "+str(r.bw))

    # Individual restrictions, each printed with the router property it
    # filters on.
    do_unit(FlagsRestriction(["Guard"], []), sorted_rlist,
            lambda r: " ".join(r.flags))
    do_unit(FlagsRestriction(["Fast"], []), sorted_rlist,
            lambda r: " ".join(r.flags))

    do_unit(ExitPolicyRestriction("2.11.2.2", 80), sorted_rlist,
            lambda r: "exits to 80")
    do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
            lambda r: "")
    do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
            lambda r: "")
    do_unit(OSRestriction([r"[lL]inux", r"BSD", "Darwin"], []), sorted_rlist,
            lambda r: r.os)
    do_unit(OSRestriction([], ["Windows", "Solaris"]), sorted_rlist,
            lambda r: r.os)

    do_unit(VersionRangeRestriction("0.1.2.0"), sorted_rlist,
            lambda r: str(r.version))
    do_unit(VersionRangeRestriction("0.1.2.0", "0.1.2.5"), sorted_rlist,
            lambda r: str(r.version))
    do_unit(VersionIncludeRestriction(["0.1.1.26-alpha", "0.1.2.7-ignored"]),
            sorted_rlist, lambda r: str(r.version))
    do_unit(VersionExcludeRestriction(["0.1.1.26"]), sorted_rlist,
            lambda r: str(r.version))

    do_unit(ConserveExitsRestriction(), sorted_rlist,
            lambda r: " ".join(r.flags))
    do_unit(FlagsRestriction([], ["Valid"]), sorted_rlist,
            lambda r: " ".join(r.flags))

    do_unit(IdHexRestriction("$FFCB46DB1339DA84674C70D7CB586434C4370441"),
            sorted_rlist, lambda r: r.idhex)

    rl = [
        AtLeastNNodeRestriction(
            [ExitPolicyRestriction("255.255.255.255", 80),
             ExitPolicyRestriction("255.255.255.255", 443),
             ExitPolicyRestriction("255.255.255.255", 6667)],
            2),
        FlagsRestriction([], ["BadExit"]),
    ]

    exit_rstr = NodeRestrictionList(rl)

    ug = UniformGenerator(sorted_rlist, exit_rstr)

    # Every router the generator yields must pass each restriction on its
    # own; routers yielded without the Exit flag get their policy dumped.
    ug.rewind()
    rlist = []
    for r in ug.generate():
        print("Checking: " + r.nickname)
        for rs in rl:
            if not rs.r_is_ok(r):
                raise PathError()
        if "Exit" not in r.flags:
            print("No exit in flags of "+r.idhex)
            for e in r.exitpolicy:
                print(" "+str(e))
            print(" 80: "+str(r.will_exit_to("255.255.255.255", 80)))
            print(" 443: "+str(r.will_exit_to("255.255.255.255", 443)))
            print(" 6667: "+str(r.will_exit_to("255.255.255.255", 6667)))

        ug.mark_chosen(r)
        rlist.append(r)

    # Report Exit-flagged routers the generator never produced.
    for r in sorted_rlist:
        if "Exit" in r.flags and r not in rlist:
            print(r.idhex+" is an exit not in rl!")
2149