Recordings Management (Programmatically)

Here we will show you how you can schedule, start, and stop recordings using Python.

Scheduling a Recording

First, we need a GraphQL client. In this example, we will use the python_graphql_client library:

pip install python_graphql_client
import os
import sys
from datetime import datetime
from python_graphql_client import GraphqlClient

# Connection settings are read from the environment; os.environ[...] raises
# KeyError when a variable is not set.
endpoint = os.environ["HASURA_GRAPHQL_ENDPOINT"]
secret = os.environ["HASURA_GRAPHQL_ADMIN_SECRET"]

# Shared client used by every query helper below. The admin secret is sent
# as a header with each request.
client = GraphqlClient(
    endpoint=endpoint,
    headers={"x-hasura-admin-secret": secret},
)

Next, we can define the following queries for:

  • Getting the id of the camera we want to record with (camera_with_model_and_serial)
  • Adding a recording session (insert_recording_session)
  • Adding a record to a session (insert_record)
    • A record defines that a camera should be used to record data for a session
  • Adding a stream to a record (insert_stream)
    • A stream defines the kind of data a camera should record. You can have multiple streams per camera as they could record color, depth and infra at the same time
  • Setting a session's state (update_recording_session_state)
    • Allows us to start and stop sessions.
camera_with_model_and_serial_query = """query CameraWithModelAndSerial($model: camera_model_enum!, $serial: String!) {
  cameras(where: {model: {_eq: $model}, serial: {_eq: $serial}}) {

def camera_with_model_and_serial(model, serial):
    result = client.execute(
            "model": model,
            "serial": serial,
    return result["data"]["cameras"][0]["id"]

insert_recording_session_query = """mutation InsertRecordingSession(
  $name: String!,
  $startDate: timestamp!,
  $endDate: timestamp!,
  $mode: RecordingSessionMode_enum!,
  $state: RecordingSessionState_enum!
  $maxSizeMegabytes: Int!,
  $maxTimeSeconds: Int!,
) {
  insertRecordingSession(object: {
    name: $name,
    startDate: $startDate,
    endDate: $endDate,
    mode: $mode,
    state: $state
    maxTimeSeconds: $maxTimeSeconds,
    maxSizeMegabytes: $maxSizeMegabytes,
    storage: { data: {
        totalSize: 0,
        estimatedSize: 0,
  }) {

def insert_recording_session(now, later, name):
    return client.execute(
            "name": name,
            "startDate": now,
            "endDate": later,
            "mode": "Storage",
            "state": "Stopped",
            "maxTimeSeconds": "0",
            "maxSizeMegabytes": "0",

insert_record_query = """mutation InsertRecord($recordingId: uuid!, $cameraId: uuid!) {
  insertRecord(object: {cameraId: $cameraId, recordingId: $recordingId}) {

def insert_record(recording_session_id, camera_id):
    return client.execute(
            "recordingId": recording_session_id,
            "cameraId": camera_id,

insert_stream_query = """mutation InsertStream(
  $recordId: uuid!,
  $type: stream_type_enum!,
  $alignTo: String!,
  $encoder: stream_encoder_enum!,
  $width: Int!
  $height: Int!,
  $nearCut: Int!,
  $farCut: Int!,
  $bitrate: Int!,
  $frameRate: Int!,
) {
  insertStream(object: {
    recordId: $recordId,
    type: $type,
    alignTo: $alignTo,
    encoder: $encoder,
    width: $width,
    height: $height,
    nearCut: $nearCut,
    farCut: $farCut,
    bitrate: $bitrate
    frameRate: $frameRate,
  }) {

def insert_stream(
    record_id, type, align_to, encoder, width, height, near_cut, far_cut, bitrate, frame_rate
    return client.execute(
            "recordId": record_id,
            "type": type,
            "alignTo": align_to,
            "encoder": encoder,
            "width": width,
            "height": height,
            "nearCut": near_cut,
            "farCut": far_cut,
            "bitrate": bitrate,
            "frameRate": frame_rate,

update_recording_session_state_query = """mutation UpdateRecordingSessionState($id: uuid!, $state: RecordingSessionState_enum!) {
  updateRecordingSession(pk_columns: {id: $id}, _set: {state: $state}) {

def update_recording_session_state(session_id, state):
    """Set the state of the recording session identified by ``session_id``.

    Sends ``update_recording_session_state_query`` through the shared
    GraphQL ``client`` with the session id and the new state as variables.
    The response is discarded, so the function returns ``None``.
    """
    variables = {"id": session_id, "state": state}
    client.execute(update_recording_session_state_query, variables)

With these functions, we can make small scripts for working with recording sessions. Here is a small script that can create and schedule a recording session with one camera and two streams, and stop it again:

# A simple script that allows you to start and stop recording sessions
def print_help():
    """Print the command-line usage for the ``start`` and ``stop`` commands.

    The program name shown in the usage lines is taken from ``sys.argv[0]``.
    """
    program = sys.argv[0]
    # The command word is "start" (not "add"): that is the value main()
    # compares sys.argv[1] against.
    print(f"To start: {program} start <camera-serial> <date>")
    print(f"To stop: {program} stop <recording-session-id>")

def main():
    """Schedule or stop a recording session from the command line.

    Usage:
        <script> start <camera-serial> <date>
        <script> stop <recording-session-id>

    ``start`` looks up the "REALSENSE_D435" camera with the given serial,
    inserts a session named "mysession" that runs from the current time
    until ``<date>``, attaches one record with a color and a depth stream,
    sets the session state to "Scheduled" and prints the session id.
    ``<date>`` is passed through unchanged; presumably it must use the same
    ``%Y-%m-%dT%H:%M:%S`` layout as the start date -- confirm against the
    API.

    ``stop`` sets the state of the given session to "Stopped".

    Returns:
        0 on success, 1 when an argument is missing or the command is
        neither ``start`` nor ``stop``.
    """
    if len(sys.argv) < 2:
        print("Missing start/stop argument")
        return 1

    command = sys.argv[1]

    if command == "start":
        if len(sys.argv) < 4:
            print("Missing camera-serial or date argument")
            return 1

        camera_id = camera_with_model_and_serial("REALSENSE_D435", sys.argv[2])
        # Start immediately: the session's start date is the current local time.
        now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        later = sys.argv[3]

        recording_session_id = insert_recording_session(now, later, "mysession")
        record_id = insert_record(recording_session_id, camera_id)

        # Numeric stream settings are sent as integers because the mutation
        # declares them as Int!.
        insert_stream(record_id, "color", "color", "HEVC", 1280, 720, 300, 1000, 1000000, 15)
        insert_stream(record_id, "depth", "color", "Classic", 1280, 720, 300, 1000, 5000000, 15)
        update_recording_session_state(recording_session_id, "Scheduled")

        print(f"Started a Recording Session with start date: {now} and end date {later}.")
        print("Note the recording session id to stop the session early:")
        print(recording_session_id)
        return 0

    if command == "stop":
        if len(sys.argv) < 3:
            print("Missing recording-session-id argument")
            return 1
        update_recording_session_state(sys.argv[2], "Stopped")
        return 0

    # Anything else is a usage error rather than a silent success.
    print(f"Unknown argument: {command} (expected start or stop)")
    return 1

if __name__ == '__main__':
    # Run the script and use main()'s return value as the process exit status.
    sys.exit(main())

You can also trigger a session to start and stop recording with low latency. To do this, the session must first be in the Standby state:

import time

def main():
    """Record five short clips by toggling a session between Standby and Started.

    Command-line options (all three must be given):
        -m / --Model      model of the camera
        -s / --Serial     serial number of the camera
        -r / --Recording  name of the recording session

    A session spanning ``datetime.min`` to ``datetime.max`` is inserted with
    one record holding a color and a depth stream, and put into "Standby".
    The loop then waits 5 seconds, switches to "Started", records for
    10 seconds and switches back to "Standby", five times in total. Finally
    the session is set to "Stopped".

    Raises:
        SystemExit: when one of the options is missing (the message names
            the missing option).
    """
    # Local import: argparse is not among the imports shown for this script.
    import argparse

    # Initialize parser
    parser = argparse.ArgumentParser()
    # Adding optional argument
    parser.add_argument("-m", "--Model", help="Introduce the model of the camera.")
    parser.add_argument("-s", "--Serial", help="Introduce the serial number of the camera.")
    parser.add_argument("-r", "--Recording", help="Introduce the name of the recording session.")

    # Read arguments from command line
    args = parser.parse_args()

    if not args.Model:
        sys.exit("You are missing the model of the camera.")
    if not args.Serial:
        sys.exit("You are missing the serial number of the camera.")
    if not args.Recording:
        sys.exit("You are missing the name of the recording session.")

    camera_id = camera_with_model_and_serial(args.Model, args.Serial)
    # The widest possible time window, so the session is never outside its
    # start/end dates while it is being triggered manually.
    now = datetime.min.strftime("%Y-%m-%dT%H:%M:%S")
    later = datetime.max.strftime("%Y-%m-%dT%H:%M:%S")

    recording_session_id = insert_recording_session(now, later, args.Recording)
    record_id = insert_record(recording_session_id, camera_id)

    insert_stream(record_id, "color", "color", "HEVC", 1280, 720, 300, 1000, 1000000, 30)
    insert_stream(record_id, "depth", "color", "Classic", 1280, 720, 300, 1000, 5000000, 30)
    update_recording_session_state(recording_session_id, "Standby")

    # Wait 5 seconds and record for 10. Do so 5 times.
    for _ in range(5):
        time.sleep(5)
        update_recording_session_state(recording_session_id, "Started")
        time.sleep(10)
        update_recording_session_state(recording_session_id, "Standby")

    update_recording_session_state(recording_session_id, "Stopped")

if __name__ == '__main__':
    # Run the standby/start trigger script when executed directly.
    main()

You can find the full script in the aivero-python-interface under the name

The values which are allowed for the Model argument are:

  • REALSENSE (This is for the D405 camera)
  • V4L2CAMERA (This is for RaspberryPi cameras)