Source code for device_client.client_camera
from device_client import DeviceClient
from device.camera_device import CameraDevice
description = """
Start the camera feed using the **Watch** button in *Simple commands*.
Notes on performance issues:
- The real-time camera feed has high bandwidth requirements, therefore
- it is not able to handle large number of viewers, ideally just one
- it may affect the internet connection of other people on the lab network,
especially when accessed from outside networks (i.e. from home)
- When the video feed noticeably slows down and starts accumulating latency,
try adjusting the **Quality** setting.
"""
[docs]
def connect(name, password, server, device):
pretty_name = name.partition("_")[-1].replace("_", " ").title()
# Camera name format: camera_name_24 --> streaming at port 5224
# By 'encoding' the port number in the device name we are letting know
# the server where to forward the user requests for the camera feed.
try:
port = 5200 + int(name.rpartition("_")[-1])
except ValueError:
port = 5200
client = DeviceClient(name, password,
title=f"Camera: {pretty_name}", description=description, logger=name)
cam = CameraDevice(
device=device,
port=port,
quality="720p@15",
noviewer_timeout=30,
log_callback=client.emit_log,
logger=name,
)
def set_quality(quality):
cam.set_quality(quality)
client.emit_command_state("set_quality", [quality])
client.register_command(set_quality, "Set quality", inputs=[
{"type": "select", "options": list(cam.quality_options)},
])
def watching():
if not cam.is_video_running():
client.emit_command_state("set_quality", [cam.quality])
cam.watching()
client.register_command(watching, "Watch")
client.register_background_task(lambda _: cam.check_watching())
client.keep_server_updated(
check_readiness=cam.is_ready,
check_readiness_interval=5,
server_address=server,
retry_on_error=True
)