diff --git a/src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go b/src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go index c3592ec5e8c..45ef00e29e6 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go +++ b/src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go @@ -81,7 +81,12 @@ func Listen(endpoint string) (net.Listener, error) { return nil, fmt.Errorf("endpoint %s does not start with protocol %s", endpoint, prefix) } - listener, err := winio.ListenPipe(endpoint[len(prefix):], nil) + // Configure pipe in MessageMode to support Docker's half-close semantics + // - Enables zero-byte writes as EOF signals (CloseWrite) + // - Crucial for stdin stream termination in interactive containers + pipeConfig := &winio.PipeConfig{MessageMode: true} + + listener, err := winio.ListenPipe(endpoint[len(prefix):], pipeConfig) if err != nil { return nil, fmt.Errorf("could not listen on %s: %w", endpoint, err) } diff --git a/src/go/wsl-helper/pkg/dockerproxy/serve.go b/src/go/wsl-helper/pkg/dockerproxy/serve.go index 287a9c950f8..40e2a472623 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/serve.go +++ b/src/go/wsl-helper/pkg/dockerproxy/serve.go @@ -23,10 +23,8 @@ import ( "encoding/json" "fmt" "io" - "log" "net" "net/http" - "net/http/httputil" "os" "os/signal" "regexp" @@ -38,6 +36,7 @@ import ( "github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/models" "github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/platform" + "github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/util" ) // RequestContextValue contains things we attach to incoming requests @@ -74,7 +73,10 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error { logWriter := logrus.StandardLogger().Writer() defer logWriter.Close() munger := newRequestMunger() - proxy := &httputil.ReverseProxy{ + proxy := util.ReverseProxy{ + Dial: func(string, string) (net.Conn, error) { + return dialer() + }, Director: func(req *http.Request) { logrus.WithField("request", req). WithField("headers", req.Header). @@ -96,12 +98,6 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error { Error("could not munge request") } }, - Transport: &http.Transport{ - Dial: func(string, string) (net.Conn, error) { - return dialer() - }, - DisableCompression: true, // for debugging - }, ModifyResponse: func(resp *http.Response) error { logEntry := logrus.WithField("response", resp) defer func() { logEntry.Debug("got backend response") }() @@ -124,7 +120,6 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error { } return nil }, - ErrorLog: log.New(logWriter, "", 0), } server := &http.Server{ diff --git a/src/go/wsl-helper/pkg/dockerproxy/util/pipe.go b/src/go/wsl-helper/pkg/dockerproxy/util/pipe.go index f05a261cdff..e339d766b09 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/util/pipe.go +++ b/src/go/wsl-helper/pkg/dockerproxy/util/pipe.go @@ -20,8 +20,7 @@ import ( "io" ) -// Pipe bidirectionally between two streams. -func Pipe(c1, c2 io.ReadWriteCloser) error { +func Pipe(c1, c2 HalfReadWriteCloser) error { ioCopy := func(reader io.Reader, writer io.Writer) <-chan error { ch := make(chan error) go func() { @@ -33,22 +32,26 @@ func Pipe(c1, c2 io.ReadWriteCloser) error { ch1 := ioCopy(c1, c2) ch2 := ioCopy(c2, c1) - select { - case err := <-ch1: - c1.Close() - c2.Close() - <-ch2 - if err != io.EOF { - return err - } - case err := <-ch2: - c1.Close() - c2.Close() - <-ch1 - if err != io.EOF { - return err + for i := 0; i < 2; i++ { + select { + case err := <-ch1: + c2.CloseWrite() + if err != nil && err != io.EOF { + return err + } + case err := <-ch2: + c1.CloseWrite() + if err != nil && err != io.EOF { + return err + } } } - return nil } + +type HalfReadWriteCloser interface { + // CloseWrite closes the write-side of the connection. + CloseWrite() error + // Write is a passthrough to the underlying connection. + io.ReadWriteCloser +} diff --git a/src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go b/src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go index ab9bda1e8e3..fca2a35d6eb 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go +++ b/src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go @@ -18,55 +18,121 @@ package util import ( "bytes" - "errors" "io" + "sync" "testing" "github.com/stretchr/testify/assert" ) -type nopReadWriteCloser struct { - io.ReadWriter +// bidirectionalHalfClosePipe is a testing utility that simulates a bidirectional pipe +// with the ability to half-close connections. It's designed to mimic scenarios +// like interactive command-line operations where a client can send data and +// then half-close the connection while waiting for a response. +type bidirectionalHalfClosePipe struct { + r io.ReadCloser + w io.WriteCloser } -func (nopReadWriteCloser) Close() error { - return nil +// newBidirectionalHalfClosePipe creates two interconnected bidirectional pipe endpoints. +// +// The function returns two bidirectionalHalfClosePipe instances that are connected +// such that what is written to one's write endpoint can be read from the other's +// read endpoint, and vice versa. +// +// Returns: +// - h1: First bidirectional pipe endpoint +// - h2: Second bidirectional pipe endpoint +func newBidirectionalHalfClosePipe() (h1, h2 *bidirectionalHalfClosePipe) { + pr1, pw1 := io.Pipe() + pr2, pw2 := io.Pipe() + + h1 = &bidirectionalHalfClosePipe{ + r: pr1, w: pw2, + } + + h2 = &bidirectionalHalfClosePipe{ + r: pr2, w: pw1, + } + return } -type passthroughReadWriteCloser struct { - io.ReadCloser - io.WriteCloser +func (h *bidirectionalHalfClosePipe) CloseWrite() error { + return h.w.Close() } -func newPipeReadWriter() io.ReadWriteCloser { - r, w := io.Pipe() - return &passthroughReadWriteCloser{ - ReadCloser: r, - WriteCloser: w, +func (h *bidirectionalHalfClosePipe) Close() error { + wErr := h.w.Close() + rErr := h.r.Close() + + if wErr != nil { + return wErr } + return rErr } -func (p *passthroughReadWriteCloser) Close() error { - err := p.ReadCloser.Close() - if err != nil && !errors.Is(err, io.ErrClosedPipe) { - return err - } - err = p.WriteCloser.Close() - if err != nil && !errors.Is(err, io.ErrClosedPipe) { - return err - } - return nil +func (h *bidirectionalHalfClosePipe) Read(p []byte) (n int, err error) { + return h.r.Read(p) +} + +func (h *bidirectionalHalfClosePipe) Write(p []byte) (n int, err error) { + return h.w.Write(p) } +// TestPipe verifies the functionality of the bidirectional pipe utility. +// +// The test simulates a scenario similar to interactive command execution, +// such as a docker run -i command, which requires bidirectional communication. +// This test case mimics scenarios like: +// - Sending input to a Docker container via stdin +// - Half-closing the input stream +// - Receiving output from the container +// +// The test steps are: +// 1. A client sends data +// 2. The client half-closes the connection +// 3. The server reads the data +// 4. The server sends a return response +// 5. The server half-closes the connection +// +// This approach is particularly relevant for interactive Docker runs where +// the client needs to send input and then wait for the container's response, +// while maintaining the ability to close streams independently. func TestPipe(t *testing.T) { - rw := newPipeReadWriter() - output := bytes.Buffer{} - data := &passthroughReadWriteCloser{ - ReadCloser: nopReadWriteCloser{bytes.NewBufferString("some data")}, - WriteCloser: nopReadWriteCloser{&output}, - } - err := Pipe(rw, data) - if assert.NoError(t, err) { - assert.Equal(t, "some data", output.String()) - } + + h1a, h1b := newBidirectionalHalfClosePipe() + h2a, h2b := newBidirectionalHalfClosePipe() + var wg sync.WaitGroup + wg.Add(2) + + // Goroutine simulating the client-side operation + go func() { + defer wg.Done() + dataToSend := bytes.NewBufferString("some data") + _, err := h1a.Write(dataToSend.Bytes()) + assert.NoError(t, err) + h1a.CloseWrite() + + output, err := io.ReadAll(h1a) + assert.NoError(t, err) + assert.EqualValues(t, output, "return data") + }() + + // Goroutine simulating the server-side operation + go func() { + defer wg.Done() + output, err := io.ReadAll(h2b) + assert.NoError(t, err) + assert.EqualValues(t, output, "some data") + + dataToSend := bytes.NewBufferString("return data") + _, err = h2b.Write(dataToSend.Bytes()) + assert.NoError(t, err) + + h2b.CloseWrite() + }() + + err := Pipe(h1b, h2a) + assert.NoError(t, err) + wg.Wait() } diff --git a/src/go/wsl-helper/pkg/dockerproxy/util/reverse_proxy.go b/src/go/wsl-helper/pkg/dockerproxy/util/reverse_proxy.go new file mode 100644 index 00000000000..595cb91a3ca --- /dev/null +++ b/src/go/wsl-helper/pkg/dockerproxy/util/reverse_proxy.go @@ -0,0 +1,239 @@ +package util + +import ( + "bufio" + "context" + "io" + "log" + "net" + "net/http" + "time" +) + +const ( + hostHeaderValue = "api.moby.localhost" + targetProtocol = "http://" +) + +// ReverseProxy is a custom reverse proxy specifically designed for Rancher Desktop's +// Docker API communication. Unlike the standard library's ReverseProxy, this +// implementation provides explicit support for half-close connections and +// HTTP protocol upgrades required by the Docker API. +// +// Key design features: +// - Handles HTTP protocol upgrades (WebSocket-like connections) +// - Supports half-close TCP connections +// - Provides hooks for request/response modification +// - Designed for specific Docker API interaction requirements +type ReverseProxy struct { + // Dial provides a custom connection establishment method + Dial func(network, addr string) (net.Conn, error) + // Director allows modification of the outgoing request before forwarding + Director func(*http.Request) + // ModifyResponse enables post-processing of the backend response + ModifyResponse func(*http.Response) error +} + +// ServeHTTP implements the http.Handler interface, routing incoming +// HTTP requests through the custom reverse proxy +func (proxy ReverseProxy) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + proxy.forwardRequest(rw, r) +} + +// forwardRequest is the core method that handles request proxying, +// with special handling for Docker API-specific requirements. +// +// Primary responsibilities: +// - Establish backend connection +// - Forward request to backend +// - Handle response streaming +// - Support protocol upgrades +// - Ensure proper connection management +func (proxy *ReverseProxy) forwardRequest(w http.ResponseWriter, r *http.Request) { + + // periodicHttpFlush is a critical component for supporting + // long-running, streaming connections like "docker log -f" + periodicHttpFlush := func(w http.ResponseWriter, ctx context.Context) { + + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + // Validate flushing capability of the ResponseWriter + flusher, ok := w.(http.Flusher) + if !ok { + log.Println("error: ResponseWriter does not support http.Flusher") + return + } + + // Continuous flushing loop with context-aware cancellation + for { + select { + case <-ctx.Done(): + // Context cancellation stops the flushing + return + case <-ticker.C: + select { + case <-ctx.Done(): + return + default: + flusher.Flush() + } + } + } + } + + // Leverage the original request's context as the base + ctx := r.Context() + + // Create a new context with cancellation to ensure we can stop the flush + // The context will be canceled when the request is done or if needed earlier + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Establish a connection to the backend using a custom Dial method + backendConn, err := proxy.Dial("", "") + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + defer backendConn.Close() + + // Create a new HTTP request with the same headers + url := targetProtocol + hostHeaderValue + r.RequestURI + newReq, err := http.NewRequest(r.Method, url, r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + newReq.Header = r.Header + + // Director function + // Allows complete customization of the outgoing request + if proxy.Director != nil { + proxy.Director(newReq) + } + // Prevent automatic connection closure + newReq.Close = false + + // Forward the modified request to the backend + newReq.Write(backendConn) + + // Read the response from the backend + backendResponse, err := http.ReadResponse(bufio.NewReader(backendConn), newReq) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + defer backendResponse.Body.Close() + + // ModifyResponse function + // Allows post-processing of the backend response + if proxy.ModifyResponse != nil { + err := proxy.ModifyResponse(backendResponse) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + } + + // Propagate backend response headers to the client + for key, values := range backendResponse.Header { + for _, value := range values { + w.Header().Add(key, value) + } + } + + // Write the response status code and headers and flush it immediately + w.WriteHeader(backendResponse.StatusCode) + flusher, ok := w.(http.Flusher) + if !ok { + panic("expected http.ResponseWriter to be an http.Flusher") + } + flusher.Flush() + + // Check if the response has a status code of 101 (Switching Protocols) + if backendResponse.StatusCode == http.StatusSwitchingProtocols { + proxy.handleUpgradedConnection(w, backendConn) + return + } + + // Start periodic flushing in a background goroutine + // Supports long-running, streaming responses + go periodicHttpFlush(w, ctx) + + // Stream the response body back to the client + _, err = io.Copy(w, backendResponse.Body) + if err != nil { + return + } + +} + +// handleUpgradedConnection manages HTTP protocol upgrades (e.g., WebSocket), +// specifically tailored for Docker API's hijacking mechanism. +// +// This method: +// - Hijacks the existing connection +// - Manages buffered data +// - Enables bidirectional communication after protocol upgrade +func (*ReverseProxy) handleUpgradedConnection(w http.ResponseWriter, backendConn net.Conn) { + // Create a ResponseController to safely hijack the connection + rc := http.NewResponseController(w) + + // Hijack attempts to take control of the underlying connection + // Returns: + // - clientConn: The raw network connection + // - bufferedClientConn: A buffered reader/writer for any pending data + clientConn, bufferedClientConn, err := rc.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer clientConn.Close() + + // Flush any buffered data in the writer to ensure no data is lost + if bufferedClientConn.Writer.Buffered() > 0 { + if err := bufferedClientConn.Writer.Flush(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // Process any data already buffered in the reader before full duplex communication + // This prevents losing any data that might have been read but not yet processed + if bufferedLen := bufferedClientConn.Reader.Buffered(); bufferedLen > 0 { + bufferedData := make([]byte, bufferedLen) + _, err := bufferedClientConn.Reader.Read(bufferedData) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _, err = backendConn.Write(bufferedData) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // Cast backend and client connections to HalfReadWriteCloser + var xBackendConn HalfReadWriteCloser + var xClientConn HalfReadWriteCloser + if x, ok := backendConn.(HalfReadWriteCloser); !ok { + http.Error(w, "backend connection does not implement HalfReadCloseWriter", http.StatusInternalServerError) + return + } else { + xBackendConn = x + } + if x, ok := clientConn.(HalfReadWriteCloser); !ok { + http.Error(w, "client connection does not implement HalfReadCloseWriter", http.StatusInternalServerError) + return + } else { + xClientConn = x + } + + // Establish a bidirectional pipe between client and backend connections + // This allows full-duplex communication with support for half-closes + // Critical for Docker API's stream-based communication model + Pipe(xClientConn, xBackendConn) + +}