from io import StringIO
import streamlit as st
import threading
import time
from contextlib import redirect_stdout, redirect_stderr
from typing import Callable
from queue import Queue
from html import escape
[docs]
class ConsoleDisplay:
def __init__(self):
self.last_position = 0
self.accumulated_output = []
def _init_terminal_style(self):
"""Initialize terminal styling"""
st.markdown("""
<style>
/* Terminal container styling */
.terminal {
background-color: black;
color: #ffffff;
padding: 10px;
border-radius: 5px;
height: 800px;
overflow-y: auto;
white-space: pre;
tab-size: 4;
}
/* Terminal text styling */
.terminal pre,
.terminal code,
.terminal span,
.terminal div {
margin: 0;
white-space: pre-wrap;
word-wrap: break-word;
tab-size: 4;
font-family: 'Courier New', Courier, monospace !important;
font-size: 14px !important;
line-height: 1.4 !important;
}
/* Ensure consistent scrolling behavior */
.stMarkdown {
overflow-y: auto;
max-height: 800px;
}
/* Ensure consistent font size in all contexts */
.terminal * {
font-size: 14px !important;
}
</style>
""", unsafe_allow_html=True)
def _preserve_whitespace(self, text: str) -> str:
"""
Preserve leading whitespace by converting spaces to non-breaking spaces
but only at the start of each line
"""
lines = text.splitlines(True) # Keep line endings
preserved_lines = []
for line in lines:
# Count leading spaces
leading_space_count = len(line) - len(line.lstrip(' '))
if leading_space_count > 0:
# Replace leading spaces with
preserved_line = ' ' * leading_space_count + escape(line[leading_space_count:])
preserved_lines.append(preserved_line)
else:
preserved_lines.append(escape(line))
return ''.join(preserved_lines)
def _update_logs(self, output_buffer: StringIO, log_container: st.empty):
"""Update logs in terminal style"""
output_buffer.seek(self.last_position)
new_output = output_buffer.read()
if new_output:
# Add new output to accumulated output
self.accumulated_output.append(new_output)
# Preserve whitespace while escaping content
preserved_content = self._preserve_whitespace(''.join(self.accumulated_output))
# Create terminal display
log_text = (
'<div class="terminal" id="log-terminal">'
f'<pre>{preserved_content}</pre>'
'</div>'
'<script>'
'if (window.terminal_scroll === undefined) {'
' window.terminal_scroll = function() {'
' var terminalDiv = document.getElementById("log-terminal");'
' if (terminalDiv) {'
' terminalDiv.scrollTop = terminalDiv.scrollHeight;'
' }'
' };'
'}'
'window.terminal_scroll();'
'</script>'
)
# Update the display
log_container.markdown(log_text, unsafe_allow_html=True)
# Update buffer position
self.last_position = output_buffer.tell()
[docs]
def run_with_logs(self, process_func: Callable, status_message: str = "Processing...") -> tuple[bool, str]:
"""
Run a process with terminal-style logging
Args:
process_func: Function to execute
status_message: Status message to display
Returns:
tuple[bool, str]: (success status, error message if any)
"""
output_buffer = StringIO()
self.last_position = 0
self.accumulated_output = []
error_message = ""
status = st.status(status_message, expanded=True)
with status:
self._init_terminal_style()
log_container = st.empty()
log_container.markdown('<div class="terminal"></div>', unsafe_allow_html=True)
try:
with redirect_stdout(output_buffer), redirect_stderr(output_buffer):
print("Starting process...")
# Create a queue to get the return value from the thread
result_queue = Queue()
def wrapped_func():
try:
result = process_func()
result_queue.put(("success", result))
except Exception as e:
result_queue.put(("error", str(e)))
process_thread = threading.Thread(target=wrapped_func)
process_thread.start()
while process_thread.is_alive():
self._update_logs(output_buffer, log_container)
time.sleep(0.05)
process_thread.join()
self._update_logs(output_buffer, log_container)
# Get the result from the queue
status, result = result_queue.get()
if status == "error":
return False, result
return True, ""
except Exception as e:
error_message = str(e)
return False, error_message