Version 0.2.6

master
Rob Glew 7 years ago
parent f93ebb818c
commit 9e6ac23f40
  1. 122
      proxy.go
  2. 194
      proxymessages.go
  3. 25
      schema.go
  4. 65
      sqlitestorage.go
  5. 4
      storage.go

@ -26,6 +26,39 @@ type savedStorage struct {
description string
}
type GlobalStorageWatcher interface {
// Callback for when a new request is saved
NewRequestSaved(storageId int, ms MessageStorage, req *ProxyRequest)
// Callback for when a request is updated
RequestUpdated(storageId int, ms MessageStorage, req *ProxyRequest)
// Callback for when a request is deleted
RequestDeleted(storageId int, ms MessageStorage, DbId string)
// Callback for when a new response is saved
NewResponseSaved(storageId int, ms MessageStorage, rsp *ProxyResponse)
// Callback for when a response is updated
ResponseUpdated(storageId int, ms MessageStorage, rsp *ProxyResponse)
// Callback for when a response is deleted
ResponseDeleted(storageId int, ms MessageStorage, DbId string)
// Callback for when a new wsmessage is saved
NewWSMessageSaved(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage)
// Callback for when a wsmessage is updated
WSMessageUpdated(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage)
// Callback for when a wsmessage is deleted
WSMessageDeleted(storageId int, ms MessageStorage, DbId string)
}
type globalWatcher struct {
watchers []GlobalStorageWatcher
}
type globalWatcherShim struct {
storageId int
globWatcher *globalWatcher
logger *log.Logger
}
// InterceptingProxy is a struct which represents a proxy which can intercept and modify HTTP and websocket messages
type InterceptingProxy struct {
slistener *ProxyListener
@ -54,6 +87,7 @@ type InterceptingProxy struct {
httpHandlers map[string]ProxyWebUIHandler
messageStorage map[int]*savedStorage
globWatcher *globalWatcher
}
// ProxyCredentials are a username/password combination used to represent an HTTP BasicAuth session
@ -111,6 +145,9 @@ func NewInterceptingProxy(logger *log.Logger) *InterceptingProxy {
iproxy.server = newProxyServer(useLogger, &iproxy)
iproxy.logger = useLogger
iproxy.httpHandlers = make(map[string]ProxyWebUIHandler)
iproxy.globWatcher = &globalWatcher{
watchers: make([]GlobalStorageWatcher, 0),
}
go func() {
iproxy.server.Serve(iproxy.slistener)
@ -189,6 +226,13 @@ func (iproxy *InterceptingProxy) AddMessageStorage(storage MessageStorage, descr
defer iproxy.mtx.Unlock()
id := getNextStorageId()
iproxy.messageStorage[id] = &savedStorage{storage, description}
shim := &globalWatcherShim{
storageId: id,
globWatcher: iproxy.globWatcher,
logger: iproxy.logger,
}
storage.Watch(shim)
return id
}
@ -492,6 +536,28 @@ func (iproxy *InterceptingProxy) RemoveWSInterceptor(sub *WSIntSub) {
}
}
// Add a global storage watcher
func (iproxy *InterceptingProxy) GlobalStorageWatch(watcher GlobalStorageWatcher) error {
iproxy.mtx.Lock()
defer iproxy.mtx.Unlock()
iproxy.globWatcher.watchers = append(iproxy.globWatcher.watchers, watcher)
return nil
}
// Remove a global storage watcher
func (iproxy *InterceptingProxy) GlobalStorageEndWatch(watcher GlobalStorageWatcher) error {
iproxy.mtx.Lock()
defer iproxy.mtx.Unlock()
var newWatched = make([]GlobalStorageWatcher, 0)
for _, testWatcher := range iproxy.globWatcher.watchers {
if (testWatcher != watcher) {
newWatched = append(newWatched, testWatcher)
}
}
iproxy.globWatcher.watchers = newWatched
return nil
}
// SetProxyStorage sets which storage should be used to store messages as they pass through the proxy
func (iproxy *InterceptingProxy) SetProxyStorage(storageId int) error {
iproxy.mtx.Lock()
@ -916,3 +982,59 @@ func newProxyServer(logger *log.Logger, iproxy *InterceptingProxy) *http.Server
}
return server
}
// StorageWatcher implementation
func (watcher *globalWatcherShim) NewRequestSaved(ms MessageStorage, req *ProxyRequest) {
for _, w := range watcher.globWatcher.watchers {
w.NewRequestSaved(watcher.storageId, ms, req)
}
}
func (watcher *globalWatcherShim) RequestUpdated(ms MessageStorage, req *ProxyRequest) {
for _, w := range watcher.globWatcher.watchers {
w.RequestUpdated(watcher.storageId, ms, req)
}
}
func (watcher *globalWatcherShim) RequestDeleted(ms MessageStorage, DbId string) {
for _, w := range watcher.globWatcher.watchers {
w.RequestDeleted(watcher.storageId, ms, DbId)
}
}
func (watcher *globalWatcherShim) NewResponseSaved(ms MessageStorage, rsp *ProxyResponse) {
for _, w := range watcher.globWatcher.watchers {
w.NewResponseSaved(watcher.storageId, ms, rsp)
}
}
func (watcher *globalWatcherShim) ResponseUpdated(ms MessageStorage, rsp *ProxyResponse) {
for _, w := range watcher.globWatcher.watchers {
w.ResponseUpdated(watcher.storageId, ms, rsp)
}
}
func (watcher *globalWatcherShim) ResponseDeleted(ms MessageStorage, DbId string) {
for _, w := range watcher.globWatcher.watchers {
w.ResponseDeleted(watcher.storageId, ms, DbId)
}
}
func (watcher *globalWatcherShim) NewWSMessageSaved(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
for _, w := range watcher.globWatcher.watchers {
w.NewWSMessageSaved(watcher.storageId, ms, req, wsm)
}
}
func (watcher *globalWatcherShim) WSMessageUpdated(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
for _, w := range watcher.globWatcher.watchers {
w.WSMessageUpdated(watcher.storageId, ms, req, wsm)
}
}
func (watcher *globalWatcherShim) WSMessageDeleted(ms MessageStorage, DbId string) {
for _, w := range watcher.globWatcher.watchers {
w.WSMessageDeleted(watcher.storageId, ms, DbId)
}
}

@ -55,6 +55,8 @@ func NewProxyMessageListener(logger *log.Logger, iproxy *InterceptingProxy) *Mes
l.AddHandler("liststorage", listProxyStorageHandler)
l.AddHandler("setproxy", setProxyHandler)
l.AddHandler("watchstorage", watchStorageHandler)
l.AddHandler("setpluginvalue", setPluginValueHandler)
l.AddHandler("getpluginvalue", getPluginValueHandler)
return l
}
@ -1949,13 +1951,11 @@ func setProxyHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Intercept
WatchStorage
*/
type watchStorageMessage struct {
StorageId int
HeadersOnly bool
}
type proxyMsgStorageWatcher struct {
connMtx sync.Mutex
storageId int
headersOnly bool
conn net.Conn
}
@ -1971,77 +1971,95 @@ type storageUpdateResponse struct {
// Implement watcher
func (sw *proxyMsgStorageWatcher) NewRequestSaved(ms MessageStorage, req *ProxyRequest) {
var msgRsp storageUpdateResponse
func (sw *proxyMsgStorageWatcher) NewRequestSaved(storageId int, ms MessageStorage, req *ProxyRequest) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.Action = "NewRequest"
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) RequestUpdated(ms MessageStorage, req *ProxyRequest) {
var msgRsp storageUpdateResponse
func (sw *proxyMsgStorageWatcher) RequestUpdated(storageId int, ms MessageStorage, req *ProxyRequest) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.Action = "RequestUpdated"
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) RequestDeleted(ms MessageStorage, DbId string) {
var msgRsp storageUpdateResponse
func (sw *proxyMsgStorageWatcher) RequestDeleted(storageId int, ms MessageStorage, DbId string) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Action = "RequestDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) NewResponseSaved(ms MessageStorage, rsp *ProxyResponse) {
var msgRsp storageUpdateResponse
func (sw *proxyMsgStorageWatcher) NewResponseSaved(storageId int, ms MessageStorage, rsp *ProxyResponse) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly)
msgRsp.Action = "NewResponse"
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) ResponseUpdated(ms MessageStorage, rsp *ProxyResponse) {
var msgRsp storageUpdateResponse
func (sw *proxyMsgStorageWatcher) ResponseUpdated(storageId int, ms MessageStorage, rsp *ProxyResponse) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly)
msgRsp.Action = "ResponseUpdated"
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) ResponseDeleted(ms MessageStorage, DbId string) {
var msgRsp storageUpdateResponse
func (sw *proxyMsgStorageWatcher) ResponseDeleted(storageId int, ms MessageStorage, DbId string) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Action = "ResponseDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) NewWSMessageSaved(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
var msgRsp storageUpdateResponse
func (sw *proxyMsgStorageWatcher) NewWSMessageSaved(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.WSMessage = NewWSMessageJSON(wsm)
msgRsp.Action = "NewWSMessage"
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) WSMessageUpdated(ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
var msgRsp storageUpdateResponse
func (sw *proxyMsgStorageWatcher) WSMessageUpdated(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.WSMessage = NewWSMessageJSON(wsm)
msgRsp.Action = "WSMessageUpdated"
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) WSMessageDeleted(ms MessageStorage, DbId string) {
func (sw *proxyMsgStorageWatcher) WSMessageDeleted(storageId int, ms MessageStorage, DbId string) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
var msgRsp storageUpdateResponse
msgRsp.Action = "WSMessageDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = sw.storageId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
@ -2054,36 +2072,13 @@ func watchStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Inter
return
}
// Parse storageId
storages := make([]*SavedStorage, 0)
if mreq.StorageId == -1 {
storages = iproxy.ListMessageStorage()
} else {
ms, desc := iproxy.GetMessageStorage(mreq.StorageId)
if ms == nil {
ErrorResponse(c, "invalid storage id")
return
}
storage := &SavedStorage{
Id: mreq.StorageId,
Storage: ms,
Description: desc,
}
storages = append(storages, storage)
}
// Create the watchers
for _, storage := range storages {
watcher := &proxyMsgStorageWatcher{
storageId: storage.Id,
headersOnly: mreq.HeadersOnly,
conn: c,
}
// Apply the watcher, kill them at end of connection
storage.Storage.Watch(watcher)
defer storage.Storage.EndWatch(watcher)
// add global watcher
watcher := &proxyMsgStorageWatcher{
headersOnly: mreq.HeadersOnly,
conn: c,
}
iproxy.GlobalStorageWatch(watcher)
defer iproxy.GlobalStorageEndWatch(watcher)
// Keep the connection open
MessageResponse(c, &successResult{Success: true})
@ -2094,3 +2089,88 @@ func watchStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *Inter
}
}
/*
SetPluginValue and GetPluginValue
*/
type setPluginValueMessage struct {
Key string
Value string
Storage int
}
func setPluginValueHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := setPluginValueMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
if mreq.Key == "" {
ErrorResponse(c, "key value is required")
return
}
err := storage.SetPluginValue(mreq.Key, mreq.Value)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error saving value: %s", err.Error()))
return
}
MessageResponse(c, &successResult{Success: true})
}
type getPluginValueMessage struct {
Key string
Storage int
}
type getPluginValueResponse struct {
Value string
Success bool
}
func getPluginValueHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := getPluginValueMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
if mreq.Key == "" {
ErrorResponse(c, "key value is required")
return
}
value, err := storage.GetPluginValue(mreq.Key)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error getting value: %s", err.Error()))
return
}
MessageResponse(c, &getPluginValueResponse{Value: value, Success: true})
}

