From be2bb5d15f409b3bc6ad2532eb79163739edbcab Mon Sep 17 00:00:00 2001 From: Nikolai Kabanenkov <58106770+nikolaikabanenkov@users.noreply.github.com> Date: Sun, 12 Jan 2025 13:07:33 +0500 Subject: [PATCH 1/4] feat: HTTP CONNECT client --- x/httpproxy/connect_client.go | 109 +++++++++++++++++++++++++++++ x/httpproxy/connect_client_test.go | 59 ++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 x/httpproxy/connect_client.go create mode 100644 x/httpproxy/connect_client_test.go diff --git a/x/httpproxy/connect_client.go b/x/httpproxy/connect_client.go new file mode 100644 index 00000000..a4875446 --- /dev/null +++ b/x/httpproxy/connect_client.go @@ -0,0 +1,109 @@ +// Copyright 2023 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httpproxy + +import ( + "bufio" + "context" + "errors" + "fmt" + "github.com/Jigsaw-Code/outline-sdk/transport" + "net" + "net/http" +) + +// connectClient is a [transport.StreamDialer] implementation that sends a CONNECT request +type connectClient struct { + endpoint transport.StreamEndpoint + + // proxyAuth is the Proxy-Authorization header value. If empty, the header is not sent + proxyAuth string +} + +var _ transport.StreamDialer = (*connectClient)(nil) + +type ConnectClientOption func(c *connectClient) + +func NewConnectClient(endpoint transport.StreamEndpoint, opts ...ConnectClientOption) (transport.StreamDialer, error) { + if endpoint == nil { + return nil, errors.New("endpoint must not be nil") + } + + cc := &connectClient{ + endpoint: endpoint, + } + + for _, opt := range opts { + opt(cc) + } + + return cc, nil +} + +// WithProxyAuthorization - sets the Proxy-Authorization header value +func WithProxyAuthorization(proxyAuth string) ConnectClientOption { + return func(c *connectClient) { + c.proxyAuth = proxyAuth + } +} + +// DialStream - connects using the endpoint and sends a CONNECT request to the remoteAddr, closes the connection if the request fails +func (c *connectClient) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) { + conn, err := c.endpoint.ConnectStream(ctx) + if err != nil { + return nil, fmt.Errorf("failed to connect to endpoint: %w", err) + } + + err = c.sendConnectRequest(ctx, remoteAddr, conn) + if err != nil { + conn.Close() + return nil, fmt.Errorf("sendConnectRequest: %w", err) + } + + return conn, nil +} + +func (c *connectClient) sendConnectRequest(ctx context.Context, remoteAddr string, conn transport.StreamConn) error { + _, _, err := net.SplitHostPort(remoteAddr) + if err != nil { + return fmt.Errorf("failed to parse remote address: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodConnect, "http://"+remoteAddr, nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + if c.proxyAuth != "" { + req.Header.Add("Proxy-Authorization", c.proxyAuth) + } + + err = req.Write(conn) + if err != nil { + return fmt.Errorf("failed to write request: %w", err) + } + + resp, err := http.ReadResponse(bufio.NewReader(conn), req) + if err != nil { + return fmt.Errorf("failed to read response: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + return nil +} diff --git a/x/httpproxy/connect_client_test.go b/x/httpproxy/connect_client_test.go new file mode 100644 index 00000000..f6e3ac26 --- /dev/null +++ b/x/httpproxy/connect_client_test.go @@ -0,0 +1,59 @@ +// Copyright 2023 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httpproxy + +import ( + "context" + "encoding/base64" + "github.com/Jigsaw-Code/outline-sdk/transport" + "github.com/stretchr/testify/require" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +func TestConnectClient(t *testing.T) { + t.Parallel() + + host := "host:1234" + creds := base64.StdEncoding.EncodeToString([]byte("username:password")) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodConnect, r.Method, "Method") + require.Equal(t, host, r.Host, "Host") + require.Equal(t, []string{"Basic " + creds}, r.Header["Proxy-Authorization"], "Proxy-Authorization") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) + require.NoError(t, err, "Write") + })) + defer srv.Close() + + u, err := url.Parse(srv.URL) + require.NoError(t, err, "Parse") + + endpoint := &transport.TCPEndpoint{ + Dialer: net.Dialer{}, + Address: u.Host, + } + + connClient, err := NewConnectClient(endpoint, WithProxyAuthorization("Basic "+creds)) + require.NoError(t, err, "NewConnectClient") + + streamConn, err := connClient.DialStream(context.Background(), host) + require.NoError(t, err, "DialStream") + require.NotNil(t, streamConn, "StreamConn") +} From 474fa650c11b1a8160dfb64cae4436571828b076 Mon Sep 17 00:00:00 2001 From: Nikolai Kabanenkov <58106770+nikolaikabanenkov@users.noreply.github.com> Date: Sun, 12 Jan 2025 13:07:33 +0500 Subject: [PATCH 2/4] feat: HTTP CONNECT client (review) --- x/httpconnect/connect_client.go | 130 +++++++++++++++++++++++++++ x/httpconnect/connect_client_test.go | 101 +++++++++++++++++++++ x/httpconnect/pipe_conn.go | 35 ++++++++ x/httpproxy/connect_client.go | 109 ---------------------- x/httpproxy/connect_client_test.go | 59 ------------ 5 files changed, 266 insertions(+), 168 deletions(-) create mode 100644 x/httpconnect/connect_client.go create mode 100644 x/httpconnect/connect_client_test.go create mode 100644 x/httpconnect/pipe_conn.go delete mode 100644 x/httpproxy/connect_client.go delete mode 100644 x/httpproxy/connect_client_test.go diff --git a/x/httpconnect/connect_client.go b/x/httpconnect/connect_client.go new file mode 100644 index 00000000..017ee17c --- /dev/null +++ b/x/httpconnect/connect_client.go @@ -0,0 +1,130 @@ +// Copyright 2023 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httpconnect + +import ( + "context" + "errors" + "fmt" + "github.com/Jigsaw-Code/outline-sdk/transport" + "io" + "net" + "net/http" +) + +// connectClient is a [transport.StreamDialer] implementation that dials [proxyAddr] with the given [dialer] +// and sends a CONNECT request to the dialed proxy. +type connectClient struct { + dialer transport.StreamDialer + proxyAddr string + + headers http.Header +} + +var _ transport.StreamDialer = (*connectClient)(nil) + +type ConnectClientOption func(c *connectClient) + +func NewConnectClient(dialer transport.StreamDialer, proxyAddr string, opts ...ConnectClientOption) (transport.StreamDialer, error) { + if dialer == nil { + return nil, errors.New("dialer must not be nil") + } + _, _, err := net.SplitHostPort(proxyAddr) + if err != nil { + return nil, fmt.Errorf("failed to parse proxy address %s: %w", proxyAddr, err) + } + + cc := &connectClient{ + dialer: dialer, + proxyAddr: proxyAddr, + headers: make(http.Header), + } + + for _, opt := range opts { + opt(cc) + } + + return cc, nil +} + +// WithHeaders appends the given [headers] to the CONNECT request +func WithHeaders(headers http.Header) ConnectClientOption { + return func(c *connectClient) { + c.headers = headers + } +} + +// DialStream - connects to the proxy and sends a CONNECT request to it, closes the connection if the request fails +func (cc *connectClient) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) { + innerConn, err := cc.dialer.DialStream(ctx, cc.proxyAddr) + if err != nil { + return nil, fmt.Errorf("failed to dial proxy %s: %w", cc.proxyAddr, err) + } + + conn, err := cc.doConnect(ctx, remoteAddr, innerConn) + if err != nil { + _ = innerConn.Close() + return nil, fmt.Errorf("doConnect %s: %w", remoteAddr, err) + } + + return conn, nil +} + +func (cc *connectClient) doConnect(ctx context.Context, remoteAddr string, conn transport.StreamConn) (transport.StreamConn, error) { + _, _, err := net.SplitHostPort(remoteAddr) + if err != nil { + return nil, fmt.Errorf("failed to parse remote address %s: %w", remoteAddr, err) + } + + pr, pw := io.Pipe() + + req, err := http.NewRequestWithContext(ctx, http.MethodConnect, "http://"+remoteAddr, pr) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + req.ContentLength = -1 // -1 means length unknown + mergeHeaders(req.Header, cc.headers) + + tr := &http.Transport{ + // TODO: HTTP/2 support with [http2.ConfigureTransport] + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return conn, nil + }, + } + + hc := http.Client{ + Transport: tr, + } + + resp, err := hc.Do(req) + if err != nil { + return nil, fmt.Errorf("do: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + return &PipeConn{ + reader: resp.Body, + writer: pw, + StreamConn: conn, + }, nil +} + +func mergeHeaders(dst http.Header, src http.Header) { + for k, v := range src { + dst[k] = append(dst[k], v...) + } +} diff --git a/x/httpconnect/connect_client_test.go b/x/httpconnect/connect_client_test.go new file mode 100644 index 00000000..7dd29a3d --- /dev/null +++ b/x/httpconnect/connect_client_test.go @@ -0,0 +1,101 @@ +// Copyright 2023 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httpconnect + +import ( + "bufio" + "context" + "encoding/base64" + "github.com/Jigsaw-Code/outline-sdk/transport" + "github.com/stretchr/testify/require" + "io" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +func TestConnectClientOk(t *testing.T) { + t.Parallel() + + creds := base64.StdEncoding.EncodeToString([]byte("username:password")) + + targetSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodGet, r.Method, "Method") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("HTTP/1.1 200 OK\r\n")) + require.NoError(t, err) + })) + defer targetSrv.Close() + + targetURL, err := url.Parse(targetSrv.URL) + require.NoError(t, err) + + proxySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodConnect, r.Method, "Method") + require.Equal(t, targetURL.Host, r.Host, "Host") + require.Equal(t, []string{"Basic " + creds}, r.Header["Proxy-Authorization"], "Proxy-Authorization") + + conn, err := net.Dial("tcp", targetURL.Host) + require.NoError(t, err, "Dial") + + w.WriteHeader(http.StatusOK) + _, err = w.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) + require.NoError(t, err, "Write") + + rc := http.NewResponseController(w) + err = rc.Flush() + require.NoError(t, err, "Flush") + + clientConn, _, err := rc.Hijack() + require.NoError(t, err, "Hijack") + + go func() { + _, _ = io.Copy(conn, clientConn) + }() + _, _ = io.Copy(clientConn, conn) + })) + defer proxySrv.Close() + + proxyURL, err := url.Parse(proxySrv.URL) + require.NoError(t, err, "Parse") + + dialer := &transport.TCPDialer{ + Dialer: net.Dialer{}, + } + + connClient, err := NewConnectClient( + dialer, + proxyURL.Host, + WithHeaders(http.Header{"Proxy-Authorization": []string{"Basic " + creds}}), + ) + require.NoError(t, err, "NewConnectClient") + + streamConn, err := connClient.DialStream(context.Background(), targetURL.Host) + require.NoError(t, err, "DialStream") + require.NotNil(t, streamConn, "StreamConn") + + req, err := http.NewRequest(http.MethodGet, targetSrv.URL, nil) + require.NoError(t, err, "NewRequest") + + err = req.Write(streamConn) + require.NoError(t, err, "Write") + + resp, err := http.ReadResponse(bufio.NewReader(streamConn), req) + require.NoError(t, err, "ReadResponse") + + require.Equal(t, http.StatusOK, resp.StatusCode) +} diff --git a/x/httpconnect/pipe_conn.go b/x/httpconnect/pipe_conn.go new file mode 100644 index 00000000..401b7512 --- /dev/null +++ b/x/httpconnect/pipe_conn.go @@ -0,0 +1,35 @@ +// Copyright 2023 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httpconnect + +import ( + "github.com/Jigsaw-Code/outline-sdk/transport" + "io" +) + +// PipeConn is a [transport.StreamConn] that overrides [Read] and [Write] functions with the given [reader] and [writer] +type PipeConn struct { + reader io.Reader + writer io.Writer + transport.StreamConn +} + +func (p *PipeConn) Read(b []byte) (n int, err error) { + return p.reader.Read(b) +} + +func (p *PipeConn) Write(b []byte) (n int, err error) { + return p.writer.Write(b) +} diff --git a/x/httpproxy/connect_client.go b/x/httpproxy/connect_client.go deleted file mode 100644 index a4875446..00000000 --- a/x/httpproxy/connect_client.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2023 The Outline Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "bufio" - "context" - "errors" - "fmt" - "github.com/Jigsaw-Code/outline-sdk/transport" - "net" - "net/http" -) - -// connectClient is a [transport.StreamDialer] implementation that sends a CONNECT request -type connectClient struct { - endpoint transport.StreamEndpoint - - // proxyAuth is the Proxy-Authorization header value. If empty, the header is not sent - proxyAuth string -} - -var _ transport.StreamDialer = (*connectClient)(nil) - -type ConnectClientOption func(c *connectClient) - -func NewConnectClient(endpoint transport.StreamEndpoint, opts ...ConnectClientOption) (transport.StreamDialer, error) { - if endpoint == nil { - return nil, errors.New("endpoint must not be nil") - } - - cc := &connectClient{ - endpoint: endpoint, - } - - for _, opt := range opts { - opt(cc) - } - - return cc, nil -} - -// WithProxyAuthorization - sets the Proxy-Authorization header value -func WithProxyAuthorization(proxyAuth string) ConnectClientOption { - return func(c *connectClient) { - c.proxyAuth = proxyAuth - } -} - -// DialStream - connects using the endpoint and sends a CONNECT request to the remoteAddr, closes the connection if the request fails -func (c *connectClient) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) { - conn, err := c.endpoint.ConnectStream(ctx) - if err != nil { - return nil, fmt.Errorf("failed to connect to endpoint: %w", err) - } - - err = c.sendConnectRequest(ctx, remoteAddr, conn) - if err != nil { - conn.Close() - return nil, fmt.Errorf("sendConnectRequest: %w", err) - } - - return conn, nil -} - -func (c *connectClient) sendConnectRequest(ctx context.Context, remoteAddr string, conn transport.StreamConn) error { - _, _, err := net.SplitHostPort(remoteAddr) - if err != nil { - return fmt.Errorf("failed to parse remote address: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, http.MethodConnect, "http://"+remoteAddr, nil) - if err != nil { - return fmt.Errorf("failed to create request: %w", err) - } - - if c.proxyAuth != "" { - req.Header.Add("Proxy-Authorization", c.proxyAuth) - } - - err = req.Write(conn) - if err != nil { - return fmt.Errorf("failed to write request: %w", err) - } - - resp, err := http.ReadResponse(bufio.NewReader(conn), req) - if err != nil { - return fmt.Errorf("failed to read response: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - return nil -} diff --git a/x/httpproxy/connect_client_test.go b/x/httpproxy/connect_client_test.go deleted file mode 100644 index f6e3ac26..00000000 --- a/x/httpproxy/connect_client_test.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2023 The Outline Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package httpproxy - -import ( - "context" - "encoding/base64" - "github.com/Jigsaw-Code/outline-sdk/transport" - "github.com/stretchr/testify/require" - "net" - "net/http" - "net/http/httptest" - "net/url" - "testing" -) - -func TestConnectClient(t *testing.T) { - t.Parallel() - - host := "host:1234" - creds := base64.StdEncoding.EncodeToString([]byte("username:password")) - - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, http.MethodConnect, r.Method, "Method") - require.Equal(t, host, r.Host, "Host") - require.Equal(t, []string{"Basic " + creds}, r.Header["Proxy-Authorization"], "Proxy-Authorization") - w.WriteHeader(http.StatusOK) - _, err := w.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) - require.NoError(t, err, "Write") - })) - defer srv.Close() - - u, err := url.Parse(srv.URL) - require.NoError(t, err, "Parse") - - endpoint := &transport.TCPEndpoint{ - Dialer: net.Dialer{}, - Address: u.Host, - } - - connClient, err := NewConnectClient(endpoint, WithProxyAuthorization("Basic "+creds)) - require.NoError(t, err, "NewConnectClient") - - streamConn, err := connClient.DialStream(context.Background(), host) - require.NoError(t, err, "DialStream") - require.NotNil(t, streamConn, "StreamConn") -} From 7b36d642b541094503cc1b0bfa0b13235bec2fd0 Mon Sep 17 00:00:00 2001 From: Nikolai Kabanenkov <58106770+nikolaikabanenkov@users.noreply.github.com> Date: Sun, 12 Jan 2025 13:07:33 +0500 Subject: [PATCH 3/4] feat: HTTP CONNECT client (review) --- x/httpconnect/connect_client_test.go | 32 ++++++++++++++++++++++++++++ x/httpconnect/pipe_conn.go | 19 ++++++++++++++--- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/x/httpconnect/connect_client_test.go b/x/httpconnect/connect_client_test.go index 7dd29a3d..d7fe2961 100644 --- a/x/httpconnect/connect_client_test.go +++ b/x/httpconnect/connect_client_test.go @@ -99,3 +99,35 @@ func TestConnectClientOk(t *testing.T) { require.Equal(t, http.StatusOK, resp.StatusCode) } + +func TestConnectClientFail(t *testing.T) { + t.Parallel() + + targetURL := "somehost:1234" + + proxySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodConnect, r.Method, "Method") + require.Equal(t, targetURL, r.Host, "Host") + + w.WriteHeader(http.StatusBadRequest) + _, err := w.Write([]byte("HTTP/1.1 400 Bad request\r\n\r\n")) + require.NoError(t, err, "Write") + })) + defer proxySrv.Close() + + proxyURL, err := url.Parse(proxySrv.URL) + require.NoError(t, err, "Parse") + + dialer := &transport.TCPDialer{ + Dialer: net.Dialer{}, + } + + connClient, err := NewConnectClient( + dialer, + proxyURL.Host, + ) + require.NoError(t, err, "NewConnectClient") + + _, err = connClient.DialStream(context.Background(), targetURL) + require.Error(t, err, "unexpected status code: 400") +} diff --git a/x/httpconnect/pipe_conn.go b/x/httpconnect/pipe_conn.go index 401b7512..82ed516b 100644 --- a/x/httpconnect/pipe_conn.go +++ b/x/httpconnect/pipe_conn.go @@ -15,14 +15,15 @@ package httpconnect import ( + "errors" "github.com/Jigsaw-Code/outline-sdk/transport" "io" ) -// PipeConn is a [transport.StreamConn] that overrides [Read] and [Write] functions with the given [reader] and [writer] +// PipeConn is a [transport.StreamConn] that overrides [Read], [Write] (and corresponding [Close]) functions with the given [reader] and [writer] type PipeConn struct { - reader io.Reader - writer io.Writer + reader io.ReadCloser + writer io.WriteCloser transport.StreamConn } @@ -33,3 +34,15 @@ func (p *PipeConn) Read(b []byte) (n int, err error) { func (p *PipeConn) Write(b []byte) (n int, err error) { return p.writer.Write(b) } + +func (p *PipeConn) CloseRead() error { + return errors.Join(p.reader.Close(), p.StreamConn.CloseRead()) +} + +func (p *PipeConn) CloseWrite() error { + return errors.Join(p.writer.Close(), p.StreamConn.CloseWrite()) +} + +func (p *PipeConn) Close() error { + return errors.Join(p.reader.Close(), p.writer.Close(), p.StreamConn.Close()) +} From b9e4f2d8eabbd7a9bbd501a73da4f9ed7e79b0fc Mon Sep 17 00:00:00 2001 From: Nikolai Kabanenkov <58106770+nikolaikabanenkov@users.noreply.github.com> Date: Sun, 12 Jan 2025 13:07:33 +0500 Subject: [PATCH 4/4] feat: HTTP CONNECT client (review) --- x/httpconnect/connect_client.go | 15 ++++----- x/httpconnect/connect_client_test.go | 46 +++++++--------------------- x/httpconnect/doc.go | 16 ++++++++++ x/httpconnect/pipe_conn.go | 18 ++++++----- 4 files changed, 45 insertions(+), 50 deletions(-) create mode 100644 x/httpconnect/doc.go diff --git a/x/httpconnect/connect_client.go b/x/httpconnect/connect_client.go index 017ee17c..03990bc2 100644 --- a/x/httpconnect/connect_client.go +++ b/x/httpconnect/connect_client.go @@ -1,4 +1,4 @@ -// Copyright 2023 The Outline Authors +// Copyright 2025 The Outline Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -35,9 +35,9 @@ type connectClient struct { var _ transport.StreamDialer = (*connectClient)(nil) -type ConnectClientOption func(c *connectClient) +type ClientOption func(c *connectClient) -func NewConnectClient(dialer transport.StreamDialer, proxyAddr string, opts ...ConnectClientOption) (transport.StreamDialer, error) { +func NewConnectClient(dialer transport.StreamDialer, proxyAddr string, opts ...ClientOption) (transport.StreamDialer, error) { if dialer == nil { return nil, errors.New("dialer must not be nil") } @@ -60,9 +60,9 @@ func NewConnectClient(dialer transport.StreamDialer, proxyAddr string, opts ...C } // WithHeaders appends the given [headers] to the CONNECT request -func WithHeaders(headers http.Header) ConnectClientOption { +func WithHeaders(headers http.Header) ClientOption { return func(c *connectClient) { - c.headers = headers + c.headers = headers.Clone() } } @@ -90,7 +90,7 @@ func (cc *connectClient) doConnect(ctx context.Context, remoteAddr string, conn pr, pw := io.Pipe() - req, err := http.NewRequestWithContext(ctx, http.MethodConnect, "http://"+remoteAddr, pr) + req, err := http.NewRequestWithContext(ctx, http.MethodConnect, "http://"+remoteAddr, pr) // TODO: HTTPS support if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } @@ -113,10 +113,11 @@ func (cc *connectClient) doConnect(ctx context.Context, remoteAddr string, conn return nil, fmt.Errorf("do: %w", err) } if resp.StatusCode != http.StatusOK { + _ = resp.Body.Close() return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - return &PipeConn{ + return &pipeConn{ reader: resp.Body, writer: pw, StreamConn: conn, diff --git a/x/httpconnect/connect_client_test.go b/x/httpconnect/connect_client_test.go index d7fe2961..0c28e03b 100644 --- a/x/httpconnect/connect_client_test.go +++ b/x/httpconnect/connect_client_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 The Outline Authors +// Copyright 2025 The Outline Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,8 +19,8 @@ import ( "context" "encoding/base64" "github.com/Jigsaw-Code/outline-sdk/transport" + "github.com/Jigsaw-Code/outline-sdk/x/httpproxy" "github.com/stretchr/testify/require" - "io" "net" "net/http" "net/http/httptest" @@ -44,41 +44,19 @@ func TestConnectClientOk(t *testing.T) { targetURL, err := url.Parse(targetSrv.URL) require.NoError(t, err) - proxySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, http.MethodConnect, r.Method, "Method") - require.Equal(t, targetURL.Host, r.Host, "Host") - require.Equal(t, []string{"Basic " + creds}, r.Header["Proxy-Authorization"], "Proxy-Authorization") - - conn, err := net.Dial("tcp", targetURL.Host) - require.NoError(t, err, "Dial") - - w.WriteHeader(http.StatusOK) - _, err = w.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) - require.NoError(t, err, "Write") - - rc := http.NewResponseController(w) - err = rc.Flush() - require.NoError(t, err, "Flush") - - clientConn, _, err := rc.Hijack() - require.NoError(t, err, "Hijack") - - go func() { - _, _ = io.Copy(conn, clientConn) - }() - _, _ = io.Copy(clientConn, conn) + tcpDialer := &transport.TCPDialer{Dialer: net.Dialer{}} + connectHandler := httpproxy.NewConnectHandler(tcpDialer) + proxySrv := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + require.Equal(t, "Basic "+creds, request.Header.Get("Proxy-Authorization")) + connectHandler.ServeHTTP(writer, request) })) defer proxySrv.Close() proxyURL, err := url.Parse(proxySrv.URL) require.NoError(t, err, "Parse") - dialer := &transport.TCPDialer{ - Dialer: net.Dialer{}, - } - connClient, err := NewConnectClient( - dialer, + tcpDialer, proxyURL.Host, WithHeaders(http.Header{"Proxy-Authorization": []string{"Basic " + creds}}), ) @@ -118,12 +96,10 @@ func TestConnectClientFail(t *testing.T) { proxyURL, err := url.Parse(proxySrv.URL) require.NoError(t, err, "Parse") - dialer := &transport.TCPDialer{ - Dialer: net.Dialer{}, - } - connClient, err := NewConnectClient( - dialer, + &transport.TCPDialer{ + Dialer: net.Dialer{}, + }, proxyURL.Host, ) require.NoError(t, err, "NewConnectClient") diff --git a/x/httpconnect/doc.go b/x/httpconnect/doc.go new file mode 100644 index 00000000..54418797 --- /dev/null +++ b/x/httpconnect/doc.go @@ -0,0 +1,16 @@ +// Copyright 2025 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package httpconnect contains an HTTP CONNECT client implementation. +package httpconnect diff --git a/x/httpconnect/pipe_conn.go b/x/httpconnect/pipe_conn.go index 82ed516b..50f174a3 100644 --- a/x/httpconnect/pipe_conn.go +++ b/x/httpconnect/pipe_conn.go @@ -1,4 +1,4 @@ -// Copyright 2023 The Outline Authors +// Copyright 2025 The Outline Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,29 +20,31 @@ import ( "io" ) -// PipeConn is a [transport.StreamConn] that overrides [Read], [Write] (and corresponding [Close]) functions with the given [reader] and [writer] -type PipeConn struct { +var _ transport.StreamConn = (*pipeConn)(nil) + +// pipeConn is a [transport.StreamConn] that overrides [Read], [Write] (and corresponding [Close]) functions with the given [reader] and [writer] +type pipeConn struct { reader io.ReadCloser writer io.WriteCloser transport.StreamConn } -func (p *PipeConn) Read(b []byte) (n int, err error) { +func (p *pipeConn) Read(b []byte) (n int, err error) { return p.reader.Read(b) } -func (p *PipeConn) Write(b []byte) (n int, err error) { +func (p *pipeConn) Write(b []byte) (n int, err error) { return p.writer.Write(b) } -func (p *PipeConn) CloseRead() error { +func (p *pipeConn) CloseRead() error { return errors.Join(p.reader.Close(), p.StreamConn.CloseRead()) } -func (p *PipeConn) CloseWrite() error { +func (p *pipeConn) CloseWrite() error { return errors.Join(p.writer.Close(), p.StreamConn.CloseWrite()) } -func (p *PipeConn) Close() error { +func (p *pipeConn) Close() error { return errors.Join(p.reader.Close(), p.writer.Close(), p.StreamConn.Close()) }