This is a fork of: https://github.com/roglew/puppy
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2176 lines
49 KiB

package puppy
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Creates a MessageListener that implements the default puppy handlers. See the message docs for more info
func NewProxyMessageListener(logger *log.Logger, iproxy *InterceptingProxy) *MessageListener {
l := NewMessageListener(logger, iproxy)
l.AddHandler("ping", pingHandler)
l.AddHandler("submit", submitHandler)
l.AddHandler("savenew", saveNewHandler)
l.AddHandler("storagequery", storageQueryHandler)
l.AddHandler("validatequery", validateQueryHandler)
l.AddHandler("checkrequest", checkRequestHandler)
l.AddHandler("setscope", setScopeHandler)
l.AddHandler("viewscope", viewScopeHandler)
l.AddHandler("addtag", addTagHandler)
l.AddHandler("removetag", removeTagHandler)
l.AddHandler("cleartag", clearTagHandler)
l.AddHandler("intercept", interceptHandler)
l.AddHandler("allsavedqueries", allSavedQueriesHandler)
l.AddHandler("savequery", saveQueryHandler)
l.AddHandler("loadquery", loadQueryHandler)
l.AddHandler("deletequery", deleteQueryHandler)
l.AddHandler("addlistener", addListenerHandler)
l.AddHandler("removelistener", removeListenerHandler)
l.AddHandler("getlisteners", getListenersHandler)
l.AddHandler("loadcerts", loadCertificatesHandler)
l.AddHandler("setcerts", setCertificatesHandler)
l.AddHandler("clearcerts", clearCertificatesHandler)
l.AddHandler("gencerts", generateCertificatesHandler)
l.AddHandler("genpemcerts", generatePEMCertificatesHandler)
l.AddHandler("addsqlitestorage", addSQLiteStorageHandler)
l.AddHandler("addinmemorystorage", addInMemoryStorageHandler)
l.AddHandler("closestorage", closeStorageHandler)
l.AddHandler("setproxystorage", setProxyStorageHandler)
l.AddHandler("liststorage", listProxyStorageHandler)
l.AddHandler("setproxy", setProxyHandler)
l.AddHandler("watchstorage", watchStorageHandler)
l.AddHandler("setpluginvalue", setPluginValueHandler)
l.AddHandler("getpluginvalue", getPluginValueHandler)
return l
}
// JSON data representing a ProxyRequest
type RequestJSON struct {
DestHost string
DestPort int
UseTLS bool
Method string
Path string
ProtoMajor int
ProtoMinor int
Headers map[string][]string
Body string
Tags []string
StartTime int64 `json:"StartTime,omitempty"`
EndTime int64 `json:"EndTime,omitempty"`
Unmangled *RequestJSON `json:"Unmangled,omitempty"`
Response *ResponseJSON `json:"Response,omitempty"`
WSMessages []*WSMessageJSON `json:"WSMessages,omitempty"`
DbId string `json:"DbId,omitempty"`
}
// JSON data representing a ProxyResponse
type ResponseJSON struct {
ProtoMajor int
ProtoMinor int
StatusCode int
Reason string
Headers map[string][]string
Body string
Unmangled *ResponseJSON `json:"Unmangled,omitempty"`
DbId string
}
// JSON data representing a ProxyWSMessage
type WSMessageJSON struct {
Message string
IsBinary bool
ToServer bool
Timestamp int64
Unmangled *WSMessageJSON `json:"Unmangled,omitempty"`
DbId string
}
// Check that the RequestJSON contains valid data
func (reqd *RequestJSON) Validate() error {
if reqd.DestHost == "" {
return errors.New("request is missing target host")
}
if reqd.DestPort == 0 {
return errors.New("request is missing target port")
}
return nil
}
// Convert RequestJSON into a ProxyRequest
func (reqd *RequestJSON) Parse() (*ProxyRequest, error) {
if err := reqd.Validate(); err != nil {
return nil, err
}
dataBuf := new(bytes.Buffer)
statusLine := fmt.Sprintf("%s %s HTTP/%d.%d", reqd.Method, reqd.Path, reqd.ProtoMajor, reqd.ProtoMinor)
dataBuf.Write([]byte(statusLine))
dataBuf.Write([]byte("\r\n"))
for k, vs := range reqd.Headers {
for _, v := range vs {
if strings.ToLower(k) != "content-length" {
dataBuf.Write([]byte(k))
dataBuf.Write([]byte(": "))
dataBuf.Write([]byte(v))
dataBuf.Write([]byte("\r\n"))
}
}
}
body, err := base64.StdEncoding.DecodeString(reqd.Body)
if err != nil {
return nil, err
}
dataBuf.Write([]byte("Content-Length"))
dataBuf.Write([]byte(": "))
dataBuf.Write([]byte(strconv.Itoa(len(body))))
dataBuf.Write([]byte("\r\n\r\n"))
dataBuf.Write(body)
req, err := ProxyRequestFromBytes(dataBuf.Bytes(), reqd.DestHost, reqd.DestPort, reqd.UseTLS)
if err != nil {
return nil, err
}
if req.Host == "" {
req.Host = reqd.DestHost
}
if reqd.StartTime > 0 {
req.StartDatetime = time.Unix(0, reqd.StartTime)
}
if reqd.EndTime > 0 {
req.EndDatetime = time.Unix(0, reqd.EndTime)
}
for _, tag := range reqd.Tags {
req.AddTag(tag)
}
if reqd.Response != nil {
rsp, err := reqd.Response.Parse()
if err != nil {
return nil, err
}
req.ServerResponse = rsp
}
for _, wsmd := range reqd.WSMessages {
wsm, err := wsmd.Parse()
if err != nil {
return nil, err
}
req.WSMessages = append(req.WSMessages, wsm)
}
sort.Sort(WSSort(req.WSMessages))
return req, nil
}
// Convert a ProxyRequest into JSON data. If headersOnly is true, the JSON data will only contain the headers and metadata of the message
func NewRequestJSON(req *ProxyRequest, headersOnly bool) *RequestJSON {
newHeaders := make(map[string][]string)
for k, vs := range req.Header {
for _, v := range vs {
l, ok := newHeaders[k]
if ok {
newHeaders[k] = append(l, v)
} else {
newHeaders[k] = make([]string, 1)
newHeaders[k][0] = v
}
}
}
var unmangled *RequestJSON = nil
if req.Unmangled != nil {
unmangled = NewRequestJSON(req.Unmangled, headersOnly)
}
var rsp *ResponseJSON = nil
if req.ServerResponse != nil {
rsp = NewResponseJSON(req.ServerResponse, headersOnly)
}
wsms := make([]*WSMessageJSON, 0)
for _, wsm := range req.WSMessages {
wsms = append(wsms, NewWSMessageJSON(wsm))
}
ret := &RequestJSON{
DestHost: req.DestHost,
DestPort: req.DestPort,
UseTLS: req.DestUseTLS,
Method: req.Method,
Path: req.HTTPPath(),
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
Headers: newHeaders,
Tags: req.Tags(),
StartTime: req.StartDatetime.UnixNano(),
EndTime: req.EndDatetime.UnixNano(),
Unmangled: unmangled,
Response: rsp,
WSMessages: wsms,
DbId: req.DbId,
}
if !headersOnly {
ret.Body = base64.StdEncoding.EncodeToString(req.BodyBytes())
}
return ret
}
// Ensure that response JSON data is valid
func (rspd *ResponseJSON) Validate() error {
return nil
}
// Convert response JSON data into a ProxyResponse
func (rspd *ResponseJSON) Parse() (*ProxyResponse, error) {
if err := rspd.Validate(); err != nil {
return nil, err
}
dataBuf := new(bytes.Buffer)
statusLine := fmt.Sprintf("HTTP/%d.%d %03d %s", rspd.ProtoMajor, rspd.ProtoMinor, rspd.StatusCode, rspd.Reason)
dataBuf.Write([]byte(statusLine))
dataBuf.Write([]byte("\r\n"))
for k, vs := range rspd.Headers {
for _, v := range vs {
if strings.ToLower(k) != "content-length" {
dataBuf.Write([]byte(k))
dataBuf.Write([]byte(": "))
dataBuf.Write([]byte(v))
dataBuf.Write([]byte("\r\n"))
}
}
}
body, err := base64.StdEncoding.DecodeString(rspd.Body)
if err != nil {
return nil, err
}
dataBuf.Write([]byte("Content-Length"))
dataBuf.Write([]byte(": "))
dataBuf.Write([]byte(strconv.Itoa(len(rspd.Body))))
dataBuf.Write([]byte("\r\n\r\n"))
dataBuf.Write(body)
rsp, err := ProxyResponseFromBytes(dataBuf.Bytes())
if err != nil {
return nil, err
}
if rspd.Unmangled != nil {
ursp, err := rspd.Unmangled.Parse()
if err != nil {
return nil, err
}
rsp.Unmangled = ursp
}
return rsp, nil
}
// Serialize a ProxyResponse into JSON data. If headersOnly is true, the JSON data will only contain the headers and metadata of the message
func NewResponseJSON(rsp *ProxyResponse, headersOnly bool) *ResponseJSON {
newHeaders := make(map[string][]string)
for k, vs := range rsp.Header {
for _, v := range vs {
l, ok := newHeaders[k]
if ok {
newHeaders[k] = append(l, v)
} else {
newHeaders[k] = make([]string, 1)
newHeaders[k][0] = v
}
}
}
var unmangled *ResponseJSON = nil
if rsp.Unmangled != nil {
unmangled = NewResponseJSON(rsp.Unmangled, headersOnly)
}
ret := &ResponseJSON{
ProtoMajor: rsp.ProtoMajor,
ProtoMinor: rsp.ProtoMinor,
StatusCode: rsp.StatusCode,
Reason: rsp.HTTPStatus(),
Headers: newHeaders,
DbId: rsp.DbId,
Unmangled: unmangled,
}
if !headersOnly {
ret.Body = base64.StdEncoding.EncodeToString(rsp.BodyBytes())
}
return ret
}
// Parse websocket message JSON data into a ProxyWSMEssage
func (wsmd *WSMessageJSON) Parse() (*ProxyWSMessage, error) {
var Direction int
if wsmd.ToServer {
Direction = ToServer
} else {
Direction = ToClient
}
var mtype int
if wsmd.IsBinary {
mtype = websocket.BinaryMessage
} else {
mtype = websocket.TextMessage
}
message, err := base64.StdEncoding.DecodeString(wsmd.Message)
if err != nil {
return nil, err
}
var unmangled *ProxyWSMessage
if wsmd.Unmangled != nil {
unmangled, err = wsmd.Unmangled.Parse()
if err != nil {
return nil, err
}
}
retData := &ProxyWSMessage{
Message: message,
Type: mtype,
Direction: Direction,
Timestamp: time.Unix(0, wsmd.Timestamp),
Unmangled: unmangled,
DbId: wsmd.DbId,
}
return retData, nil
}
// Serialize a websocket message into JSON data
func NewWSMessageJSON(wsm *ProxyWSMessage) *WSMessageJSON {
isBinary := false
if wsm.Type == websocket.BinaryMessage {
isBinary = true
}
toServer := false
if wsm.Direction == ToServer {
toServer = true
}
var unmangled *WSMessageJSON
if wsm.Unmangled != nil {
unmangled = NewWSMessageJSON(wsm.Unmangled)
}
ret := &WSMessageJSON{
Message: base64.StdEncoding.EncodeToString(wsm.Message),
IsBinary: isBinary,
ToServer: toServer,
Timestamp: wsm.Timestamp.UnixNano(),
Unmangled: unmangled,
DbId: wsm.DbId,
}
return ret
}
// Clears metadata (start/end time, DbId) and dependent message data (response, websocket messages, and unmangled versions) from the RequestJSON
func CleanReqJSON(req *RequestJSON) {
req.StartTime = 0
req.EndTime = 0
req.Unmangled = nil
req.Response = nil
req.WSMessages = nil
req.DbId = ""
}
// Clears metadata (DbId) and dependent message data (unmangled version) from the ResponseJSON
func CleanRspJSON(rsp *ResponseJSON) {
rsp.Unmangled = nil
rsp.DbId = ""
}
// Clears metadata (timestamp, DbId) and dependent message data (unmangled version) from the WSMessageJSON
func CleanWSJSON(wsm *WSMessageJSON) {
wsm.Timestamp = 0
wsm.Unmangled = nil
wsm.DbId = ""
}
type successResult struct {
Success bool
}
/*
Ping
*/
type pingMessage struct{}
type pingResponse struct {
Success bool
Ping string
}
func pingHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
rsp := pingResponse{Success: true, Ping: "Pong"}
MessageResponse(c, rsp)
}
/*
Submit
*/
type submitMessage struct {
Request *RequestJSON
Storage int
}
type submitResponse struct {
Success bool
SubmittedRequest *RequestJSON
}
func submitHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := submitMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing submit message: %s", err.Error()))
return
}
req, err := mreq.Request.Parse()
if err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing http request: %s", err.Error()))
return
}
if mreq.Storage > 0 {
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
SaveNewRequest(storage, req)
}
logger.Println("Submitting request to", req.FullURL(), "...")
if err := iproxy.SubmitRequest(req); err != nil {
ErrorResponse(c, err.Error())
return
}
if mreq.Storage > 0 {
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
UpdateRequest(storage, req)
}
result := NewRequestJSON(req, false)
response := submitResponse{Success: true, SubmittedRequest: result}
MessageResponse(c, response)
}
/*
SaveNew
*/
type saveNewMessage struct {
Request *RequestJSON
Storage int
}
type saveNewResponse struct {
Success bool
DbId string
}
func saveNewHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := submitMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing submit message: %s", err.Error()))
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
req, err := mreq.Request.Parse()
if err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing http request: %s", err.Error()))
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
err = SaveNewRequest(storage, req)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error saving http request: %s", err.Error()))
return
}
response := &saveNewResponse{
Success: true,
DbId: req.DbId,
}
MessageResponse(c, response)
}
/*
QueryRequests
*/
type storageQueryMessage struct {
Query StrMessageQuery
HeadersOnly bool
MaxResults int64
Storage int
}
type storageQueryResult struct {
Success bool
Results []*RequestJSON
}
func storageQueryHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := storageQueryMessage{
Query: nil,
HeadersOnly: false,
MaxResults: 0,
}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing query message")
return
}
if mreq.Query == nil {
ErrorResponse(c, "query is required")
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
var searchResults []*ProxyRequest
if len(mreq.Query) == 1 && len(mreq.Query[0]) == 1 {
args, err := CheckArgsStrToGo(mreq.Query[0][0])
if err != nil {
ErrorResponse(c, err.Error())
return
}
logger.Println("Search query is one phrase, sending directly to storage...")
searchResults, err = storage.Search(mreq.MaxResults, args...)
if err != nil {
ErrorResponse(c, err.Error())
return
}
} else {
logger.Println("Search query is multple sets of arguments, creating checker and checking naively...")
goQuery, err := StrQueryToMsgQuery(mreq.Query)
if err != nil {
ErrorResponse(c, err.Error())
return
}
checker, err := CheckerFromMessageQuery(goQuery)
if err != nil {
ErrorResponse(c, err.Error())
return
}
searchResults, err = storage.CheckRequests(mreq.MaxResults, checker)
if err != nil {
ErrorResponse(c, err.Error())
return
}
}
var result storageQueryResult
reqResults := make([]*RequestJSON, len(searchResults))
for i, req := range searchResults {
reqResults[i] = NewRequestJSON(req, mreq.HeadersOnly)
}
result.Success = true
result.Results = reqResults
MessageResponse(c, &result)
}
/*
ValidateQuery
*/
type validateQueryMessage struct {
Query StrMessageQuery
}
func validateQueryHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := validateQueryMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing query message")
return
}
goQuery, err := StrQueryToMsgQuery(mreq.Query)
if err != nil {
ErrorResponse(c, err.Error())
return
}
_, err = CheckerFromMessageQuery(goQuery)
if err != nil {
ErrorResponse(c, err.Error())
return
}
MessageResponse(c, &successResult{Success: true})
}
/*
CheckRequest
*/
type checkRequestMessage struct {
Query StrMessageQuery
Request *RequestJSON
DbId string
StorageId int
}
type checkRequestResponse struct {
Result bool
Success bool
}
func checkRequestHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := checkRequestMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing query message")
return
}
var req *ProxyRequest
var err error
if mreq.DbId != "" {
storage, _ := iproxy.GetMessageStorage(mreq.StorageId)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.StorageId))
return
}
searchResults, err := storage.Search(1, FieldId, StrIs, mreq.DbId)
if err != nil {
ErrorResponse(c, err.Error())
return
}
if len(searchResults) == 0 {
ErrorResponse(c, fmt.Sprintf("message with id=%s does not exist", mreq.DbId))
return
}
req = searchResults[0]
} else {
req, err = mreq.Request.Parse()
if err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing http request: %s", err.Error()))
return
}
}
goQuery, err := StrQueryToMsgQuery(mreq.Query)
if err != nil {
ErrorResponse(c, err.Error())
return
}
checker, err := CheckerFromMessageQuery(goQuery)
if err != nil {
ErrorResponse(c, err.Error())
return
}
result := &checkRequestResponse{
Success: true,
Result: checker(req),
}
MessageResponse(c, result)
}
/*
SetScope
*/
type setScopeMessage struct {
Query StrMessageQuery
}
func setScopeHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := setScopeMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing query message")
return
}
goQuery, err := StrQueryToMsgQuery(mreq.Query)
if err != nil {
ErrorResponse(c, err.Error())
return
}
err = iproxy.SetScopeQuery(goQuery)
if err != nil {
ErrorResponse(c, err.Error())
return
}
MessageResponse(c, &successResult{Success: true})
}
/*
ViewScope
*/
type viewScopeMessage struct {
}
type viewScopeResult struct {
Success bool
IsCustom bool
Query StrMessageQuery
}
func viewScopeHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
scopeQuery := iproxy.GetScopeQuery()
scopeChecker := iproxy.GetScopeChecker()
if scopeQuery == nil && scopeChecker != nil {
MessageResponse(c, &viewScopeResult{
Success: true,
IsCustom: true,
})
return
}
var err error
strQuery, err := MsgQueryToStrQuery(scopeQuery)
if err != nil {
ErrorResponse(c, err.Error())
return
}
MessageResponse(c, &viewScopeResult{
Success: true,
IsCustom: false,
Query: strQuery,
})
}
/*
Tag messages
*/
type addTagMessage struct {
ReqId string
Tag string
Storage int
}
func addTagHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := addTagMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing message: %s", err.Error()))
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
if mreq.ReqId == "" || mreq.Tag == "" {
ErrorResponse(c, "both request id and tag are required")
return
}
req, err := storage.LoadRequest(mreq.ReqId)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error loading request: %s", err.Error()))
return
}
req.AddTag(mreq.Tag)
err = UpdateRequest(storage, req)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error saving request: %s", err.Error()))
return
}
MessageResponse(c, &successResult{Success: true})
}
type removeTagMessage struct {
ReqId string
Tag string
Storage int
}
func removeTagHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := removeTagMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing message: %s", err.Error()))
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
if mreq.ReqId == "" || mreq.Tag == "" {
ErrorResponse(c, "both request id and tag are required")
return
}
req, err := storage.LoadRequest(mreq.ReqId)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error loading request: %s", err.Error()))
return
}
req.RemoveTag(mreq.Tag)
err = UpdateRequest(storage, req)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error saving request: %s", err.Error()))
return
}
MessageResponse(c, &successResult{Success: true})
}
type clearTagsMessage struct {
ReqId string
Storage int
}
func clearTagHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := clearTagsMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing message: %s", err.Error()))
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
req, err := storage.LoadRequest(mreq.ReqId)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error loading request: %s", err.Error()))
return
}
req.ClearTags()
err = UpdateRequest(storage, req)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error saving request: %s", err.Error()))
return
}
MessageResponse(c, &successResult{Success: true})
}
/*
Intercept
*/
type interceptMessage struct {
InterceptRequests bool
InterceptResponses bool
InterceptWS bool
UseQuery bool
Query MessageQuery
}
type intHandshakeResult struct {
Success bool
}
// id func
var getNextIntId = IdCounter()
type intRequest struct {
// A request to have a message mangled
Id int
Type string
Success bool
Result chan *intResponse `json:"-"`
Request *RequestJSON `json:"Request,omitempty"`
Response *ResponseJSON `json:"Response,omitempty"`
WSMessage *WSMessageJSON `json:"WSMessage,omitempty"`
}
type intResponse struct {
// response from the client with a mangled http request
Id int
Dropped bool
Request *RequestJSON `json:"Request,omitempty"`
Response *ResponseJSON `json:"Response,omitempty"`
WSMessage *WSMessageJSON `json:"WSMessage,omitempty"`
}
type intErrorMessage struct {
// a message template for sending an error to client if there is an error
// with the mangled message they sent
Id int
Success bool
Reason string
}
func intErrorResponse(id int, conn net.Conn, reason string) {
m := &intErrorMessage{
Id: id,
Success: false,
Reason: reason,
}
MessageResponse(conn, m)
}
func interceptHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := interceptMessage{
InterceptRequests: false,
InterceptResponses: false,
InterceptWS: false,
UseQuery: false,
}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing message: %s", err.Error()))
return
}
if !mreq.InterceptRequests && !mreq.InterceptResponses && !mreq.InterceptWS {
ErrorResponse(c, "must intercept at least one message type")
return
}
pendingRequests := make(map[int]*intRequest)
var pendingRequestsMtx sync.Mutex
// helper functions for managing pending requests
getPendingRequest := func(id int) (*intRequest, error) {
pendingRequestsMtx.Lock()
defer pendingRequestsMtx.Unlock()
ret, ok := pendingRequests[id]
if !ok {
return nil, fmt.Errorf("pending request with id %d does not exist", id)
}
return ret, nil
}
addPendingRequest := func(pendingReq *intRequest) {
pendingRequestsMtx.Lock()
defer pendingRequestsMtx.Unlock()
pendingRequests[pendingReq.Id] = pendingReq
}
removePendingRequest := func(pendingReq *intRequest) {
pendingRequestsMtx.Lock()
defer pendingRequestsMtx.Unlock()
delete(pendingRequests, pendingReq.Id)
}
// parse the checker
var checker RequestChecker = nil
if mreq.UseQuery {
var err error
checker, err = CheckerFromMessageQuery(mreq.Query)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error with message query: %s", err.Error()))
return
}
}
MessageResponse(c, &intHandshakeResult{Success: true})
// hook the request interceptor
var reqSub *ReqIntSub = nil
if mreq.InterceptRequests {
logger.Println("Adding request interceptor...")
// Create a function that sends requests to client and wait for the client to respond
reqIntFunc := func(req *ProxyRequest) (*ProxyRequest, error) {
// if it doesn't pass the query, return the request unmodified
if checker != nil && !checker(req) {
return req, nil
}
// JSON serialize the request
reqData := NewRequestJSON(req, false)
CleanReqJSON(reqData)
// convert request data to an intRequest
intReq := &intRequest{
Id: getNextIntId(),
Type: "httprequest",
Result: make(chan *intResponse),
Success: true,
Request: reqData,
}
// add bookkeeping for results, defer cleanup
addPendingRequest(intReq)
defer removePendingRequest(intReq)
// submit the request
MessageResponse(c, intReq)
// wait for result
intRsp, ok := <-intReq.Result
if !ok {
// if it closed, just pass the request along
return req, nil
}
if intRsp.Dropped {
// if it's dropped, return nil
return nil, nil
}
newReq := intRsp.Request
CleanReqJSON(newReq)
ret, err := newReq.Parse()
if err != nil {
return nil, err
}
return ret, nil
}
reqSub = iproxy.AddReqInterceptor(reqIntFunc)
}
var rspSub *RspIntSub = nil
if mreq.InterceptResponses {
logger.Println("Adding response interceptor...")
rspIntFunc := func(req *ProxyRequest, rsp *ProxyResponse) (*ProxyResponse, error) {
logger.Println("Intercepted response!")
// if it doesn't pass the query, return the request unmodified
if checker != nil && !checker(req) {
return rsp, nil
}
reqData := NewRequestJSON(req, false)
CleanReqJSON(reqData)
rspData := NewResponseJSON(rsp, false)
CleanRspJSON(rspData)
intReq := &intRequest{
Id: getNextIntId(),
Type: "httpresponse",
Result: make(chan *intResponse),
Success: true,
Request: reqData,
Response: rspData,
}
// add bookkeeping for results, defer cleanup
addPendingRequest(intReq)
defer removePendingRequest(intReq)
// submit the request
MessageResponse(c, intReq)
// wait for result
intRsp, ok := <-intReq.Result
if !ok {
// it closed, pass response along unmodified
return rsp, nil
}
if intRsp.Dropped {
// if it's dropped, return nil
return nil, nil
}
newRsp := intRsp.Response
CleanRspJSON(newRsp)
ret, err := newRsp.Parse()
if err != nil {
return nil, err
}
return ret, nil
}
rspSub = iproxy.AddRspInterceptor(rspIntFunc)
}
var wsSub *WSIntSub = nil
if mreq.InterceptWS {
logger.Println("Adding websocket interceptor...")
wsIntFunc := func(req *ProxyRequest, rsp *ProxyResponse, wsm *ProxyWSMessage) (*ProxyWSMessage, error) {
// if it doesn't pass the query, return the request unmodified
if checker != nil && !checker(req) {
return wsm, nil
}
wsData := NewWSMessageJSON(wsm)
var msgType string
if wsData.ToServer {
msgType = "wstoserver"
} else {
msgType = "wstoclient"
}
CleanWSJSON(wsData)
reqData := NewRequestJSON(req, false)
CleanReqJSON(reqData)
rspData := NewResponseJSON(rsp, false)
CleanRspJSON(rspData)
intReq := &intRequest{
Id: getNextIntId(),
Type: msgType,
Result: make(chan *intResponse),
Success: true,
Request: reqData,
Response: rspData,
WSMessage: wsData,
}
// add bookkeeping for results, defer cleanup
addPendingRequest(intReq)
defer removePendingRequest(intReq)
// submit the request
MessageResponse(c, intReq)
// wait for result
intRsp, ok := <-intReq.Result
if !ok {
// it closed, pass message along unmodified
return wsm, nil
}
if intRsp.Dropped {
// if it's dropped, return nil
return nil, nil
}
newWsm := intRsp.WSMessage
CleanWSJSON(newWsm)
ret, err := newWsm.Parse()
if err != nil {
return nil, err
}
return ret, nil
}
wsSub = iproxy.AddWSInterceptor(wsIntFunc)
}
closeAll := func() {
if reqSub != nil {
// close req sub
iproxy.RemoveReqInterceptor(reqSub)
}
if rspSub != nil {
// close rsp sub
iproxy.RemoveRspInterceptor(rspSub)
}
if wsSub != nil {
// close websocket sub
iproxy.RemoveWSInterceptor(wsSub)
}
// Close all pending requests
pendingRequestsMtx.Lock()
defer pendingRequestsMtx.Unlock()
for _, req := range pendingRequests {
close(req.Result)
}
}
defer closeAll()
// Read from the connection and process mangled requests
reader := bufio.NewReader(c)
for {
// read line from conn
logger.Println("Waiting on next message...")
m, err := ReadMessage(reader)
if err != nil {
if err != io.EOF {
logger.Println("Error reading message:", err.Error())
intErrorResponse(0, c, "error reading message")
continue
}
logger.Println("Connection closed")
return
}
// convert line to appropriate struct
var intRsp intResponse
if err := json.Unmarshal(m, &intRsp); err != nil {
intErrorResponse(0, c, fmt.Sprintf("error parsing message: %s", err.Error()))
continue
}
// get the pending request
pendingReq, err := getPendingRequest(intRsp.Id)
if err != nil {
intErrorResponse(intRsp.Id, c, err.Error())
continue
}
// Validate the data contained in the response
switch pendingReq.Type {
case "httprequest":
if intRsp.Request == nil {
intErrorResponse(intRsp.Id, c, "missing request")
continue
}
case "httpresponse":
if intRsp.Response == nil {
intErrorResponse(intRsp.Id, c, "missing response")
continue
}
case "wstoserver", "wstoclient":
if intRsp.WSMessage == nil {
intErrorResponse(intRsp.Id, c, "missing websocket message")
continue
}
intRsp.WSMessage.ToServer = (pendingReq.Type == "wstoserver")
default:
intErrorResponse(intRsp.Id, c, "internal error, stored message has invalid type")
continue
}
// pass along message
removePendingRequest(pendingReq)
pendingReq.Result <- &intRsp
}
}
/*
Query management
*/
type allSavedQueriesMessage struct {
Storage int
}
type allSavedQueriesResponse struct {
Success bool
Queries []*StrSavedQuery
}
type StrSavedQuery struct {
Name string
Query StrMessageQuery
}
func allSavedQueriesHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := allSavedQueriesMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, fmt.Sprintf("error parsing message: %s", err.Error()))
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
goQueries, err := storage.AllSavedQueries()
if err != nil {
ErrorResponse(c, err.Error())
return
}
savedQueries := make([]*StrSavedQuery, 0)
for _, q := range goQueries {
strSavedQuery := &StrSavedQuery{
Name: q.Name,
Query: nil,
}
sq, err := MsgQueryToStrQuery(q.Query)
if err == nil {
strSavedQuery.Query = sq
savedQueries = append(savedQueries, strSavedQuery)
}
}
MessageResponse(c, &allSavedQueriesResponse{
Success: true,
Queries: savedQueries,
})
}
type saveQueryMessage struct {
Name string
Query StrMessageQuery
Storage int
}
func saveQueryHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := saveQueryMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
if mreq.Name == "" || mreq.Query == nil {
ErrorResponse(c, "name and query are required")
return
}
goQuery, err := StrQueryToMsgQuery(mreq.Query)
if err != nil {
ErrorResponse(c, err.Error())
return
}
_, err = CheckerFromMessageQuery(goQuery)
if err != nil {
ErrorResponse(c, err.Error())
return
}
err = storage.SaveQuery(mreq.Name, goQuery)
if err != nil {
ErrorResponse(c, err.Error())
return
}
MessageResponse(c, &successResult{Success: true})
}
type loadQueryMessage struct {
Name string
Storage int
}
type loadQueryResult struct {
Success bool
Query StrMessageQuery
}
func loadQueryHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := loadQueryMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Name == "" {
ErrorResponse(c, "name is required")
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
query, err := storage.LoadQuery(mreq.Name)
if err != nil {
ErrorResponse(c, err.Error())
return
}
strQuery, err := MsgQueryToStrQuery(query)
if err != nil {
ErrorResponse(c, err.Error())
return
}
result := &loadQueryResult{
Success: true,
Query: strQuery,
}
MessageResponse(c, result)
}
type deleteQueryMessage struct {
Name string
Storage int
}
func deleteQueryHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := deleteQueryMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
err := storage.DeleteQuery(mreq.Name)
if err != nil {
ErrorResponse(c, err.Error())
return
}
MessageResponse(c, &successResult{Success: true})
}
/*
Listener management
*/
type activeListener struct {
Id int
Listener net.Listener `json:"-"`
Type string
Addr string
}
type addListenerMessage struct {
Type string
Addr string
TransparentMode bool
DestHost string
DestPort int
DestUseTLS bool
}
type addListenerResult struct {
Success bool
Id int
}
var getNextMsgListenerId = IdCounter()
// may want to move these into the iproxy to avoid globals since this assumes exactly one iproxy
var msgActiveListenersMtx sync.Mutex
var msgActiveListeners map[int]*activeListener = make(map[int]*activeListener)
func addListenerHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := addListenerMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Type == "" || mreq.Addr == "" {
ErrorResponse(c, "type and addr are required")
return
}
// why did I add support to listen on unix sockets? I have no idea but I'm gonna leave it
if !(mreq.Type == "tcp" ||
mreq.Type == "unix") {
ErrorResponse(c, "type must be \"tcp\" or \"unix\"")
return
}
listener, err := net.Listen(mreq.Type, mreq.Addr)
if err != nil {
ErrorResponse(c, err.Error())
return
}
if mreq.TransparentMode {
iproxy.AddTransparentListener(listener, mreq.DestHost,
mreq.DestPort, mreq.DestUseTLS)
} else {
iproxy.AddListener(listener)
}
alistener := &activeListener{
Id: getNextMsgListenerId(),
Listener: listener,
Type: mreq.Type,
Addr: mreq.Addr,
}
msgActiveListenersMtx.Lock()
defer msgActiveListenersMtx.Unlock()
msgActiveListeners[alistener.Id] = alistener
result := &addListenerResult{
Success: true,
Id: alistener.Id,
}
MessageResponse(c, result)
}
type removeListenerMessage struct {
Id int
}
func removeListenerHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := removeListenerMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
msgActiveListenersMtx.Lock()
defer msgActiveListenersMtx.Unlock()
alistener, ok := msgActiveListeners[mreq.Id]
if !ok {
ErrorResponse(c, "listener does not exist")
return
}
iproxy.RemoveListener(alistener.Listener)
delete(msgActiveListeners, alistener.Id)
MessageResponse(c, &successResult{Success: true})
}
type getListenersMessage struct{}
type getListenersResult struct {
Success bool
Results []*activeListener
}
func getListenersHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
result := &getListenersResult{
Success: true,
Results: make([]*activeListener, 0),
}
msgActiveListenersMtx.Lock()
defer msgActiveListenersMtx.Unlock()
for _, alistener := range msgActiveListeners {
result.Results = append(result.Results, alistener)
}
MessageResponse(c, result)
}
/*
Certificate Management
*/
type loadCertificatesMessage struct {
KeyFile string
CertificateFile string
}
func loadCertificatesHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := loadCertificatesMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.KeyFile == "" || mreq.CertificateFile == "" {
ErrorResponse(c, "both KeyFile and CertificateFile are required")
return
}
err := iproxy.LoadCACertificates(mreq.CertificateFile, mreq.KeyFile)
if err != nil {
ErrorResponse(c, err.Error())
return
}
MessageResponse(c, &successResult{Success: true})
}
type setCertificatesMessage struct {
KeyPEMData []byte
CertificatePEMData []byte
}
func setCertificatesHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := setCertificatesMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if len(mreq.KeyPEMData) == 0 || len(mreq.CertificatePEMData) == 0 {
ErrorResponse(c, "both KeyPEMData and CertificatePEMData are required")
return
}
caCert, err := tls.X509KeyPair(mreq.CertificatePEMData, mreq.KeyPEMData)
if err != nil {
ErrorResponse(c, err.Error())
return
}
iproxy.SetCACertificate(&caCert)
MessageResponse(c, &successResult{Success: true})
}
func clearCertificatesHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
iproxy.SetCACertificate(nil)
MessageResponse(c, &successResult{Success: true})
}
type generateCertificatesMessage struct {
KeyFile string
CertFile string
}
func generateCertificatesHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := generateCertificatesMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
_, err := GenerateCACertsToDisk(mreq.CertFile, mreq.KeyFile)
if err != nil {
ErrorResponse(c, err.Error())
return
}
MessageResponse(c, &successResult{Success: true})
}
type generatePEMCertificatesResult struct {
Success bool
KeyPEMData []byte
CertificatePEMData []byte
}
func generatePEMCertificatesHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := generateCertificatesMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
pair, err := GenerateCACerts()
if err != nil {
ErrorResponse(c, "error generating certificates: "+err.Error())
return
}
result := &generatePEMCertificatesResult{
Success: true,
KeyPEMData: pair.PrivateKeyPEM(),
CertificatePEMData: pair.CACertPEM(),
}
MessageResponse(c, result)
}
/*
Storage functions
*/
type addSQLiteStorageMessage struct {
Path string
Description string
}
type addSQLiteStorageResult struct {
Success bool
StorageId int
}
func addSQLiteStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := addSQLiteStorageMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Path == "" {
ErrorResponse(c, "file path is required")
return
}
storage, err := OpenSQLiteStorage(mreq.Path, logger)
if err != nil {
ErrorResponse(c, "error opening SQLite databae: "+err.Error())
return
}
sid := iproxy.AddMessageStorage(storage, mreq.Description)
result := &addSQLiteStorageResult{
Success: true,
StorageId: sid,
}
MessageResponse(c, result)
}
type addInMemoryStorageMessage struct {
Description string
}
type addInMemoryStorageResult struct {
Success bool
StorageId int
}
func addInMemoryStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := addInMemoryStorageMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
storage, err := InMemoryStorage(logger)
if err != nil {
ErrorResponse(c, "error creating in memory storage: "+err.Error())
return
}
sid := iproxy.AddMessageStorage(storage, mreq.Description)
result := &addInMemoryStorageResult{
Success: true,
StorageId: sid,
}
MessageResponse(c, result)
}
type closeStorageMessage struct {
StorageId int
}
type closeStorageResult struct {
Success bool
}
func closeStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := closeStorageMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.StorageId == 0 {
ErrorResponse(c, "StorageId is required")
return
}
iproxy.CloseMessageStorage(mreq.StorageId)
MessageResponse(c, &successResult{Success: true})
}
type setProxyStorageMessage struct {
StorageId int
}
func setProxyStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := setProxyStorageMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.StorageId == 0 {
ErrorResponse(c, "StorageId is required")
return
}
err := iproxy.SetProxyStorage(mreq.StorageId)
if err != nil {
ErrorResponse(c, err.Error())
return
}
MessageResponse(c, &successResult{Success: true})
}
type savedStorageJSON struct {
Id int
Description string
}
type listProxyStorageResult struct {
Storages []*savedStorageJSON
Success bool
}
func listProxyStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
storages := iproxy.ListMessageStorage()
storagesJSON := make([]*savedStorageJSON, len(storages))
for i, ss := range storages {
storagesJSON[i] = &savedStorageJSON{ss.Id, ss.Description}
}
result := &listProxyStorageResult{
Storages: storagesJSON,
Success: true,
}
MessageResponse(c, result)
}
/*
SetProxy
*/
type setProxyMessage struct {
UseProxy bool
ProxyHost string
ProxyPort int
ProxyIsSOCKS bool
UseCredentials bool
Username string
Password string
}
func setProxyHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := setProxyMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
var creds *ProxyCredentials = nil
if mreq.UseCredentials {
creds = &ProxyCredentials{
Username: mreq.Username,
Password: mreq.Password,
}
}
if !mreq.UseProxy {
iproxy.ClearUpstreamProxy()
} else {
if mreq.ProxyIsSOCKS {
iproxy.SetUpstreamSOCKSProxy(mreq.ProxyHost, mreq.ProxyPort, creds)
} else {
iproxy.SetUpstreamProxy(mreq.ProxyHost, mreq.ProxyPort, creds)
}
}
MessageResponse(c, &successResult{Success: true})
}
/*
WatchStorage
*/
type watchStorageMessage struct {
HeadersOnly bool
}
type proxyMsgStorageWatcher struct {
connMtx sync.Mutex
headersOnly bool
conn net.Conn
}
type storageUpdateResponse struct {
StorageId int
Action string
MessageId string
Request *RequestJSON
Response *ResponseJSON
WSMessage *WSMessageJSON
}
// Implement watcher
func (sw *proxyMsgStorageWatcher) NewRequestSaved(storageId int, ms MessageStorage, req *ProxyRequest) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.Action = "NewRequest"
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) RequestUpdated(storageId int, ms MessageStorage, req *ProxyRequest) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.Action = "RequestUpdated"
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) RequestDeleted(storageId int, ms MessageStorage, DbId string) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Action = "RequestDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) NewResponseSaved(storageId int, ms MessageStorage, rsp *ProxyResponse) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly)
msgRsp.Action = "NewResponse"
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) ResponseUpdated(storageId int, ms MessageStorage, rsp *ProxyResponse) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Response = NewResponseJSON(rsp, sw.headersOnly)
msgRsp.Action = "ResponseUpdated"
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) ResponseDeleted(storageId int, ms MessageStorage, DbId string) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Action = "ResponseDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) NewWSMessageSaved(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.WSMessage = NewWSMessageJSON(wsm)
msgRsp.Action = "NewWSMessage"
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) WSMessageUpdated(storageId int, ms MessageStorage, req *ProxyRequest, wsm *ProxyWSMessage) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
msgRsp := &storageUpdateResponse{}
msgRsp.Request = NewRequestJSON(req, sw.headersOnly)
msgRsp.WSMessage = NewWSMessageJSON(wsm)
msgRsp.Action = "WSMessageUpdated"
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
func (sw *proxyMsgStorageWatcher) WSMessageDeleted(storageId int, ms MessageStorage, DbId string) {
sw.connMtx.Lock()
defer sw.connMtx.Unlock()
var msgRsp storageUpdateResponse
msgRsp.Action = "WSMessageDeleted"
msgRsp.MessageId = DbId
msgRsp.StorageId = storageId
MessageResponse(sw.conn, msgRsp)
}
// Actual handler
func watchStorageHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := watchStorageMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
// add global watcher
watcher := &proxyMsgStorageWatcher{
headersOnly: mreq.HeadersOnly,
conn: c,
}
iproxy.GlobalStorageWatch(watcher)
defer iproxy.GlobalStorageEndWatch(watcher)
// Keep the connection open
MessageResponse(c, &successResult{Success: true})
tmpbuf := make([]byte, 1)
var err error = nil
for err == nil {
_, err = c.Read(tmpbuf)
}
}
/*
SetPluginValue and GetPluginValue
*/
type setPluginValueMessage struct {
Key string
Value string
Storage int
}
func setPluginValueHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := setPluginValueMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
if mreq.Key == "" {
ErrorResponse(c, "key value is required")
return
}
err := storage.SetPluginValue(mreq.Key, mreq.Value)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error saving value: %s", err.Error()))
return
}
MessageResponse(c, &successResult{Success: true})
}
type getPluginValueMessage struct {
Key string
Storage int
}
type getPluginValueResponse struct {
Value string
Success bool
}
func getPluginValueHandler(b []byte, c net.Conn, logger *log.Logger, iproxy *InterceptingProxy) {
mreq := getPluginValueMessage{}
if err := json.Unmarshal(b, &mreq); err != nil {
ErrorResponse(c, "error parsing message")
return
}
if mreq.Storage == 0 {
ErrorResponse(c, "storage is required")
return
}
storage, _ := iproxy.GetMessageStorage(mreq.Storage)
if storage == nil {
ErrorResponse(c, fmt.Sprintf("storage with id %d does not exist", mreq.Storage))
return
}
if mreq.Key == "" {
ErrorResponse(c, "key value is required")
return
}
value, err := storage.GetPluginValue(mreq.Key)
if err != nil {
ErrorResponse(c, fmt.Sprintf("error getting value: %s", err.Error()))
return
}
MessageResponse(c, &getPluginValueResponse{Value: value, Success: true})
}