Version 0.2.4

master
Rob Glew 7 years ago
parent 267d32bdbc
commit 26c1ad2ee6
  1. 176
      proxymessages.go
  2. 49
      sqlitestorage.go
  3. 28
      storage.go

@ -54,6 +54,7 @@ func NewProxyMessageListener(logger *log.Logger, iproxy *InterceptingProxy) *Mes
l.AddHandler("setproxystorage", setProxyStorageHandler) l.AddHandler("setproxystorage", setProxyStorageHandler)
l.AddHandler("liststorage", listProxyStorageHandler) l.AddHandler("liststorage", listProxyStorageHandler)
l.AddHandler("setproxy", setProxyHandler) l.AddHandler("setproxy", setProxyHandler)
l.AddHandler("watchstorage", watchStorageHandler)
return l return l
} }
@ -680,6 +681,8 @@ CheckRequest
type checkRequestMessage struct { type checkRequestMessage struct {
Query StrMessageQuery Query StrMessageQuery
Request *RequestJSON Request *RequestJSON
DbId string
StorageId int
} }
type checkRequestResponse struct { type checkRequestResponse struct {
@ -694,11 +697,32 @@ func checkRequestHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Inter
return return
} }
req, err := mreq.Request.Parse() var req *ProxyRequest
var err error
if mreq.DbId != "" {
storage, _ := iproxy.GetMessageStorage(mreq.StorageId)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.StorageId))
return
}
searchResults, err := storage.Search(1, FieldId, StrIs, mreq.DbId)
if err != nil {
ErrorResponse(c, err.Error())
return
}
if len(searchResults) == 0 {
ErrorResponse(c, fmt.Sprintf("message with id=%s does not exist", mreq.DbId))
return
}
req = searchResults[0]
} else {
req, err = mreq.Request.Parse()
if err != nil { if err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing http request: %s", err.Error())) ErrorResponse(c, fmt.Sprintf("error parsing http request: %s", err.Error()))
return return
} }
}
goQuery, err := StrQueryToMsgQuery(mreq.Query) goQuery, err := StrQueryToMsgQuery(mreq.Query)
if err != nil { if err != nil {
@ -1920,3 +1944,153 @@ func setProxyHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Intercept
MessageResponse(c, &successResult{Success: true}) MessageResponse(c, &successResult{Success: true})
} }
/*
WatchStorage
*/
type watchStorageMessage struct {
StorageId int
HeadersOnly bool
}
type proxyMsgStorageWatcher struct {
connMtx sync.Mutex
storageId int
headersOnly bool
conn net.Conn
}
type storageUpdateResponse struct {
StorageId int
Action string
MessageId string
Request *RequestJSON
Response *ResponseJSON
WSMessage *WSMessageJSON
}
// Implement watcher
func (sw *proxyMsgStorageWatcher) NewRequestSaved(ms MessageStorage, req *ProxyRequest) {
var msgRsp storageUpdateResponse
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.Action = "NewRequest"
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) RequestUpdated(ms MessageStorage, req *ProxyRequest) {
var msgRsp storageUpdateResponse
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.Action = "RequestUpdated"
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) RequestDeleted(ms MessageStorage, DbId string) {
var msgRsp storageUpdateResponse
msgRsp.Action = "RequestDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) NewResponseSaved(ms MessageStorage, rsp *ProxyResponse) {
var msgRsp storageUpdateResponse
msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly)
msgRsp.Action = "NewResponse"
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) ResponseUpdated(ms MessageStorage, rsp *ProxyResponse) {
var msgRsp storageUpdateResponse
msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly)
msgRsp.Action = "ResponseUpdated"
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) ResponseDeleted(ms MessageStorage, DbId string) {
var msgRsp storageUpdateResponse
msgRsp.Action = "ResponseDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) NewWSMessageSaved(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
var msgRsp storageUpdateResponse
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.WSMessage = NewWSMessageJSON(wsm)
msgRsp.Action = "NewWSMessage"
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) WSMessageUpdated(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
var msgRsp storageUpdateResponse
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.WSMessage = NewWSMessageJSON(wsm)
msgRsp.Action = "WSMessageUpdated"
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) WSMessageDeleted(ms MessageStorage, DbId string) {
var msgRsp storageUpdateResponse
msgRsp.Action = "WSMessageDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = sw.storageId
MessageResponse(sw.conn, msgRsp)
}
// Actual handler
func watchStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := watchStorageMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
// Parse storageId
storages := make([]*SavedStorage, 0)
if mreq.StorageId == -1 {
storages = iproxy.ListMessageStorage()
} else {
ms, desc := iproxy.GetMessageStorage(mreq.StorageId)
if ms == nil {
ErrorResponse(c, "invalid storage id")
return
}
storage := &SavedStorage{
Id: mreq.StorageId,
Storage: ms,
Description: desc,
}
storages = append(storages, storage)
}
// Create the watchers
for _, storage := range storages {
watcher := &proxyMsgStorageWatcher{
storageId: storage.Id,
headersOnly: mreq.HeadersOnly,
conn: c,
}
// Apply the watcher, kill them at end of connection
storage.Storage.Watch(watcher)
defer storage.Storage.EndWatch(watcher)
}
// Keep the connection open
MessageResponse(c, &successResult{Success: true})
tmpbuf := make([]byte, 0)
var err error = nil
for err == nil {
_, err = c.Read(tmpbuf)
}
}

