# Import libraries
import os
import time
import dascore as dc
import numpy as np
from dascore.units import s
# Define path for saving results.
# NOTE(review): placeholder path — replace with a real writable directory
# before running; `rolling_mean_patch.io.write` below saves files here.
output_data_dir = '/path/to/desired/output/directory'
# Get a spool to work on: load the example spool, index any new files
# with update(), and sort patches chronologically so iteration order
# matches arrival order.
sp = dc.get_example_spool().update().sort("time")
Real-Time Processing
This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in near real time for edge computing purposes.
Load libraries and get a spool
Set real-time processing parameters (if needed)
In this section, we define the window size and step size required for rolling mean processing. With a sampling interval of 10 seconds, the cutoff frequency (Nyquist frequency) is determined to be 0.05 Hz. Additionally, we establish the desired wait time after each run by using the sleep_time_mult
parameter, which acts as a multiplier coefficient for the number of seconds in each patch.
# Define the target sampling interval (in sec.)
dt = 10

# Determine window size for the rolling mean
# (`s` is the dascore seconds unit, so this is a time quantity).
window = dt * s

# Determine step size for the rolling mean.
step = dt * s

# Set the desired wait time after each run: a multiplier applied to the
# number of seconds in each patch (see `sleep_time = num_sec * sleep_time_mult`).
sleep_time_mult = 1.2
Real-time processing
Finally, we use a while loop to repeatedly update the spool and perform the processing. The while loop breaks if there are no new patches in the spool.
# Start the while loop for real-time processing.
# Each run updates the spool, processes any newly arrived patches with a
# rolling mean, writes results to output_data_dir, then sleeps before the
# next run. The loop ends when no new patches appear even after an
# extended sleep.
i = 1
while True:
    print(f"\nRun number: {i}")
    # Select an updated spool
    sp = sp.update().sort("time")
    len_updated_sp = len(sp)
    # Get number of seconds in the first patch
    # (assuming data is getting in with the same time duration)
    sampling_interval = sp[0].coords.step("time")
    num_sec = (sp[0].coords.max("time") - sp[0].coords.min("time")
               + sampling_interval) / np.timedelta64(1, 's')
    initial_run = (i == 1)
    if initial_run:
        # No patches processed yet; bookkeeping for the stop condition.
        len_last_sp = changed_sleep_run_num = 0
        same_len = False
        print(f"Number of seconds in each patch = {num_sec}")
    # Check for new patches: spool length unchanged means nothing new arrived.
    if not initial_run and len_last_sp == len_updated_sp:
        if not same_len:
            print("No new data was detected in the spool after the set sleep time."
                  " Consider manually increasing the sleep time multiplier"
                  " coefficient (which could depend on hardware) for better"
                  " real-time processing performance.")
            # Adjust the sleep_time_mult to a greater value
            sleep_time_mult = 3
            print("So, sleep for longer to make sure no new patch exists.")
            same_len = True
            changed_sleep_run_num = i
        else:
            # Break the while loop if there are no new patches
            # in the spool after extended sleep
            if i != changed_sleep_run_num:
                print("No new data was detected in spool even after "
                      f"{num_sec} * {str(sleep_time_mult)} = {sleep_time} sec. "
                      "Therefore, real-time data processing ended successfully.")
                break
    # Set sleep time after each run to the patch duration times the multiplier.
    sleep_time = num_sec * sleep_time_mult
    # Do processing on each patch in the spool that has not been handled yet.
    for j, patch in enumerate(sp[len_last_sp:]):
        patch_num = len_last_sp + j
        print(f"Working on patch number: {patch_num}")
        # Do processing
        rolling_mean_patch = patch.rolling(
            time=window, step=step, engine="numpy").mean()
        # Save results under the same file name as the source patch.
        file_name = sp.get_contents()["path"][patch_num]
        output_path = os.path.join(output_data_dir, file_name)
        rolling_mean_patch.io.write(output_path, "dasdae")
    len_last_sp = len(sp)
    i += 1
    # Wait for new data to get into the data_path before proceeding with a new run
    print(f"Sleeping for {num_sec} * {str(sleep_time_mult)} = {sleep_time} sec.")
    time.sleep(sleep_time)