Source code for seed_vault.ui.components.data_explorer

import streamlit as st
import pandas as pd
import time
import sqlite3
from pathlib import Path
from uuid import uuid4
from collections import deque

from streamlit_ace import st_ace

from seed_vault.models.config import SeismoLoaderSettings
from seed_vault.service.db import DatabaseManager


# @st.cache_data()
def get_queries(maxlen: int = 50) -> deque:
    """
    Create an empty, bounded container for the query history.

    Once the deque is full, appending a new entry silently discards the
    oldest one, so the history never grows past ``maxlen`` entries.

    :param maxlen: maximum number of entries to keep (defaults to 50)
    :return: a new empty ``deque`` bounded to ``maxlen`` items
    """
    return deque(maxlen=maxlen)
def match_pk_fk(val):
    """
    Translate a key code into a human-readable label.

    :param val: ``None`` or an integer key code
    :return: ``"Not a key"`` for ``None``, ``"Primary Key"`` for 1,
        ``"Foreign Key"`` for 2 and ``"Other"`` for any other integer
    :raises TypeError: if ``val`` is neither ``None`` nor an ``int``
    """
    if val is not None and not isinstance(val, int):
        raise TypeError(f'Expected type None or int, not {type(val)}, {val =}')

    # Lookup table instead of an if/elif chain; unknown codes fall back to "Other".
    labels = {
        None: "Not a key",
        1: "Primary Key",
        2: "Foreign Key",
    }
    return labels.get(val, "Other")
def rename_duplicate_cols(data_frame: pd.DataFrame) -> None:
    """
    Rename duplicated columns in place so that every column name is unique.

    The first occurrence of a name is kept as is; each later occurrence gets
    a numeric suffix (col, col_2, col_3 ...). If a suffixed name is already
    used by another column, the number is increased until the name is free,
    so the result never contains duplicates.

    :param data_frame: DataFrame whose ``columns`` are rewritten in place
    :return: None
    """
    occurrences = {}  # column label -> how many times it has been seen so far
    taken = set(data_frame.columns)  # every name already in use, original or generated
    new_cols = []

    for col in data_frame.columns:
        count = occurrences.get(col, 0) + 1
        occurrences[col] = count

        if count == 1:
            new_cols.append(col)
            continue

        # Skip suffixes that would clash with an existing column,
        # e.g. ['a', 'a', 'a_2'] must not produce a second 'a_2'.
        candidate = f'{col}_{count}'
        while candidate in taken:
            count += 1
            candidate = f'{col}_{count}'

        taken.add(candidate)
        new_cols.append(candidate)

    data_frame.columns = new_cols