@ -25,6 +25,7 @@ type SQLiteStorage struct {
dbConn *sql.DB dbConn *sql.DB
mtx sync.Mutex mtx sync.Mutex
logger *log.Logger logger *log.Logger
storageWatchers []StorageWatcher
} }
/* /*
@ -47,6 +48,7 @@ func OpenSQLiteStorage(fname string, logger *log.Logger) (*SQLiteStorage, error)
} }
rs.logger = logger rs.logger = logger
rs.storageWatchers = make([]StorageWatcher, 0)
return rs, nil return rs, nil
} }
@ -373,6 +375,9 @@ func (ms *SQLiteStorage) SaveNewRequest(req *ProxyRequest) error {
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.NewRequestSaved(ms, req)
}
return nil return nil
} }
@ -449,6 +454,9 @@ func (ms *SQLiteStorage) UpdateRequest(req *ProxyRequest) error {
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.RequestUpdated(ms, req)
}
return nil return nil
} }
@ -638,6 +646,9 @@ func (ms *SQLiteStorage) DeleteRequest(reqid string) error {
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.RequestDeleted(ms, reqid)
}
return nil return nil
} }
@ -723,6 +734,9 @@ func (ms *SQLiteStorage) SaveNewResponse(rsp *ProxyResponse) error {
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.NewResponseSaved(ms, rsp)
}
return nil return nil
} }
@ -776,6 +790,9 @@ func (ms *SQLiteStorage) UpdateResponse(rsp *ProxyResponse) error {
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.ResponseUpdated(ms, rsp)
}
return nil return nil
} }
@ -912,6 +929,9 @@ func (ms *SQLiteStorage) DeleteResponse(rspid string) error {
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.ResponseDeleted(ms, rspid)
}
return nil return nil
} }
@ -970,6 +990,9 @@ func (ms *SQLiteStorage) SaveNewWSMessage(req *ProxyRequest, wsm *ProxyWSMessage
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.NewWSMessageSaved(ms, req, wsm)
}
return nil return nil
} }
@ -1047,6 +1070,9 @@ func (ms *SQLiteStorage) UpdateWSMessage(req *ProxyRequest, wsm *ProxyWSMessage)
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.WSMessageUpdated(ms, req, wsm)
}
return nil return nil
} }
@ -1225,6 +1251,9 @@ func (ms *SQLiteStorage) DeleteWSMessage(wsmid string) error {
return err return err
} }
tx.Commit() tx.Commit()
for _, watcher := range ms.storageWatchers {
watcher.WSMessageDeleted(ms, wsmid)
}
return nil return nil
} }
@ -1602,3 +1631,23 @@ func (ms *SQLiteStorage) allSavedQueries(tx *sql.Tx) ([]*SavedQuery, error) {
} }
return savedQueries, nil return savedQueries, nil
} }
func (ms *SQLiteStorage) Watch(watcher StorageWatcher) error {
ms.mtx.Lock()
defer ms.mtx.Unlock()
ms.storageWatchers = append(ms.storageWatchers, watcher)
return nil
}
func (ms *SQLiteStorage) EndWatch(watcher StorageWatcher) error {
ms.mtx.Lock()
var newWatched = make([]StorageWatcher, 0)
for _, testWatcher := range ms.storageWatchers {
if (testWatcher != watcher) {
newWatched = append(newWatched, testWatcher)
}
}
ms.storageWatchers = newWatched
ms.mtx.Unlock()
return nil
}

@ -61,6 +61,34 @@ type MessageStorage interface {
LoadQuery(name string) (MessageQuery, error) LoadQuery(name string) (MessageQuery, error)
// Delete a query by name from the storage // Delete a query by name from the storage
DeleteQuery(name string) error DeleteQuery(name string) error
// Add a storage watcher to make callbacks to on message saves
Watch(watcher StorageWatcher) error
// Remove a storage watcher from the storage
EndWatch(watcher StorageWatcher) error
}
type StorageWatcher interface {
// Callback for when a new request is saved
NewRequestSaved(ms MessageStorage, req *ProxyRequest)
// Callback for when a request is updated
RequestUpdated(ms MessageStorage, req *ProxyRequest)
// Callback for when a request is deleted
RequestDeleted(ms MessageStorage, DbId string)
// Callback for when a new response is saved
NewResponseSaved(ms MessageStorage, rsp *ProxyResponse)
// Callback for when a response is updated
ResponseUpdated(ms MessageStorage, rsp *ProxyResponse)
// Callback for when a response is deleted
ResponseDeleted(ms MessageStorage, DbId string)
// Callback for when a new wsmessage is saved
NewWSMessageSaved(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage)
// Callback for when a wsmessage is updated
WSMessageUpdated(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage)
// Callback for when a wsmessage is deleted
WSMessageDeleted(ms MessageStorage, DbId string)
} }
// An error to be returned if a query is not supported // An error to be returned if a query is not supported

Loading…
Cancel
Save