Skip to content

Commit

Permalink
Fix: docker exec hangs indefinitely when reading from stdin
Browse files Browse the repository at this point in the history
This commit fixes the improper handling of half-closed connection behavior when attaching stdin to a container.
The proper approach requires:
- The docker client receives a half-close of the connection from client to daemon (stdin)
- The connection from daemon to client (stdout, stderr) remains open until no more data is to be received

The first part of the issue is resolved by enabling MessageMode on Windows named pipes.
This ensures that when stdin ends, an EOF is received on the reader, allowing proper half-closing of the connection.

The previous implementation used stdlib's httputil.DockerProxy to proxy from Windows named pipe to WSL hvsock.
This proxy does not use CloseWrite on hijacked connections, preventing proper handling of this use case.
By using Close() instead of CloseWrite(), both connection directions are closed simultaneously,
causing the client to miss pending stdout/stderr content after stdin EOF.

This commit introduces a simpler custom `util.ReverseProxy` to replace httputil.DockerProxy, implementing proper handling
of half-closed connections with explicit support for:
- HTTP protocol upgrades
- Half-close TCP connection management
- Precise stream handling for Docker API interactions

Closes #2094

Signed-off-by: Carlos Barcenilla <carlos@barcenilla.com.ar>
  • Loading branch information
bcxpro committed Dec 8, 2024
1 parent 9539c7a commit f4d9c09
Show file tree
Hide file tree
Showing 5 changed files with 369 additions and 61 deletions.
7 changes: 6 additions & 1 deletion src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ func Listen(endpoint string) (net.Listener, error) {
return nil, fmt.Errorf("endpoint %s does not start with protocol %s", endpoint, prefix)
}

listener, err := winio.ListenPipe(endpoint[len(prefix):], nil)
// Configure pipe in MessageMode to support Docker's half-close semantics
// - Enables zero-byte writes as EOF signals (CloseWrite)
// - Crucial for stdin stream termination in interactive containers
pipeConfig := &winio.PipeConfig{MessageMode: true}

listener, err := winio.ListenPipe(endpoint[len(prefix):], pipeConfig)
if err != nil {
return nil, fmt.Errorf("could not listen on %s: %w", endpoint, err)
}
Expand Down
15 changes: 5 additions & 10 deletions src/go/wsl-helper/pkg/dockerproxy/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httputil"
"os"
"os/signal"
"regexp"
Expand All @@ -38,6 +36,7 @@ import (

"github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/models"
"github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/platform"
"github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/util"
)

