-rw-r--r--  ChangeLog          294
-rw-r--r--  PathSupport.py      49
-rw-r--r--  README               9
-rw-r--r--  SQLSupport.py      319
-rw-r--r--  ScanSupport.py      20
-rwxr-xr-x  TorCtl.py          258
-rw-r--r--  TorUtil.py          27
-rw-r--r--  debian/changelog    15
-rw-r--r--  debian/control       6
-rwxr-xr-x  debian/rules         2
10 files changed, 775 insertions, 224 deletions
diff --git a/ChangeLog b/ChangeLog
index eef0813..47debbc 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,297 @@
+commit 4fdd2031e6b231ed4bbaa79940f67e9b8f691382
+Author: aagbsn <aagbsn@extc.org>
+Date: Fri Sep 20 16:52:58 2013 +0200
+
+ 9771 - Keep Torflow happy
+
+ Torflow expects the PURPOSE to not be None but "" if no PURPOSE is set.
+
+commit c161f352e7fabd88cdd7eb9249a55957038b07d9
+Author: aagbsn <aagbsn@extc.org>
+Date: Thu Sep 19 12:44:50 2013 +0200
+
+ 9771 - Fix CIRC event parsing
+
+ In torspec.git commit e195a4c8d288eb27385060740b8fde170a5e3e38 the
+ format of the CIRC event was changed slightly. This commit parses this
+ newer format.
+
+commit 68bc5de84dc90e1292c6aa19abd95f660b5e3277
+Author: aagbsn <aagbsn@extc.org>
+Date: Tue Mar 5 19:56:29 2013 +0100
+
+ 8399 - Do not return 0 from get_unmeasured_bw
+
+commit e6c7fd73e75456ea216402ff21faa2cc31e4c603
+Author: aagbsn <aagbsn@extc.org>
+Date: Sat Mar 2 10:18:05 2013 +0100
+
+ 8273 - Remove Fast Flag requirement for measurements
+
+commit e27fb553b471b8f0774ae81cb024b5507d49894e
+Author: aagbsn <aagbsn@extc.org>
+Date: Thu Feb 28 11:48:41 2013 +0100
+
+ 6131 - BwAuths learn to recognize Unmeasured=1 in consensus
+
+ Routers with the Unmeasured flag get a hard-coded consensus to descriptor
+ ratio of 1.0
+
+commit 853cb1e2040c31f90c89d4941e18a8f9ae677146
+Author: Mike Perry <mikeperry+git@torproject.org>
+Date: Sat Dec 29 16:30:53 2012 -0800
+
+ Mention stem and txtorcon as alternatives.
+
+commit 305a759d99dd01f60faed9aa036b37746d3c54c5
+Author: Mike Perry <mikeperry+git@torproject.org>
+Date: Wed Apr 18 16:47:25 2012 -0700
+
+ Turn an exception into a log.
+
+commit 3de91767e208d1642a8cf89dc1b645f637abaf8e
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Tue Nov 15 18:42:18 2011 -0800
+
+ Make memory-only sqlite work.
+
+commit 391b4399adda91e8b68ffd6e1fa294efe846debd
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Fri Nov 4 15:41:54 2011 -0700
+
+ Don't round the circ fail rate.
+
+commit 66ed91b5c1453b9c25b7070b702e2e548efb4f80
+Merge: e15ed99 a582469
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Mon Oct 31 21:34:30 2011 -0700
+
+ Merge commit 'a58246918f58f3fc5663f8772df39cdcd3ccce8d'
+
+commit a58246918f58f3fc5663f8772df39cdcd3ccce8d
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Mon Oct 31 21:33:20 2011 -0700
+
+ Add TorCtl piece of the fix for #1984.
+
+commit e15ed99df8e8e04cadd0f3792e42a621df6fbe02
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Wed Oct 19 16:32:34 2011 -0700
+
+ Bug #4097: Clear old last_exit before requesting a new one
+
+ Should eliminate the KeyError in the test's node_map, which seemed to be due
+ to really stale last_exits lying around from previous tests on a different
+ consensus.
+
+commit 750d7c392766476d12cc9e47bdffffcce84bcdf5
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Tue Oct 4 17:39:44 2011 -0700
+
+ Document a pythonism that briefly confused Aaron and me.
+
+commit 520722b8e3a7bb72bf9b5063f3c1649b53110225
+Author: aagbsn <aagbsn@extc.org>
+Date: Wed Sep 14 16:56:16 2011 -0700
+
+ 4024 - get_git_version does not work for submodule
+
+ Added support for detached heads
+
+commit e99394c46b30ed73ffbf02d5c6a39bf9b04416a2
+Author: aagbsn <aagbsn@extc.org>
+Date: Wed Aug 31 16:23:42 2011 -0700
+
+ use local sessions
+
+ see:
+ http://www.sqlalchemy.org/docs/orm/session.html#lifespan-of-a-contextual-session
+ "This has the effect such that each web request starts fresh with a brand new
+ session, and is the most definitive approach to closing out a request."
+
+commit 097a95c4185b086dbac67e0d2618bd15b9f41521
+Author: aagbsn <aagbsn@extc.org>
+Date: Thu Sep 1 13:53:22 2011 -0700
+
+ NOTICE when git repo not found rather than ERROR
+
+commit a3ed3d01ff93df9d25dc64ffbacfcb6b4ef0befa
+Author: aagbsn <aagbsn@extc.org>
+Date: Wed Aug 31 17:07:27 2011 -0700
+
+ add function to get current branch and head
+
+commit 4a990f47e5ecd26af4f6459ae6d63ea36dd88f00
+Author: aagbsn <aagbsn@extc.org>
+Date: Wed Aug 31 16:24:27 2011 -0700
+
+ remove ;
+
+commit db835ef1f6a252d9923be766f1d7dc813ea9a4eb
+Author: aagbsn <aagbsn@extc.org>
+Date: Wed Aug 31 16:09:00 2011 -0700
+
+ use Elixir model delete method
+
+ try to use the same session for deletes and queries where possible,
+ this may be the cause of an ObjectNotFound exception that occurs
+ right after _update_db(), despite the Router object being in the
+ consensus and also in the db.
+
+commit 741cc04db278cede4bbe0c9e0147066fbfd00a4b
+Author: aagbsn <aagbsn@extc.org>
+Date: Thu Jul 7 15:21:26 2011 -0700
+
+ placeholder for call to refresh_all()
+
+ suspicious session behavior may require refreshing all
+ objects in order to keep tc_session and sessions bound
+ to Elixir synchronized
+
+commit 37f612706ad384d7a1d34a33268f3d1d8449d5f2
+Author: aagbsn <aagbsn@extc.org>
+Date: Thu Jul 7 14:58:37 2011 -0700
+
+ WARN when circ_status_event fails to query Router
+
+commit 25cb2bf77b1a98f2273b0f90c99b57e3d6e21a41
+Merge: 962d30c e538547
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Mon Jun 27 11:23:39 2011 -0700
+
+ Merge branch 'ratio-restrictions'
+
+commit 962d30c72ffc85c1df79270d6ac5b25f0336d2d2
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Sat Jun 25 10:37:22 2011 -0700
+
+ Allow TorCtl.connect() to specify alternate Connection classes.
+
+ Need to move it for that...
+
+commit e538547dfc2973d7bd99e2d927a1d87be5230704
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Fri Jun 24 13:32:37 2011 -0700
+
+ Implement ratio ranking support.
+
+commit dc1cb9ce39fd5c9c5ea50f542642130a44e71fee
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Fri Jun 24 12:33:02 2011 -0700
+
+ Add a little more rambling to reset_all().
+
+commit d0f593f2ae245e951323ff7fc3a65fa6a1f16b47
+Merge: 3c438e0 eb736ba
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Fri Jun 24 11:57:47 2011 -0700
+
+ Merge remote branch 'aagbsn/testing'
+
+commit 3c438e013b2301f2c5e04bb1aceb907ae5f96576
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Fri Jun 24 10:46:59 2011 -0700
+
+ Eliminate a warn about a consensus miscount.
+
+ By fixing the miscount, of course.
+
+commit eb736ba5e24d1e4259db921bda52ab918c043553
+Author: aagbsn <aagbsn@extc.org>
+Date: Thu Jun 23 15:40:05 2011 -0700
+
+ Added refresh_all() and warnings to reset_all()
+
+ SQLSupport.refresh_all() is required to keep Elixir and
+ tc_session in sync. Otherwise it is possible for
+ routers added by the consensus update to not show up
+ in queries using the Elixir model i.e. Router.query.all()
+
+ Also, warnings have been added to SQLSupport.reset_all() because
+ this does not work properly -- in some cases relation tables
+ were not being reset properly (this resulted in old bw measurements
+ being repeated in future output!).
+
+ Finally, even when reset_all() works properly, bwauthority memory
+ usage continues to grow.
+
+commit e1266cb4d716a8a066ea3d92fb3d19305939a058
+Author: aagbsn <aagbsn@extc.org>
+Date: Thu May 19 14:17:12 2011 -0700
+
+ update consensus after resetting stats within the same job
+
+commit 13307d9cc0ed54bf5602c2c9b5e8db86113f3a45
+Author: aagbsn <aagbsn@extc.org>
+Date: Mon Apr 18 10:38:50 2011 -0700
+
+ added reset_stats() to ScanSupport.py
+
+ calls parent and SQLSupport reset functions
+ Tests show that SQLSupport.reset_all() may clear too much because
+ if BwAuthority calls ScanSupport.reset_stats() after each speedrace()
+ run, only the first slice is properly recorded; the rest are empty.
+ See: https://trac.torproject.org/projects/tor/ticket/2947
+
+commit 7cd3224c4615c8b2e3f465ed2d1b22d2f9dbf89f
+Author: Mike Perry <mikeperry-git@fscked.org>
+Date: Mon Jun 20 10:03:30 2011 -0700
+
+ Remove python 2.5ism
+
+commit ef0b770e9fa3ddb63ec52dfa4796c7dff27e8709
+Author: aagbsn <aagbsn@extc.org>
+Date: Sat Jun 18 16:36:24 2011 -0700
+
+ add case for long
+
+commit ffb68994a4d440db9deca2624ed33075f3fb7125
+Author: aagbsn <aagbsn@extc.org>
+Date: Fri Jun 17 10:53:05 2011 -0700
+
+ generalize db message, we now support postgres and mysql too
+
+commit fbf2fcf865bc0d9b65841076fd88e31f321638de
+Author: aagbsn <aagbsn@extc.org>
+Date: Fri Apr 29 16:21:40 2011 -0700
+
+ fixes for divide-by-zeros
+
+ Postgres doesn't ignore divide-by-zeros like MySQL
+ CASE statement added to set the result to NULL if
+ the denominator is zero
+
+commit 3a2b44864417908bae45e835e0502297671e6c7b
+Author: aagbsn <aagbsn@extc.org>
+Date: Sun Apr 24 21:12:56 2011 -0700
+
+ rewrite query for mysql compatibility attempt 2
+
+ this actually appears to work
+
+commit a416402b0a4c6b5406448a5f87538081f26d9368
+Author: aagbsn <aagbsn@extc.org>
+Date: Thu Apr 21 18:28:17 2011 -0700
+
+ rewrite query for mysql compatibility attempt 1
+
+commit c8fee8b7b5ea6c584bb5a75647ee44737e60b66d
+Author: aagbsn <aagbsn@extc.org>
+Date: Sun Apr 10 23:59:09 2011 -0700
+
+ backward compatibility with SQLAlchemy 0.5.x
+
+commit 5161c64139f18b55288dab07e74f9f3f1c5bb704
+Author: aagbsn <aagbsn@extc.org>
+Date: Sun Apr 10 23:24:23 2011 -0700
+
+ SQLAlchemy and Elixir upgrade
+
+ enabled elixir migration aid options.
+
+ renamed a few function calls, as per SQLAlchemy upgrade docs:
+ session.clear() is removed. use session.remove_all()
+
commit af306b53a668168ae92a6c919d4013b81d7e61ad
Author: Mike Perry <mikeperry-git@fscked.org>
Date: Fri Jun 17 17:29:43 2011 -0700
diff --git a/PathSupport.py b/PathSupport.py
index 4e9ad6d..c9c5ee1 100644
--- a/PathSupport.py
+++ b/PathSupport.py
@@ -229,6 +229,27 @@ class PercentileRestriction(NodeRestriction):
def __str__(self):
return self.__class__.__name__+"("+str(self.pct_skip)+","+str(self.pct_fast)+")"
+class RatioPercentileRestriction(NodeRestriction):
+ """Restriction to cut out a percentile slice of the network by ratio of
+ consensus bw to descriptor bw."""
+ def __init__(self, pct_skip, pct_fast, r_list):
+ """Constructor. Sets up the restriction such that routers in the
+ 'pct_skip' to 'pct_fast' percentile of bandwidth rankings are
+ returned from the sorted list 'r_list'"""
+ self.pct_fast = pct_fast
+ self.pct_skip = pct_skip
+ self.sorted_r = r_list
+
+ def r_is_ok(self, r):
+ "Returns true if r is in the percentile boundaries (by rank)"
+ if r.ratio_rank < len(self.sorted_r)*self.pct_skip/100: return False
+ elif r.ratio_rank > len(self.sorted_r)*self.pct_fast/100: return False
+
+ return True
+
+ def __str__(self):
+ return self.__class__.__name__+"("+str(self.pct_skip)+","+str(self.pct_fast)+")"
+
class UptimeRestriction(NodeRestriction):
"""Restriction to filter out routers with uptimes < min_uptime or
> max_uptime"""
@@ -1008,7 +1029,8 @@ class SelectionManager(BaseSelectionManager):
def __init__(self, pathlen, order_exits,
percent_fast, percent_skip, min_bw, use_all_exits,
uniform, use_exit, use_guards,geoip_config=None,
- restrict_guards=False, extra_node_rstr=None, exit_ports=None):
+ restrict_guards=False, extra_node_rstr=None, exit_ports=None,
+ order_by_ratio=False):
BaseSelectionManager.__init__(self)
self.__ordered_exit_gen = None
self.pathlen = pathlen
@@ -1026,6 +1048,7 @@ class SelectionManager(BaseSelectionManager):
self.consensus = None
self.exit_ports = exit_ports
self.extra_node_rstr=extra_node_rstr
+ self.order_by_ratio = order_by_ratio
def reconfigure(self, consensus=None):
try:
@@ -1053,8 +1076,8 @@ class SelectionManager(BaseSelectionManager):
self.path_rstr = PathRestrictionList(
[Subnet16Restriction(), UniqueRestriction()])
- if self.use_guards: entry_flags = ["Guard", "Running", "Fast"]
- else: entry_flags = ["Running", "Fast"]
+ if self.use_guards: entry_flags = ["Guard", "Running"]
+ else: entry_flags = ["Running"]
if self.restrict_guards_only:
nonentry_skip = 0
@@ -1063,21 +1086,26 @@ class SelectionManager(BaseSelectionManager):
nonentry_skip = self.percent_skip
nonentry_fast = self.percent_fast
+ if self.order_by_ratio:
+ PctRstr = RatioPercentileRestriction
+ else:
+ PctRstr = PercentileRestriction
+
# XXX: sometimes we want the ability to do uniform scans
# without the conserve exit restrictions..
entry_rstr = NodeRestrictionList(
- [PercentileRestriction(self.percent_skip, self.percent_fast, sorted_r),
+ [PctRstr(self.percent_skip, self.percent_fast, sorted_r),
OrNodeRestriction(
[FlagsRestriction(["BadExit"]),
ConserveExitsRestriction(self.exit_ports)]),
FlagsRestriction(entry_flags, [])]
)
mid_rstr = NodeRestrictionList(
- [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r),
+ [PctRstr(nonentry_skip, nonentry_fast, sorted_r),
OrNodeRestriction(
[FlagsRestriction(["BadExit"]),
ConserveExitsRestriction(self.exit_ports)]),
- FlagsRestriction(["Running","Fast"], [])]
+ FlagsRestriction(["Running"], [])]
)
@@ -1087,11 +1115,11 @@ class SelectionManager(BaseSelectionManager):
self.exit_rstr = NodeRestrictionList([IdHexRestriction(self.exit_id)])
elif self.use_all_exits:
self.exit_rstr = NodeRestrictionList(
- [FlagsRestriction(["Running","Fast"], ["BadExit"])])
+ [FlagsRestriction(["Running"], ["BadExit"])])
else:
self.exit_rstr = NodeRestrictionList(
- [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r),
- FlagsRestriction(["Running","Fast"], ["BadExit"])])
+ [PctRstr(nonentry_skip, nonentry_fast, sorted_r),
+ FlagsRestriction(["Running"], ["BadExit"])])
if self.extra_node_rstr:
entry_rstr.add_restriction(self.extra_node_rstr)
@@ -1579,6 +1607,9 @@ class PathBuilder(TorCtl.ConsensusTracker):
plog("WARN", "Error attaching new stream: "+str(e.args))
return
break
+ # This else clause is executed when we go through the circuit
+ # list without finding an entry (or it is empty).
+ # http://docs.python.org/tutorial/controlflow.html#break-and-continue-statements-and-else-clauses-on-loops
else:
circ = None
try:
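
The comment added to PathBuilder above points at Python's for/else construct, which is easy to misread. A minimal sketch of the idiom, using an illustrative find_circuit() helper rather than the real circuit objects: the else branch runs only when the loop finishes without hitting break, including when the list is empty.

    def find_circuit(circ_ids, wanted):
        for circ_id in circ_ids:
            if circ_id == wanted:
                print("reusing circuit %d" % circ_id)
                break
        else:
            # Reached when no entry matched or the list was empty (no break ran),
            # mirroring the "circ = None" fallback in the hunk above.
            print("no usable circuit; will build a new one")

    find_circuit([4, 7, 9], 7)     # reusing circuit 7
    find_circuit([4, 7, 9], 12)    # no usable circuit; will build a new one
    find_circuit([], 7)            # no usable circuit; will build a new one
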
diff --git a/README b/README
index 07ac6aa..afee87f 100644
--- a/README
+++ b/README
@@ -1,3 +1,12 @@
+Note: TorCtl is mostly unmaintained. It serves primarily as the support
+library for the Bandwidth Authorities, Exit Scanner, and other projects in
+TorFlow. For more actively maintained python libraries, you may consider using
+Stem or TxTorCon. See:
+https://stem.torproject.org/ and https://github.com/meejah/txtorcon
+
+
+
+
TorCtl Python Bindings
diff --git a/SQLSupport.py b/SQLSupport.py
index 2532973..28963fc 100644
--- a/SQLSupport.py
+++ b/SQLSupport.py
@@ -22,12 +22,18 @@ from TorUtil import meta_port, meta_host, control_port, control_host, control_pa
from TorCtl import EVENT_TYPE, EVENT_STATE, TorCtlError
import sqlalchemy
+import sqlalchemy.pool
import sqlalchemy.orm.exc
from sqlalchemy.orm import scoped_session, sessionmaker, eagerload, lazyload, eagerload_all
+from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy import create_engine, and_, or_, not_, func
-from sqlalchemy.sql import func,select
+from sqlalchemy.sql import func,select,alias,case
from sqlalchemy.schema import ThreadLocalMetaData,MetaData
from elixir import *
+from elixir import options
+# migrate from elixir 06 to 07
+options.MIGRATION_TO_07_AID = True
+
# Nodes with a ratio below this value will be removed from consideration
# for higher-valued nodes
@@ -35,6 +41,13 @@ MIN_RATIO=0.5
NO_FPE=2**-50
+#################### Session Usage ###############
+# What is all this l_session madness? See:
+# http://www.sqlalchemy.org/docs/orm/session.html#lifespan-of-a-contextual-session
+# "This has the effect such that each web request starts fresh with
+# a brand new session, and is the most definitive approach to closing
+# out a request."
+
#################### Model #######################
# In elixir, the session (DB connection) is a property of the model..
@@ -46,7 +59,13 @@ tc_metadata.echo=True
tc_session = scoped_session(sessionmaker(autoflush=True))
def setup_db(db_uri, echo=False, drop=False):
- tc_engine = create_engine(db_uri, echo=echo)
+ # Memory-only sqlite requires some magic options...
+ if db_uri == "sqlite://":
+ tc_engine = create_engine(db_uri, echo=echo,
+ connect_args={'check_same_thread':False},
+ poolclass=sqlalchemy.pool.StaticPool)
+ else:
+ tc_engine = create_engine(db_uri, echo=echo)
tc_metadata.bind = tc_engine
tc_metadata.echo = echo
@@ -59,6 +78,10 @@ def setup_db(db_uri, echo=False, drop=False):
# wouldn't kill you, you know.
tc_session.add = tc_session.save_or_update
+ if sqlalchemy.__version__ < "0.6.0":
+ # clear() replaced with expunge_all
+ tc_session.clear = tc_session.expunge_all
+
class Router(Entity):
using_options(shortnames=True, order_by='-published', session=tc_session, metadata=tc_metadata)
using_mapper_options(save_on_init=False)
@@ -248,6 +271,7 @@ class RouterStats(Entity):
filt_sbw_ratio = Field(Float)
def _compute_stats_relation(stats_clause):
+ l_session = tc_session()
for rs in RouterStats.query.\
filter(stats_clause).\
options(eagerload_all('router.circuits.extensions')).\
@@ -286,12 +310,15 @@ class RouterStats(Entity):
if rs.circ_try_to+rs.circ_try_from > 0:
rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from)
- tc_session.add(rs)
- tc_session.commit()
+ l_session.add(rs)
+ l_session.commit()
+ tc_session.remove()
_compute_stats_relation = Callable(_compute_stats_relation)
+
def _compute_stats_query(stats_clause):
- tc_session.clear()
+ tc_session.expunge_all()
+ l_session = tc_session()
# http://www.sqlalchemy.org/docs/04/sqlexpression.html#sql_update
to_s = select([func.count(Extension.id)],
and_(stats_clause, Extension.table.c.to_node_idhex
@@ -320,15 +347,20 @@ class RouterStats(Entity):
RouterStats.table.c.circ_fail_from:f_from_s,
RouterStats.table.c.avg_first_ext:avg_ext}).execute()
+ # added case() to set NULL and avoid divide-by-zeros (Postgres)
RouterStats.table.update(stats_clause, values=
{RouterStats.table.c.circ_from_rate:
- RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from,
+ case([(RouterStats.table.c.circ_try_from == 0, None)],
+ else_=(RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from)),
RouterStats.table.c.circ_to_rate:
- RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to,
+ case([(RouterStats.table.c.circ_try_to == 0, None)],
+ else_=(RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to)),
RouterStats.table.c.circ_bi_rate:
- (RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from)
+ case([(RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from == 0, None)],
+ else_=((RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from)
/
- (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute()
+ (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from))),
+ }).execute()
# TODO: Give the streams relation table a sane name and reduce this too
@@ -360,17 +392,20 @@ class RouterStats(Entity):
tot_var += (s.bandwidth()-rs.sbw)*(s.bandwidth()-rs.sbw)
tot_var /= s_cnt
rs.sbw_dev = math.sqrt(tot_var)
- tc_session.add(rs)
- tc_session.commit()
+ l_session.add(rs)
+ l_session.commit()
+ tc_session.remove()
_compute_stats_query = Callable(_compute_stats_query)
+
def _compute_stats(stats_clause):
RouterStats._compute_stats_query(stats_clause)
#RouterStats._compute_stats_relation(stats_clause)
_compute_stats = Callable(_compute_stats)
def _compute_ranks():
- tc_session.clear()
+ tc_session.expunge_all()
+ l_session = tc_session()
min_r = select([func.min(BwHistory.rank)],
BwHistory.table.c.router_idhex
== RouterStats.table.c.router_idhex).as_scalar()
@@ -395,26 +430,46 @@ class RouterStats(Entity):
RouterStats.table.c.avg_desc_bw:avg_desc_bw}).execute()
#min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
- max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar()
+
+ # the commented query breaks mysql because UPDATE cannot reference
+ # target table in the FROM clause. So we throw in an anonymous alias and wrap
+ # another select around it in order to get the nested SELECT stored into a
+ # temporary table.
+ # FIXME: performance? no idea
+ #max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar()
+ max_avg_rank = select([alias(select([func.max(RouterStats.avg_rank)]))]).as_scalar()
RouterStats.table.update(values=
{RouterStats.table.c.percentile:
(100.0*RouterStats.table.c.avg_rank)/max_avg_rank}).execute()
- tc_session.commit()
+
+ l_session.commit()
+ tc_session.remove()
_compute_ranks = Callable(_compute_ranks)
def _compute_ratios(stats_clause):
- tc_session.clear()
- avg_from_rate = select([func.avg(RouterStats.circ_from_rate)],
- stats_clause).as_scalar()
- avg_to_rate = select([func.avg(RouterStats.circ_to_rate)],
- stats_clause).as_scalar()
- avg_bi_rate = select([func.avg(RouterStats.circ_bi_rate)],
- stats_clause).as_scalar()
- avg_ext = select([func.avg(RouterStats.avg_first_ext)],
- stats_clause).as_scalar()
- avg_sbw = select([func.avg(RouterStats.sbw)],
- stats_clause).as_scalar()
+ tc_session.expunge_all()
+ l_session = tc_session()
+ avg_from_rate = select([alias(
+ select([func.avg(RouterStats.circ_from_rate)],
+ stats_clause)
+ )]).as_scalar()
+ avg_to_rate = select([alias(
+ select([func.avg(RouterStats.circ_to_rate)],
+ stats_clause)
+ )]).as_scalar()
+ avg_bi_rate = select([alias(
+ select([func.avg(RouterStats.circ_bi_rate)],
+ stats_clause)
+ )]).as_scalar()
+ avg_ext = select([alias(
+ select([func.avg(RouterStats.avg_first_ext)],
+ stats_clause)
+ )]).as_scalar()
+ avg_sbw = select([alias(
+ select([func.avg(RouterStats.sbw)],
+ stats_clause)
+ )]).as_scalar()
RouterStats.table.update(stats_clause, values=
{RouterStats.table.c.circ_from_ratio:
@@ -427,10 +482,12 @@ class RouterStats(Entity):
avg_ext/RouterStats.table.c.avg_first_ext,
RouterStats.table.c.sbw_ratio:
RouterStats.table.c.sbw/avg_sbw}).execute()
- tc_session.commit()
+ l_session.commit()
+ tc_session.remove()
_compute_ratios = Callable(_compute_ratios)
def _compute_filtered_relational(min_ratio, stats_clause, filter_clause):
+ l_session = tc_session()
badrouters = RouterStats.query.filter(stats_clause).filter(filter_clause).\
filter(RouterStats.sbw_ratio < min_ratio).all()
@@ -455,18 +512,19 @@ class RouterStats(Entity):
if sbw_cnt: rs.filt_sbw = tot_sbw/sbw_cnt
else: rs.filt_sbw = None
- tc_session.add(rs)
+ l_session.add(rs)
if sqlalchemy.__version__ < "0.5.0":
avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw)
else:
- avg_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(stats_clause).scalar()
+ avg_sbw = l_session.query(func.avg(RouterStats.filt_sbw)).filter(stats_clause).scalar()
for rs in RouterStats.query.filter(stats_clause).all():
if type(rs.filt_sbw) == float and avg_sbw:
rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
else:
rs.filt_sbw_ratio = None
- tc_session.add(rs)
- tc_session.commit()
+ l_session.add(rs)
+ l_session.commit()
+ tc_session.remove()
_compute_filtered_relational = Callable(_compute_filtered_relational)
def _compute_filtered_ratios(min_ratio, stats_clause, filter_clause):
@@ -476,18 +534,21 @@ class RouterStats(Entity):
_compute_filtered_ratios = Callable(_compute_filtered_ratios)
def reset():
- tc_session.clear()
+ tc_session.expunge_all()
+ l_session = tc_session()
RouterStats.table.drop()
RouterStats.table.create()
for r in Router.query.all():
rs = RouterStats()
rs.router = r
r.stats = rs
- tc_session.add(r)
- tc_session.commit()
+ l_session.add(r)
+ l_session.commit()
+ tc_session.remove()
reset = Callable(reset)
def compute(pct_low=0, pct_high=100, stat_clause=None, filter_clause=None):
+ l_session = tc_session()
pct_clause = and_(RouterStats.percentile >= pct_low,
RouterStats.percentile < pct_high)
if stat_clause:
@@ -500,10 +561,12 @@ class RouterStats(Entity):
RouterStats._compute_stats(stat_clause)
RouterStats._compute_ratios(stat_clause)
RouterStats._compute_filtered_ratios(MIN_RATIO, stat_clause, filter_clause)
- tc_session.commit()
+ l_session.commit()
+ tc_session.remove()
compute = Callable(compute)
def write_stats(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None, disp_clause=None):
+ l_session = tc_session()
if not order_by:
order_by=RouterStats.avg_first_ext
@@ -525,18 +588,21 @@ class RouterStats(Entity):
filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile)
else:
- circ_from_rate = tc_session.query(func.avg(RouterStats.circ_from_rate)).filter(pct_clause).filter(stat_clause).scalar()
- circ_to_rate = tc_session.query(func.avg(RouterStats.circ_to_rate)).filter(pct_clause).filter(stat_clause).scalar()
- circ_bi_rate = tc_session.query(func.avg(RouterStats.circ_bi_rate)).filter(pct_clause).filter(stat_clause).scalar()
+ circ_from_rate = l_session.query(func.avg(RouterStats.circ_from_rate)).filter(pct_clause).filter(stat_clause).scalar()
+ circ_to_rate = l_session.query(func.avg(RouterStats.circ_to_rate)).filter(pct_clause).filter(stat_clause).scalar()
+ circ_bi_rate = l_session.query(func.avg(RouterStats.circ_bi_rate)).filter(pct_clause).filter(stat_clause).scalar()
- avg_first_ext = tc_session.query(func.avg(RouterStats.avg_first_ext)).filter(pct_clause).filter(stat_clause).scalar()
- sbw = tc_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar()
- filt_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar()
- percentile = tc_session.query(func.avg(RouterStats.percentile)).filter(pct_clause).filter(stat_clause).scalar()
+ avg_first_ext = l_session.query(func.avg(RouterStats.avg_first_ext)).filter(pct_clause).filter(stat_clause).scalar()
+ sbw = l_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar()
+ filt_sbw = l_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar()
+ percentile = l_session.query(func.avg(RouterStats.percentile)).filter(pct_clause).filter(stat_clause).scalar()
+
+ tc_session.remove()
def cvt(a,b,c=1):
if type(a) == float: return round(a/c,b)
elif type(a) == int: return a
+ elif type(a) == long: return a
elif type(a) == type(None): return "None"
else: return type(a)
@@ -584,6 +650,7 @@ class RouterStats(Entity):
def write_bws(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None, disp_clause=None):
+ l_session = tc_session()
if not order_by:
order_by=RouterStats.avg_first_ext
@@ -598,8 +665,8 @@ class RouterStats(Entity):
sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
else:
- sbw = tc_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar()
- filt_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar()
+ sbw = l_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar()
+ filt_sbw = l_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar()
f.write(str(int(time.time()))+"\n")
@@ -614,10 +681,12 @@ class RouterStats(Entity):
f.write("node_id=$"+s.router.idhex+" nick="+s.router.nickname)
f.write(" strm_bw="+str(cvt(s.sbw,0)))
f.write(" filt_bw="+str(cvt(s.filt_sbw,0)))
+ f.write(" circ_fail_rate="+str(s.circ_to_rate))
f.write(" desc_bw="+str(int(cvt(s.avg_desc_bw,0))))
f.write(" ns_bw="+str(int(cvt(s.avg_bw,0)))+"\n")
f.flush()
+ tc_session.remove()
write_bws = Callable(write_bws)
@@ -625,7 +694,18 @@ class RouterStats(Entity):
#################### Model Support ################
def reset_all():
- # Need to keep routers around..
+ l_session = tc_session()
+ plog("WARN", "SQLSupport.reset_all() called. See SQLSupport.py for details")
+ # XXX: We still have a memory leak somewhere in here
+ # Current suspects are sqlite, python-sqlite, or sqlalchemy misuse...
+ # http://stackoverflow.com/questions/5552932/sqlalchemy-misuse-causing-memory-leak
+ # The bandwidth scanners switched to a parent/child model because of this.
+
+ # XXX: WARNING!
+ # Must keep the routers around because circ_status_event may
+ # reference old Routers that are no longer in consensus
+ # and will raise an ObjectDeletedError. See function circ_status_event in
+ # class CircuitListener in SQLSupport.py
for r in Router.query.all():
# This appears to be needed. the relation tables do not get dropped
# automatically.
@@ -634,10 +714,21 @@ def reset_all():
r.detached_streams = []
r.bw_history = []
r.stats = None
- tc_session.add(r)
+ l_session.add(r)
+
+ l_session.commit()
+ tc_session.expunge_all()
- tc_session.commit()
- tc_session.clear()
+ # XXX: WARNING!
+ # May not clear all relation tables! (SQLAlchemy or Elixir bug)
+ # Try:
+ # tc_session.execute('delete from router_streams__stream;')
+
+
+ # XXX: WARNING!
+ # This will cause Postgres databases to hang
+ # on DROP TABLE. Possibly an issue with cascade.
+ # Sqlite works though.
BwHistory.table.drop() # Will drop subclasses
Extension.table.drop()
@@ -651,13 +742,25 @@ def reset_all():
Stream.table.create()
Circuit.table.create()
- tc_session.commit()
+ l_session.commit()
#for r in Router.query.all():
# if len(r.bw_history) or len(r.circuits) or len(r.streams) or r.stats:
# plog("WARN", "Router still has dropped data!")
plog("NOTICE", "Reset all SQL stats")
+ tc_session.remove()
+
+def refresh_all():
+ # necessary to keep all sessions synchronized
+ # This is probably a bug. See reset_all() above.
+ # Call this after update_consensus(), _update_rank_history()
+ # See: ScanSupport.reset_stats()
+ # Could be a cascade problem too, see:
+ # http://stackoverflow.com/questions/3481976/sqlalchemy-objectdeletederror-instance-class-at-has-been-deleted-help
+ # Also see:
+ # http://groups.google.com/group/sqlalchemy/browse_thread/thread/c9099eaaffd7c348
+ [tc_session.refresh(r) for r in Router.query.all()]
##################### End Model Support ####################
@@ -672,6 +775,7 @@ class ConsensusTrackerListener(TorCtl.DualEventListener):
# TODO: What about non-running routers and uptime information?
def _update_rank_history(self, idlist):
+ l_session = tc_session()
plog("INFO", "Consensus change... Updating rank history")
for idhex in idlist:
if idhex not in self.consensus.routers: continue
@@ -683,15 +787,17 @@ class ConsensusTrackerListener(TorCtl.DualEventListener):
bwh = BwHistory(router=r, rank=rc.list_rank, bw=rc.bw,
desc_bw=rc.desc_bw, pub_time=r.published)
r.bw_history.append(bwh)
- #tc_session.add(bwh)
- tc_session.add(r)
+ #l_session.add(bwh)
+ l_session.add(r)
except sqlalchemy.orm.exc.NoResultFound:
plog("WARN", "No descriptor found for consenus router "+str(idhex))
plog("INFO", "Consensus history updated.")
- tc_session.commit()
+ l_session.commit()
+ tc_session.remove()
def _update_db(self, idlist):
+ l_session = tc_session()
# FIXME: It is tempting to delay this as well, but we need
# this info to be present immediately for circuit construction...
plog("INFO", "Consensus change... Updating db")
@@ -704,9 +810,12 @@ class ConsensusTrackerListener(TorCtl.DualEventListener):
continue
if not r: r = Router()
r.from_router(rc)
- tc_session.add(r)
+ l_session.add(r)
plog("INFO", "Consensus db updated")
- tc_session.commit()
+ l_session.commit()
+ tc_session.remove()
+ # testing
+ #refresh_all() # Too many sessions, don't trust commit()
def update_consensus(self):
plog("INFO", "Updating DB with full consensus.")
@@ -719,6 +828,7 @@ class ConsensusTrackerListener(TorCtl.DualEventListener):
TorCtl.DualEventListener.set_parent(self, parent_handler)
def heartbeat_event(self, e):
+ l_session = tc_session()
# This sketchiness is to ensure we have an accurate history
# of each router's rank+bandwidth for the entire duration of the run..
if e.state == EVENT_STATE.PRELISTEN:
@@ -731,8 +841,9 @@ class ConsensusTrackerListener(TorCtl.DualEventListener):
orhash="000000000000000000000000000",
nickname="!!TorClient",
published=datetime.datetime.utcnow())
- tc_session.add(OP)
- tc_session.commit()
+ l_session.add(OP)
+ l_session.commit()
+ tc_session.remove()
self.update_consensus()
# XXX: This hack exists because update_rank_history is expensive.
# However, even if we delay it till the end of the consensus update,
@@ -786,7 +897,9 @@ class CircuitListener(TorCtl.PreEventListener):
self.track_parent = False
def circ_status_event(self, c):
+ l_session = tc_session()
if self.track_parent and c.circ_id not in self.parent_handler.circuits:
+ tc_session.remove()
return # Ignore circuits that aren't ours
# TODO: Hrmm, consider making this sane in TorCtl.
if c.reason: lreason = c.reason
@@ -806,17 +919,25 @@ class CircuitListener(TorCtl.PreEventListener):
last_extend=c.arrived_at)
if self.track_parent:
for r in self.parent_handler.circuits[c.circ_id].path:
- rq = Router.query.options(eagerload('circuits')).filter_by(
+ try:
+ rq = Router.query.options(eagerload('circuits')).filter_by(
idhex=r.idhex).with_labels().one()
+ except NoResultFound:
+ plog("WARN", "Query for Router %s=%s in circ %s failed but was in parent_handler" %
+ (r.nickname, r.idhex, circ.circ_id))
+ tc_session.remove()
+ return
circ.routers.append(rq)
#rq.circuits.append(circ) # done automagically?
- #tc_session.add(rq)
- tc_session.add(circ)
- tc_session.commit()
+ #l_session.add(rq)
+ l_session.add(circ)
+ l_session.commit()
elif c.status == "EXTENDED":
circ = Circuit.query.options(eagerload('extensions')).filter_by(
circ_id = c.circ_id).first()
- if not circ: return # Skip circuits from before we came online
+ if not circ:
+ tc_session.remove()
+ return # Skip circuits from before we came online
e = Extension(circ=circ, hop=len(c.path)-1, time=c.arrived_at)
@@ -835,17 +956,19 @@ class CircuitListener(TorCtl.PreEventListener):
# FIXME: Eager load here?
circ.routers.append(e.to_node)
e.to_node.circuits.append(circ)
- tc_session.add(e.to_node)
+ l_session.add(e.to_node)
e.delta = c.arrived_at - circ.last_extend
circ.last_extend = c.arrived_at
circ.extensions.append(e)
- tc_session.add(e)
- tc_session.add(circ)
- tc_session.commit()
+ l_session.add(e)
+ l_session.add(circ)
+ l_session.commit()
elif c.status == "FAILED":
circ = Circuit.query.filter_by(circ_id = c.circ_id).first()
- if not circ: return # Skip circuits from before we came online
+ if not circ:
+ tc_session.remove()
+ return # Skip circuits from before we came online
circ.expunge()
if isinstance(circ, BuiltCircuit):
@@ -883,14 +1006,16 @@ class CircuitListener(TorCtl.PreEventListener):
e.reason = reason
circ.extensions.append(e)
circ.fail_time = c.arrived_at
- tc_session.add(e)
+ l_session.add(e)
- tc_session.add(circ)
- tc_session.commit()
+ l_session.add(circ)
+ l_session.commit()
elif c.status == "BUILT":
circ = Circuit.query.filter_by(
circ_id = c.circ_id).first()
- if not circ: return # Skip circuits from before we came online
+ if not circ:
+ tc_session.remove()
+ return # Skip circuits from before we came online
circ.expunge()
# Convert to built circuit
@@ -900,8 +1025,8 @@ class CircuitListener(TorCtl.PreEventListener):
circ.built_time = c.arrived_at
circ.tot_delta = c.arrived_at - circ.launch_time
- tc_session.add(circ)
- tc_session.commit()
+ l_session.add(circ)
+ l_session.commit()
elif c.status == "CLOSED":
circ = BuiltCircuit.query.filter_by(circ_id = c.circ_id).first()
if circ:
@@ -919,20 +1044,24 @@ class CircuitListener(TorCtl.PreEventListener):
circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
circ.destroy_reason = reason
circ.destroy_time = c.arrived_at
- tc_session.add(circ)
- tc_session.commit()
+ l_session.add(circ)
+ l_session.commit()
+ tc_session.remove()
class StreamListener(CircuitListener):
def stream_bw_event(self, s):
+ l_session = tc_session()
strm = Stream.query.filter_by(strm_id = s.strm_id).first()
if strm and strm.start_time and strm.start_time < s.arrived_at:
plog("DEBUG", "Got stream bw: "+str(s.strm_id))
strm.tot_read_bytes += s.bytes_read
strm.tot_write_bytes += s.bytes_written
- tc_session.add(strm)
- tc_session.commit()
+ l_session.add(strm)
+ l_session.commit()
+ tc_session.remove()
def stream_status_event(self, s):
+ l_session = tc_session()
if s.reason: lreason = s.reason
else: lreason = "NONE"
if s.remote_reason: rreason = s.remote_reason
@@ -942,8 +1071,9 @@ class StreamListener(CircuitListener):
strm = Stream(strm_id=s.strm_id, tgt_host=s.target_host,
tgt_port=s.target_port, init_status=s.status,
tot_read_bytes=0, tot_write_bytes=0)
- tc_session.add(strm)
- tc_session.commit()
+ l_session.add(strm)
+ l_session.commit()
+ tc_session.remove()
return
strm = Stream.query.filter_by(strm_id = s.strm_id).first()
@@ -951,8 +1081,9 @@ class StreamListener(CircuitListener):
(s.strm_id not in self.parent_handler.streams or \
self.parent_handler.streams[s.strm_id].ignored):
if strm:
- tc_session.delete(strm)
- tc_session.commit()
+ strm.delete()
+ l_session.commit()
+ tc_session.remove()
return # Ignore streams that aren't ours
if not strm:
@@ -966,8 +1097,9 @@ class StreamListener(CircuitListener):
strm.circuit = Circuit.query.filter_by(circ_id=s.circ_id).first()
if not strm.circuit:
plog("NOTICE", "Ignoring prior stream "+str(strm.strm_id)+" with old circuit "+str(s.circ_id))
- tc_session.delete(strm)
- tc_session.commit()
+ strm.delete()
+ l_session.commit()
+ tc_session.remove()
return
else:
circ = None
@@ -993,19 +1125,19 @@ class StreamListener(CircuitListener):
for r in strm.circuit.routers:
plog("DEBUG", "Added router "+r.idhex+" to stream "+str(s.strm_id))
r.streams.append(strm)
- tc_session.add(r)
- tc_session.add(strm)
- tc_session.commit()
+ l_session.add(r)
+ l_session.add(strm)
+ l_session.commit()
elif s.status == "DETACHED":
for r in strm.circuit.routers:
r.detached_streams.append(strm)
- tc_session.add(r)
+ l_session.add(r)
#strm.detached_circuits.append(strm.circuit)
strm.circuit.detached_streams.append(strm)
strm.circuit.streams.remove(strm)
strm.circuit = None
- tc_session.add(strm)
- tc_session.commit()
+ l_session.add(strm)
+ l_session.commit()
elif s.status == "FAILED":
strm.expunge()
# Convert to destroyed circuit
@@ -1014,8 +1146,8 @@ class StreamListener(CircuitListener):
strm = FailedStream.query.filter_by(id=strm.id).one()
strm.fail_time = s.arrived_at
strm.fail_reason = reason
- tc_session.add(strm)
- tc_session.commit()
+ l_session.add(strm)
+ l_session.commit()
elif s.status == "CLOSED":
if isinstance(strm, FailedStream):
strm.close_reason = reason
@@ -1037,8 +1169,9 @@ class StreamListener(CircuitListener):
strm.end_time = s.arrived_at
plog("DEBUG", "Stream "+str(strm.strm_id)+" xmitted "+str(strm.tot_bytes()))
strm.close_reason = reason
- tc_session.add(strm)
- tc_session.commit()
+ l_session.add(strm)
+ l_session.commit()
+ tc_session.remove()
def run_example(host, port):
""" Example of basic TorCtl usage. See PathSupport for more advanced
@@ -1047,11 +1180,13 @@ def run_example(host, port):
print "host is %s:%d"%(host,port)
setup_db("sqlite:///torflow.sqlite", echo=False)
- #print tc_session.query(((func.count(Extension.id)))).filter(and_(FailedExtension.table.c.row_type=='extension', FailedExtension.table.c.from_node_idhex == "7CAA2F5F998053EF5D2E622563DEB4A6175E49AC")).one()
+ #l_session = tc_session()
+ #print l_session.query(((func.count(Extension.id)))).filter(and_(FailedExtension.table.c.row_type=='extension', FailedExtension.table.c.from_node_idhex == "7CAA2F5F998053EF5D2E622563DEB4A6175E49AC")).one()
#return
#for e in Extension.query.filter(FailedExtension.table.c.row_type=='extension').all():
# if e.from_node: print "From: "+e.from_node.idhex+" "+e.from_node.nickname
# if e.to_node: print "To: "+e.to_node.idhex+" "+e.to_node.nickname
+ #tc_session.remove()
#return
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
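
Most of the SQLSupport.py churn above is a mechanical move from committing on the global tc_session to checking out a thread-local session (l_session = tc_session()), committing on it, and calling tc_session.remove() when the handler is done, per the SQLAlchemy contextual-session docs quoted in the new comment block. A standalone sketch of that lifecycle, using an illustrative declarative model rather than the real Elixir schema, plus the memory-only sqlite options from setup_db():

    import sqlalchemy.pool
    from sqlalchemy import create_engine, Column, Integer, String
    from sqlalchemy.orm import scoped_session, sessionmaker
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class Router(Base):                  # stand-in model, not the Elixir Router
        __tablename__ = 'router'
        id = Column(Integer, primary_key=True)
        nickname = Column(String)

    # Memory-only sqlite needs StaticPool + check_same_thread=False, as in setup_db().
    engine = create_engine("sqlite://",
                           connect_args={'check_same_thread': False},
                           poolclass=sqlalchemy.pool.StaticPool)
    Base.metadata.create_all(engine)
    tc_session = scoped_session(sessionmaker(bind=engine, autoflush=True))

    def record_router(nickname):
        l_session = tc_session()         # checkout this thread's session
        l_session.add(Router(nickname=nickname))
        l_session.commit()
        tc_session.remove()              # next caller starts with a fresh session

    record_router("ExampleRelay")
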
diff --git a/ScanSupport.py b/ScanSupport.py
index d102c78..e6752c5 100644
--- a/ScanSupport.py
+++ b/ScanSupport.py
@@ -164,6 +164,8 @@ class ScanHandler(PathSupport.PathBuilder):
def notlambda(sm):
plog("DEBUG", "Job for setexit: "+exit_name)
cond.acquire()
+ # Clear last successful exit, we're running a new test
+ self.last_exit = None
sm.set_exit(exit_name)
cond.notify()
cond.release()
@@ -182,7 +184,7 @@ class SQLScanHandler(ScanHandler):
ScanHandler.__init__(self, c, selmgr, RouterClass, strm_selector)
def attach_sql_listener(self, db_uri):
- plog("DEBUG", "Got sqlite: "+db_uri)
+ plog("DEBUG", "Got db: "+db_uri)
SQLSupport.setup_db(db_uri, echo=False, drop=True)
self.sql_consensus_listener = SQLSupport.ConsensusTrackerListener()
self.add_event_listener(self.sql_consensus_listener)
@@ -260,4 +262,18 @@ class SQLScanHandler(ScanHandler):
cond.release()
plog("INFO", "Consensus OK")
-
+ def reset_stats(self):
+ cond = threading.Condition()
+ def notlambda(this):
+ cond.acquire()
+ ScanHandler.reset_stats(self)
+ SQLSupport.reset_all()
+ this.sql_consensus_listener.update_consensus()
+ this.sql_consensus_listener._update_rank_history(this.sql_consensus_listener.consensus.ns_map.iterkeys())
+ SQLSupport.refresh_all()
+ cond.notify()
+ cond.release()
+ cond.acquire()
+ self.schedule_low_prio(notlambda)
+ cond.wait()
+ cond.release()
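
The new SQLScanHandler.reset_stats() above reuses the Condition hand-off pattern of the other handler methods: queue a closure for the scanner thread via schedule_low_prio() and block until it signals completion. A stripped-down sketch of that hand-off, with a plain worker thread standing in for the scheduler:

    import threading

    def run_job_and_wait(job):
        cond = threading.Condition()
        def notlambda():
            cond.acquire()
            job()                      # e.g. reset stats, refresh the consensus
            cond.notify()
            cond.release()
        cond.acquire()                 # hold the lock so the job cannot notify early
        threading.Thread(target=notlambda).start()   # stand-in for schedule_low_prio()
        cond.wait()                    # lock released while waiting, reacquired on notify
        cond.release()

    def reset_job():
        print("stats reset")

    run_job_and_wait(reset_job)
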
diff --git a/TorCtl.py b/TorCtl.py
index c515c97..562ea78 100755
--- a/TorCtl.py
+++ b/TorCtl.py
@@ -96,82 +96,6 @@ AUTH_TYPE = Enum2(
INCORRECT_PASSWORD_MSG = "Provided passphrase was incorrect"
-def connect(controlAddr="127.0.0.1", controlPort=9051, passphrase=None):
- """
- Convenience function for quickly getting a TorCtl connection. This is very
- handy for debugging or CLI setup, handling setup and prompting for a password
- if necessary (if either none is provided as input or it fails). If any issues
- arise this prints a description of the problem and returns None.
-
- Arguments:
- controlAddr - ip address belonging to the controller
- controlPort - port belonging to the controller
- passphrase - authentication passphrase (if defined this is used rather
- than prompting the user)
- """
-
- conn = None
- try:
- conn, authType, authValue = preauth_connect(controlAddr, controlPort)
-
- if authType == AUTH_TYPE.PASSWORD:
- # password authentication, promting for the password if it wasn't provided
- if passphrase: authValue = passphrase
- else:
- try: authValue = getpass.getpass()
- except KeyboardInterrupt: return None
-
- conn.authenticate(authValue)
- return conn
- except Exception, exc:
- if conn: conn.close()
-
- if passphrase and str(exc) == "Unable to authenticate: password incorrect":
- # provide a warning that the provided password didn't work, then try
- # again prompting for the user to enter it
- print INCORRECT_PASSWORD_MSG
- return connect(controlAddr, controlPort)
- else:
- print exc
- return None
-
-def preauth_connect(controlAddr="127.0.0.1", controlPort=9051):
- """
- Provides an uninitiated torctl connection components for the control port,
- returning a tuple of the form...
- (torctl connection, authType, authValue)
-
- The authValue corresponds to the cookie path if using an authentication
- cookie, otherwise this is the empty string. This raises an IOError in case
- of failure.
-
- Arguments:
- controlAddr - ip address belonging to the controller
- controlPort - port belonging to the controller
- """
-
- conn = None
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((controlAddr, controlPort))
- conn = Connection(s)
- authType, authValue = conn.get_auth_type(), ""
-
- if authType == AUTH_TYPE.COOKIE:
- authValue = conn.get_auth_cookie_path()
-
- return (conn, authType, authValue)
- except socket.error, exc:
- if conn: conn.close()
-
- if "Connection refused" in exc.args:
- # most common case - tor control port isn't available
- raise IOError("Connection refused. Is the ControlPort enabled?")
-
- raise IOError("Failed to establish socket: %s" % exc)
- except Exception, exc:
- if conn: conn.close()
- raise IOError(exc)
class TorCtlError(Exception):
"Generic error raised by TorControl code."
@@ -196,7 +120,7 @@ class ErrorReply(TorCtlError):
class NetworkStatus:
"Filled in during NS events"
- def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags, bandwidth=None):
+ def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags, bandwidth=None, unmeasured=None):
self.nickname = nickname
self.idhash = idhash
self.orhash = orhash
@@ -206,6 +130,7 @@ class NetworkStatus:
self.flags = flags
self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
self.bandwidth = bandwidth
+ self.unmeasured = unmeasured
m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
self.updated = datetime.datetime(*map(int, m.groups()))
@@ -445,13 +370,15 @@ class Router:
else:
(idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime,
published, contact, rate_limited, orhash,
- ns_bandwidth,extra_info_digest) = args
+ ns_bandwidth,extra_info_digest,unmeasured) = args
self.idhex = idhex
self.nickname = name
if ns_bandwidth != None:
self.bw = ns_bandwidth
else:
- self.bw = bw
+ self.bw = bw
+ if unmeasured:
+ self.unmeasured = True
self.desc_bw = bw
self.exitpolicy = exitpolicy
self.flags = flags # Technicaly from NS doc
@@ -460,6 +387,7 @@ class Router:
self.version = RouterVersion(version)
self.os = os
self.list_rank = 0 # position in a sorted list of routers.
+ self.ratio_rank = 0 # position in a ratio-sorted list of routers
self.uptime = uptime
self.published = published
self.refcount = 0 # How many open circs are we currently in?
@@ -558,7 +486,7 @@ class Router:
plog("INFO", "No version and/or OS for router " + ns.nickname)
return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
ns.flags, ip, version, os, uptime, published, contact, rate_limited,
- ns.orhash, ns.bandwidth, extra_info_digest)
+ ns.orhash, ns.bandwidth, extra_info_digest, ns.unmeasured)
build_from_desc = Callable(build_from_desc)
def update_to(self, new):
@@ -579,6 +507,12 @@ class Router:
return ret
plog("WARN", "No matching exit line for "+self.nickname)
return False
+
+ def get_unmeasured_bw(self):
+ # if unmeasured, the ratio of self.bw/self.desc_bw should be 1.0
+ if self.unmeasured and self.bw > 0: return self.bw
+ elif self.desc_bw > 0: return self.desc_bw
+ else: return 1
class Connection:
"""A Connection represents a connection to the Tor process via the
@@ -617,7 +551,8 @@ class Connection:
try:
authInfo = self.sendAndRecv("PROTOCOLINFO\r\n")[1][1]
except Exception, exc:
- excMsg = ": %s" % exc if exc.message else ""
+ if exc.message: excMsg = ": %s" % exc
+ else: excMsg = ""
raise IOError("Unable to query PROTOCOLINFO for the authentication type%s" % excMsg)
authType, cookiePath = None, None
@@ -637,7 +572,7 @@ class Connection:
else:
# not of a recognized authentication type (new addition to the
# control-spec?)
- raise IOError("Unrecognized authentication type: %s" % authInfo)
+ plog("INFO", "Unrecognized authentication type: %s" % authInfo)
self._authType = authType
self._cookiePath = cookiePath
@@ -1294,10 +1229,11 @@ def ns_body_iter(data):
flags = m.groups()
flags = flags[0].strip().split(" ")
m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
- w = re.search(r"^w Bandwidth=(\d+)", nsline, re.M)
-
+ w = re.search(r"^w Bandwidth=(\d+)(?:\s(Unmeasured)=1)?", nsline, re.M)
+ unmeasured = None
if w:
- yield NetworkStatus(*(m.groups()+(flags,)+(int(w.group(1))*1000,)))
+ if w.groups(2): unmeasured = True
+ yield NetworkStatus(*(m.groups()+(flags,)+(int(w.group(1))*1000,))+(unmeasured,))
else:
yield NetworkStatus(*(m.groups() + (flags,)))
@@ -1421,38 +1357,24 @@ class EventHandler(EventSink):
evtype,body = body,""
evtype = evtype.upper()
if evtype == "CIRC":
- m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?(\s\S+)?", body)
- if not m:
- raise ProtocolError("CIRC event misformatted.")
- ident,status,path,purpose,reason,remote = m.groups()
- ident = int(ident)
- if path:
- if "PURPOSE=" in path:
- remote = reason
- reason = purpose
- purpose=path
- path=[]
- elif "REASON=" in path:
- remote = reason
- reason = path
- purpose = ""
- path=[]
- else:
- path_verb = path.strip().split(",")
- path = []
- for p in path_verb:
- path.append(p.replace("~", "=").split("=")[0])
+ fields = body.split()
+ (ident,status),rest = fields[:2], fields[2:]
+ if rest[0].startswith('$'):
+ path = rest.pop(0)
+ path_verb = path.strip().split(",")
+ path = []
+ for p in path_verb:
+ path.append(p.replace("~", "=").split("=")[0])
else:
path = []
-
- if purpose and "REASON=" in purpose:
- remote=reason
- reason=purpose
- purpose=""
-
- if purpose: purpose = purpose[9:]
- if reason: reason = reason[8:]
- if remote: remote = remote[15:]
+ try:
+ kwargs = dict([i.split('=') for i in rest])
+ except ValueError:
+ raise ProtocolError("CIRC event misformatted.")
+ ident = int(ident)
+ purpose = kwargs.get('PURPOSE', "")
+ reason = kwargs.get('REASON', None)
+ remote = kwargs.get('REMOTE_REASON', None)
event = CircuitEvent(evtype, ident, status, path, purpose, reason,
remote, body)
elif evtype == "STREAM":
@@ -1708,6 +1630,17 @@ class ConsensusTracker(EventHandler):
self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+ # https://trac.torproject.org/projects/tor/ticket/6131
+ # Routers with 'Unmeasured=1' should get a hard-coded ratio of 1.0.
+ # "Alter the ratio_r sorting to use a Router.get_unmeasured_bw()
+ # method to return the NetworkStatus bw value instead of desc_bw
+ # if unmeasured is true... Then the ratio will work out to 1 that way."
+
+ ratio_r = copy.copy(self.sorted_r)
+ ratio_r.sort(lambda x, y: cmp(float(y.bw)/y.get_unmeasured_bw(),
+ float(x.bw)/x.get_unmeasured_bw()))
+ for i in xrange(len(ratio_r)): ratio_r[i].ratio_rank = i
+
# XXX: Verification only. Can be removed.
self._sanity_check(self.sorted_r)
@@ -1771,8 +1704,9 @@ class ConsensusTracker(EventHandler):
r = r[0]
ns = ns[0]
- if ns.idhex in self.routers and self.routers[ns.idhex].orhash == r.orhash:
- plog("NOTICE",
+ if ns.idhex in self.routers:
+ if self.routers[ns.idhex].orhash == r.orhash:
+ plog("NOTICE",
"Got extra NEWDESC event for router "+ns.nickname+"="+ns.idhex)
else:
self.consensus_count += 1
@@ -1790,6 +1724,11 @@ class ConsensusTracker(EventHandler):
self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+
+ ratio_r = copy.copy(self.sorted_r)
+ ratio_r.sort(lambda x, y: cmp(float(y.bw)/y.get_unmeasured_bw(),
+ float(x.bw)/x.get_unmeasured_bw()))
+ for i in xrange(len(ratio_r)): ratio_r[i].ratio_rank = i
plog("DEBUG", str(time.time()-d.arrived_at)+ " Read " + str(len(d.idlist))
+" ND => "+str(len(self.sorted_r))+" routers. Update: "+str(update))
# XXX: Verification only. Can be removed.
@@ -1819,6 +1758,11 @@ class ConsensusTracker(EventHandler):
self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+
+ ratio_r = copy.copy(self.sorted_r)
+ ratio_r.sort(lambda x, y: cmp(float(y.bw)/y.get_unmeasured_bw(),
+ float(x.bw)/x.get_unmeasured_bw()))
+ for i in xrange(len(ratio_r)): ratio_r[i].ratio_rank = i
self._sanity_check(self.sorted_r)
def current_consensus(self):
@@ -1901,3 +1845,83 @@ def parseHostAndPort(h):
return host, port
+def connect(controlAddr="127.0.0.1", controlPort=9051, passphrase=None,
+ ConnClass=Connection):
+ """
+ Convenience function for quickly getting a TorCtl connection. This is very
+ handy for debugging or CLI setup, handling setup and prompting for a password
+ if necessary (if either none is provided as input or it fails). If any issues
+ arise this prints a description of the problem and returns None.
+
+ Arguments:
+ controlAddr - ip address belonging to the controller
+ controlPort - port belonging to the controller
+ passphrase - authentication passphrase (if defined this is used rather
+ than prompting the user)
+ """
+
+ conn = None
+ try:
+ conn, authType, authValue = preauth_connect(controlAddr, controlPort,
+ ConnClass)
+
+ if authType == AUTH_TYPE.PASSWORD:
+ # password authentication, promting for the password if it wasn't provided
+ if passphrase: authValue = passphrase
+ else:
+ try: authValue = getpass.getpass()
+ except KeyboardInterrupt: return None
+
+ conn.authenticate(authValue)
+ return conn
+ except Exception, exc:
+ if conn: conn.close()
+
+ if passphrase and str(exc) == "Unable to authenticate: password incorrect":
+ # provide a warning that the provided password didn't work, then try
+ # again prompting for the user to enter it
+ print INCORRECT_PASSWORD_MSG
+ return connect(controlAddr, controlPort)
+ else:
+ print exc
+ return None
+
+def preauth_connect(controlAddr="127.0.0.1", controlPort=9051,
+ ConnClass=Connection):
+ """
+ Provides an uninitiated torctl connection components for the control port,
+ returning a tuple of the form...
+ (torctl connection, authType, authValue)
+
+ The authValue corresponds to the cookie path if using an authentication
+ cookie, otherwise this is the empty string. This raises an IOError in case
+ of failure.
+
+ Arguments:
+ controlAddr - ip address belonging to the controller
+ controlPort - port belonging to the controller
+ """
+
+ conn = None
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((controlAddr, controlPort))
+ conn = ConnClass(s)
+ authType, authValue = conn.get_auth_type(), ""
+
+ if authType == AUTH_TYPE.COOKIE:
+ authValue = conn.get_auth_cookie_path()
+
+ return (conn, authType, authValue)
+ except socket.error, exc:
+ if conn: conn.close()
+
+ if "Connection refused" in exc.args:
+ # most common case - tor control port isn't available
+ raise IOError("Connection refused. Is the ControlPort enabled?")
+
+ raise IOError("Failed to establish socket: %s" % exc)
+ except Exception, exc:
+ if conn: conn.close()
+ raise IOError(exc)
+
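
The get_unmeasured_bw() helper and the ratio_rank bookkeeping added above implement tickets 6131/8399: routers carrying Unmeasured=1 in the consensus use their consensus bandwidth as the denominator, pinning their consensus-to-descriptor ratio at 1.0, and the helper never returns 0. A small sketch of the ranking with an illustrative stand-in class that copies the helper's fallback chain:

    class FakeRouter(object):            # stand-in for TorCtl.Router
        def __init__(self, nickname, bw, desc_bw, unmeasured=False):
            self.nickname = nickname
            self.bw = bw                 # consensus bandwidth
            self.desc_bw = desc_bw       # descriptor bandwidth
            self.unmeasured = unmeasured
            self.ratio_rank = 0

        def get_unmeasured_bw(self):
            if self.unmeasured and self.bw > 0: return self.bw
            elif self.desc_bw > 0: return self.desc_bw
            else: return 1

    routers = [FakeRouter("measured", 2000, 1000),
               FakeRouter("unmeasured", 2000, 1000, unmeasured=True)]
    ratio_r = sorted(routers,
                     key=lambda r: float(r.bw) / r.get_unmeasured_bw(),
                     reverse=True)
    for i, r in enumerate(ratio_r):
        r.ratio_rank = i
        print("%-10s ratio=%.1f ratio_rank=%d"
              % (r.nickname, float(r.bw) / r.get_unmeasured_bw(), i))
    # measured   ratio=2.0 ratio_rank=0
    # unmeasured ratio=1.0 ratio_rank=1
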
diff --git a/TorUtil.py b/TorUtil.py
index b8d6425..fee4407 100644
--- a/TorUtil.py
+++ b/TorUtil.py
@@ -417,3 +417,30 @@ Usage: lzprob(z)
prob = ((1.0-x)*0.5)
return prob
+def get_git_version(path_to_repo):
+ """ Returns a tuple of the branch and head from a git repo (.git)
+ if available, or returns ('unknown', 'unknown')
+ """
+ try:
+ f = open(path_to_repo+'HEAD')
+ ref = f.readline().strip().split(' ')
+ f.close()
+ except IOError, e:
+ plog('NOTICE', 'Git Repo at %s Not Found' % path_to_repo)
+ return ('unknown','unknown')
+ try:
+ if len(ref) > 1:
+ f = open(path_to_repo+ref[1])
+ branch = ref[1].strip().split('/')[-1]
+ head = f.readline().strip()
+ else:
+ branch = 'detached'
+ head = ref[0]
+ f.close()
+ return (branch, head)
+ except IOError, e:
+ pass
+ except IndexError, e:
+ pass
+ plog('NOTICE', 'Git Repo at %s Not Found' % path_to_repo)
+ return ('unknown','unknown')
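
get_git_version() above concatenates 'HEAD' (and then the ref path) directly onto its argument, so callers must pass the .git directory with a trailing separator. A usage sketch, assuming it runs from a checkout of this repository:

    from TorUtil import get_git_version

    branch, head = get_git_version('./.git/')    # note the trailing slash
    print("running branch %s at commit %s" % (branch, head))
    # A detached HEAD (e.g. when used as a git submodule) yields ('detached', <sha>);
    # a missing repo logs a NOTICE and returns ('unknown', 'unknown').
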
diff --git a/debian/changelog b/debian/changelog
index 5dd3c2a..ae9877b 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,18 @@
+python-torctl (20130920git-2) unstable; urgency=high
+
+ * Transitioning from pysupport to dh_python2, missed dh_ call.
+ Thanks MaximilianStein (Closes: #747527).
+
+ -- Ulises Vitulli <dererk@debian.org> Mon, 12 May 2014 12:21:12 -0300
+
+python-torctl (20130920git-1) unstable; urgency=medium
+
+ * New upstream git snapshot, fixes several relevant issues.
+ * Drop unnecessary dependency on python-socksipy. Thanks SebastianRamacher
+ (Closes: 707561).
+
+ -- Ulises Vitulli <dererk@debian.org> Fri, 02 May 2014 09:32:19 -0300
+
python-torctl (20110618git-1) unstable; urgency=low
* New upstream git snapshot, fixes several relevant issues.
diff --git a/debian/control b/debian/control
index 821808e..4ae5f4a 100644
--- a/debian/control
+++ b/debian/control
@@ -3,14 +3,14 @@ Section: python
Priority: optional
Maintainer: Ulises Vitulli <dererk@debian.org>
Build-Depends: debhelper (>= 7.0.50~)
-Build-Depends-Indep: python, python-support (>= 0.4)
-Standards-Version: 3.9.2.0
+Build-Depends-Indep: python
+Standards-Version: 3.9.5
Homepage: https://gitweb.torproject.org/pytorctl.git
XS-Python-Version: >= 2.4
Package: python-torctl
Architecture: all
-Depends: ${python:Depends}, ${misc:Depends}, python-socksipy, python-geoip
+Depends: ${misc:Depends}, python (>= 2.7), python-geoip
Description: Tor control library for Python
TorCtl is a Python Tor controller with extensions to support path
building and various constraints on node and path selection, as well as
diff --git a/debian/rules b/debian/rules
index 0c6ad5b..9c98aa6 100755
--- a/debian/rules
+++ b/debian/rules
@@ -36,7 +36,7 @@ binary-indep: build install
dh_installchangelogs ChangeLog
dh_installdocs README
dh_installexamples example.py ScanSupport.py SQLSupport.py
- dh_pysupport
+ dh_python2
dh_compress
dh_fixperms
dh_installdeb