diff --git a/client/lib/connwrapper.go b/client/lib/connwrapper.go new file mode 100644 index 0000000000000000000000000000000000000000..0db9324989cd4af8bca8f817717fc257bfcf6465 --- /dev/null +++ b/client/lib/connwrapper.go @@ -0,0 +1,71 @@ +package snowflake_client + +import ( + "errors" + "io" + "net" + "time" +) + +type ReadWriteCloserPreservesBoundary interface { + io.ReadWriteCloser + MessageBoundaryPreserved() +} + +func ConfirmsReadWriteCloserPreservesMessageBoundary(rwc io.ReadWriteCloser) ReadWriteCloserPreservesBoundary { + return &messageBoundaryPreservedReadWriteCloser{rwc} +} + +type messageBoundaryPreservedReadWriteCloser struct { + io.ReadWriteCloser +} + +func (m *messageBoundaryPreservedReadWriteCloser) MessageBoundaryPreserved() {} + +var errENOSYS = errors.New("not implemented") + +func newPacketConnWrapper(localAddr, remoteAddr net.Addr, rwc ReadWriteCloserPreservesBoundary) net.PacketConn { + return &packetConnWrapper{ + ReadWriteCloserPreservesBoundary: rwc, + remoteAddr: remoteAddr, + localAddr: localAddr, + } +} + +type packetConnWrapper struct { + ReadWriteCloserPreservesBoundary + remoteAddr net.Addr + localAddr net.Addr +} + +func (pcw *packetConnWrapper) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + n, err = pcw.Read(p) + if err != nil { + return 0, nil, err + } + return n, pcw.remoteAddr, nil +} + +func (pcw *packetConnWrapper) WriteTo(p []byte, addr net.Addr) (n int, err error) { + return pcw.Write(p) +} + +func (pcw *packetConnWrapper) Close() error { + return pcw.ReadWriteCloserPreservesBoundary.Close() +} + +func (pcw *packetConnWrapper) LocalAddr() net.Addr { + return pcw.localAddr +} + +func (pcw *packetConnWrapper) SetDeadline(t time.Time) error { + return errENOSYS +} + +func (pcw *packetConnWrapper) SetReadDeadline(t time.Time) error { + return errENOSYS +} + +func (pcw *packetConnWrapper) SetWriteDeadline(t time.Time) error { + return errENOSYS +} diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go index 91ba088f3291e8f6d6a7768e4f8d1766cd9d8182..517320c9692b648348a0bdf752a441a12d14aa60 100644 --- a/client/lib/rendezvous.go +++ b/client/lib/rendezvous.go @@ -20,6 +20,7 @@ import ( "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/nat" + "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/turbotunnel" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/util" utlsutil "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/utls" ) @@ -188,6 +189,7 @@ type WebRTCDialer struct { eventLogger event.SnowflakeEventReceiver proxy *url.URL + clientID turbotunnel.ClientID } // Deprecated: Use NewWebRTCDialerWithEventsAndProxy instead @@ -203,6 +205,13 @@ func NewWebRTCDialerWithEvents(broker *BrokerChannel, iceServers []webrtc.ICESer // NewWebRTCDialerWithEventsAndProxy constructs a new WebRTCDialer. func NewWebRTCDialerWithEventsAndProxy(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int, eventLogger event.SnowflakeEventReceiver, proxy *url.URL, +) *WebRTCDialer { + return newWebRTCDialerWithEventsProxyAndClientID(broker, iceServers, max, eventLogger, proxy, turbotunnel.NewClientID()) +} + +// NewWebRTCDialerWithEventsProxyAndClientID constructs a new WebRTCDialer. +func newWebRTCDialerWithEventsProxyAndClientID(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int, + eventLogger event.SnowflakeEventReceiver, proxy *url.URL, clientID turbotunnel.ClientID, ) *WebRTCDialer { config := webrtc.Configuration{ ICEServers: iceServers, @@ -215,6 +224,7 @@ func NewWebRTCDialerWithEventsAndProxy(broker *BrokerChannel, iceServers []webrt eventLogger: eventLogger, proxy: proxy, + clientID: clientID, } } @@ -222,7 +232,7 @@ func NewWebRTCDialerWithEventsAndProxy(broker *BrokerChannel, iceServers []webrt func (w WebRTCDialer) Catch() (*WebRTCPeer, error) { // TODO: [#25591] Fetch ICE server information from Broker. // TODO: [#25596] Consider TURN servers here too. - return NewWebRTCPeerWithEventsAndProxy(w.webrtcConfig, w.BrokerChannel, w.eventLogger, w.proxy) + return NewWebRTCPeerWithEventsProxyAndClientID(w.webrtcConfig, w.BrokerChannel, w.eventLogger, w.proxy, w.clientID) } // GetMax returns the maximum number of snowflakes to collect. diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go index 7d405cd50b3ccd0bf1d86458ec28e8d46ac144d2..88936182a4b8ef05455488cdf5e99a7e0b016482 100644 --- a/client/lib/snowflake.go +++ b/client/lib/snowflake.go @@ -32,6 +32,7 @@ import ( "math/rand" "net" "net/url" + "os" "strings" "time" @@ -161,7 +162,10 @@ func NewSnowflakeClient(config ClientConfig) (*Transport, error) { max = config.Max } eventsLogger := event.NewSnowflakeEventDispatcher() - transport := &Transport{dialer: NewWebRTCDialerWithEventsAndProxy(broker, iceServers, max, eventsLogger, config.CommunicationProxy), eventDispatcher: eventsLogger} + transport := &Transport{ + dialer: NewWebRTCDialerWithEventsAndProxy(broker, iceServers, max, eventsLogger, config.CommunicationProxy), + eventDispatcher: eventsLogger, + } return transport, nil } @@ -317,13 +321,14 @@ func parseIceServers(addresses []string) []webrtc.ICEServer { // over. The net.PacketConn successively connects through Snowflake proxies // pulled from snowflakes. func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) { - clientID := turbotunnel.NewClientID() - // We build a persistent KCP session on a sequence of ephemeral WebRTC // connections. This dialContext tells RedialPacketConn how to get a new - // WebRTC connection when the previous one dies. Inside each WebRTC - // connection, we use encapsulationPacketConn to encode packets into a + // WebRTC connection when the previous one dies. + // If Stream based transport are used, inside each WebRTC connection, + // we use encapsulationPacketConn to encode packets into a // stream. + // If Packet based transport are used, inside each WebRTC connection, + // packets are sent directly over unreliable data channel. dialContext := func(ctx context.Context) (net.PacketConn, error) { log.Printf("redialing on same connection") // Obtain an available WebRTC remote. May block. @@ -332,17 +337,9 @@ func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, e return nil, errors.New("handler: Received invalid Snowflake") } log.Println("---- Handler: snowflake assigned ----") - // Send the magic Turbo Tunnel token. - _, err := conn.Write(turbotunnel.Token[:]) - if err != nil { - return nil, err - } - // Send ClientID prefix. - _, err = conn.Write(clientID[:]) - if err != nil { - return nil, err - } - return newEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil + + packetConnWrapper := newPacketConnWrapper(dummyAddr{}, dummyAddr{}, ConfirmsReadWriteCloserPreservesMessageBoundary(conn)) + return packetConnWrapper, nil } pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext) @@ -368,6 +365,14 @@ func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, e 0, // default resend 1, // nc=1 => congestion window off ) + if os.Getenv("SNOWFLAKE_TEST_KCP_FAST3MODE") == "1" { + conn.SetNoDelay( + 1, + 10, + 2, + 1, + ) + } // On the KCP connection we overlay an smux session and stream. smuxConfig := smux.DefaultConfig() smuxConfig.Version = 2 diff --git a/client/lib/webrtc.go b/client/lib/webrtc.go index 8ccc3d077198e2c86b35f841883790dc8f92cf18..14e11fd7deae702d4dad36ed73b6d56f5f6913e8 100644 --- a/client/lib/webrtc.go +++ b/client/lib/webrtc.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "errors" "fmt" + "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages" "io" "log" "net/url" @@ -19,6 +20,7 @@ import ( "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/proxy" + "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/turbotunnel" ) // WebRTCPeer represents a WebRTC connection to a remote snowflake proxy. @@ -43,31 +45,41 @@ type WebRTCPeer struct { bytesLogger bytesLogger eventsLogger event.SnowflakeEventReceiver proxy *url.URL + + clientID turbotunnel.ClientID } -// Deprecated: Use NewWebRTCPeerWithEventsAndProxy Instead. -func NewWebRTCPeer( +// Deprecated: Use NewWebRTCPeerWithEventsProxyAndClientID Instead. +func newWebRTCPeer( config *webrtc.Configuration, broker *BrokerChannel, ) (*WebRTCPeer, error) { - return NewWebRTCPeerWithEventsAndProxy(config, broker, nil, nil) + return newWebRTCPeerWithEventsAndProxy(config, broker, nil, nil) } -// Deprecated: Use NewWebRTCPeerWithEventsAndProxy Instead. -func NewWebRTCPeerWithEvents( +// Deprecated: Use NewWebRTCPeerWithEventsProxyAndClientID Instead. +func newWebRTCPeerWithEvents( config *webrtc.Configuration, broker *BrokerChannel, eventsLogger event.SnowflakeEventReceiver, ) (*WebRTCPeer, error) { - return NewWebRTCPeerWithEventsAndProxy(config, broker, eventsLogger, nil) + return newWebRTCPeerWithEventsAndProxy(config, broker, eventsLogger, nil) +} + +// Deprecated: Use NewWebRTCPeerWithEventsProxyAndClientID Instead. +func newWebRTCPeerWithEventsAndProxy(config *webrtc.Configuration, + broker *BrokerChannel, eventsLogger event.SnowflakeEventReceiver, proxy *url.URL, +) (*WebRTCPeer, error) { + return NewWebRTCPeerWithEventsProxyAndClientID(config, broker, eventsLogger, proxy, turbotunnel.ClientID{}) } -// NewWebRTCPeerWithEventsAndProxy constructs a WebRTC PeerConnection to a snowflake proxy. +// NewWebRTCPeerWithEventsProxyAndClientID constructs a WebRTC PeerConnection to a snowflake proxy. // // The creation of the peer handles the signaling to the Snowflake broker, including // the exchange of SDP information, the creation of a PeerConnection, and the establishment // of a DataChannel to the Snowflake proxy. -func NewWebRTCPeerWithEventsAndProxy( - config *webrtc.Configuration, broker *BrokerChannel, - eventsLogger event.SnowflakeEventReceiver, proxy *url.URL, +// clientID is the hinted ID for the connection. +func NewWebRTCPeerWithEventsProxyAndClientID(config *webrtc.Configuration, + broker *BrokerChannel, eventsLogger event.SnowflakeEventReceiver, proxy *url.URL, + clientID turbotunnel.ClientID, ) (*WebRTCPeer, error) { if eventsLogger == nil { eventsLogger = event.NewSnowflakeEventDispatcher() @@ -91,6 +103,7 @@ func NewWebRTCPeerWithEventsAndProxy( connection.eventsLogger = eventsLogger connection.proxy = proxy + connection.clientID = clientID err := connection.connect(config, broker) if err != nil { @@ -235,9 +248,18 @@ func (c *WebRTCPeer) preparePeerConnection(config *webrtc.Configuration) error { log.Printf("NewPeerConnection ERROR: %s", err) return err } - ordered := true + ordered := false + var maxRetransmission uint16 = 0 + connectionMetadata := messages.ClientConnectionMetadata{ClientID: c.clientID[:]} + encodedMetadata, err := connectionMetadata.EncodeConnectionMetadata() + if err != nil { + return err + } + protocol := encodedMetadata dataChannelOptions := &webrtc.DataChannelInit{ - Ordered: &ordered, + Ordered: &ordered, + Protocol: &protocol, + MaxRetransmits: &maxRetransmission, } // We must create the data channel before creating an offer // https://github.com/pion/webrtc/wiki/Release-WebRTC@v3.0.0#a-data-channel-is-no-longer-implicitly-created-with-a-peerconnection diff --git a/client/snowflake.go b/client/snowflake.go index 6f4a0e2727aaa7d76e9a27132db6929afb6f12fd..10bacb66deede8f8c5987726e214cb5254742d37 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -271,7 +271,11 @@ func main() { switch methodName { case "snowflake": // TODO: Be able to recover when SOCKS dies. - ln, err := pt.ListenSocks("tcp", "127.0.0.1:0") + listenAddr := "127.0.0.1:0" + if forcedListenAddr := os.Getenv("SNOWFLAKE_TEST_FORCELISTENADDR"); forcedListenAddr != "" { + listenAddr = forcedListenAddr + } + ln, err := pt.ListenSocks("tcp", listenAddr) if err != nil { pt.CmethodError(methodName, err.Error()) break diff --git a/common/messages/client.go b/common/messages/client.go index da6359e5a0dc68c0046090b6f799b68a43eb1551..5d1f0d64b306241f3e2ab82035e48a2bd7460836 100644 --- a/common/messages/client.go +++ b/common/messages/client.go @@ -5,6 +5,7 @@ package messages import ( "bytes" + "encoding/base64" "encoding/json" "fmt" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint" @@ -149,3 +150,31 @@ func DecodeClientPollResponse(data []byte) (*ClientPollResponse, error) { return &message, nil } + +type ClientConnectionMetadata struct { + ClientID []byte `json:"client_id"` +} + +func (meta *ClientConnectionMetadata) EncodeConnectionMetadata() (string, error) { + jsonData, err := json.Marshal(meta) + if err != nil { + return "", err + } + + return base64.RawURLEncoding.EncodeToString(jsonData), nil +} + +func DecodeConnectionMetadata(data string) (*ClientConnectionMetadata, error) { + decodedData, err := base64.RawURLEncoding.DecodeString(data) + if err != nil { + return nil, err + } + + var meta ClientConnectionMetadata + err = json.Unmarshal(decodedData, &meta) + if err != nil { + return nil, err + } + + return &meta, nil +} diff --git a/proxy/lib/snowflake.go b/proxy/lib/snowflake.go index dd53f19e16b321439d29848a2943fa95635fbb04..382e65a1ce4bca62d7e0bda759c44f876c62de51 100644 --- a/proxy/lib/snowflake.go +++ b/proxy/lib/snowflake.go @@ -35,6 +35,7 @@ import ( "net" "net/http" "net/url" + "os" "strings" "sync" "time" @@ -371,6 +372,13 @@ func (sf *SnowflakeProxy) datachannelHandler(conn *webRTCConn, remoteAddr net.Ad log.Printf("no remote address given in websocket") } + { + protocol := conn.GetConnectionProtocol() + q := u.Query() + q.Set("protocol", protocol) + u.RawQuery = q.Encode() + } + ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { log.Printf("error dialing relay: %s = %s", u.String(), err) @@ -443,7 +451,7 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer( close(dataChan) pr, pw := io.Pipe() - conn := newWebRTCConn(pc, dc, pr, sf.bytesLogger) + conn := newWebRTCConn(pc, dc, pr, sf.bytesLogger, dc.Protocol()) dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) @@ -455,7 +463,7 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer( }) dc.OnOpen(func() { - log.Printf("Data Channel %s-%d open\n", dc.Label(), dc.ID()) + log.Printf("Data Channel %s-%d;%s open\n", dc.Label(), dc.ID(), dc.Protocol()) if sf.OutboundAddress != "" { selectedCandidatePair, err := pc.SCTP().Transport().ICETransport().GetSelectedCandidatePair() @@ -807,6 +815,11 @@ func (sf *SnowflakeProxy) Stop() { func (sf *SnowflakeProxy) checkNATType(config webrtc.Configuration, probeURL string) error { log.Printf("Checking our NAT type, contacting NAT check probe server at \"%v\"...", probeURL) + if os.Getenv("SNOWFLAKE_TEST_ASSUMEUNRESTRICTED") != "" { + currentNATType = NATUnrestricted + return nil + } + probe, err := newSignalingServer(probeURL, false) if err != nil { return fmt.Errorf("Error parsing url: %w", err) diff --git a/proxy/lib/webrtcconn.go b/proxy/lib/webrtcconn.go index 9ad88b53c24a03e50dfba0a4c55f3d9fea75f00f..d75578db241f03a37220c4cc52f41b92114f2cee 100644 --- a/proxy/lib/webrtcconn.go +++ b/proxy/lib/webrtcconn.go @@ -41,9 +41,14 @@ type webRTCConn struct { cancelTimeoutLoop context.CancelFunc bytesLogger bytesLogger + + // protocol reflect the protocol field in the channel opening + // message of Data Channel Establishment Protocol. + // In snowflake it is used to transmit connection metadata. + protocol string } -func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, bytesLogger bytesLogger) *webRTCConn { +func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, bytesLogger bytesLogger, protocol string) *webRTCConn { conn := &webRTCConn{pc: pc, dc: dc, pr: pr, bytesLogger: bytesLogger} conn.isClosing = false conn.activity = make(chan struct{}, 100) @@ -51,6 +56,7 @@ func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.Pip conn.inactivityTimeout = 30 * time.Second ctx, cancel := context.WithCancel(context.Background()) conn.cancelTimeoutLoop = cancel + conn.protocol = protocol go conn.timeoutLoop(ctx) return conn } @@ -137,6 +143,10 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("SetWriteDeadline not implemented") } +func (c *webRTCConn) GetConnectionProtocol() string { + return c.protocol +} + func remoteIPFromSDP(str string) net.IP { // Look for remote IP in "a=candidate" attribute fields // https://tools.ietf.org/html/rfc5245#section-15.1 diff --git a/server/lib/http.go b/server/lib/http.go index 403aeb17dcd6702df4258d11f9ed9a2c09a406f9..6da2d0fdb75343ea552edac38a3fece171ca4b7c 100644 --- a/server/lib/http.go +++ b/server/lib/http.go @@ -1,13 +1,11 @@ package snowflake_server import ( - "bufio" - "bytes" "crypto/hmac" "crypto/rand" "crypto/sha256" "encoding/binary" - "fmt" + "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages" "io" "log" "net" @@ -17,7 +15,6 @@ import ( "github.com/gorilla/websocket" - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/encapsulation" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/turbotunnel" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/websocketconn" ) @@ -32,7 +29,7 @@ const requestTimeout = 10 * time.Second const clientMapTimeout = 1 * time.Minute // How big to make the map of ClientIDs to IP addresses. The map is used in -// turbotunnelMode to store a reasonable IP address for a client session that +// turboTunnelUDPLikeMode to store a reasonable IP address for a client session that // may outlive any single WebSocket connection. const clientIDAddrMapCapacity = 98304 @@ -108,47 +105,25 @@ func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Pass the address of client as the remote address of incoming connection clientIPParam := r.URL.Query().Get("client_ip") addr := clientAddr(clientIPParam) + protocol := r.URL.Query().Get("protocol") - var token [len(turbotunnel.Token)]byte - _, err = io.ReadFull(conn, token[:]) - if err != nil { - // Don't bother logging EOF: that happens with an unused - // connection, which clients make frequently as they maintain a - // pool of proxies. - if err != io.EOF { - log.Printf("reading token: %v", err) - } - return - } - - switch { - case bytes.Equal(token[:], turbotunnel.Token[:]): - err = handler.turbotunnelMode(conn, addr) - default: - // We didn't find a matching token, which means that we are - // dealing with a client that doesn't know about such things. - // Close the conn as we no longer support the old - // one-session-per-WebSocket mode. - log.Println("Received unsupported oneshot connection") - return - } - if err != nil { + err = handler.turboTunnelUDPLikeMode(conn, addr, protocol) + if err != nil && err != io.EOF { log.Println(err) return } } -// turbotunnelMode handles clients that sent turbotunnel.Token at the start of -// their stream. These clients expect to send and receive encapsulated packets, -// with a long-lived session identified by ClientID. -func (handler *httpHandler) turbotunnelMode(conn net.Conn, addr net.Addr) error { - // Read the ClientID prefix. Every packet encapsulated in this WebSocket - // connection pertains to the same ClientID. - var clientID turbotunnel.ClientID - _, err := io.ReadFull(conn, clientID[:]) +func (handler *httpHandler) turboTunnelUDPLikeMode(conn net.Conn, addr net.Addr, protocol string) error { + // Read the ClientID from the WebRTC data channel protocol string. Every + // packet received on this WebSocket connection pertains to the same + // ClientID. + clientID := turbotunnel.ClientID{} + metaData, err := messages.DecodeConnectionMetadata(protocol) if err != nil { - return fmt.Errorf("reading ClientID: %w", err) + return err } + copy(clientID[:], metaData.ClientID[:]) // Store a short-term mapping from the ClientID to the client IP // address attached to this WebSocket connection. tor will want us to @@ -167,8 +142,8 @@ func (handler *httpHandler) turbotunnelMode(conn net.Conn, addr net.Addr) error wg.Add(2) done := make(chan struct{}) - // The remainder of the WebSocket stream consists of encapsulated - // packets. We read them one by one and feed them into the + // The remainder of the WebSocket stream consists of packets, one packet + // per WebSocket message. We read them one by one and feed them into the // QueuePacketConn on which kcp.ServeConn was set up, which eventually // leads to KCP-level sessions in the acceptSessions function. go func() { @@ -176,11 +151,9 @@ func (handler *httpHandler) turbotunnelMode(conn net.Conn, addr net.Addr) error defer close(done) // Signal the write loop to finish var p [2048]byte for { - n, err := encapsulation.ReadData(conn, p[:]) - if err == io.ErrShortBuffer { - err = nil - } + n, err := conn.Read(p[:]) if err != nil { + log.Println(err) return } pconn.QueueIncoming(p[:n], clientID) @@ -192,10 +165,6 @@ func (handler *httpHandler) turbotunnelMode(conn net.Conn, addr net.Addr) error go func() { defer wg.Done() defer conn.Close() // Signal the read loop to finish - - // Buffer encapsulation.WriteData operations to keep length - // prefixes in the same send as the data that follows. - bw := bufio.NewWriter(conn) for { select { case <-done: @@ -204,12 +173,10 @@ func (handler *httpHandler) turbotunnelMode(conn net.Conn, addr net.Addr) error if !ok { return } - _, err := encapsulation.WriteData(bw, p) + _, err := conn.Write(p) pconn.Restore(p) - if err == nil { - err = bw.Flush() - } if err != nil { + log.Println(err) return } } diff --git a/server/lib/snowflake.go b/server/lib/snowflake.go index bcf9dd68e607806a2151528fefaf1789afa07e53..b158f035afc0abe869d84e69474e9ad632ee5c59 100644 --- a/server/lib/snowflake.go +++ b/server/lib/snowflake.go @@ -41,6 +41,7 @@ import ( "log" "net" "net/http" + "os" "sync" "time" @@ -253,7 +254,7 @@ func (l *SnowflakeListener) acceptSessions(ln *kcp.Listener) error { return err } // Permit coalescing the payloads of consecutive sends. - conn.SetStreamMode(true) + conn.SetStreamMode(false) // Set the maximum send and receive window sizes to a high number // Removes KCP bottlenecks: https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40026 conn.SetWindowSize(WindowSize, WindowSize) @@ -265,6 +266,14 @@ func (l *SnowflakeListener) acceptSessions(ln *kcp.Listener) error { 0, // default resend 1, // nc=1 => congestion window off ) + if os.Getenv("SNOWFLAKE_TEST_KCP_FAST3MODE") == "1" { + conn.SetNoDelay( + 1, + 10, + 2, + 1, + ) + } go func() { defer conn.Close() err := l.acceptStreams(conn)