Browse Source

Add flag for delaying the stop of chunker

master
Valentin Vidic 2 weeks ago
parent
commit
6920c7b6ad
1 changed files with 36 additions and 1 deletions
  1. +36
    -1
      mjpeg-proxy.go

+ 36
- 1
mjpeg-proxy.go View File

@@ -34,6 +34,7 @@ import (
"os"
"runtime"
"strings"
"time"
)

/* Sample source stream starts like this:
@@ -205,6 +206,19 @@ func (chunker *Chunker) Stop() {
close(chunker.stop)
}

func (chunker *Chunker) Started() bool {
if chunker.stop == nil { // Never started
return false
}

select {
case <-chunker.stop: // Already stopped
return false
default:
return true // Still running
}
}

type PubSub struct {
id string
chunker *Chunker
@@ -212,6 +226,7 @@ type PubSub struct {
subChan chan *Subscriber
unsubChan chan *Subscriber
subscribers map[*Subscriber]struct{}
stopTimer *time.Timer
}

func NewPubSub(id string, chunker *Chunker) *PubSub {
@@ -222,6 +237,8 @@ func NewPubSub(id string, chunker *Chunker) *PubSub {
pubSub.subChan = make(chan *Subscriber)
pubSub.unsubChan = make(chan *Subscriber)
pubSub.subscribers = make(map[*Subscriber]struct{})
pubSub.stopTimer = time.NewTimer(0)
<-pubSub.stopTimer.C

return pubSub
}
@@ -254,6 +271,11 @@ func (pubSub *PubSub) loop() {

case sub := <-pubSub.unsubChan:
pubSub.doUnsubscribe(sub)

case <-pubSub.stopTimer.C:
if len(pubSub.subscribers) == 0 {
pubSub.stopChunker()
}
}
}
}
@@ -297,11 +319,21 @@ func (pubSub *PubSub) doUnsubscribe(s *Subscriber) {
pubSub.id, s.RemoteAddr, len(pubSub.subscribers))

if len(pubSub.subscribers) == 0 {
pubSub.stopChunker()
if !pubSub.stopTimer.Stop() {
select {
case <-pubSub.stopTimer.C:
default:
}
}
pubSub.stopTimer.Reset(stopDelay)
}
}

func (pubSub *PubSub) startChunker() error {
if pubSub.chunker.Started() {
return nil
}

err := pubSub.chunker.Connect()
if err != nil {
return err
@@ -451,6 +483,8 @@ func loadConfig(filename string) error {
return nil
}

var stopDelay time.Duration

func main() {
source := flag.String("source", "http://example.com/img.mjpg", "source uri")
username := flag.String("username", "", "source uri username")
@@ -459,6 +493,7 @@ func main() {
bind := flag.String("bind", ":8080", "proxy bind address")
path := flag.String("path", "/", "proxy serving path")
maxprocs := flag.Int("maxprocs", 0, "limit number of CPUs used")
flag.DurationVar(&stopDelay, "stopduration", 60*time.Second, "follow source after last client")
flag.Parse()

if *maxprocs > 0 {


Loading…
Cancel
Save