Low-Frequency Processing

This recipe demonstrates how DASCore can be used to apply low-frequency (LF) processing to a spool of DAS data. LF processing helps efficiently downsample the entire spool.

Get a spool and define parameters

## Load libraries and get a spool to work on
import tempfile
from pathlib import Path

import numpy as np

import dascore as dc
from dascore.utils.patch import get_patch_names

# Define path for saving results (swap out with your path)
output_data_dir = Path(tempfile.mkdtemp())

# Get a spool to work on
sp = dc.get_example_spool().update()

# Sort the spool
sp = sp.sort("time")
# You may also want to sub-select the spool for the desired distance or time samples before proceeding.

# Get the first patch
pa = sp[0]

# Define the target sampling interval (in sec.)
dt = 1
# For example, with a sampling interval of 10 seconds, the cutoff (Nyquist) frequency would be 0.05 Hz
cutoff_freq = 1 / (2*dt)

# Safety factor for the low-pass filter to avoid aliasing
filter_safety_factor = 0.9

# Enter memory size to be dedicated for processing (in MB)
memory_limit_MB = 1_000

# Define a tolerance for determining edge effects (used in next step)
tolerance = 1e-3
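To sanity-check these parameter choices against the actual data, it can help to summarize what the spool contains; `get_contents()` returns a dataframe with one row per patch. This inspection step is an optional addition to the recipe:

# Optional: summarize the patches in the spool (time ranges, dimensions, etc.)
print(sp.get_contents())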
Calculate chunk size and determine edge effects
To chunk the spool, we first need to determine the chunk size based on the machine's memory so that patches can be loaded and processed without running into memory issues. A longer chunk size (longer patches) increases computational efficiency.
Notes:
- The `processing_factor` is required because certain processing routines involve making copies of the data during the processing steps. It should be determined by performing memory profiling on an example dataset for the specific processing routine. For instance, the combination of low-pass filtering and interpolation, discussed in the next section, requires a processing factor of approximately 5.
- The `memory_safety_factor` is optional and helps prevent getting too close to the memory limit. (A worked example of this calculation follows these notes.)
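To make the memory arithmetic concrete, here is a small worked example. The patch properties used (float64 data, 300 channels sampled at 250 Hz) are assumptions chosen only for illustration:

# Worked example with assumed patch properties (not read from the spool):
# 300 channels * 250 Hz * 8 bytes (float64) = 600,000 bytes per second of data.
bytes_per_second_example = 300 * 250 * 8
# Applying processing_factor = 5 and memory_safety_factor = 1.2 gives 3.6 MB per second.
mb_per_second_example = bytes_per_second_example * 5 * 1.2 / 1e6
# A 1,000 MB memory limit therefore allows a chunk of roughly 278 seconds.
print(1_000 / mb_per_second_example)  # ~277.78

If the example spool has these properties, this reproduces the ~277.78 second chunk size reported in the warning below.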
# Get patch's number of bytes per second (based on patch's data type)
pa_bytes_per_second = pa.data.nbytes / pa.seconds

# Define processing factor and safety factor
processing_factor = 5
memory_safety_factor = 1.2

# Calculate memory size required for each second of data to get processed
memory_size_per_second = pa_bytes_per_second * processing_factor * memory_safety_factor
memory_size_per_second_MB = memory_size_per_second / 1e6

# Calculate chunk size that can be loaded (in seconds)
chunk_size = memory_limit_MB / memory_size_per_second_MB

# Ensure `chunk_size` does not exceed the spool length
time_step = sp[0].get_coord('time').step
time_min = sp[0].get_coord('time').min()
time_max = sp[-1].get_coord('time').max()
spool_length = dc.to_float(time_max - time_min + time_step)
if chunk_size > spool_length:
    print(
        f"Warning: Specified `chunk_size` ({chunk_size:.2f} seconds) exceeds the spool length "
        f"({spool_length:.2f} seconds). Adjusting `chunk_size` to match spool length."
    )
    chunk_size = spool_length
Warning: Specified `chunk_size` (277.78 seconds) exceeds the spool length (24.00 seconds). Adjusting `chunk_size` to match spool length.
Next, we need to determine the extent of artifacts introduced by low-pass filtering at the edges of each patch. To achieve this, we apply LF processing to a delta function patch, which contains a unit value at the center and zeros elsewhere. The distorted edges are then identified based on a defined threshold.
# Retrieve a patch of appropriate size for LF processing that fits into memory
pa_chunked_sp = sp.chunk(time=chunk_size, keep_partial=True)[0]

# Create a delta patch based on new patch size
delta_pa = dc.get_example_patch("delta_patch", dim="time", patch=pa_chunked_sp)

# Apply the low-pass filter on the delta patch
delta_pa_low_passed = delta_pa.pass_filter(time=(None, cutoff_freq * filter_safety_factor))

# Resample the low-pass filtered patch
new_time_ax = np.arange(delta_pa.attrs["time_min"], delta_pa.attrs["time_max"], np.timedelta64(dt, "s"))
delta_pa_lfp = delta_pa_low_passed.interpolate(time=new_time_ax)

# Identify the indices where the absolute value of the data exceeds the threshold
data_abs = np.abs(delta_pa_lfp.data)
threshold = np.max(data_abs) * tolerance
ind = data_abs > threshold
ind_1 = np.where(ind)[1][0]
ind_2 = np.where(ind)[1][-1]

# Get the total duration of the processed delta function patch in seconds
delta_pa_lfp_length = delta_pa_lfp.seconds

# Convert the new time axis to absolute seconds, relative to the first timestamp
time_ax_abs = (new_time_ax - new_time_ax[0]) / np.timedelta64(1, "s")

# Center the time axis
time_ax_centered = time_ax_abs - delta_pa_lfp_length // 2

# Calculate the maximum edge extent on both sides (in seconds) where artifacts are present
edge = max(np.abs(time_ax_centered[ind_1]), np.abs(time_ax_centered[ind_2]))

# Validate the `edge` value to ensure sufficient processing patch size
if np.ceil(edge) >= chunk_size / 2:
    raise ValueError(
        f"The calculated `edge` value ({edge:.2f} seconds) is greater than half of the processing patch size "
        f"({chunk_size:.2f} seconds). To resolve this and increase efficiency, consider one of the following:\n"
        "- Increase `memory_limit_MB` to allow for a larger processing window.\n"
        "- Increase `tolerance` to reduce the sensitivity of artifact detection."
    )
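As an optional sanity check (an addition to the recipe, not part of the original), you can report how much of each chunk the detected edge will consume:

# Report the trimmed edge relative to the chunk size.
print(f"Edge artifacts extend ~{edge:.2f} s on each side of a {chunk_size:.2f} s chunk.")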
Perform low-frequency processing and save results on disk
# First we chunk the spool based on the `chunk_size` and `edge` calculated before.
sp_chunked_overlap = sp.chunk(time=chunk_size, overlap=2*edge, keep_partial=True)

# Process each patch in the spool and save the resulting patch
lf_patches = []
for patch in sp_chunked_overlap:
    # Apply any pre-processing you may need (such as velocity to strain rate transformation, detrending, etc.)
    # ...

    # Apply the low-pass filter on the patch
    pa_low_passed = patch.pass_filter(time=(..., cutoff_freq * filter_safety_factor))

    # Resample the low-pass filtered patch
    new_time_ax = np.arange(pa_low_passed.attrs["time_min"], pa_low_passed.attrs["time_max"], np.timedelta64(dt, "s"))
    pa_lfp = (
        pa_low_passed.interpolate(time=new_time_ax)
        # Update sampling interval
        .update_coords(time_step=dt)
        # Remove edges from data at both ends
        .select(time=(edge, -edge), relative=True)
    )

    # Save the processed patch
    pa_lf_name = pa_lfp.get_patch_name()
    path = output_data_dir / f"{pa_lf_name}.h5"
    pa_lfp.io.write(path, "dasdae")
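To confirm that the files written by the loop were resampled to the target interval, a quick check such as the following can be run after the loop finishes; this snippet is a sketch added here rather than part of the original recipe:

# Optional verification: load the written results and inspect the time step.
sp_check = dc.spool(output_data_dir).update()
print(sp_check[0].get_coord("time").step)  # expected to be ~dt seconds (a numpy timedelta64)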
Visualize the results
# Create a spool of LF processed results
sp_lf = dc.spool(output_data_dir)

# Merge the spool and create a single patch. May need to sub-select before merging to prevent exceeding the memory limit.
sp_lf_merged = sp_lf.chunk(time=None, conflict="keep_first")
pa_lf_merged = sp_lf_merged[0]

# Visualize the results. Try different scale values for better visualization.
pa_lf_merged.viz.waterfall(scale=0.5)
<Axes: xlabel='time', ylabel='distance(m)'>
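Because `viz.waterfall` returns a matplotlib `Axes` (as the output above shows), the figure can also be saved to disk; the filename below is only an example:

# Save the waterfall figure alongside the processed data (example filename).
ax = pa_lf_merged.viz.waterfall(scale=0.5)
ax.figure.savefig(output_data_dir / "lf_processed_waterfall.png", dpi=200)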