From 1c59e45d703a6f8a45be1be5e5efa0f93a00844a Mon Sep 17 00:00:00 2001 From: Martin Buchleitner Date: Fri, 9 Feb 2024 07:39:54 +0100 Subject: [PATCH] fix: process loop and locks (#31) * fix: remove full status Update Requests Signed-off-by: Martin Buchleitner * fix: remove full status Update Requests Signed-off-by: Martin Buchleitner * merge update Signed-off-by: Martin Buchleitner * fix: update that loop ctrl Signed-off-by: Martin Buchleitner * fix: atomic increment Signed-off-by: Martin Buchleitner * refactor: split helper code to dedicated files Signed-off-by: Martin Buchleitner --------- Signed-off-by: Martin Buchleitner --- go.mod | 8 +- go.sum | 16 ++-- helper.go | 38 +++++++++ pubsub.go | 58 +++++++++++++ shell/main.go | 11 +-- wattpilot.go | 226 ++++++++++++++++++-------------------------------- 6 files changed, 196 insertions(+), 161 deletions(-) create mode 100644 helper.go create mode 100644 pubsub.go diff --git a/go.mod b/go.mod index 0de3560..7ea14f2 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,16 @@ module github.com/mabunixda/wattpilot go 1.21 require ( - github.com/gobwas/ws v1.2.0 - github.com/sirupsen/logrus v1.9.0 - golang.org/x/crypto v0.17.0 + github.com/gobwas/ws v1.3.2 + github.com/sirupsen/logrus v1.9.3 + golang.org/x/crypto v0.18.0 gopkg.in/yaml.v2 v2.4.0 ) require ( github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/sys v0.16.0 // indirect ) retract v1.6.3 diff --git a/go.sum b/go.sum index 2035b1a..621920f 100644 --- a/go.sum +++ b/go.sum @@ -5,21 +5,21 @@ github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= -github.com/gobwas/ws v1.2.0 h1:u0p9s3xLYpZCA1z5JgCkMeB34CKCMMQbM+G8Ii7YD0I= -github.com/gobwas/ws v1.2.0/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= +github.com/gobwas/ws v1.3.2 h1:zlnbNHxumkRvfPWgfXu8RBwyNR1x8wh9cf5PTOCqs9Q= +github.com/gobwas/ws v1.3.2/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= -github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/helper.go b/helper.go new file mode 100644 index 0000000..3ccc86d --- /dev/null +++ b/helper.go @@ -0,0 +1,38 @@ +package wattpilot + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "math/rand" + "time" +) + +var randomSource = rand.New(rand.NewSource(time.Now().UnixNano())) + +func Keys[K comparable, V any](m map[K]V) []K { + keys := make([]K, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + +func hasKey(data map[string]interface{}, key string) bool { + _, isKnown := data[key] + return isKnown +} + +func sha256sum(data string) string { + bs := sha256.Sum256([]byte(data)) + return fmt.Sprintf("%x", bs) +} +func randomHexString(n int) string { + b := make([]byte, (n+2)/2) // can be simplified to n/2 if n is always even + + if _, err := randomSource.Read(b); err != nil { + panic(err) + } + + return hex.EncodeToString(b)[1 : n+1] +} diff --git a/pubsub.go b/pubsub.go new file mode 100644 index 0000000..09af62c --- /dev/null +++ b/pubsub.go @@ -0,0 +1,58 @@ +package wattpilot + +import "sync" + +type Pubsub struct { + mu sync.RWMutex + subs map[string][]chan interface{} + closed bool +} + +func NewPubsub() *Pubsub { + ps := &Pubsub{} + ps.subs = make(map[string][]chan interface{}) + return ps +} + +func (ps *Pubsub) IsEmpty() bool { + ps.mu.Lock() + defer ps.mu.Unlock() + + return len(ps.subs) == 0 +} + +func (ps *Pubsub) Subscribe(topic string) <-chan interface{} { + ps.mu.Lock() + defer ps.mu.Unlock() + + ch := make(chan interface{}, 1) + ps.subs[topic] = append(ps.subs[topic], ch) + return ch +} + +func (ps *Pubsub) Publish(topic string, msg interface{}) { + ps.mu.RLock() + defer ps.mu.RUnlock() + + if ps.closed { + return + } + for _, ch := range ps.subs[topic] { + go func(ch chan interface{}) { + ch <- msg + }(ch) + } +} +func (ps *Pubsub) Close() { + ps.mu.Lock() + defer ps.mu.Unlock() + + if !ps.closed { + ps.closed = true + for _, subs := range ps.subs { + for _, ch := range subs { + close(ch) + } + } + } +} diff --git a/shell/main.go b/shell/main.go index edced6f..a96226d 100644 --- a/shell/main.go +++ b/shell/main.go @@ -22,7 +22,7 @@ var inputs = map[string]InputFunc{ "disconnect": inDisconnect, "properties": inProperties, "dump": dumpData, - "level": setLevel, + "log": setLevel, "update": inUpdateStatus, } @@ -134,6 +134,7 @@ func inConnect(w *api.Wattpilot, data []string) { err := w.Connect() if err != nil { log.Println("Could not connect", err) + return } log.Printf("Connected to WattPilot %s, Serial %s", w.GetName(), w.GetSerial()) } @@ -142,9 +143,9 @@ func inDisconnect(w *api.Wattpilot, data []string) { w.Disconnect() } -func processUpdates(ups <-chan interface{}) { - updates = ups -} +// func processUpdates(ups <-chan interface{}) { +// updates = ups +// } var interrupt chan os.Signal @@ -165,7 +166,7 @@ func main() { fmt.Println("Could not update loglevel to", level, err) } // just a sample to test notification updates - processUpdates(w.GetNotifications("fhz")) + // processUpdates(w.GetNotifications("fhz")) inConnect(w, nil) w.StatusInfo() diff --git a/wattpilot.go b/wattpilot.go index 7b6d8a0..5647131 100644 --- a/wattpilot.go +++ b/wattpilot.go @@ -10,12 +10,12 @@ import ( "encoding/json" "errors" "fmt" - "math/rand" "net" "os" "os/signal" "strconv" "sync" + "sync/atomic" "time" "github.com/gobwas/ws" @@ -29,70 +29,13 @@ const ( RECONNECT_TIMEOUT = 5 // seconds ) -func Keys[K comparable, V any](m map[K]V) []K { - keys := make([]K, 0, len(m)) - for k := range m { - keys = append(keys, k) - } - return keys -} - //go:generate go run gen/generate.go -var randomSource = rand.New(rand.NewSource(time.Now().UnixNano())) - type eventFunc func(map[string]interface{}) -type Pubsub struct { - mu sync.RWMutex - subs map[string][]chan interface{} - closed bool -} - -func NewPubsub() *Pubsub { - ps := &Pubsub{} - ps.subs = make(map[string][]chan interface{}) - return ps -} -func (ps *Pubsub) Subscribe(topic string) <-chan interface{} { - ps.mu.Lock() - defer ps.mu.Unlock() - - ch := make(chan interface{}, 1) - ps.subs[topic] = append(ps.subs[topic], ch) - return ch -} - -func (ps *Pubsub) Publish(topic string, msg interface{}) { - ps.mu.RLock() - defer ps.mu.RUnlock() - - if ps.closed { - return - } - for _, ch := range ps.subs[topic] { - go func(ch chan interface{}) { - ch <- msg - }(ch) - } -} -func (ps *Pubsub) Close() { - ps.mu.Lock() - defer ps.mu.Unlock() - - if !ps.closed { - ps.closed = true - for _, subs := range ps.subs { - for _, ch := range subs { - close(ch) - } - } - } -} - type Wattpilot struct { _currentConnection *net.Conn - _requestId int + _requestId int64 _name string _hostname string _serial string @@ -140,7 +83,7 @@ func New(host string, password string) *Wattpilot { _currentConnection: nil, _isConnected: false, _isInitialized: false, - _requestId: 1, + _requestId: 0, _status: make(map[string]interface{}), } @@ -207,6 +150,10 @@ func (w *Wattpilot) IsInitialized() bool { func (w *Wattpilot) Properties() []string { keys := []string{} + + w._readMutex.Lock() + defer w._readMutex.Unlock() + for k := range w._status { keys = append(keys, k) } @@ -223,23 +170,8 @@ func (w *Wattpilot) LookupAlias(name string) string { return propertyMap[name] } -func hasKey(data map[string]interface{}, key string) bool { - _, isKnown := data[key] - return isKnown -} - -func sha256sum(data string) string { - bs := sha256.Sum256([]byte(data)) - return fmt.Sprintf("%x", bs) -} - -func (w *Wattpilot) getRequestId() int { - w._readMutex.Lock() - defer w._readMutex.Unlock() - current := w._requestId - w._requestId += 1 - - return current +func (w *Wattpilot) getRequestId() int64 { + return atomic.AddInt64(&w._requestId, 1) } func (w *Wattpilot) onEventHello(message map[string]interface{}) { @@ -270,16 +202,6 @@ func (w *Wattpilot) onEventHello(message map[string]interface{}) { } -func randomHexString(n int) string { - b := make([]byte, (n+2)/2) // can be simplified to n/2 if n is always even - - if _, err := randomSource.Read(b); err != nil { - panic(err) - } - - return hex.EncodeToString(b)[1 : n+1] -} - func (w *Wattpilot) onEventAuthRequired(message map[string]interface{}) { w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Auhtentication required") @@ -295,16 +217,16 @@ func (w *Wattpilot) onEventAuthRequired(message map[string]interface{}) { "token3": w._token3, "hash": hash, } - err := w.onSendRepsonse(false, response) + err := w.onSendResponse(false, response) w._isInitialized = (err != nil) } -func (w *Wattpilot) onSendRepsonse(secured bool, message map[string]interface{}) error { +func (w *Wattpilot) onSendResponse(secured bool, message map[string]interface{}) error { - w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Sending data to wattpilot") + w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Sending data to wattpilot: ", message["requestId"], " secured: ", secured) if secured { - msgId := message["requestId"].(int) + msgId := message["requestId"].(int64) payload, _ := json.Marshal(message) mac := hmac.New(sha256.New, []byte(w._hashedpassword)) @@ -312,7 +234,7 @@ func (w *Wattpilot) onSendRepsonse(secured bool, message map[string]interface{}) message = make(map[string]interface{}) message["type"] = "securedMsg" message["data"] = string(payload) - message["requestId"] = strconv.Itoa(msgId) + "sm" + message["requestId"] = fmt.Sprintf("%d", msgId) + "sm" message["hmac"] = hex.EncodeToString(mac.Sum(nil)) } @@ -344,9 +266,10 @@ func (w *Wattpilot) onEventResponse(message map[string]interface{}) { } func (w *Wattpilot) onEventAuthSuccess(message map[string]interface{}) { - w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Auhtentication successful") + w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Auhtentication successful") w.connected <- true + } func (w *Wattpilot) onEventAuthError(message map[string]interface{}) { @@ -360,11 +283,16 @@ func (w *Wattpilot) onEventFullStatus(message map[string]interface{}) { isPartial := message["partial"].(bool) - w.onEventDeltaStatus(message) + w.updateStatus(message) if isPartial { return } + if w.IsInitialized() { + return + } + + w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Initialization done") w.initialized <- true w._isInitialized = true @@ -372,16 +300,20 @@ func (w *Wattpilot) onEventFullStatus(message map[string]interface{}) { func (w *Wattpilot) onEventDeltaStatus(message map[string]interface{}) { w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Delta status update") + w.updateStatus(message) + +} + +func (w *Wattpilot) updateStatus(message map[string]interface{}) { + + statusUpdates := message["status"].(map[string]interface{}) + w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Data-status gets updates #", len(statusUpdates)) w._readMutex.Lock() defer w._readMutex.Unlock() - status := message["status"].(map[string]interface{}) - for k, v := range status { + for k, v := range statusUpdates { w._status[k] = v - } - w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Trigger notifications") - for k, v := range status { go w._notifications.Publish(k, v) } } @@ -397,48 +329,52 @@ func (w *Wattpilot) onEventUpdateInverter(message map[string]interface{}) { w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("update inverters") } func (w *Wattpilot) Disconnect() { + w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Going to disconnect...") + w._isConnected = false w.disconnectImpl() <-w.interrupt } func (w *Wattpilot) disconnectImpl() { - w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Disconnecting") + w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Disconnecting...") - if !w._isConnected { + if !w._isInitialized { return } - (*w._currentConnection).Close() - // if err := w._currentConnection.Close(websocket.StatusNormalClosure, "Bye Bye"); err != nil { - // w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Error on closing connection: ", err) - // } + if err := (*w._currentConnection).Close(); err != nil { + w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Error on closing connection: ", err) + } + + w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("closed connection") w._isInitialized = false w._isConnected = false w._currentConnection = nil - - w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("closed connection") + w._status = make(map[string]interface{}) } func (w *Wattpilot) Connect() error { - w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Connecting") - if w._isConnected || w._isInitialized { w._log.WithFields(log.Fields{"wattpilot": w._host}).Debug("Already Connected") return nil } + w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Connecting") + var err error dialContext, cancel := context.WithTimeout(w._readContext, time.Second*CONTEXT_TIMEOUT) defer cancel() - conn, _, _, err := ws.DefaultDialer.Dial(dialContext, fmt.Sprintf("ws://%s/ws", w._host)) + conn, reader, _, err := ws.DefaultDialer.Dial(dialContext, fmt.Sprintf("ws://%s/ws", w._host)) if err != nil { return err } w._currentConnection = &conn - + if reader != nil { + ws.PutReader(reader) + } go w.receiveHandler(w._readContext) w._isConnected = <-w.connected @@ -458,27 +394,24 @@ func (w *Wattpilot) Connect() error { func (w *Wattpilot) reconnect() { - w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Reconnecting") - - if w._isConnected { + if w._isConnected && !w._isInitialized { + w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Reconnect - Is still connected") return } - for { - w._log.WithFields(log.Fields{"wattpilot": w._host}).Debug("Reconnect is running...") - time.Sleep(time.Second * time.Duration(RECONNECT_TIMEOUT)) - if err := w.Connect(); err != nil { - w._log.WithFields(log.Fields{"wattpilot": w._host}).Debug("Reconnect failure: ", err) - continue - } - w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Successfully reconnected") + w._log.WithFields(log.Fields{"wattpilot": w._host}).Debug("Reconnecting..") + time.Sleep(time.Second * time.Duration(RECONNECT_TIMEOUT)) + if err := w.Connect(); err != nil { + w._log.WithFields(log.Fields{"wattpilot": w._host}).Debug("Reconnect failure: ", err) return } + w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Successfully reconnected") + } func (w *Wattpilot) processLoop(ctx context.Context) { - w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Starting process loop...") + w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Starting processing loop...") delayDuration := time.Duration(time.Second * CONTEXT_TIMEOUT) delay := time.NewTimer(delayDuration) @@ -491,11 +424,14 @@ func (w *Wattpilot) processLoop(ctx context.Context) { continue } w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Hello there") - // if err := w.RequestStatusUpdate(); err != nil { - // w._log.WithFields(log.Fields{"wattpilot": w._host}).Error("Full Status Update failed: ", err) - // w._readCancel() - // break - // } + go func() { + time.Sleep(time.Millisecond * 100) + if err := w.RequestStatusUpdate(); err != nil { + w._log.WithFields(log.Fields{"wattpilot": w._host}).Error("Full Status Update failed: ", err) + w.disconnectImpl() + w.reconnect() + } + }() break case <-w._readContext.Done(): w._log.WithFields(log.Fields{"wattpilot": w._host}).Trace("Read context is done") @@ -520,10 +456,10 @@ func (w *Wattpilot) receiveHandler(ctx context.Context) { w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Starting receive handler...") for { - msg, _, err := wsutil.ReadServerData(*w._currentConnection) + msg, err := wsutil.ReadServerText(*w._currentConnection) if err != nil { + // w._readCancel() w._log.WithFields(log.Fields{"wattpilot": w._host}).Info("Stopping receive handler...") - w._readCancel() return } data := make(map[string]interface{}) @@ -549,14 +485,12 @@ func (w *Wattpilot) receiveHandler(ctx context.Context) { func (w *Wattpilot) GetProperty(name string) (interface{}, error) { - w._readMutex.Lock() - defer w._readMutex.Unlock() - w._log.WithFields(log.Fields{"wattpilot": w._host}).Debug("Get Property ", name) if !w._isInitialized { return nil, errors.New("connection is not valid") } + origName := name if v, isKnown := propertyMap[name]; isKnown { name = v @@ -566,6 +500,9 @@ func (w *Wattpilot) GetProperty(name string) (interface{}, error) { name = m.key } + w._readMutex.Lock() + defer w._readMutex.Unlock() + if !hasKey(w._status, name) { return nil, errors.New("could not find value of " + name) } @@ -578,24 +515,21 @@ func (w *Wattpilot) GetProperty(name string) (interface{}, error) { func (w *Wattpilot) SetProperty(name string, value interface{}) error { - w._readMutex.Lock() - defer w._readMutex.Unlock() w._log.WithFields(log.Fields{"wattpilot": w._host}).Debug("setting property ", name, " to ", value) if !w._isInitialized { - return errors.New("connection is not valid") + return errors.New("Connection is not valid") } + w._readMutex.Lock() + defer w._readMutex.Unlock() + if !hasKey(w._status, name) { - return errors.New("could not find reference for update on " + name) + return errors.New("Could not find reference for update on " + name) } - err := w.sendUpdate(name, value) - if err != nil { - return err - } + return w.sendUpdate(name, value) - return nil } func (w *Wattpilot) transformValue(value interface{}) interface{} { @@ -629,10 +563,14 @@ func (w *Wattpilot) sendUpdate(name string, value interface{}) error { message["requestId"] = w.getRequestId() message["key"] = name message["value"] = w.transformValue(value) - return w.onSendRepsonse(w._secured, message) + return w.onSendResponse(w._secured, message) } +// -------------------------------- +// helper functions that wrap properties +// -------------------------------- + func (w *Wattpilot) StatusInfo() { fmt.Println("Wattpilot: " + w._name) @@ -743,5 +681,5 @@ func (w *Wattpilot) RequestStatusUpdate() error { message := make(map[string]interface{}) message["type"] = "requestFullStatus" message["requestId"] = w.getRequestId() - return w.onSendRepsonse(w._secured, message) + return w.onSendResponse(w._secured, message) }