diff --git a/proxymessages.go b/proxymessages.go index 7ea316c..3f7d1f7 100644 --- a/proxymessages.go +++ b/proxymessages.go @@ -54,6 +54,7 @@ func NewProxyMessageListener(logger *log.Logger, iproxy *InterceptingProxy) *Mes l.AddHandler("setproxystorage", setProxyStorageHandler) l.AddHandler("liststorage", listProxyStorageHandler) l.AddHandler("setproxy", setProxyHandler) + l.AddHandler("watchstorage", watchStorageHandler) return l } @@ -678,8 +679,10 @@ CheckRequest */ type checkRequestMessage struct { - Query StrMessageQuery - Request *RequestJSON + Query StrMessageQuery + Request *RequestJSON + DbId string + StorageId int } type checkRequestResponse struct { @@ -694,10 +697,31 @@ func checkRequestHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Inter return } - req, err := mreq.Request.Parse() - if err != nil { - ErrorResponse(c, fmt.Sprintf("error parsing http request: %s", err.Error())) - return + var req *ProxyRequest + var err error + + if mreq.DbId != "" { + storage, _ := iproxy.GetMessageStorage(mreq.StorageId) + if storage == nil { + ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.StorageId)) + return + } + searchResults, err := storage.Search(1, FieldId, StrIs, mreq.DbId) + if err != nil { + ErrorResponse(c, err.Error()) + return + } + if len(searchResults) == 0 { + ErrorResponse(c, fmt.Sprintf("message with id=%s does not exist", mreq.DbId)) + return + } + req = searchResults[0] + } else { + req, err = mreq.Request.Parse() + if err != nil { + ErrorResponse(c, fmt.Sprintf("error parsing http request: %s", err.Error())) + return + } } goQuery, err := StrQueryToMsgQuery(mreq.Query) @@ -1920,3 +1944,153 @@ func setProxyHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Intercept MessageResponse(c, &successResult{Success: true}) } + +/* +WatchStorage +*/ +type watchStorageMessage struct { + StorageId int + HeadersOnly bool +} + +type proxyMsgStorageWatcher struct { + connMtx sync.Mutex + storageId int + headersOnly bool + conn net.Conn +} + +type storageUpdateResponse struct { + StorageId int + Action string + MessageId string + Request *RequestJSON + Response *ResponseJSON + WSMessage *WSMessageJSON +} + +// Implement watcher + +func (sw *proxyMsgStorageWatcher) NewRequestSaved(ms MessageStorage, req *ProxyRequest) { + var msgRsp storageUpdateResponse + msgRsp.Request = NewRequestJSON(req, sw.headersOnly) + msgRsp.Action = "NewRequest" + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +func (sw *proxyMsgStorageWatcher) RequestUpdated(ms MessageStorage, req *ProxyRequest) { + var msgRsp storageUpdateResponse + msgRsp.Request = NewRequestJSON(req, sw.headersOnly) + msgRsp.Action = "RequestUpdated" + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +func (sw *proxyMsgStorageWatcher) RequestDeleted(ms MessageStorage, DbId string) { + var msgRsp storageUpdateResponse + msgRsp.Action = "RequestDeleted" + msgRsp.MessageId = DbId + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +func (sw *proxyMsgStorageWatcher) NewResponseSaved(ms MessageStorage, rsp *ProxyResponse) { + var msgRsp storageUpdateResponse + msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly) + msgRsp.Action = "NewResponse" + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +func (sw *proxyMsgStorageWatcher) ResponseUpdated(ms MessageStorage, rsp *ProxyResponse) { + var msgRsp storageUpdateResponse + msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly) + msgRsp.Action = "ResponseUpdated" + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +func (sw *proxyMsgStorageWatcher) ResponseDeleted(ms MessageStorage, DbId string) { + var msgRsp storageUpdateResponse + msgRsp.Action = "ResponseDeleted" + msgRsp.MessageId = DbId + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +func (sw *proxyMsgStorageWatcher) NewWSMessageSaved(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) { + var msgRsp storageUpdateResponse + msgRsp.Request = NewRequestJSON(req, sw.headersOnly) + msgRsp.WSMessage = NewWSMessageJSON(wsm) + msgRsp.Action = "NewWSMessage" + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +func (sw *proxyMsgStorageWatcher) WSMessageUpdated(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) { + var msgRsp storageUpdateResponse + msgRsp.Request = NewRequestJSON(req, sw.headersOnly) + msgRsp.WSMessage = NewWSMessageJSON(wsm) + msgRsp.Action = "WSMessageUpdated" + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +func (sw *proxyMsgStorageWatcher) WSMessageDeleted(ms MessageStorage, DbId string) { + var msgRsp storageUpdateResponse + msgRsp.Action = "WSMessageDeleted" + msgRsp.MessageId = DbId + msgRsp.StorageId = sw.storageId + MessageResponse(sw.conn, msgRsp) +} + +// Actual handler + +func watchStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) { + mreq := watchStorageMessage{} + if err := json.Unmarshal(b, &mreq); err != nil { + ErrorResponse(c, "error parsing message") + return + } + + // Parse storageId + storages := make([]*SavedStorage, 0) + if mreq.StorageId == -1 { + storages = iproxy.ListMessageStorage() + } else { + ms, desc := iproxy.GetMessageStorage(mreq.StorageId) + if ms == nil { + ErrorResponse(c, "invalid storage id") + return + } + + storage := &SavedStorage{ + Id: mreq.StorageId, + Storage: ms, + Description: desc, + } + storages = append(storages, storage) + } + + // Create the watchers + for _, storage := range storages { + watcher := &proxyMsgStorageWatcher{ + storageId: storage.Id, + headersOnly: mreq.HeadersOnly, + conn: c, + } + // Apply the watcher, kill them at end of connection + storage.Storage.Watch(watcher) + defer storage.Storage.EndWatch(watcher) + } + + // Keep the connection open + MessageResponse(c, &successResult{Success: true}) + tmpbuf := make([]byte, 0) + var err error = nil + for err == nil { + _, err = c.Read(tmpbuf) + } +} + diff --git a/sqlitestorage.go b/sqlitestorage.go index 6b3af2e..2f875cb 100644 --- a/sqlitestorage.go +++ b/sqlitestorage.go @@ -25,6 +25,7 @@ type SQLiteStorage struct { dbConn *sql.DB mtx sync.Mutex logger *log.Logger + storageWatchers []StorageWatcher } /* @@ -47,6 +48,7 @@ func OpenSQLiteStorage(fname string, logger *log.Logger) (*SQLiteStorage, error) } rs.logger = logger + rs.storageWatchers = make([]StorageWatcher, 0) return rs, nil } @@ -373,6 +375,9 @@ func (ms *SQLiteStorage) SaveNewRequest(req *ProxyRequest) error { return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.NewRequestSaved(ms, req) + } return nil } @@ -449,6 +454,9 @@ func (ms *SQLiteStorage) UpdateRequest(req *ProxyRequest) error { return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.RequestUpdated(ms, req) + } return nil } @@ -638,6 +646,9 @@ func (ms *SQLiteStorage) DeleteRequest(reqid string) error { return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.RequestDeleted(ms, reqid) + } return nil } @@ -723,6 +734,9 @@ func (ms *SQLiteStorage) SaveNewResponse(rsp *ProxyResponse) error { return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.NewResponseSaved(ms, rsp) + } return nil } @@ -776,6 +790,9 @@ func (ms *SQLiteStorage) UpdateResponse(rsp *ProxyResponse) error { return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.ResponseUpdated(ms, rsp) + } return nil } @@ -912,6 +929,9 @@ func (ms *SQLiteStorage) DeleteResponse(rspid string) error { return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.ResponseDeleted(ms, rspid) + } return nil } @@ -970,6 +990,9 @@ func (ms *SQLiteStorage) SaveNewWSMessage(req *ProxyRequest, wsm *ProxyWSMessage return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.NewWSMessageSaved(ms, req, wsm) + } return nil } @@ -1047,6 +1070,9 @@ func (ms *SQLiteStorage) UpdateWSMessage(req *ProxyRequest, wsm *ProxyWSMessage) return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.WSMessageUpdated(ms, req, wsm) + } return nil } @@ -1225,6 +1251,9 @@ func (ms *SQLiteStorage) DeleteWSMessage(wsmid string) error { return err } tx.Commit() + for _, watcher := range ms.storageWatchers { + watcher.WSMessageDeleted(ms, wsmid) + } return nil } @@ -1602,3 +1631,23 @@ func (ms *SQLiteStorage) allSavedQueries(tx *sql.Tx) ([]*SavedQuery, error) { } return savedQueries, nil } + +func (ms *SQLiteStorage) Watch(watcher StorageWatcher) error { + ms.mtx.Lock() + defer ms.mtx.Unlock() + ms.storageWatchers = append(ms.storageWatchers, watcher) + return nil +} + +func (ms *SQLiteStorage) EndWatch(watcher StorageWatcher) error { + ms.mtx.Lock() + var newWatched = make([]StorageWatcher, 0) + for _, testWatcher := range ms.storageWatchers { + if (testWatcher != watcher) { + newWatched = append(newWatched, testWatcher) + } + } + ms.storageWatchers = newWatched + ms.mtx.Unlock() + return nil +} diff --git a/storage.go b/storage.go index d2048a6..c331700 100644 --- a/storage.go +++ b/storage.go @@ -61,6 +61,34 @@ type MessageStorage interface { LoadQuery(name string) (MessageQuery, error) // Delete a query by name from the storage DeleteQuery(name string) error + + // Add a storage watcher to make callbacks to on message saves + Watch(watcher StorageWatcher) error + // Remove a storage watcher from the storage + EndWatch(watcher StorageWatcher) error +} + +type StorageWatcher interface { + // Callback for when a new request is saved + NewRequestSaved(ms MessageStorage, req *ProxyRequest) + // Callback for when a request is updated + RequestUpdated(ms MessageStorage, req *ProxyRequest) + // Callback for when a request is deleted + RequestDeleted(ms MessageStorage, DbId string) + + // Callback for when a new response is saved + NewResponseSaved(ms MessageStorage, rsp *ProxyResponse) + // Callback for when a response is updated + ResponseUpdated(ms MessageStorage, rsp *ProxyResponse) + // Callback for when a response is deleted + ResponseDeleted(ms MessageStorage, DbId string) + + // Callback for when a new wsmessage is saved + NewWSMessageSaved(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) + // Callback for when a wsmessage is updated + WSMessageUpdated(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) + // Callback for when a wsmessage is deleted + WSMessageDeleted(ms MessageStorage, DbId string) } // An error to be returned if a query is not supported