Republish a MJPEG HTTP image stream using a server in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

269 lines
5.7 KiB

/*
* mjpeg-proxy -- Republish a MJPEG HTTP image stream using a server in Go
*
* Copyright (C) 2015-2020, Valentin Vidic
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"fmt"
"mime/multipart"
"net/http"
"net/textproto"
"strings"
"time"
)
// Subscriber is one consumer of the chunk stream published by a PubSub.
type Subscriber struct {
// RemoteAddr labels the subscriber in log messages; ServeHTTP fills it
// with the result of clientAddress.
RemoteAddr string
// ChunkChannel carries chunks from the PubSub to the subscriber.
// NewSubscriber creates it unbuffered, doPublish sends on it without
// blocking, and stopSubscribers closes it to signal the end of the stream.
ChunkChannel chan []byte
}
// PubSub distributes the chunks produced by a Chunker to a changing set of
// subscribers. In the code visible in this file, the subscriber set, pubChan
// and stopTimer are only touched from the loop goroutine; other goroutines
// reach it through subChan and unsubChan.
type PubSub struct {
// id is the label printed in this instance's log messages.
id string
// chunker is the chunk source; it is connected and started when the first
// subscriber arrives and stopped again when the PubSub goes idle.
chunker *Chunker
// pubChan is the channel handed to chunker.Start. It is nil until the
// chunker is first started and is reset to nil by stopChunker.
pubChan chan []byte
// subChan carries Subscribe requests to the loop goroutine.
subChan chan *Subscriber
// unsubChan carries Unsubscribe requests to the loop goroutine.
unsubChan chan *Subscriber
// subscribers is the set of currently registered subscribers.
subscribers map[*Subscriber]struct{}
// stopTimer is armed with stopDelay when the last subscriber leaves; if
// it fires while there are still no subscribers, the chunker is stopped.
stopTimer *time.Timer
}
// NewSubscriber returns a Subscriber labelled with the given client address
// and equipped with a fresh, unbuffered chunk channel.
func NewSubscriber(client string) *Subscriber {
	return &Subscriber{
		RemoteAddr:   client,
		ChunkChannel: make(chan []byte),
	}
}
// NewPubSub builds an idle PubSub named id that will pull its chunks from
// chunker. The event loop is not running yet; call Start for that.
func NewPubSub(id string, chunker *Chunker) *PubSub {
	// Create the timer already expired and consume its tick, so that it sits
	// idle with an empty channel until doUnsubscribe arms it via Reset.
	idle := time.NewTimer(0)
	<-idle.C

	return &PubSub{
		id:          id,
		chunker:     chunker,
		subChan:     make(chan *Subscriber),
		unsubChan:   make(chan *Subscriber),
		subscribers: make(map[*Subscriber]struct{}),
		stopTimer:   idle,
	}
}
// Start launches the event loop in its own goroutine and returns at once.
func (ps *PubSub) Start() {
	go ps.loop()
}
// Subscribe hands s to the event loop for registration. The request travels
// over subChan, so the call blocks until the loop has received it.
func (ps *PubSub) Subscribe(s *Subscriber) {
	ps.subChan <- s
}
// Unsubscribe asks the event loop to drop s. The request travels over
// unsubChan, so the call blocks until the loop has received it.
func (ps *PubSub) Unsubscribe(s *Subscriber) {
	ps.unsubChan <- s
}
// loop is the event loop behind Start. It never returns and serves one event
// per iteration:
//
//   - a chunk on pubChan is fanned out to the subscribers; if pubChan turns
//     out to be closed, the chunker is stopped and all subscribers are closed;
//   - a request on subChan or unsubChan adds or removes a subscriber;
//   - a tick of stopTimer stops the chunker, provided nobody has subscribed
//     in the meantime.
//
// While pubChan is nil its case can never fire, which is how the loop idles
// without a running chunker.
func (ps *PubSub) loop() {
	for {
		select {
		case chunk, open := <-ps.pubChan:
			if open {
				ps.doPublish(chunk)
				continue
			}
			// pubChan was closed: the stream is over for everyone.
			ps.stopChunker()
			ps.stopSubscribers()

		case joining := <-ps.subChan:
			ps.doSubscribe(joining)

		case leaving := <-ps.unsubChan:
			ps.doUnsubscribe(leaving)

		case <-ps.stopTimer.C:
			// The idle delay has elapsed; keep the chunker only if
			// somebody subscribed again while the timer was running.
			if len(ps.subscribers) != 0 {
				continue
			}
			ps.stopChunker()
		}
	}
}
// doPublish offers data to every subscriber without ever blocking: a
// subscriber whose channel cannot take the chunk right now simply misses it.
func (ps *PubSub) doPublish(data []byte) {
	for sub := range ps.subscribers {
		select {
		case sub.ChunkChannel <- data:
			// Delivered.
		default:
			// Receiver is busy; drop this frame for it.
		}
	}
}
// doSubscribe registers s and logs the new subscriber count. When s is the
// only subscriber, the chunker is started; if that fails, the error is logged
// and the subscribers are closed via stopSubscribers.
func (ps *PubSub) doSubscribe(s *Subscriber) {
	ps.subscribers[s] = struct{}{}
	total := len(ps.subscribers)

	fmt.Printf("pubsub[%s]: added subscriber %s (total=%d)\n",
		ps.id, s.RemoteAddr, total)

	// Only the first subscriber triggers a chunker start.
	if total != 1 {
		return
	}

	err := ps.startChunker()
	if err == nil {
		return
	}

	fmt.Printf("pubsub[%s]: failed to start chunker: %s\n",
		ps.id, err)
	ps.stopSubscribers()
}
// stopSubscribers ends the stream for every current subscriber: each
// subscriber's ChunkChannel is closed, which its receiver observes as
// end-of-stream, and the subscriber is removed from the set.
//
// Removing the entries makes the call safe to repeat and keeps doPublish from
// sending on a closed channel; either would panic. It also lets the next
// subscriber count as the first one again, so that doSubscribe starts the
// chunker for it. A later Unsubscribe from a subscriber dropped here is
// harmless, because deleting a missing map key is a no-op.
func (ps *PubSub) stopSubscribers() {
	for s := range ps.subscribers {
		close(s.ChunkChannel)
		// Deleting the current key while ranging over a map is allowed.
		delete(ps.subscribers, s)
	}
}
// doUnsubscribe removes s from the subscriber set and logs the remaining
// count. Once the set is empty, the stop timer is re-armed with stopDelay;
// the event loop stops the chunker when that timer fires.
func (ps *PubSub) doUnsubscribe(s *Subscriber) {
	delete(ps.subscribers, s)
	remaining := len(ps.subscribers)

	fmt.Printf("pubsub[%s]: removed subscriber %s (total=%d)\n",
		ps.id, s.RemoteAddr, remaining)

	if remaining > 0 {
		return
	}

	// Before Reset the timer must be stopped and its channel empty. Stop
	// reports false when the timer was not running; a tick may then still
	// be queued, so take it without blocking in case there is none.
	if stopped := ps.stopTimer.Stop(); !stopped {
		select {
		case <-ps.stopTimer.C:
		default:
		}
	}
	ps.stopTimer.Reset(stopDelay)
}
// startChunker makes sure the chunker is running. If it already reports
// itself as started nothing happens. Otherwise the chunker is connected, a
// new pubChan is created, and chunker.Start is launched on that channel in
// its own goroutine. A Connect error is returned unchanged and leaves
// pubChan untouched.
func (ps *PubSub) startChunker() error {
	if ps.chunker.Started() {
		return nil
	}

	if err := ps.chunker.Connect(); err != nil {
		return err
	}

	chunks := make(chan []byte)
	ps.pubChan = chunks
	go ps.chunker.Start(chunks)

	return nil
}
// stopChunker stops the chunker if a publish channel is set and then clears
// pubChan. With pubChan already nil the call does nothing, so it is safe to
// repeat.
func (ps *PubSub) stopChunker() {
	if ps.pubChan == nil {
		return
	}
	ps.chunker.Stop()
	ps.pubChan = nil
}
// clientAddress returns the address used to label the client of r. When
// clientHeader is set and r carries a non-empty first comma-separated entry
// in that header, that entry is returned; in every other case the result is
// r.RemoteAddr.
func clientAddress(r *http.Request) string {
	if clientHeader == "" {
		return r.RemoteAddr
	}

	// Split always yields at least one element, so index 0 is safe even
	// when the header is missing.
	first := strings.Split(r.Header.Get(clientHeader), ",")[0]
	if first == "" {
		return r.RemoteAddr
	}
	return first
}
// ServeHTTP streams the published chunks to an HTTP client as a
// multipart/x-mixed-replace response, one image/jpeg part per chunk, flushing
// after every part.
//
// Only GET and HEAD are accepted; any other method is answered with 405 and
// an Allow header. A ResponseWriter that cannot flush is answered with 500.
// The handler stays subscribed for as long as it runs and always unsubscribes
// on return.
//
// Response headers are held back until the first chunk arrives, so a stream
// that ends without delivering anything can still be reported as 503. A
// client that disconnects before the first chunk is not treated as a stream
// failure: the handler just returns.
func (ps *PubSub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet && r.Method != http.MethodHead {
		w.Header().Set("Allow", fmt.Sprintf("%s, %s", http.MethodGet, http.MethodHead))
		http.Error(w, fmt.Sprintf("HTTP method %s not supported", r.Method), http.StatusMethodNotAllowed)
		return
	}

	// Every part has to reach the client immediately, so the writer must
	// support flushing.
	flusher, ok := w.(http.Flusher)
	if !ok {
		fmt.Printf("server[%s]: client %s could not be flushed\n",
			ps.id, r.RemoteAddr)
		// Answer explicitly; returning silently would produce an implicit,
		// empty 200 response.
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	// Subscribe to new chunks for the lifetime of this request.
	sub := NewSubscriber(clientAddress(r))
	ps.Subscribe(sub)
	defer ps.Unsubscribe(sub)

	mw := multipart.NewWriter(w)
	contentType := fmt.Sprintf("multipart/x-mixed-replace; boundary=%s", mw.Boundary())

	mimeHeader := make(textproto.MIMEHeader)
	mimeHeader.Set("Content-Type", "image/jpeg")

	// The request context is done once the client is gone.
	clientGone := r.Context().Done()

	var headersSent, streamEnded bool

LOOP:
	for {
		// Wait for the next chunk, the end of the stream, or the client
		// going away.
		var data []byte
		select {
		case chunk, open := <-sub.ChunkChannel:
			if !open {
				streamEnded = true
				break LOOP
			}
			data = chunk
		case <-clientGone:
			break LOOP
		}

		// Send the HTTP header before the first chunk. Set (not Add) keeps
		// Content-Type single-valued even if something set it earlier.
		if !headersSent {
			w.Header().Set("Content-Type", contentType)
			w.WriteHeader(http.StatusOK)
			headersSent = true
		}

		mimeHeader.Set("Content-Size", fmt.Sprintf("%d", len(data)))
		part, err := mw.CreatePart(mimeHeader)
		if err != nil {
			fmt.Printf("server[%s]: part create failed: %s\n", ps.id, err)
			return
		}

		// Send the image to the client.
		if _, err := part.Write(data); err != nil {
			fmt.Printf("server[%s]: part write failed: %s\n", ps.id, err)
			return
		}
		flusher.Flush()
	}

	if !headersSent {
		// Nothing was sent. Only a stream that ended on its own is an
		// error worth reporting; a client that left early is not.
		if streamEnded {
			fmt.Printf("server[%s]: stream failed\n", ps.id)
			http.Error(w, "Stream failed", http.StatusServiceUnavailable)
		}
		return
	}

	// Terminate the multipart body with its closing boundary.
	if err := mw.Close(); err != nil {
		fmt.Printf("server[%s]: mime close failed: %s\n", ps.id, err)
	}
}