Source code for device.camera_device
from . import base
from .camera_run import (
check_dependencies, start_stream, start_encoder_rtmp, quality_presets_alt
)
import os
import time
[docs]
class CameraDevice(base.HardwareBase):
"""Stream camera video at http://this-ip-address:5200/stream.mpjpeg
[Low latency 500ms-1000ms]
Additionally, the stream can be encoded (h264) and send over RTMP to a
remote streaming service for large audiences (e.g. YouTube Livestreams).
[High latency 10-40s]
Use `watching()` and `check_watching()` methods to automatically stop the
video stream when nobody is watching.
"""
def __init__(self, device="/dev/video0", port=5200, quality="medium",
noviewer_timeout=30, **kwargs):
super().__init__(**kwargs)
if not check_dependencies():
error = (
"Missing dependencies (vlc, ffmpeg). Install them using"
"'sudo apt install vlc ffmpeg'"
)
self.log.critical(error)
raise RuntimeError(error)
self.device = device
self.port = port
self._quality = quality
self.noviewer_timeout = noviewer_timeout
self._viewer_watching = time.time()
self._proc_video = None
self._proc_encoder = None
@staticmethod
def _is_running(process):
return process is not None and process.poll() is None
@property
def quality(self):
return self._quality
@property
def quality_options(self):
return tuple(quality_presets_alt)
# ==== Inherited abstract methods ====
def _connect(self):
return self._is_ready()
def _disconnect(self):
return
def _is_ready(self):
if not os.path.exists(self.device):
self.log.error(f"Path '{self.device}' does not exist.")
return False
else:
return True
# ==== Commands ====
[docs]
@base.base_command
def set_quality(self, quality):
if self._quality != quality:
self._quality = quality
if self.is_video_running():
self.stop_video()
self.start_video()
return self._quality
[docs]
def is_video_running(self):
return self._is_running(self._proc_video)
[docs]
@base.base_command
def start_video(self):
if not self.is_video_running():
if self._proc_video is not None and self._proc_video.returncode != 0:
self.log.warning(
"Previous camera video process has exited with code %d.",
self._proc_video.returncode,
)
self._proc_video = start_stream(self.device, self._quality)
self._viewer_watching = time.time()
self.log.info("Started camera feed!")
[docs]
@base.base_command
def stop_video(self):
if self.is_video_running():
self._proc_video.terminate()
self._proc_video.wait()
self.log.info("Terminated camera feed")
self._proc_video = None
[docs]
def is_rtmp_running(self):
return self._is_running(self._proc_encoder)
[docs]
@base.base_command
def start_rtmp_stream(self, rtmp_url, key):
if not self.is_rtmp_running():
self.start_video()
self._proc_encoder = start_encoder_rtmp(
rtmp_url=f"{rtmp_url}/{key}",
mpjpeg_host=f"127.0.0.1:{self.port}",
)
self.log.info(f"Started RTMP stream to {rtmp_url}!")
[docs]
@base.base_command
def stop_rtmp_stream(self):
if self.is_rtmp_running():
self._proc_encoder.terminate()
self._proc_encoder.wait()
self.log.info(f"Terminated RTMP stream")
self._proc_encoder = None
[docs]
@base.idle_command
def watching(self):
"""Users can call this method periodically to signalize that they are
(still) watching the video stream. See check_watching() for more info.
This method also starts the camera stream if not yet running.
"""
self.start_video()
self._viewer_watching = time.time()
[docs]
@base.idle_command
def check_watching(self):
"""Call this method periodically (background task) to check if somebody
was recently watching the video feed. If no watching() call took place
during the specified noviewer_timeout, the video stream is stopped.
"""
if (
time.time() > self._viewer_watching + self.noviewer_timeout
and not self.is_rtmp_running()
):
self.stop_video()