Source code for pysep.utils.cap_sac
"""
Utils to honor file formats from SAC and CAP
For SAC header names and descriptions, see:
http://www.adc1.iris.edu/files/sac-manual/manual/file_format.html
"""
import os
import numpy as np
from obspy import UTCDateTime
from obspy.core.stream import Stream
from obspy.geodetics import gps2dist_azimuth, kilometer2degrees
from pysep import logger
from pysep.utils.fmt import format_event_tag_legacy
from pysep.utils.fetch import get_taup_arrivals
# SAC HEADER CONSTANTS DEFINING NON-INTUITIVE QUANTITIES
[docs]
SACDICT = {
"first_arrival_time": "a",
"p_arrival_time": "t5",
"p_incident_angle": "user1",
"p_takeoff_angle": "user3",
"s_arrival_time": "t6",
"s_incident_angle": "user2",
"s_takeoff_angle": "user4",
}
[docs]
def write_cap_weights_files(st, path_out="./", order_by="dist"):
"""
Write CAP (Cut-and-Paste) moment tensor inversion code weight files,
assuming that SAC headers are already present.
TODO re-add Ptime setting with event.picks
Replaces `write_cap_weights`
The weight file has columns corresponding to the following:
0: EVENT_STATION_ID
1: DIST_KM
2: BODY_Z
3: BODY_R
4: SURF_Z
5: SURF_R
6: SURF_T
7: P_ARRIVAL
8: LEGACY (unused)
9: S_ARRIVAL
10: LEGACY (unused)
11: STATIC CORRECTION RAYLEIGH
:type st: obspy.core.stream.Stream
:param st: input stream to use to write CAP weight files, expected to
have SAC header
:type path_out: str
:param path_out: path to write the weight file, filenames are set
by default inside the function
:type order_by: str
:param order_by: how to order the list of stations that gets written out
available options are:
* dist: order by smallest to largest source-receiver distance (default)
* az: order by smallest to largest azimuth (deg)
* code: order alphabetically by station name
"""
assert(order_by in ["dist", "az", "code"]), (f"CAP weights sorting must be "
f"by 'dist', 'az', or 'code'")
# Standard look to the white space in the weights' files
weight_fmt = ("{code:>32}{dist:8.2f} {body_z:>3}{body_r:>2} "
"{surf_z:>2}{surf_r:>2}{surf_t:>2} "
"{p_arr:6.2f}{leg:>2} "
"{s_arr:6.2f}{leg:>2} "
"{corr:>2}\n")
# Define pre-set keys for differently weighted weights files
weight_files = {"weights.dat": {"body_z": 1, "body_r": 1,
"surf_z": 1, "surf_r": 1, "surf_t": 1},
"weights_body.dat": {"body_z": 1, "body_r": 1,
"surf_z": 0, "surf_r": 0, "surf_t": 0},
"weights_surf.dat": {"body_z": 0, "body_r": 0,
"surf_z": 1, "surf_r": 1, "surf_t": 1},
}
# e.g. NN.SSS.LL.CC?; wildcard component
codes = list(set([f"{tr.get_id()[:-1]}?" for tr in st]))
code_list = []
p_arrival = 0. # default value
for code in codes:
net, sta, loc, cha = code.split(".")
# Grab any component as they all have the same dist and azimuth
tr = st.select(network=net, station=sta, location=loc, channel=cha)[0]
# [code, dist_km, az, p_arrival]
code_list.append([f"{tr.stats.sac['kevnm']}.{code}",
tr.stats.sac["dist"], tr.stats.sac["az"],
p_arrival])
# Order codes based on distance, name or azimuth
idx = ["code", "dist", "az"].index(order_by)
code_list = np.array(code_list)
ordered_codes = code_list[code_list[:, idx].argsort()]
logger.info("writing CAP weight files")
for basename, weights in weight_files.items():
fid = os.path.join(path_out, basename)
logger.debug(f"writing CAP weight file for {len(ordered_codes)} "
f"station(s), ordered by '{order_by}': '{fid}'")
with open(fid, "w") as f:
for vals in ordered_codes:
code, dist, az, p_arr = vals
# note: code drops the wild card that we appended earlier
f.write(weight_fmt.format(
code=code[:-1], dist=float(dist), body_z=weights["body_z"],
body_r=weights["body_r"], surf_z=weights["surf_z"],
surf_r=weights["surf_r"], surf_t=weights["surf_t"],
p_arr=float(p_arr), leg=0, s_arr=0., corr=0)
)
[docs]
def append_sac_headers(st, event, inv):
"""
Wrapper for trace header appending to get a loop and some logic in
:type st: obspy.core.stream.Stream
:param st: Stream to append SAC header to
:type event: obspy.core.event.Event
:param event: Event with metadata for SAC header
:type inv: obspy.core.inventory.Inventory
:param event: StationXML with metadata for SAC header
:rtype: obspy.core.stream.Stream
:return: Stream with SAC headers, those that could not be appended to
have been removed from the stream
"""
st_out = Stream()
for tr in st.copy()[:]:
try:
st_out.append(_append_sac_headers_trace(tr, event, inv))
except Exception as e:
logger.warning(f"{tr.get_id()} can't write SAC headers: {e}")
return st_out
[docs]
def _append_sac_headers_trace(tr, event, inv):
"""
Append SAC headers to ObsPy streams given event and station metadata.
Also add 'back_azimuth' to Stream stats which can be used for rotation.
Rewritten from: `util_write_cap.add_sac_metadata()`
TODO Add back in information removed from original function
* Add sensor type somewhere, previously stored in KT? (used for picks)
.. note::
We explicitely set 'iztype, 'b' and 'e' in the SAC header to tell ObsPy
that the trace start is NOT the origin time. Otherwise all the relative
timing (e.g., picks) will be wrong.
:type tr: obspy.core.trace.Trace
:param tr: Trace to append SAC header to
:type event: obspy.core.event.Event
:param event: Event with metadata for SAC header
:type inv: obspy.core.inventory.Inventory
:param event: StationXML with metadata for SAC header
:rtype: obspy.core.trace.Trace
:return: Trace with appended SAC header
"""
net_code, sta_code, loc_code, cha_code = tr.get_id().split(".")
# An inventory that is narrowed down to a per-channel basis
inv_unique = inv.select(network=net_code, station=sta_code,
channel=cha_code)
# All of these are subsets of an inventory
net = inv_unique[0]
sta = net[0]
dist_m, az, baz = gps2dist_azimuth(
lat1=event.preferred_origin().latitude,
lon1=event.preferred_origin().longitude,
lat2=sta.latitude, lon2=sta.longitude
)
dist_km = dist_m * 1E-3 # units: m -> km
dist_deg = kilometer2degrees(dist_km) # spherical earth approximation
otime = event.preferred_origin().time
sac_header = {
"iztype": 9, # Ref time equivalence, IB (9): Begin time
"b": tr.stats.starttime - event.preferred_origin().time, # begin time
"e": tr.stats.npts * tr.stats.delta, # end time
"evla": event.preferred_origin().latitude,
"evlo": event.preferred_origin().longitude,
"stla": sta.latitude,
"stlo": sta.longitude,
"stel": sta.elevation / 1E3, # elevation in km
"kevnm": format_event_tag_legacy(event), # only take date code
"nzyear": otime.year,
"nzjday": otime.julday,
"nzhour": otime.hour,
"nzmin": otime.minute,
"nzsec": otime.second,
"nzmsec": otime.microsecond,
"dist": dist_km,
"az": az, # degrees
"baz": baz, # degrees
"gcarc": dist_deg, # degrees
"lpspol": 0, # 1 if left-hand polarity (usually no in passive seis)
"lcalda": 1, # 1 if DIST, AZ, BAZ, GCARC to be calc'd from metadata
}
# Some Inventory objects will not go all the way to channel, only to station
try:
cha = sta[0]
sac_header["cmpinc"] = cha.dip # channel dip/inclination in degrees
sac_header["cmpaz"] = cha.azimuth # channel azimuth in degrees
except IndexError:
pass
# Allow for no magnitude and no depth information
try:
evdp = event.preferred_origin().depth / 1E3 # units: km
sac_header["evdp"] = evdp
except Exception: # NOQA
pass
try:
mag = event.preferred_magnitude().mag
sac_header["mag"] = mag
except Exception: # NOQA
pass
# Append SAC header and include back azimuth for rotation
tr.stats.sac = sac_header
tr.stats.back_azimuth = baz
return tr
[docs]
def format_sac_header_w_taup_traveltimes(st, model="ak135",
phase_list=("ttall",)):
"""
Add TauP travel times to the SAC headers using information in the SAC header
Also get some information from TauP regarding incident angle, takeoff angle
Hardcoded to only look at P and S arrivals (both upgoing and downgoing)
TODO Probably find better ways to store arrival time and incident angles
.. note::
This function expects that the Stream has been formatted with SAC header
.. note::
SAC header writing could probably be in a loop, but I think it's more
readable to see P and S values getting written separately.
:type st: obspy.core.stream.Stream
:param st: Stream object with SAC headers which will be written to with
new SAC header attributser
:type model: str
:param model: name of the TauP model to use for arrival times etc.
defaults to 'ak135'
:type phase_list: list of str
:param phase_list: phase names to get ray information from TauP with.
Defaults to 'ttall', which is ObsPy's default for getting all phase
arrivals. Must match Phases expected by TauP (see ObsPy TauP
documentation for acceptable phases).
"""
st_out = st.copy()
phase_dict = get_taup_arrivals(st=st, model=model, phase_list=phase_list)
# Arrivals may return multiple entires for each phase, pick earliest
for tr in st_out[:]:
net_sta = ".".join(tr.get_id().split(".")[:2]) # NN.SSS
# Missing SAC header values may cause certain or all stations to not
# be present in the `phase_dict`
if net_sta not in phase_dict:
logger.warning(f"{tr.get_id()} not present in TauP arrivals, cant "
f"append arrival time SAC headers")
continue
arrivals = phase_dict[net_sta]
# If the TauP arrival calculation fails, `arrivals` will be empty
if not arrivals:
logger.warning(f"{tr.get_id()} has no TauP phase arrivals, cant "
f"append arrival time SAC headers")
continue
# Find earliest arriving phase (likely same as P phase but maybe not)
idx_times = [(i, a.time) for i, a in enumerate(arrivals)]
idx, _ = min(idx_times, key=lambda x: x[1]) # find index of min time
phase = arrivals[idx] # Earliest Arrival object
tr.stats.sac[SACDICT["first_arrival_time"]] = phase.time # 'a'
tr.stats.sac[f"k{SACDICT['first_arrival_time']}"] = f"{phase.name}" #ka
# Find earliest arriving P phase (must start with letter 'P')
idx_times = [(i, a.time) for i, a in enumerate(arrivals) if
a.name.upper().startswith("P")]
idx, _ = min(idx_times, key=lambda x: x[1]) # find index of min time
p = arrivals[idx] # Earliest P-wave Arrival object
tr.stats.sac[SACDICT["p_arrival_time"]] = p.time
tr.stats.sac[f"k{SACDICT['p_arrival_time']}"] = f"{p.name}"
# P phase incident angle (ia) and takeoff angle (ta)
tr.stats.sac[SACDICT["p_incident_angle"]] = p.incident_angle
tr.stats.sac[f"k{SACDICT['p_incident_angle']}"] = f"p_IncAng"
tr.stats.sac[SACDICT["p_takeoff_angle"]] = arrivals[idx].takeoff_angle
tr.stats.sac[f"k{SACDICT['p_takeoff_angle']}"] = f"p_TkfAng"
# Find earliest arriving S phase (must start with letter 'S')
idx_times = [(i, a.time) for i, a in enumerate(arrivals) if
a.name.upper().startswith("S")]
idx, _ = min(idx_times, key=lambda x: x[1])
s = arrivals[idx] # Earliest S-wave Arrival object
tr.stats.sac[SACDICT["s_arrival_time"]] = s.time
tr.stats.sac[f"k{SACDICT['s_arrival_time']}"] = f"{s.name}"
tr.stats.sac[SACDICT["s_incident_angle"]] = s.incident_angle
tr.stats.sac[f"k{SACDICT['s_incident_angle']}"] = f"s_IncAng"
tr.stats.sac[SACDICT["s_takeoff_angle"]] = s.takeoff_angle
tr.stats.sac[f"k{SACDICT['s_takeoff_angle']}"] = f"s_TkfAng"
return st_out
[docs]
def format_sac_headers_post_rotation(st):
"""
SAC headers do not update when rotating so we need to apply manual
changes to the azimuth, inclination and naming values
TODO is this necessary? Who is using the SAC headers and what info
do they need? Or can we just re-run SAC header appending?
:type st: obspy.core.stream.Stream
:param st: Stream to append SAC headers for
"""
azimuth_dict = {"E": 90., "N": 0., "Z": 0.}
inclin_dict = {"E": 0., "N": 0., "Z": -90.}
st_out = st.copy()
for tr in st_out:
comp = tr.stats.component
tr.stats.sac["kcmpnm"] = tr.stats.channel # TODO component or channel?
if comp in azimuth_dict.keys():
tr.stats.sac["cmpaz"] = azimuth_dict[comp]
tr.stats.sac["cmpinc"] = inclin_dict[comp]
elif comp == "R":
tr.stats.sac["cmpaz"] = tr.stats.sac["az"]
elif comp == "T":
tr.stats.sac["cmpaz"] = (tr.stats.sac["az"] + 90) % 360
return st_out
[docs]
def origin_time_from_sac_header(sac_header):
"""
Build a UTCDateTime origin time from values in the SAC header appended to
an ObsPy trace.
:type sac_header: obspy.core.util.attribdict.AttribDict
:param sac_header: SAC header built by `append_sac_header()`
:rtype: UTCDateTime
:return: event origin time built from SAC header
"""
year = sac_header["nzyear"]
jday = sac_header["nzjday"]
hour = sac_header["nzhour"]
min_ = sac_header["nzmin"]
sec_ = sac_header["nzsec"]
msec = sac_header["nzmsec"]
time_string = f"{year}-{jday:0>3}T{hour:0>2}:{min_:0>2}:{sec_:0>2}.{msec}"
return UTCDateTime(time_string)