Spool

Spools are containers/managers of patches. The spool interface is designed to manage a variety of data sources, including a group of patches loaded into memory, archives of local files, and a variety of remote resources.

Data Sources

The simplest way to get the appropriate spool for a specified input is to use the spool function, which knows about many different input types and returns an appropriate BaseSpool subclass instance.

Patches (in-memory)

import dascore as dc

patch_list = [dc.get_example_patch()]

spool1 = dc.spool(patch_list)

A Single file

import dascore as dc
# Import fetch to read DASCore example files 
from dascore.utils.downloader import fetch

path_to_das_file = fetch("terra15_das_1_trimmed.hdf5")
# To read DAS data stored locally on your machine, simply replace the above line with:
# path_to_das_file = "/path/to/data/directory/data.EXT"

spool2 = dc.spool(path_to_das_file)

A directory of DAS files

import dascore as dc
# Import fetch to read DASCore example files 
from dascore.utils.downloader import fetch

# Fetch a sample file path from DASCore (just to get a usable path for the rest of the cell)
directory_path = fetch('terra15_das_1_trimmed.hdf5').parent
# To read a directory of DAS data stored locally on your machine, 
# simply replace the above line with:
# directory_path = "/path/to/data/directory/"

# Update will create an index of the contents for fast querying/access
spool3 = dc.spool(directory_path).update()

If you want the index file to exist somewhere else, for example if you can’t write to the data directory, you can specify an index path.

import tempfile
from pathlib import Path

index_path = Path(tempfile.mkdtemp()) / "index.h5"

# Update will create an index of the contents for fast querying/access.
spool = dc.spool(directory_path, index_path=index_path).update()

New spools created using the same directory will know where to find the index file, unless there is a valid index file already in the directory.

Warning

If you remove files from a directory that has already been indexed, you should delete the index file and then call update again on the spool like this:

spool.indexer.index_path.unlink()
spool.update()

It is best not to delete files once added to a directory managed by DASCore.

Despite some implementation differences, all spools have common behavior/methods.

Accessing patches

Patches are extracted from the spool via simple iteration or indexing. New spools are returned via slicing.

import dascore as dc

spool = dc.get_example_spool()

# Extract first patch in the spool.
patch = spool[0]

# Iterate patches in spool.
for patch in spool:
    ...

# Slice spool to create new spool which excludes first patch.
new_spool = spool[1:]

get_contents

The get_contents method returns a dataframe listing the spool contents. This method may not be supported on all spools, especially those interfacing with large remote resources.

import dascore as dc
spool = dc.get_example_spool()

# Return dataframe with contents of spool (each row has metadata of a patch)
contents = spool.get_contents()
print(contents)
data_type data_category data_units instrument_id acquisition_id tag station network history dims ... distance_dtype distance_min distance_max distance_step distance_units time_dtype time_min time_max time_step time_units
0 None random distance,time ... int64 0 299 1 m datetime64 2020-01-03 00:00:00 2020-01-03 00:00:07.996 0 days 00:00:00.004000 s
1 None random distance,time ... int64 0 299 1 m datetime64 2020-01-03 00:00:08 2020-01-03 00:00:15.996 0 days 00:00:00.004000 s
2 None random distance,time ... int64 0 299 1 m datetime64 2020-01-03 00:00:16 2020-01-03 00:00:23.996 0 days 00:00:00.004000 s

3 rows × 21 columns

select

The select method selects a subset of a spool and returns a new spool. get_contents will now reflect a subset of the original data requested by the select operation.

import dascore as dc
spool = dc.get_example_spool()

# Select a spool with data after Jan 3rd, 2020.
subspool = spool.select(time=('2020-01-03T00:00:09', None))

In addition to trimming the data along a specified dimension (as shown above), select can be used to filter patches that meet a specified criteria.

import dascore as dc
# Load a spool which has many diverse patches.
spool = dc.get_example_spool('diverse_das')

# Only include patches which are in network 'das2' or 'das3'.
subspool = spool.select(network={'das2', 'das3'})

# Only include spools which match some unix-style query on their tags.
subspool = spool.select(tag='some*')

chunk

The chunk method controls how data are grouped together in patches within the spool. It can be used to merge contiguous patches together, specify the size of patches for processing, specify overlap with previous patches, etc.

import dascore as dc
spool = dc.get_example_spool()

# Chunk spool for 3 second increments with 1 second overlaps
# and keep any segements at the end that don't have the full 3 seconds.
subspool = spool.chunk(time=3, overlap=1, keep_partial=True)

# Merge all contiguous segments along time dimension.
merged_spool = spool.chunk(time=None)

concatenate

Similar to chunk, Spool.concatenate is used to combine patches together. However, concatenate doesn’t account for coordinate values along the concatenation axis, and can even be used to create new patch dimensions.

Warning

However, unlike chunk, not all Spool types implement concatenate.

import dascore as dc

patch = dc.get_example_patch()

# Create a spool with patches that have a large gap
time = patch.get_coord("time")
one_hour = dc.to_timedelta64(3600)
patch2 = patch.update_coords(time_min=time.max() + one_hour)
spool = dc.spool([patch, patch2])

# chunk rightfully wouldn't merge these patches, but concatenate will.
merged = spool.concatenate(time=None)
print(merged[0].coords)

map

The map method applies a function to all patches in the spool. It provides an efficient way to process large datasets, especially when combined with clients (aka executors).

For example, calculating the maximum value for each channel (distance) for 4 second increments with 1 second overlap can be done like so:

import dascore as dc
spool = dc.get_example_spool()

# define function for mapping to each patch
def get_dist_max(patch):
    """Function which will be mapped to each patch in spool."""
    return patch.aggregate("time", "max")

# chunk and apply function
map_out = spool.chunk(time=5, overlap=1).map(get_dist_max)

# combine output back into a single patch
agg_patch = dc.spool(map_out).concatenate(time=None)[0]

print(agg_patch)

DASCore Patch ⚡
---------------
➤ Coordinates (distance: 300, time: 5)
    *distance: CoordRange( min: 0 max: 299 step: 1 shape: (300,) dtype: int64 units: m )
    *time: CoordPartial( shape: (5,) dtype: None )
➤ Data (float64)
   [[1.    1.    1.    1.    1.   ]
    [1.    1.    1.    1.    1.   ]
    [0.993 1.    0.993 1.    0.993]
    ...
    [1.    1.    1.    1.    1.   ]
    [0.999 0.999 0.999 0.999 0.999]
    [1.    0.998 1.    0.998 1.   ]]
➤ Attributes
    tag: random
    history: ("aggregate(dim='time',dim_reduce='empty',method='max')", 'drop_private_coords()', 'concatenate')
    category: DAS

See the parallel processing recipe for more examples with map.