"""
Utilities for processing waveforms
"""
import numpy as np
from scipy import signal
from obspy import Stream
from obspy.geodetics import gps2dist_azimuth
from pysep import logger
from pysep.utils.fmt import get_codes
def append_back_azimuth_to_stats(st, inv, event):
    """
    Append back azimuth values to a Stream, as required by ObsPy rotation
    (which reads them from trace.stats.back_azimuth). Values are calculated
    from the event's preferred origin and the station coordinates found in
    the Inventory.

    .. note::
        This was written and then I realized that `_append_sac_headers_trace`
        already does this, so it is unused, but left incase it's useful later.

    :type st: obspy.core.stream.Stream
    :param st: Stream with missing components
    :type inv: obspy.core.inventory.Inventory
    :param inv: optional user-provided inventory object which will force a
        skip over StationXML/inventory searching
    :type event: obspy.core.event.Event
    :param event: optional user-provided event object which will force a
        skip over QuakeML/event searching
    :rtype: obspy.core.stream.Stream
    :return: stream with back azimuth values appended
    """
    stream = st.copy()
    origin = event.preferred_origin()
    for network in inv:
        for station in network:
            # gps2dist_azimuth returns (dist_m, azimuth, back_azimuth)
            _, _, back_azimuth = gps2dist_azimuth(
                lat1=origin.latitude, lon1=origin.longitude,
                lat2=station.latitude, lon2=station.longitude
            )
            # Apply back azimuth changes to the copied stream in place
            for trace in stream.select(network=network.code,
                                       station=station.code):
                trace.stats.back_azimuth = back_azimuth
    return stream
[docs]
def _append_null_trace(st, component, cmpaz=None, cmpinc=None):
"""
Create a copy of a trace with zeroed out data, used for rotation
:type st: obspy.core.stream.Stream
:param st: Stream with missing components
:type component: str
:param component: component of data to turn into a null trace
:type cmpaz: float
:param cmpaz: component azimuth for SAC header
:type cmpinc: float
:param cmpinc: component inclination for SAC header
:rtype st: obspy.core.stream.Stream
:return st: Stream that has had null traces appended
"""
st_out = st.copy()
trace_null = st_out[0].copy()
trace_null.data = np.zeros(len(trace_null.data))
trace_null.stats.component = component
if cmpaz is not None:
trace_null.stats.sac["cmpaz"] = cmpaz
if cmpinc is not None:
trace_null.stats.sac["cmpinc"] = cmpinc
st_out.append(trace_null)
return st_out
[docs]
def _check_component_availability(st):
"""
Quick logic check to see if stream is missing horizontals or vertical
:type st: obspy.core.stream.Stream
:param st: Steam to check
:rtype: tuple of bool
:return: (is the vertical component missing?,
is one or both horizontal components missing?)
"""
comps = sorted([tr.stats.component.upper() for tr in st])
missing_vertical, missing_horizontal = False, False
if comps:
if "Z" not in comps:
missing_vertical = True
# Any one of these criteria will trigger a 'missing_horizontal' flag
for pairs in [("N", "E"), ("R", "T"), ("1", "2")]:
first, second = pairs
if first in comps and second not in comps:
missing_horizontal = True
elif second in comps and first not in comps:
missing_horizontal = True
# Deal with the case where we have an empty Stream
else:
missing_vertical, missing_horizontal = True, True
return missing_vertical, missing_horizontal
def rotate_to_uvw(st):
    """
    UVW orthogonal frame rotation

    In Symmetric Triaxial Seismometers, the sensing elements are also arranged
    to be mutually orthogonal, but instead of one axis being vertical, all
    three are inclined upwards from the horizontal at precisely the same angle,
    as if they were aligned with the edges of a cube balanced on a corner.

    TODO test this, not sure how it's supposed to work

    See reference:
    http://link.springer.com/referenceworkentry/10.1007/978-3-642-36197-5_194-1

    Rotation matrix reference:
    http://link.springer.com/referenceworkentry/10.1007/
                                                978-3-642-36197-5_194-1#page-1

    :type st: obspy.core.stream.Stream
    :param st: three-component stream to rotate the UVW
    :rtype: obspy.core.stream.Stream
    :return: Stream that has been rotated to UVW
    """
    st_out = st.copy()
    assert len(st) == 3, "UVW rotation requires 3 component stream input"
    # Stack component data into a (3, npts) array. The previous version built
    # a 3D (3, 1, npts) array and passed it to `np.asmatrix`, which raises
    # ValueError for arrays with more than 2 dimensions (np.matrix is also
    # deprecated), so this function could never run to completion
    xyz = np.array([st_out[0].data, st_out[1].data, st_out[2].data])
    rotation_matrix = np.array([[2, 0, np.sqrt(2)],
                                [-1, np.sqrt(3), np.sqrt(2)],
                                [-1, -np.sqrt(3), np.sqrt(2)]]) / np.sqrt(6)
    # Theta is the angle between the axis and the horizontal plane
    theta = (np.arcsin(1 / np.sqrt(3))) * (180.0 / np.pi)
    uvw = rotation_matrix @ xyz  # (3, 3) @ (3, npts) -> (3, npts)
    for i, comp in enumerate(["U", "V", "W"]):
        st_out[i].data = uvw[i]
        st_out[i].stats.component = comp
        st_out[i].stats.sac["kcmpnm"] = st_out[i].stats.channel
        # NOTE(review): `*=` zeroes the U-component azimuth (i=0) and scales
        # the others by 120/240; possibly `=` (0/120/240 deg) was intended.
        # Kept as-is pending the TODO above -- confirm against reference
        st_out[i].stats.sac["cmpaz"] *= (120 * i)  # azimuth
        st_out[i].stats.sac["cmpinc"] -= theta  # inclination
        if comp == "W":
            st_out[i].stats.sac["cmpinc"] += 90.
    return st_out
