Skip to content

Monitor Module

create_monitor_schema()

Create the monitor schema with:

  • App name;
  • Process status;
  • Process name;
  • Entity type.

Returns:

Type Description
List[Dict[str, str]]

List[Dict[str, str]]: The list of dicts containing the monitor schema with: app_name, process_status, process_name, entity_type.

Source code in carol_app_monitor/monitor.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def create_monitor_schema() -> List[Dict[str, str]]:
    """Build the monitor schema, one entry per Carol app, with:

    * App name;
    * Process status;
    * Process name;
    * Entity type.

    The per-app dicts produced by the status, name and entity-type lookups
    are merged position by position, and the merged list is then passed
    through `filter_dicts_by_values(..., None, drop=True)`.

    Returns:
        List[Dict[str, str]]: The list of dicts containing the monitor schema\
        with: `app_name`, `process_status`, `process_name`, `entity_type`.
    """
    app_names = get_all_carol_app_names(get_properties_from_app())

    # The three lookups are expected to yield their results in the same
    # app order, since they are paired up positionally below.
    statuses = get_process_status(app_names)
    names = get_process_name(app_names)
    entities = get_entity_type(app_names)

    merged_entries = [
        merge_dicts(status, name, entity)
        for status, name, entity in zip(statuses, names, entities)
    ]

    return filter_dicts_by_values(merged_entries, None, drop=True)

start_online_processes(monitor_schema, num_retries=2)

Start all online processes that are not running.

This function filters the online apps out of all Carol apps and checks which of them are not running. For those, start_online_processes() tries to start the process, retrying up to num_retries times.

Parameters:

Name Type Description Default
monitor_schema List[Dict[str, str]]

The list of dicts containing app_name, process_status, process_name, entity_type.

required
num_retries int

Number of retries that function will execute. Defaults to 2.

2

Returns:

Type Description
Dict[str, Dict[str, str]]

Dict[str, Dict[str, str]]: The dictionary containing the app_name and its status, with success and detail

Source code in carol_app_monitor/monitor.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def start_online_processes(
    monitor_schema: List[Dict[str, str]], num_retries: int = 2
) -> List[Dict[str, object]]:
    """Start all online processes that are not running.

    The schema is narrowed to the entries kept by the `"ONLINE"` filter and
    then to those not dropped by the `"RUNNING"` filter. For each remaining
    entry the process is started and its task status is checked; while the
    task is not `COMPLETED`, the start is retried up to `num_retries` times,
    sleeping a random 0-60 seconds after each failed retry.

    A failure on one app never prevents the following apps from being
    processed: the failure is recorded and the loop moves on.

    Args:
        monitor_schema (List[Dict[str, str]]): The list of dicts containing\
        `app_name`, `process_status`, `process_name`, `entity_type`.

        num_retries (int, optional): Number of additional start attempts\
            made after the first task does not complete. Defaults to 2.

    Returns:
        List[Dict[str, object]]: One dict per processed app, with the keys\
        `app_name` and `status`; `status` is itself a dict holding\
        `success` (`"yes"` or `"no"`) and `detail`.

    Raises:
        TaskError: If `start_app_process` raises it during a retry attempt;\
            only the initial start (and its single delayed second try) is\
            guarded.
    """
    online_processes = filter_dicts_by_values(
        monitor_schema, "ONLINE", keep=True
    )
    not_running_processes = filter_dicts_by_values(
        online_processes, "RUNNING", drop=True
    )

    processes_status = []
    for process in not_running_processes:
        app_name = process.get("app_name")
        process_name = process.get("process_name")

        try:
            process_info = start_app_process(process)
        except TaskError:
            # Give the initial start one delayed second chance.
            time.sleep(round(12 * random.random() * 5, 2))
            try:
                process_info = start_app_process(process)
            except TaskError as error:
                processes_status.append(
                    {
                        "app_name": app_name,
                        "status": {"success": "no", "detail": repr(error)},
                    }
                )
                # Skip only this app; the remaining ones must still be
                # attempted (a `break` here would abandon them all).
                continue

        task_name = f"AI Process: {process_name} START (Carol App: {app_name})"
        task_info = check_task_status(process_info.get("task_id"), task_name)
        completed = task_info.get("task_status") == "COMPLETED"

        # Success is tracked explicitly rather than inferred from the retry
        # counter, so a first-attempt success is never misreported as a
        # failure when num_retries is 0 or negative.
        retries_done = 0
        while not completed and retries_done < num_retries:
            process_info = start_app_process(process)
            task_info = check_task_status(
                process_info.get("task_id"), task_name
            )
            completed = task_info.get("task_status") == "COMPLETED"
            retries_done += 1

            if not completed:
                time.sleep(round(12 * random.random() * 5, 2))

        if completed:
            status = {"success": "yes", "detail": task_info.get("detail")}
        else:
            status = {
                "success": "no",
                "detail": repr(MaxRetryError(num_retries)),
            }

        processes_status.append({"app_name": app_name, "status": status})

    return processes_status