Source code for device.camera_device

from . import base
from .camera_run import (
    check_dependencies, start_stream, start_encoder_rtmp, quality_presets_alt
)

import os
import time


class CameraDevice(base.HardwareBase):
    """Stream camera video at http://this-ip-address:5200/stream.mpjpeg

    [Low latency 500ms-1000ms]

    Additionally, the stream can be encoded (h264) and sent over RTMP to a
    remote streaming service for large audiences (e.g. YouTube Livestreams).
    [High latency 10-40s]

    Use the `watching()` and `check_watching()` methods to automatically stop
    the video stream when nobody is watching.
    """

    def __init__(self, device="/dev/video0", port=5200, quality="medium",
                 noviewer_timeout=30, **kwargs):
        """Check external dependencies and initialise the stream state.

        Args:
            device: Path of the video device; its existence is checked by
                `_is_ready()`.
            port: Port of the local MPJPEG stream the RTMP encoder reads from.
            quality: Quality preset handed to `start_stream()`.
            noviewer_timeout: Seconds without a `watching()` call after which
                `check_watching()` stops the video stream.
            **kwargs: Forwarded unchanged to the base class constructor.

        Raises:
            RuntimeError: If `check_dependencies()` reports a failure.
        """
        super().__init__(**kwargs)
        if not check_dependencies():
            # The trailing space matters: the two literals are concatenated.
            error = (
                "Missing dependencies (vlc, ffmpeg). Install them using "
                "'sudo apt install vlc ffmpeg'"
            )
            self.log.critical(error)
            raise RuntimeError(error)

        self.device = device
        # NOTE(review): `port` is only used to tell the RTMP encoder where to
        # read from; it is not passed to start_stream() -- confirm that the
        # streamer actually listens on this port.
        self.port = port
        self._quality = quality
        self.noviewer_timeout = noviewer_timeout
        # Timestamp (time.time()) of the most recent sign of a viewer.
        self._viewer_watching = time.time()
        # Process handles returned by start_stream() / start_encoder_rtmp();
        # None while no process has been started (or after a clean stop).
        self._proc_video = None
        self._proc_encoder = None

    @staticmethod
    def _is_running(process):
        """Return True if `process` exists and has not exited yet."""
        return process is not None and process.poll() is None

    @staticmethod
    def _terminate(process):
        """Ask `process` to terminate and block until it has exited."""
        process.terminate()
        process.wait()

    @property
    def quality(self):
        """Currently selected quality preset."""
        return self._quality

    @property
    def quality_options(self):
        """Tuple of the entries of `quality_presets_alt`."""
        return tuple(quality_presets_alt)

    # ==== Inherited abstract methods ====

    def _connect(self):
        """Report whether the device is ready; nothing is opened here."""
        return self._is_ready()

    def _disconnect(self):
        """Do nothing.

        NOTE(review): running video/encoder processes are not stopped here --
        confirm that this is intended.
        """
        return

    def _is_ready(self):
        """Return True if the configured device path exists."""
        if os.path.exists(self.device):
            return True
        self.log.error("Path '%s' does not exist.", self.device)
        return False

    # ==== Commands ====

    @base.base_command
    def set_quality(self, quality):
        """Select a quality preset, restarting a running stream to apply it.

        Returns:
            The quality preset that is now active.
        """
        if self._quality != quality:
            self._quality = quality
            if self.is_video_running():
                # The preset is only read when the stream process is started.
                self.stop_video()
                self.start_video()
        return self._quality

    def is_video_running(self):
        """Return True while the camera stream process is alive."""
        return self._is_running(self._proc_video)

    @base.base_command
    def start_video(self):
        """Start the camera stream unless it is already running."""
        if self.is_video_running():
            return

        previous = self._proc_video
        # A leftover handle means the process exited without stop_video().
        if previous is not None and previous.returncode != 0:
            self.log.warning(
                "Previous camera video process has exited with code %d.",
                previous.returncode,
            )
        self._proc_video = start_stream(self.device, self._quality)
        # Restart the no-viewer countdown for the fresh stream.
        self._viewer_watching = time.time()
        self.log.info("Started camera feed!")

    @base.base_command
    def stop_video(self):
        """Terminate the camera stream if it is running."""
        if self.is_video_running():
            self._terminate(self._proc_video)
            self.log.info("Terminated camera feed")
            self._proc_video = None

    def is_rtmp_running(self):
        """Return True while the RTMP encoder process is alive."""
        return self._is_running(self._proc_encoder)

    @base.base_command
    def start_rtmp_stream(self, rtmp_url, key):
        """Start encoding the local stream to `rtmp_url`/`key`.

        Does nothing if the encoder is already running. The camera stream is
        started first because the encoder reads from it.
        """
        if self.is_rtmp_running():
            return

        self.start_video()
        self._proc_encoder = start_encoder_rtmp(
            rtmp_url=f"{rtmp_url}/{key}",
            mpjpeg_host=f"127.0.0.1:{self.port}",
        )
        # Only the URL is logged; the stream key is left out of the message.
        self.log.info("Started RTMP stream to %s!", rtmp_url)

    @base.base_command
    def stop_rtmp_stream(self):
        """Terminate the RTMP encoder if it is running.

        The camera stream itself is left running.
        """
        if self.is_rtmp_running():
            self._terminate(self._proc_encoder)
            self.log.info("Terminated RTMP stream")
            self._proc_encoder = None

    @base.idle_command
    def watching(self):
        """Signal that somebody is (still) watching the video stream.

        Users can call this method periodically; see `check_watching()` for
        more info. This method also starts the camera stream if it is not yet
        running.
        """
        self.start_video()
        self._viewer_watching = time.time()

    @base.idle_command
    def check_watching(self):
        """Stop the video stream if nobody was watching recently.

        Call this method periodically (background task). If no `watching()`
        call took place during the specified `noviewer_timeout`, the video
        stream is stopped -- unless the RTMP stream is running.
        """
        deadline = self._viewer_watching + self.noviewer_timeout
        if time.time() > deadline and not self.is_rtmp_running():
            self.stop_video()