# Import libraries
import os
import time
import dascore as dc
import numpy as np
from dascore.units import s
# Define path for saving results
output_data_dir = '/path/to/desired/output/directory'

# Get a spool to work on
# (the example spool is refreshed with update() and sorted on "time")
sp = dc.get_example_spool().update().sort("time")
Real-Time Processing
This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in near real time for edge computing purposes.
Load libraries and get a spool
Set real-time processing parameters (if needed)
In this section, we define the window size and step size required for rolling mean processing. With a sampling interval of 10 seconds, the cutoff frequency (Nyquist frequency) is 0.05 Hz. Additionally, we set the desired wait time after each run using the `sleep_time_mult` parameter, which acts as a multiplier coefficient for the number of seconds in each patch.
# Define the target sampling interval (in sec.)
dt = 10

# Determine window size (dt multiplied by the `s` unit from dascore.units)
window = dt * s

# Determine step size (same value as the window)
step = dt * s

# Set the desired wait time after each run
# (multiplier applied to the number of seconds in each patch)
sleep_time_mult = 1.2
Real-time processing
Finally, we use a while loop to repeatedly update the spool and process any new patches. The while loop breaks when no new patches appear in the spool, even after an extended sleep.
# Start the while loop for real-time processing.
# Each run refreshes the spool, processes only the patches that were not
# present on the previous run, saves the results, and then sleeps before
# checking the spool again.
i = 1
while True:
    print(f"\nRun number: {i}")

    # Select an updated spool
    sp = sp.update().sort("time")
    len_updated_sp = len(sp)

    # Get number of seconds in the first patch
    # (assuming data is getting in with the same time duration)
    first_patch = sp[0]  # fetch once instead of once per coordinate query
    sampling_interval = first_patch.coords.step("time")
    num_sec = (
        first_patch.coords.max("time")
        - first_patch.coords.min("time")
        + sampling_interval
    ) / np.timedelta64(1, 's')

    initial_run = (i == 1)
    if initial_run:
        # No patches processed yet and the sleep multiplier is unchanged.
        len_last_sp = changed_sleep_run_num = 0
        same_len = False
        print(f"Number of seconds in each patch = {num_sec}")

    # Check for new patches
    if not initial_run and len_last_sp == len_updated_sp:
        if not same_len:
            print("No new data was detected in the spool after the set sleep time."
                  " Consider manually increasing the sleep time multiplier"
                  " coefficient (which could depend on hardware) for better"
                  " real-time processing performance.")
            # Adjust the sleep_time_mult to a greater value
            sleep_time_mult = 3
            print("So, sleep for longer to make sure no new patch exists.")
            same_len = True
            # Remember the run in which the multiplier was raised so the
            # loop is only ended on a later run.
            changed_sleep_run_num = i
        else:
            # Break the while loop if there are no new patches
            # in the spool after extended sleep
            if i != changed_sleep_run_num:
                # sleep_time still holds the value from the previous run here.
                print("No new data was detected in spool even after "
                      f"{num_sec} * {str(sleep_time_mult)} = {sleep_time} sec. "
                      "Therefore, real-time data processing ended successfully.")
                break

    # Set sleep time after each run to the number of seconds in a patch
    # multiplied by sleep_time_mult
    sleep_time = num_sec * sleep_time_mult

    # Look up the "path" column of the spool contents once per run
    # rather than once per patch.
    patch_paths = sp.get_contents()["path"]

    # Do processing on each patch in the spool
    # (only patches after the ones seen on the previous run)
    for j, patch in enumerate(sp[len_last_sp:]):
        patch_num = len_last_sp + j
        print(f"Working on patch number: {patch_num}")

        # Do processing
        rolling_mean_patch = patch.rolling(
            time=window, step=step, engine="numpy").mean()

        # Save results
        file_name = patch_paths[patch_num]
        output_path = os.path.join(output_data_dir, file_name)
        rolling_mean_patch.io.write(output_path, "dasdae")

    len_last_sp = len(sp)
    i += 1

    # Wait for new data to get into the data_path before proceeding with a new run
    print(f"Sleeping for {num_sec} * {str(sleep_time_mult)} = {sleep_time} sec.")
    time.sleep(sleep_time)