diff --git a/jobpool.go b/jobpool.go deleted file mode 100644 index 4064412..0000000 --- a/jobpool.go +++ /dev/null @@ -1,117 +0,0 @@ -package main - -import ( - "fmt" - "sync" -) - -// An interface which represents a job to be done by the pool -type Job interface { - Run() // Start the job - Abort() // Abort any work that needs to be completed and close the DoneChannel - DoneChannel() chan struct{} // Returns a channel that is closed when the job is done -} - -// An interface which represents a pool of workers doing jobs -type JobPool struct { - MaxThreads int - - jobQueue chan Job - jobQueueDone chan struct{} - jobQueueAborted chan struct{} - jobQueueShutDown chan struct{} -} - -func NewJobPool(maxThreads int) *JobPool { - q := JobPool{ - MaxThreads: maxThreads, - jobQueue: make(chan Job), - jobQueueDone: make(chan struct{}), // Closing will shut down workers and reject any incoming work - jobQueueAborted: make(chan struct{}), // Closing tells workers to abort - jobQueueShutDown: make(chan struct{}), // Closed when all workers are shut down - } - - return &q -} - -func (q *JobPool) RunJob(j Job) error { - select { - case <-q.jobQueueDone: - return fmt.Errorf("job queue closed") - default: - } - - q.jobQueue <- j - return nil -} - -func (q *JobPool) Run() { - if q.MaxThreads > 0 { - // Create pool of routines that read from the queue and run jobs - var w sync.WaitGroup - for i := 0; i < q.MaxThreads; i++ { - w.Add(1) - go func() { - defer w.Done() - for { - select { - case job := <-q.jobQueue: - go func() { - job.Run() - }() - - select { - case <-job.DoneChannel(): // The job finishes normally - case <-q.jobQueueAborted: // We have to abort the job - job.Abort() // Tell the job to abort - <-job.DoneChannel() // Wait for the job to abort - } - case <-q.jobQueueDone: - // We're done and out of jobs, quit - close(q.jobQueueShutDown) // Flag that the workers quit - return - } - } - }() - } - - w.Wait() // Wait for workers to quit - close(q.jobQueueShutDown) // Flag that all workers quit - } else { - // Create a thread any time we pull something out of the job queue - for { - select { - case job := <-q.jobQueue: - go func() { - go func() { - job.Run() - }() - - select { - case <-job.DoneChannel(): // The job finishes normally - case <-q.jobQueueAborted: // We have to abort the job - job.Abort() // Tell the job to abort - <-job.DoneChannel() // Wait for the job to abort - } - }() - case <-q.jobQueueDone: - close(q.jobQueueShutDown) // Flag that the workers quit - return - } - } - } -} - -func (q *JobPool) Abort() { - close(q.jobQueueDone) // Stop accepting jobs and tell the workers to quit - close(q.jobQueueAborted) // Tell the workers to abort - <-q.jobQueueShutDown // Wait for all the workers to shut down - close(q.jobQueue) // Clean up the job queue -} - -func (q *JobPool) CompleteAndClose() { - close(q.jobQueueDone) // Stop accepting jobs and tell the workers to quit - <-q.jobQueueShutDown // Wait for all the workers to shut down - close(q.jobQueue) // Clean up the job queue - close(q.jobQueueAborted) // Clean up abort channel -} diff --git a/main.go b/main.go deleted file mode 100644 index e710e29..0000000 --- a/main.go +++ /dev/null @@ -1,163 +0,0 @@ -package main - -import ( - "errors" - "flag" - "fmt" - "io/ioutil" - "log" - "net" - "os" - "os/signal" - "strings" - "syscall" - "time" -) - -var logBanner string = ` -======================================== -PUPPYSTARTEDPUPPYSTARTEDPUPPYSTARTEDPUPP - .--. .---. - /:. '. .' .. '._.---. - /:::-. \.-"""-;' .-:::. .::\ - /::'| '\/ _ _ \' '\:' ::::| - __.' | / (o|o) \ ''. ':/ - / .:. / | ___ | '---' -| ::::' /: (._.) .:\ -\ .=' |:' :::| - '""' \ .-. ':/ - '---'|I|'---' - '-' -PUPPYSTARTEDPUPPYSTARTEDPUPPYSTARTEDPUPP -======================================== -` - -type listenArg struct { - Type string - Addr string -} - -func quitErr(msg string) { - os.Stderr.WriteString(msg) - os.Stderr.WriteString("\n") - os.Exit(1) -} - -func checkErr(err error) { - if err != nil { - quitErr(err.Error()) - } -} - -func parseListenString(lstr string) (*listenArg, error) { - args := strings.SplitN(lstr, ":", 2) - if len(args) != 2 { - return nil, errors.New("invalid listener. Must be in the form of \"tye:addr\"") - } - argStruct := &listenArg{ - Type: strings.ToLower(args[0]), - Addr: args[1], - } - if argStruct.Type != "tcp" && argStruct.Type != "unix" { - return nil, fmt.Errorf("invalid listener type: %s", argStruct.Type) - } - return argStruct, nil -} - -func unixAddr() string { - return fmt.Sprintf("%s/proxy.%d.%d.sock", os.TempDir(), os.Getpid(), time.Now().UnixNano()) -} - -var mln net.Listener -var logger *log.Logger - -func cleanup() { - if mln != nil { - mln.Close() - } -} - -var MainLogger *log.Logger - -func main() { - defer cleanup() - // Handle signals - sigc := make(chan os.Signal, 1) - signal.Notify(sigc, os.Interrupt, os.Kill, syscall.SIGTERM) - go func() { - <-sigc - if logger != nil { - logger.Println("Caught signal. Cleaning up.") - } - cleanup() - os.Exit(0) - }() - - msgListenStr := flag.String("msglisten", "", "Listener for the message handler. Examples: \"tcp::8080\", \"tcp:127.0.0.1:8080\", \"unix:/tmp/foobar\"") - // storageFname := flag.String("storage", "", "Datafile to use for storage") - // inMemStorage := flag.Bool("inmem", false, "Set flag to store messages in memroy rather than use a datafile") - autoListen := flag.Bool("msgauto", false, "Automatically pick and open a unix or tcp socket for the message listener") - debugFlag := flag.Bool("dbg", false, "Enable debug logging") - flag.Parse() - - if *debugFlag { - logfile, err := os.OpenFile("log.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - checkErr(err) - logger = log.New(logfile, "[*] ", log.Lshortfile) - } else { - logger = log.New(ioutil.Discard, "[*] ", log.Lshortfile) - log.SetFlags(0) - } - MainLogger = logger - - // Parse arguments to structs - if *msgListenStr == "" && *autoListen == false { - quitErr("message listener address or `--msgauto` required") - } - if *msgListenStr != "" && *autoListen == true { - quitErr("only one of listener address or `--msgauto` can be used") - } - - // Create the message listener - var listenStr string - if *msgListenStr != "" { - msgAddr, err := parseListenString(*msgListenStr) - checkErr(err) - if msgAddr.Type == "tcp" { - var err error - mln, err = net.Listen("tcp", msgAddr.Addr) - checkErr(err) - } else if msgAddr.Type == "unix" { - var err error - mln, err = net.Listen("unix", msgAddr.Addr) - checkErr(err) - } else { - quitErr("unsupported listener type:" + msgAddr.Type) - } - listenStr = fmt.Sprintf("%s:%s", msgAddr.Type, msgAddr.Addr) - } else { - fpath := unixAddr() - ulisten, err := net.Listen("unix", fpath) - if err == nil { - mln = ulisten - listenStr = fmt.Sprintf("unix:%s", fpath) - } else { - tcplisten, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - quitErr("unable to open any messaging ports") - } - mln = tcplisten - listenStr = fmt.Sprintf("tcp:%s", tcplisten.Addr().String()) - } - } - - // Set up the intercepting proxy - iproxy := NewInterceptingProxy(logger) - iproxy.AddHTTPHandler("puppy", WebUIHandler) - - // Create a message server and have it serve for the iproxy - mserv := NewProxyMessageListener(logger, iproxy) - logger.Print(logBanner) - fmt.Println(listenStr) - mserv.Serve(mln) // serve until killed -}