#!/usr/bin/evn python3
"""
Python Seismogram Extraction and Processing (PySEP)
Download, pre-process, and organize seismic waveforms, event and station
metadata. Save waveforms as SAC files for use in moment tensor inversion and
adjoint tomography codes.
"""
import argparse
import logging
import os
import shutil
import sys
import yaml
import warnings
try:
import llnl_db_client # NOQA
except ImportError:
else:
_has_llnl = True
from glob import glob
from pathlib import Path
from obspy import UTCDateTime, Stream, Inventory, read, read_inventory
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.header import FDSNBadRequestException
from obspy.clients.fdsn.mass_downloader import (RectangularDomain, Restrictions,
CircularDomain, MassDownloader)
from obspy.core.event import Event, Origin, Magnitude
from obspy.geodetics import kilometer2degrees
from pysep import logger, __version__
from pysep.utils.cap_sac import (append_sac_headers, write_cap_weights_files,
format_sac_header_w_taup_traveltimes,
format_sac_headers_post_rotation)
from pysep.utils.curtail import (curtail_by_station_distance_azimuth,
quality_check_waveforms_before_processing,
quality_check_waveforms_after_processing,
remove_traces_w_masked_data
)
from pysep.utils.fmt import format_event_tag, format_event_tag_legacy, get_codes
from pysep.utils.io import read_yaml, read_event_file, write_pysep_station_file
from pysep.utils.llnl import scale_llnl_waveform_amplitudes
from pysep.utils.process import (merge_gapped_data, trim_start_end_times,
resample_data, format_streams_for_rotation,
rotate_to_uvw, estimate_prefilter_corners)
from pysep.utils.plot import plot_source_receiver_map
from pysep.recsec import RecordSection
[docs]
class Pysep:
"""Download, preprocess, and save waveform data using ObsPy"""
def __init__(self, config_file=None, event_selection="default",
client="IRIS", origin_time=None, reference_time=None,
networks="*", stations="*", locations="*", channels="*",
station_ids=None, use_mass_download=False,
extra_download_pct=0.005,
event_latitude=None, event_longitude=None, event_depth_km=None,
event_magnitude=None,
remove_response=True, remove_clipped=True,
remove_insufficient_length=True, remove_masked_data=True,
water_level=60, detrend=True, demean=True, taper_percentage=0.,
rotate=None, pre_filt="default", fill_data_gaps=False,
gap_fraction=1.,
mindistance_km=0, maxdistance_km=20E3, minazimuth=0,
maxazimuth=360, minlatitude=None, minlongitude=None,
maxlatitude=None, maxlongitude=None, resample_freq=None,
scale_factor=1, phase_list=None, seconds_before_event=20,
seconds_after_event=20, seconds_before_ref=100,
seconds_after_ref=300, taup_model="ak135", output_unit="VEL",
user=None, password=None, client_debug=False,
log_level="DEBUG", timeout=600,
write_files="inv,event,stream,sac,config_file,station_list",
plot_files="all", llnl_db_path=None, output_dir=None,
overwrite=False, legacy_naming=False,
overwrite_event_tag=None,
**kwargs):
"""
.. note::
Parameters for general data gathering control
:type client: str
:param client: ObsPy FDSN client to query data from, e.g., IRIS, LLNL,
NCEDC or any FDSN clients accepted by ObsPy. Defaults to 'IRIS'
:type minlatitude: float
:param minlatitude: for event, station and waveform retrieval. Defines
the minimum latitude for a rectangular bounding box that is used to
search for data. Only used for events if `event_selection`=='search'
:type maxlatitude: float
:param maxlatitude: for event, station and waveform retrieval. Defines
the maximum latitude for a rectangular bounding box that is used to
search for data. Only used for events if `event_selection`=='search'
:type minlongitude: float
:param minlongitude: for event, station and waveform retrieval. Defines
the minimum longitude for a rectangular bounding box that is used to
search for data. Only used for events if `event_selection`=='search'
:type maxlongitude: float
:param maxlongitude: for event, station and waveform retrieval. Defines
the maximum longitude for a rectangular bounding box that is used to
search for data. Only used for events if `event_selection`=='search'
:type user: str
:param user: User ID if IRIS embargoes data behind passwords. This is
passed into the instantiation of `client`.
:type password: str
:param password: Password if IRIS embargoes data behind passwords. This
is passed into the instantiation of 'client'
:type use_mass_download: bool
:param use_mass_download: Use ObsPy's mass download option to download
all available stations in the region regardless of data provider.
:type client_debug: bool
:param client_debug: turn on DEBUG mode for the ObsPy FDSN client, which
outputs information-rich log messages to std out. Use for debugging
when FDSN fails mysteriously.
:type timeout: float
:param timeout: time out time in units of seconds, passed to the
`client` to determine how long to wait for return data before
exiting. Defaults to 600s.
:type llnl_db_path: str
:param llnl_db_path: If `client`=='LLNL', PySEP assumes we are accesing
data from the LLNL waveeform database (which must be stored local).
Points to the path where this is saved.
.. note::
Event selection parameters
:type event_selection: str
:param event_selection: How to define the Event which is used to define
the event origin time and hypocentral location.
- 'default': User defines Event `origin_time`, and location with
`event_latitude` and `event_longitude`
- 'search': PySEP will use `client` to search for a Catalog event
defined by `event_origintime`, `event_magnitude` and
`event_depth_km`. Buffer time around the `origin_time` can
be defined by `seconds_before_event` and `seconds_after_event`.
:type origin_time: str
:param origin_time: the event origin time used as a central reference
point for data gathering. Must be in a string format that is
recognized by ObsPy UTCDateTime. For example '2000-01-01T00:00:00'.
:type event_latitude: float
:param event_latitude: latitude of the event in units of degrees.
used for defining the event hypocenter and for removing stations
based on distance from the event.
:type event_longitude: float
:param event_longitude: longitude of the event in units of degrees.
used for defining the event hypocenter and for removing stations
based on distance from the event.
:type event_depth_km: float or NoneType
:param event_depth_km: depth of event in units of kilometers. postive
values for deeper depths. Used for:
1) `event_selection`=='search'
2) estimating phase arrivals with TauP
3) plotting events and title on source receiver maps
If set to None, (2) and (3) will fail. Best-guesses are acceptable.
:type event_magnitude: float or NoneType
:param event_magnitude: event magnitude in Mw used for
`event_selection`=='search' and source receiver map plotting. If
provided as None, map plotting will fail.
:type seconds_before_event: float
:param seconds_before_event: For event selection only, only used if
`event_selection`=='search'. Time [s] before given `origin_time` to
search for a matching catalog event from the given `client`
:type seconds_after_event: float
:param seconds_after_event: For event selection only, only used if
`event_selection`=='search'. Time [s] after given `origin_time` to
search for a matching catalog event from the given `client`
.. note::
Waveform and station metadata gathering parameters
:type reference_time: str
:param reference_time: Waveform origin time. If not given, defaults to
the event origin time. This allows for a static time shift from the
event origin time, e.g., if there are timing errors with relation
to the `origin_time`. Defaults to NoneType (`origin_time`).
:type seconds_before_ref: float
:param seconds_before_ref: For waveform fetching. Defines the time
before `reference_time` to fetch waveform data. Units [s]
:type seconds_after_ref: float
:param seconds_after_ref: For waveform fetching. Defines the time
after `reference_time` to fetch waveform data. Units [s]
:type extra_download_pct: float
:param extra_download_pct: extra download percentage. Adds a buffer
around `origin_time` + `seconds_before_ref` + `extra_download_pct`
(also -`seconds_after_ref`), which gathers a bit of extra data which
will be trimmed away. Used because gathering data directly at the
requested time limits may lead to shorter expected waveforms after
resampling or preprocessing procedures. Given as a percent [0,1],
defaults to .5%.
:type networks: str
:param networks: name or names of networks to query for, if names
plural, must be a comma-separated list, i.e., 'AK,AT,AV'. Wildcards
okay, defaults to '*'.
:type stations: str
:param stations: station name or names to query for. If multiple
stations, input as a list of comma-separated values, e.g.,
'STA01,STA02,STA03'. Wildcards acceptable, if using wildcards, use
a '-' to exclude stations (e.g., '*,-STA01' will gather all stations
available, except STA01. Defaults to '*'
:type locations: str
:param locations: locations name or names to query for, wildcard okay.
See `stations` for inputting multiple location values. Default '*'.
:type channels: str
:param channels: channel name or names to query for, wildcard okay. If
multiple stations, input as a list of comma-separated values, e.g.,
'HH?,BH?'. Wildcards acceptable. Defaults to '*'.
:type station_ids: list of str
:param station_ids: an alternative to gathering based on individual
codes, allow the user to input a direct list of trace IDs which
will be broken up and used to gather waveforms and metadata.
NOTE: OVERRIDES `network`, `stations`, `locations`, and `channels`,
these parameters will NOT be used.
Station ids should be provided as: ['NN.SSS.LL.CCC', ...]
.. note::
Station removal and curtailing parameters
:type mindistance_km: float
:param mindistance_km: Used for removing stations and mass download
option
- Removing stations: Remove any stations who are closer than the
given minimum distance away from event (units: km). Always applied
- Mass Download: If `use_mass_download` is True and
`domain_type`=='circular', defines the minimum radius around the
event hypocenter to gather waveform data and station metadata
:type maxdistance_km: float
:param maxdistance_km: Used for removing stations and mass download
option
- Removing stations: Remove any stations who are farther than the
given maximum distance away from event (units: km). Always applied
- Mass Download: If `use_mass_download` is True and
`domain_type`=='circular', defines the maximum radius around the
event hypocenter to gather waveform data and station metadata
:type minazimuth: float
:param minazimuth: for station removal. stations whose azimuth relative
to the event hypocenter that do not fall within the bounds
[`minazimuth`, `maxazimuth`] are removed from the final list.
Defaults to 0 degrees.
:param minazimuth: for station removal. stations whose azimuth relative
to the event hypocenter that do not fall within the bounds
[`minazimuth`, `maxazimuth`] are removed from the final list.
Defaults to 360 degrees.
:type remove_clipped: bool
:param remove_clipped: remove any clipped stations from gathered
stations. Checks the max amplitude of against a maximum value
expected for a 24 bit signal. Defaults False
:type remove_insufficient_length: bool
:param remove_insufficient_length: remove waveforms whose trace length
does not match the average (mode) trace length in the stream.
Defaults to True
:type remove_masked_data: bool
:param remove_masked_data: If `fill_data_gaps` is False or None, data
with gaps that go through the merge process will contain masked
arrays (essentially retaining gaps). By default, PySEP will remove
these data during processing. To keep this data, set
`remove_masked_data` == True.
:type fill_data_gaps: str or int or float or bool
:param fill_data_gaps: How to deal with data gaps (missing sections of
waveform over a continuous time span). False by default, which
means data with gaps are removed completely. Users who want access
to data with gaps must choose how gaps are filled. See API for
ObsPy.core.stream.Stream.merge() for how merge is handled:
Options include:
- 'mean': fill with the mean of all data values in the gappy data
- <int or float>: fill with a constant, user-defined value, e.g.,
0 or 1.23 or 9.999
- 'interpolate': linearly interpolate from the last value pre-gap
to the first value post-gap
- 'latest': fill with the last value of pre-gap data
- False: do not fill data gaps, which will lead to stations w/
data gaps being removed.
NOTE: Be careful about data types, as there are no checks that the
fill value matches the internal data types. This may cause
unexpected errors.
:type gap_fraction: float
:param gap_fraction: if `fill_data_gaps` is not None, determines the
maximum allowable fraction (percentage) of data that gaps can
comprise. For example, a value of 0.3 means that 30% of the data
(in samples) can be gaps that will be filled by `fill_data_gaps`.
Traces with gap fractions that exceed this value will be removed.
Defaults to 1. (100%) of data can be gaps.
.. note::
Data processing parameters
:type detrend: bool
:param detrend: apply simple linear detrend as the first preprocessing
step
:type demean: bool
:param demean: apply demeaning to data during instrument reseponse
removal. Only applied if `remove_response` == True.
:type taper_percentage: float
:param taper_percentage: apply a taper to the waveform with ObsPy taper,
fraction between 0 and 1 as the percentage of the waveform to be
tapered Applied generally used when data is noisy, e.g.,
HutchisonGhosh2016
Note: To get the same results as the default taper in SAC,
use max_percentage=0.05 and leave type as hann.
Tapering also happens while resampling (see util_write_cap.py).
Only applied if `remove_response` == True.
:type rotate: list of str or NoneType
:param rotate: choose how to rotate the waveform data. pre-rotation
processing will be applied. Can include the following options
(order insensitive):
* ZNE: Rotate from arbitrary components to North, East, Up
* RTZ: Rotate from ZNE to Radial, Transverse, Up
* UVW: Rotate from ZNE to orthogonal UVW orientation
If set to None, no rotation processing will take place.
:type resample_freq: float
:param resample_freq: frequency to resample data in units Hz. If not
given, no data resampling will take place. Defaults to NoneType
:type scale_factor: float
:param scale_factor: scale all data by a constant factor
Note: for CAP use 10**2 (to convert m/s to cm/s).
Defaults to NoneType (no scaling applied)
.. note::
Instrument response removal parameters
:type remove_response: bool
:param remove_response: remove instrument response using station
response information gathered from `client`. Defaults to True.
:type output_unit: str
:param output_unit: the output format of the waveforms if instrument
response removal is applied. Only relevant if
`remove_response`==True. See ObsPy.core.trace.Trace.remove_response
for acceptable values. Typical values are: 'DISP', 'VEL', 'ACC'
(displacement [m], velocity [m/s], acceleration [m/s^2]).
:type water_level: float or None
:param water_level: a water level threshold to apply during filtering
for small values. Passed to Obspy.core.trace.Trace.remove_response
:type pre_filt: str, tuple or NoneType
:param pre_filt: apply a pre-filter to the waveforms before deconvolving
instrument response. Options are:
* 'default': automatically calculate (f0, f1, f2, f3) based on the
length of the waveform (dictating longest allowable period) and
the sampling rate (dictating shortest allowable period). This is
the default behavior.
* NoneType: do not apply any pre-filtering
* tuple of float: (f0, f1, f2, f3) define the corners of your pre
filter in units of frequency (Hz)
.. note::
SAC header control parameters
:type phase_list: list of str
:param phase_list: phase names to get ray information from TauP with.
Defaults to 'ttall', which is ObsPy's default for getting all phase
arrivals. Must match Phases expected by TauP (see ObsPy TauP
documentation for acceptable phases). Earliest P and S phase
arrivals will be added to SAC headers, the remainder will be
discarded.
:type taup_model: str
:param taup_model: name of TauP model to use to calculate phase arrivals
See also `phase_list` which defines phases to grab arrival data
for. Defaults to 'AK135'. See ObsPy TauP documentation for avilable
models.
.. note::
PySEP Configuration parameters
:type config_file: str
:param config_file: path to YAML configuration file which will be used
to overwrite internal default parameters. Used for command-line
version of PySEP
:type log_level: str
:param log_level: Level of verbosity for the internal PySEP logger.
In decreasing order of verbosity: 'DEBUG', 'INFO', 'WARNING',
'CRITICAL'
:type legacy_naming: bool
:param legacy_naming: if True, revert to old PySEP naming schema for
event tags, which is responsible for naming the output directory and
SAC files. Legacy filenames look something like
'20000101000000.NN.SSS.LL.CC.c' (event origin time, network,
station, location, channel, component). Default to False
:type overwrite_event_tag: str or bool
:param overwrite_event_tag: option to allow the user to set their own
event tag, rather than the automatically generated one.
- NoneType (default): use automatically generated event tag which
consists of event origin time and Flinn-Engdahl region
- '': empty string will dump ALL files into `output_dir`, no new
directories will be made
- str: User-defined event tag which will be created in `output_dir`,
all files will be stored in {output_dir}/{overwrite_event_tag}/*
.. note::
Output file and figure control
:type write_files: str or NoneType
:param write_files: Which files to write out after data gathering.
1) User-defined comma-separated list of the following
- weights_az: write out CAP weight file sorted by azimuth
- weights_dist: write out CAP weight file sorted by distance
- weights_code: write out CAP weight file sorted by station code
- station_list: write out a text file with station information
- inv: save a StationXML (.xml) file (ObsPy inventory)
- event: save a QuakeML (.xml) file (ObsPy Catalog)
- stream: save an ObsPy stream in Mseed (.ms) (ObsPy Stream)
- config_file: save YAML config file w/ all input parameters
- sac: save all waveforms as SAC (.sac) files w/ correct headers
- sac_raw: save raw waveforms. these are straight from the
data center with no quality check and no SAC headers
- sac_zne: save only ZNE channel SAC files
- sac_rtz: save only RTZ channel SAC files
- sac_uvw: save only UVW channel SAC files
Example input: `write_files`=='inv,event,stream,sac'
By Default: 'inv,event,stream,sac,config_file,station_list'
2) If NoneType or an empty string, no files will be written.
3) If 'all', write all files listed in (1)
:type plot_files: str or NoneType
:param write_files: What to plot after data gathering.
Should be a comma-separated list of the following:
- map: plot a source-receiver map with event and all stations
- record_section: plot a record section with default parameters
- all: plot all of the above (default value)
If None, no files will be plotted.
:type output_dir: str
:param output_dir: path to output directory where all the files and
figures defined by `write_files` and `plot_files` will be stored.
Defaults to the current working directory.
:type overwrite: bool
:param overwrite: If True, overwrite an existing PySEP event directory.
This prevents Users from re-downloading data. Defaults to False.
"""
# Internal attribute but define first so that it sits at the top of
# written config files
self.event_tag = None
self.config_file = config_file
# ObsPy client-related parameters
self.client = client
self.client_debug = bool(client_debug)
self.timeout = timeout
self._user = user
self._password = password
self.taup_model = taup_model
self.use_mass_download = use_mass_download
self._extra_download_pct = extra_download_pct
# Check for LLNL requirement
if self.client == "LLNL" and not _has_llnl:
raise ImportError(f"`client`=='LLNL' requires optional "
f"dependency 'llnl_db_client' which was not "
f"found. Please reinstall PySEP with the command "
f"'pip install -e .[llnl]")
# Parameters related to event selection
self.event_selection = event_selection
try:
self.origin_time = UTCDateTime(origin_time)
except TypeError:
self.origin_time = None
# Force float type to avoid rounding errors
self.seconds_before_event = seconds_before_event
self.seconds_after_event = seconds_after_event
# Optional: if User wants to define an event on their own.
# `event_depth_km` and `event_magnitude` are also used for client query
self.event_latitude = event_latitude
self.event_longitude = event_longitude
self.event_depth_km = event_depth_km
self.event_magnitude = event_magnitude
# Waveform and StationXML gathering parameters
self.networks = networks
self.stations = stations
self.channels = channels
self.locations = locations
self.station_ids = station_ids
# Waveform collection parameters
self.reference_time = reference_time or self.origin_time
self.seconds_before_ref = seconds_before_ref
self.seconds_after_ref = seconds_after_ref
if phase_list is None:
self.phase_list = ["ttall"] # default ObsPy value gather ALL phase
else:
self.phase_list = phase_list
# NOTE: This default is a UAF LUNGS system-specific database path.
# If you are not on LUNGS, you will need to set this path manually
self.llnl_db_path = (
llnl_db_path or
"/store/raw/LLNL/UCRL-MI-222502/westernus.wfdisc"
)
# Event and station search and curtailing criteria
self.mindistance_km = mindistance_km
self.maxdistance_km = maxdistance_km
self.minazimuth = minazimuth
self.maxazimuth = maxazimuth
self.minlatitude = minlatitude
self.maxlatitude = maxlatitude
self.minlongitude = minlongitude
self.maxlongitude = maxlongitude
# Preprocessing flags
self.demean = bool(demean)
self.detrend = bool(detrend)
self.taper_percentage = taper_percentage
self.rotate = rotate
self.remove_response = bool(remove_response)
self.output_unit = output_unit
self.water_level = water_level
self.pre_filt = pre_filt
self.scale_factor = scale_factor
self.resample_freq = resample_freq
self.remove_clipped = bool(remove_clipped)
self.remove_insufficient_length = remove_insufficient_length
self.remove_masked_data = remove_masked_data
self.fill_data_gaps = fill_data_gaps
self.gap_fraction = gap_fraction
# Program related parameters
self._output_dir = output_dir or os.getcwd()
self.output_dir = None # is overwritten w `_event_tag_and_output_dir`
self.write_files = write_files
self.plot_files = plot_files
self.log_level = log_level
self._legacy_naming = legacy_naming
self._overwrite = overwrite
self._overwrite_event_tag = overwrite_event_tag
# Internally filled attributes
self.c = None
self.st = None
self.inv = None
self.event = None
self.st_raw = None
# Allow the user to manipulate the logger during __init__
if log_level is not None:
logger.debug(f"`log_level` set to {log_level}")
logger.setLevel(log_level)
else:
logger.disabled = True
# Allow User to throw in general kwargs. This allows things to be
# more general, but also may obscure some parameters.
self.kwargs = kwargs
[docs]
def check(self):
"""
Check input parameter validity against expected Pysep behavior
"""
self.origin_time = UTCDateTime(self.origin_time)
if self.reference_time is None:
self.reference_time = self.origin_time
self.reference_time = UTCDateTime(self.reference_time)
if self.event_selection == "search" or self.client == "LLNL":
for par in [self.seconds_before_event, self.seconds_after_event]:
assert(par is not None), (
"Event selection requires the following parameters: "
"`seconds_before_event` and `seconds_after_event`")
elif self.event_selection == "default":
for par in [self.event_latitude, self.event_longitude]:
assert(par is not None), (
"`event_selection`=='default' requires "
"`event_latitude`, `event_longitude`, `event_depth_km` "
"and `event_magnitude`")
else:
raise ValueError("`event_selection` must be one of the following: "
"'search' or 'default'")
if self.event_depth_km is None:
logger.warning("TauP arrival times will be incorrect as no depth "
"information is provided")
if self.client.upper() == "LLNL":
assert(os.path.exists(self.llnl_db_path)), (
f"`llnl_db_path` {self.llnl_db_path} does not exist but must "
f"if `client`=='LLNL'. Please check path and try again"
)
if self.client.upper() == "NCEDC":
assert("-" not in self.stations), (
"`client`=='NCEDC' does not allow for '-' in station codes"
)
if self.networks == "*" and self.client.upper() == "IRIS":
logger.warning("`networks`=='*' will search ALL networks, which "
"may take a long time depending on data availability"
)
if self.station_ids is not None:
logger.debug("`station_ids` provided, ignoring parameters "
"`networks`, `stations`, `locations` and `channels`")
for id_ in self.station_ids:
assert(len(id_.split(".")) == 4), (
f"`station_id` entries must have format NN.SSS.LL.CC; {id_}"
)
if self.use_mass_download is True:
logger.info("will use option `mass_download`, ignoring `client` "
"and downloading data from all available data centers")
if not (0 <= self.minazimuth <= 360):
_old_val = self.minazimuth
self.minazimuth = self.minazimuth % 360
logger.warning(f"0 <= `minazimuth` <= 360; "
f"{_old_val} -> {self.minazimuth}")
if not (0 <= self.maxazimuth <= 360):
_old_val = self.maxazimuth
self.maxazimuth = self.maxazimuth % 360
logger.warning(f"0 <= `maxazimuth` <= 360; "
f"{_old_val} -> {self.maxazimuth}")
if self.rotate is not None:
acceptable_rotations = {"RTZ", "UVW", "ENZ"}
self.rotate = ["".join(sorted(val.upper())) for val in self.rotate]
assert(set(self.rotate).issubset(acceptable_rotations)), (
f"`rotate` must be a subset of: {acceptable_rotations}"
)
if "RTZ" in self.rotate:
assert("UVW" not in self.rotate), (
f"rotate can only have one of the following: 'UVW', 'RTZ'"
)
acceptable_units = ["DISP", "VEL", "ACC", "DEF"]
self.output_unit = self.output_unit.upper()
assert(self.output_unit in acceptable_units), (
f"unnacceptable `output_unit` {self.output_unit}, must be in "
f"{acceptable_units}")
if self.pre_filt not in [None, "default"]:
assert(len(self.pre_filt) == 4), (
f"`pre_filt` must be a tuple of length 4, representing four "
f"corner frequencies for a bandpass filter (f1, f2, f3, f4)"
)
acceptable_fill_vals = ["mean", "interpolate", "latest"]
if self.fill_data_gaps is not False:
if isinstance(self.fill_data_gaps, str):
assert(self.fill_data_gaps in acceptable_fill_vals), \
f"`fill_data_gaps` must be one of {acceptable_fill_vals}"
# Enforce acceptable writing options
acceptable_write_files = self.write(_return_filenames=True)
# Empty string or NoneType translates to 'dont write anything'
if not self.write_files:
self.write_files = {}
# Default behavior, write everything under the sun
elif self.write_files == "all":
self.write_files = acceptable_write_files
# User-defined, comma-separated list of values which must match
# against acceptable types
else:
try:
self.write_files = set(self.write_files.split(","))
# TypeError thrown if we're trying to do {{*}}
except TypeError:
pass
assert(self.write_files.issubset(acceptable_write_files)), (
f"`write_files` must be a list of some or all of: "
f"{acceptable_write_files}"
)
# Enforce acceptable plotting options
acceptable_plot_files = {"map", "record_section"}
# Empty string or NoneType translate to do not plot
if not self.plot_files:
self.plot_files = {}
# Default behavior, plot everything under the sun
elif self.plot_files == "all":
self.plot_files = acceptable_plot_files
# User-defined, comma-separated list of values which must match
# against acceptable types
else:
try:
self.plot_files = set(self.plot_files.split(","))
except TypeError:
pass
assert(self.plot_files.issubset(acceptable_plot_files)), (
f"`plot_files` must be a list of some or all of: "
f"{acceptable_plot_files}"
)
# Force all time boundaries to be floats to avoid rounding errors
self.seconds_before_ref = float(self.seconds_before_ref)
self.seconds_after_ref = float(self.seconds_after_ref)
self.seconds_before_event = float(self.seconds_before_event)
self.seconds_after_event = float(self.seconds_after_event)
[docs]
def get_client(self):
"""
Options to choose different Clients based on attribute `client` which
will be used to gather waveforms and metadata
:rtype: obspy.clients.fdsn.client.Client
:return: Client used to gather waveforms and metadata
"""
if self.client is None:
return None
# Lawrence Livermore Natinoal Lab internal waveform database
if self.client.upper() == "LLNL":
c = llnl_db_client.LLNLDBClient(self.llnl_db_path)
# IRIS DMC PH5WS Station Web Service
elif self.client.upper() == "PH5":
c = Client(
"http://service.iris.edu", debug=self.client_debug,
service_mappings={
"station": "http://service.iris.edu/ph5ws/station/1",
"dataselect": "http://service.iris.edu/ph5ws/dataselect/1"
}, user=self._user, password=self._password
)
# Default ObsPy FDSN webservice client
else:
c = Client(self.client, user=self._user, password=self._password,
debug=self.client_debug, timeout=self.timeout)
return c
[docs]
def load(self, config_file=None, overwrite_event=True):
"""
Overwrite default parameters using a YAML config file
:type config_file: str
:param config_file: YAML configuration file to load from
:type overwrite_event: bool
:param overwrite_event: overwrite event search parameters (origin time,
lat, lon etc.) from the YAML config file. Defaults to True
"""
ignore_keys = []
if config_file is None:
config_file = self.config_file
if not overwrite_event:
logger.info("will NOT overwrite event search parameters (including "
"origin and reference time) with config file")
# Set parameters to ignore when overwriting from config file
ignore_keys = ["origin_time", "reference_time", "event_latitude",
"event_longitude", "event_depth_km",
"event_magnitude"]
if config_file is not None:
logger.info(f"overwriting default parameters with config file: "
f"'{config_file}'")
config = read_yaml(config_file)
for key, val in config.items():
if hasattr(self, key):
if key in ignore_keys:
continue
old_val = getattr(self, key)
if val != old_val:
logger.debug(f"{key}: {old_val} -> {val}")
setattr(self, key, val)
else:
logger.warning(f"config parameter '{key}' not explicitely "
f"used by PySEP. adding to kwargs")
self.kwargs[key] = val
# Reset log level based on the config file
if self.log_level is not None:
logger.debug(f"`log_level` set to {self.log_level}")
logger.setLevel(self.log_level)
else:
logger.disabled = True
[docs]
def get_event(self):
"""
Exposed API for grabbing event metadata depending on the
`event_selection` choice.
Options for `event_selection` are:
'search': query FDSN with event parameters
'default': create an event from scratch using user parameters
or if 'client'=='LLNL', grab event from internal database
:rtype: obspy.core.event.Event
:return: Matching event given event criteria. If multiple events are
returned with the query, returns the first in the catalog
"""
if self.event_selection == "search":
event = self._query_event_from_client()
elif self.event_selection == "default":
if self.client.upper() == "LLNL":
event = self._get_event_from_llnl_catalog()
else:
event = self._create_event_from_scratch()
else:
event = None
# This is how we will access the event info so print out and check
lat = event.preferred_origin().latitude
lon = event.preferred_origin().longitude
otime = event.preferred_origin().time
# These values may not be present. General exception because its only
# used for log statement so not that important.
try:
depth_km = event.preferred_origin().depth * 1E-3
except Exception: # NOQA
depth_km = None
try:
mag = event.preferred_magnitude().mag
except Exception: # NOQA
mag = None
logger.info(f"event info summary - origin time: {otime}; "
f"lat={lat:.2f}; lon={lon:.2f}; depth[km]={depth_km}; "
f"magnitude={mag}")
return event
[docs]
def _query_event_from_client(self, magnitude_buffer=0.1,
depth_buffer_km=1.):
"""
Retrieve an event catalog using ObsPy Client.get_events().
Searches Client for a given origin time, location, depth (optional)
and magnitude (optional).
To use this, set attribute `event_selection`=='search'
:type magnitude_buffer: float
:param magnitude_buffer: if attribute `event_magnitude` is given,
will search events for events with magnitude:
event_magnitude +/- magnitude_buffer
:type depth_buffer_km: float
:param depth_buffer_km: if attribute `event_depth_km` is given,
will search events for events with depth:
event_depth_km +/- depth_buffer_km
:rtype: obspy.core.event.Event
:return: Matching event given event criteria. If multiple events are
returned with the query, returns the first in the catalog
"""
logger.info(f"getting event information with client {self.client}")
# Allow user to guess magnitude of the event we're searching for
if self.event_magnitude is not None:
minmagnitude = self.event_magnitude - magnitude_buffer
maxmagnitude = self.event_magnitude + magnitude_buffer
else:
logger.info("no `magnitude` specified, this will make the search "
"criteria more broad. If the returned catalog is too "
"large, consider setting a guess value for `magnitude`")
minmagnitude = None
maxmagnitude = None
# Allow user to guess depth of the event we're searching for
if self.event_depth_km is not None:
mindepth = self.event_depth_km - depth_buffer_km
maxdepth = self.event_depth_km + depth_buffer_km
else:
logger.info("no `depth` specified, this will make the search "
"criteria more broad. If the returned catalog is too "
"large, consider setting a guess value for `depth`")
mindepth = None
maxdepth = None
cat = self.c.get_events(
starttime=self.origin_time - self.seconds_before_event,
endtime=self.origin_time + self.seconds_after_event,
minlatitude=self.minlatitude, maxlatitude=self.maxlatitude,
minlongitude=self.minlongitude, maxlongitude=self.maxlongitude,
minmagnitude=minmagnitude, maxmagnitude=maxmagnitude,
mindepth=mindepth, maxdepth=maxdepth, debug=self.client_debug,
)
logger.info(f"{len(cat)} matching events found for "
f"{self.seconds_before_event}s < {self.origin_time} "
f"< {self.seconds_after_event}, choosing first")
event = cat[0]
return event
[docs]
def _create_event_from_scratch(self):
"""
Make a barebones event object based on user-defined parameters which
will then be used to query for waveforms and StationXML data
:rtype: obspy.core.event.Event
:return: Event object with origin and magnitude information appended
"""
logger.info("creating event metadata with user parameters")
if self.event_depth_km is not None:
depth = self.event_depth_km * 1E3
else:
depth = None
origin = Origin(latitude=self.event_latitude,
longitude=self.event_longitude,
depth=depth, # units: m
time=self.origin_time
)
if self.event_magnitude:
magnitude = Magnitude(mag=self.event_magnitude, magnitude_type="Mw")
magnitudes = [magnitude]
else:
magnitudes = []
event = Event(origins=[origin], magnitudes=magnitudes)
event.preferred_origin_id = origin.resource_id.id
if magnitudes:
event.preferred_magnitude_id = magnitude.resource_id.id
return event
[docs]
def _get_event_from_llnl_catalog(self):
"""
Special getter function for Lawrence Livermore National Lab data
LLNL database has a special client
TODO Do we need more filtering in the catalog?
:rtype: obspy.core.event.Event
:return: Event information queried from LLNL database
"""
logger.info("getting event information from LLNL database")
cat = self.c.get_catalog()
mintime_str = f"time > {self.origin_time - self.seconds_before_event}"
maxtime_str = f"time < {self.origin_time + self.seconds_after_event}"
cat = cat.filter(mintime_str, maxtime_str)
logger.debug(f"{len(cat)} events in LLNL catalog, picking zeroth index")
event = cat[0]
# Apply preferred magnitude and origin if not already
if not event.preferred_magnitude():
event.preferred_magnitude_id = event.magnitudes[0].resource_id.id
if not event.preferred_origin():
event.preferred_origin_id = event.origins[0].resource_id.id
return event
[docs]
def get_stations(self):
"""
Exposed API for grabbing station metadata from client.
Download station metadata using ObsPy get_stations() with a user-defined
bounding box and for user-defined networks, stations etc.
:rtype: obspy.core.inventory.Inventory
:return: Station metadata queried from Client
"""
logger.info(f"querying {self.client.upper()} for station metadata")
if self.client.upper() == "LLNL":
# LLNL DB client behaves differently than ObsPy clients
inv = self.c.get_inventory()
inv = inv.select(
network=self.networks, location=self.locations,
station=self.stations, channel=self.channels,
starttime=self.origin_time - self.seconds_before_ref,
endtime=self.origin_time + self.seconds_after_ref,
minlatitude=self.minlatitude, maxlatitude=self.maxlatitude,
minlongitude=self.minlongitude,
maxlongitude=self.maxlongitude
)
elif self.station_ids is not None:
inv = Inventory()
# Gather data based on individual station IDs, one at a time
for id_ in self.station_ids:
net, sta, loc, cha = id_.split(".")
inv += self.c.get_stations(
network=net, location=loc, station=sta, channel=cha,
starttime=self.origin_time - self.seconds_before_ref,
endtime=self.origin_time + self.seconds_after_ref,
minlatitude=self.minlatitude, maxlatitude=self.maxlatitude,
minlongitude=self.minlongitude,
maxlongitude=self.maxlongitude, level="response"
)
else:
# Standard behavior is to simply wrap get_stations()
inv = self.c.get_stations(
network=self.networks, location=self.locations,
station=self.stations, channel=self.channels,
starttime=self.origin_time - self.seconds_before_ref,
endtime=self.origin_time + self.seconds_after_ref,
minlatitude=self.minlatitude, maxlatitude=self.maxlatitude,
minlongitude=self.minlongitude, maxlongitude=self.maxlongitude,
level="response"
)
nnet = len(inv)
ncha = len(inv.get_contents()["channels"])
nsta = len(inv.get_contents()["stations"])
logger.info(f"collected {ncha} channels from {nsta} stations in "
f"{nnet} networks from {self.client}")
return inv
[docs]
def mass_download(self):
"""
Use ObsPy Mass downloader to grab events from a pre-determined region
Keyword Arguments
::
str domain_type:
How to define the search region domain
- rectangular: rectangular bounding box defined by min/max
latitude/longitude
- circular: circular bounding circle defined by the events
latitude and longitude, with radii defined by `mindistance_km`
and `maxdistance_km`
bool delete_tmpdir:
Remove the temporary directories that store the MSEED and
StationXML files which were downloaded by the mass downloader.
Saves space but also if anything fails prior to saving data,
the downloaded data will not be saved. Defaults to True.
"""
domain_type = self.kwargs.get("domain_type", "rectangular")
delete_tmpdir = self.kwargs.get("delete_tmpdir", True)
# Get around the fact that command line arguments are input as strings
if isinstance(delete_tmpdir, str):
assert(delete_tmpdir.capitalize() in ["True", "False"])
delete_tmpdir = bool(delete_tmpdir.capitalize() == "True")
logger.info("using ObsPy mass downloader to download waveform and "
"station metadata")
# Define the bounding box/circle that specifies our region of interest
if domain_type == "rectangular":
logger.info("using a rectangular domain for mass downloader")
domain = RectangularDomain(minlatitude=self.minlatitude,
maxlatitude=self.maxlatitude,
minlongitude=self.minlongitude,
maxlongitude=self.maxlongitude)
elif domain_type == "circular":
logger.info("using a circular domain for mass downloader")
domain = CircularDomain(
latitude=self.event_latitude, longitude=self.event_longitude,
minradius=kilometer2degrees(self.mindistance_km),
maxradius=kilometer2degrees(self.maxdistance_km)
)
else:
raise NotImplementedError(f"`domain_type` must be 'rectangular' or"
f"'circular'")
# Drop any excluded stations
stations = ",".join([sta for sta in self.stations.split(",")
if "-" not in sta])
sta_exclude = [sta[1:] for sta in self.stations.split(",")
if "-" in sta]
networks = ",".join([sta for sta in self.networks.split(",")
if "-" not in sta])
net_exclude = [net[1:] for net in self.networks.split(",")
if "-" in net]
# Set restrictions on the search criteria for data
# Gather x% more on either side of the requested data incase
# resampling changes the start and end times. These will get trimmed.
restrictions = Restrictions(
starttime=self.reference_time - self.seconds_before_ref * (
1 + self._extra_download_pct),
endtime=self.reference_time + self.seconds_after_ref * (
1 + self._extra_download_pct),
reject_channels_with_gaps=False, minimum_length=0.,
network=networks, station=stations, location=self.locations,
channel=self.channels, exclude_networks=net_exclude,
exclude_stations=sta_exclude,
)
# Mass downloader will download files to a temp directory which we
# will read back from to continue the workflow
tmp_dir = os.path.join(self.output_dir, "tmpdir_md")
tmp_wav = os.path.join(tmp_dir, "waveforms")
tmp_inv = os.path.join(tmp_dir, "inventory")
mdl = MassDownloader()
mdl.download(domain, restrictions, mseed_storage=tmp_wav,
stationxml_storage=tmp_inv, download_chunk_size_in_mb=20,
threads_per_client=3, print_report=True)
# Read back in waveforms and stationxml data
st = Stream()
for fid in glob(os.path.join(tmp_wav, "*.mseed")):
st += read(fid)
logger.info(f"mass downloader downloaded {len(st)} traces")
inv = Inventory()
for fid in glob(os.path.join(tmp_inv, "*.xml")):
inv += read_inventory(fid)
# Delete the tmpdir
if delete_tmpdir:
logger.info("deleting temporary mass downloader directories")
shutil.rmtree(tmp_dir)
return st, inv
[docs]
def curtail_stations(self):
"""
Remove stations from `inv` based on station distance, azimuth, etc.
.. note::
One-function function currently, but we can expand curtailing here
if need by
:rtype: obspy.core.inventory.Inventory
:return: station metadata that has been curtailed based on acceptable
paramaters
"""
inv = self.inv.copy()
inv = curtail_by_station_distance_azimuth(
event=self.event, inv=inv, mindistance_km=self.mindistance_km,
maxdistance_km=self.maxdistance_km, minazimuth=self.minazimuth,
maxazimuth=self.maxazimuth
)
return inv
[docs]
def preprocess(self):
"""
Very simple preprocessing to remove response and apply a prefilter
scale waveforms (if necessary) and clean up waveform time series
:rtype: obspy.core.stream.Stream
:return: a preprocessed stream with response removed, amplitude scaled
(optional), and time series standardized
"""
st_out = self.st.copy()
if self.detrend:
logger.info(f"applying linear detrend to all data")
st_out.detrend("linear")
if self.remove_response:
logger.info(f"removing response, output units in: "
f"{self.output_unit}")
if self.pre_filt is not None:
logger.info(f"will apply pre-filter: {self.pre_filt}")
if self.client.upper() == "LLNL":
st_out = self._remove_response_llnl(st_out)
else:
for code in get_codes(st=self.st, choice="channel", suffix="?",
up_to=True):
net, sta, loc, cha = code.split(".")
st_sta = st_out.select(network=net, station=sta,
location=loc, channel=cha
)
for tr in st_sta:
# Get trace-dependent pre-filtering if desired
if self.pre_filt == "default":
_pre_filt = estimate_prefilter_corners(tr)
else:
_pre_filt = self.pre_filt
try:
tr.remove_response(
inventory=self.inv,
water_level=self.water_level,
pre_filt=_pre_filt,
taper=bool(self.taper_percentage),
taper_fraction=self.taper_percentage,
zero_mean=self.demean,
output=self.output_unit,
)
except ValueError as e:
logger.warning(f"can't remove response {code}: {e}"
f"removing trace from stream")
st_out.remove(tr)
if self.scale_factor:
logger.info(f"applying amplitude scale factor: {self.scale_factor}")
for tr in st_out:
tr.data = tr.data * self.scale_factor
tr.stats.sac["scale"] = self.scale_factor
if self.client == "LLNL":
# This won't do anything if we don't have any 'LL' network codes
st_out = scale_llnl_waveform_amplitudes(st_out)
# Apply pre-resample lowpass, resample waveforms, make contiguous
if self.resample_freq is not None:
st_out = resample_data(st_out, resample_freq=self.resample_freq)
# Remove or fill data gaps
st_out = merge_gapped_data(st_out, fill_value=self.fill_data_gaps,
gap_fraction=self.gap_fraction)
# Ensure that all traces have the same start and end
if self.origin_time:
st_out = trim_start_end_times(
st_out, starttime=self.origin_time - self.seconds_before_ref,
endtime=self.origin_time + self.seconds_after_ref,
fill_value=self.fill_data_gaps
)
# Merging or trimming may introduce masked data, may remove from Stream
if self.remove_masked_data:
st_out = remove_traces_w_masked_data(st_out)
if not st_out:
logger.critical("preprocessing removed all traces from Stream, "
"cannot proceed")
sys.exit(-1)
return st_out
[docs]
def _remove_response_llnl(self, st):
"""
Remove response information from LLNL stations. This requires using
the custom LLNL DB client. There are also some internal checks that
need to be bypassed else they cause the program to crash
"""
st_out = st.copy()
# Mimicing the client check statements to remove stations that
# have no response or matching time span
for tr in st_out[:]:
net = tr.stats.network
sta = tr.stats.station
cha = tr.stats.channel
# Check that response info is available
if (sta, cha) not in self.c.sensors:
logger.warning(f"no response for {net}.{sta}.{cha}")
st_out.remove(tr)
continue
# Check epoch times against the stream midpoint
time = tr.stats.starttime + \
(tr.stats.endtime - tr.stats.starttime) / 2.
for epoch in self.c.sensors[(sta, cha)]:
if epoch.starttime <= time <= epoch.endtime:
break
else:
logger.warning(f"wrong epoch for {net}.{sta}.{cha}")
try:
st_out.remove(tr)
except ValueError:
pass
continue
self.c.remove_response(st_out, water_level=self.water_level,
output=self.output_unit,
pre_filt=self.pre_filt)
return st_out
[docs]
def rotate_streams(self):
"""
Rotate arbitrary three-component seismograms to desired orientation
'ZNE', 'RTZ' or 'UVW'.
.. warning::
This function combines all traces, both rotated and non-rotated
components (ZNE, RTZ, UVW, but not raw, e.g., 12Z), into a single
stream. This is deemed okay because we don't do any
component-specific operations after rotation.
:rtype: obspy.core.stream.Stream
:return: a stream that has been rotated to desired coordinate system
with SAC headers that have been adjusted for the rotation, as well
as non-rotated streams which are saved incase user needs access to
other components
"""
st_raw = self.st.copy()
st_raw = format_streams_for_rotation(st_raw)
# For writing RAW seismograms (prior to ANY rotation). Must be
# specifically requested by User by explicitely adding 'raw' to
# `write_files` list
if "sac_raw" in self.write_files:
self.st_raw = st_raw.copy()
# Empty stream, so we can take advantage of class __add__ method
st_out = Stream()
# RTZ requires rotating to ZNE first. Make sure this happens even if the
# user doesn't specify ZNE rotation.
if "ENZ" in self.rotate or "RTZ" in self.rotate:
logger.info("rotating to components ZNE")
st_zne = st_raw.copy()
stations = set([tr.stats.station for tr in st_zne])
# Assuming each channel has its own azimuth and dip value
channels = set([f"{tr.stats.channel[:-1]}?" for tr in st_zne])
metadata_getter = self.inv.get_channel_metadata
for sta in stations:
for cha in channels:
_st = st_zne.select(station=sta, channel=cha)
# Check if 'dip' or 'azimuth' is None, because that causes
# ObsPy rotate to throw a TypeError. See PySEP Issue #35
channel_okay = bool(_st)
for _tr in _st:
try:
meta = metadata_getter(_tr.id, _tr.stats.starttime)
az = meta["azimuth"]
dip = meta["dip"]
except Exception: # NOQA
logger.warning(f"no matching metadata for {_tr.id}")
channel_okay = False
break
logger.debug(f"{_tr.id} azimuth=={az}; dip=={dip}")
if az is None or dip is None:
channel_okay = False
break
if not channel_okay:
logger.warning(f"{sta}.{cha} bad rotation metadata, "
f"removing")
continue
# components=['ZNE'] FORCES rotation using azimuth and dip
# values, even if components are already in 'ZNE'. This is
# important as some IRIS data will be in ZNE but not be
# aligned (https://github.com/obspy/obspy/issues/2056)
try:
_st.rotate(method="->ZNE", inventory=self.inv,
components=["ZNE", "Z12", "123"])
# General error catching for rotation because any number of
# things can go wrong here based on the ObsPy rotation algo
except Exception as e:
logger.warning(f"rotate issue for {sta}.{cha}, "
"removing from stream")
logger.debug(f"rotate error: {e}")
continue
st_out += _st
# Check to see if rotation errors kicked out all stations
if not st_out:
logger.critical("rotation errors have reduced Stream to len 0, "
"cannot continue")
sys.exit(-1)
# Rotate to radial transverse coordinate system
if "RTZ" in self.rotate:
logger.info("rotating to components RTZ")
# If we rotate the ENTIRE stream at once, ObsPy only uses the
# first backazimuth value which will create incorrect outputs
# https://github.com/obspy/obspy/issues/2623
st_rtz = st_out.copy() # contains ZNE rotated components
stations = set([tr.stats.station for tr in st_rtz])
for sta in stations:
_st = st_rtz.select(station=sta)
if _st and hasattr(_st[0].stats, "back_azimuth"):
_st.rotate(method="NE->RT") # in place rot.
st_out += _st
else:
logger.warning(f"no back azimuth for '{sta}', cannot "
f"rotate NE->RT")
continue
# Allow UVW rotation independent on ENZ or RTZ rotation
if "UVW" in self.rotate:
logger.info("rotating to components UVW")
st_uvw = rotate_to_uvw(st_raw)
st_out += st_uvw
try:
st_out = format_sac_headers_post_rotation(st_out)
except AttributeError as e:
logger.warning(f"cannot format SAC headers after rotating {e}")
return st_out
[docs]
def write(self, write_files=None, _return_filenames=False, _subset=None,
**kwargs):
"""
Write out various files specifying information about the collected
stations and waveforms.
Options are:
* config_file: write the current configuration as a YAML file
* station_list: write a text file with station information
* inv: write the inventory as a StationXML file
* event: write the event as a QuakeML file
* stream: write the stream as a single MSEED file
* sac_zne: write the stream as individual (per-channel) SAC files
for ZNE components with the appropriate SAC header
* sac_rtz: write out per-channel SAC files for RTZ components
* sac_uvw: write out per-channel SAC files for UVW components
* weights_dist: write out CAP 'weights.dat' file sorted by distance
* weights_az: write out CAP 'weights.dat' file sorted by azimuth
* weights_code: write out CAP 'weights.dat' file sorted by sta code
:type write_files: list of str
:param write_files: list of files that should be written out, must
match the acceptable list defined in the function or here in the
docstring. If not given, defaults to internal list of files
:type _return_filenames: bool
:param _return_filenames: internal flag to not actually write anything
but just return a list of acceptable filenames. This keeps all the
file naming definitions in one function. This is only required by
the check() function.
:type _subset: list
:param _subset: internal parameter used for intermediate file saving.
PySEP will attempt to save files once they have been collected
however if the files it tries to save do not match against the
User-defined file list, they will be ignored.
Keyword Arguments
::
str order_station_list_by:
how to order the station list available options are:
network, station, latitude, longitude, elevation, burial.
str config_fid:
optional name for the configuration file name defaults to
'pysep_config.yaml'
str station_fid:
optional name for the stations list file name defaults to
'station_list.txt'
str inv_fid:
optional name for saved ObsPy inventory object, defaults to
'inv.xml'
str event_fid:
optional name for saved ObsPy Event object, defaults to
'event.xml'
str stream_fid:
optional name for saved ObsPy Stream miniseed object,
defaults to 'stream.ms'
str sac_subdir:
sub-directory within output directory and event directory to
save SAC files. Defaults to SAC/. Use an empty string to dump
files directly into the event directory
"""
# Set some default values that can be overridden with kwargs
order_station_list_by = self.kwargs.get("order_station_list_by", None)
config_fid = self.kwargs.get("config_fid", "pysep_config.yaml")
station_fid = self.kwargs.get("station_fid", "station_file.txt")
inv_fid = self.kwargs.get("inv_fid", "inv.xml")
event_fid = self.kwargs.get("event_fid", "event.xml")
stream_fid = self.kwargs.get("stream_fid", "stream.ms")
sac_subdir = self.kwargs.get("sac_subdir", "SAC")
# This is defined here so that all these filenames are in one place,
# but really this set is just required by check(), not by write()
_acceptable_files = {"weights_az", "weights_dist", "weights_code",
"station_list", "inv", "event", "stream",
"config_file", "sac", "sac_raw", "sac_zne",
"sac_rtz", "sac_uvw"}
if _return_filenames:
return _acceptable_files
# Allow the user to call write() with their own set of filenames if this
# wasn't defined by the Config or this is being scripted and they only
# want certain files out at intermediate steps
if write_files is None:
write_files = self.write_files
else:
write_files = set(write_files)
assert(write_files.issubset(_acceptable_files)), (
f"`write_files` must be a list of some or all of: "
f"{_acceptable_files}"
)
# Allow for internal intermediate file saving validated against
# user-defined list
if _subset:
write_files = write_files.intersection(set(_subset))
for weights_fid in ["weights_dist", "weights_az", "weights_code"]:
if weights_fid in write_files:
order_by = weights_fid.split("_")[1]
write_cap_weights_files(st=self.st, order_by=order_by,
path_out=self.output_dir)
if "config_file" in write_files:
logger.info("writing config YAML file")
self.write_config(fid=os.path.join(self.output_dir, config_fid),
overwrite=True)
if "station_list" in write_files or "all" in write_files:
fid = os.path.join(self.output_dir, station_fid)
logger.info("writing stations file")
logger.debug(fid)
write_pysep_station_file(
self.inv, self.event, fid,
order_station_list_by=order_station_list_by
)
if "inv" in write_files:
fid = os.path.join(self.output_dir, inv_fid)
logger.info("writing inventory as StationXML")
logger.debug(fid)
self.inv.write(fid, format="STATIONXML")
if "event" in write_files:
fid = os.path.join(self.output_dir, event_fid)
logger.info("writing event as QuakeML")
logger.debug(fid)
self.event.write(fid, format="QuakeML")
if "stream" in write_files:
fid = os.path.join(self.output_dir, stream_fid)
logger.info("writing waveform stream in MiniSEED")
logger.debug(fid)
with warnings.catch_warnings():
# ignore the encoding warning that comes from ObsPy
warnings.simplefilter("ignore")
self.st.write(fid, format="MSEED")
# Used for determining where to save SAC files
if self._legacy_naming:
_output_dir = self.output_dir
else:
_output_dir = os.path.join(self.output_dir, sac_subdir)
if "sac_raw" in write_files:
logger.info("writing RAW waveforms traces in SAC format")
self._write_sac(st=self.st_raw,
output_dir=os.path.join(_output_dir, "RAW"))
if "sac" in write_files:
logger.info("writing all waveforms traces in SAC format")
self._write_sac(st=self.st, output_dir=_output_dir)
# Allow outputting only certain components. If used together with 'sac',
# probably these will overwrite already written files
if "sac_zne" in write_files:
logger.info("writing ZNE waveforms traces in SAC format")
self._write_sac(st=self.st, output_dir=_output_dir,
components="ZNE")
if "sac_rtz" in write_files:
logger.info("writing RTZ waveforms traces in SAC format")
self._write_sac(st=self.st, output_dir=_output_dir,
components="RTZ")
if "sac_uvw" in write_files:
logger.info("writing UVW waveforms traces in SAC format")
self._write_sac(st=self.st, output_dir=_output_dir,
components="UVW")
[docs]
def _write_sac(self, st, output_dir=os.getcwd(), components=None):
"""
Write SAC files with a specific naming schema, which allows for both
legacy (old PySEP) or non-legacy (new PySEP) naming.
:type st: obspy.core.stream.Stream
:param st: Stream to be written
:type output_dir: str
:param output_dir: where to save the SAC files, defaults to the
current working directory
:type components: str
:param components: acceptable component values for saving files,
allows only saving subsets of the Stream. Example 'RTZNE' or
just 'R'. Must match against Trace.stats.component
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for tr in st:
if components and tr.stats.component not in components:
continue
if self._legacy_naming:
# Legacy: e.g., 20000101000000.NN.SSS.LL.CC.c
_trace_id = f"{tr.get_id()[:-1]}.{tr.get_id()[-1].lower()}"
if self.event_tag:
tag = f"{self.event_tag}.{_trace_id}"
else:
tag = _trace_id
else:
# New style: e.g., 2000-01-01T000000.NN.SSS.LL.CCC.sac
if self.event_tag:
tag = f"{self.event_tag}.{tr.get_id()}.sac"
else:
tag = f"{tr.get_id()}.sac"
fid = os.path.join(output_dir, tag)
logger.debug(os.path.basename(fid))
tr.write(fid, format="SAC")
[docs]
def write_config(self, fid=None, overwrite=False):
"""
Write a YAML config file based on the internal `Pysep` attributes.
Remove a few internal attributes (those containing data) before writing
and also change types on a few to keep the output file simple but
also re-usable for repeat queries.
:type fid: str
:param fid: name of the file to write. defaults to config.yaml
:type overwrite: bool
:param overwrite: if True and `fid` already exists, save a new config
file with the same name, overwriting the old file. if False
(default), throws a warning if encountering existing `fid` and does
not write config file
"""
if fid is None:
fid = f"pysep_config.yaml"
if not overwrite and os.path.exists(fid):
logger.warning(f"config '{fid}' already exists. use "
f"`-o/--overwrite` to write anyway.")
return
logger.debug(fid)
dict_out = vars(self)
# Drop hidden variables
dict_out = {key: val for key, val in dict_out.items()
if not key.startswith("_")}
# Internal attributes that don't need to go into the written config
attr_remove_list = ["st", "st_raw", "event", "inv", "c", "write_files",
"plot_files", "output_dir", "station_ids", "kwargs"]
if self.client.upper() != "LLNL":
attr_remove_list.append("llnl_db_path") # not important unless LLNL
for key in attr_remove_list:
del(dict_out[key])
# Write times in as strings not UTCDateTime
for key in ["origin_time", "reference_time"]:
val = dict_out[key]
if isinstance(val, UTCDateTime):
dict_out[key] = str(val)
with open(fid, "w") as f:
yaml.dump(dict_out, f, default_flow_style=False, sort_keys=False)
[docs]
def plot(self):
"""
Plot map and record section if requested. Allow general error catching
for mapping and record section plotting because we don't want these
auxiliary steps to crash the entire workflow since they are not
critical.
"""
show_map = self.kwargs.get("show_map", False)
show_rs = self.kwargs.get("show_rs", False)
if "map" in self.plot_files or "all" in self.plot_files:
logger.info("plotting source receiver map")
fid = os.path.join(self.output_dir, f"station_map.png")
try:
plot_source_receiver_map(self.inv, self.event, save=fid,
show=show_map)
except Exception as e: # NOQA
logger.warning(f"could not plot source receiver map: {e}")
if "record_section" in self.plot_files or "all" in self.plot_files:
fid = os.path.join(self.output_dir, f"record_section.png")
try:
# Default settings to create a general record section
rs = RecordSection(st=self.st, sort_by="distance",
scale_by="normalize", overwrite=True,
show=show_rs, save=fid)
rs.run()
except Exception as e: # NOQA
logger.warning("could not plot record section: {e}")
[docs]
def _event_tag_and_output_dir(self):
"""
Convenience function to establish and naming schema for files and
directories. Also takes care of making empty directories.
:rtype: tuple of str
:return: (unique event tag, path to output directory)
"""
# Default behavior, auto-generate event tag
if self._overwrite_event_tag is None:
# Options for choosing how to name things. Legacy or new-style
if self._legacy_naming:
logger.debug("reverting to legacy style file naming")
event_tag = format_event_tag_legacy(self.event)
else:
event_tag = format_event_tag(self.event)
# Either User turns off event tag so dump files directly to `output_dir`
# or User defines their own `event_tag`
else:
event_tag = self._overwrite_event_tag
full_output_dir = os.path.join(self._output_dir, event_tag)
logger.info(f"full output directory is: {full_output_dir}")
if not os.path.exists(full_output_dir):
os.makedirs(full_output_dir)
elif not self._overwrite:
logger.warning(f"output directory '{full_output_dir}' exists and "
f"overwrite flag (-o/--overwrite) not set, exiting")
sys.exit(0)
return event_tag, full_output_dir
[docs]
def _set_log_file(self, mode):
"""
Write logger to file as well as stdout, with the same format as the
stdout logger. Need mode==1 to move the log file after everything is
done because we don't know the event tag prior to starting the logs
:type mode: int
:param mode: Two options for using this function
0: set the logger to a temporary file 'pysep.log',
1: move the logger from the temporary file into final output dir
"""
save_log = self.kwargs.get("save_log", True)
log_file = self.kwargs.get("log_file", "pysep.log")
initial_log_path = os.path.join(self._output_dir, log_file)
if not save_log:
return
# Make the output directory that the log file will be saved in
if not os.path.exists(self._output_dir):
os.makedirs(self._output_dir)
if mode == 0:
fh = logging.FileHandler(initial_log_path)
fh.setFormatter(logger.handlers[0].formatter)
logger.addHandler(fh)
elif mode == 1:
# Should have been set by `_event_tag_and_output_dir()`
final_log_path = os.path.join(self.output_dir, log_file)
os.rename(src=initial_log_path, dst=final_log_path)
[docs]
def run(self, event=None, inv=None, st=None, **kwargs):
"""
Run PySEP: Seismogram Extraction and Processing. Steps in order are:
1) Set default parameters or load from config file
2) Check parameter validity, exit if unexpected values
3) Get data and metadata (QuakeML, StationXML, waveforms)
4) Remove unacceptable stations based on user-defined criteria
5) Remove unacceptable waveforms based on user-defined criteria
6) Generate some new metadata for tagging and output
7) Pre-process waveforms and standardize for general use
8) Generate output files and figures as end-product
:type event: obspy.core.event.Event
:param event: optional user-provided event object which will force a
skip over QuakeML/event searching
:type inv: obspy.core.inventory.Inventory
:param inv: optional user-provided inventory object which will force a
skip over StationXML/inventory searching
:type st: obspy.core.stream.Stream
:param st: optional user-provided strean object which will force a
skip over waveform searching
"""
self._set_log_file(mode=0)
logger.debug(f"running PySEP version {__version__}")
# Overload default parameters with event input file and check validity
self.load(**kwargs)
self.check()
self.c = self.get_client()
# Get QuakeML (event) metadata
if event is None:
self.event = self.get_event()
else:
self.event = event
self.event_tag, self.output_dir = self._event_tag_and_output_dir()
# Intermediate write of Config file and QuakeML
self.write(_subset=["config_file", "event"], **kwargs)
# Default method of retrieving waveforms/metadata from data center
if self.use_mass_download is False:
if inv is None:
self.inv = self.get_stations()
else:
self.inv = inv
self.inv = self.curtail_stations()
self.write(_subset=["inv"], **kwargs) # write out inventory
# Get waveforms, format and assess waveform quality
if st is None:
self.st_raw = self.get_waveforms()
else:
self.st_raw = st
# Use ObsPy's mass download option to gather all available data
else:
self.st_raw, self.inv = self.mass_download()
# Intermediate write of StationXML and raw waveforms
self.write(_subset=["sac_raw", "inv", "station_list"], **kwargs)
# Quality check and process the raw waveforms. `st` is an intermediate
# attribute and will NOT be written
self.st = quality_check_waveforms_before_processing(
self.st_raw, remove_clipped=self.remove_clipped
)
# Mark `st_raw` for deletion because we no longer need raw data. If the
# user wants it, it should have been written out
del self.st_raw
self.st = append_sac_headers(self.st, self.event, self.inv)
if self.taup_model is not None:
self.st = format_sac_header_w_taup_traveltimes(self.st,
self.taup_model,
self.phase_list)
# Waveform preprocessing and standardization
self.st = self.preprocess()
# Rotation to various orientations. The output stream will have ALL
# components, both rotated and non-rotated
if self.rotate is not None:
self.st = self.rotate_streams()
# Final quality checks on ALL waveforms before we write them out
self.st = quality_check_waveforms_after_processing(
self.st, remove_insufficient_length=self.remove_insufficient_length
)
# Write out the remainder files and make figures for user consumption
self.write(_subset=["stream", "sac", "sac_zne", "sac_rtz", "sac_uvw",
"weights_dist", "weights_az", "weights_code"],
**kwargs)
self.plot()
# Last minute move the log file into the output directory
self._set_log_file(mode=1)
[docs]
def parse_args():
"""
Define command line arguments. Allow user to set Pysep completely from
the command line (although that would be extremely cumbersome, best to
use Config files)
"""
parser = argparse.ArgumentParser(
description="PYSEP: Python Seismogram Extraction and Processing",
)
# Exposing some of the more useful parameters as public arguments
parser.add_argument("-c", "--config", default="", type=str, nargs="?",
help="path to a YAML config file which defines "
"parameters used to control PySEP")
parser.add_argument("-p", "--preset", default="", type=str, nargs="?",
help="Overwrites '-c/--config', use one of the default "
"in-repo config files which have been previously "
"designed. See 'pysep/pysep/configs' for options")
parser.add_argument("-e", "--event", default="", type=str, nargs="?",
help="Required if using '-p/--preset'. Each preset "
"config may define >1 event. This flag determines"
"which event to run. If `event`=='all', will "
"gather ALL events in the preset.")
parser.add_argument("-E", "--event_file", default="", type=str, nargs="?",
help="Allows using a single config file to gather"
"data for multiple events. Overwrites "
"'origin_time' parameter in the original config "
"file.")
parser.add_argument("-U", "--user", default=None, type=str, nargs="?",
help="Username if required to access IRIS webservices")
parser.add_argument("-P", "--password", default=None, type=str, nargs="?",
help="Password if required to access IRIS webservices")
parser.add_argument("-W", "--write", default=False, action="store_true",
help="Write out a blank configuration file to be "
"filled in by the User")
parser.add_argument("-l", "--list", default=False, action="store_true",
help="list out avaialable `preset` config options")
parser.add_argument("-L", "--log_level", default="DEBUG", type=str,
nargs="?", help="verbosity of logging: 'WARNING', "
"'INFO', 'DEBUG'")
parser.add_argument("--legacy_naming", default=False, action="store_true",
help="use the file naming schema and directory "
"structure of the legacy version of PySEP.")
parser.add_argument("--overwrite_event_tag", default=None, type=str,
nargs="?",
help="Manually set the event tag used to name the "
"output directory and SAC files. If None, will "
"default to a tag consisting of event origin time "
"and region, or just origin time if using "
"`--legacy_naming`")
parser.add_argument("-o", "--overwrite", default=False, action="store_true",
help="overwrite existing directory which matches "
"the unique event tag")
parser.add_argument("-v", "--version", default=False, action="store_true",
help="print current PySEP version number")
# Keyword arguments can be passed directly to the argparser in the same
# format as the above kwargs (e.g., --linewidth 2), but they will not have
# help messages or type checking
parsed, unknown = parser.parse_known_args()
for arg in unknown:
if arg.startswith(("-", "--")):
parser.add_argument(arg.split("=")[0])
return parser
[docs]
def get_data(config_file=None, event=None, inv=None, st=None, write_files=None,
plot_files=None, log_level=None, *args, **kwargs):
"""
Interactive/scripting function to run PySep and return quality controlled,
SAC-headed stream object which can then be used for other processes.
.. note::
By default turns file writing and plotting OFF so that this function
acts solely as a data collection/processing call.
.. note::
args and kwargs are passed directly to Pysep.__init__() so you can
define all your parameters in this call, or through a config file
.. rubric::
>>> from pysep import get_data
>>> st = get_data(config_file='config.yaml')
:type config_file: str
:param config_file: path to YAML config file which will overload any default
configs
:type event: obspy.core.event.Event
:param event: optional user-provided event object which will force a
skip over QuakeML/event searching
:type inv: obspy.core.inventory.Inventory
:param inv: optional user-provided inventory object which will force a
skip over StationXML/inventory searching
:type st: obspy.core.stream.Stream
:param st: optional user-provided strean object which will force a
skip over waveform searching
:type write_files: list or None
:param write_files: list of files to write, acceptable options defined in
write(). Defaults to None, no files will be written
:type plot_files: list or None
:param plot_files: list of files to plot, acceptable options defined in
plot(). Defaults to None, no figures will be made
:type log_level: str or None
:param log_level: verbosity of logger. Defaults to no logging to mimic
a standard function call rather than a standalone package
:rtype: tuple of (obspy.core.event.Event, obspy.core.inventory.Inventory,
obspy.core.stream.Stream)
:return: returns obspy objects defining data and metadata that have been
collected by PySEP
"""
sep = Pysep(config_file=config_file, write_files=write_files,
plot_files=plot_files, log_level=log_level, *args, **kwargs)
sep.run(event=event, inv=inv, st=st)
return sep.event, sep.inv, sep.st
[docs]
def _print_preset_configs():
"""
Print out a list of Config files that can be used to run PySEP
"""
pysep_dir = Path(__file__).parent.absolute()
filelist = glob(os.path.join(pysep_dir, "configs", "*", "*yaml"))
filelist = sorted([fid.split("/")[-2:] for fid in filelist])
print(f"-p/--preset -e/--event")
for i, fid in enumerate(filelist):
preset, event = fid
print(f"-p {preset} -e {event}")
[docs]
def _return_matching_preset_configs(args):
"""
Return a list of preset configs from the itnernal PySEP directory
Searches through the internal pysep/configs/*/*.yaml files for matching
user queries.
:type args: argparser args
:param args: command line argumments
:rtype: list
:return: matching config files that will be used to run PySEP
"""
pysep_dir = Path(__file__).parent.absolute()
preset_glob = os.path.join(pysep_dir, "configs", "*")
# Ignore the template config files, only look at directories
presets = [os.path.basename(_) for _ in glob(preset_glob)
if not _.endswith(".yaml")]
if not args.list:
assert (args.preset in presets), (
f"chosen preset (-p/--preset) {args.preset} not in available: "
f"{presets}"
)
event_glob = os.path.join(pysep_dir, "configs", args.preset, "*")
event_paths = sorted(glob(event_glob))
event_names = [os.path.basename(_) for _ in event_paths]
# 'event' arg can be an index (int) or a name (str)
try:
event_idx = int(args.event)
config_files = [event_paths[event_idx]]
except (ValueError, TypeError):
if args.event.upper() == "ALL":
config_files = event_paths
else:
assert (args.event in event_names), (
f"chosen event (-e/--event) must be an integer "
f"index < {len(event_names)} or one of: {event_names}"
)
config_files = [event_paths[event_names.index(args.event)]]
return config_files
[docs]
def main():
"""
Command-line-tool function which parses command line arguments and runs
data gathering and processing. This is accessed through the command line:
.. rubric::
$ pysep -c config.yaml
"""
config_files, events = [], None
parser = parse_args()
# Print help message if no arguments are given
if len(sys.argv) == 1:
parser.print_help()
sys.exit(0)
# Grab arguments from parser and continue
args = parser.parse_args()
if args.version:
print(__version__)
return
# Write out a blank configuration file to use as a template
if args.write:
Pysep().write_config(overwrite=args.overwrite)
return
# List out available configurations inside the repo
elif args.list:
_print_preset_configs()
return
# Choose one of the available preset configuration files to run
elif args.preset:
config_files = _return_matching_preset_configs(args)
# If not preset, user should specify the config file themselves
elif args.config:
assert(os.path.exists(args.config)), (
f"config file (-c/--config) {args.config} does not exist"
)
config_files = [args.config]
if args.event_file:
events = read_event_file(args.event_file)
# Or allow user to manually set all the input parameters through CLI
else:
config_files = [None]
if events is None:
# Normal run: parse through config files and run PySEP
for config_file in config_files:
sep = Pysep(config_file=config_file, **vars(args))
sep.run()
else:
# Event File run, use the same Config to parse through events
# NOTE: Does not allow a unique reference time, reference time is FORCED
# to be the origin time; ignores reference time in par file
logger.info(f"looping over {len(events)} events for event file run")
for event in events:
sep = Pysep(config_file=config_files[0],
origin_time=event["origin_time"],
event_latitude=event["event_latitude"],
event_longitude=event["event_longitude"],
event_depth_km=event["event_depth_km"],
event_magnitude=event["event_magnitude"],
**vars(args))
sep.run(overwrite_event=False)
if __name__ == "__main__":
main()