Spool

Spools are containers/managers of patches. The spool interface is designed to manage a variety of data sources, including a group of patches loaded into memory, archives of local files, and a variety of remote resources.

Data Sources

The simplest way to get the appropriate spool for a specified input is to use the spool function, which knows about many different input types and returns an appropriate BaseSpool subclass instance.

Patches (in-memory)

```python
import dascore as dc

patch_list = [dc.get_example_patch()]
spool1 = dc.spool(patch_list)
```

A Single file
```python
import dascore as dc
from dascore.utils.downloader import fetch

path_to_das_file = fetch("terra15_das_1_trimmed.hdf5")
spool2 = dc.spool(path_to_das_file)
```
A directory of DAS files
```python
import dascore as dc
from dascore.utils.downloader import fetch

# We use fetch to make sure the file is downloaded. You would
# just need to replace directory_path with your data directory path.
path_to_das_file = fetch("terra15_das_1_trimmed.hdf5")
directory_path = path_to_das_file.parent

# Update will create an index of the contents for fast querying/access.
spool3 = dc.spool(directory_path).update()
```
If you want the index file to exist somewhere else, for example if you can’t write to the data directory, you can specify an index path.
```python
import tempfile
from pathlib import Path

index_path = Path(tempfile.mkdtemp()) / "index.h5"

# Update will create an index of the contents for fast querying/access.
spool = dc.spool(directory_path, index_path=index_path).update()
```
New spools created using the same directory will know where to find the index file, unless there is a valid index file already in the directory.
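For example, here is a minimal sketch (continuing from the snippet above, where directory_path and index_path were defined) showing that a second spool created from the same directory picks up the existing index without being told where it is:

```python
# Continuing from above: no index_path needed, the existing index is found.
spool4 = dc.spool(directory_path).update()
```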
If you remove files from a directory that has already been indexed, you should delete the index file and then call update again on the spool like this:

```python
spool.indexer.index_path.unlink()
spool.update()
```
It is best not to delete files once added to a directory managed by DASCore.
Despite some implementation differences, all spools have common behavior/methods.
Accessing patches
Patches are extracted from the spool via simple iteration or indexing. New spools are returned via slicing.
```python
import dascore as dc

spool = dc.get_example_spool()

# Extract first patch in the spool.
patch = spool[0]

# Iterate patches in spool.
for patch in spool:
    ...

# Slice spool to create new spool which excludes first patch.
new_spool = spool[1:]
```
get_contents
The get_contents method returns a dataframe listing the spool contents. This method may not be supported on all spools, especially those interfacing with large remote resources.
```python
import dascore as dc

spool = dc.get_example_spool()

# Return dataframe with contents of spool (each row has metadata of a patch).
contents = spool.get_contents()
print(contents)
```
Each row lists patch metadata (data_type, data_category, data_units, instrument_id, acquisition_id, tag, station, network, history, dims) along with dtype/min/max/step/units summaries for each coordinate; the populated columns are shown below:

| | tag | dims | ... | distance_dtype | distance_min | distance_max | distance_step | distance_units | time_dtype | time_min | time_max | time_step | time_units |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | random | distance,time | ... | int64 | 0 | 299 | 1 | m | datetime64 | 2020-01-03 00:00:00 | 2020-01-03 00:00:07.996 | 0 days 00:00:00.004000 | s |
| 1 | random | distance,time | ... | int64 | 0 | 299 | 1 | m | datetime64 | 2020-01-03 00:00:08 | 2020-01-03 00:00:15.996 | 0 days 00:00:00.004000 | s |
| 2 | random | distance,time | ... | int64 | 0 | 299 | 1 | m | datetime64 | 2020-01-03 00:00:16 | 2020-01-03 00:00:23.996 | 0 days 00:00:00.004000 | s |

3 rows × 21 columns
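Since get_contents returns a regular dataframe, ordinary pandas operations can be used to inspect or summarize it; a small sketch using the columns shown above:

```python
import dascore as dc

spool = dc.get_example_spool()
contents = spool.get_contents()

# Ordinary pandas operations work on the contents dataframe, e.g. listing
# the available columns or sorting patches by their start time.
print(list(contents.columns))
print(contents.sort_values("time_min")[["time_min", "time_max"]])
```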
select
The select method selects a subset of a spool and returns a new spool. get_contents will then reflect only the subset of the original data requested by the select operation.
```python
import dascore as dc

spool = dc.get_example_spool()

# Select a sub-spool with data after 2020-01-03T00:00:09.
subspool = spool.select(time=('2020-01-03T00:00:09', None))
```
In addition to trimming the data along a specified dimension (as shown above), select can also be used to filter patches that meet specified criteria.
```python
import dascore as dc

# Load a spool which has many diverse patches.
spool = dc.get_example_spool('diverse_das')

# Only include patches which are in network 'das2' or 'das3'.
subspool = spool.select(network={'das2', 'das3'})

# Only include patches whose tags match a unix-style wildcard query.
subspool = spool.select(tag='some*')
```
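Dimension ranges and metadata filters can also be combined; here is a small sketch assuming select accepts multiple criteria in a single call (the specific values are illustrative):

```python
import dascore as dc

spool = dc.get_example_spool('diverse_das')

# Sketch: filter by network and trim along time in one select call.
# Assumes multiple keywords can be combined; the time values are illustrative.
subspool = spool.select(
    network='das2',
    time=('2020-01-03T00:00:09', None),
)
```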
chunk
The chunk method controls how data are grouped together in patches within the spool. It can be used to merge contiguous patches together, specify the size of patches for processing, specify overlap with previous patches, etc.
```python
import dascore as dc

spool = dc.get_example_spool()

# Chunk spool into 3 second increments with 1 second overlaps
# and keep any segments at the end that don't have the full 3 seconds.
subspool = spool.chunk(time=3, overlap=1, keep_partial=True)

# Merge all contiguous segments along time dimension.
merged_spool = spool.chunk(time=None)
```
concatenate
Similar to chunk, Spool.concatenate is used to combine patches together. However, concatenate doesn’t account for coordinate values along the concatenation axis, and can even be used to create new patch dimensions. Unlike chunk, not all Spool types implement concatenate.
```python
import dascore as dc

patch = dc.get_example_patch()

# Create a spool with patches that have a large gap.
time = patch.get_coord("time")
one_hour = dc.to_timedelta64(3600)
patch2 = patch.update_coords(time_min=time.max() + one_hour)
spool = dc.spool([patch, patch2])

# chunk rightfully wouldn't merge these patches, but concatenate will.
merged = spool.concatenate(time=None)
print(merged[0].coords)
```
map
The map method applies a function to all patches in the spool. It provides an efficient way to process large datasets, especially when combined with clients (aka executors).
For example, calculating the maximum value for each channel (distance) over 5 second windows with 1 second overlap can be done like so:
```python
import dascore as dc

spool = dc.get_example_spool()

# Define the function to map over each patch.
def get_dist_max(patch):
    """Function which will be mapped to each patch in spool."""
    return patch.aggregate("time", "max")

# Chunk and apply the function.
map_out = spool.chunk(time=5, overlap=1).map(get_dist_max)

# Combine output back into a single patch.
agg_patch = dc.spool(map_out).concatenate(time=None)[0]

print(agg_patch)
```
```
DASCore Patch ⚡
---------------
➤ Coordinates (distance: 300, time: 5)
  *distance: CoordRange( min: 0 max: 299 step: 1 shape: (300,) dtype: int64 units: m )
  *time: CoordPartial( shape: (5,) dtype: None )
➤ Data (float64)
  [[1.    1.    1.    1.    1.   ]
   [1.    1.    1.    1.    1.   ]
   [0.993 1.    0.993 1.    0.993]
   ...
   [1.    1.    1.    1.    1.   ]
   [0.999 0.999 0.999 0.999 0.999]
   [1.    0.998 1.    0.998 1.   ]]
➤ Attributes
  tag: random
  history: ("aggregate(dim='time',dim_reduce='empty',method='max')", 'drop_private_coords()', 'concatenate')
  category: DAS
```
See the parallel processing recipe for more examples with map.
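As a concrete illustration, here is a hedged sketch of combining map with a process pool, assuming map accepts a client keyword that takes an executor-like object (as suggested above); see the parallel processing recipe for the exact supported interface:

```python
from concurrent.futures import ProcessPoolExecutor

import dascore as dc


def get_dist_max(patch):
    """Take the maximum along time for each distance channel."""
    return patch.aggregate("time", "max")


if __name__ == "__main__":
    spool = dc.get_example_spool()
    # Assumption: map takes a `client` keyword accepting an executor, so the
    # chunked patches are processed in parallel worker processes.
    with ProcessPoolExecutor() as executor:
        map_out = spool.chunk(time=5, overlap=1).map(get_dist_max, client=executor)
    # Combine the per-chunk results back into a single patch.
    agg_patch = dc.spool(map_out).concatenate(time=None)[0]
    print(agg_patch)
```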