Streamifying Reference Data for Temporal Consistency with Telemetry Events

In data systems, telemetry and reference data play complementary roles, with telemetry providing real-time updates (e.g., vehicle locations, sensor readings) and reference data supplying the essential context (e.g., route details, sensor placements). While telemetry data updates frequently and reference data less so, both types need accurate temporal alignment for reliable historical analysis. Telemetry data is only meaningful when contextualized by reference data that is temporally consistent. If a bus's location is tracked in real-time, the route configuration at the time of tracking is essential for understanding the bus's context.

Telemetry and reference data are often handled as if they were fundamentally different, but in reality they differ mainly in update frequency. Very little reference data is truly static and never changes over the lifecycle of a system.

"Static" and "real-time" are simply statements about how often data is updated, not about the data's fundamental nature. Over longer periods of time, the "static" concept is outright misleading. Most data records are reflections of the state of a subject in time. If we capture a point in time for a real-time observation in telemtry, we also need to capture the point in time for all context information the real-time telemetry relates to.

Yet, telemetry and reference data are often provided through separate APIs. Take public transit data as an example: GTFS (General Transit Feed Specification) provides schedule data in ZIP files containing CSV files, while GTFS-RT (Real-Time) delivers "real-time" updates of vehicle locations and schedules via Protobuf-encoded stream-like data structures, even if only through HTTP polling. Although both data types originate from the same transit authority, the different formats and update frequencies create integration challenges. Among these is that the schedule data is not labeled with timestamps that clearly indicate when changes occurred.

To achieve temporal consistency in analytics systems that handle data where time is a significant dimension, we should therefore aim to detect and signal every state change of any subject through an event. This approach allows us to maintain a point-in-time view of any subject if we record every state change, and we can correlate all events that affect the same subject "in time". With GTFS, for instance, we can detect changes by maintaining hash/checksum indexes at the file or row level and raising an event whenever a changed hash/checksum indicates that a file or record has changed.

CloudEvents: Standardizing Data with Temporal and Contextual Metadata

CNCF CloudEvents provides a standardized schema for events, ensuring that telemetry and state change notifications about reference data share the same fundamental correlation information. Having a common, standardized envelope for event data lets you maintain context and temporal order when flowing and storing state changes and observations of various kinds.

Since telemetry and reference data share key temporal and contextual requirements, CloudEvents also ensures that these data streams can coexist and be easily moved and correlated in a single, real-time event-processing pipeline.

The key CloudEvents attributes that support consistent structuring:

  1. type: Specifies the kind of event (e.g., a condition update or a reference subject state change).
  2. time: Captures the timestamp when the event was raised inside the source context, which is crucial for temporal alignment across data types.
  3. source: Identifies the data’s origin, enabling telemetry and reference data from the same system to be processed together, despite being delivered via different APIs.
  4. subject: Provides additional specificity, such as a route or sensor ID, enabling granular data association.

Setting origin context with source and subject

The source attribute plays a particularly vital role in unifying telemetry and reference data. Even when the data is served by different APIs (as with GTFS and GTFS-RT), the shared source identifier, which is set by the API bridge(s) that turn API responses or database rows into events, links each event back to its origin and enables consistent processing.

The source identifier is a URI reference that identifies the data source in a way that is meaningful to parties inside the system. If the URI is absolute, the authority part of the URI can be used to disambiguate sources owned by different parties. The authority and path are the significant elements of the source URI. To avoid confusion about the function of the URI, you can choose a custom scheme like scope: to underline that the URI is a hierarchical identifier and not a network endpoint.
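
For illustration, a relative form and a hypothetical absolute form of such a source identifier for the transit example used later in this article might look like this (the mta.info authority is an assumption made purely to show the disambiguation role of the authority part):

/us/ny/nyc/mta
scope://mta.info/us/ny/nyc/mta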

The subject attribute further qualifies the event within the source context. It is useful practice to set the attribute to a value that uniquely identifies the object described by the event. The identifier might be a composite of multiple data elements contained in the event payload, like route/M15/stop/56. The upside of using the subject attribute in this way is that a time series of changes related to that particular subject can be filtered out of an event store generically.
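
As a minimal sketch of such a generic filter, assuming events land in a common table like the _cloudevents_dispatch table shown later and using the illustrative source and subject values from this article:

// hypothetical source and subject values taken from the examples in this article
_cloudevents_dispatch
| where source == "/us/ny/nyc/mta" and subject == "route/M15/stop/56"
| order by time asc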

The combination of source and subject then uniquely identifies the context of events, differentiated by type. Combinations of these attributes also make for good partition keys. Combining source and subject into a partition key ensures that events from the same context stay in correct absolute order. Adding the type to the partition key breaks absolute order but still preserves relative order of the same kinds of events, allowing a broader spread of the data across partitions.
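
For illustration, using values from the GTFS example shown later in this article, such hypothetical partition keys might be composed as follows:

/us/ny/nyc/mta|/route/123                                    (source | subject)
/us/ny/nyc/mta|/route/123|GeneralTransitFeedStatic.Routes    (source | subject | type)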

Setting temporal context with time

The time attribute is essential for temporal alignment. By capturing the event’s timestamp in the source context, CloudEvents helps ensure that telemetry and reference data can maintain their temporal integrity. This is particularly important when the clock model of the source system is not the same as the clock model of the processing system.

While CloudEvents standardizes the time attribute to be an RFC 3339 timestamp, a source system might indeed encode its own clock model in the timestamp. For instance, the actual UTC time is largely irrelevant for anyone analyzing a soccer match, but the time since the match period started is crucial. The time attribute in all CloudEvents raised about a match might thus reflect the time since the match period started, not the actual UTC time. The first half might (logically) start at Jan 1st at 00:00:00 UTC and the second half at Jan 2nd at 00:00:00 UTC, with extra time and penalties at Jan 3rd, 4th, and 5th, with the year being irrelevant but normalized to a fixed value for all events. The advantage of this approach is that it reflects the custom clock model of soccer games, including their unpredictable stoppage time periods, and yet makes all recorded matches and all data from all periods comparable. Similar custom clock models might be used in other sports, in industrial processes, in financial trading, in healthcare, and in many other domains.
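
As a hypothetical illustration of such a clock model, with the year normalized to an arbitrary fixed value, the time values for an observation 17 minutes and 30 seconds into each half might look like this:

"time": "2000-01-01T00:17:30Z"   (first half)
"time": "2000-01-02T00:17:30Z"   (second half)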

The time attribute is indeed optional in CloudEvents and could be replaced by some other custom attribute like a sequence number or some other clock concept, giving temporal context in a specific solution. If such a custom clock model were used, it's important for that model to be used consistently.

Flattening CloudEvents into Event Store Tables

The most straightforward mapping of CloudEvents into event store tables is to flatten the CloudEvents attributes into columns, with the event payload data stored in a data column. This approach allows for retaining a copy of the original event stream as ingested, in temporal order.

The data column may contain a byte array if the event payload is to be retained undecoded, or it may contain the decoded, structured data. The choice depends on the need for the original data and the need for the data to be queryable. The KQL schema below assumes that the data has been decoded into a structured form, and the datacontenttype attribute has therefore been omitted.

.create-merge table [_cloudevents_dispatch] (
    [specversion]: string,
    [type]: string,
    [source]: string,
    [id]: string,
    [time]: datetime,
    [subject]: string,
    [data]: dynamic,
    [dataschema]: string
)
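
If the events arrive as JSON (for example via Event Hubs or queued ingestion), an ingestion mapping can route the CloudEvents attributes into these columns. The following is a minimal sketch; the mapping name is an assumption and the paths may need adjusting to the actual delivery envelope:

// "cloudevents_json" is a hypothetical mapping name; adjust paths to your actual envelope encoding
.create table _cloudevents_dispatch ingestion json mapping "cloudevents_json"
'[{"column":"specversion","Properties":{"Path":"$.specversion"}},{"column":"type","Properties":{"Path":"$.type"}},{"column":"source","Properties":{"Path":"$.source"}},{"column":"id","Properties":{"Path":"$.id"}},{"column":"time","Properties":{"Path":"$.time"}},{"column":"subject","Properties":{"Path":"$.subject"}},{"column":"data","Properties":{"Path":"$.data"}},{"column":"dataschema","Properties":{"Path":"$.dataschema"}}]'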

If you want the payload data to be queryable in the event store, you can further flatten the payload data into columns. The best approach is to create a table per event type as each event type should have a schema that evolves compatibly.

The upside of maintaining a common table that holds all ingested events is that it allows you to retain events of types that are newly published but not yet modeled in your database. Such a common table also allows rebuilding the entire rest of the database from the original events if needed; this is commonly referred to as "Event Sourcing".

In the KQL language of Fabric Eventhouse or Azure Data Explorer, you can ingest CloudEvents into a common table as shown above, and then use an update policy to move the data into tables per event type. You can obviously also ingest the CloudEvents directly into the tables per event type.

Consider the following GTFS example for bus route data from the New York Metropolitan Transportation Authority (MTA). A CloudEvents-wrapped route data update might look like this:

{
  "type": "GTFSStatic.Route",
  "time": "2024-01-01T12:00:00Z",
  "source": "/us/ny/nyc/mta",
  "subject": "/route/123",
  "data": {
    "route_id": "123",
    "agency_id": "XYZ",
    "route_short_name": "A",
    "route_long_name": "Route A",
    "route_desc": "Route A description",
    "route_type": "Bus",
    "route_url": "🔗
    "route_color": "#FF0000",
    "route_text_color": "#FFFFFF",
    "route_sort_order": 1,
    "continuous_pickup": "1",
    "continuous_drop_off": "1",
    "network_id": "1"
  }
}

The source and time attribute annotations provide important context that is not contained in the payload, which reflects the original GTFS schedule data. The source attribute identifies the data origin and scope: a data feeder for the entire New York Metropolitan Transportation Authority (MTA) data scope, which includes data from several agencies. In GTFS, the agency_id field is an optional qualifier and therefore not reliable. The time attribute captures when the data was read and/or a change was detected.

The mapping into a KQL table schema as used with Fabric Eventhouse or Azure Data Explorer might look like this:

.create-merge table [Routes] (
    [route_id]: string,
    [agency_id]: string,
    [route_short_name]: string,
    [route_long_name]: string,
    [route_desc]: string,
    [route_type]: string,
    [route_url]: string,
    [route_color]: string,
    [route_text_color]: string,
    [route_sort_order]: int,
    [continuous_pickup]: string,
    [continuous_drop_off]: string,
    [network_id]: string,
    [___type]: string,
    [___source]: string,
    [___id]: string,
    [___time]: datetime,
    [___subject]: string
);

As you can see, the schema includes the prefixed CloudEvents attributes (e.g., ___type, ___source, ___id, ___time, ___subject), which retain temporal and contextual metadata across telemetry and reference data and yet avoid conflicts with payload data columns that might have the same name. The CloudEvents specification mandates that id and source in combination form a unique key for each event, which is preserved here by the ___id and ___source columns.
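
That uniqueness guarantee can, for example, be used for a generic duplicate check over the flattened table; a minimal sketch:

// generic duplicate check relying on the id/source uniqueness rule
Routes
| summarize duplicates = count() by ___source, ___id
| where duplicates > 1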

This general structure can also be used with any SQL-based event store.

For completeness, the previously mentioned KQL update policy, which extracts and copies the raw event data from the _cloudevents_dispatch table into the Routes table, may look like this:

.alter table [Routes] policy update
[{
  "IsEnabled": true,
  "Source": "_cloudevents_dispatch",
  "Query": "_cloudevents_dispatch | where (specversion == '1.0' and type == 'GeneralTransitFeedStatic.Routes') | project route_id = tostring(data.route_id), agency_id = tostring(data.agency_id), route_short_name = tostring(data.route_short_name), route_long_name = tostring(data.route_long_name), route_desc = tostring(data.route_desc), route_type = tostring(data.route_type), route_url = tostring(data.route_url), route_color = tostring(data.route_color), route_text_color = tostring(data.route_text_color), route_sort_order = toint(data.route_sort_order), continuous_pickup = tostring(data.continuous_pickup), continuous_drop_off = tostring(data.continuous_drop_off), network_id = tostring(data.network_id), ___type = type, ___source = source, ___id = id, ___time = time, ___subject = subject",
  "IsTransactional": false,
  "PropagateIngestionProperties": true
}]

Structural Consistency and Schematization

While CloudEvents offers a simple framework for transforming telemetry and reference data into streams and establishing context, data engineers responsible for bridging APIs to streams must still ensure that, for instance, identifiers inside the data, like route_id and network_id above, are consistent across sources and event schemas, in both spelling and data type.

All event payloads should be declared using formal schema languages such as Avro Schema or JSON Schema. These schemas are essential for defining the structure of data in event stores, serving as the foundation for creating database table schemas, but also informing the user of the data's structure and meaning. The CNCF xRegistry metadata model allows handling event metadata and payload schemas in a unified way.

Temporal Alignment Techniques in Event Stores

Event stores like Azure Data Explorer and Fabric Eventhouse offer flexible approaches for aligning telemetry and reference data temporally, primarily through closest matching and time-based binning.

The KQL documentation provides information on how to perform time window joins, form "row window sessions", and group data into temporal bins.
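
For instance, a temporal binning query over a hypothetical VehiclePositions table (flattened from GTFS-RT vehicle position events with the same prefixed CloudEvents columns as Routes) might group observed positions into one-minute buckets per route; the table and column names here are assumptions for illustration:

// VehiclePositions is a hypothetical telemetry table flattened like Routes above
VehiclePositions
| where ___source == "/us/ny/nyc/mta"
| summarize positions = count() by route_id, bin(___time, 1m)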

In addition, you might find the following techniques useful:

Deduplication and accessing the latest state using Materialized Views

Materialized views provide efficient access to both real-time and historical data. By maintaining snapshots of the latest reference data, materialized views support real-time applications while preserving full historical records for temporally aligned analysis. This setup is vital in scenarios where real-time monitoring and retrospective analysis require access to temporally consistent data without repetitive filtering.

Because we can lean on the temporal consistency of CloudEvents, we can use its attributes generically to create such views.

.create materialized-view with (backfill=true) RoutesLatest on table Routes {
    Routes | summarize arg_max(___time, *) by ___type, ___source, ___subject
}

This materialized view retains the latest state of each route, under the assumption that ___type, ___source, and ___subject form a unique subject identifier, ensuring that real-time applications can access the most recent reference data easily and without worrying about duplicates.
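
Querying the latest known state of a particular route then becomes a simple filter over the view, for example:

// illustrative filter values taken from the Routes example above
RoutesLatest
| where ___source == "/us/ny/nyc/mta" and route_id == "123"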

Point-in-time lookups

The function below will find the most recent reference data event that has been recorded before a given point in time. When analyzing time series data related to the movement of vehicles in a specific time window, using this function or its inner query will provide the context information related to the time series.

.create function RoutesAt(at: datetime) {
    Routes
    | where ___time <= at
    | summarize arg_max(___time, *) by ___type, ___source, ___subject
}
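
For example, to reconstruct the route configurations that were in effect at a given instant (the timestamp below is purely illustrative):

// the timestamp is illustrative
RoutesAt(datetime(2024-06-01T08:00:00Z))
| where ___source == "/us/ny/nyc/mta"
| project route_id, route_short_name, route_long_name, ___time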

Practical Applications

  1. Transit Systems: Temporal alignment allows archived telemetry (e.g., vehicle positions) to be analyzed alongside route configurations valid at the time, improving transit performance analysis.
  2. Environmental Monitoring: Sensor readings must be evaluated alongside sensor locations and configurations as they existed historically, ensuring accuracy in long-term environmental studies.
  3. Industrial IoT and Manufacturing: Machine telemetry benefits from contextual alignment with factory layouts, equipment configurations, and operational setups relevant during each reading, supporting precise insights into evolving operational contexts.
  4. Supply Chain and Logistics: Logistics telemetry gains value when aligned with the historical configurations of warehouses and routes, supporting accurate logistics performance analysis over time.
  5. Healthcare Monitoring and Clinical Trials: Medical telemetry data must align with calibration states and placements of monitoring devices, ensuring historical data accurately reflects the context at data capture.

Conclusion

By recognizing that telemetry and reference data differ primarily in update frequency rather than in fundamental nature, we can make the analytic results of real-time systems more consistent and correct through a unified approach in which reference data is also treated as events and retained as time series data. CloudEvents helps with this unification by structuring both data types as temporally consistent events, embedding metadata that ensures accurate historical and contextual alignment.

With this unified approach, event stores like Azure Data Explorer or Microsoft Fabric Eventhouse enable real-time and historical data analysis with high temporal precision. This consistency allows applications across sectors—transit, environmental monitoring, industrial IoT, logistics, and healthcare—to derive reliable, temporally consistent insights from complex datasets.

[A code example illustrating the overall model discussed in this article can be found here]