// RequestContextValue contains things we attach to incoming requests
Expand Down Expand Up @@ -74,7 +73,10 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error {
logWriter := logrus.StandardLogger().Writer()
defer logWriter.Close()
munger := newRequestMunger()
proxy := &httputil.ReverseProxy{
proxy := util.ReverseProxy{
Dial: func(string, string) (net.Conn, error) {
return dialer()
},
Director: func(req *http.Request) {
logrus.WithField("request", req).
WithField("headers", req.Header).
Expand All @@ -96,12 +98,6 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error {
Error("could not munge request")
}
},
Transport: &http.Transport{
Dial: func(string, string) (net.Conn, error) {
return dialer()
},
DisableCompression: true, // for debugging
},
ModifyResponse: func(resp *http.Response) error {
logEntry := logrus.WithField("response", resp)
defer func() { logEntry.Debug("got backend response") }()
Expand All @@ -124,7 +120,6 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error {
}
return nil
},
ErrorLog: log.New(logWriter, "", 0),
}

server := &http.Server{
Expand Down
37 changes: 20 additions & 17 deletions src/go/wsl-helper/pkg/dockerproxy/util/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"io"
)

// Pipe bidirectionally between two streams.
func Pipe(c1, c2 io.ReadWriteCloser) error {
func Pipe(c1, c2 HalfReadWriteCloser) error {
ioCopy := func(reader io.Reader, writer io.Writer) <-chan error {
ch := make(chan error)
go func() {
Expand All @@ -33,22 +32,26 @@ func Pipe(c1, c2 io.ReadWriteCloser) error {

ch1 := ioCopy(c1, c2)
ch2 := ioCopy(c2, c1)
select {
case err := <-ch1:
c1.Close()
c2.Close()
<-ch2
if err != io.EOF {
return err
}
case err := <-ch2:
c1.Close()
c2.Close()
<-ch1
if err != io.EOF {
return err
for i := 0; i < 2; i++ {
select {
case err := <-ch1:
c2.CloseWrite()

Check failure on line 38 in src/go/wsl-helper/pkg/dockerproxy/util/pipe.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value of `c2.CloseWrite` is not checked (errcheck)

Check failure on line 38 in src/go/wsl-helper/pkg/dockerproxy/util/pipe.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value of `c2.CloseWrite` is not checked (errcheck)
if err != nil && err != io.EOF {
return err
}
case err := <-ch2:
c1.CloseWrite()

Check failure on line 43 in src/go/wsl-helper/pkg/dockerproxy/util/pipe.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value of `c1.CloseWrite` is not checked (errcheck)

Check failure on line 43 in src/go/wsl-helper/pkg/dockerproxy/util/pipe.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value of `c1.CloseWrite` is not checked (errcheck)
if err != nil && err != io.EOF {
return err
}
}
}

return nil
}

type HalfReadWriteCloser interface {
// CloseWrite closes the write-side of the connection.
CloseWrite() error
// Write is a passthrough to the underlying connection.
io.ReadWriteCloser
}
132 changes: 99 additions & 33 deletions src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,121 @@ package util

import (
"bytes"
"errors"
"io"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

type nopReadWriteCloser struct {
io.ReadWriter
// bidirectionalHalfClosePipe is a testing utility that simulates a bidirectional pipe
// with the ability to half-close connections. It's designed to mimic scenarios
// like interactive command-line operations where a client can send data and
// then half-close the connection while waiting for a response.
type bidirectionalHalfClosePipe struct {
r io.ReadCloser
w io.WriteCloser
}

func (nopReadWriteCloser) Close() error {
return nil
// newBidirectionalHalfClosePipe creates two interconnected bidirectional pipe endpoints.
//
// The function returns two bidirectionalHalfClosePipe instances that are connected
// such that what is written to one's write endpoint can be read from the other's
// read endpoint, and vice versa.
//
// Returns:
// - h1: First bidirectional pipe endpoint
// - h2: Second bidirectional pipe endpoint
func newBidirectionalHalfClosePipe() (h1, h2 *bidirectionalHalfClosePipe) {
pr1, pw1 := io.Pipe()
pr2, pw2 := io.Pipe()

h1 = &bidirectionalHalfClosePipe{
r: pr1, w: pw2,
}

h2 = &bidirectionalHalfClosePipe{
r: pr2, w: pw1,
}
return
}

type passthroughReadWriteCloser struct {
io.ReadCloser
io.WriteCloser
func (h *bidirectionalHalfClosePipe) CloseWrite() error {
return h.w.Close()
}

func newPipeReadWriter() io.ReadWriteCloser {
r, w := io.Pipe()
return &passthroughReadWriteCloser{
ReadCloser: r,
WriteCloser: w,
func (h *bidirectionalHalfClosePipe) Close() error {
wErr := h.w.Close()
rErr := h.r.Close()

if wErr != nil {
return wErr
}
return rErr
}

func (p *passthroughReadWriteCloser) Close() error {
err := p.ReadCloser.Close()
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
return err
}
err = p.WriteCloser.Close()
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
return err
}
return nil
func (h *bidirectionalHalfClosePipe) Read(p []byte) (n int, err error) {
return h.r.Read(p)
}

func (h *bidirectionalHalfClosePipe) Write(p []byte) (n int, err error) {
return h.w.Write(p)
}

// TestPipe verifies the functionality of the bidirectional pipe utility.
//
// The test simulates a scenario similar to interactive command execution,
// such as a docker run -i command, which requires bidirectional communication.
// This test case mimics scenarios like:
// - Sending input to a Docker container via stdin
// - Half-closing the input stream
// - Receiving output from the container
//
// The test steps are:
// 1. A client sends data
// 2. The client half-closes the connection
// 3. The server reads the data
// 4. The server sends a return response
// 5. The server half-closes the connection
//
// This approach is particularly relevant for interactive Docker runs where
// the client needs to send input and then wait for the container's response,
// while maintaining the ability to close streams independently.
func TestPipe(t *testing.T) {

Check failure on line 101 in src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

unnecessary leading newline (whitespace)

Check failure on line 101 in src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

unnecessary leading newline (whitespace)
rw := newPipeReadWriter()
output := bytes.Buffer{}
data := &passthroughReadWriteCloser{
ReadCloser: nopReadWriteCloser{bytes.NewBufferString("some data")},
WriteCloser: nopReadWriteCloser{&output},
}
err := Pipe(rw, data)
if assert.NoError(t, err) {
assert.Equal(t, "some data", output.String())
}

h1a, h1b := newBidirectionalHalfClosePipe()
h2a, h2b := newBidirectionalHalfClosePipe()
var wg sync.WaitGroup
wg.Add(2)

// Goroutine simulating the client-side operation
go func() {
defer wg.Done()
dataToSend := bytes.NewBufferString("some data")
_, err := h1a.Write(dataToSend.Bytes())
assert.NoError(t, err)
h1a.CloseWrite()

output, err := io.ReadAll(h1a)
assert.NoError(t, err)
assert.EqualValues(t, output, "return data")
}()

// Goroutine simulating the server-side operation
go func() {
defer wg.Done()
output, err := io.ReadAll(h2b)
assert.NoError(t, err)
assert.EqualValues(t, output, "some data")

dataToSend := bytes.NewBufferString("return data")
_, err = h2b.Write(dataToSend.Bytes())
assert.NoError(t, err)

h2b.CloseWrite()
}()

err := Pipe(h1b, h2a)
assert.NoError(t, err)
wg.Wait()
}
Loading

0 comments on commit f4d9c09

Please sign in to comment.