Source code for seed_vault.ui.components.continuous_waveform

# seed_vault/ui/components/continuous_waveform.py

from typing import List
import streamlit as st
from datetime import datetime, date, timezone
from copy import deepcopy
import threading
import sys
import time
import queue
from html import escape
from seed_vault.models.config import SeismoLoaderSettings
from seed_vault.service.seismoloader import run_continuous
from seed_vault.ui.components.display_log import ConsoleDisplay
from seed_vault.service.utils import convert_to_datetime, get_time_interval, shift_time, parse_inv
from seed_vault.ui.app_pages.helpers.common import save_filter

# Create a global stop event for cancellation
stop_event = threading.Event()
# Create a global queue for logs
log_queue = queue.Queue()

[docs] class ContinuousFilterMenu: """A menu component for filtering and controlling continuous waveform data. This class provides a user interface for managing continuous waveform data retrieval, including time range selection and station filtering. Attributes: settings (SeismoLoaderSettings): Configuration settings for seismic data processing. old_settings (SeismoLoaderSettings): Previous state of settings for change detection. old_time_state (dict): Previous time range state for change detection. last_button_pressed (str): Last button interaction for UI state management. todo_nets (List[str]): List of networks to process. """ def __init__(self, settings: SeismoLoaderSettings): """Initialize the ContinuousFilterMenu. Args: settings (SeismoLoaderSettings): Configuration settings for seismic data processing. """ self.settings = settings self.old_settings = deepcopy(settings) # Track previous time state self.old_time_state = { 'start_time': self.settings.station.date_config.start_time, 'end_time': self.settings.station.date_config.end_time } self.last_button_pressed = None self.todo_nets = None # Check if date range is valid self.validate_date_range()
[docs] def validate_date_range(self): """Validate that the selected date range is valid. This method checks that the end time is not earlier than the start time and updates the session state accordingly. Note: The validation result is stored in st.session_state["date_range_valid"]. """ start_time = self.settings.station.date_config.start_time end_time = self.settings.station.date_config.end_time # Check if dates exist if start_time is None or end_time is None: st.session_state["date_range_valid"] = False return # Convert to datetime objects with UTC timezone try: # Handle string inputs if isinstance(start_time, str): start_time = datetime.fromisoformat(start_time).replace(tzinfo=timezone.utc) # Handle datetime inputs elif hasattr(start_time, 'tzinfo'): if start_time.tzinfo is None: start_time = start_time.replace(tzinfo=timezone.utc) if isinstance(end_time, str): end_time = datetime.fromisoformat(end_time).replace(tzinfo=timezone.utc) elif hasattr(end_time, 'tzinfo'): if end_time.tzinfo is None: end_time = end_time.replace(tzinfo=timezone.utc) # Now we can safely compare st.session_state["date_range_valid"] = end_time > start_time except (ValueError, AttributeError, TypeError): # Catch any conversion or comparison errors st.session_state["date_range_valid"] = False
[docs] def refresh_filters(self): """Check for changes in time range and settings, trigger UI updates. This method compares current settings with previous state and triggers a UI refresh if changes are detected. It also handles saving of settings. Note: The method uses Streamlit's rerun mechanism to update the UI when changes are detected. """ current_time_state = { 'start_time': self.settings.station.date_config.start_time, 'end_time': self.settings.station.date_config.end_time } # Check if time state changed - use deep comparison for datetime objects # it doesn't seem to stay updated always.. possible not working correctly in __init__ time_changed = (current_time_state['start_time'] != self.old_time_state['start_time'] or current_time_state['end_time'] != self.old_time_state['end_time']) if time_changed: self.old_time_state = { 'start_time': current_time_state['start_time'], 'end_time': current_time_state['end_time'] } # Validate date range whenever time changes self.validate_date_range() save_filter(self.settings) st.rerun() # Check if other settings changed changes = self.settings.has_changed(self.old_settings) if changes.get('has_changed', False): self.old_settings = deepcopy(self.settings) save_filter(self.settings) st.rerun()
[docs] def refresh_filters_OLD(self): """Check for changes and trigger updates""" current_time_state = { 'start_time': self.settings.station.date_config.start_time, 'end_time': self.settings.station.date_config.end_time } # Check if time state changed if current_time_state != self.old_time_state: self.old_time_state = current_time_state.copy() save_filter(self.settings) st.rerun() # Check if settings changed changes = self.settings.has_changed(self.old_settings) if changes.get('has_changed', False): self.old_settings = deepcopy(self.settings) save_filter(self.settings) st.rerun()
[docs] def render(self): """Render the continuous waveform filter menu interface. This method creates the UI for continuous waveform data management, including: - Time range selection with year/month/week controls - Date and time input fields - Quick selection buttons for common time ranges - Network/station/location/channel information display The interface is organized in expandable sections: - Time Range Adjustment - Submitted NSLCs Information Note: The interface provides both manual input and quick selection options for time range management. """ st.sidebar.title("Download Parameters") ## Get the list of items about to be downloaded # ...For some reason this snippet only works in render # & we don't want to re-run it needlessly if not self.todo_nets: self.todo_nets,self.todo_stas,self.todo_locs,self.todo_chas = \ parse_inv(self.settings.station.selected_invs) with st.sidebar.expander("Adjust Time Range?", expanded=True): start_date, start_time = convert_to_datetime(self.settings.station.date_config.start_time) end_date, end_time = convert_to_datetime(self.settings.station.date_config.end_time) # Row 1: Year controls col1, col2, col3, col4 = st.columns(4) # Row 2: Month controls col5, col6, col7, col8 = st.columns(4) # Row 3: Week controls col9, col10, col11, col12 = st.columns(4) # Row 4: Day controls #col13, col14, col15, col16 = st.columns(4) # Year controls with col1: if st.button("-Year", key="start-year-minus"): self.settings.station.date_config.start_time = shift_time( self.settings.station.date_config.start_time, 'year', -1) self.validate_date_range() self.refresh_filters() with col2: if st.button("+Year", key="start-year-plus"): self.settings.station.date_config.start_time = shift_time( self.settings.station.date_config.start_time, 'year', 1) self.validate_date_range() self.refresh_filters() with col3: if st.button("-Year", key="end-year-minus"): self.settings.station.date_config.end_time = shift_time( self.settings.station.date_config.end_time, 'year', -1) self.validate_date_range() self.refresh_filters() with col4: if st.button("+Year", key="end-year-plus"): self.settings.station.date_config.end_time = shift_time( self.settings.station.date_config.end_time, 'year', 1) self.validate_date_range() self.refresh_filters() # Month controls with col5: if st.button("-Month", key="start-month-minus"): self.settings.station.date_config.start_time = shift_time( self.settings.station.date_config.start_time, 'month', -1) self.validate_date_range() self.refresh_filters() with col6: if st.button("+Month", key="start-month-plus"): self.settings.station.date_config.start_time = shift_time( self.settings.station.date_config.start_time, 'month', 1) self.validate_date_range() self.refresh_filters() with col7: if st.button("-Month", key="end-month-minus"): self.settings.station.date_config.end_time = shift_time( self.settings.station.date_config.end_time, 'month', -1) self.validate_date_range() self.refresh_filters() with col8: if st.button("+Month", key="end-month-plus"): self.settings.station.date_config.end_time = shift_time( self.settings.station.date_config.end_time, 'month', 1) self.validate_date_range() self.refresh_filters() # Week controls with col9: if st.button("-Week", key="start-week-minus"): self.settings.station.date_config.start_time = shift_time( self.settings.station.date_config.start_time, 'week', -1) self.validate_date_range() self.refresh_filters() with col10: if st.button("+Week", key="start-week-plus"): self.settings.station.date_config.start_time = shift_time( self.settings.station.date_config.start_time, 'week', 1) self.validate_date_range() self.refresh_filters() with col11: if st.button("-Week", key="end-week-minus"): self.settings.station.date_config.end_time = shift_time( self.settings.station.date_config.end_time, 'week', -1) self.validate_date_range() self.refresh_filters() with col12: if st.button("+Week", key="end-week-plus"): self.settings.station.date_config.end_time = shift_time( self.settings.station.date_config.end_time, 'week', 1) self.validate_date_range() self.refresh_filters() # Day controls (...overkill) """ with col13: if st.button("- Day", key="start-day-minus"): self.settings.station.date_config.start_time = shift_time( self.settings.station.date_config.start_time, 'day', -1) self.refresh_filters() with col14: if st.button("+ Day", key="start-day-plus"): self.settings.station.date_config.start_time = shift_time( self.settings.station.date_config.start_time, 'day', 1) self.refresh_filters() with col15: if st.button("- Day", key="end-day-minus"): self.settings.station.date_config.end_time = shift_time( self.settings.station.date_config.end_time, 'day', -1) self.refresh_filters() with col16: if st.button("+ Day", key="end-day-plus"): self.settings.station.date_config.end_time = shift_time( self.settings.station.date_config.end_time, 'day', 1) self.refresh_filters() """ c1, c2 = st.columns([1,1]) with c1: new_start_date = st.date_input("Start Date", value=start_date) new_start_time = st.time_input("Start Time (UTC)", value=start_time) # Handle cases where only date or only time has changed date_changed = new_start_date != start_date time_changed = new_start_time != start_time if date_changed or time_changed: new_start = datetime.combine(new_start_date, new_start_time) self.settings.station.date_config.start_time = new_start self.last_button_pressed = None self.validate_date_range() self.refresh_filters() with c2: new_end_date = st.date_input("End Date", value=end_date) new_end_time = st.time_input("End Time (UTC)", value=end_time) # Handle cases where only date or only time has changed date_changed = new_end_date != end_date time_changed = new_end_time != end_time if date_changed or time_changed: new_end = datetime.combine(new_end_date, new_end_time) self.settings.station.date_config.end_time = new_end self.last_button_pressed = None self.validate_date_range() self.refresh_filters() # Display validation message if date range is invalid if not st.session_state.get("date_range_valid", True): st.error("End time must be > start time") # also keep the last month/week/day options c21,c22,c23 = st.columns([1,1,1]) with c21: if st.button('Last Month', key="station-set-last-month"): end_time, start_time = get_time_interval('month') self.settings.station.date_config.end_time = end_time self.settings.station.date_config.start_time = start_time self.refresh_filters() with c22: if st.button('Last Week', key="station-set-last-week"): end_time, start_time = get_time_interval('week') self.settings.station.date_config.end_time = end_time self.settings.station.date_config.start_time = start_time self.refresh_filters() with c23: if st.button('Last Day', key="station-set-last-day"): end_time, start_time = get_time_interval('day') self.settings.station.date_config.end_time = end_time self.settings.station.date_config.start_time = start_time self.refresh_filters() with st.sidebar.expander("Submitted NSLCs:", expanded=True): st.caption(f"Networks: {','.join(self.todo_nets) if self.todo_nets else 'None'}") st.caption(f"Stations: {','.join(self.todo_stas) if self.todo_stas else 'None'}") st.caption(f"Locations: {','.join([loc if loc != '' else '--' for loc in self.todo_locs]) if self.todo_locs else 'None'}") st.caption(f"Channels: {','.join(self.todo_chas) if self.todo_chas else 'None'}")
[docs] class ContinuousDisplay: """A component for displaying and managing continuous waveform data. This class handles the display and processing of continuous waveform data, including data retrieval, logging, and UI updates. Attributes: settings (SeismoLoaderSettings): Configuration settings for seismic data processing. filter_menu (ContinuousFilterMenu): Menu component for filtering waveforms. console (ConsoleDisplay): Console for logging output. """ def __init__(self, settings: SeismoLoaderSettings, filter_menu: ContinuousFilterMenu): """Initialize the ContinuousDisplay component. Args: settings (SeismoLoaderSettings): Configuration settings for seismic data processing. filter_menu (ContinuousFilterMenu): Menu component for filtering waveforms. """ self.settings = settings self.filter_menu = filter_menu self.console = ConsoleDisplay()
[docs] def process_continuous_data(self): """Process continuous waveform data in a background thread with logging. This method sets up a custom logging system, retrieves continuous waveform data, and handles any errors or cancellations during the process. Note: The method updates the session state with processing status and logs. """ # Custom stdout/stderr handler that writes to both the original streams and our queue class QueueLogger: def __init__(self, original_stream, queue): self.original_stream = original_stream self.queue = queue self.buffer = "" def write(self, text): self.original_stream.write(text) self.buffer += text if '\n' in text: lines = self.buffer.split('\n') for line in lines[:-1]: # All complete lines if line: # Skip empty lines self.queue.put(line) self.buffer = lines[-1] # Keep any partial line # Also handle case where no newline but we have content elif text and len(self.buffer) > 80: # Buffer getting long, flush it self.queue.put(self.buffer) self.buffer = "" def flush(self): self.original_stream.flush() if self.buffer: # Flush any remaining content in buffer self.queue.put(self.buffer) self.buffer = "" # Set up queue loggers original_stdout = sys.stdout original_stderr = sys.stderr sys.stdout = QueueLogger(original_stdout, log_queue) sys.stderr = QueueLogger(original_stderr, log_queue) try: # Print initial message to show logging is working print("Starting continuous waveform download process...") # Run the continuous download with stop_event for cancellation result = run_continuous(self.settings, stop_event) if result: success = True print("Download completed successfully.") else: success = False print("Download failed or was cancelled.") except Exception as e: success = False print(f"Error: {str(e)}") # This will be captured in the output finally: # Flush any remaining content sys.stdout.flush() sys.stderr.flush() # Restore original stdout/stderr sys.stdout = original_stdout sys.stderr = original_stderr st.session_state.update({ "query_done": True, "is_downloading": False, "trigger_rerun": True })
[docs] def render(self): """Render the continuous waveform display interface. This method creates the main UI for continuous waveform visualization, including: - Download controls - Status indicators - Real-time log display - Progress tracking """ st.title("Continuous Waveform Archiving") # Create three columns for the controls col1, col2 = st.columns(2) # Get Waveforms button in first column with col1: get_waveforms_button = st.button( "Download Waveforms", key="download_continuous", disabled=st.session_state.get("is_downloading", False) or not st.session_state.get("date_range_valid", True), use_container_width=True ) # Cancel Download button in second column with col2: if st.button("Cancel Download", key="cancel_continuous_download", disabled=not st.session_state.get("is_downloading", False), use_container_width=True): stop_event.set() # Signal cancellation st.warning("Cancelling download...") st.session_state.update({ "is_downloading": False, "polling_active": False, "download_cancelled": True # Add this flag to track cancellation }) st.rerun() # Download status indicator status_container = st.empty() # Show appropriate status message if get_waveforms_button: status_container.info("Starting continuous waveform download...") self.retrieve_waveforms() elif st.session_state.get("is_downloading"): st.spinner("Downloading continuous waveforms... (this may take several minutes)") # Display real-time logs in the waveform view during download log_container = st.empty() # Process any new log entries from the queue new_logs = False while not log_queue.empty(): try: log_entry = log_queue.get_nowait() if not self.console.accumulated_output: self.console.accumulated_output = [] self.console.accumulated_output.append(log_entry) new_logs = True except queue.Empty: break # Save logs to session state if updated if new_logs or self.console.accumulated_output: st.session_state["log_entries"] = self.console.accumulated_output # Display logs in the waveform view if self.console.accumulated_output: # Add the initial header line if it's not already there if not any("Running run_continuous" in line for line in self.console.accumulated_output): self.console.accumulated_output.insert(0, "Running run_continuous\n-----------------------") st.session_state["log_entries"] = self.console.accumulated_output # Initialize terminal styling self.console._init_terminal_style() escaped_content = escape('\n'.join(self.console.accumulated_output)) log_text = ( '<div class="terminal" id="log-terminal" style="max-height: 700px; background-color: black; color: #ffffff; padding: 10px; border-radius: 5px; overflow-y: auto;">' f'<pre style="margin: 0; white-space: pre; tab-size: 4; font-family: \'Courier New\', Courier, monospace; font-size: 14px; line-height: 1.4;">{escaped_content}</pre>' '</div>' '<script>' 'if (window.terminal_scroll === undefined) {' ' window.terminal_scroll = function() {' ' var terminalDiv = document.getElementById("log-terminal");' ' if (terminalDiv) {' ' terminalDiv.scrollTop = terminalDiv.scrollHeight;' ' }' ' };' '}' 'window.terminal_scroll();' '</script>' ) log_container.markdown(log_text, unsafe_allow_html=True) elif st.session_state.get("download_cancelled"): status_container.warning("Download was cancelled by user.") elif st.session_state.get("query_done"): status_container.success("Continuous data processing completed successfully!")
[docs] def retrieve_waveforms(self): """Initiate continuous waveform retrieval in a background thread. This method starts a new thread for continuous waveform data retrieval and updates the UI state accordingly. Note: The method handles thread creation, state management, and UI updates. """ stop_event.clear() # Reset cancellation flag st.session_state["query_thread"] = threading.Thread(target=self.process_continuous_data, daemon=True) st.session_state["query_thread"].start() st.session_state.update({ "is_downloading": True, "query_done": False, "polling_active": True, "download_cancelled": False # Reset cancellation flag when starting new download }) st.rerun()
[docs] class ContinuousComponents: """A component for managing continuous waveform data processing. This class coordinates the interaction between the filter menu, display, and logging components for continuous waveform data processing. Attributes: settings (SeismoLoaderSettings): Configuration settings for seismic data processing. filter_menu (ContinuousFilterMenu): Menu component for filtering waveforms. display (ContinuousDisplay): Display component for waveform visualization. console (ConsoleDisplay): Console for logging output. """ def __init__(self, settings: SeismoLoaderSettings): """Initialize the ContinuousComponents. Args: settings (SeismoLoaderSettings): Configuration settings for seismic data processing. """ self.settings = settings self.filter_menu = ContinuousFilterMenu(settings) self.display = ContinuousDisplay(settings, self.filter_menu) self.console = ConsoleDisplay() # Initialize console with logs from session state if they exist if "log_entries" in st.session_state and st.session_state["log_entries"]: self.console.accumulated_output = st.session_state["log_entries"] # Pass console to ContinuousDisplay self.display.console = self.console # Initialize session state required_states = { "is_downloading": False, "query_done": False, "polling_active": False, "query_thread": None, "trigger_rerun": False, "log_entries": [], "date_range_valid": True } for key, val in required_states.items(): if key not in st.session_state: st.session_state[key] = val
[docs] def render_polling_ui(self): """Handle UI updates while monitoring background thread status. This method processes log entries from the queue and updates the UI based on the background thread's status. Note: The method uses Streamlit's rerun mechanism to update the UI when new logs are available or when the thread status changes. """ if st.session_state.get("is_downloading", False): query_thread = st.session_state.get("query_thread") # Process any new log entries from the queue new_logs = False while not log_queue.empty(): try: log_entry = log_queue.get_nowait() if not self.console.accumulated_output: self.console.accumulated_output = [] self.console.accumulated_output.append(log_entry) new_logs = True except queue.Empty: break # Save logs to session state if updated if new_logs: st.session_state["log_entries"] = self.console.accumulated_output # Trigger rerun to update the UI with new logs st.rerun() if query_thread and not query_thread.is_alive(): try: query_thread.join() except Exception as e: st.error(f"Error in background thread: {e}") # Add error to console output if not self.console.accumulated_output: self.console.accumulated_output = [] self.console.accumulated_output.append(f"Error: {str(e)}") st.session_state["log_entries"] = self.console.accumulated_output st.session_state.update({ "is_downloading": False, "query_done": True, "query_thread": None, "polling_active": False }) st.rerun() # Always trigger a rerun while polling is active to check for new logs if st.session_state.get("polling_active"): time.sleep(0.2) # Shorter pause for more frequent updates st.rerun()
[docs] def render(self): """Render the complete continuous waveform interface. This method creates the main UI for continuous waveform processing, including: - Download and log view tabs - Filter menu in sidebar - Real-time status updates - Log display and management """ # Initialize tab selection in session state if not exists if "continuous_active_tab" not in st.session_state: st.session_state["continuous_active_tab"] = 0 # Default to download tab # Auto-switch to log tab during download if new logs are available if st.session_state.get("is_downloading", False) and log_queue.qsize() > 0: st.session_state["continuous_active_tab"] = 0 # Keep on download tab to show real-time logs # Create tabs for Download and Log views tab_names = ["📊 Download View", "📝 Log View"] download_tab, log_tab = st.tabs(tab_names) # Always render filter menu (sidebar) first self.filter_menu.render() # Handle content based on active tab with download_tab: self.display.render() # Handle polling for background thread updates self.render_polling_ui() with log_tab: # If we're switching to log tab and download is complete, # make sure all logs are transferred from queue to accumulated_output if not st.session_state.get("is_downloading", False): # Process any remaining logs in the queue while not log_queue.empty(): try: log_entry = log_queue.get_nowait() if not self.console.accumulated_output: self.console.accumulated_output = [] self.console.accumulated_output.append(log_entry) except queue.Empty: break # Save to session state if self.console.accumulated_output: st.session_state["log_entries"] = self.console.accumulated_output # Render log view st.title("Continuous Waveform Logs") if self.console.accumulated_output: # Initialize terminal styling self.console._init_terminal_style() # Display logs escaped_content = escape('\n'.join(self.console.accumulated_output)) log_text = ( '<div class="terminal" id="log-terminal">' f'<pre style="margin: 0; white-space: pre; tab-size: 4; font-family: \'Courier New\', Courier, monospace; font-size: 14px; line-height: 1.4;">{escaped_content}</pre>' '</div>' '<script>' 'if (window.terminal_scroll === undefined) {' ' window.terminal_scroll = function() {' ' var terminalDiv = document.getElementById("log-terminal");' ' if (terminalDiv) {' ' terminalDiv.scrollTop = terminalDiv.scrollHeight;' ' }' ' };' '}' 'window.terminal_scroll();' '</script>' ) st.markdown(log_text, unsafe_allow_html=True) else: st.info("No logs available. Start a download to generate logs.")