diff --git a/.basedpyright/baseline.json b/.basedpyright/baseline.json index 3ec2963299..09f994260b 100644 --- a/.basedpyright/baseline.json +++ b/.basedpyright/baseline.json @@ -1813,14 +1813,6 @@ } ], "./monitoring/mock_uss/ridsp/user_notifications.py": [ - { - "code": "reportArgumentType", - "range": { - "startColumn": 65, - "endColumn": 70, - "lineCount": 1 - } - }, { "code": "reportCallIssue", "range": { @@ -1852,30 +1844,6 @@ "endColumn": 61, "lineCount": 1 } - }, - { - "code": "reportInvalidTypeForm", - "range": { - "startColumn": 10, - "endColumn": 18, - "lineCount": 1 - } - }, - { - "code": "reportInvalidTypeForm", - "range": { - "startColumn": 45, - "endColumn": 53, - "lineCount": 1 - } - }, - { - "code": "reportArgumentType", - "range": { - "startColumn": 20, - "endColumn": 51, - "lineCount": 1 - } } ], "./monitoring/mock_uss/scd_injection/routes_injection.py": [ diff --git a/monitoring/mock_uss/ridsp/user_notifications.py b/monitoring/mock_uss/ridsp/user_notifications.py index 0bd613e512..28d4ac47d0 100644 --- a/monitoring/mock_uss/ridsp/user_notifications.py +++ b/monitoring/mock_uss/ridsp/user_notifications.py @@ -2,6 +2,7 @@ import arrow from implicitdict import ImplicitDict, StringBasedDateTime +from uas_standards.astm.f3411.v22a import constants from uas_standards.interuss.automated_testing.rid.v1.injection import ( Time, UserNotification, @@ -94,22 +95,52 @@ def check_and_generate_missing_fields_notifications( def check_and_generate_slow_update_notification( injected_flights: list[injection_api.TestFlight], -) -> list[datetime]: +) -> list[datetime.datetime]: """ Iterate over the provided list of injected TestFlight objects and, for any flight that has an average update rate under 1Hz, return a time for which a notification should be sent to the operator. """ - operator_slow_update_notifications: list[datetime] = [] + operator_slow_update_notifications: list[datetime.datetime] = [] for f in injected_flights: # Mean rate is not technically correct as per Net0040 # (20% of the samples may be above 1Hz with a mean rate below 1Hz), # but sufficient to trigger a notification to test the relevant scenario. - mean_rate = f.get_mean_update_rate_hz() - if mean_rate and mean_rate < 0.99: - # Arbitrarily use middle of the flight as notification time: - f_start, f_end = f.get_span() - if f_start and f_end: + + f_start, _ = f.get_span() + if not f_start: + continue + + # Compute update rate in 1s buckets: + rates = f.get_update_rates() + + if not rates: + continue + + # Check in a moving window of 10s, that NetMinUasLocRefreshPercentage + # samples are >= NetMinUasLocRefreshFrequency + MOVING_WINDOW_DURATION: int = 10 + + if len(rates) < MOVING_WINDOW_DURATION: + continue + + for wpos in range(0, len(rates) - MOVING_WINDOW_DURATION): + count_ok = sum( + [ + 1 if rate >= constants.NetMinUasLocRefreshFrequencyHz else 0 + for rate in rates[wpos : wpos + MOVING_WINDOW_DURATION] + ] + ) + + if ( + count_ok + < constants.NetMinUasLocRefreshPercentage + / 100.0 + * MOVING_WINDOW_DURATION + ): operator_slow_update_notifications.append( - f_start + (f_end - f_start) / 2 + f_start + + datetime.timedelta( + seconds=wpos + 2 + MOVING_WINDOW_DURATION + ) # get_update_rates is skipping the first 2 seconds (moving average of 3) ) return operator_slow_update_notifications diff --git a/monitoring/monitorlib/rid_automated_testing/injection_api.py b/monitoring/monitorlib/rid_automated_testing/injection_api.py index 866f5e62e4..9b1335564f 100644 --- a/monitoring/monitorlib/rid_automated_testing/injection_api.py +++ b/monitoring/monitorlib/rid_automated_testing/injection_api.py @@ -1,4 +1,5 @@ import datetime +from collections import defaultdict import arrow import s2sphere @@ -226,10 +227,9 @@ def get_rect(self) -> s2sphere.LatLngRect | None: ] ) - def get_mean_update_rate_hz(self) -> float | None: - """ - Calculate the mean update rate of the telemetry in Hz - """ + def get_update_rates(self) -> list[int] | None: + """Return the update rate for every second, relative to the start of the flight, with a moving windows of 3 seconds.""" + if not self.telemetry or len(self.telemetry) == 1: return None # TODO check if required or not (may have been called earlier?) @@ -239,7 +239,26 @@ def get_mean_update_rate_hz(self) -> float | None: return start = self.telemetry[0].timestamp.datetime end = self.telemetry[-1].timestamp.datetime - return (len(self.telemetry) - 1) / (end - start).seconds + + buckets = defaultdict(int) + + for frame in self.telemetry: + if frame.timestamp is not None: + bucket = int((frame.timestamp.datetime - start).total_seconds()) + buckets[bucket] += 1 + + rates = [] + + last_bucket = int((end - start).total_seconds()) + bucket = 2 + + while bucket <= last_bucket: + rates.append( + (buckets[bucket] + buckets[bucket - 1] + buckets[bucket - 2]) / 3.0 + ) + bucket += 1 + + return rates class CreateTestParameters(injection.CreateTestParameters): diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/injection.py b/monitoring/uss_qualifier/scenarios/astm/netrid/injection.py index 7d2532c13c..ab76ff1aa1 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/injection.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/injection.py @@ -1,8 +1,11 @@ import uuid -from datetime import datetime +from datetime import datetime, timedelta import arrow from implicitdict import ImplicitDict +from uas_standards.astm.f3548.v21.constants import ( + TimeSyncMaxDifferentialSeconds, +) from uas_standards.interuss.automated_testing.rid.v1.injection import ChangeTestResponse from monitoring.monitorlib import geo @@ -187,6 +190,7 @@ def get_user_notifications( details=f"Expected response code 200 from {target.participant_id} but received {query.status_code} while trying to retrieve user notifications", query_timestamps=[query.request.timestamp], ) + continue if response is None: check.record_failed( summary="Error while trying to retrieve user notifications", @@ -194,7 +198,23 @@ def get_user_notifications( query_timestamps=[query.request.timestamp], ) notifications[target.participant_id] = [] - else: - notifications[target.participant_id] = response.user_notifications + continue + + if any( + [ + notification.observed_at.value.datetime + > arrow.now() + timedelta(seconds=TimeSyncMaxDifferentialSeconds) + for notification in response.user_notifications + ] + ): + check.record_failed( + summary="Error while trying to retrieve user notifications", + details=f"Response from {target.participant_id} returned notifications in the future.", + query_timestamps=[query.request.timestamp], + ) + notifications[target.participant_id] = [] + continue + + notifications[target.participant_id] = response.user_notifications return notifications diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/notifications_to_operator/notification_checker.py b/monitoring/uss_qualifier/scenarios/astm/utm/notifications_to_operator/notification_checker.py index b61339752e..c2d3d5a2d8 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/notifications_to_operator/notification_checker.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/notifications_to_operator/notification_checker.py @@ -58,6 +58,24 @@ def _get_preexisting_notifications( query_timestamps=[query.request.timestamp], ) continue + + if any( + [ + notification.observed_at.datetime + > arrow.now() + NOTIFICATIONS_MAX_CLOCK_SKEW + for notification in resp.user_notifications + ] + ): + notifications[client.participant_id] = Notifications( + notifications=None, query=query + ) + check.record_failed( + summary="Error while trying to retrieve notifications", + details=f"Response from {client.participant_id} returned notifications in the future.", + query_timestamps=[query.request.timestamp], + ) + continue + notifications[client.participant_id] = Notifications( notifications=resp.user_notifications, query=query ) @@ -105,6 +123,23 @@ def _get_notifications( ) continue + if any( + [ + notification.observed_at.datetime + > arrow.now() + NOTIFICATIONS_MAX_CLOCK_SKEW + for notification in resp.user_notifications + ] + ): + notifications[client.participant_id] = Notifications( + notifications=None, query=query + ) + check.record_failed( + summary="Error while trying to retrieve notifications", + details=f"Response from {client.participant_id} returned notifications in the future.", + query_timestamps=[query.request.timestamp], + ) + continue + # If there was at least one qualifying notification, use the response obtained for this participant previously_observed = { n.observed_at