@ -19,6 +19,7 @@ type tableNameRow struct {
var schemaUpdaters = []schemaUpdater{
schema8,
schema9,
schema10,
}
func UpdateSchema(db *sql.DB, logger *log.Logger) error {
@ -543,3 +544,27 @@ func schema9(tx *sql.Tx) error {
}
return nil
}
func schema10(tx *sql.Tx) error {
/*
Create a "plugin data" table to let applications store app-specific data in the datafile
*/
cmds := []string{`
CREATE TABLE plugin_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT UNIQUE,
value STRING
);
CREATE INDEX plugin_key_ind ON plugin_data(key);
`,
`UPDATE schema_meta SET version=10`,
}
err := executeMultiple(tx, cmds)
if err != nil {
return err
}
return nil
}

@ -1651,3 +1651,68 @@ func (ms *SQLiteStorage) EndWatch(watcher StorageWatcher) error {
ms.mtx.Unlock()
return nil
}
func (ms *SQLiteStorage) SetPluginValue(key string, value string) error {
ms.mtx.Lock()
defer ms.mtx.Unlock()
tx, err := ms.dbConn.Begin()
if err != nil {
return err
}
err = ms.setPluginValue(tx, key, value)
if err != nil {
tx.Rollback()
return err
}
tx.Commit()
return nil
}
func (ms *SQLiteStorage) setPluginValue(tx *sql.Tx, key string, value string) error {
stmt, err := tx.Prepare(`
INSERT OR REPLACE INTO plugin_data (
key,
value
) VALUES (?, ?);
`)
if err != nil {
return fmt.Errorf("error preparing statement to insert request into database: %s", err.Error())
}
defer stmt.Close()
_, err = stmt.Exec(key, value)
if err != nil {
return fmt.Errorf("error inserting plugin data into database: %s", err.Error())
}
return nil
}
func (ms *SQLiteStorage) GetPluginValue(key string) (string, error) {
ms.mtx.Lock()
defer ms.mtx.Unlock()
tx, err := ms.dbConn.Begin()
if err != nil {
return "", err
}
value, err := ms.getPluginValue(tx, key)
if err != nil {
tx.Rollback()
return "", err
}
tx.Commit()
return value, nil
}
func (ms *SQLiteStorage) getPluginValue(tx *sql.Tx, key string) (string, error) {
var value sql.NullString
err := tx.QueryRow(`SELECT value FROM plugin_data WHERE key=?`, key).Scan(
&value,
)
if err == sql.ErrNoRows {
return "", fmt.Errorf("plugin data with key %s does not exist", key)
} else if err != nil {
return "", fmt.Errorf("error loading data from datafile: %s", err.Error())
}
return value.String, nil
}

@ -66,6 +66,10 @@ type MessageStorage interface {
Watch(watcher StorageWatcher) error
// Remove a storage watcher from the storage
EndWatch(watcher StorageWatcher) error
// Set/get plugin values
SetPluginValue(key string, value string) error
GetPluginValue(key string) (string, error)
}
type StorageWatcher interface {

Loading…
Cancel
Save