Skip to content

Commit

Permalink
fix: Replacing websocket library (#46)
Browse files Browse the repository at this point in the history
* fix: concurrent writes

Signed-off-by: Martin Buchleitner <mbuchleitner@infralovers.com>

* fix: adding writemutex

Signed-off-by: Martin Buchleitner <mbuchleitner@infralovers.com>

---------

Signed-off-by: Martin Buchleitner <mbuchleitner@infralovers.com>
  • Loading branch information
mabunixda authored Jul 9, 2024
1 parent 3ea480b commit 83add7a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 18 deletions.
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ module github.com/mabunixda/wattpilot
go 1.21

require (
github.com/gorilla/websocket v1.5.2
github.com/sirupsen/logrus v1.9.3
golang.org/x/crypto v0.24.0
gopkg.in/yaml.v2 v2.4.0
nhooyr.io/websocket v1.8.11
)

require (
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
)
require golang.org/x/sys v0.21.0 // indirect

retract v1.6.3
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gorilla/websocket v1.5.2 h1:qoW6V1GT3aZxybsbC6oLnailWnB+qTMVwMreOso9XUw=
github.com/gorilla/websocket v1.5.2/go.mod h1:0n9H61RBAcf5/38py2MCYbxzPIY9rOkpvvMT24Rqs30=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand All @@ -12,8 +10,6 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand All @@ -23,3 +19,5 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
19 changes: 10 additions & 9 deletions wattpilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"syscall"
"time"

"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/pbkdf2"
"nhooyr.io/websocket"
)

const (
Expand Down Expand Up @@ -50,8 +50,8 @@ type Wattpilot struct {
initialized chan bool
secured bool

readMutex sync.Mutex
connWriteMutex sync.Mutex
readMutex sync.Mutex
writeMutex sync.Mutex

host string
password string
Expand Down Expand Up @@ -379,7 +379,7 @@ func (w *Wattpilot) Connect() error {
w.logger.WithFields(log.Fields{"wattpilot": w.host}).Info("Connecting")
var err error

conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s/ws", w.host), nil)
conn, _, err := websocket.Dial(context.Background(), fmt.Sprintf("ws://%s/ws", w.host), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -437,7 +437,7 @@ func (w *Wattpilot) disconnectImpl() {

if w.conn != nil {
w.logger.WithFields(log.Fields{"wattpilot": w.host}).Trace("Closing connection...")
if err := (*w.conn).Close(); err != nil {
if err := (*w.conn).CloseNow(); err != nil {
w.logger.WithFields(log.Fields{"wattpilot": w.host}).Trace("Error on closing connection: ", err)
}
}
Expand Down Expand Up @@ -487,7 +487,7 @@ func (w *Wattpilot) receiveHandler() {
w.logger.WithFields(log.Fields{"wattpilot": w.host}).Info("Starting receive handler...")

for {
_, msg, err := w.conn.ReadMessage()
_, msg, err := w.conn.Read(context.Background())
if err != nil {
w.logger.WithFields(log.Fields{"wattpilot": w.host}).Info("Stopping receive handler...")
return
Expand Down Expand Up @@ -561,6 +561,9 @@ func (w *Wattpilot) onEventAuthRequired(message map[string]interface{}) {

func (w *Wattpilot) onSendResponse(secured bool, message map[string]interface{}) error {

w.writeMutex.Lock()
defer w.writeMutex.Unlock()

w.logger.WithFields(log.Fields{"wattpilot": w.host}).Trace("Sending data to wattpilot: ", message["requestId"], " secured: ", secured)

if secured {
Expand All @@ -578,9 +581,7 @@ func (w *Wattpilot) onSendResponse(secured bool, message map[string]interface{})

data, _ := json.Marshal(message)

w.connWriteMutex.Lock()
defer w.connWriteMutex.Unlock()
err := w.conn.WriteMessage(websocket.TextMessage, data)
err := w.conn.Write(context.Background(), websocket.MessageText, data)
if err != nil {
w.logger.WithFields(log.Fields{"wattpilot": w.host}).Trace("Sending data to wattpilot: ", message["data"], " Error: ", err)
return err
Expand Down

0 comments on commit 83add7a

Please sign in to comment.