Source code for device_client.client_camera

from device_client import DeviceClient
from device.camera_device import CameraDevice


description = """
Start the camera feed using the **Watch** button in *Simple commands*.

Notes on performance issues:
- The real-time camera feed has high bandwidth requirements, therefore
    - it is not able to handle large number of viewers, ideally just one
    - it may affect the internet connection of other people on the lab network,
      especially when accessed from outside networks (i.e. from home)
- When the video feed noticeably slows down and starts accumulating latency,
  try adjusting the **Quality** setting.
"""

[docs] def connect(name, password, server, device): pretty_name = name.partition("_")[-1].replace("_", " ").title() # Camera name format: camera_name_24 --> streaming at port 5224 # By 'encoding' the port number in the device name we are letting know # the server where to forward the user requests for the camera feed. try: port = 5200 + int(name.rpartition("_")[-1]) except ValueError: port = 5200 client = DeviceClient(name, password, title=f"Camera: {pretty_name}", description=description, logger=name) cam = CameraDevice( device=device, port=port, quality="720p@15", noviewer_timeout=30, log_callback=client.emit_log, logger=name, ) def set_quality(quality): cam.set_quality(quality) client.emit_command_state("set_quality", [quality]) client.register_command(set_quality, "Set quality", inputs=[ {"type": "select", "options": list(cam.quality_options)}, ]) def watching(): if not cam.is_video_running(): client.emit_command_state("set_quality", [cam.quality]) cam.watching() client.register_command(watching, "Watch") client.register_background_task(lambda _: cam.check_watching()) client.keep_server_updated( check_readiness=cam.is_ready, check_readiness_interval=5, server_address=server, retry_on_error=True )