Source code for pysep.utils.io

"""
Read and write utilities for PySEP
"""
import os
import re
import yaml
import numpy as np

from obspy import UTCDateTime, Stream, Trace, read_events, Inventory, Catalog
from obspy.core.inventory.network import Network
from obspy.core.inventory.station import Station
from obspy.geodetics import gps2dist_azimuth
from obspy.core.event import Event, Origin, Magnitude

from pysep import logger
from pysep.utils.mt import moment_magnitude, seismic_moment
from pysep.utils.fmt import channel_code
from pysep.utils.cap_sac import append_sac_headers, append_sac_headers_cartesian


def read_yaml(fid):
    """
    Read a YAML file and return a dictionary

    :type fid: str
    :param fid: YAML file to read from
    :rtype: dict
    :return: YAML keys and variables in a dictionary
    """
    # work around PyYAML bugs
    yaml.SafeLoader.add_implicit_resolver(
        u'tag:yaml.org,2002:float',
        re.compile(u'''^(?:
         [-+]?(?:[0-9][0-9_]*)\\.[0-9_]*(?:[eE][-+]?[0-9]+)?
        |[-+]?(?:[0-9][0-9_]*)(?:[eE][-+]?[0-9]+)
        |\\.[0-9_]+(?:[eE][-+][0-9]+)?
        |[-+]?[0-9][0-9_]*(?::[0-5]?[0-9])+\\.[0-9_]*
        |[-+]?\\.(?:inf|Inf|INF)
        |\\.(?:nan|NaN|NAN))$''', re.X),
        list(u'-+0123456789.'))

    with open(fid, 'r') as f:
        config = yaml.safe_load(f)

    # Replace 'None' and 'inf' values to match expectations
    for key, val in config.items():
        if val == "None":
            config[key] = None
        if val == "inf":
            config[key] = np.inf

    return config
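
# Example usage of read_yaml (a minimal sketch; the file name and key below
# are hypothetical and not part of this module):
#
#   >>> from pysep.utils.io import read_yaml
#   >>> config = read_yaml("pysep_config.yaml")
#   >>> config.get("origin_time")  # string values "None"/"inf" are converted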

def read_sem(fid, origintime="1970-01-01T00:00:00", source=None,
             stations=None, location="", precision=4,
             source_format="CMTSOLUTION"):
    """
    Specfem3D outputs seismograms to ASCII (.sem? or .sem.ascii) files.
    Converts SPECFEM synthetics into ObsPy Stream objects with the correct
    header information. If `source` and `stations` files are also provided,
    PySEP will write appropriate SAC headers to the underlying data.

    :type fid: str
    :param fid: path of the given ascii file
    :type origintime: obspy.UTCDateTime
    :param origintime: UTCDateTime object for the origin time of the event.
        If not given, defaults to the dummy value '1970-01-01T00:00:00'
    :type source: str
    :param source: optional SPECFEM source file (e.g., CMTSOLUTION, SOURCE)
        defining the event which generated the synthetics. Used to grab event
        information and append as SAC headers to the ObsPy Stream
    :type stations: str
    :param stations: optional STATIONS file defining the station locations
        for the SPECFEM generated synthetics, used to generate SAC headers
    :type location: str
    :param location: location value for a given station/component
    :type precision: int
    :param precision: dt precision determined by differencing two adjacent
        time steps in the underlying ascii text file.
    :rtype: obspy.core.stream.Stream
    :return: stream containing header and data info taken from ascii file
    """
    # This was tested up to SPECFEM3D Cartesian git version 6895e2f7
    try:
        times = np.loadtxt(fname=fid, usecols=0)
        data = np.loadtxt(fname=fid, usecols=1)
    # At some point in 2018, the SPECFEM developers changed the ascii format
    # from two columns to comma-separated values, with repeat values
    # represented as 2*value_float, where value_float represents the data
    # value as a float
    except ValueError:
        times, data = [], []
        with open(fid, 'r') as f:
            lines = f.readlines()
        for line in lines:
            try:
                time_, data_ = line.strip().split(',')
            except ValueError:
                if "*" in line:
                    time_ = data_ = line.split('*')[-1]
                else:
                    raise ValueError
            times.append(float(time_))
            data.append(float(data_))

        times = np.array(times)
        data = np.array(data)

    # We assume that dt is constant after 'precision' decimal points
    delta = round(times[1] - times[0], precision)

    # Get metadata information from CMTSOLUTION and STATIONS files
    event = None
    if source is None:
        origintime = UTCDateTime(origintime)
    else:
        event = read_events_plus(source, format=source_format)[0]
        origintime = event.preferred_origin().time
        logger.info(f"reading origintime from event: {origintime}")

    # Honor that SPECFEM doesn't start exactly on 0 due to USER_T0
    origintime += times[0]

    # SPECFEM2D/SPECFEM3D_Cartesian style name format, e.g., NZ.BFZ.BXE.semd OR
    # SPECFEM3D_Globe style name format, e.g., TA.O20K.BXR.sem.ascii
    net, sta, cha, fmt, *_ = os.path.basename(fid).split(".")
    stats = {"network": net, "station": sta, "location": location,
             "channel": cha, "starttime": origintime, "npts": len(data),
             "delta": delta, "mseed": {"dataquality": 'D'}, "format": fmt
             }
    st = Stream([Trace(data=data, header=stats)])

    if event and stations:
        try:
            # `read_stations` will throw a ValueError for Cartesian coordinates
            inv = read_stations(stations)
            st = append_sac_headers(st, event, inv)
        except ValueError:
            # If Cartesian coordinate system, slightly different header approach
            st = append_sac_headers_cartesian(st, event, stations)
        # Broad catch here as this is an optional step that might not always
        # work or be possible
        except Exception as e:
            logger.warning(f"could not append SAC header to trace because {e}")

    return st
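
# Example usage of read_sem (a sketch; file paths are hypothetical and follow
# the SPECFEM naming convention described in the docstring above):
#
#   >>> from pysep.utils.io import read_sem
#   >>> st = read_sem("OUTPUT_FILES/NZ.BFZ.BXE.semd",
#   ...               source="DATA/CMTSOLUTION", stations="DATA/STATIONS")
#   >>> print(st[0].stats.starttime)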

def read_events_plus(fid, format, **kwargs):
    """
    Addition to the base ObsPy.read_events() function that, in addition to
    the acceptable formats read by ObsPy, can also read the following:

    * SPECFEM2D SOURCE
    * SPECFEM3D/3D_GLOBE FORCESOLUTION
    * SPECFEM3D/3D_GLOBE CMTSOLUTION (both geographic and non-geographic)

    See the following link for acceptable ObsPy formats:
    https://docs.obspy.org/packages/autogen/obspy.core.event.read_events.html

    :type fid: str
    :param fid: full path to the event file to be read
    :type format: str
    :param format: Expected format of the file (case-insensitive), available
        are

        - SOURCE
        - FORCESOLUTION
        - CMTSOLUTION
        - any of ObsPy's accepted arguments for ObsPy.read_events()
    :rtype: obspy.core.catalog.Catalog
    :return: Catalog which should only contain one event, read from the
        `fid` for the given `format`
    """
    format = format.upper()

    # Allow input of various types of source files not allowed in ObsPy
    if format == "SOURCE":
        cat = Catalog(events=[read_specfem2d_source(fid)])
    elif format == "FORCESOLUTION":
        cat = Catalog(events=[read_forcesolution(fid)])
    # ObsPy can handle QuakeML, CMTSOLUTION, etc.
    else:
        try:
            cat = read_events(fid, format=format, **kwargs)
        except ValueError:
            # ObsPy throws an error when trying to read CMTSOLUTION files that
            # are not defined on geographic coordinates (i.e., Cartesian)
            try:
                cat = Catalog(
                    events=[read_specfem3d_cmtsolution_cartesian(fid)]
                )
            except Exception as e:
                raise ValueError(
                    f"unexpected source format {format} for {fid}") from e

    return cat
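
# Example usage of read_events_plus, which dispatches either to ObsPy's
# read_events() or to the SPECFEM-specific readers defined below
# (read_specfem2d_source, read_forcesolution,
# read_specfem3d_cmtsolution_cartesian). File names are hypothetical:
#
#   >>> from pysep.utils.io import read_events_plus
#   >>> cat = read_events_plus("DATA/CMTSOLUTION", format="CMTSOLUTION")
#   >>> event = cat[0]
#   >>> print(event.preferred_origin().time)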

def read_specfem3d_cmtsolution_cartesian(path_to_cmtsolution):
    """
    Create a barebones ObsPy Event object from a SPECFEM3D CMTSOLUTION file
    with coordinates defined in Cartesian. Required because ObsPy
    read_events() will throw a ValueError when CMTSOLUTION files do not have
    geographically defined coordinates.
    """
    with open(path_to_cmtsolution, "r") as f:
        lines = f.readlines()

    # First line contains meta information about event
    _, year, month, day, hour, minute, sec, lat, lon, depth, mb, ms, *_ = \
        lines[0].strip().split()

    origin_time = UTCDateTime(f"{year}-{month}-{day}T{hour}:{minute}:{sec}")

    # Remaining lines contain information on event, some useful some not
    source_dict = {}
    for line in lines[1:]:
        # Skip comments and newlines
        if line.startswith("#") or line == "\n":
            continue
        try:
            key, val = line.strip().split(":")
        # Lines that do not conform to 'key: val' will be ignored
        except ValueError:
            continue
        # Strip whitespace from keys and values
        source_dict[key.strip()] = val.strip()

    origin = Origin(
        resource_id=_get_resource_id(source_dict["event name"], "origin",
                                     tag="source"),
        time=origin_time,
        longitude=source_dict["longorUTM"],
        latitude=source_dict["latorUTM"],
        depth=abs(float(source_dict["depth"]) * 1E3)  # units: m
    )
    magnitude = Magnitude(
        resource_id=_get_resource_id(source_dict["event name"], "magnitude"),
        mag=ms, magnitude_type="Ms", origin_id=origin.resource_id.id
    )

    event = Event(resource_id=_get_resource_id(name=source_dict["event name"],
                                               res_type="event"))
    event.origins.append(origin)
    event.magnitudes.append(magnitude)
    event.preferred_origin_id = origin.resource_id.id
    event.preferred_magnitude_id = magnitude.resource_id.id

    return event

def read_specfem2d_source(path_to_source, origin_time=None):
    """
    Create a barebones ObsPy Event object from a SPECFEM2D SOURCE file, which
    only contains information required by Pyatoa. Only requires access to:
    event.preferred_origin(), event.preferred_magnitude() and
    event.preferred_moment_tensor(). The moment tensor is wrapped in a
    try-except so only origin and magnitude are required.

    Modified from:
    https://docs.obspy.org/master/_modules/obspy/io/cmtsolution/core.html#_internal_read_cmtsolution

    .. note::
        SOURCE files do not provide origin times so we just provide an
        arbitrary value but allow the user to set the time
    """
    # First set dummy origin time
    if origin_time is None:
        origin_time = "1970-01-01T00:00:00"
        logger.warning("no origin time set for SPECFEM2D source, setting "
                       f"dummy value: {origin_time}")

    with open(path_to_source, "r") as f:
        lines = f.readlines()

    # First line expected to be e.g.,: '## Source 1'
    source_name = lines[0].strip().split()[-1]
    source_dict = {}
    for line in lines:
        # Skip comments and newlines
        if line.startswith("#") or line == "\n":
            continue
        key, val, *_ = line.strip().split("=")
        # Strip trailing comments from values
        val = val.split("#")[0].strip()
        source_dict[key.strip()] = val.strip()

    origin = Origin(
        resource_id=_get_resource_id(source_name, "origin", tag="source"),
        time=origin_time,
        longitude=source_dict["xs"],
        latitude=source_dict["xs"],
        depth=source_dict["zs"]
    )

    # Attempt to calculate the moment magnitude from the moment tensor
    # components. This might fail because the values don't make sense or are
    # not provided
    try:
        moment = seismic_moment(mt=[float(source_dict["Mxx"]),
                                    float(source_dict["Mzz"]),
                                    float(source_dict["Mxz"])
                                    ])
        moment_mag = moment_magnitude(moment=moment)
    except Exception:
        moment_mag = None

    magnitude = Magnitude(
        resource_id=_get_resource_id(source_name, "magnitude"),
        mag=moment_mag, magnitude_type="Mw", origin_id=origin.resource_id.id
    )

    event = Event(resource_id=_get_resource_id(name=source_name,
                                               res_type="event"))
    event.origins.append(origin)
    event.magnitudes.append(magnitude)
    event.preferred_origin_id = origin.resource_id.id
    event.preferred_magnitude_id = magnitude.resource_id.id

    return event

def read_forcesolution(path_to_forcesolution,
                       origin_time="2000-01-01T00:00:00"):
    """
    Create a barebones ObsPy Event object from a FORCESOLUTION source file,
    containing only the hypocentral location of the FORCE and the given
    origin time.

    .. note::
        Designed to read FORCESOLUTION files from SPECFEM3D/3D_GLOBE, which
        all have slightly different formats/keys

    :type path_to_forcesolution: str
    :param path_to_forcesolution: path to the FORCESOLUTION file
    :type origin_time: str
    :param origin_time: FORCESOLUTION files do not natively contain any
        information on origin time, which is required by ObsPy Event objects.
        The user can provide this information if it is important, or a
        default value of 2000-01-01T00:00:00 will be provided as a dummy
        variable to keep ObsPy happy
    :rtype: obspy.core.event.Event
    :return: barebones ObsPy Event object which contains the hypocentral
        location of the FORCE, and the origin time defined by `origin_time`
    :raises KeyError: if the minimum required keys are not found in the file
        defined by `path_to_forcesolution`
    """
    with open(path_to_forcesolution, "r") as f:
        lines = f.readlines()

    origin_time = UTCDateTime(origin_time)

    # Grab information from the file
    source_dict = {}
    for line in lines:
        if ":" not in line:
            continue
        key, val = line.strip().split(":")
        val = val.split("!")[0].strip()  # remove trailing comments
        source_dict[key] = val

    # First line contains the name of the source
    source_dict["event name"] = lines[0].strip().split()[-1]

    # Latitude/Y value differs for 3D/3D_GLOBE
    for key in ["latitude", "latorUTM"]:
        try:
            latitude = source_dict[key]
            break
        except KeyError:
            continue
    else:
        raise KeyError("cannot find matching key for `latitude` in file")

    # Longitude/X value differs for 3D/3D_GLOBE
    for key in ["longitude", "longorUTM"]:
        try:
            longitude = source_dict[key]
            break
        except KeyError:
            continue
    else:
        raise KeyError("cannot find matching key for `longitude` in file")

    if "depth" not in source_dict:
        raise KeyError("cannot find matching key for `depth` in file")

    origin = Origin(
        resource_id=_get_resource_id(source_dict["event name"], "origin",
                                     tag="source"),
        time=origin_time,
        longitude=longitude,
        latitude=latitude,
        depth=abs(float(source_dict["depth"]) * 1E3)  # units: m
    )

    event = Event(resource_id=_get_resource_id(name=source_dict["event name"],
                                               res_type="event"))
    event.origins.append(origin)
    event.preferred_origin_id = origin.resource_id.id

    return event
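
# Example usage of read_forcesolution (a sketch; the file path and origin
# time are hypothetical):
#
#   >>> from pysep.utils.io import read_forcesolution
#   >>> event = read_forcesolution("DATA/FORCESOLUTION",
#   ...                            origin_time="2018-01-01T00:00:00")
#   >>> print(event.preferred_origin().depth)  # units: m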

def _get_resource_id(name, res_type, tag=None):
    """
    Helper function to create consistent resource ids, from ObsPy. Used to
    create resource IDs when generating Event objects
    """
    res_id = f"smi:local/source/{name:s}/{res_type:s}"
    if tag is not None:
        res_id += "#" + tag
    return res_id

def read_stations(path_to_stations):
    """
    Convert a SPECFEM STATIONS file into an ObsPy Inventory object. SPECFEM
    STATIONS files contain no channel or location information, so the
    inventory can only go down to the station level.

    .. note::
        This assumes the row structure of the station file is
        STA, NET, LAT [deg], LON [deg], ELEVATION [m], BURIAL [m]

    :type path_to_stations: str
    :param path_to_stations: the path to the STATIONS file that is associated
        with the Specfem3D DATA directory
    :rtype: obspy.core.inventory.Inventory
    :return: a station-level Inventory object
    :raises ValueError: if latitude and longitude values are not in geographic
        coordinates (i.e., are in Cartesian coordinates). Thrown by the init
        of the Station class.
    """
    stations = np.loadtxt(path_to_stations, dtype="str")
    if stations.size == 0:
        return Inventory()

    # Get all the unique network names, try-except to catch when there is
    # only one station in the file
    try:
        networks = {_: [] for _ in np.unique(stations[:, 1])}
    except IndexError:
        networks = {stations[1]: []}
        stations = [stations]

    for sta in stations:
        # Parse the station information
        station_ = sta[0]
        network_ = sta[1]
        latitude_ = float(sta[2])
        longitude_ = float(sta[3])
        elevation_ = float(sta[4])
        burial_ = float(sta[5])  # burial isn't an option in ObsPy, not used

        # Create the station object, temporarily store in a network
        station = Station(code=station_, latitude=latitude_,
                          longitude=longitude_, elevation=elevation_,
                          creation_date=UTCDateTime()
                          )
        networks[network_].append(station)

    # Create the network objects
    list_of_networks = []
    for network, stations in networks.items():
        list_of_networks.append(Network(code=network, stations=stations))

    return Inventory(networks=list_of_networks, source="PySEP")
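
# Example usage of read_stations (a sketch; the path is hypothetical and the
# STATIONS file must use geographic coordinates, see the docstring above):
#
#   >>> from pysep.utils.io import read_stations
#   >>> inv = read_stations("DATA/STATIONS")
#   >>> print(inv.get_contents()["stations"])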

def read_event_file(fid):
    """
    Read an event input file, which is a text file that should contain
    information on different events and their hypocenters

    :type fid: str
    :param fid: event input file
    :rtype: list of dict
    :return: event information parsed from the file
    """
    list_out = []
    with open(fid, "r") as f:
        lines = f.readlines()
    for line in lines:
        # Commented out lines are skipped
        if line.strip().startswith("#"):
            continue
        origin_time, longitude, latitude, depth_km, mag = line.strip().split()
        dict_out = {"origin_time": UTCDateTime(origin_time),
                    "event_longitude": float(longitude),
                    "event_latitude": float(latitude),
                    "event_depth_km": float(depth_km),
                    "event_magnitude": float(mag)
                    }
        list_out.append(dict_out)
    return list_out
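
# Example usage of read_event_file (a sketch). Each non-comment line of the
# input file is expected to contain: origin_time longitude latitude depth_km
# magnitude, e.g., '2009-04-07T20:12:55 -150.00 61.45 33.0 4.6' (values made
# up for illustration):
#
#   >>> from pysep.utils.io import read_event_file
#   >>> events = read_event_file("event_input.txt")
#   >>> events[0]["origin_time"], events[0]["event_magnitude"]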

def read_asdfdataset(path, evaluation):
    """
    Read an ASDFDataSet created by a SeisFlows Pyaflowa inversion run. The
    dataset should contain observed and synthetic waveforms, and optionally
    misfit windows. Returns observed and synthetic Streams with SAC headers,
    plus a dictionary of misfit windows.

    .. note::
        This function makes assumptions about the PyASDF data structure,
        which is defined by the external package Pyatoa. If Pyatoa changes
        that structure, this function will break.

    :type path: str
    :param path: path to the ASDF dataset to be read
    :type evaluation: str
    :param evaluation: evaluation to take synthetics from. These are saved
        following a format specified by Pyatoa, but usually follow the form
        iteration/step_count, e.g., 'i01s00' gives iteration 1, step count 0.
        Take a look at the waveform tags in `ASDFDataSet.waveforms[<station>]`
        for tags following the 'synthetic_' prefix
    """
    # PySEP, by default, does not require PyASDF to be installed
    try:
        from pyasdf import ASDFDataSet  # NOQA
    except ImportError:
        logger.critical("pyasdf is not installed. Please install pyasdf "
                        "to read ASDF datasets")
        return None, None, None

    with ASDFDataSet(path) as ds:
        event = ds.events[0]
        st_out = Stream()
        st_syn_out = Stream()
        for station in ds.waveforms.list():
            inv = ds.waveforms[station].StationXML
            st = ds.waveforms[station].observed
            st_syn = ds.waveforms[station][f"synthetic_{evaluation}"]

            st_out += append_sac_headers(st, event, inv)
            st_syn_out += append_sac_headers(st_syn, event, inv)

        # Read windows from the dataset
        windows = {}
        if hasattr(ds.auxiliary_data, "MisfitWindows"):
            iter_ = evaluation[:3]  # 'i01s00' -> 'i01'
            step = evaluation[3:]
            for station in ds.auxiliary_data.MisfitWindows[iter_][step].list():
                parameters = ds.auxiliary_data.MisfitWindows[iter_][step][
                    station].parameters
                trace_id = parameters["channel_id"]
                starttime = parameters["relative_starttime"]
                endtime = parameters["relative_endtime"]
                # Initialize an empty window list for each new trace id
                if trace_id not in windows:
                    windows[trace_id] = []
                windows[trace_id].append((starttime, endtime))

    return st_out, st_syn_out, windows
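
# Example usage of read_asdfdataset (a sketch; requires the optional pyasdf
# dependency, and the dataset file name is hypothetical):
#
#   >>> from pysep.utils.io import read_asdfdataset
#   >>> st_obs, st_syn, windows = read_asdfdataset("NZ_BFZ.h5",
#   ...                                            evaluation="i01s00")
#   >>> windows  # {trace_id: [(relative_starttime, relative_endtime), ...]}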

def write_sem(st, unit, path="./", time_offset=0):
    """
    Write an ObsPy Stream in the two-column ASCII format that Specfem uses

    :type st: obspy.core.stream.Stream
    :param st: stream containing synthetic data to be written
    :type unit: str
    :param unit: the units of the synthetic data, used for file extension,
        must be 'd', 'v', 'a' for displacement, velocity, acceleration, resp.
    :type path: str
    :param path: path to save data to, defaults to cwd
    :type time_offset: float
    :param time_offset: optional argument to offset the time array. Sign
        matters, e.g. time_offset=-20 means t0=-20
    """
    assert(unit.lower() in ["d", "v", "a"]), "'unit' must be 'd', 'v' or 'a'"
    for tr in st:
        s = tr.stats
        fid = f"{s.network}.{s.station}.{channel_code(s.delta)}X{s.channel[-1]}"
        fid = os.path.join(path, f"{fid}.sem{unit.lower()}")
        data = np.vstack((tr.times() + time_offset, tr.data)).T
        np.savetxt(fid, data, ["%13.7f", "%17.7f"])
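
# Example usage of write_sem (a sketch using ObsPy's bundled example stream;
# any Stream with valid network/station/channel codes should work):
#
#   >>> from obspy import read
#   >>> from pysep.utils.io import write_sem
#   >>> st = read()  # ObsPy example data
#   >>> write_sem(st, unit="d", path="./", time_offset=0)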

def write_pysep_station_file(inv, event, fid="./station_list.txt",
                             order_station_list_by=None):
    """
    Write a list of station codes, distances, etc., useful for understanding
    the characteristics of all collected stations

    :type event: obspy.core.event.Event
    :param event: Event object used to calculate source-receiver distances
        and azimuths from its preferred origin
    :type inv: obspy.core.inventory.Inventory
    :param inv: Inventory containing the stations to be written to file
    :type fid: str
    :param fid: name of the file to write to. defaults to ./station_list.txt
    :type order_station_list_by: str
    :param order_station_list_by: how to order the stations written to file.
        Available are: station, network, latitude, longitude, distance,
        azimuth. If not given, order is determined by the internal sorting
        of the input Inventory
    """
    # Key indices correspond to stations list
    keys = ["station", "network", "latitude", "longitude", "distance",
            "azimuth"]
    if order_station_list_by and order_station_list_by not in keys:
        logger.warning(f"`order_station_list_by` must be in {keys}, "
                       f"setting default")
        order_station_list_by = None

    event_latitude = event.preferred_origin().latitude
    event_longitude = event.preferred_origin().longitude

    stations = []
    for net in inv:
        for sta in net:
            dist_m, az, baz = gps2dist_azimuth(lat1=event_latitude,
                                               lon1=event_longitude,
                                               lat2=sta.latitude,
                                               lon2=sta.longitude
                                               )
            dist_km = dist_m * 1E-3
            stations.append([sta.code, net.code, sta.latitude, sta.longitude,
                             dist_km, az])

    # Set the order of the station file based on the order of keys
    if order_station_list_by:
        idx = keys.index(order_station_list_by)
        stations.sort(key=lambda x: x[idx])

    with open(fid, "w") as f:
        for s in stations:
            # Order follows that listed in 'keys'
            f.write(f"{s[0]:<6} {s[1]:<2} {s[2]:9.4f} {s[3]:9.4f} {s[4]:8.3f} "
                    f"{s[5]:6.2f}\n")
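
# Example usage of write_pysep_station_file (a sketch; `inv` and `event` are
# assumed to come from a prior data collection step, e.g., ObsPy clients or
# the readers above):
#
#   >>> from pysep.utils.io import write_pysep_station_file
#   >>> write_pysep_station_file(inv, event, fid="station_list.txt",
#   ...                          order_station_list_by="distance")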

def write_stations_file(inv, fid="./STATIONS", order_by=None,
                        use_elevation=False, burials=None):
    """
    Write a SPECFEM STATIONS file given an ObsPy inventory object

    .. note::
        If topography is implemented in your mesh, elevation values should be
        set to 0 which means 'surface' in SPECFEM.

    .. note::
        Burial information is not contained in the ObsPy inventory, so it is
        always set to a constant value, which can be given as input. Defaults
        to 0.

    :type inv: obspy.core.inventory.Inventory
    :param inv: Inventory object with station locations to write
    :type fid: str
    :param fid: path and file id to save the STATIONS file to.
    :type order_by: str
    :param order_by: how to order the stations written to file. Available
        are: station, network, latitude, longitude, elevation, burial.
        If not given, order is determined by the internal sorting of the
        input Inventory
    :type use_elevation: bool
    :param use_elevation: if True, sets the actual elevation value from the
        inventory. if False, sets elevation to 0.
    :type burials: list of float
    :param burials: if the user has burial information, it is used as the
        last column. Length of `burials` must match the number of stations
        in the inventory when traversing by network and station
    """
    # Simply generate lists by traversing through the inventory
    i = 0
    stations = []
    # Key indices correspond to the stations list gathered below
    keys = ["station", "network", "latitude", "longitude", "elevation",
            "burial"]
    if order_by:
        assert(order_by in keys), f"`order_by` must be in {keys}"

    for net in inv:
        for sta in net:
            if use_elevation:
                elevation = sta.elevation
            else:
                elevation = 0.
            if burials:
                burial = burials[i]
            else:
                burial = 0.
            stations.append([sta.code, net.code, sta.latitude, sta.longitude,
                             elevation, burial])
            i += 1

    # Set the order of the station file based on the order of keys
    if order_by:
        idx = keys.index(order_by)
        stations.sort(key=lambda x: x[idx])

    with open(fid, "w") as f:
        for s in stations:
            f.write(f"{s[0]:>6}{s[1]:>6}{s[2]:12.4f}{s[3]:12.4f}"
                    f"{s[4]:7.1f}{s[5]:7.1f}\n"
                    )
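
# Example usage of write_stations_file (a sketch; `inv` is assumed to be an
# existing ObsPy Inventory, e.g., from read_stations() or a client query):
#
#   >>> from pysep.utils.io import write_stations_file
#   >>> write_stations_file(inv, fid="STATIONS", order_by="station",
#   ...                     use_elevation=False)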

def write_cat_to_event_list(cat, fid_out="event_input.txt"):
    """
    Write out event information from an ObsPy Catalog to an ASCII file that
    PySEP can use for collecting data. The format of the output file is:

    ORIGIN_TIME LONGITUDE LATITUDE DEPTH[KM] MAGNITUDE

    :type cat: obspy.core.catalog.Catalog
    :param cat: Catalog of events to write out
    :type fid_out: str
    :param fid_out: name of the output text file to be written. Defaults to
        'event_input.txt'
    """
    with open(fid_out, "w") as f:
        for event in cat:
            origintime = str(event.preferred_origin().time)
            longitude = event.preferred_origin().longitude
            latitude = event.preferred_origin().latitude
            depth = event.preferred_origin().depth * 1E-3
            mag = event.preferred_magnitude().mag

            f.write(f"{origintime:<31}{longitude:8.2f}{latitude:8.2f}"
                    f"{depth:8.2f}{mag:6.1f}\n")
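
# Example usage of write_cat_to_event_list (a sketch; assumes each event in
# `cat` has a preferred origin and preferred magnitude set, and the input
# file path is hypothetical):
#
#   >>> from pysep.utils.io import read_events_plus, write_cat_to_event_list
#   >>> cat = read_events_plus("DATA/CMTSOLUTION", format="CMTSOLUTION")
#   >>> write_cat_to_event_list(cat, fid_out="event_input.txt")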