-rw-r--r--   ChangeLog        |  294
-rw-r--r--   PathSupport.py   |   49
-rw-r--r--   README           |    9
-rw-r--r--   SQLSupport.py    |  319
-rw-r--r--   ScanSupport.py   |   20
-rwxr-xr-x   TorCtl.py        |  258
-rw-r--r--   TorUtil.py       |   27
-rw-r--r--   debian/changelog |   15
-rw-r--r--   debian/control   |    6
-rwxr-xr-x   debian/rules     |    2
10 files changed, 775 insertions, 224 deletions
@@ -1,3 +1,297 @@ +commit 4fdd2031e6b231ed4bbaa79940f67e9b8f691382 +Author: aagbsn <aagbsn@extc.org> +Date: Fri Sep 20 16:52:58 2013 +0200 + + 9771 - Keep Torflow happy + + Torflow expects the PURPOSE to not be None but "" if no PURPOSE is set. + +commit c161f352e7fabd88cdd7eb9249a55957038b07d9 +Author: aagbsn <aagbsn@extc.org> +Date: Thu Sep 19 12:44:50 2013 +0200 + + 9771 - Fix CIRC event parsing + + In torspec.git commit e195a4c8d288eb27385060740b8fde170a5e3e38 the + format of the CIRC event was changed slightly. This commit parses this + newer format. + +commit 68bc5de84dc90e1292c6aa19abd95f660b5e3277 +Author: aagbsn <aagbsn@extc.org> +Date: Tue Mar 5 19:56:29 2013 +0100 + + 8399 - Do not return 0 from get_unmeasured_bw + +commit e6c7fd73e75456ea216402ff21faa2cc31e4c603 +Author: aagbsn <aagbsn@extc.org> +Date: Sat Mar 2 10:18:05 2013 +0100 + + 8273 - Remove Fast Flag requirement for measurements + +commit e27fb553b471b8f0774ae81cb024b5507d49894e +Author: aagbsn <aagbsn@extc.org> +Date: Thu Feb 28 11:48:41 2013 +0100 + + 6131 - BwAuths learn to recognize Unmeasured=1 in consensus + + Routers with the Unmeasured flag get a hard-coded consensus to descriptor + ratio of 1.0 + +commit 853cb1e2040c31f90c89d4941e18a8f9ae677146 +Author: Mike Perry <mikeperry+git@torproject.org> +Date: Sat Dec 29 16:30:53 2012 -0800 + + Mention stem and txtorcon as alternatives. + +commit 305a759d99dd01f60faed9aa036b37746d3c54c5 +Author: Mike Perry <mikeperry+git@torproject.org> +Date: Wed Apr 18 16:47:25 2012 -0700 + + Turn an exception into a log. + +commit 3de91767e208d1642a8cf89dc1b645f637abaf8e +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Tue Nov 15 18:42:18 2011 -0800 + + Make memory-only sqlite work. + +commit 391b4399adda91e8b68ffd6e1fa294efe846debd +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Fri Nov 4 15:41:54 2011 -0700 + + Don't round the circ fail rate. + +commit 66ed91b5c1453b9c25b7070b702e2e548efb4f80 +Merge: e15ed99 a582469 +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Mon Oct 31 21:34:30 2011 -0700 + + Merge commit 'a58246918f58f3fc5663f8772df39cdcd3ccce8d' + +commit a58246918f58f3fc5663f8772df39cdcd3ccce8d +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Mon Oct 31 21:33:20 2011 -0700 + + Add TorCtl piece of the fix for #1984. + +commit e15ed99df8e8e04cadd0f3792e42a621df6fbe02 +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Wed Oct 19 16:32:34 2011 -0700 + + Bug #4097: Clear old last_exit before requesting a new one + + Should eliminate the keyerror in the test's node_map, which seemed to be due + to really stale last_exits laying around from previous tests on a different + consensus. + +commit 750d7c392766476d12cc9e47bdffffcce84bcdf5 +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Tue Oct 4 17:39:44 2011 -0700 + + Document a pythonism that briefly confused Aaron and me. + +commit 520722b8e3a7bb72bf9b5063f3c1649b53110225 +Author: aagbsn <aagbsn@extc.org> +Date: Wed Sep 14 16:56:16 2011 -0700 + + 4024 - get_git_version does not work for submodule + + Added support for detached heads + +commit e99394c46b30ed73ffbf02d5c6a39bf9b04416a2 +Author: aagbsn <aagbsn@extc.org> +Date: Wed Aug 31 16:23:42 2011 -0700 + + use local sessions + + see: + http://www.sqlalchemy.org/docs/orm/session.html#lifespan-of-a-contextual-session + "This has the effect such that each web request starts fresh with a brand new + session, and is the most definitive approach to closing out a request." 
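The local-session pattern adopted in the "use local sessions" commit above (together with the memory-only sqlite fix from commit 3de91767) boils down to binding a session per unit of work and removing it afterwards. A minimal, self-contained sketch, assuming plain SQLAlchemy declarative models instead of Elixir; only the tc_session/l_session naming and the StaticPool options mirror SQLSupport.py:

    import sqlalchemy.pool
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class Router(Base):
        __tablename__ = "router"
        id = Column(Integer, primary_key=True)
        nickname = Column(String)

    # Memory-only sqlite needs the StaticPool arguments that setup_db() adds below.
    engine = create_engine("sqlite://",
                           connect_args={'check_same_thread': False},
                           poolclass=sqlalchemy.pool.StaticPool)
    Base.metadata.create_all(engine)
    tc_session = scoped_session(sessionmaker(bind=engine, autoflush=True))

    def record_router(nickname):
        l_session = tc_session()   # session bound to this thread/unit of work
        l_session.add(Router(nickname=nickname))
        l_session.commit()
        tc_session.remove()        # next caller starts with a brand new session

    record_router("!!TorClient")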
+ +commit 097a95c4185b086dbac67e0d2618bd15b9f41521 +Author: aagbsn <aagbsn@extc.org> +Date: Thu Sep 1 13:53:22 2011 -0700 + + NOTICE when git repo not found rather than ERROR + +commit a3ed3d01ff93df9d25dc64ffbacfcb6b4ef0befa +Author: aagbsn <aagbsn@extc.org> +Date: Wed Aug 31 17:07:27 2011 -0700 + + add function to get current branch and head + +commit 4a990f47e5ecd26af4f6459ae6d63ea36dd88f00 +Author: aagbsn <aagbsn@extc.org> +Date: Wed Aug 31 16:24:27 2011 -0700 + + remove ; + +commit db835ef1f6a252d9923be766f1d7dc813ea9a4eb +Author: aagbsn <aagbsn@extc.org> +Date: Wed Aug 31 16:09:00 2011 -0700 + + use Elixir model delete method + + try to use the same session for deletes and queries where possible, + this may be the cause of an ObjectNotFound exception that occurs + right after _update_db(), despite the Router object being in the + consensus and also in the db. + +commit 741cc04db278cede4bbe0c9e0147066fbfd00a4b +Author: aagbsn <aagbsn@extc.org> +Date: Thu Jul 7 15:21:26 2011 -0700 + + placeholder for call to refresh_all() + + suspicious session behavior may require refreshing all + objects in order to keep tc_session and sessions bound + to Elixir synchronized + +commit 37f612706ad384d7a1d34a33268f3d1d8449d5f2 +Author: aagbsn <aagbsn@extc.org> +Date: Thu Jul 7 14:58:37 2011 -0700 + + WARN when circ_status_event fails to query Router + +commit 25cb2bf77b1a98f2273b0f90c99b57e3d6e21a41 +Merge: 962d30c e538547 +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Mon Jun 27 11:23:39 2011 -0700 + + Merge branch 'ratio-restrictions' + +commit 962d30c72ffc85c1df79270d6ac5b25f0336d2d2 +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Sat Jun 25 10:37:22 2011 -0700 + + Allow TorCtl.connect() to specify alternate Connection classes. + + Need to move it for that... + +commit e538547dfc2973d7bd99e2d927a1d87be5230704 +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Fri Jun 24 13:32:37 2011 -0700 + + Implement ratio ranking support. + +commit dc1cb9ce39fd5c9c5ea50f542642130a44e71fee +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Fri Jun 24 12:33:02 2011 -0700 + + Add a little more rambling to reset_all(). + +commit d0f593f2ae245e951323ff7fc3a65fa6a1f16b47 +Merge: 3c438e0 eb736ba +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Fri Jun 24 11:57:47 2011 -0700 + + Merge remote branch 'aagbsn/testing' + +commit 3c438e013b2301f2c5e04bb1aceb907ae5f96576 +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Fri Jun 24 10:46:59 2011 -0700 + + Eliminate a warn about a consensus miscount. + + By fixing the miscount, of course. + +commit eb736ba5e24d1e4259db921bda52ab918c043553 +Author: aagbsn <aagbsn@extc.org> +Date: Thu Jun 23 15:40:05 2011 -0700 + + Added refresh_all() and warnings to reset_all() + + SQLSupport.refresh_all() is required to keep Elixir and + tc_session in sync. Otherwise it is possible for + routers added by the consensus update to not show up + in queries using the Elixir model i.e. Router.query.all() + + Also, warnings have been added to SQLSupport.reset_all() because + this does not work properly -- in some cases relation tables + were not being reset properly (this resulted in old bw measurements + being repeated in future output!). + + Finally, even when reset_all() works properly, bwauthority memory + usage continues to grow. 
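The reset/refresh ordering that the commits above converge on is implemented in SQLScanHandler.reset_stats() in the ScanSupport.py hunk further down; condensed here as a sketch (the handler argument and function name are illustrative, the called methods are the ones from the diff):

    import SQLSupport

    def reset_between_slices(handler):
        # ScanHandler.reset_stats() resets the non-SQL counters first.
        SQLSupport.reset_all()        # drop per-slice stats; see the warnings added above
        listener = handler.sql_consensus_listener
        listener.update_consensus()   # repopulate routers from the current consensus
        listener._update_rank_history(listener.consensus.ns_map.iterkeys())
        SQLSupport.refresh_all()      # resync Elixir queries with tc_session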
+ +commit e1266cb4d716a8a066ea3d92fb3d19305939a058 +Author: aagbsn <aagbsn@extc.org> +Date: Thu May 19 14:17:12 2011 -0700 + + update consensus after resetting stats within the same job + +commit 13307d9cc0ed54bf5602c2c9b5e8db86113f3a45 +Author: aagbsn <aagbsn@extc.org> +Date: Mon Apr 18 10:38:50 2011 -0700 + + added reset_stats() to Scansupport.py + + calls parent and SQLSupport reset functions + Tests show that SQLSupport.reset_all() may clear too much because + if BwAuthority calls Scansupport.reset_stats() after each speedrace() + run only the first slice is properly recorded; the rest are empty. + See: https://trac.torproject.org/projects/tor/ticket/2947 + +commit 7cd3224c4615c8b2e3f465ed2d1b22d2f9dbf89f +Author: Mike Perry <mikeperry-git@fscked.org> +Date: Mon Jun 20 10:03:30 2011 -0700 + + Remove python 2.5ism + +commit ef0b770e9fa3ddb63ec52dfa4796c7dff27e8709 +Author: aagbsn <aagbsn@extc.org> +Date: Sat Jun 18 16:36:24 2011 -0700 + + add case for long + +commit ffb68994a4d440db9deca2624ed33075f3fb7125 +Author: aagbsn <aagbsn@extc.org> +Date: Fri Jun 17 10:53:05 2011 -0700 + + generalize db message, we now support postgres and mysql too + +commit fbf2fcf865bc0d9b65841076fd88e31f321638de +Author: aagbsn <aagbsn@extc.org> +Date: Fri Apr 29 16:21:40 2011 -0700 + + fixes for divide-by-zeros + + Postgres doesn't ignore divide-by-zeros like MySQL + CASE statement added to set the result to NULL if + the denominator is zero + +commit 3a2b44864417908bae45e835e0502297671e6c7b +Author: aagbsn <aagbsn@extc.org> +Date: Sun Apr 24 21:12:56 2011 -0700 + + rewrite query for mysql compatibility attempt 2 + + this actually appears to work + +commit a416402b0a4c6b5406448a5f87538081f26d9368 +Author: aagbsn <aagbsn@extc.org> +Date: Thu Apr 21 18:28:17 2011 -0700 + + rewrite query for mysql compatibility attempt 1 + +commit c8fee8b7b5ea6c584bb5a75647ee44737e60b66d +Author: aagbsn <aagbsn@extc.org> +Date: Sun Apr 10 23:59:09 2011 -0700 + + backward compatibility with SQLAlchemy 0.5.x + +commit 5161c64139f18b55288dab07e74f9f3f1c5bb704 +Author: aagbsn <aagbsn@extc.org> +Date: Sun Apr 10 23:24:23 2011 -0700 + + SQLAlchemey and Elixir upgrade + + enabled elixir migration aid options. + + renamed a few function calls, as per SQLAlchemy upgrade docs: + session.clear() is removed. use session.remove_all() + commit af306b53a668168ae92a6c919d4013b81d7e61ad Author: Mike Perry <mikeperry-git@fscked.org> Date: Fri Jun 17 17:29:43 2011 -0700 diff --git a/PathSupport.py b/PathSupport.py index 4e9ad6d..c9c5ee1 100644 --- a/PathSupport.py +++ b/PathSupport.py @@ -229,6 +229,27 @@ class PercentileRestriction(NodeRestriction): def __str__(self): return self.__class__.__name__+"("+str(self.pct_skip)+","+str(self.pct_fast)+")" +class RatioPercentileRestriction(NodeRestriction): + """Restriction to cut out a percentile slice of the network by ratio of + consensus bw to descriptor bw.""" + def __init__(self, pct_skip, pct_fast, r_list): + """Constructor. 
Sets up the restriction such that routers in the + 'pct_skip' to 'pct_fast' percentile of bandwidth rankings are + returned from the sorted list 'r_list'""" + self.pct_fast = pct_fast + self.pct_skip = pct_skip + self.sorted_r = r_list + + def r_is_ok(self, r): + "Returns true if r is in the percentile boundaries (by rank)" + if r.ratio_rank < len(self.sorted_r)*self.pct_skip/100: return False + elif r.ratio_rank > len(self.sorted_r)*self.pct_fast/100: return False + + return True + + def __str__(self): + return self.__class__.__name__+"("+str(self.pct_skip)+","+str(self.pct_fast)+")" + class UptimeRestriction(NodeRestriction): """Restriction to filter out routers with uptimes < min_uptime or > max_uptime""" @@ -1008,7 +1029,8 @@ class SelectionManager(BaseSelectionManager): def __init__(self, pathlen, order_exits, percent_fast, percent_skip, min_bw, use_all_exits, uniform, use_exit, use_guards,geoip_config=None, - restrict_guards=False, extra_node_rstr=None, exit_ports=None): + restrict_guards=False, extra_node_rstr=None, exit_ports=None, + order_by_ratio=False): BaseSelectionManager.__init__(self) self.__ordered_exit_gen = None self.pathlen = pathlen @@ -1026,6 +1048,7 @@ class SelectionManager(BaseSelectionManager): self.consensus = None self.exit_ports = exit_ports self.extra_node_rstr=extra_node_rstr + self.order_by_ratio = order_by_ratio def reconfigure(self, consensus=None): try: @@ -1053,8 +1076,8 @@ class SelectionManager(BaseSelectionManager): self.path_rstr = PathRestrictionList( [Subnet16Restriction(), UniqueRestriction()]) - if self.use_guards: entry_flags = ["Guard", "Running", "Fast"] - else: entry_flags = ["Running", "Fast"] + if self.use_guards: entry_flags = ["Guard", "Running"] + else: entry_flags = ["Running"] if self.restrict_guards_only: nonentry_skip = 0 @@ -1063,21 +1086,26 @@ class SelectionManager(BaseSelectionManager): nonentry_skip = self.percent_skip nonentry_fast = self.percent_fast + if self.order_by_ratio: + PctRstr = RatioPercentileRestriction + else: + PctRstr = PercentileRestriction + # XXX: sometimes we want the ability to do uniform scans # without the conserve exit restrictions.. 
entry_rstr = NodeRestrictionList( - [PercentileRestriction(self.percent_skip, self.percent_fast, sorted_r), + [PctRstr(self.percent_skip, self.percent_fast, sorted_r), OrNodeRestriction( [FlagsRestriction(["BadExit"]), ConserveExitsRestriction(self.exit_ports)]), FlagsRestriction(entry_flags, [])] ) mid_rstr = NodeRestrictionList( - [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r), + [PctRstr(nonentry_skip, nonentry_fast, sorted_r), OrNodeRestriction( [FlagsRestriction(["BadExit"]), ConserveExitsRestriction(self.exit_ports)]), - FlagsRestriction(["Running","Fast"], [])] + FlagsRestriction(["Running"], [])] ) @@ -1087,11 +1115,11 @@ class SelectionManager(BaseSelectionManager): self.exit_rstr = NodeRestrictionList([IdHexRestriction(self.exit_id)]) elif self.use_all_exits: self.exit_rstr = NodeRestrictionList( - [FlagsRestriction(["Running","Fast"], ["BadExit"])]) + [FlagsRestriction(["Running"], ["BadExit"])]) else: self.exit_rstr = NodeRestrictionList( - [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r), - FlagsRestriction(["Running","Fast"], ["BadExit"])]) + [PctRstr(nonentry_skip, nonentry_fast, sorted_r), + FlagsRestriction(["Running"], ["BadExit"])]) if self.extra_node_rstr: entry_rstr.add_restriction(self.extra_node_rstr) @@ -1579,6 +1607,9 @@ class PathBuilder(TorCtl.ConsensusTracker): plog("WARN", "Error attaching new stream: "+str(e.args)) return break + # This else clause is executed when we go through the circuit + # list without finding an entry (or it is empty). + # http://docs.python.org/tutorial/controlflow.html#break-and-continue-statements-and-else-clauses-on-loops else: circ = None try: @@ -1,3 +1,12 @@ +Note: TorCtl is mostly unmaintained. It serves primarily as the support +library for the Bandwidth Authorities, Exit Scanner, and other projects in +TorFlow. For more actively maintained python libraries, you may consider using +Stem or TxTorCon. See: +https://stem.torproject.org/ and https://github.com/meejah/txtorcon + + + + TorCtl Python Bindings diff --git a/SQLSupport.py b/SQLSupport.py index 2532973..28963fc 100644 --- a/SQLSupport.py +++ b/SQLSupport.py @@ -22,12 +22,18 @@ from TorUtil import meta_port, meta_host, control_port, control_host, control_pa from TorCtl import EVENT_TYPE, EVENT_STATE, TorCtlError import sqlalchemy +import sqlalchemy.pool import sqlalchemy.orm.exc from sqlalchemy.orm import scoped_session, sessionmaker, eagerload, lazyload, eagerload_all +from sqlalchemy.orm.exc import NoResultFound from sqlalchemy import create_engine, and_, or_, not_, func -from sqlalchemy.sql import func,select +from sqlalchemy.sql import func,select,alias,case from sqlalchemy.schema import ThreadLocalMetaData,MetaData from elixir import * +from elixir import options +# migrate from elixir 06 to 07 +options.MIGRATION_TO_07_AID = True + # Nodes with a ratio below this value will be removed from consideration # for higher-valued nodes @@ -35,6 +41,13 @@ MIN_RATIO=0.5 NO_FPE=2**-50 +#################### Session Usage ############### +# What is all this l_session madness? See: +# http://www.sqlalchemy.org/docs/orm/session.html#lifespan-of-a-contextual-session +# "This has the effect such that each web request starts fresh with +# a brand new session, and is the most definitive approach to closing +# out a request." + #################### Model ####################### # In elixir, the session (DB connection) is a property of the model.. 
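The else clause documented in the PathBuilder hunk above is the for/else pythonism from the linked tutorial: the else branch runs only when the loop finishes without hitting break, including when the list is empty. A tiny standalone example (the names are illustrative, not from PathSupport.py):

    def find_circuit(circ_ids, wanted_id):
        for circ_id in circ_ids:
            if circ_id == wanted_id:
                print "found circuit", circ_id
                break
        else:
            print "no usable circuit; need to build one"

    find_circuit([4, 8, 15], 8)    # found circuit 8
    find_circuit([4, 8, 15], 23)   # no usable circuit; need to build one
    find_circuit([], 23)           # no usable circuit; need to build one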
@@ -46,7 +59,13 @@ tc_metadata.echo=True tc_session = scoped_session(sessionmaker(autoflush=True)) def setup_db(db_uri, echo=False, drop=False): - tc_engine = create_engine(db_uri, echo=echo) + # Memory-only sqlite requires some magic options... + if db_uri == "sqlite://": + tc_engine = create_engine(db_uri, echo=echo, + connect_args={'check_same_thread':False}, + poolclass=sqlalchemy.pool.StaticPool) + else: + tc_engine = create_engine(db_uri, echo=echo) tc_metadata.bind = tc_engine tc_metadata.echo = echo @@ -59,6 +78,10 @@ def setup_db(db_uri, echo=False, drop=False): # wouldn't kill you, you know. tc_session.add = tc_session.save_or_update + if sqlalchemy.__version__ < "0.6.0": + # clear() replaced with expunge_all + tc_session.clear = tc_session.expunge_all + class Router(Entity): using_options(shortnames=True, order_by='-published', session=tc_session, metadata=tc_metadata) using_mapper_options(save_on_init=False) @@ -248,6 +271,7 @@ class RouterStats(Entity): filt_sbw_ratio = Field(Float) def _compute_stats_relation(stats_clause): + l_session = tc_session() for rs in RouterStats.query.\ filter(stats_clause).\ options(eagerload_all('router.circuits.extensions')).\ @@ -286,12 +310,15 @@ class RouterStats(Entity): if rs.circ_try_to+rs.circ_try_from > 0: rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from) - tc_session.add(rs) - tc_session.commit() + l_session.add(rs) + l_session.commit() + tc_session.remove() _compute_stats_relation = Callable(_compute_stats_relation) + def _compute_stats_query(stats_clause): - tc_session.clear() + tc_session.expunge_all() + l_session = tc_session() # http://www.sqlalchemy.org/docs/04/sqlexpression.html#sql_update to_s = select([func.count(Extension.id)], and_(stats_clause, Extension.table.c.to_node_idhex @@ -320,15 +347,20 @@ class RouterStats(Entity): RouterStats.table.c.circ_fail_from:f_from_s, RouterStats.table.c.avg_first_ext:avg_ext}).execute() + # added case() to set NULL and avoid divide-by-zeros (Postgres) RouterStats.table.update(stats_clause, values= {RouterStats.table.c.circ_from_rate: - RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from, + case([(RouterStats.table.c.circ_try_from == 0, None)], + else_=(RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from)), RouterStats.table.c.circ_to_rate: - RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to, + case([(RouterStats.table.c.circ_try_to == 0, None)], + else_=(RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to)), RouterStats.table.c.circ_bi_rate: - (RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from) + case([(RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from == 0, None)], + else_=((RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from) / - (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute() + (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from))), + }).execute() # TODO: Give the streams relation table a sane name and reduce this too @@ -360,17 +392,20 @@ class RouterStats(Entity): tot_var += (s.bandwidth()-rs.sbw)*(s.bandwidth()-rs.sbw) tot_var /= s_cnt rs.sbw_dev = math.sqrt(tot_var) - tc_session.add(rs) - tc_session.commit() + l_session.add(rs) + l_session.commit() + tc_session.remove() _compute_stats_query = Callable(_compute_stats_query) + def _compute_stats(stats_clause): RouterStats._compute_stats_query(stats_clause) #RouterStats._compute_stats_relation(stats_clause) _compute_stats = 
Callable(_compute_stats) def _compute_ranks(): - tc_session.clear() + tc_session.expunge_all() + l_session = tc_session() min_r = select([func.min(BwHistory.rank)], BwHistory.table.c.router_idhex == RouterStats.table.c.router_idhex).as_scalar() @@ -395,26 +430,46 @@ class RouterStats(Entity): RouterStats.table.c.avg_desc_bw:avg_desc_bw}).execute() #min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar() - max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar() + + # the commented query breaks mysql because UPDATE cannot reference + # target table in the FROM clause. So we throw in an anonymous alias and wrap + # another select around it in order to get the nested SELECT stored into a + # temporary table. + # FIXME: performance? no idea + #max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar() + max_avg_rank = select([alias(select([func.max(RouterStats.avg_rank)]))]).as_scalar() RouterStats.table.update(values= {RouterStats.table.c.percentile: (100.0*RouterStats.table.c.avg_rank)/max_avg_rank}).execute() - tc_session.commit() + + l_session.commit() + tc_session.remove() _compute_ranks = Callable(_compute_ranks) def _compute_ratios(stats_clause): - tc_session.clear() - avg_from_rate = select([func.avg(RouterStats.circ_from_rate)], - stats_clause).as_scalar() - avg_to_rate = select([func.avg(RouterStats.circ_to_rate)], - stats_clause).as_scalar() - avg_bi_rate = select([func.avg(RouterStats.circ_bi_rate)], - stats_clause).as_scalar() - avg_ext = select([func.avg(RouterStats.avg_first_ext)], - stats_clause).as_scalar() - avg_sbw = select([func.avg(RouterStats.sbw)], - stats_clause).as_scalar() + tc_session.expunge_all() + l_session = tc_session() + avg_from_rate = select([alias( + select([func.avg(RouterStats.circ_from_rate)], + stats_clause) + )]).as_scalar() + avg_to_rate = select([alias( + select([func.avg(RouterStats.circ_to_rate)], + stats_clause) + )]).as_scalar() + avg_bi_rate = select([alias( + select([func.avg(RouterStats.circ_bi_rate)], + stats_clause) + )]).as_scalar() + avg_ext = select([alias( + select([func.avg(RouterStats.avg_first_ext)], + stats_clause) + )]).as_scalar() + avg_sbw = select([alias( + select([func.avg(RouterStats.sbw)], + stats_clause) + )]).as_scalar() RouterStats.table.update(stats_clause, values= {RouterStats.table.c.circ_from_ratio: @@ -427,10 +482,12 @@ class RouterStats(Entity): avg_ext/RouterStats.table.c.avg_first_ext, RouterStats.table.c.sbw_ratio: RouterStats.table.c.sbw/avg_sbw}).execute() - tc_session.commit() + l_session.commit() + tc_session.remove() _compute_ratios = Callable(_compute_ratios) def _compute_filtered_relational(min_ratio, stats_clause, filter_clause): + l_session = tc_session() badrouters = RouterStats.query.filter(stats_clause).filter(filter_clause).\ filter(RouterStats.sbw_ratio < min_ratio).all() @@ -455,18 +512,19 @@ class RouterStats(Entity): if sbw_cnt: rs.filt_sbw = tot_sbw/sbw_cnt else: rs.filt_sbw = None - tc_session.add(rs) + l_session.add(rs) if sqlalchemy.__version__ < "0.5.0": avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw) else: - avg_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(stats_clause).scalar() + avg_sbw = l_session.query(func.avg(RouterStats.filt_sbw)).filter(stats_clause).scalar() for rs in RouterStats.query.filter(stats_clause).all(): if type(rs.filt_sbw) == float and avg_sbw: rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw else: rs.filt_sbw_ratio = None - tc_session.add(rs) - tc_session.commit() + l_session.add(rs) + l_session.commit() + 
tc_session.remove() _compute_filtered_relational = Callable(_compute_filtered_relational) def _compute_filtered_ratios(min_ratio, stats_clause, filter_clause): @@ -476,18 +534,21 @@ class RouterStats(Entity): _compute_filtered_ratios = Callable(_compute_filtered_ratios) def reset(): - tc_session.clear() + tc_session.expunge_all() + l_session = tc_session() RouterStats.table.drop() RouterStats.table.create() for r in Router.query.all(): rs = RouterStats() rs.router = r r.stats = rs - tc_session.add(r) - tc_session.commit() + l_session.add(r) + l_session.commit() + tc_session.remove() reset = Callable(reset) def compute(pct_low=0, pct_high=100, stat_clause=None, filter_clause=None): + l_session = tc_session() pct_clause = and_(RouterStats.percentile >= pct_low, RouterStats.percentile < pct_high) if stat_clause: @@ -500,10 +561,12 @@ class RouterStats(Entity): RouterStats._compute_stats(stat_clause) RouterStats._compute_ratios(stat_clause) RouterStats._compute_filtered_ratios(MIN_RATIO, stat_clause, filter_clause) - tc_session.commit() + l_session.commit() + tc_session.remove() compute = Callable(compute) def write_stats(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None, disp_clause=None): + l_session = tc_session() if not order_by: order_by=RouterStats.avg_first_ext @@ -525,18 +588,21 @@ class RouterStats(Entity): filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw) percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile) else: - circ_from_rate = tc_session.query(func.avg(RouterStats.circ_from_rate)).filter(pct_clause).filter(stat_clause).scalar() - circ_to_rate = tc_session.query(func.avg(RouterStats.circ_to_rate)).filter(pct_clause).filter(stat_clause).scalar() - circ_bi_rate = tc_session.query(func.avg(RouterStats.circ_bi_rate)).filter(pct_clause).filter(stat_clause).scalar() + circ_from_rate = l_session.query(func.avg(RouterStats.circ_from_rate)).filter(pct_clause).filter(stat_clause).scalar() + circ_to_rate = l_session.query(func.avg(RouterStats.circ_to_rate)).filter(pct_clause).filter(stat_clause).scalar() + circ_bi_rate = l_session.query(func.avg(RouterStats.circ_bi_rate)).filter(pct_clause).filter(stat_clause).scalar() - avg_first_ext = tc_session.query(func.avg(RouterStats.avg_first_ext)).filter(pct_clause).filter(stat_clause).scalar() - sbw = tc_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar() - filt_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar() - percentile = tc_session.query(func.avg(RouterStats.percentile)).filter(pct_clause).filter(stat_clause).scalar() + avg_first_ext = l_session.query(func.avg(RouterStats.avg_first_ext)).filter(pct_clause).filter(stat_clause).scalar() + sbw = l_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar() + filt_sbw = l_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar() + percentile = l_session.query(func.avg(RouterStats.percentile)).filter(pct_clause).filter(stat_clause).scalar() + + tc_session.remove() def cvt(a,b,c=1): if type(a) == float: return round(a/c,b) elif type(a) == int: return a + elif type(a) == long: return a elif type(a) == type(None): return "None" else: return type(a) @@ -584,6 +650,7 @@ class RouterStats(Entity): def write_bws(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None, 
disp_clause=None): + l_session = tc_session() if not order_by: order_by=RouterStats.avg_first_ext @@ -598,8 +665,8 @@ class RouterStats(Entity): sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw) filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw) else: - sbw = tc_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar() - filt_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar() + sbw = l_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar() + filt_sbw = l_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar() f.write(str(int(time.time()))+"\n") @@ -614,10 +681,12 @@ class RouterStats(Entity): f.write("node_id=$"+s.router.idhex+" nick="+s.router.nickname) f.write(" strm_bw="+str(cvt(s.sbw,0))) f.write(" filt_bw="+str(cvt(s.filt_sbw,0))) + f.write(" circ_fail_rate="+str(s.circ_to_rate)) f.write(" desc_bw="+str(int(cvt(s.avg_desc_bw,0)))) f.write(" ns_bw="+str(int(cvt(s.avg_bw,0)))+"\n") f.flush() + tc_session.remove() write_bws = Callable(write_bws) @@ -625,7 +694,18 @@ class RouterStats(Entity): #################### Model Support ################ def reset_all(): - # Need to keep routers around.. + l_session = tc_session() + plog("WARN", "SQLSupport.reset_all() called. See SQLSupport.py for details") + # XXX: We still have a memory leak somewhere in here + # Current suspects are sqlite, python-sqlite, or sqlalchemy misuse... + # http://stackoverflow.com/questions/5552932/sqlalchemy-misuse-causing-memory-leak + # The bandwidth scanners switched to a parent/child model because of this. + + # XXX: WARNING! + # Must keep the routers around because circ_status_event may + # reference old Routers that are no longer in consensus + # and will raise an ObjectDeletedError. See function circ_status_event in + # class CircuitListener in SQLSupport.py for r in Router.query.all(): # This appears to be needed. the relation tables do not get dropped # automatically. @@ -634,10 +714,21 @@ def reset_all(): r.detached_streams = [] r.bw_history = [] r.stats = None - tc_session.add(r) + l_session.add(r) + + l_session.commit() + tc_session.expunge_all() - tc_session.commit() - tc_session.clear() + # XXX: WARNING! + # May not clear relation all tables! (SQLAlchemy or Elixir bug) + # Try: + # tc_session.execute('delete from router_streams__stream;') + + + # XXX: WARNING! + # This will cause Postgres databases to hang + # on DROP TABLE. Possibly an issue with cascade. + # Sqlite works though. BwHistory.table.drop() # Will drop subclasses Extension.table.drop() @@ -651,13 +742,25 @@ def reset_all(): Stream.table.create() Circuit.table.create() - tc_session.commit() + l_session.commit() #for r in Router.query.all(): # if len(r.bw_history) or len(r.circuits) or len(r.streams) or r.stats: # plog("WARN", "Router still has dropped data!") plog("NOTICE", "Reset all SQL stats") + tc_session.remove() + +def refresh_all(): + # necessary to keep all sessions synchronized + # This is probably a bug. See reset_all() above. 
+ # Call this after update_consensus(), _update_rank_history() + # See: ScanSupport.reset_stats() + # Could be a cascade problem too, see: + # http://stackoverflow.com/questions/3481976/sqlalchemy-objectdeletederror-instance-class-at-has-been-deleted-help + # Also see: + # http://groups.google.com/group/sqlalchemy/browse_thread/thread/c9099eaaffd7c348 + [tc_session.refresh(r) for r in Router.query.all()] ##################### End Model Support #################### @@ -672,6 +775,7 @@ class ConsensusTrackerListener(TorCtl.DualEventListener): # TODO: What about non-running routers and uptime information? def _update_rank_history(self, idlist): + l_session = tc_session() plog("INFO", "Consensus change... Updating rank history") for idhex in idlist: if idhex not in self.consensus.routers: continue @@ -683,15 +787,17 @@ class ConsensusTrackerListener(TorCtl.DualEventListener): bwh = BwHistory(router=r, rank=rc.list_rank, bw=rc.bw, desc_bw=rc.desc_bw, pub_time=r.published) r.bw_history.append(bwh) - #tc_session.add(bwh) - tc_session.add(r) + #l_session.add(bwh) + l_session.add(r) except sqlalchemy.orm.exc.NoResultFound: plog("WARN", "No descriptor found for consenus router "+str(idhex)) plog("INFO", "Consensus history updated.") - tc_session.commit() + l_session.commit() + tc_session.remove() def _update_db(self, idlist): + l_session = tc_session() # FIXME: It is tempting to delay this as well, but we need # this info to be present immediately for circuit construction... plog("INFO", "Consensus change... Updating db") @@ -704,9 +810,12 @@ class ConsensusTrackerListener(TorCtl.DualEventListener): continue if not r: r = Router() r.from_router(rc) - tc_session.add(r) + l_session.add(r) plog("INFO", "Consensus db updated") - tc_session.commit() + l_session.commit() + tc_session.remove() + # testing + #refresh_all() # Too many sessions, don't trust commit() def update_consensus(self): plog("INFO", "Updating DB with full consensus.") @@ -719,6 +828,7 @@ class ConsensusTrackerListener(TorCtl.DualEventListener): TorCtl.DualEventListener.set_parent(self, parent_handler) def heartbeat_event(self, e): + l_session = tc_session() # This sketchiness is to ensure we have an accurate history # of each router's rank+bandwidth for the entire duration of the run.. if e.state == EVENT_STATE.PRELISTEN: @@ -731,8 +841,9 @@ class ConsensusTrackerListener(TorCtl.DualEventListener): orhash="000000000000000000000000000", nickname="!!TorClient", published=datetime.datetime.utcnow()) - tc_session.add(OP) - tc_session.commit() + l_session.add(OP) + l_session.commit() + tc_session.remove() self.update_consensus() # XXX: This hack exists because update_rank_history is expensive. # However, even if we delay it till the end of the consensus update, @@ -786,7 +897,9 @@ class CircuitListener(TorCtl.PreEventListener): self.track_parent = False def circ_status_event(self, c): + l_session = tc_session() if self.track_parent and c.circ_id not in self.parent_handler.circuits: + tc_session.remove() return # Ignore circuits that aren't ours # TODO: Hrmm, consider making this sane in TorCtl. 
if c.reason: lreason = c.reason @@ -806,17 +919,25 @@ class CircuitListener(TorCtl.PreEventListener): last_extend=c.arrived_at) if self.track_parent: for r in self.parent_handler.circuits[c.circ_id].path: - rq = Router.query.options(eagerload('circuits')).filter_by( + try: + rq = Router.query.options(eagerload('circuits')).filter_by( idhex=r.idhex).with_labels().one() + except NoResultFound: + plog("WARN", "Query for Router %s=%s in circ %s failed but was in parent_handler" % + (r.nickname, r.idhex, circ.circ_id)) + tc_session.remove() + return circ.routers.append(rq) #rq.circuits.append(circ) # done automagically? - #tc_session.add(rq) - tc_session.add(circ) - tc_session.commit() + #l_session.add(rq) + l_session.add(circ) + l_session.commit() elif c.status == "EXTENDED": circ = Circuit.query.options(eagerload('extensions')).filter_by( circ_id = c.circ_id).first() - if not circ: return # Skip circuits from before we came online + if not circ: + tc_session.remove() + return # Skip circuits from before we came online e = Extension(circ=circ, hop=len(c.path)-1, time=c.arrived_at) @@ -835,17 +956,19 @@ class CircuitListener(TorCtl.PreEventListener): # FIXME: Eager load here? circ.routers.append(e.to_node) e.to_node.circuits.append(circ) - tc_session.add(e.to_node) + l_session.add(e.to_node) e.delta = c.arrived_at - circ.last_extend circ.last_extend = c.arrived_at circ.extensions.append(e) - tc_session.add(e) - tc_session.add(circ) - tc_session.commit() + l_session.add(e) + l_session.add(circ) + l_session.commit() elif c.status == "FAILED": circ = Circuit.query.filter_by(circ_id = c.circ_id).first() - if not circ: return # Skip circuits from before we came online + if not circ: + tc_session.remove() + return # Skip circuits from before we came online circ.expunge() if isinstance(circ, BuiltCircuit): @@ -883,14 +1006,16 @@ class CircuitListener(TorCtl.PreEventListener): e.reason = reason circ.extensions.append(e) circ.fail_time = c.arrived_at - tc_session.add(e) + l_session.add(e) - tc_session.add(circ) - tc_session.commit() + l_session.add(circ) + l_session.commit() elif c.status == "BUILT": circ = Circuit.query.filter_by( circ_id = c.circ_id).first() - if not circ: return # Skip circuits from before we came online + if not circ: + tc_session.remove() + return # Skip circuits from before we came online circ.expunge() # Convert to built circuit @@ -900,8 +1025,8 @@ class CircuitListener(TorCtl.PreEventListener): circ.built_time = c.arrived_at circ.tot_delta = c.arrived_at - circ.launch_time - tc_session.add(circ) - tc_session.commit() + l_session.add(circ) + l_session.commit() elif c.status == "CLOSED": circ = BuiltCircuit.query.filter_by(circ_id = c.circ_id).first() if circ: @@ -919,20 +1044,24 @@ class CircuitListener(TorCtl.PreEventListener): circ = DestroyedCircuit.query.filter_by(id=circ.id).one() circ.destroy_reason = reason circ.destroy_time = c.arrived_at - tc_session.add(circ) - tc_session.commit() + l_session.add(circ) + l_session.commit() + tc_session.remove() class StreamListener(CircuitListener): def stream_bw_event(self, s): + l_session = tc_session() strm = Stream.query.filter_by(strm_id = s.strm_id).first() if strm and strm.start_time and strm.start_time < s.arrived_at: plog("DEBUG", "Got stream bw: "+str(s.strm_id)) strm.tot_read_bytes += s.bytes_read strm.tot_write_bytes += s.bytes_written - tc_session.add(strm) - tc_session.commit() + l_session.add(strm) + l_session.commit() + tc_session.remove() def stream_status_event(self, s): + l_session = tc_session() if s.reason: lreason 
= s.reason else: lreason = "NONE" if s.remote_reason: rreason = s.remote_reason @@ -942,8 +1071,9 @@ class StreamListener(CircuitListener): strm = Stream(strm_id=s.strm_id, tgt_host=s.target_host, tgt_port=s.target_port, init_status=s.status, tot_read_bytes=0, tot_write_bytes=0) - tc_session.add(strm) - tc_session.commit() + l_session.add(strm) + l_session.commit() + tc_session.remove() return strm = Stream.query.filter_by(strm_id = s.strm_id).first() @@ -951,8 +1081,9 @@ class StreamListener(CircuitListener): (s.strm_id not in self.parent_handler.streams or \ self.parent_handler.streams[s.strm_id].ignored): if strm: - tc_session.delete(strm) - tc_session.commit() + strm.delete() + l_session.commit() + tc_session.remove() return # Ignore streams that aren't ours if not strm: @@ -966,8 +1097,9 @@ class StreamListener(CircuitListener): strm.circuit = Circuit.query.filter_by(circ_id=s.circ_id).first() if not strm.circuit: plog("NOTICE", "Ignoring prior stream "+str(strm.strm_id)+" with old circuit "+str(s.circ_id)) - tc_session.delete(strm) - tc_session.commit() + strm.delete() + l_session.commit() + tc_session.remove() return else: circ = None @@ -993,19 +1125,19 @@ class StreamListener(CircuitListener): for r in strm.circuit.routers: plog("DEBUG", "Added router "+r.idhex+" to stream "+str(s.strm_id)) r.streams.append(strm) - tc_session.add(r) - tc_session.add(strm) - tc_session.commit() + l_session.add(r) + l_session.add(strm) + l_session.commit() elif s.status == "DETACHED": for r in strm.circuit.routers: r.detached_streams.append(strm) - tc_session.add(r) + l_session.add(r) #strm.detached_circuits.append(strm.circuit) strm.circuit.detached_streams.append(strm) strm.circuit.streams.remove(strm) strm.circuit = None - tc_session.add(strm) - tc_session.commit() + l_session.add(strm) + l_session.commit() elif s.status == "FAILED": strm.expunge() # Convert to destroyed circuit @@ -1014,8 +1146,8 @@ class StreamListener(CircuitListener): strm = FailedStream.query.filter_by(id=strm.id).one() strm.fail_time = s.arrived_at strm.fail_reason = reason - tc_session.add(strm) - tc_session.commit() + l_session.add(strm) + l_session.commit() elif s.status == "CLOSED": if isinstance(strm, FailedStream): strm.close_reason = reason @@ -1037,8 +1169,9 @@ class StreamListener(CircuitListener): strm.end_time = s.arrived_at plog("DEBUG", "Stream "+str(strm.strm_id)+" xmitted "+str(strm.tot_bytes())) strm.close_reason = reason - tc_session.add(strm) - tc_session.commit() + l_session.add(strm) + l_session.commit() + tc_session.remove() def run_example(host, port): """ Example of basic TorCtl usage. 
See PathSupport for more advanced @@ -1047,11 +1180,13 @@ def run_example(host, port): print "host is %s:%d"%(host,port) setup_db("sqlite:///torflow.sqlite", echo=False) - #print tc_session.query(((func.count(Extension.id)))).filter(and_(FailedExtension.table.c.row_type=='extension', FailedExtension.table.c.from_node_idhex == "7CAA2F5F998053EF5D2E622563DEB4A6175E49AC")).one() + #l_session = tc_session() + #print l_session.query(((func.count(Extension.id)))).filter(and_(FailedExtension.table.c.row_type=='extension', FailedExtension.table.c.from_node_idhex == "7CAA2F5F998053EF5D2E622563DEB4A6175E49AC")).one() #return #for e in Extension.query.filter(FailedExtension.table.c.row_type=='extension').all(): # if e.from_node: print "From: "+e.from_node.idhex+" "+e.from_node.nickname # if e.to_node: print "To: "+e.to_node.idhex+" "+e.to_node.nickname + #tc_session.remove() #return s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/ScanSupport.py b/ScanSupport.py index d102c78..e6752c5 100644 --- a/ScanSupport.py +++ b/ScanSupport.py @@ -164,6 +164,8 @@ class ScanHandler(PathSupport.PathBuilder): def notlambda(sm): plog("DEBUG", "Job for setexit: "+exit_name) cond.acquire() + # Clear last successful exit, we're running a new test + self.last_exit = None sm.set_exit(exit_name) cond.notify() cond.release() @@ -182,7 +184,7 @@ class SQLScanHandler(ScanHandler): ScanHandler.__init__(self, c, selmgr, RouterClass, strm_selector) def attach_sql_listener(self, db_uri): - plog("DEBUG", "Got sqlite: "+db_uri) + plog("DEBUG", "Got db: "+db_uri) SQLSupport.setup_db(db_uri, echo=False, drop=True) self.sql_consensus_listener = SQLSupport.ConsensusTrackerListener() self.add_event_listener(self.sql_consensus_listener) @@ -260,4 +262,18 @@ class SQLScanHandler(ScanHandler): cond.release() plog("INFO", "Consensus OK") - + def reset_stats(self): + cond = threading.Condition() + def notlambda(this): + cond.acquire() + ScanHandler.reset_stats(self) + SQLSupport.reset_all() + this.sql_consensus_listener.update_consensus() + this.sql_consensus_listener._update_rank_history(this.sql_consensus_listener.consensus.ns_map.iterkeys()) + SQLSupport.refresh_all() + cond.notify() + cond.release() + cond.acquire() + self.schedule_low_prio(notlambda) + cond.wait() + cond.release() @@ -96,82 +96,6 @@ AUTH_TYPE = Enum2( INCORRECT_PASSWORD_MSG = "Provided passphrase was incorrect" -def connect(controlAddr="127.0.0.1", controlPort=9051, passphrase=None): - """ - Convenience function for quickly getting a TorCtl connection. This is very - handy for debugging or CLI setup, handling setup and prompting for a password - if necessary (if either none is provided as input or it fails). If any issues - arise this prints a description of the problem and returns None. 
- - Arguments: - controlAddr - ip address belonging to the controller - controlPort - port belonging to the controller - passphrase - authentication passphrase (if defined this is used rather - than prompting the user) - """ - - conn = None - try: - conn, authType, authValue = preauth_connect(controlAddr, controlPort) - - if authType == AUTH_TYPE.PASSWORD: - # password authentication, promting for the password if it wasn't provided - if passphrase: authValue = passphrase - else: - try: authValue = getpass.getpass() - except KeyboardInterrupt: return None - - conn.authenticate(authValue) - return conn - except Exception, exc: - if conn: conn.close() - - if passphrase and str(exc) == "Unable to authenticate: password incorrect": - # provide a warning that the provided password didn't work, then try - # again prompting for the user to enter it - print INCORRECT_PASSWORD_MSG - return connect(controlAddr, controlPort) - else: - print exc - return None - -def preauth_connect(controlAddr="127.0.0.1", controlPort=9051): - """ - Provides an uninitiated torctl connection components for the control port, - returning a tuple of the form... - (torctl connection, authType, authValue) - - The authValue corresponds to the cookie path if using an authentication - cookie, otherwise this is the empty string. This raises an IOError in case - of failure. - - Arguments: - controlAddr - ip address belonging to the controller - controlPort - port belonging to the controller - """ - - conn = None - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect((controlAddr, controlPort)) - conn = Connection(s) - authType, authValue = conn.get_auth_type(), "" - - if authType == AUTH_TYPE.COOKIE: - authValue = conn.get_auth_cookie_path() - - return (conn, authType, authValue) - except socket.error, exc: - if conn: conn.close() - - if "Connection refused" in exc.args: - # most common case - tor control port isn't available - raise IOError("Connection refused. Is the ControlPort enabled?") - - raise IOError("Failed to establish socket: %s" % exc) - except Exception, exc: - if conn: conn.close() - raise IOError(exc) class TorCtlError(Exception): "Generic error raised by TorControl code." @@ -196,7 +120,7 @@ class ErrorReply(TorCtlError): class NetworkStatus: "Filled in during NS events" - def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags, bandwidth=None): + def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags, bandwidth=None, unmeasured=None): self.nickname = nickname self.idhash = idhash self.orhash = orhash @@ -206,6 +130,7 @@ class NetworkStatus: self.flags = flags self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper() self.bandwidth = bandwidth + self.unmeasured = unmeasured m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated) self.updated = datetime.datetime(*map(int, m.groups())) @@ -445,13 +370,15 @@ class Router: else: (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime, published, contact, rate_limited, orhash, - ns_bandwidth,extra_info_digest) = args + ns_bandwidth,extra_info_digest,unmeasured) = args self.idhex = idhex self.nickname = name if ns_bandwidth != None: self.bw = ns_bandwidth else: - self.bw = bw + self.bw = bw + if unmeasured: + self.unmeasured = True self.desc_bw = bw self.exitpolicy = exitpolicy self.flags = flags # Technicaly from NS doc @@ -460,6 +387,7 @@ class Router: self.version = RouterVersion(version) self.os = os self.list_rank = 0 # position in a sorted list of routers. 
+ self.ratio_rank = 0 # position in a ratio-sorted list of routers self.uptime = uptime self.published = published self.refcount = 0 # How many open circs are we currently in? @@ -558,7 +486,7 @@ class Router: plog("INFO", "No version and/or OS for router " + ns.nickname) return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy, ns.flags, ip, version, os, uptime, published, contact, rate_limited, - ns.orhash, ns.bandwidth, extra_info_digest) + ns.orhash, ns.bandwidth, extra_info_digest, ns.unmeasured) build_from_desc = Callable(build_from_desc) def update_to(self, new): @@ -579,6 +507,12 @@ class Router: return ret plog("WARN", "No matching exit line for "+self.nickname) return False + + def get_unmeasured_bw(self): + # if unmeasured, the ratio of self.bw/self.desc_bw should be 1.0 + if self.unmeasured and self.bw > 0: return self.bw + elif self.desc_bw > 0: return self.desc_bw + else: return 1 class Connection: """A Connection represents a connection to the Tor process via the @@ -617,7 +551,8 @@ class Connection: try: authInfo = self.sendAndRecv("PROTOCOLINFO\r\n")[1][1] except Exception, exc: - excMsg = ": %s" % exc if exc.message else "" + if exc.message: excMsg = ": %s" % exc + else: excMsg = "" raise IOError("Unable to query PROTOCOLINFO for the authentication type%s" % excMsg) authType, cookiePath = None, None @@ -637,7 +572,7 @@ class Connection: else: # not of a recognized authentication type (new addition to the # control-spec?) - raise IOError("Unrecognized authentication type: %s" % authInfo) + plog("INFO", "Unrecognized authentication type: %s" % authInfo) self._authType = authType self._cookiePath = cookiePath @@ -1294,10 +1229,11 @@ def ns_body_iter(data): flags = m.groups() flags = flags[0].strip().split(" ") m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline) - w = re.search(r"^w Bandwidth=(\d+)", nsline, re.M) - + w = re.search(r"^w Bandwidth=(\d+)(?:\s(Unmeasured)=1)?", nsline, re.M) + unmeasured = None if w: - yield NetworkStatus(*(m.groups()+(flags,)+(int(w.group(1))*1000,))) + if w.groups(2): unmeasured = True + yield NetworkStatus(*(m.groups()+(flags,)+(int(w.group(1))*1000,))+(unmeasured,)) else: yield NetworkStatus(*(m.groups() + (flags,))) @@ -1421,38 +1357,24 @@ class EventHandler(EventSink): evtype,body = body,"" evtype = evtype.upper() if evtype == "CIRC": - m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?(\s\S+)?", body) - if not m: - raise ProtocolError("CIRC event misformatted.") - ident,status,path,purpose,reason,remote = m.groups() - ident = int(ident) - if path: - if "PURPOSE=" in path: - remote = reason - reason = purpose - purpose=path - path=[] - elif "REASON=" in path: - remote = reason - reason = path - purpose = "" - path=[] - else: - path_verb = path.strip().split(",") - path = [] - for p in path_verb: - path.append(p.replace("~", "=").split("=")[0]) + fields = body.split() + (ident,status),rest = fields[:2], fields[2:] + if rest[0].startswith('$'): + path = rest.pop(0) + path_verb = path.strip().split(",") + path = [] + for p in path_verb: + path.append(p.replace("~", "=").split("=")[0]) else: path = [] - - if purpose and "REASON=" in purpose: - remote=reason - reason=purpose - purpose="" - - if purpose: purpose = purpose[9:] - if reason: reason = reason[8:] - if remote: remote = remote[15:] + try: + kwargs = dict([i.split('=') for i in rest]) + except ValueError: + raise ProtocolError("CIRC event misformatted.") + ident = int(ident) + purpose = kwargs.get('PURPOSE', "") + reason = kwargs.get('REASON', None) 
+ remote = kwargs.get('REMOTE_REASON', None) event = CircuitEvent(evtype, ident, status, path, purpose, reason, remote, body) elif evtype == "STREAM": @@ -1708,6 +1630,17 @@ class ConsensusTracker(EventHandler): self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw)) for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i + # https://trac.torproject.org/projects/tor/ticket/6131 + # Routers with 'Unmeasured=1' should get a hard-coded ratio of 1.0. + # "Alter the ratio_r sorting to use a Router.get_unmeasured_bw() + # method to return the NetworkStatus bw value instead of desc_bw + # if unmeasured is true... Then the ratio will work out to 1 that way." + + ratio_r = copy.copy(self.sorted_r) + ratio_r.sort(lambda x, y: cmp(float(y.bw)/y.get_unmeasured_bw(), + float(x.bw)/x.get_unmeasured_bw())) + for i in xrange(len(ratio_r)): ratio_r[i].ratio_rank = i + # XXX: Verification only. Can be removed. self._sanity_check(self.sorted_r) @@ -1771,8 +1704,9 @@ class ConsensusTracker(EventHandler): r = r[0] ns = ns[0] - if ns.idhex in self.routers and self.routers[ns.idhex].orhash == r.orhash: - plog("NOTICE", + if ns.idhex in self.routers: + if self.routers[ns.idhex].orhash == r.orhash: + plog("NOTICE", "Got extra NEWDESC event for router "+ns.nickname+"="+ns.idhex) else: self.consensus_count += 1 @@ -1790,6 +1724,11 @@ class ConsensusTracker(EventHandler): self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues()) self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw)) for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i + + ratio_r = copy.copy(self.sorted_r) + ratio_r.sort(lambda x, y: cmp(float(y.bw)/y.get_unmeasured_bw(), + float(x.bw)/x.get_unmeasured_bw())) + for i in xrange(len(ratio_r)): ratio_r[i].ratio_rank = i plog("DEBUG", str(time.time()-d.arrived_at)+ " Read " + str(len(d.idlist)) +" ND => "+str(len(self.sorted_r))+" routers. Update: "+str(update)) # XXX: Verification only. Can be removed. @@ -1819,6 +1758,11 @@ class ConsensusTracker(EventHandler): self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues()) self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw)) for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i + + ratio_r = copy.copy(self.sorted_r) + ratio_r.sort(lambda x, y: cmp(float(y.bw)/y.get_unmeasured_bw(), + float(x.bw)/x.get_unmeasured_bw())) + for i in xrange(len(ratio_r)): ratio_r[i].ratio_rank = i self._sanity_check(self.sorted_r) def current_consensus(self): @@ -1901,3 +1845,83 @@ def parseHostAndPort(h): return host, port +def connect(controlAddr="127.0.0.1", controlPort=9051, passphrase=None, + ConnClass=Connection): + """ + Convenience function for quickly getting a TorCtl connection. This is very + handy for debugging or CLI setup, handling setup and prompting for a password + if necessary (if either none is provided as input or it fails). If any issues + arise this prints a description of the problem and returns None. 
+ + Arguments: + controlAddr - ip address belonging to the controller + controlPort - port belonging to the controller + passphrase - authentication passphrase (if defined this is used rather + than prompting the user) + """ + + conn = None + try: + conn, authType, authValue = preauth_connect(controlAddr, controlPort, + ConnClass) + + if authType == AUTH_TYPE.PASSWORD: + # password authentication, promting for the password if it wasn't provided + if passphrase: authValue = passphrase + else: + try: authValue = getpass.getpass() + except KeyboardInterrupt: return None + + conn.authenticate(authValue) + return conn + except Exception, exc: + if conn: conn.close() + + if passphrase and str(exc) == "Unable to authenticate: password incorrect": + # provide a warning that the provided password didn't work, then try + # again prompting for the user to enter it + print INCORRECT_PASSWORD_MSG + return connect(controlAddr, controlPort) + else: + print exc + return None + +def preauth_connect(controlAddr="127.0.0.1", controlPort=9051, + ConnClass=Connection): + """ + Provides an uninitiated torctl connection components for the control port, + returning a tuple of the form... + (torctl connection, authType, authValue) + + The authValue corresponds to the cookie path if using an authentication + cookie, otherwise this is the empty string. This raises an IOError in case + of failure. + + Arguments: + controlAddr - ip address belonging to the controller + controlPort - port belonging to the controller + """ + + conn = None + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((controlAddr, controlPort)) + conn = ConnClass(s) + authType, authValue = conn.get_auth_type(), "" + + if authType == AUTH_TYPE.COOKIE: + authValue = conn.get_auth_cookie_path() + + return (conn, authType, authValue) + except socket.error, exc: + if conn: conn.close() + + if "Connection refused" in exc.args: + # most common case - tor control port isn't available + raise IOError("Connection refused. Is the ControlPort enabled?") + + raise IOError("Failed to establish socket: %s" % exc) + except Exception, exc: + if conn: conn.close() + raise IOError(exc) + @@ -417,3 +417,30 @@ Usage: lzprob(z) prob = ((1.0-x)*0.5) return prob +def get_git_version(path_to_repo): + """ Returns a tuple of the branch and head from a git repo (.git) + if available, or returns ('unknown', 'unknown') + """ + try: + f = open(path_to_repo+'HEAD') + ref = f.readline().strip().split(' ') + f.close() + except IOError, e: + plog('NOTICE', 'Git Repo at %s Not Found' % path_to_repo) + return ('unknown','unknown') + try: + if len(ref) > 1: + f = open(path_to_repo+ref[1]) + branch = ref[1].strip().split('/')[-1] + head = f.readline().strip() + else: + branch = 'detached' + head = ref[0] + f.close() + return (branch, head) + except IOError, e: + pass + except IndexError, e: + pass + plog('NOTICE', 'Git Repo at %s Not Found' % path_to_repo) + return ('unknown','unknown') diff --git a/debian/changelog b/debian/changelog index 5dd3c2a..ae9877b 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,18 @@ +python-torctl (20130920git-2) unstable; urgency=high + + * Transitioning from pysupport to dh_python2, missed dh_ call. + Thanks MaximilianStein (Closes: #747527). + + -- Ulises Vitulli <dererk@debian.org> Mon, 12 May 2014 12:21:12 -0300 + +python-torctl (20130920git-1) unstable; urgency=medium + + * New upstream git snapshot, fixes several relevant issues. + * Drop unnecessary dependency on python-socksipy. 
Thanks SebastianRamacher + (Closes: 707561). + + -- Ulises Vitulli <dererk@debian.org> Fri, 02 May 2014 09:32:19 -0300 + python-torctl (20110618git-1) unstable; urgency=low * New upstream git snapshot, fixes several relevant issues. diff --git a/debian/control b/debian/control index 821808e..4ae5f4a 100644 --- a/debian/control +++ b/debian/control @@ -3,14 +3,14 @@ Section: python Priority: optional Maintainer: Ulises Vitulli <dererk@debian.org> Build-Depends: debhelper (>= 7.0.50~) -Build-Depends-Indep: python, python-support (>= 0.4) -Standards-Version: 3.9.2.0 +Build-Depends-Indep: python +Standards-Version: 3.9.5 Homepage: https://gitweb.torproject.org/pytorctl.git XS-Python-Version: >= 2.4 Package: python-torctl Architecture: all -Depends: ${python:Depends}, ${misc:Depends}, python-socksipy, python-geoip +Depends: ${misc:Depends}, python (>= 2.7), python-geoip Description: Tor control library for Python TorCtl is a Python Tor controller with extensions to support path building and various constraints on node and path selection, as well as diff --git a/debian/rules b/debian/rules index 0c6ad5b..9c98aa6 100755 --- a/debian/rules +++ b/debian/rules @@ -36,7 +36,7 @@ binary-indep: build install dh_installchangelogs ChangeLog dh_installdocs README dh_installexamples example.py ScanSupport.py SQLSupport.py - dh_pysupport + dh_python2 dh_compress dh_fixperms dh_installdeb |
