From 9e6ac23f407e84ea84eab6b59574baf5eaa1773a Mon Sep 17 00:00:00 2001 From: Rob Glew Date: Thu, 15 Feb 2018 14:48:11 -0600 Subject: [PATCH] Version 0.2.6 --- proxy.go | 122 +++++++++++++++++++++++++++++ proxymessages.go | 194 +++++++++++++++++++++++++++++++++-------------- schema.go | 25 ++++++ sqlitestorage.go | 65 ++++++++++++++++ storage.go | 4 + 5 files changed, 353 insertions(+), 57 deletions(-) diff --git a/proxy.go b/proxy.go index 8203ed7..a883304 100644 --- a/proxy.go +++ b/proxy.go @@ -26,6 +26,39 @@ type savedStorage struct { description string } +type GlobalStorageWatcher interface { + // Callback for when a new request is saved + NewRequestSaved(storageId int, ms MessageStorage, req *ProxyRequest) + // Callback for when a request is updated + RequestUpdated(storageId int, ms MessageStorage, req *ProxyRequest) + // Callback for when a request is deleted + RequestDeleted(storageId int, ms MessageStorage, DbId string) + + // Callback for when a new response is saved + NewResponseSaved(storageId int, ms MessageStorage, rsp *ProxyResponse) + // Callback for when a response is updated + ResponseUpdated(storageId int, ms MessageStorage, rsp *ProxyResponse) + // Callback for when a response is deleted + ResponseDeleted(storageId int, ms MessageStorage, DbId string) + + // Callback for when a new wsmessage is saved + NewWSMessageSaved(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) + // Callback for when a wsmessage is updated + WSMessageUpdated(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) + // Callback for when a wsmessage is deleted + WSMessageDeleted(storageId int, ms MessageStorage, DbId string) +} + +type globalWatcher struct { + watchers []GlobalStorageWatcher +} + +type globalWatcherShim struct { + storageId int + globWatcher *globalWatcher + logger *log.Logger +} + // InterceptingProxy is a struct which represents a proxy which can intercept and modify HTTP and websocket messages type InterceptingProxy struct { slistener *ProxyListener @@ -54,6 +87,7 @@ type InterceptingProxy struct { httpHandlers map[string]ProxyWebUIHandler messageStorage map[int]*savedStorage + globWatcher *globalWatcher } // ProxyCredentials are a username/password combination used to represent an HTTP BasicAuth session @@ -111,6 +145,9 @@ func NewInterceptingProxy(logger *log.Logger) *InterceptingProxy { iproxy.server = newProxyServer(useLogger, &iproxy) iproxy.logger = useLogger iproxy.httpHandlers = make(map[string]ProxyWebUIHandler) + iproxy.globWatcher = &globalWatcher{ + watchers: make([]GlobalStorageWatcher, 0), + } go func() { iproxy.server.Serve(iproxy.slistener) @@ -189,6 +226,13 @@ func (iproxy *InterceptingProxy) AddMessageStorage(storage MessageStorage, descr defer iproxy.mtx.Unlock() id := getNextStorageId() iproxy.messageStorage[id] = &savedStorage{storage, description} + + shim := &globalWatcherShim{ + storageId: id, + globWatcher: iproxy.globWatcher, + logger: iproxy.logger, + } + storage.Watch(shim) return id } @@ -492,6 +536,28 @@ func (iproxy *InterceptingProxy) RemoveWSInterceptor(sub *WSIntSub) { } } +// Add a global storage watcher +func (iproxy *InterceptingProxy) GlobalStorageWatch(watcher GlobalStorageWatcher) error { + iproxy.mtx.Lock() + defer iproxy.mtx.Unlock() + iproxy.globWatcher.watchers = append(iproxy.globWatcher.watchers, watcher) + return nil +} + +// Remove a global storage watcher +func (iproxy *InterceptingProxy) GlobalStorageEndWatch(watcher GlobalStorageWatcher) error { + iproxy.mtx.Lock() + defer iproxy.mtx.Unlock() + var newWatched = make([]GlobalStorageWatcher, 0) + for _, testWatcher := range iproxy.globWatcher.watchers { + if (testWatcher != watcher) { + newWatched = append(newWatched, testWatcher) + } + } + iproxy.globWatcher.watchers = newWatched + return nil +} + // SetProxyStorage sets which storage should be used to store messages as they pass through the proxy func (iproxy *InterceptingProxy) SetProxyStorage(storageId int) error { iproxy.mtx.Lock() @@ -916,3 +982,59 @@ func newProxyServer(logger *log.Logger, iproxy *InterceptingProxy) *http.Server } return server } + +// StorageWatcher implementation +func (watcher *globalWatcherShim) NewRequestSaved(ms MessageStorage, req *ProxyRequest) { + for _, w := range watcher.globWatcher.watchers { + w.NewRequestSaved(watcher.storageId, ms, req) + } +} + +func (watcher *globalWatcherShim) RequestUpdated(ms MessageStorage, req *ProxyRequest) { + for _, w := range watcher.globWatcher.watchers { + w.RequestUpdated(watcher.storageId, ms, req) + } +} + +func (watcher *globalWatcherShim) RequestDeleted(ms MessageStorage, DbId string) { + for _, w := range watcher.globWatcher.watchers { + w.RequestDeleted(watcher.storageId, ms, DbId) + } +} + +func (watcher *globalWatcherShim) NewResponseSaved(ms MessageStorage, rsp *ProxyResponse) { + for _, w := range watcher.globWatcher.watchers { + w.NewResponseSaved(watcher.storageId, ms, rsp) + } +} + +func (watcher *globalWatcherShim) ResponseUpdated(ms MessageStorage, rsp *ProxyResponse) { + for _, w := range watcher.globWatcher.watchers { + w.ResponseUpdated(watcher.storageId, ms, rsp) + } +} + +func (watcher *globalWatcherShim) ResponseDeleted(ms MessageStorage, DbId string) { + for _, w := range watcher.globWatcher.watchers { + w.ResponseDeleted(watcher.storageId, ms, DbId) + } +} + +func (watcher *globalWatcherShim) NewWSMessageSaved(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) { + for _, w := range watcher.globWatcher.watchers { + w.NewWSMessageSaved(watcher.storageId, ms, req, wsm) + } +} + +func (watcher *globalWatcherShim) WSMessageUpdated(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) { + for _, w := range watcher.globWatcher.watchers { + w.WSMessageUpdated(watcher.storageId, ms, req, wsm) + } +} + +func (watcher *globalWatcherShim) WSMessageDeleted(ms MessageStorage, DbId string) { + for _, w := range watcher.globWatcher.watchers { + w.WSMessageDeleted(watcher.storageId, ms, DbId) + } +} + diff --git a/proxymessages.go b/proxymessages.go index 0d7bf60..2cf796e 100644 --- a/proxymessages.go +++ b/proxymessages.go @@ -55,6 +55,8 @@ func NewProxyMessageListener(logger *log.Logger, iproxy *InterceptingProxy) *Mes l.AddHandler("liststorage", listProxyStorageHandler) l.AddHandler("setproxy", setProxyHandler) l.AddHandler("watchstorage", watchStorageHandler) + l.AddHandler("setpluginvalue", setPluginValueHandler) + l.AddHandler("getpluginvalue", getPluginValueHandler) return l } @@ -1949,13 +1951,11 @@ func setProxyHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Intercept WatchStorage */ type watchStorageMessage struct { - StorageId int HeadersOnly bool } type proxyMsgStorageWatcher struct { connMtx sync.Mutex - storageId int headersOnly bool conn net.Conn } @@ -1971,77 +1971,95 @@ type storageUpdateResponse struct { // Implement watcher -func (sw *proxyMsgStorageWatcher) NewRequestSaved(ms MessageStorage, req *ProxyRequest) { - var msgRsp storageUpdateResponse +func (sw *proxyMsgStorageWatcher) NewRequestSaved(storageId int, ms MessageStorage, req *ProxyRequest) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() + msgRsp := &storageUpdateResponse{} msgRsp.Request = NewRequestJSON(req, sw.headersOnly) msgRsp.Action = "NewRequest" - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } -func (sw *proxyMsgStorageWatcher) RequestUpdated(ms MessageStorage, req *ProxyRequest) { - var msgRsp storageUpdateResponse +func (sw *proxyMsgStorageWatcher) RequestUpdated(storageId int, ms MessageStorage, req *ProxyRequest) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() + msgRsp := &storageUpdateResponse{} msgRsp.Request = NewRequestJSON(req, sw.headersOnly) msgRsp.Action = "RequestUpdated" - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } -func (sw *proxyMsgStorageWatcher) RequestDeleted(ms MessageStorage, DbId string) { - var msgRsp storageUpdateResponse +func (sw *proxyMsgStorageWatcher) RequestDeleted(storageId int, ms MessageStorage, DbId string) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() + msgRsp := &storageUpdateResponse{} msgRsp.Action = "RequestDeleted" msgRsp.MessageId = DbId - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } -func (sw *proxyMsgStorageWatcher) NewResponseSaved(ms MessageStorage, rsp *ProxyResponse) { - var msgRsp storageUpdateResponse +func (sw *proxyMsgStorageWatcher) NewResponseSaved(storageId int, ms MessageStorage, rsp *ProxyResponse) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() + msgRsp := &storageUpdateResponse{} msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly) msgRsp.Action = "NewResponse" - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } -func (sw *proxyMsgStorageWatcher) ResponseUpdated(ms MessageStorage, rsp *ProxyResponse) { - var msgRsp storageUpdateResponse +func (sw *proxyMsgStorageWatcher) ResponseUpdated(storageId int, ms MessageStorage, rsp *ProxyResponse) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() + msgRsp := &storageUpdateResponse{} msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly) msgRsp.Action = "ResponseUpdated" - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } -func (sw *proxyMsgStorageWatcher) ResponseDeleted(ms MessageStorage, DbId string) { - var msgRsp storageUpdateResponse +func (sw *proxyMsgStorageWatcher) ResponseDeleted(storageId int, ms MessageStorage, DbId string) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() + msgRsp := &storageUpdateResponse{} msgRsp.Action = "ResponseDeleted" msgRsp.MessageId = DbId - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } -func (sw *proxyMsgStorageWatcher) NewWSMessageSaved(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) { - var msgRsp storageUpdateResponse +func (sw *proxyMsgStorageWatcher) NewWSMessageSaved(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() + msgRsp := &storageUpdateResponse{} msgRsp.Request = NewRequestJSON(req, sw.headersOnly) msgRsp.WSMessage = NewWSMessageJSON(wsm) msgRsp.Action = "NewWSMessage" - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } -func (sw *proxyMsgStorageWatcher) WSMessageUpdated(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) { - var msgRsp storageUpdateResponse +func (sw *proxyMsgStorageWatcher) WSMessageUpdated(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() + msgRsp := &storageUpdateResponse{} msgRsp.Request = NewRequestJSON(req, sw.headersOnly) msgRsp.WSMessage = NewWSMessageJSON(wsm) msgRsp.Action = "WSMessageUpdated" - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } -func (sw *proxyMsgStorageWatcher) WSMessageDeleted(ms MessageStorage, DbId string) { +func (sw *proxyMsgStorageWatcher) WSMessageDeleted(storageId int, ms MessageStorage, DbId string) { + sw.connMtx.Lock() + defer sw.connMtx.Unlock() var msgRsp storageUpdateResponse msgRsp.Action = "WSMessageDeleted" msgRsp.MessageId = DbId - msgRsp.StorageId = sw.storageId + msgRsp.StorageId = storageId MessageResponse(sw.conn, msgRsp) } @@ -2054,36 +2072,13 @@ func watchStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Inter return } - // Parse storageId - storages := make([]*SavedStorage, 0) - if mreq.StorageId == -1 { - storages = iproxy.ListMessageStorage() - } else { - ms, desc := iproxy.GetMessageStorage(mreq.StorageId) - if ms == nil { - ErrorResponse(c, "invalid storage id") - return - } - - storage := &SavedStorage{ - Id: mreq.StorageId, - Storage: ms, - Description: desc, - } - storages = append(storages, storage) - } - - // Create the watchers - for _, storage := range storages { - watcher := &proxyMsgStorageWatcher{ - storageId: storage.Id, - headersOnly: mreq.HeadersOnly, - conn: c, - } - // Apply the watcher, kill them at end of connection - storage.Storage.Watch(watcher) - defer storage.Storage.EndWatch(watcher) + // add global watcher + watcher := &proxyMsgStorageWatcher{ + headersOnly: mreq.HeadersOnly, + conn: c, } + iproxy.GlobalStorageWatch(watcher) + defer iproxy.GlobalStorageEndWatch(watcher) // Keep the connection open MessageResponse(c, &successResult{Success: true}) @@ -2094,3 +2089,88 @@ func watchStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Inter } } +/* +SetPluginValue and GetPluginValue +*/ +type setPluginValueMessage struct { + Key string + Value string + Storage int +} + +func setPluginValueHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) { + mreq := setPluginValueMessage{} + + if err := json.Unmarshal(b, &mreq); err != nil { + ErrorResponse(c, "error parsing message") + return + } + + if mreq.Storage == 0 { + ErrorResponse(c, "storage is required") + return + } + + storage, _ := iproxy.GetMessageStorage(mreq.Storage) + if storage == nil { + ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage)) + return + } + + if mreq.Key == "" { + ErrorResponse(c, "key value is required") + return + } + + err := storage.SetPluginValue(mreq.Key, mreq.Value) + if err != nil { + ErrorResponse(c, fmt.Sprintf("error saving value: %s", err.Error())) + return + } + + MessageResponse(c, &successResult{Success: true}) +} + +type getPluginValueMessage struct { + Key string + Storage int +} + +type getPluginValueResponse struct { + Value string + Success bool +} + +func getPluginValueHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) { + mreq := getPluginValueMessage{} + + if err := json.Unmarshal(b, &mreq); err != nil { + ErrorResponse(c, "error parsing message") + return + } + + if mreq.Storage == 0 { + ErrorResponse(c, "storage is required") + return + } + + storage, _ := iproxy.GetMessageStorage(mreq.Storage) + if storage == nil { + ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage)) + return + } + + if mreq.Key == "" { + ErrorResponse(c, "key value is required") + return + } + + value, err := storage.GetPluginValue(mreq.Key) + if err != nil { + ErrorResponse(c, fmt.Sprintf("error getting value: %s", err.Error())) + return + } + + MessageResponse(c, &getPluginValueResponse{Value: value, Success: true}) +} + diff --git a/schema.go b/schema.go index 4b12965..2ad5271 100644 --- a/schema.go +++ b/schema.go @@ -19,6 +19,7 @@ type tableNameRow struct { var schemaUpdaters = []schemaUpdater{ schema8, schema9, + schema10, } func UpdateSchema(db *sql.DB, logger *log.Logger) error { @@ -543,3 +544,27 @@ func schema9(tx *sql.Tx) error { } return nil } + +func schema10(tx *sql.Tx) error { + /* + Create a "plugin data" table to let applications store app-specific data in the datafile + */ + cmds := []string{` + CREATE TABLE plugin_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + key TEXT UNIQUE, + value STRING + ); + CREATE INDEX plugin_key_ind ON plugin_data(key); + `, + + `UPDATE schema_meta SET version=10`, + } + + err := executeMultiple(tx, cmds) + if err != nil { + return err + } + + return nil +} diff --git a/sqlitestorage.go b/sqlitestorage.go index 2f875cb..b293c44 100644 --- a/sqlitestorage.go +++ b/sqlitestorage.go @@ -1651,3 +1651,68 @@ func (ms *SQLiteStorage) EndWatch(watcher StorageWatcher) error { ms.mtx.Unlock() return nil } + +func (ms *SQLiteStorage) SetPluginValue(key string, value string) error { + ms.mtx.Lock() + defer ms.mtx.Unlock() + tx, err := ms.dbConn.Begin() + if err != nil { + return err + } + err = ms.setPluginValue(tx, key, value) + if err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil +} + +func (ms *SQLiteStorage) setPluginValue(tx *sql.Tx, key string, value string) error { + stmt, err := tx.Prepare(` + INSERT OR REPLACE INTO plugin_data ( + key, + value + ) VALUES (?, ?); + `) + if err != nil { + return fmt.Errorf("error preparing statement to insert request into database: %s", err.Error()) + } + defer stmt.Close() + + _, err = stmt.Exec(key, value) + if err != nil { + return fmt.Errorf("error inserting plugin data into database: %s", err.Error()) + } + + return nil +} + +func (ms *SQLiteStorage) GetPluginValue(key string) (string, error) { + ms.mtx.Lock() + defer ms.mtx.Unlock() + tx, err := ms.dbConn.Begin() + if err != nil { + return "", err + } + value, err := ms.getPluginValue(tx, key) + if err != nil { + tx.Rollback() + return "", err + } + tx.Commit() + return value, nil +} + +func (ms *SQLiteStorage) getPluginValue(tx *sql.Tx, key string) (string, error) { + var value sql.NullString + err := tx.QueryRow(`SELECT value FROM plugin_data WHERE key=?`, key).Scan( + &value, + ) + if err == sql.ErrNoRows { + return "", fmt.Errorf("plugin data with key %s does not exist", key) + } else if err != nil { + return "", fmt.Errorf("error loading data from datafile: %s", err.Error()) + } + return value.String, nil +} diff --git a/storage.go b/storage.go index c331700..276ba58 100644 --- a/storage.go +++ b/storage.go @@ -66,6 +66,10 @@ type MessageStorage interface { Watch(watcher StorageWatcher) error // Remove a storage watcher from the storage EndWatch(watcher StorageWatcher) error + + // Set/get plugin values + SetPluginValue(key string, value string) error + GetPluginValue(key string) (string, error) } type StorageWatcher interface {