def trim_start_end_times(st, starttime=None, endtime=None, fill_value=None):
    """
    Trim all traces in a Stream to a uniform start and end times. If no
    `starttime` or `endtime` are provided, they are selected as the outer
    bounds of the Streams time bounds.

    :type st: obspy.core.stream.Stream
    :param st: stream to merge and trim start and end times for
    :type starttime: obspy.core.UTCDateTime
    :param starttime: user-defined starttime to trim stream. If None, will get
        the maximum starttime in entire stream
    :type endtime: obspy.core.UTCDateTime
    :param endtime: user-defined endtime to trim stream. If None, will get
        the minimum endtime in entire stream
    :type fill_value: str or int or float
    :param fill_value: How to deal with data gaps (missing sections of
        waveform before trace start or trace end w.r.t requested start and end
        times). NoneType by default, which means data with gaps are removed
        completely.

        Options include:

        - 'mean': fill with the mean of all data values in the gappy data
        - <int or float>: fill with a constant, user-defined value, e.g.,
          0 or 1.23 or 9.999
        - None: do not fill data gaps
    :rtype: obspy.core.stream.Stream
    :return: Stream with uniform start and end times
    """
    st_edit = st.copy()
    if starttime is None:
        starttime = min([tr.stats.starttime for tr in st_edit])
    if endtime is None:
        endtime = max([tr.stats.endtime for tr in st_edit])

    # Trace.trim() can only take float or int as fill values
    if fill_value in ["interpolate", "latest"]:
        logger.warning("ObsPy cannot use `fill_value` 'interpolate' or "
                       "'latest' for missing boundary times. Setting to 'mean'")
        fill_value = "mean"

    # Go through all stations and look for those with late starts or early ends
    # Then either remove short traces or fill them with acceptable values.
    # Iterate over a shallow copy because traces may be removed from `st_edit`
    # in-place; removing from the sequence being iterated over skips elements
    for tr in st_edit[:]:
        if tr.stats.starttime > starttime or tr.stats.endtime < endtime:
            if fill_value is not False:
                if fill_value == "mean":
                    # Cast so the fill value matches the trace's data dtype
                    _fillval = np.nanmean(tr.data).astype(tr.data.dtype)
                else:
                    _fillval = fill_value
                logger.info(f"{tr.get_id()} filling start/endtime data gap w/: "
                            f"{_fillval}")
                tr.trim(starttime=starttime, endtime=endtime, pad=True,
                        nearest_sample=False, fill_value=_fillval)
            else:
                # Directly remove stations with late starttime or early endtime
                logger.warning(f"{tr.get_id()} start or endtime does not match "
                               f"requested bounds, remove")
                st_edit.remove(tr)
        else:
            tr.trim(starttime=starttime, endtime=endtime, nearest_sample=False)

    return st_edit
