Republish a MJPEG HTTP image stream using a server in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

171 lines
4.2 KiB

7 months ago
7 months ago
7 months ago
7 months ago
  1. /*
  2. * mjpeg-proxy -- Republish a MJPEG HTTP image stream using a server in Go
  3. *
  4. * Copyright (C) 2015-2020, Valentin Vidic
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. */
  19. package main
  20. import (
  21. "encoding/json"
  22. "flag"
  23. "fmt"
  24. "io"
  25. "net"
  26. "net/http"
  27. "os"
  28. "runtime"
  29. "strings"
  30. "time"
  31. )
  32. var clientHeader string
  33. var stopDelay time.Duration
  34. var tcpSendBuffer int
  35. type configSource struct {
  36. Source string
  37. Username string
  38. Password string
  39. Path string
  40. Rate float64
  41. }
  42. func startSource(source, username, password, proxyUrl string, rate float64) error {
  43. chunker, err := NewChunker(proxyUrl, source, username, password, rate)
  44. if err != nil {
  45. return fmt.Errorf("chunker[%s]: create failed: %s", proxyUrl, err)
  46. }
  47. pubSub := NewPubSub(proxyUrl, chunker)
  48. pubSub.Start()
  49. fmt.Printf("chunker[%s]: serving from %s\n", proxyUrl, source)
  50. http.Handle(proxyUrl, pubSub)
  51. return nil
  52. }
  53. func loadConfig(filename string) error {
  54. file, err := os.Open(filename)
  55. if err != nil {
  56. return err
  57. }
  58. defer func() {
  59. err := file.Close()
  60. if err != nil {
  61. fmt.Printf("config: file close failed for %s: %s\n", file.Name(), err)
  62. }
  63. }()
  64. sources := make([]configSource, 0)
  65. dec := json.NewDecoder(file)
  66. err = dec.Decode(&sources)
  67. if err != nil && err != io.EOF {
  68. return err
  69. }
  70. exists := make(map[string]bool)
  71. for _, conf := range sources {
  72. if exists[conf.Path] {
  73. return fmt.Errorf("duplicate proxy path: %s", conf.Path)
  74. }
  75. err = startSource(conf.Source, conf.Username, conf.Password, conf.Path, conf.Rate)
  76. if err != nil {
  77. return err
  78. }
  79. exists[conf.Path] = true
  80. }
  81. return nil
  82. }
  83. func connStateEvent(conn net.Conn, event http.ConnState) {
  84. if event == http.StateActive && tcpSendBuffer > 0 {
  85. switch c := conn.(type) {
  86. case *net.TCPConn:
  87. c.SetWriteBuffer(tcpSendBuffer)
  88. case *net.UnixConn:
  89. c.SetWriteBuffer(tcpSendBuffer)
  90. }
  91. }
  92. }
  93. func unixListen(path string) (net.Listener, error) {
  94. fi, err := os.Stat(path)
  95. if !os.IsNotExist(err) && fi.Mode()&os.ModeSocket != 0 {
  96. os.Remove(path)
  97. }
  98. return net.Listen("unix", path)
  99. }
  100. func listenAndServe(addr string) error {
  101. var listener net.Listener
  102. var err error
  103. if strings.HasPrefix(addr, "unix:") {
  104. listener, err = unixListen(strings.TrimPrefix(addr, "unix:"))
  105. } else {
  106. listener, err = net.Listen("tcp", addr)
  107. }
  108. if err != nil {
  109. return err
  110. }
  111. fmt.Printf("server: starting on address %s\n", addr)
  112. server := &http.Server{
  113. ConnState: connStateEvent,
  114. }
  115. return server.Serve(listener)
  116. }
  117. func main() {
  118. source := flag.String("source", "http://example.com/img.mjpg", "source uri")
  119. username := flag.String("username", "", "source uri username")
  120. password := flag.String("password", "", "source uri password")
  121. sources := flag.String("sources", "", "JSON configuration file to load sources from")
  122. bind := flag.String("bind", ":8080", "proxy bind address")
  123. path := flag.String("path", "/", "proxy serving path")
  124. rate := flag.Float64("rate", 0, "limit output frame rate")
  125. maxprocs := flag.Int("maxprocs", 0, "limit number of CPUs used")
  126. flag.DurationVar(&stopDelay, "stopduration", 60*time.Second, "follow source after last client")
  127. flag.IntVar(&tcpSendBuffer, "sendbuffer", 4096, "limit buffering of frames")
  128. flag.StringVar(&clientHeader, "clientheader", "", "request header with client address")
  129. flag.Parse()
  130. if *maxprocs > 0 {
  131. runtime.GOMAXPROCS(*maxprocs)
  132. }
  133. var err error
  134. if *sources != "" {
  135. err = loadConfig(*sources)
  136. } else {
  137. err = startSource(*source, *username, *password, *path, *rate)
  138. }
  139. if err != nil {
  140. fmt.Println("config:", err)
  141. os.Exit(1)
  142. }
  143. err = listenAndServe(*bind)
  144. if err != nil {
  145. fmt.Println("server:", err)
  146. os.Exit(1)
  147. }
  148. }