class DataExplorerComponent:
    """
    Streamlit component for exploring the database.

    Renders four tabs: a SQL editor that executes queries, the history of
    executed queries, the database schema and a list of example queries.
    """

    # Settings object the component was created with; only ``db_path`` is read here.
    settings: SeismoLoaderSettings
    # Database access layer used to run queries and to inspect the schema.
    db_manager: DatabaseManager
    # Bounded history of executed queries; each entry is a dict with the keys
    # 'time', 'query', 'exec_time_ms' and 'message'.
    queries: deque[dict]

    def __init__(self, settings: SeismoLoaderSettings):
        """
        :param settings: settings providing ``db_path`` for the database manager
        """
        self.settings = settings
        self.db_manager = DatabaseManager(self.settings.db_path)
        # NOTE(review): the history lives on this instance, so it only survives
        # as long as the caller keeps the component alive - confirm the caller
        # does not rebuild the component on every rerun.
        self.queries = get_queries()

    def render_schema(self):
        """
        Display every table of the database with its columns, the column
        types (optional, controlled by a checkbox) and the key role of each
        column.
        """
        show_types = st.checkbox('Show types', value=True, help='Show data types for each column ?')

        parts = []
        with self.db_manager.connection() as conn:
            cursor = conn.cursor()
            tables = cursor.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()

            for entry in tables:
                table = entry[0]
                # Escape single quotes so unusual table names cannot break the PRAGMA.
                quoted = table.replace("'", "''")

                # In PRAGMA foreign_key_list the child column name ("from") is at index 3.
                fk_cols = {
                    fk[3]
                    for fk in conn.cursor().execute(f"PRAGMA foreign_key_list('{quoted}')")
                }

                parts.append(f'\n\n * {table}:')
                for row in conn.cursor().execute(f"PRAGMA table_info('{quoted}')"):
                    col_name = row[1]
                    col_type = row[2].upper() if show_types else ''

                    # The "pk" column of table_info is 0 for non-key columns and
                    # the 1-based position inside the primary key otherwise, so
                    # it must be mapped to a key code rather than passed as one.
                    if row[5]:
                        key_code = 1
                    elif col_name in fk_cols:
                        key_code = 2
                    else:
                        key_code = None

                    parts.append(f'\n - {col_name:<15} {col_type} \t {match_pk_fk(key_code)}')

        st.text('DataBase Schema:')
        st.text(''.join(parts))

    def render_query_history(self):
        """
        List the stored queries, most recent first, with their execution time
        and result message.
        """
        st.write(f'Total Queries: {len(self.queries)}')

        for entry in reversed(self.queries):
            st.markdown('---')
            cols = st.columns(3)
            # The first column is intentionally left empty: the stored time is
            # the server time, which is not synchronized with the user's timezone.
            cols[1].text(f'Exec time: {entry["exec_time_ms"]}ms')
            cols[2].text(f'Message: {entry["message"]}')
            st.markdown(f'```sql \n{entry["query"]} \n```')

    def render_query(self):
        """
        Show the SQL editor, execute the submitted query and display the
        outcome (timing, message and, when available, the resulting table).

        A successfully executed query is appended to the history unless it is
        identical to the most recent entry.
        """
        with st.container():
            query = st_ace(
                value="SELECT * FROM archive_data LIMIT 10",
                language="sql",
                key="ace_sql_editor",
            )

            timer_start = time.perf_counter()

            if not query:
                return

            try:
                has_error, message, df = self.db_manager.execute_query(query)
            except Exception as exc:
                # UI boundary: report any failure to the user instead of crashing the page.
                st.warning(exc)
                return

            # display dataframe and stats
            ms_elapsed = int((time.perf_counter() - timer_start) * 1000)
            now = time.strftime("%X")

            cols = st.columns([1, 1, 2])
            cols[0].text(f'Exec time: {ms_elapsed}ms')
            cols[1].text(f'Last Query: {now}')
            cols[2].text(f'Message: {message}')

            if has_error:
                st.error(message)
            else:
                st.success(message)

            if df is not None:
                # Duplicated column names are renamed before the frame is displayed.
                if df.columns.has_duplicates:
                    rename_duplicate_cols(df)
                st.dataframe(df)

            # save query and stats for query-history tab (skip immediate repeats)
            if not self.queries or query != self.queries[-1]['query']:
                self.queries.append(
                    {
                        'time': now,
                        'query': query,
                        'exec_time_ms': ms_elapsed,
                        'message': message,
                    }
                )

    def render_example_queries(self):
        """Show ready-to-copy SELECT and DELETE example statements."""
        c1, c2 = st.columns([1, 1])

        with c1:
            with st.expander("SELECT", expanded=True):
                st.code("SELECT * FROM archive_data", language="sql")
                st.code("SELECT * FROM arrival_data", language="sql")
                st.code("SELECT * FROM archive_data LIMIT 100", language="sql")
                st.code("SELECT DISTINCT network, station FROM archive_data", language="sql")

        with c2:
            with st.expander("DELETE", expanded=True):
                st.code('DELETE FROM archive_data where network="IU" and station="DAV"', language="sql")

    def render(self):
        """Render the documentation link and the four explorer tabs."""
        st.markdown("[DB Explorer Documentation](https://auscope.github.io/seed-vault/app_db_explorer.html)")

        tab1, tab2, tab3, tab4 = st.tabs(['Execute SQL', 'Query History', 'DB Schema', 'Example Queries'])

        with tab1:
            self.render_query()
        with tab2:
            self.render_query_history()
        with tab3:
            self.render_schema()
        with tab4:
            self.render_example_queries()