"""
The main functions for SEED-vault, from original CLI-only version (Pickle 2024)
"""
import os
import sys
import copy
import time
import fnmatch
import sqlite3
from datetime import datetime,timedelta,timezone
import configparser
import pandas as pd
import numpy as np
import threading
import random
from typing import Any, Dict, List, Tuple, Optional, Union
from collections import defaultdict
import warnings
from obspy import UTCDateTime
from obspy.core.stream import Stream
from obspy.core.stream import read as streamread
from obspy.core.inventory import read_inventory,Inventory
from obspy.core.event import read_events,Event,Catalog
from obspy.clients.fdsn import Client
from obspy.taup import TauPyModel
from obspy.geodetics.base import locations2degrees,gps2dist_azimuth
from obspy.clients.fdsn.header import URL_MAPPINGS, FDSNNoDataException
from obspy.io.mseed.headers import InternalMSEEDWarning
warnings.filterwarnings("ignore", category=InternalMSEEDWarning)
from seed_vault.models.config import SeismoLoaderSettings, SeismoQuery
from seed_vault.enums.config import DownloadType, GeoConstraintType
from seed_vault.service.utils import is_in_enum,get_sds_filenames,to_timestamp,\
filter_inventory_by_geo_constraints,filter_catalog_by_geo_constraints,format_error
from seed_vault.service.db import DatabaseManager,stream_to_db_elements,miniseed_to_db_elements,\
populate_database_from_sds,populate_database_from_files,populate_database_from_files_dumb
from seed_vault.service.waveform import get_local_waveform, stream_to_dataframe
class CustomConfigParser(configparser.ConfigParser):
"""
Custom configuration parser that can preserve case sensitivity for specified sections.
This class extends the standard ConfigParser to allow certain sections to maintain
case sensitivity while others are converted to lowercase.
Attributes:
case_sensitive_sections (set): Set of section names that should preserve case sensitivity.
"""
def __init__(self, *args, **kwargs):
"""
Initialize the CustomConfigParser.
Args:
*args: Variable length argument list passed to ConfigParser.
**kwargs: Arbitrary keyword arguments passed to ConfigParser.
"""
        self.case_sensitive_sections = set()
        super().__init__(*args, **kwargs)

    def optionxform(self, optionstr):
        # Keep option names exactly as written so that key case is preserved on read;
        # read_config() lowercases keys itself for sections that are not case sensitive.
        return optionstr
def read_config(config_file: str) -> CustomConfigParser:
"""
Read and process a configuration file with case-sensitive handling for specific sections.
Reads a configuration file and processes it such that certain sections
(AUTH, DATABASE, SDS, WAVEFORM) preserve their case sensitivity while
other sections are converted to lowercase.
Args:
config_file: Path to the configuration file to read.
Returns:
CustomConfigParser: Processed configuration with appropriate case handling
for different sections.
Example:
>>> config = read_config("config.ini")
>>> auth_value = config.get("AUTH", "ApiKey") # Case preserved
>>> other_value = config.get("settings", "parameter") # Converted to lowercase
"""
config = CustomConfigParser(allow_no_value=True)
config.read(config_file)
processed_config = CustomConfigParser(allow_no_value=True)
for section in config.sections():
processed_config.add_section(section)
for key, value in config.items(section):
if section.upper() in ['AUTH', 'DATABASE', 'SDS', 'WAVEFORM']:
processed_key = key
processed_value = value if value is not None else None
else:
processed_key = key.lower()
processed_value = value.lower() if value is not None else None
processed_config.set(section, processed_key, processed_value)
return processed_config
def collect_requests(inv, time0, time1, days_per_request=3,
cha_pref=None, loc_pref=None):
"""
Generate time-windowed data requests for all channels in an inventory.
Creates a list of data requests by breaking a time period into smaller windows
and collecting station metadata for each window. Can optionally filter for
preferred channels and location codes.
Args:
inv (obspy.core.inventory.Inventory): Station inventory to generate requests for
time0 (obspy.UTCDateTime): Start time for data requests
time1 (obspy.UTCDateTime): End time for data requests
days_per_request (int, optional): Length of each request window in days.
Defaults to 3.
cha_pref (list, optional): List of preferred channel codes in priority order.
If provided, only these channels will be requested. Defaults to None.
loc_pref (list, optional): List of preferred location codes in priority order.
If provided, only these location codes will be requested. Defaults to None.
Returns:
list or None: List of tuples containing request parameters:
(network_code, station_code, location_code, channel_code,
start_time_iso, end_time_iso)
Returns None if start time is greater than or equal to end time.
Notes:
- End time is capped at 120 seconds before current time
- Times in returned tuples are ISO formatted strings with 'Z' suffix
- Uses get_preferred_channels() if cha_pref or loc_pref are specified
Examples:
>>> from obspy import UTCDateTime
>>> t0 = UTCDateTime("2020-01-01")
>>> t1 = UTCDateTime("2020-01-10")
>>> requests = collect_requests(inventory, t0, t1,
... days_per_request=2,
... cha_pref=['HHZ', 'BHZ'],
... loc_pref=['', '00'])
"""
requests = [] # network, station, location, channel, starttime, endtime
# Sanity check request times
time1 = min(time1, UTCDateTime.now()-120)
if time0 >= time1:
return None
sub_inv = inv.select(time=time0)
current_start = time0
while current_start < time1:
current_end = min(current_start \
+ timedelta(days=days_per_request, microseconds=-1), time1)
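        # each request window ends 1 microsecond before the next one begins (matched by
        # the +1 microsecond step at the bottom of the loop), so adjacent windows never overlap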
if cha_pref or loc_pref:
sub_inv = get_preferred_channels(inv, cha_pref, loc_pref, current_start)
# Collect requests for the best channels
for net in sub_inv:
for sta in net:
for cha in sta:
requests.append((
net.code,
sta.code,
cha.location_code,
cha.code,
current_start.isoformat() + "Z",
current_end.isoformat() + "Z" ))
current_start = current_end + timedelta(microseconds=1)
return requests
def get_p_s_times(eq, dist_deg, ttmodel):
"""
Calculate theoretical P and S wave arrival times for an earthquake at a given distance.
Uses a travel time model to compute the first P and S wave arrivals for a given
earthquake and distance. The first arrival (labeled as "P") may not necessarily be
a direct P wave. For S waves, only phases explicitly labeled as 'S' are considered.
Args:
eq (obspy.core.event.Event): Earthquake event object containing origin time
and depth information
dist_deg (float): Distance between source and receiver in degrees
ttmodel (obspy.taup.TauPyModel): Travel time model to use for calculations
Returns:
tuple: A tuple containing:
- (UTCDateTime or None): Time of first arrival ("P" wave)
- (UTCDateTime or None): Time of first S wave arrival
Returns (None, None) if travel time calculation fails
Notes:
- Earthquake depth is expected in meters in the QuakeML format and is
converted to kilometers for the travel time calculations
- For S waves, only searches for explicit 'S' phase arrivals
- Warns if no P arrival is found at any distance
- Warns if no S arrival is found at distances ≤ 90 degrees
Examples:
>>> from obspy.taup import TauPyModel
>>> model = TauPyModel(model="iasp91")
>>> p_time, s_time = get_p_s_times(earthquake, 45.3, model)
"""
eq_time = eq.origins[0].time
eq_depth = eq.origins[0].depth / 1000 # depths are in meters for QuakeML
try:
phasearrivals = ttmodel.get_travel_times(
source_depth_in_km=eq_depth,
distance_in_degree=dist_deg,
phase_list=['ttbasic']
)
except Exception as e:
print(f"Error calculating travel times:\n {str(e)}")
return None, None
p_arrival_time = None
s_arrival_time = None
# "P" is whatever the first arrival is.. not necessarily literally uppercase P
    if phasearrivals:
p_arrival_time = eq_time + phasearrivals[0].time
# Now get "S"...
for arrival in phasearrivals:
if arrival.name.upper() == 'S' and s_arrival_time is None:
s_arrival_time = eq_time + arrival.time
if p_arrival_time and s_arrival_time:
break
if p_arrival_time is None:
print(f"No direct P-wave arrival found for distance {dist_deg} degrees")
if s_arrival_time is None and dist_deg <= 90:
print(f"No direct S-wave arrival found for distance {dist_deg} degrees (event {eq_time})")
return p_arrival_time, s_arrival_time
def select_highest_samplerate(inv, minSR=10, time=None):
"""
Filters an inventory to keep only the highest sample rate channels where duplicates exist.
For each station in the inventory, this function identifies duplicate channels (those sharing
the same location code) and keeps only those with the highest sample rate. Channels must
meet the minimum sample rate requirement to be considered.
Args:
inv (obspy.core.inventory.Inventory): Input inventory object
minSR (float, optional): Minimum sample rate in Hz. Defaults to 10.
time (obspy.UTCDateTime, optional): Specific time to check channel existence.
If provided, channels are considered duplicates if they share the same
location code and both exist at that time. If None, channels are considered
duplicates if they share the same location code and time span. Defaults to None.
Returns:
obspy.core.inventory.Inventory: Filtered inventory containing only the highest
sample rate channels where duplicates existed.
Examples:
>>> # Filter inventory keeping only highest sample rate channels
>>> filtered_inv = select_highest_samplerate(inv)
>>>
>>> # Filter for a specific time, minimum 1 Hz
>>> from obspy import UTCDateTime
>>> time = UTCDateTime("2020-01-01")
>>> filtered_inv = select_highest_samplerate(inv, minSR=1, time=time)
Notes:
- Channel duplicates are determined by location code and either:
* Existence at a specific time (if time is provided)
* Having identical time spans (if time is None)
- All retained channels must have sample rates >= minSR
- For duplicate channels, all channels with the highest sample rate are kept
"""
if time:
inv = inv.select(time=time)
for net in inv:
for sta in net:
channels = [ch for ch in sta.channels if ch.sample_rate >= minSR]
loc_groups = {}
for channel in channels:
loc_code = channel.location_code
if loc_code not in loc_groups:
loc_groups[loc_code] = []
loc_groups[loc_code].append(channel)
filtered_channels = []
for loc_group in loc_groups.values():
if len(loc_group) == 1:
filtered_channels.extend(loc_group)
continue
if time:
active_channels = [ch for ch in loc_group]
if active_channels:
max_sr = max(ch.sample_rate for ch in active_channels)
filtered_channels.extend([ch for ch in active_channels if ch.sample_rate == max_sr])
else:
time_groups = {}
for channel in loc_group:
time_key = f"{channel.start_date}_{channel.end_date}"
if time_key not in time_groups:
time_groups[time_key] = []
time_groups[time_key].append(channel)
for time_group in time_groups.values():
if len(time_group) > 1:
max_sr = max(ch.sample_rate for ch in time_group)
filtered_channels.extend([ch for ch in time_group if ch.sample_rate == max_sr])
else:
filtered_channels.extend(time_group)
sta.channels = filtered_channels
return inv
def get_preferred_channels(
inv: Inventory,
cha_rank: Optional[List[str]] = None,
loc_rank: Optional[List[str]] = None,
time: Optional[UTCDateTime] = None
) -> Inventory:
"""Select the best available channels from an FDSN inventory based on rankings.
Filters an inventory to keep only the preferred channels based on channel code
and location code rankings. For each component (Z, N, E), selects the channel
with the highest ranking.
Args:
inv: ObsPy Inventory object to filter.
cha_rank: List of channel codes in order of preference (e.g., ['BH', 'HH']).
Lower index means higher preference.
loc_rank: List of location codes in order of preference (e.g., ['', '00']).
Lower index means higher preference. '--' is treated as empty string.
time: Optional time to filter channel availability at that time.
Returns:
Filtered ObsPy Inventory containing only the preferred channels.
If all channels would be filtered out, returns original station.
Note:
Channel preference takes precedence over location preference.
If neither cha_rank nor loc_rank is provided, returns original inventory.
Example:
>>> inventory = client.get_stations(network="IU", station="ANMO")
>>> cha_rank = ['BH', 'HH', 'EH']
>>> loc_rank = ['00', '10', '']
>>> filtered = get_preferred_channels(inventory, cha_rank, loc_rank)
"""
if not cha_rank and not loc_rank:
return inv
# Convert '--' location codes to empty string
if loc_rank:
loc_rank = [lc if lc != '--' else '' for lc in loc_rank]
new_inv = Inventory(networks=[], source=inv.source)
if time:
inv = inv.select(time=time)
for net in inv:
new_net = net.copy()
new_net.stations = []
for sta in net:
new_sta = sta.copy()
new_sta.channels = []
# Group channels by component (e.g. Z, N, E, 1, 2)
components = defaultdict(list)
for chan in sta:
comp = chan.code[-1]
components[comp].append(chan)
# Select best channel for each component
for chan_list in components.values():
best_chan = None
best_cha_rank = float('inf')
best_loc_rank = float('inf')
for chan in chan_list:
if not chan.is_active(time):
continue
                        cha_code = chan.code[:-1]
                        # Get ranking positions (lower = more preferred; codes not in the
                        # rank list sort last; an absent rank list does not affect the choice)
                        cha_position = 0 if cha_rank is None else \
                            (cha_rank.index(cha_code) if cha_code in cha_rank else len(cha_rank))
                        loc_position = 0 if loc_rank is None else \
                            (loc_rank.index(chan.location_code) if chan.location_code in loc_rank else len(loc_rank))
# Update if better ranking found
if (cha_position < best_cha_rank or
(cha_position == best_cha_rank and loc_position < best_loc_rank)):
best_chan = chan
best_cha_rank = cha_position
best_loc_rank = loc_position
if best_chan is not None:
new_sta.channels.append(best_chan)
# Keep original if no channels passed filtering
if new_sta.channels:
new_net.stations.append(new_sta)
else:
new_net.stations.append(sta)
if new_net.stations:
new_inv.networks.append(new_net)
return new_inv
def collect_requests_event(
eq: Event,
inv: Inventory,
model: Optional[TauPyModel] = None,
settings: Optional[SeismoLoaderSettings] = None
) -> Tuple[List[Tuple[str, str, str, str, str, str]],
List[Tuple[Any, ...]],
Dict[str, float]]:
"""
Collect data requests and arrival times for an event at multiple stations.
For a given earthquake event, calculates arrival times and generates data
requests for all appropriate stations in the inventory.
Args:
eq: ObsPy Event object containing earthquake information.
inv: ObsPy Inventory object containing station information.
model: Optional TauPyModel for travel time calculations.
If None, uses model from settings or falls back to IASP91.
settings: Optional SeismoLoaderSettings object containing configuration.
Returns:
Tuple containing:
- List of request tuples (net, sta, loc, chan, start, end)
- List of arrival data tuples for database
- Dictionary mapping "net.sta" to P-arrival timestamps
Note:
Requires a DatabaseManager instance to check for existing arrivals.
Time windows are constructed around P-wave arrivals using settings.
Handles both new calculations and retrieving existing arrival times.
Example:
>>> event = client.get_events()[0]
>>> inventory = client.get_stations(network="IU")
>>> requests, arrivals, p_times = collect_requests_event(
... event, inventory, model=TauPyModel("iasp91")
... )
"""
settings, db_manager = setup_paths(settings)
# Extract settings
model_name = settings.event.model
before_p_sec = settings.event.before_p_sec
after_p_sec = settings.event.after_p_sec
min_radius = settings.event.min_radius
max_radius = settings.event.max_radius
highest_sr_only = settings.station.highest_samplerate_only
cha_pref = settings.waveform.channel_pref
loc_pref = settings.waveform.location_pref
origin = eq.origins[0]
ot = origin.time
sub_inv = inv.select(time=ot)
if highest_sr_only:
sub_inv = select_highest_samplerate(sub_inv, minSR=5)
if cha_pref or loc_pref:
sub_inv = get_preferred_channels(sub_inv, cha_pref, loc_pref)
# Ensure 1D vel model is loaded
if not model:
try:
model = TauPyModel(model_name.upper())
except Exception:
model = TauPyModel('IASP91')
requests_per_eq = []
arrivals_per_eq = []
p_arrivals: Dict[str, float] = {}
for net in sub_inv:
for sta in net:
# Get station timing info
try:
sta_start = sta.start_date.timestamp
sta_end = sta.end_date.timestamp
except Exception:
sta_start = None
sta_end = None
# Check for existing arrivals
fetched_arrivals = db_manager.fetch_arrivals_distances(
str(eq.preferred_origin_id),
net.code,
sta.code
)
if fetched_arrivals:
p_time, s_time, dist_km, dist_deg, azi = fetched_arrivals
t_start = p_time - abs(before_p_sec)
t_end = p_time + abs(after_p_sec)
p_arrivals[f"{net.code}.{sta.code}"] = p_time
else:
# Calculate new arrivals
dist_deg = locations2degrees(
origin.latitude, origin.longitude,
sta.latitude, sta.longitude
)
dist_m, azi, _ = gps2dist_azimuth(
origin.latitude, origin.longitude,
sta.latitude, sta.longitude
)
p_time, s_time = get_p_s_times(eq, dist_deg, model)
if p_time is None:
print(f"Warning: Unable to calculate first arrival for {net.code}.{sta.code}")
continue
t_start = (p_time - abs(before_p_sec)).timestamp
t_end = (p_time + abs(after_p_sec)).timestamp
p_arrivals[f"{net.code}.{sta.code}"] = p_time.timestamp
# save these new arrivals to insert into database
arrivals_per_eq.append((
str(eq.preferred_origin_id),
eq.magnitudes[0].mag,
origin.latitude, origin.longitude, origin.depth/1000,
ot.timestamp,
net.code, sta.code, sta.latitude, sta.longitude, sta.elevation/1000,
sta_start, sta_end,
dist_deg, dist_m/1000, azi, p_time.timestamp,
s_time.timestamp if s_time else None,
model_name
))
# skip anything out of our search parameters
if dist_deg < min_radius:
print(f" Skipping {net.code}.{sta.code} \t(distance {dist_deg:4.1f} < min_radius {min_radius:4.1f})")
continue
elif dist_deg > max_radius:
print(f" Skipping {net.code}.{sta.code} \t(distance {dist_deg:4.1f} > max_radius {max_radius:4.1f})")
continue
else:
# Generate requests for each channel
for cha in sta:
t_end = min(t_end, datetime.now().timestamp() - 120)
t_start = min(t_start, t_end)
requests_per_eq.append((
net.code,
sta.code,
cha.location_code,
cha.code,
datetime.fromtimestamp(t_start, tz=timezone.utc).isoformat(),
datetime.fromtimestamp(t_end, tz=timezone.utc).isoformat()
))
return requests_per_eq, arrivals_per_eq, p_arrivals
def combine_requests(
requests: List[Tuple[str, str, str, str, str, str]]
) -> List[Tuple[str, str, str, str, str, str]]:
"""
Combine multiple data requests for efficiency.
Groups requests by network and time range, combining stations, locations,
and channels into comma-separated lists to minimize the number of requests.
Args:
requests: List of request tuples, each containing:
(network, station, location, channel, start_time, end_time)
Returns:
List of combined request tuples with the same structure but with
station, location, and channel fields potentially containing
comma-separated lists.
Example:
>>> original = [
... ("IU", "ANMO", "00", "BHZ", "2020-01-01", "2020-01-02"),
... ("IU", "COLA", "00", "BHZ", "2020-01-01", "2020-01-02")
... ]
>>> combined = combine_requests(original)
>>> print(combined)
[("IU", "ANMO,COLA", "00", "BHZ", "2020-01-01", "2020-01-02")]
"""
if not requests:
return []
# Group requests by network and time range
groups = defaultdict(list)
for net, sta, loc, chan, t0, t1 in requests:
groups[(net, t0, t1)].append((sta, loc, chan))
# Combine requests within each group
combined_requests = []
for (net, t0, t1), items in groups.items():
# Collect unique values
stas = set(sta for sta, _, _ in items)
locs = set(loc for _, loc, _ in items)
chans = set(chan for _, _, chan in items)
# Create combined request
combined_requests.append((
net, # keep networks distinct
','.join(sorted(stas)),
','.join(sorted(locs)),
','.join(sorted(chans)),
t0,
t1
))
return combined_requests
def get_missing_from_request(db_manager, eq_id: str, requests: List[Tuple], st: Stream) -> dict:
"""
    Compare requested seismic data against what is present in a Stream or already archived in the database.
Handles comma-separated values for location and channel codes.
Parameters:
    -----------
    db_manager : DatabaseManager
        DatabaseManager instance used to check whether requested data already exists in the archive
    eq_id : str
        Earthquake ID to use as dictionary key
requests : List[Tuple]
List of request tuples, each containing (network, station, location, channel, starttime, endtime)
st : Stream
ObsPy Stream object containing seismic traces
Returns:
--------
dict
Nested dictionary with structure:
{eq_id: {
"network.station": value,
"network2.station2": value2,
...
}}
where value is either:
- list of missing channel strings ("network.station.location.channel")
- "Not Attempted" if stream is empty
- "ALL" if all requested channels are missing
- [] if all requested channels are present
"""
if not requests:
return {}
result = {eq_id: {}}
# Process each request
for request in requests:
        net, sta, loc, cha, t0, t1 = request  # t0/t1 also feed the database existence check below
station_key = f"{net}.{sta}"
# Split location and channel if comma-separated
locations = loc.split(',') if ',' in loc else [loc]
channels = cha.split(',') if ',' in cha else [cha]
missing_channels = []
total_combinations = 0
missing_combinations = 0
# Check all combinations
for location in locations:
for channel in channels:
total_combinations += 1
                # Look for a matching trace in the recently downloaded stream
                found_match = False
                for tr in st:
                    if (tr.stats.network == net and
                        tr.stats.station == sta and
                        tr.stats.location == (location if location else '') and
                        fnmatch.fnmatch(tr.stats.channel, channel)):
                        found_match = True
                        break
                # also check the database for the requested channel itself
                if not found_match and db_manager.check_data_existence(
                        net, sta, location, channel, t0, t1):
                    found_match = True
if not found_match:
missing_combinations += 1
missing_channels.append(
f"{net}.{sta}.{location}.{channel}"
)
# Determine value for this station
if missing_combinations == total_combinations: # nothing returned
result[eq_id][station_key] = "ALL"
elif missing_combinations == 0: # everything returned
result[eq_id][station_key] = []
else: # partial return
result[eq_id][station_key] = missing_channels
return result
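
# A minimal usage sketch for get_missing_from_request (illustrative, hypothetical values;
# `downloaded_stream` stands for the Stream returned by the download step):
#
#   missing = get_missing_from_request(
#       db_manager, "smi:local/event/1234",
#       [("IU", "ANMO", "00,10", "BHZ", "2020-01-01T00:00:00", "2020-01-01T00:10:00")],
#       downloaded_stream)
#   # -> {"smi:local/event/1234": {"IU.ANMO": [...missing "net.sta.loc.cha" strings...],
#   #                              ...or "ALL", or [] if everything was found}}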
def prune_requests(
requests: List[Tuple[str, str, str, str, str, str]],
db_manager: DatabaseManager,
sds_path: str,
min_request_window: float = 3
) -> List[Tuple[str, str, str, str, str, str]]:
"""
Remove overlapping requests where data already exists in the archive.
Checks both the database and filesystem for existing data and removes or
splits requests to avoid re-downloading data that should be there already.
Args:
requests: List of request tuples containing:
(network, station, location, channel, start_time, end_time)
db_manager: DatabaseManager instance for querying existing data.
sds_path: Root path of the SDS archive.
min_request_window: Minimum time window in seconds to keep a request.
Requests shorter than this are discarded. Default is 3 seconds.
Returns:
List of pruned request tuples, sorted by start time, network, and station.
Note:
This function will update the database if it finds files in the SDS
structure that aren't yet recorded in the database.
Example:
>>> requests = [("IU", "ANMO", "00", "BHZ", "2020-01-01", "2020-01-02")]
>>> pruned = prune_requests(requests, db_manager, "/data/SDS")
"""
if not requests:
return []
# Group requests by network for batch processing
requests_by_network = {}
for req in requests:
network = req[0]
if network not in requests_by_network:
requests_by_network[network] = []
requests_by_network[network].append(req)
pruned_requests = []
with db_manager.connection() as conn:
cursor = conn.cursor()
# Process each network as a batch
for network, network_requests in requests_by_network.items():
# Find min and max times for this batch to reduce query range
min_start = min(UTCDateTime(req[4]) for req in network_requests)
max_end = max(UTCDateTime(req[5]) for req in network_requests)
# Get all relevant data in one query
cursor.execute('''
SELECT network, station, location, channel, starttime, endtime
FROM archive_data
WHERE network = ? AND endtime >= ? AND starttime <= ?
''', (network, min_start.isoformat(), max_end.isoformat()))
# Organize existing data by (station, location, channel)
existing_data = {}
for row in cursor.fetchall():
key = (row[1], row[2], row[3]) # station, location, channel
if key not in existing_data:
existing_data[key] = []
existing_data[key].append((UTCDateTime(row[4]), UTCDateTime(row[5])))
# Process each request with the pre-fetched data
for req in network_requests:
network, station, location, channel, start_str, end_str = req
start_time = UTCDateTime(start_str)
end_time = UTCDateTime(end_str)
if end_time - start_time < min_request_window:
continue
key = (station, location, channel)
# Check if we need to look at the filesystem
if key not in existing_data or not existing_data[key]:
# Get files only when necessary
file_paths = get_sds_filenames(
network, station, location, channel,
start_time, end_time, sds_path
)
if file_paths:
# Update database and our cached data
populate_database_from_files(cursor, file_paths=file_paths)
cursor.execute('''
SELECT starttime, endtime FROM archive_data
WHERE network = ? AND station = ? AND location = ? AND channel = ?
AND endtime >= ? AND starttime <= ?
ORDER BY starttime
''', (network, station, location, channel,
start_time.isoformat(), end_time.isoformat()))
existing_data[key] = [(UTCDateTime(r[0]), UTCDateTime(r[1]))
for r in cursor.fetchall()]
else:
pruned_requests.append(req)
continue
# Check if this specific request is fully covered by existing data
relevant_intervals = [
(db_start, db_end) for db_start, db_end in existing_data.get(key, [])
if not (db_end <= start_time or db_start >= end_time)
]
if not relevant_intervals:
pruned_requests.append(req)
continue
# Sort intervals by start time
relevant_intervals.sort(key=lambda x: x[0])
# Find gaps in coverage
current_time = start_time
gaps = []
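                    # e.g. a request spanning 00:00-10:00 with existing coverage
                    # [02:00-04:00, 06:00-07:00] yields gaps
                    # [00:00-02:00, 04:00-06:00, 07:00-10:00]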
for db_start, db_end in relevant_intervals:
if current_time < db_start:
gaps.append((current_time, db_start))
current_time = max(current_time, db_end)
if current_time < end_time:
gaps.append((current_time, end_time))
# Add requests for each gap that's large enough
for gap_start, gap_end in gaps:
if gap_end - gap_start >= min_request_window:
pruned_requests.append((
network, station, location, channel,
gap_start.isoformat(),
gap_end.isoformat()
))
if pruned_requests:
pruned_requests.sort(key=lambda x: (x[4], x[0], x[1]))
return pruned_requests
def archive_request(
request: Tuple[str, str, str, str, str, str],
waveform_clients: Dict[str, Client],
sds_path: str,
db_manager: DatabaseManager
) -> None:
"""
Download seismic data for a request and archive it in SDS format.
Retrieves waveform data from FDSN web services, saves it in SDS format,
and updates the database. Handles authentication, data merging, and
various error conditions.
Args:
request: Tuple containing (network, station, location, channel,
start_time, end_time)
waveform_clients: Dictionary mapping network codes to FDSN clients.
Special key 'open' is used for default client.
sds_path: Root path of the SDS archive.
db_manager: DatabaseManager instance for updating the database.
Note:
- Supports per-network and per-station authentication
- Handles splitting of large station list requests
- Performs data merging when files already exist
- Attempts STEIM2 compression, falls back to uncompressed format
- Groups traces by day to handle fragmented data efficiently
Example:
>>> clients = {'IU': Client('IRIS'), 'open': Client('IRIS')}
>>> request = ("IU", "ANMO", "00", "BHZ", "2020-01-01", "2020-01-02")
>>> archive_request(request, clients, "/data/seismic", db_manager)
"""
try:
t0 = UTCDateTime(request[4])
t1 = UTCDateTime(request[5])
# Double check that the request range is real and not some db artifact
if t1 - t0 < 1:
return
time0 = time.time()
# Select appropriate client
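        # (lookup order: exact network code, then "NET.STA", then the default 'open' client)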
if request[0] in waveform_clients:
wc = waveform_clients[request[0]]
elif request[0] + '.' + request[1] in waveform_clients:
wc = waveform_clients[request[0] + '.' + request[1]]
else:
wc = waveform_clients['open']
kwargs = {
'network': request[0].upper(),
'station': request[1].upper(),
'location': request[2].upper(),
'channel': request[3].upper(),
'starttime': t0,
'endtime': t1
}
# Handle long station lists
if len(request[1]) > 24:
st = Stream()
split_stations = request[1].split(',')
for s in split_stations:
try:
st += wc.get_waveforms(
station=s,
**{k: v for k, v in kwargs.items() if k != 'station'}
)
except Exception as e:
if 'code: 204' in str(e):
print(f"\n No data for station {s}")
else:
print(format_error(s,e))
else:
st = wc.get_waveforms(**kwargs)
# Log download statistics
download_time = time.time() - time0
download_size = sum(tr.data.nbytes for tr in st) / 1024**2 # MB
print(f" > Downloaded {download_size:.2f} MB @ {download_size/download_time:.2f} MB/s")
except Exception as e:
if 'code: 204' in str(e):
print(f" ~ No data available")
else:
print(format_error(request[1].upper(),e))
return
# Group traces by day
traces_by_day = defaultdict(Stream)
for tr in st:
net = tr.stats.network
sta = tr.stats.station
loc = tr.stats.location
cha = tr.stats.channel
starttime = tr.stats.starttime
endtime = tr.stats.endtime
# Handle trace start leaking into previous day
day_boundary = UTCDateTime(starttime.date + timedelta(days=1))
if (day_boundary - starttime) <= tr.stats.delta:
starttime = day_boundary
current_time = UTCDateTime(starttime.date)
while current_time < endtime:
year = current_time.year
doy = current_time.julday
next_day = current_time + 86400
day_end = min(next_day - tr.stats.delta/3, endtime)
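            # stop a fraction of a sample short of midnight so the sample at the day
            # boundary lands only in the following day's file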
day_tr = tr.slice(current_time, day_end, nearest_sample=False)
day_key = (year, doy, net, sta, loc, cha)
traces_by_day[day_key] += day_tr
current_time = next_day
# Process each day's data
to_insert_db = []
for (year, doy, net, sta, loc, cha), day_stream in traces_by_day.items():
full_sds_path = os.path.join(sds_path, str(year), net, sta, f"{cha}.D")
filename = f"{net}.{sta}.{loc}.{cha}.D.{year}.{doy:03d}"
full_path = os.path.join(full_sds_path, filename)
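        # e.g. <sds_path>/2020/IU/ANMO/BHZ.D/IU.ANMO.00.BHZ.D.2020.001  (SDS naming convention; example values)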
os.makedirs(full_sds_path, exist_ok=True)
if os.path.exists(full_path):
try:
existing_st = streamread(full_path)
existing_st += day_stream
existing_st.merge(method=-1, fill_value=None)
existing_st._cleanup(misalignment_threshold=0.25)
if existing_st:
print(f" ... Merging {full_path}")
except Exception as e:
print(f"! Could not read {full_path}:\n {e}")
continue
else:
existing_st = day_stream
if existing_st:
print(f" ... Writing {full_path}")
existing_st = Stream([tr for tr in existing_st if len(tr.data) > 1]) # assume 1-sample traces are artifacts
if existing_st:
try:
# Try STEIM2 compression first
existing_st.write(full_path, format="MSEED", encoding='STEIM2')
to_insert_db.extend(stream_to_db_elements(existing_st))
except Exception as e:
if "Wrong dtype" in str(e):
# Fall back to uncompressed format
print("Data type not compatible with STEIM2, attempting uncompressed format...")
try:
existing_st.write(full_path, format="MSEED")
to_insert_db.extend(stream_to_db_elements(existing_st))
except Exception as e:
print(f"Failed to write uncompressed MSEED to {full_path}:\n {e}")
else:
print(f"Failed to write {full_path}:\n {e}")
# Update database
try:
num_inserted = db_manager.bulk_insert_archive_data(to_insert_db)
except Exception as e:
print("! Error with bulk_insert_archive_data:", e)
# MAIN RUN FUNCTIONS
# ==================================================================
def setup_paths(settings: SeismoLoaderSettings) -> Tuple[SeismoLoaderSettings, DatabaseManager]:
"""Initialize paths and database for seismic data management.
Args:
settings: Configuration settings containing paths and database information.
Returns:
Tuple containing:
- Updated settings with validated paths
- Initialized DatabaseManager instance
Raises:
ValueError: If SDS path is not set in settings.
Example:
>>> settings = SeismoLoaderSettings()
>>> settings.sds_path = "/data/seismic"
>>> settings, db_manager = setup_paths(settings)
"""
sds_path = settings.sds_path
if not sds_path:
raise ValueError("\nSDS Path not set!")
# Setup SDS directory
if not os.path.exists(sds_path):
os.makedirs(sds_path)
# Initialize database manager
db_path = settings.db_path
db_manager = DatabaseManager(db_path)
settings.sds_path = sds_path
settings.db_path = db_path
return settings, db_manager
# not in use?
def get_selected_stations_at_channel_level(settings: SeismoLoaderSettings) -> SeismoLoaderSettings:
"""
Update inventory information to include channel-level details for selected stations.
Retrieves detailed channel information for each station in the selected inventory
using the specified FDSN client.
Args:
settings: Configuration settings containing station selection and client information.
Returns:
Updated settings with refined station inventory including channel information.
Example:
>>> settings = SeismoLoaderSettings()
>>> settings = get_selected_stations_at_channel_level(settings)
"""
print("Running get_selected_stations_at_channel_level")
waveform_client = Client(settings.waveform.client)
station_client = Client(settings.station.client) if settings.station.client else waveform_client
invs = Inventory()
for network in settings.station.selected_invs:
for station in network:
try:
updated_inventory = station_client.get_stations(
network=network.code,
station=station.code,
level="channel"
)
invs += updated_inventory
except Exception as e:
print(f"Error updating station {station.code}:\n{e}")
settings.station.selected_invs = invs
return settings
def get_stations(settings: SeismoLoaderSettings) -> Optional[Inventory]:
"""
Retrieve station inventory based on configured criteria.
Gets station information from FDSN web services or local inventory based on
settings, including geographic constraints, network/station filters, and channel
preferences.
Args:
settings: Configuration settings containing station selection criteria,
client information, and filtering preferences.
Returns:
Inventory containing matching stations, or None if no stations found
or if station service is unavailable.
Note:
The function applies several layers of filtering:
1. Basic network/station/location/channel criteria
2. Geographic constraints (if specified)
3. Station exclusions/inclusions
4. Channel and location preferences
5. Sample rate filtering
Example:
>>> settings = SeismoLoaderSettings()
>>> settings.station.network = "IU"
>>> inventory = get_stations(settings)
"""
print("Running get_stations")
starttime = UTCDateTime(settings.station.date_config.start_time)
endtime = UTCDateTime(settings.station.date_config.end_time)
if starttime >= endtime:
print("get_stations: Starttime greater than endtime!")
return None
waveform_client = Client(settings.waveform.client)
highest_sr_only = settings.station.highest_samplerate_only
cha_pref = settings.waveform.channel_pref
loc_pref = settings.waveform.location_pref
station_client = Client(settings.station.client) if settings.station.client else waveform_client
# Set default wildcards for unspecified codes
net = settings.station.network or '*'
sta = settings.station.station or '*'
loc = settings.station.location or '*'
cha = settings.station.channel or '*'
kwargs = {
'network': net,
'station': sta,
'location': loc,
'channel': cha,
'starttime': starttime,
'endtime': endtime,
'includerestricted': settings.station.include_restricted,
'level': settings.station.level.value
}
# Verify station service availability
if 'station' not in station_client.services:
print(f"Station service not available at {station_client.base_url}, no stations returned")
return None
# Remove unsupported parameters for this client
kwargs = {k: v for k, v in kwargs.items()
if k in station_client.services['station']}
inv = None
# Try loading local inventory if specified
if settings.station.local_inventory:
try:
inv = read_inventory(settings.station.local_inventory, level='channel')
except Exception as e:
print(f"Could not read {settings.station.local_inventory}:\n{e}")
# Query stations based on geographic constraints
elif settings.station.geo_constraint:
# Reduce number of circular constraints to reduce excessive client calls
bound_searches = [ele for ele in settings.station.geo_constraint
if ele.geo_type == GeoConstraintType.BOUNDING]
circle_searches = [ele for ele in settings.station.geo_constraint
if ele.geo_type == GeoConstraintType.CIRCLE]
if len(circle_searches) > 4: # not as strict for stations
new_circle_searches = []
circ_center_lat = sum(p.coords.lat for p in circle_searches) / len(circle_searches)
lon_radians = [np.radians(p.coords.lon) for p in circle_searches]
avg_x = sum(np.cos(r) for r in lon_radians) / len(circle_searches)
avg_y = sum(np.sin(r) for r in lon_radians) / len(circle_searches)
circ_center_lon = np.degrees(np.arctan2(avg_y, avg_x))
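            # (circular mean of the longitudes via unit vectors, so groups of searches
            #  straddling the +/-180 degree meridian average correctly)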
circ_distances = [ locations2degrees(circ_center_lat,circ_center_lon, p.coords.lat,p.coords.lon)
for p in circle_searches]
max_circ_distances = max(circ_distances)
mean_circ = copy.deepcopy(circle_searches[0])
mean_circ.coords.lat = circ_center_lat
mean_circ.coords.lon = circ_center_lon
if mean_circ.coords.min_radius > max_circ_distances:
mean_circ.coords.min_radius -= max_circ_distances
mean_circ.coords.max_radius += max_circ_distances
if max_circ_distances < 60: # in degrees. make this wider for stations
circle_searches = [mean_circ]
            else:  # go through the list and remove what we can
new_circle_searches = [mean_circ]
for i, cs in enumerate(circle_searches):
if circ_distances[i] >= 60: # add any outliers
new_circle_searches.append(cs)
circle_searches = new_circle_searches
for geo in bound_searches + circle_searches:
_inv = None
try:
if geo.geo_type == GeoConstraintType.BOUNDING:
_inv = station_client.get_stations(
minlatitude=round(geo.coords.min_lat,4),
maxlatitude=round(geo.coords.max_lat,4),
minlongitude=round(geo.coords.min_lon,4),
maxlongitude=round(geo.coords.max_lon,4),
**kwargs
)
elif geo.geo_type == GeoConstraintType.CIRCLE:
_inv = station_client.get_stations(
minradius=max(0,round(geo.coords.min_radius,3)),
maxradius=min(180,round(geo.coords.max_radius,3)),
latitude=round(geo.coords.lat,4),
longitude=round(geo.coords.lon,4),
**kwargs
)
else:
print(f"Unknown Geometry type: {geo.geo_type}")
except FDSNNoDataException:
print(f"No stations found at {station_client.base_url} with given geographic bounds")
if _inv is not None:
inv = _inv if inv is None else inv + _inv
        # Remove any stations that may have been added by loosening geo searches. Also removes duplicates.
try:
inv = filter_inventory_by_geo_constraints(inv,settings.station.geo_constraint)
except Exception as e:
print("filter_inventory_by_geo_constraits issue:",e)
else: # Query without geographic constraints
try:
inv = station_client.get_stations(**kwargs)
except FDSNNoDataException:
print(f"No stations found at {station_client.base_url} with given parameters")
return None
if inv is None:
print("No inventory returned (!?)")
return None
# Apply station exclusions
if settings.station.exclude_stations: # a "SeismoQuery" object
for sq in settings.station.exclude_stations:
inv = inv.remove(network=sq.network, station=sq.station,
location=sq.location, channel=sq.channel)
# Add forced stations
if settings.station.force_stations: # a "SeismoQuery" object
for sq in settings.station.force_stations:
try:
inv += station_client.get_stations(
network=sq.network,
station=sq.station,
location='*' if sq.location is None else sq.location,
channel=sq.channel or '*',
starttime=sq.starttime,
endtime=sq.endtime,
level='channel'
)
except Exception as e:
print(f"Could not find requested station {net}.{sta} at {settings.station.client}\n{e}")
continue
# Apply final filters
if highest_sr_only:
inv = select_highest_samplerate(inv, minSR=5)
if cha_pref or loc_pref:
inv = get_preferred_channels(inv, cha_pref, loc_pref)
return inv
def get_events(settings: SeismoLoaderSettings) -> List[Catalog]:
"""
Retrieve seismic event catalogs based on configured criteria.
Queries FDSN web services or loads local catalogs for seismic events matching
specified criteria including time range, magnitude, depth, and geographic constraints.
Args:
settings: Configuration settings containing event search criteria,
client information, and filtering preferences.
Returns:
List of ObsPy Catalog objects containing matching events.
Returns empty catalog if no events found.
Raises:
FileNotFoundError: If local catalog file not found.
PermissionError: If unable to access local catalog file.
ValueError: If invalid geographic constraint type specified.
Example:
>>> settings = SeismoLoaderSettings()
>>> settings.event.min_magnitude = 5.0
>>> catalogs = get_events(settings)
"""
print("Running get_events")
starttime = UTCDateTime(settings.event.date_config.start_time)
endtime = UTCDateTime(settings.event.date_config.end_time)
if starttime >= endtime:
print("get_events: Starttime greater than endtime!")
return Catalog()
waveform_client = Client(settings.waveform.client)
event_client = Client(settings.event.client) if settings.event.client else waveform_client
# Check for local catalog first
if settings.event.local_catalog:
try:
return read_events(settings.event.local_catalog)
except FileNotFoundError:
raise FileNotFoundError(f"File not found: {settings.event.local_catalog}")
except PermissionError:
raise PermissionError(f"Permission denied: {settings.event.local_catalog}")
except Exception as e:
raise Exception(f"Error reading catalog:\n{e}")
catalog = Catalog()
# Build query parameters
kwargs = {
'starttime': starttime,
'endtime': endtime,
'minmagnitude': settings.event.min_magnitude,
'maxmagnitude': settings.event.max_magnitude,
'mindepth': settings.event.min_depth,
'maxdepth': settings.event.max_depth,
'includeallorigins': settings.event.include_all_origins,
'includeallmagnitudes': settings.event.include_all_magnitudes,
'includearrivals': settings.event.include_arrivals,
'eventtype': settings.event.eventtype,
'catalog': settings.event.catalog,
'contributor': settings.event.contributor,
'updatedafter': settings.event.updatedafter
}
# Verify event service availability
if 'event' not in event_client.services:
print(f"Event service not available at {event_client.base_url}")
return catalog
# Remove unsupported parameters for this client
kwargs = {k: v for k, v in kwargs.items()
if k in event_client.services['event']}
# Handle global search case
if not settings.event.geo_constraint:
try:
cat = event_client.get_events(**kwargs)
print(f"Global Search: Found {len(cat)} events from {settings.event.client}")
catalog.extend(cat)
except FDSNNoDataException:
print("No events found in global search")
return catalog
# Handle geographic constraints
# But first.. reduce number of circular constraints to reduce excessive client calls
bound_searches = [ele for ele in settings.event.geo_constraint
if ele.geo_type == GeoConstraintType.BOUNDING]
circle_searches = [ele for ele in settings.event.geo_constraint
if ele.geo_type == GeoConstraintType.CIRCLE]
if len(circle_searches) > 1:
new_circle_searches = []
circ_center_lat = sum(p.coords.lat for p in circle_searches) / len(circle_searches)
lon_radians = [np.radians(p.coords.lon) for p in circle_searches]
avg_x = sum(np.cos(r) for r in lon_radians) / len(circle_searches)
avg_y = sum(np.sin(r) for r in lon_radians) / len(circle_searches)
circ_center_lon = np.degrees(np.arctan2(avg_y, avg_x))
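        # (circular mean of the longitudes via unit vectors, so groups of searches
        #  straddling the +/-180 degree meridian average correctly)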
circ_distances = [ locations2degrees(circ_center_lat,circ_center_lon, p.coords.lat,p.coords.lon)
for p in circle_searches]
max_circ_distances = max(circ_distances)
mean_circ = copy.deepcopy(circle_searches[0])
mean_circ.coords.lat = circ_center_lat
mean_circ.coords.lon = circ_center_lon
if mean_circ.coords.min_radius > max_circ_distances:
mean_circ.coords.min_radius -= max_circ_distances
mean_circ.coords.max_radius += max_circ_distances
if max_circ_distances < 15: # in degrees. all points packed in closely enough
circle_searches = [mean_circ]
        else:  # go through the list and remove what we can
new_circle_searches = [mean_circ]
for i, cs in enumerate(circle_searches):
if circ_distances[i] >= 15: # add any outliers
new_circle_searches.append(cs)
circle_searches = new_circle_searches
for geo in bound_searches + circle_searches:
try:
if geo.geo_type == GeoConstraintType.CIRCLE:
cat = event_client.get_events(
latitude=round(geo.coords.lat,4),
longitude=round(geo.coords.lon,4),
minradius=max(0,round(geo.coords.min_radius,3)),
maxradius=min(180,round(geo.coords.max_radius,3)),
**kwargs
)
print(f"Found {len(cat)} events from {settings.event.client}")
catalog.extend(cat)
elif geo.geo_type == GeoConstraintType.BOUNDING:
cat = event_client.get_events(
minlatitude=round(geo.coords.min_lat,4),
minlongitude=round(geo.coords.min_lon,4),
maxlatitude=round(geo.coords.max_lat,4),
maxlongitude=round(geo.coords.max_lon,4),
**kwargs
)
print(f"Found {len(cat)} events from {settings.event.client}")
catalog.extend(cat)
else:
raise ValueError(f"Invalid event search type: {geo.geo_type.value}")
except FDSNNoDataException:
print(f"No events found for constraint: {geo.geo_type}")
continue
# Remove duplicates (*now done below)
#catalog = remove_duplicate_events(catalog)
# Re-filter to remove anything that eclipsed original search. Also removes duplicates.
try:
catalog = filter_catalog_by_geo_constraints(catalog,settings.event.geo_constraint)
except Exception as e:
print("filter_catalog_by_geo_constraints issue:",e)
return catalog
def run_continuous(settings: SeismoLoaderSettings, stop_event: threading.Event = None):
"""
    Retrieves continuous seismic data over long time intervals for the set of stations
    given by `settings.station.selected_invs`. The function manages multiple steps including
generating data requests, pruning unnecessary requests based on existing data,
combining requests for efficiency, and finally archiving the retrieved data.
The function uses a client setup based on the configuration in `settings` to
handle different data sources and authentication methods. Errors during client
creation or data retrieval are handled gracefully, with issues logged to the console.
Parameters:
- settings (SeismoLoaderSettings): Configuration settings containing client information,
authentication details, and database paths necessary for data retrieval and storage.
This should include the start and end times for data collection, database path,
and SDS archive path among other configurations.
- stop_event (threading.Event): Optional event flag for canceling the operation mid-execution.
If provided and set, the function will terminate gracefully at the next safe point.
Workflow:
1. Initialize clients for waveform data retrieval.
2. Retrieve station information based on settings.
3. Collect initial data requests for the given time interval.
4. Prune requests based on existing data in the database to avoid redundancy.
5. Combine similar requests to minimize the number of individual operations.
6. Update or create clients based on specific network credentials if necessary.
7. Execute data retrieval requests, archive data to disk, and update the database.
Raises:
- Exception: General exceptions could be raised due to misconfiguration, unsuccessful
data retrieval or client initialization errors. These exceptions are caught and logged,
but not re-raised, allowing the process to continue with other requests.
Notes:
- It is crucial to ensure that the settings object is correctly configured, especially
the client details and authentication credentials to avoid runtime errors.
- The function logs detailed information about the processing steps and errors to aid
in debugging and monitoring of data retrieval processes.
"""
print("Running run_continuous\n----------------------")
settings, db_manager = setup_paths(settings)
starttime = UTCDateTime(settings.station.date_config.start_time)
endtime = UTCDateTime(settings.station.date_config.end_time)
waveform_client = Client(settings.waveform.client)
# Sanity check times
endtime = min(endtime, UTCDateTime.now()-120)
if starttime > endtime:
print("run_continuous: Starttime greater than endtime!")
return True
# Collect requests
print("Collecting, combining, and pruning requests against database... ")
requests = collect_requests(settings.station.selected_invs,
starttime, endtime, days_per_request=settings.waveform.days_per_request,
cha_pref=settings.waveform.channel_pref,loc_pref=settings.waveform.location_pref)
# Remove any for data we already have (requires updated db)
# If force_redownload is flagged, then ignore request pruning
if settings.waveform.force_redownload:
print("Forcing re-download as requested...")
pruned_requests = requests
else:
# no message needed for default behaviour
pruned_requests = prune_requests(requests, db_manager, settings.sds_path)
# Break if nothing to do
if not pruned_requests:
print(f" ... All data already archived!\n")
return True
# Check for cancellation after request pruning
# This allows termination after database operations but before
# network operations and data downloads begin
if stop_event and stop_event.is_set():
print("Run cancelled!")
return None
# Combine these into fewer (but larger) requests
combined_requests = combine_requests(pruned_requests)
    waveform_clients = {'open': waveform_client}  # now a dictionary
requested_networks = [ele[0] for ele in combined_requests]
# May only work for network-wide credentials at the moment (99% use case)
for cred in settings.auths:
cred_net = cred.nslc_code.split('.')[0].upper()
if cred_net not in requested_networks:
continue
try:
new_client = Client(settings.waveform.client,
user=cred.username.upper(), password=cred.password)
waveform_clients.update({cred_net:new_client})
        except Exception:
print("Issue creating client: %s %s via %s:%s" % (settings.waveform.client,
cred.nslc_code, cred.username, cred.password))
continue
    # Archive to disk and update the database
for request in combined_requests:
print(" ")
print("Requesting: ", request)
time.sleep(0.05) # to help ctrl-C out if needed
try:
archive_request(request, waveform_clients, settings.sds_path, db_manager)
except Exception as e:
print(f"Continuous request not successful: {request} with exception:\n {e}")
continue
# Check for cancellation before each individual request
# This allows termination between network requests, preventing
# unnecessary data downloads if the user cancels mid-process
# This is the only time consuming step so probably the only sensible place for a cancel break
if stop_event and stop_event.is_set():
print("Run cancelled!")
db_manager.join_continuous_segments(settings.processing.gap_tolerance)
return True
# Cleanup the database
try:
db_manager.join_continuous_segments(settings.processing.gap_tolerance)
except Exception as e:
print(f"! Error with join_continuous_segments:\n {e}")
return True
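
# A minimal usage sketch for run_continuous (mirrors what run_main() does for
# CONTINUOUS downloads); assumes `settings` is an already-populated SeismoLoaderSettings:
#
#   settings.station.selected_invs = get_stations(settings)
#   run_continuous(settings)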
def run_event(settings: SeismoLoaderSettings, stop_event: threading.Event = None):
"""
Processes and downloads seismic event data for each event in the provided catalog using
the specified settings and station inventory. The function manages multiple steps including
data requests, arrival time calculations, database updates, and data retrieval.
The function handles data retrieval from FDSN web services with support for authenticated
access and restricted data. Processing can be interrupted via the stop_event parameter,
and errors during execution are handled gracefully with detailed logging.
Parameters:
- settings (SeismoLoaderSettings): Configuration settings that include client details,
authentication credentials, event-specific parameters like radius and time window,
and paths for data storage.
- stop_event (threading.Event): Optional event flag for canceling the operation mid-execution.
If provided and set, the function will terminate gracefully at the next safe point.
Workflow:
1. Initialize paths and database connections
2. Load appropriate travel time model for arrival calculations
3. Process each event in the catalog:
a. Calculate arrival times and generate data requests
b. Update arrival information in database
c. Check for existing data and prune redundant requests
d. Download and archive new data
e. Add event metadata to traces (arrivals, distances, azimuths)
4. Combine data into event streams with complete metadata
    Returns:
    - Tuple[List[obspy.Trace], dict]: A list of all retrieved traces, each enriched with
      event metadata (arrival times, distances, azimuths), and a dictionary describing
      which requested channels are missing per event. Returns None if the operation is
      cancelled before any data is gathered or if no data is processed.
Raises:
- Exception: General exceptions from client creation, data retrieval, or processing
are caught and logged but not re-raised, allowing processing to continue with
remaining events.
Notes:
- The function supports threading and can be safely interrupted via stop_event
- Station metadata is enriched with event-specific information including arrivals
- Data is archived in SDS format and the database is updated accordingly
- Each stream in the output includes complete event metadata for analysis
"""
print(f"Running run_event\n-----------------")
settings, db_manager = setup_paths(settings)
waveform_client = Client(settings.waveform.client)
# Initialize travel time model
try:
ttmodel = TauPyModel(settings.event.model)
except Exception as e:
print(f"Falling back to IASP91 model: {str(e)}")
ttmodel = TauPyModel('IASP91')
all_event_traces = []
all_missing = {}
for i, eq in enumerate(settings.event.selected_catalogs):
if stop_event and stop_event.is_set():
print("\nCancelling run_event!")
# stop_event.clear() # i don't think these are needed / REVIEW
try:
print("\n~~ Cleaning up database ~~")
db_manager.join_continuous_segments(settings.processing.gap_tolerance)
except Exception as e:
print(f"! Error with join_continuous_segments: {str(e)}")
if all_event_traces:
return all_event_traces, all_missing
else:
return None
try:
event_region = eq.event_descriptions[0].text
        except Exception:
event_region = ""
print(" ") # hack to make streamlit in-app log insert a newline
print(
f"Processing event {i+1}/{len(settings.event.selected_catalogs)} | "
f"{event_region:^35} | "
f"{str(eq.origins[0].time)[0:16]} "
f"({eq.origins[0].latitude:.2f},{eq.origins[0].longitude:.2f})"
)
# Collect requests for this event
print("Collecting, combining, and pruning requests against database...")
try:
requests, new_arrivals, p_arrivals = collect_requests_event(
eq, settings.station.selected_invs,
model=ttmodel,
settings=settings
)
        except Exception as e:
            print(f"Issue running collect_requests_event in run_event:\n {e}")
            continue  # cannot proceed with this event without its requests
# Update arrival database
if new_arrivals:
try:
db_manager.bulk_insert_arrival_data(new_arrivals)
except Exception as e:
print(f"Issue with run_event > bulk_insert_arrival_data:\n",{e})
# Process data requests
if settings.waveform.force_redownload:
print("Forcing re-download as requested...")
pruned_requests = requests
else:
try:
pruned_requests = prune_requests(requests, db_manager, settings.sds_path)
            except Exception as e:
                print(f"Issue with run_event > prune_requests:\n {e}")
                pruned_requests = requests  # if pruning fails, fall back to requesting everything
if len(requests) > 0 and not pruned_requests:
print(f" ... All data already archived!\n")
# Download new data if needed
if pruned_requests:
try:
combined_requests = combine_requests(pruned_requests)
            except Exception as e:
                print(f"Issue with run_event > combine_requests:\n {e}")
                combined_requests = []
if not combined_requests:
print("DEBUG: combined requests is empty? here was pruned_requests",pruned_requests)
continue
# Setup authenticated clients
waveform_clients = {'open': waveform_client}
requested_networks = [req[0] for req in combined_requests]
for cred in settings.auths:
cred_net = cred.nslc_code.split('.')[0].upper()
if cred_net not in requested_networks:
continue
try:
new_client = Client(
settings.waveform.client,
user=cred.username.upper(),
password=cred.password
)
waveform_clients[cred_net] = new_client
except Exception as e:
print(f"Issue creating client for {cred_net}:\n {str(e)}")
# Process requests
for request in combined_requests:
print(f" Requesting: {request}")
try:
archive_request(
request,
waveform_clients,
settings.sds_path,
db_manager
)
except Exception as e:
print(f"Error archiving request {request}:\n {str(e)}")
continue
if stop_event and stop_event.is_set():
print("\nCancelling run_event!")
# ? stop_event.clear()
try:
print("\n~~ Cleaning up database ~~")
db_manager.join_continuous_segments(settings.processing.gap_tolerance)
except Exception as e:
print(f"! Error with join_continuous_segments: {str(e)}")
if all_event_traces:
return all_event_traces, all_missing
else:
return None
# Now load everything in from our archive
event_stream = Stream()
for request in requests:
try:
st = get_local_waveform(request, settings)
if st:
# Add event metadata to traces
arrivals = db_manager.fetch_arrivals_distances(
eq.preferred_origin_id.id,
request[0].upper(), # network
request[1].upper() # station
)
if arrivals:
for tr in st:
tr.stats.resource_id = eq.resource_id.id
tr.stats.p_arrival = arrivals[0]
tr.stats.s_arrival = arrivals[1]
tr.stats.distance_km = arrivals[2]
tr.stats.distance_deg = arrivals[3]
tr.stats.azimuth = arrivals[4]
tr.stats.event_magnitude = eq.magnitudes[0].mag if hasattr(eq, 'magnitudes') and eq.magnitudes else 0.99
tr.stats.event_region = event_region
tr.stats.event_time = eq.origins[0].time
event_stream.extend(st)
except Exception as e:
print(f"Error reading data for {request[0].upper()}.{request[1].upper()}:\n {str(e)}")
continue
# Now attempt to keep track of what data was missing.
# Note that this is not catching out-of-bounds data, for better or worse (probably better)
if event_stream:
all_event_traces.extend(event_stream)
try:
missing = get_missing_from_request(db_manager,eq.resource_id.id,requests,event_stream)
            except Exception as e:
                print("get_missing_from_request issue:", e)
                missing = {}
if missing:
all_missing.update(missing)
# Final database cleanup
try:
print("\n~~ Cleaning up database ~~")
db_manager.join_continuous_segments(settings.processing.gap_tolerance)
except Exception as e:
print(f"! Error with join_continuous_segments: {str(e)}")
# And return to ui/components/waveform.py
if all_event_traces:
return all_event_traces, all_missing
else:
return None
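
# A minimal usage sketch for run_event (mirrors what run_main() does for EVENT
# downloads); assumes `settings` is an already-populated SeismoLoaderSettings:
#
#   settings.event.selected_catalogs = get_events(settings)
#   settings.station.selected_invs = get_stations(settings)
#   result = run_event(settings)
#   if result:
#       event_traces, missing = result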
def run_main(
settings: Optional[SeismoLoaderSettings] = None,
from_file: Optional[str] = None,
stop_event: threading.Event = None
) -> None:
"""Main entry point for seismic data retrieval and processing.
Coordinates the overall workflow for retrieving and processing seismic data,
handling both continuous and event-based data collection based on settings.
Args:
settings: Configuration settings for data retrieval and processing.
If None, settings must be provided via from_file.
from_file: Path to configuration file to load settings from.
Only used if settings is None.
stop_event: Optional event flag for canceling the operation mid-execution.
If provided and set, the function will terminate gracefully at the next safe point.
    Returns:
        For continuous downloads, the result of run_continuous (True on completion,
        None if cancelled). For event downloads, data is archived to disk and None
        is returned.
Example:
>>> # Using settings object
>>> settings = SeismoLoaderSettings()
>>> settings.download_type = DownloadType.EVENT
>>> run_main(settings)
>>> # Using configuration file
>>> run_main(from_file="config.ini")
"""
if not settings and from_file:
settings = SeismoLoaderSettings()
settings = settings.from_cfg_file(cfg_source=from_file)
settings, db_manager = setup_paths(settings)
# Load client URL mappings
settings.client_url_mapping.load()
URL_MAPPINGS = settings.client_url_mapping.maps
# Determine download type
download_type = settings.download_type.value
if not is_in_enum(download_type, DownloadType):
download_type = DownloadType.CONTINUOUS
# Check for cancellation before starting any data processing
# This allows early termination before any resource-intensive operations begin
if stop_event and stop_event.is_set():
print("Run cancelled!")
return None
# Process continuous data
if download_type == DownloadType.CONTINUOUS:
settings.station.selected_invs = get_stations(settings)
return run_continuous(settings, stop_event)
# Process event-based data
if download_type == DownloadType.EVENT:
settings.event.selected_catalogs = get_events(settings)
settings.station.selected_invs = get_stations(settings)
event_traces, missing = run_event(settings, stop_event) # this returns a stream containing all the downloaded traces, and a dictionary of what's missing
return None