summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Fifield <david@bamsoftware.com>2020-02-14 01:25:41 -0700
committerDavid Fifield <david@bamsoftware.com>2020-02-14 01:31:44 -0700
commitd5be0906ffe4ef8de8a9345690713bc362d3bcee (patch)
treee38d8215072753e0b59f69d1baa204fad215fbb1
parentd49ec4b4955475e720040eb70f1c2711c1b9c4be (diff)
Support KCP and QUIC in the server simultaneously.
It works by having separate magic token prefixes for the two protocols. This isn't meant to be used forever, but only temporarily so we can compare KCP and QUIC. https://bugs.torproject.org/33336#ticket
-rw-r--r--client/lib/snowflake.go2
-rw-r--r--common/turbotunnel/consts.go12
-rw-r--r--server/server.go176
3 files changed, 149 insertions, 41 deletions
diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go
index 5f40d12..01dc448 100644
--- a/client/lib/snowflake.go
+++ b/client/lib/snowflake.go
@@ -46,7 +46,7 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
}
log.Println("---- Handler: snowflake assigned ----")
// Send the magic Turbo Tunnel token.
- _, err := conn.Write(turbotunnel.Token[:])
+ _, err := conn.Write(turbotunnel.TokenQUIC[:])
if err != nil {
return nil, err
}
diff --git a/common/turbotunnel/consts.go b/common/turbotunnel/consts.go
index 80f70af..a8fc5ed 100644
--- a/common/turbotunnel/consts.go
+++ b/common/turbotunnel/consts.go
@@ -6,9 +6,15 @@ package turbotunnel
import "errors"
-// This magic prefix is how a client opts into turbo tunnel mode. It is just a
-// randomly generated byte string.
-var Token = [8]byte{0x12, 0x93, 0x60, 0x5d, 0x27, 0x81, 0x75, 0xf5}
+type Token [8]byte
+
+// This magic prefix is how a client opts into turbo tunnel mode with KCP. It is
+// just a randomly generated byte string.
+var TokenKCP = [8]byte{0x12, 0x93, 0x60, 0x5d, 0x27, 0x81, 0x75, 0xf5}
+
+// This magic prefix is how a client opts into turbo tunnel mode with QUIC. It
+// is just a randomly generated byte string.
+var TokenQUIC = [8]byte{0x46, 0x7f, 0xa7, 0x7f, 0x88, 0xe9, 0x9c, 0x17}
// The size of receive and send queues.
const queueSize = 32
diff --git a/server/server.go b/server/server.go
index 4871e29..e67d6c9 100644
--- a/server/server.go
+++ b/server/server.go
@@ -29,6 +29,8 @@ import (
"git.torproject.org/pluggable-transports/snowflake.git/common/websocketconn"
"github.com/gorilla/websocket"
"github.com/lucas-clemente/quic-go"
+ "github.com/xtaci/kcp-go"
+ "github.com/xtaci/smux"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/net/http2"
)
@@ -123,9 +125,10 @@ var upgrader = websocket.Upgrader{
// When we call pt.DialOr, tor wants us to provide a USERADDR string that
// represents the remote IP address of the client (for metrics purposes, etc.).
// This data structure bridges the gap between ServeHTTP, which knows about IP
-// addresses, and handleStream, which is what calls pt.DialOr. The common piece
-// of information linking both ends of the chain is the ClientID, which is
-// attached to the WebSocket connection and every session.
+// addresses, and handleStreamKCP and handleStreamQUIC, which are what call
+// pt.DialOr. The common piece of information linking both ends of the chain is
+// the ClientID, which is attached to the WebSocket connection and every
+// session.
var clientIDAddrMap = newClientIDMap(clientIDAddrMapCapacity)
// overrideReadConn is a net.Conn with an overridden Read method. Compare to
@@ -141,9 +144,10 @@ func (conn *overrideReadConn) Read(p []byte) (int, error) {
}
type HTTPHandler struct {
- // pconn is the adapter layer between stream-oriented WebSocket
- // connections and the packet-oriented QUIC layer.
- pconn *turbotunnel.QueuePacketConn
+ // pconnKCP and pconnQUIC are the adapter layer between stream-oriented
+ // WebSocket connections and packet-oriented KCP or QUIC layers.
+ pconnKCP *turbotunnel.QueuePacketConn
+ pconnQUIC *turbotunnel.QueuePacketConn
}
func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -160,7 +164,7 @@ func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
clientIPParam := r.URL.Query().Get("client_ip")
addr := clientAddr(clientIPParam)
- var token [len(turbotunnel.Token)]byte
+ var token turbotunnel.Token
_, err = io.ReadFull(conn, token[:])
if err != nil {
// Don't bother logging EOF: that happens with an unused
@@ -173,8 +177,10 @@ func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
switch {
- case bytes.Equal(token[:], turbotunnel.Token[:]):
- err = turbotunnelMode(conn, addr, handler.pconn)
+ case bytes.Equal(token[:], turbotunnel.TokenKCP[:]):
+ err = turbotunnelMode(conn, addr, handler.pconnKCP)
+ case bytes.Equal(token[:], turbotunnel.TokenQUIC[:]):
+ err = turbotunnelMode(conn, addr, handler.pconnQUIC)
default:
// We didn't find a matching token, which means that we are
// dealing with a client that doesn't know about such things.
@@ -209,9 +215,10 @@ func oneshotMode(conn net.Conn, addr string) error {
return nil
}
-// turbotunnelMode handles clients that sent turbotunnel.Token at the start of
-// their stream. These clients expect to send and receive encapsulated packets,
-// with a long-lived session identified by ClientID.
+// turbotunnelMode handles clients that sent turbotunnel.TokenKCP or
+// turbotunnel.TokenQUIC at the start of their stream. These clients expect to
+// send and receive encapsulated packets, with a long-lived session identified
+// by ClientID.
func turbotunnelMode(conn net.Conn, addr string, pconn *turbotunnel.QueuePacketConn) error {
// Read the ClientID prefix. Every packet encapsulated in this WebSocket
// connection pertains to the same ClientID.
@@ -223,13 +230,13 @@ func turbotunnelMode(conn net.Conn, addr string, pconn *turbotunnel.QueuePacketC
// Store a a short-term mapping from the ClientID to the client IP
// address attached to this WebSocket connection. tor will want us to
- // provide a client IP address when we call pt.DialOr. But a QUIC
+ // provide a client IP address when we call pt.DialOr. But a KCP or QUIC
// session does not necessarily correspond to any single IP
// address--it's composed of packets that are carried in possibly
// multiple WebSocket streams. We apply the heuristic that the IP
// address of the most recent WebSocket connection that has had to do
// with a session, at the time the session is established, is the IP
- // address that should be credited for the entire QUIC session.
+ // address that should be credited for the entire KCP or QUIC session.
clientIDAddrMap.Set(clientID, addr)
errCh := make(chan error)
@@ -237,7 +244,7 @@ func turbotunnelMode(conn net.Conn, addr string, pconn *turbotunnel.QueuePacketC
// The remainder of the WebSocket stream consists of encapsulated
// packets. We read them one by one and feed them into the
// QueuePacketConn on which quic.Listen was set up, which eventually
- // leads to QUIC-level sessions in the acceptSessions function.
+ // leads to KCP- or QUIC-level sessions.
go func() {
for {
p, err := encapsulation.ReadData(conn)
@@ -274,8 +281,8 @@ func turbotunnelMode(conn net.Conn, addr string, pconn *turbotunnel.QueuePacketC
return nil
}
-// handleStream bidirectionally connects a client stream with the ORPort.
-func handleStream(ctx context.Context, stream quic.Stream, addr string) error {
+// handleStreamKCP bidirectionally connects a client stream with the ORPort.
+func handleStreamKCP(stream net.Conn, addr string) error {
statsChannel <- addr != ""
or, err := pt.DialOr(&ptInfo, addr, ptMethodName)
if err != nil {
@@ -288,9 +295,84 @@ func handleStream(ctx context.Context, stream quic.Stream, addr string) error {
return nil
}
-// acceptStreams awaits streams on a QUIC session. Passes each stream to
-// handleStream.
-func acceptStreams(ctx context.Context, sess quic.Session) error {
+// acceptStreamsKCP layers an smux.Session on the KCP connection and awaits
+// streams on it. Passes each stream to handleStreamKCP.
+func acceptStreamsKCP(conn *kcp.UDPSession) error {
+ // Look up the IP address associated with this KCP session, via the
+ // ClientID that is returned by the session's RemoteAddr method.
+ addr, ok := clientIDAddrMap.Get(conn.RemoteAddr().(turbotunnel.ClientID))
+ if !ok {
+ // This means that the map is tending to run over capacity, not
+ // just that there was not client_ip on the incoming connection.
+ // We store "" in the map in the absence of client_ip. This log
+ // message means you should increase clientIDAddrMapCapacity.
+ log.Printf("no address in clientID-to-IP map (capacity %d)", clientIDAddrMapCapacity)
+ }
+
+ smuxConfig := smux.DefaultConfig()
+ smuxConfig.Version = 2
+ sess, err := smux.Server(conn, smuxConfig)
+ if err != nil {
+ return err
+ }
+
+ for {
+ stream, err := sess.AcceptStream()
+ if err != nil {
+ if err, ok := err.(net.Error); ok && err.Temporary() {
+ continue
+ }
+ return err
+ }
+ go func() {
+ defer stream.Close()
+ err := handleStreamKCP(stream, addr)
+ if err != nil {
+ log.Printf("handleStreamKCP: %v", err)
+ }
+ }()
+ }
+}
+
+// acceptSessionsKCP listens for incoming KCP connections and passes them to
+// acceptStreamsKCP. It is handler.ServeHTTP that provides the network interface
+// that drives this function.
+func acceptSessionsKCP(ln *kcp.Listener) error {
+ for {
+ conn, err := ln.AcceptKCP()
+ if err != nil {
+ if err, ok := err.(net.Error); ok && err.Temporary() {
+ continue
+ }
+ return err
+ }
+ go func() {
+ defer conn.Close()
+ err := acceptStreamsKCP(conn)
+ if err != nil {
+ log.Printf("acceptStreamsKCP: %v", err)
+ }
+ }()
+ }
+}
+
+// handleStreamQUIC bidirectionally connects a client stream with the ORPort.
+func handleStreamQUIC(ctx context.Context, stream quic.Stream, addr string) error {
+ statsChannel <- addr != ""
+ or, err := pt.DialOr(&ptInfo, addr, ptMethodName)
+ if err != nil {
+ return fmt.Errorf("connecting to ORPort: %v", err)
+ }
+ defer or.Close()
+
+ proxy(or, stream)
+
+ return nil
+}
+
+// acceptStreamsQUIC awaits streams on a QUIC session. Passes each stream to
+// handleStreamQUIC.
+func acceptStreamsQUIC(ctx context.Context, sess quic.Session) error {
// Look up the IP address associated with this KCP session, via the
// ClientID that is returned by the session's RemoteAddr method.
addr, ok := clientIDAddrMap.Get(sess.RemoteAddr().(turbotunnel.ClientID))
@@ -314,18 +396,18 @@ func acceptStreams(ctx context.Context, sess quic.Session) error {
defer stream.Close()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- err := handleStream(ctx, stream, addr)
+ err := handleStreamQUIC(ctx, stream, addr)
if err != nil {
- log.Printf("handleStream: %v", err)
+ log.Printf("handleStreamQUIC: %v", err)
}
}()
}
}
-// acceptSessions listens for incoming QUIC sessions and passes them to
-// acceptStreams. It is handler.ServeHTTP that provides the network interface
-// that drives this function.
-func acceptSessions(ctx context.Context, ln quic.Listener) error {
+// acceptSessionsQUIC listens for incoming QUIC sessions and passes them to
+// acceptStreamsQUIC. It is handler.ServeHTTP that provides the network
+// interface that drives this function.
+func acceptSessionsQUIC(ctx context.Context, ln quic.Listener) error {
for {
sess, err := ln.Accept()
if err != nil {
@@ -338,9 +420,9 @@ func acceptSessions(ctx context.Context, ln quic.Listener) error {
defer sess.Close()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- err := acceptStreams(ctx, sess)
+ err := acceptStreamsQUIC(ctx, sess)
if err != nil {
- log.Printf("acceptStreams: %v", err)
+ log.Printf("acceptStreamsQUIC: %v", err)
}
}()
}
@@ -361,10 +443,12 @@ func initServer(addr *net.TCPAddr,
}
handler := HTTPHandler{
- // pconn is shared among all connections to this server. It
- // overlays packet-based client sessions on top of ephemeral
- // WebSocket connections.
- pconn: turbotunnel.NewQueuePacketConn(addr, clientMapTimeout),
+ // pconnKCP and pconnQUIC are shared among all KCP and QUIC
+ // connections to this server, respectively. They overlays
+ // packet-based client sessions on top of ephemeral WebSocket
+ // connections.
+ pconnKCP: turbotunnel.NewQueuePacketConn(addr, clientMapTimeout),
+ pconnQUIC: turbotunnel.NewQueuePacketConn(addr, clientMapTimeout),
}
server := &http.Server{
Addr: addr.String(),
@@ -401,11 +485,29 @@ func initServer(addr *net.TCPAddr,
break
}
+ // Start a KCP engine, set up to read and write its packets over the
+ // WebSocket connections that arrive at the web server.
+ // handler.ServeHTTP is responsible for encapsulation/decapsulation of
+ // packets on behalf of KCP. KCP takes those packets and turns them into
+ // sessions which appear in the acceptSessions function.
+ lnKCP, err := kcp.ServeConn(nil, 0, 0, handler.pconnKCP)
+ if err != nil {
+ server.Close()
+ return server, err
+ }
+ go func() {
+ defer lnKCP.Close()
+ err := acceptSessionsKCP(lnKCP)
+ if err != nil {
+ log.Printf("acceptSessions: %v", err)
+ }
+ }()
+
// Start a QUIC engine, set up to read and write its packets over the
// WebSocket connections that arrive at the web server.
// handler.ServeHTTP is responsible for encapsulation/decapsulation of
// packets on behalf of QUIC. QUIC takes those packets and turns them
- // into sessions which appear in the acceptSessions function.
+ // into sessions which appear in the acceptSessionsQUIC function.
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{*quicCertificate},
NextProtos: []string{quicNextProto},
@@ -414,18 +516,18 @@ func initServer(addr *net.TCPAddr,
HandshakeTimeout: 2 * time.Minute,
IdleTimeout: 10 * time.Minute,
}
- ln, err := quic.Listen(handler.pconn, tlsConfig, quicConfig)
+ lnQUIC, err := quic.Listen(handler.pconnQUIC, tlsConfig, quicConfig)
if err != nil {
server.Close()
return server, err
}
go func() {
- defer ln.Close()
+ defer lnQUIC.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- err := acceptSessions(ctx, ln)
+ err := acceptSessionsQUIC(ctx, lnQUIC)
if err != nil {
- log.Printf("acceptSessions: %v", err)
+ log.Printf("acceptSessionsQUIC: %v", err)
}
}()