Republish an MJPEG HTTP image stream using a server written in Go. Also available at https://github.com/kescherCode/mjpeg-proxy. Originally forked from https://github.com/vvidic/mjpeg-proxy.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

521 lines
11 KiB

  1. /*
  2. * mjpeg-proxy -- Republish a MJPEG HTTP image stream using a server in Go
  3. *
  4. * Copyright (C) 2015-2020, Valentin Vidic
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. */
  19. package main
  20. import (
  21. "encoding/json"
  22. "errors"
  23. "flag"
  24. "fmt"
  25. "io"
  26. "io/ioutil"
  27. "mime"
  28. "mime/multipart"
  29. "net/http"
  30. "net/textproto"
  31. "net/url"
  32. "os"
  33. "runtime"
  34. "strings"
  35. "time"
  36. )
  37. /* Sample source stream starts like this:
  38. HTTP/1.1 200 OK
  39. Content-Type: multipart/x-mixed-replace;boundary=myboundary
  40. Cache-Control: no-cache
  41. Pragma: no-cache
  42. --myboundary
  43. Content-Type: image/jpeg
  44. Content-Length: 36291
  45. JPEG data...
  46. */
  // Chunker reads a multipart HTTP stream from a single source URI and passes
  // the body of each part on as one chunk.
  type Chunker struct {
  	// Settings supplied to NewChunker.
  	id, source         string
  	username, password string

  	// State filled in by Connect.
  	resp     *http.Response
  	boundary string
  	stop     chan struct{}
  }
  56. func NewChunker(id, source, username, password string) (*Chunker, error) {
  57. chunker := new(Chunker)
  58. sourceUrl, err := url.Parse(source)
  59. if err != nil {
  60. return nil, err
  61. }
  62. if !sourceUrl.IsAbs() {
  63. return nil, fmt.Errorf("uri is not absolute: %s", source)
  64. }
  65. chunker.id = id
  66. chunker.source = source
  67. chunker.username = username
  68. chunker.password = password
  69. return chunker, nil
  70. }
  71. func (chunker *Chunker) Connect() error {
  72. fmt.Printf("chunker[%s]: connecting to %s\n", chunker.id, chunker.source)
  73. req, err := http.NewRequest("GET", chunker.source, nil)
  74. if err != nil {
  75. return err
  76. }
  77. if chunker.username != "" && chunker.password != "" {
  78. req.SetBasicAuth(chunker.username, chunker.password)
  79. }
  80. client := &http.Client{}
  81. resp, err := client.Do(req)
  82. if err != nil {
  83. return err
  84. }
  85. if resp.StatusCode != http.StatusOK {
  86. chunker.closeResponse(resp)
  87. return fmt.Errorf("request failed: %s", resp.Status)
  88. }
  89. boundary, err := getBoundary(resp)
  90. if err != nil {
  91. chunker.closeResponse(resp)
  92. return err
  93. }
  94. chunker.resp = resp
  95. chunker.boundary = boundary
  96. chunker.stop = make(chan struct{})
  97. return nil
  98. }
  99. func (chunker *Chunker) closeResponse(resp *http.Response) {
  100. err := resp.Body.Close()
  101. if err != nil {
  102. fmt.Printf("chunker[%s]: body close failed: %s\n", chunker.id, err)
  103. }
  104. }
  // getBoundary extracts the multipart boundary from the Content-Type header of
  // resp. It fails when the header cannot be parsed, when the media type is not
  // multipart/*, or when the boundary parameter is absent or empty.
  func getBoundary(resp *http.Response) (string, error) {
  	header := resp.Header.Get("Content-Type")

  	kind, attrs, err := mime.ParseMediaType(header)
  	if err != nil {
  		return "", err
  	}

  	if !strings.HasPrefix(kind, "multipart/") {
  		return "", fmt.Errorf("expected multipart media type: %s", header)
  	}

  	if b := attrs["boundary"]; b != "" {
  		return b, nil
  	}
  	return "", fmt.Errorf("boundary not found: %s", header)
  }
  120. func (chunker *Chunker) GetHeader() http.Header {
  121. return chunker.resp.Header
  122. }
  123. func (chunker *Chunker) Start(pubChan chan []byte) {
  124. fmt.Printf("chunker[%s]: started\n", chunker.id)
  125. body := chunker.resp.Body
  126. defer func() {
  127. err := body.Close()
  128. if err != nil {
  129. fmt.Printf("chunker[%s]: body close failed: %s\n", chunker.id, err)
  130. }
  131. }()
  132. defer close(pubChan)
  133. var failure error
  134. mr := multipart.NewReader(body, chunker.boundary)
  135. ChunkLoop:
  136. for {
  137. part, err := mr.NextPart()
  138. if err == io.EOF {
  139. break ChunkLoop
  140. }
  141. if err != nil {
  142. failure = err
  143. break ChunkLoop
  144. }
  145. data, err := ioutil.ReadAll(part)
  146. if err != nil {
  147. failure = err
  148. break ChunkLoop
  149. }
  150. err = part.Close()
  151. if err != nil {
  152. failure = err
  153. break ChunkLoop
  154. }
  155. if len(data) == 0 {
  156. failure = errors.New("received final chunk of size 0")
  157. break ChunkLoop
  158. }
  159. select {
  160. case <-chunker.stop:
  161. break ChunkLoop
  162. case pubChan <- append(data):
  163. }
  164. }
  165. if failure != nil {
  166. fmt.Printf("chunker[%s]: failed: %s\n", chunker.id, failure)
  167. } else {
  168. fmt.Printf("chunker[%s]: stopped\n", chunker.id)
  169. }
  170. }
  171. func (chunker *Chunker) Stop() {
  172. fmt.Printf("chunker[%s]: stopping\n", chunker.id)
  173. close(chunker.stop)
  174. }
  175. func (chunker *Chunker) Started() bool {
  176. if chunker.stop == nil { // Never started
  177. return false
  178. }
  179. select {
  180. case <-chunker.stop: // Already stopped
  181. return false
  182. default:
  183. return true // Still running
  184. }
  185. }
  186. type PubSub struct {
  187. id string
  188. chunker *Chunker
  189. pubChan chan []byte
  190. subChan chan *Subscriber
  191. unsubChan chan *Subscriber
  192. subscribers map[*Subscriber]struct{}
  193. stopTimer *time.Timer
  194. }
  195. func NewPubSub(id string, chunker *Chunker) *PubSub {
  196. pubSub := new(PubSub)
  197. pubSub.id = id
  198. pubSub.chunker = chunker
  199. pubSub.subChan = make(chan *Subscriber)
  200. pubSub.unsubChan = make(chan *Subscriber)
  201. pubSub.subscribers = make(map[*Subscriber]struct{})
  202. pubSub.stopTimer = time.NewTimer(0)
  203. <-pubSub.stopTimer.C
  204. return pubSub
  205. }
  206. func (pubSub *PubSub) Start() {
  207. go pubSub.loop()
  208. }
  209. func (pubSub *PubSub) Subscribe(s *Subscriber) {
  210. pubSub.subChan <- s
  211. }
  212. func (pubSub *PubSub) Unsubscribe(s *Subscriber) {
  213. pubSub.unsubChan <- s
  214. }
  215. func (pubSub *PubSub) loop() {
  216. for {
  217. select {
  218. case data, ok := <-pubSub.pubChan:
  219. if ok {
  220. pubSub.doPublish(data)
  221. } else {
  222. pubSub.stopChunker()
  223. pubSub.stopSubscribers()
  224. }
  225. case sub := <-pubSub.subChan:
  226. pubSub.doSubscribe(sub)
  227. case sub := <-pubSub.unsubChan:
  228. pubSub.doUnsubscribe(sub)
  229. case <-pubSub.stopTimer.C:
  230. if len(pubSub.subscribers) == 0 {
  231. pubSub.stopChunker()
  232. }
  233. }
  234. }
  235. }
  236. func (pubSub *PubSub) doPublish(data []byte) {
  237. subs := pubSub.subscribers
  238. for s := range subs {
  239. select {
  240. case s.ChunkChannel <- data: // try to send
  241. default: // or skip this frame
  242. }
  243. }
  244. }
  245. func (pubSub *PubSub) doSubscribe(s *Subscriber) {
  246. pubSub.subscribers[s] = struct{}{}
  247. fmt.Printf("pubsub[%s]: added subscriber %s (total=%d)\n",
  248. pubSub.id, s.RemoteAddr, len(pubSub.subscribers))
  249. if len(pubSub.subscribers) == 1 {
  250. if err := pubSub.startChunker(); err != nil {
  251. fmt.Printf("pubsub[%s]: failed to start chunker: %s\n",
  252. pubSub.id, err)
  253. pubSub.stopSubscribers()
  254. }
  255. }
  256. }
  257. func (pubSub *PubSub) stopSubscribers() {
  258. for s := range pubSub.subscribers {
  259. close(s.ChunkChannel)
  260. }
  261. }
  262. func (pubSub *PubSub) doUnsubscribe(s *Subscriber) {
  263. delete(pubSub.subscribers, s)
  264. fmt.Printf("pubsub[%s]: removed subscriber %s (total=%d)\n",
  265. pubSub.id, s.RemoteAddr, len(pubSub.subscribers))
  266. if len(pubSub.subscribers) == 0 {
  267. if !pubSub.stopTimer.Stop() {
  268. select {
  269. case <-pubSub.stopTimer.C:
  270. default:
  271. }
  272. }
  273. pubSub.stopTimer.Reset(stopDelay)
  274. }
  275. }
  276. func (pubSub *PubSub) startChunker() error {
  277. if pubSub.chunker.Started() {
  278. return nil
  279. }
  280. err := pubSub.chunker.Connect()
  281. if err != nil {
  282. return err
  283. }
  284. pubSub.pubChan = make(chan []byte)
  285. go pubSub.chunker.Start(pubSub.pubChan)
  286. return nil
  287. }
  288. func (pubSub *PubSub) stopChunker() {
  289. if pubSub.pubChan != nil {
  290. pubSub.chunker.Stop()
  291. }
  292. pubSub.pubChan = nil
  293. }
  // Subscriber is one consumer of a PubSub stream.
  type Subscriber struct {
  	// RemoteAddr identifies the client in log messages.
  	RemoteAddr string
  	// ChunkChannel delivers chunks to the client; it is closed when the
  	// stream ends.
  	ChunkChannel chan []byte
  }
  298. func NewSubscriber(client string) *Subscriber {
  299. sub := new(Subscriber)
  300. sub.RemoteAddr = client
  301. sub.ChunkChannel = make(chan []byte)
  302. return sub
  303. }
  304. func (pubSub *PubSub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  305. // prepare response for flushing
  306. flusher, ok := w.(http.Flusher)
  307. if !ok {
  308. fmt.Printf("server[%s]: client %s could not be flushed\n",
  309. pubSub.id, r.RemoteAddr)
  310. return
  311. }
  312. // subscribe to new chunks
  313. sub := NewSubscriber(r.RemoteAddr)
  314. pubSub.Subscribe(sub)
  315. defer pubSub.Unsubscribe(sub)
  316. mw := multipart.NewWriter(w)
  317. contentType := fmt.Sprintf("multipart/x-mixed-replace; boundary=%s", mw.Boundary())
  318. mimeHeader := make(textproto.MIMEHeader)
  319. mimeHeader.Set("Content-Type", "image/jpeg")
  320. headersSent := false
  321. for {
  322. // wait for next chunk
  323. data, ok := <-sub.ChunkChannel
  324. if !ok {
  325. return
  326. }
  327. // send HTTP header before first chunk
  328. if !headersSent {
  329. header := w.Header()
  330. header.Add("Content-Type", contentType)
  331. w.WriteHeader(http.StatusOK)
  332. headersSent = true
  333. }
  334. mimeHeader.Set("Content-Size", fmt.Sprintf("%d", len(data)))
  335. part, err := mw.CreatePart(mimeHeader)
  336. if err != nil {
  337. fmt.Printf("server[%s]: part create failed: %s\n", pubSub.id, err)
  338. return
  339. }
  340. // send image to client
  341. _, err = part.Write(data)
  342. if err != nil {
  343. fmt.Printf("server[%s]: part write failed: %s\n", pubSub.id, err)
  344. return
  345. }
  346. flusher.Flush()
  347. }
  348. err := mw.Close()
  349. if err != nil {
  350. fmt.Printf("server[%s]: mime close failed: %s\n", pubSub.id, err)
  351. }
  352. }
  353. func startSource(source, username, password, proxyUrl string) error {
  354. chunker, err := NewChunker(proxyUrl, source, username, password)
  355. if err != nil {
  356. return fmt.Errorf("chunker[%s]: create failed: %s", proxyUrl, err)
  357. }
  358. pubSub := NewPubSub(proxyUrl, chunker)
  359. pubSub.Start()
  360. fmt.Printf("chunker[%s]: serving from %s\n", proxyUrl, source)
  361. http.Handle(proxyUrl, pubSub)
  362. return nil
  363. }
  // configSource is one entry of the JSON sources file read by loadConfig.
  type configSource struct {
  	Source   string // source URI of the stream
  	Username string // source credentials, passed through to startSource
  	Password string
  	Path     string // proxy path the source is served under
  }
  370. func loadConfig(filename string) error {
  371. file, err := os.Open(filename)
  372. if err != nil {
  373. return err
  374. }
  375. defer func() {
  376. err := file.Close()
  377. if err != nil {
  378. fmt.Printf("config: file close failed for %s: %s\n", file.Name(), err)
  379. }
  380. }()
  381. sources := make([]configSource, 0)
  382. dec := json.NewDecoder(file)
  383. err = dec.Decode(&sources)
  384. if err != nil && err != io.EOF {
  385. return err
  386. }
  387. exists := make(map[string]bool)
  388. for _, conf := range sources {
  389. if exists[conf.Path] {
  390. return fmt.Errorf("duplicate proxy path: %s", conf.Path)
  391. }
  392. err = startSource(conf.Source, conf.Username, conf.Password, conf.Path)
  393. if err != nil {
  394. return err
  395. }
  396. exists[conf.Path] = true
  397. }
  398. return nil
  399. }
  // stopDelay is the delay the stop timer is armed with once the last
  // subscriber has left. main sets it from the -stopduration flag.
  var stopDelay time.Duration
  401. func main() {
  402. source := flag.String("source", "http://example.com/img.mjpg", "source uri")
  403. username := flag.String("username", "", "source uri username")
  404. password := flag.String("password", "", "source uri password")
  405. sources := flag.String("sources", "", "JSON configuration file to load sources from")
  406. bind := flag.String("bind", ":8080", "proxy bind address")
  407. path := flag.String("path", "/", "proxy serving path")
  408. maxprocs := flag.Int("maxprocs", 0, "limit number of CPUs used")
  409. flag.DurationVar(&stopDelay, "stopduration", 60*time.Second, "follow source after last client")
  410. flag.Parse()
  411. if *maxprocs > 0 {
  412. runtime.GOMAXPROCS(*maxprocs)
  413. }
  414. var err error
  415. if *sources != "" {
  416. err = loadConfig(*sources)
  417. } else {
  418. err = startSource(*source, *username, *password, *path)
  419. }
  420. if err != nil {
  421. fmt.Println("config:", err)
  422. os.Exit(1)
  423. }
  424. fmt.Printf("server: starting on address %s\n", *bind)
  425. err = http.ListenAndServe(*bind, nil)
  426. if err != nil {
  427. fmt.Println("server:", err)
  428. os.Exit(1)
  429. }
  430. }