def merge_gapped_data(st, fill_value=None, gap_fraction=1.):
    """
    Merges traces with the same ID, filling gappy data with values if requested,
    otherwise removes traces that have gapped data.

    .. note::
        Be careful about `fill_value` data types, as there are no checks that
        the fill value matches the internal data types. This may cause
        unexpected errors.

    :type st: obspy.core.stream.Stream
    :param st: stream to merge and trim start and end times for
    :type fill_value: str or int or float
    :param fill_value: How to deal with data gaps (missing sections of
        waveform over a continuous time span). NoneType by default, which
        means data with gaps are removed completely. Users who want access
        to data with gaps must choose how gaps are filled. See API for
        ObsPy.core.stream.Stream.merge() for how merge is handled:

        Options include:

        - 'mean': fill with the mean of all data values in the gappy data
        - <int or float>: fill with a constant, user-defined value, e.g.,
          0 or 1.23 or 9.999
        - 'interpolate': linearly interpolate from the last value pre-gap
          to the first value post-gap
        - 'latest': fill with the last value of pre-gap data
        - False: do not fill data gaps, which will lead to stations w/
          data gaps being removed.
    :type gap_fraction: float
    :param gap_fraction: if `fill_data_gaps` is not None, determines the
        maximum allowable fraction (percentage) of data that gaps can
        comprise. For example, a value of 0.3 means that 30% of the data
        (in samples) can be gaps that will be filled by `fill_data_gaps`.
        Traces with gap fractions that exceed this value will be removed.
        Defaults to 1. (100%) of data can be gaps.
    :rtype: obspy.core.stream.Stream
    :return: Stream that has been merged per component code
    """
    st_edit = st.copy()
    # Previous log message wrongly said "trimming start and end times"
    # (copy-paste from trim_start_end_times)
    logger.info("merging gapped data on a per-component basis")
    st_out = Stream()
    # e.g., NN.SSS.LL.CC? i.e., only getting per-component codes
    codes = get_codes(st=st_edit, choice="component")
    for code in codes:
        # Subset the stream at each component
        net, sta, loc, cha = code.split(".")
        st_edit_select = st_edit.select(network=net, station=sta,
                                       location=loc, channel=cha)
        # Try-except catches any general ObsPy merge errors to keep code going
        try:
            data_gaps = st_edit_select.get_gaps()
            fillval = fill_value  # dummy value to allow in-place changes
            # Check if there are data gaps that we need to address
            if data_gaps and fill_value is not False:
                # Determine the percentage of data that comprises gaps.
                # Sum over ALL gaps (last entry of each get_gaps() row is
                # that gap's sample count); previously only the first gap
                # was counted, under-reporting the gap fraction
                gap_duration_samp = sum(gap[-1] for gap in data_gaps)
                starttime = min([tr.stats.starttime for tr in st_edit_select])
                endtime = max([tr.stats.endtime for tr in st_edit_select])
                total_samps = \
                    (endtime - starttime) / st_edit_select[0].stats.delta
                gap_percentage = gap_duration_samp / total_samps
                # If we exceed the allowable amount of data gap, do not fill
                if gap_percentage > gap_fraction:
                    logger.info(f"{code} has gap fraction "
                                f"({gap_percentage:.2f}) > allowable "
                                f"{gap_fraction}, will not fill gaps")
                    fillval = None
                else:
                    # Decide how we're going to fill the gaps
                    if fill_value == "mean":
                        # Collect mean value over all data surrounding the gap
                        # Convert to data type expected by array
                        fillval = np.nanmean(
                            [np.nanmean(tr.data) for tr in st_edit_select]
                        ).astype(st_edit_select[0].data.dtype)
                    else:
                        fillval = fill_value
                    logger.info(f"{code} has data gaps, filling with: "
                                f"{fillval}")
            else:
                fillval = None
            st_edit_select.merge(fill_value=fillval)  # combine like trace ID
        except Exception as e:  # NOQA
            logger.warning(f"{code} merge error: {e}")
            continue
        st_out += st_edit_select
    return st_out
def resample_data(st, resample_freq, method="interpolate"):
    """
    Apply a lowpass filter and taper data before resampling it using
    one of a choice of resampling strategies

    TODO did not refactor `resample_cut` which doesn't seem to do anything but
        is included in old code

    :type st: obspy.core.stream.Stream
    :param st: Stream to resample data for
    :type resample_freq: float
    :param resample_freq: frequency to resample for
    :type method: str
    :param method: choice of ObsPy resampling strategy:

        - interpolate: interpolate data, requires anti-aliasing lowpass filter
          before interpolating
        - resample: resample data with fourier methods
    :rtype: obspy.core.stream.Stream
    :return: Stream that has been resampled and maybe filtered
    """
    if method not in ["interpolate", "resample"]:
        logger.warning(f"unexpected resample method {method}, defaulting to "
                       f"'interpolate'")
        method = "interpolate"

    logger.info(f"resampling data to sampling rate: {resample_freq}Hz with "
                f"method '{method}'")

    st_out = st.copy()
    # Iterate over a shallow copy since traces may be removed in-place
    for tr in st_out[:]:
        target_nyquist = 0.5 * resample_freq
        current_nyquist = 0.5 * tr.stats.sampling_rate
        # Only anti-alias filter when actually downsampling
        if target_nyquist < current_nyquist:
            try:
                logger.debug(f"{tr.get_id()} applying lowpass filter to prep "
                             f"for new nyquist freq: "
                             f"{current_nyquist} -> {target_nyquist}")
                # This function affects the trace in-place
                zerophase_chebychev_lowpass_filter(
                    tr=tr, freqmax=target_nyquist
                )
            except Exception as e:
                logger.warning(f"exception in lowpass filtering "
                               f"{tr.get_id()}, remove: {e}")
                logger.debug(e)
                st_out.remove(tr)
                # Do not attempt to resample a trace that was just removed
                # (previously execution fell through to the resampling calls)
                continue
            else:
                tr.detrend("linear")
                tr.taper(max_percentage=0.01, type="hann")
        # Enforce that this data array is 'c'ontiguous
        tr.data = np.require(tr.data, requirements=["C"])
        if method == "interpolate":
            tr.interpolate(sampling_rate=resample_freq, method="lanczos", a=8)
        elif method == "resample":
            tr.resample(sampling_rate=resample_freq, strict_length=True)

    return st_out
[docs]
def zerophase_chebychev_lowpass_filter(tr, freqmax):
"""
Custom Chebychev type two zerophase lowpass filter useful for decimation
filtering. This filter is stable up to a reduction in frequency with a
factor of 10. If more reduction is desired, simply decimate in steps.
Partly based on a filter in ObsPy.
Filter parameters:
rp: maximum ripple of passband
rs: attenuation of stopband
ws: stop band frequency
wp: pass band frequency
.. note::
Should be replaced once ObsPy has a proper decimation filter.
.. note::
This function affects the trace in place!
:type tr: obspy.core.trace.Trace
:param tr: The trace to be filtered.
:type freqmax: float
:param freqmax: The desired lowpass frequency.
"""
rp, rs, order = 1, 96, 1e99
ws = freqmax / (tr.stats.sampling_rate * 0.5)
wp = ws
wn = None
while True:
if order <= 12:
break
wp *= 0.99
order, wn = signal.cheb2ord(wp=wp, ws=ws, gpass=rp, gstop=rs,
analog=False)
b, a = signal.cheby2(N=order, rs=rs, Wn=wn, btype="low", analog=0, # NOQA
output="ba") # NOQA
# Apply twice to get rid of the phase distortion.
tr.data = signal.filtfilt(b, a, tr.data)
def estimate_prefilter_corners(tr):
    """
    Estimates corners for pre-deconvolution (instrument response removal)

    Essentially band limits the data based on the minimum and maximum allowable
    periods based on the sampling rate and overall waveform length.

    See also: https://ds.iris.edu/files/sac-manual/commands/transfer.html
    Replaces the old `get_pre_filt` function

    :type tr: obspy.core.trace.Trace
    :param tr: trace from which stats are taken
    :rtype: tuple of float
    :return: frequency corners (f0, f1, f2, f3)
    """
    # Low corner set by total record length, high corner by Nyquist frequency
    record_length = tr.stats.endtime - tr.stats.starttime
    nyquist_freq = tr.stats.sampling_rate / 2
    f1 = 4.0 / record_length
    f2 = nyquist_freq * 0.5
    # Outer corners taper off at half/double the inner corners
    return 0.5 * f1, f1, f2, 2.0 * f2