Kade Heyborne 71c305a201
Major refactoring: v0.2.0 - Simplify to core dictation & read-aloud features
This is a comprehensive refactoring that transforms the dictation service from a
complex multi-mode application into two clean, focused features:
1. Voice dictation with system tray icon
2. On-demand read-aloud via Ctrl+middle-click

## Key Changes

### Dictation Service Enhancements
- Add GTK/AppIndicator3 system tray icon for visual status
- Remove all notification spam (dictation start/stop/status)
- Icon states: microphone-muted (OFF) → microphone-high (ON)
- Click tray icon to toggle dictation (same as Alt+D); see the sketch after this list
- Simplify ai_dictation_simple.py by removing conversation mode
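
As a rough standalone illustration of the tray toggle (hedged: the indicator ID and menu wiring are assumptions, not the service's actual code; the icon names and lock-file mechanism come from this changeset):

```python
# Illustrative sketch: a tray icon that toggles dictation by creating/removing
# the lock file the service polls. Not the shipped implementation.
import os
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('AppIndicator3', '0.1')
from gi.repository import Gtk, AppIndicator3

LOCK_FILE = "listening.lock"  # the dictation service watches this file

def toggle_dictation(_item):
    """Flip the lock file; the service notices and starts/stops listening."""
    if os.path.exists(LOCK_FILE):
        os.remove(LOCK_FILE)
        indicator.set_icon_full("microphone-muted", "Dictation off")
    else:
        open(LOCK_FILE, "w").close()
        indicator.set_icon_full("microphone-high", "Dictation on")

indicator = AppIndicator3.Indicator.new(
    "dictation-tray",    # illustrative ID
    "microphone-muted",  # initial (OFF) icon
    AppIndicator3.IndicatorCategory.APPLICATION_STATUS)
indicator.set_status(AppIndicator3.IndicatorStatus.ACTIVE)

# AppIndicator3 exposes no plain left-click signal, so the toggle is a menu item here
menu = Gtk.Menu()
item = Gtk.MenuItem(label="Toggle dictation")
item.connect("activate", toggle_dictation)
menu.append(item)
menu.show_all()
indicator.set_menu(menu)

Gtk.main()
```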

### Read-Aloud Service Redesign
- Replace automatic clipboard reader with on-demand Ctrl+middle-click
- New middle_click_reader.py service
- Works anywhere: highlight text, Ctrl+middle-click to read
- Uses Edge-TTS (Christopher voice) with mpv playback
- Lock file prevents feedback with the dictation service; see the sketch after this list
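
The actual logic lives in middle_click_reader.py; the sketch below shows the mechanism under stated assumptions (the lock-file name, temp-file path, and exact voice ID are illustrative):

```python
# Illustrative sketch of the Ctrl+middle-click reader, not the shipped service.
import os
import subprocess
from pynput import mouse, keyboard

LOCK_FILE = "reading.lock"         # assumed name; keeps dictation from hearing playback
VOICE = "en-US-ChristopherNeural"  # Edge-TTS "Christopher" voice

ctrl_held = False

def set_ctrl(key, pressed):
    global ctrl_held
    if key in (keyboard.Key.ctrl_l, keyboard.Key.ctrl_r):
        ctrl_held = pressed

def read_selection():
    # The X11 PRIMARY selection holds whatever text is currently highlighted
    text = subprocess.run(["xclip", "-o", "-selection", "primary"],
                          capture_output=True, text=True).stdout.strip()
    if not text:
        return
    open(LOCK_FILE, "w").close()  # signal the dictation service to ignore audio
    try:
        # Synthesize with edge-tts, then play the result with mpv
        subprocess.run(["edge-tts", "--voice", VOICE, "--text", text,
                        "--write-media", "/tmp/readaloud.mp3"], check=True)
        subprocess.run(["mpv", "--no-video", "/tmp/readaloud.mp3"], check=True)
    finally:
        os.remove(LOCK_FILE)

def on_click(x, y, button, pressed):
    if pressed and button == mouse.Button.middle and ctrl_held:
        read_selection()

keyboard.Listener(on_press=lambda k: set_ctrl(k, True),
                  on_release=lambda k: set_ctrl(k, False)).start()
with mouse.Listener(on_click=on_click) as listener:
    listener.join()
```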

### Conversation Mode Removed
- Delete all VLLM/conversation code (VLLMClient, ConversationManager, TTS)
- Archive 5 old implementations to archive/old_implementations/
- Remove conversation-related scripts and services
- Clean separation of concerns for future reintegration if needed

### Dependencies Cleanup
- Remove: openai, aiohttp, pyttsx3, requests (conversation deps)
- Keep: PyGObject, pynput, sounddevice, vosk, numpy, edge-tts
- Net reduction: 4 packages removed, 6 core packages retained

### Testing Improvements
- Add test_dictation_service.py (8 tests)
- Add test_middle_click.py (11 tests); illustrative test shapes follow this list
- Fix test_run.py to use correct model path
- Total: 19 unit tests passing
- Delete obsolete test files (test_suite, test_vllm_integration, etc.)
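
As an illustration of the shape of these tests (hypothetical: assumes the simplified service imports cleanly as `ai_dictation_simple` in the test environment and exposes `process_final_text`, the module-level `keyboard` controller, and `send_notification`):

```python
# Hypothetical pytest sketch mirroring the dictation-path tests.
import asyncio
import pytest
import ai_dictation_simple as svc

@pytest.mark.parametrize("word", ["the", "uh", "um", "hmm"])
def test_filler_words_are_not_typed(monkeypatch, word):
    typed = []
    # Stub out real keystrokes and desktop notifications
    monkeypatch.setattr(svc.keyboard, "type", typed.append)
    monkeypatch.setattr(svc, "send_notification", lambda *a, **k: None)
    svc.app_state = svc.AppState.DICTATION
    asyncio.run(svc.process_final_text(word))
    assert typed == []  # spurious single words are filtered

def test_real_text_is_capitalized_and_typed(monkeypatch):
    typed = []
    monkeypatch.setattr(svc.keyboard, "type", typed.append)
    monkeypatch.setattr(svc, "send_notification", lambda *a, **k: None)
    svc.app_state = svc.AppState.DICTATION
    asyncio.run(svc.process_final_text("hello world"))
    assert typed == ["Hello world "]  # capitalized, trailing space appended
```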

### Documentation
- Add CHANGES.md with complete changelog
- Add docs/MIGRATION_GUIDE.md for upgrading
- Add README.md with quick start guide
- Update docs/README.md with current features only
- Add justfile for common tasks

### New Services & Scripts
- Add middle-click-reader.service (systemd)
- Add scripts/setup-middle-click-reader.sh
- Add desktop files for autostart
- Remove toggle-conversation.sh (obsolete)

## Impact

**Code Quality**
- Net change: -6,007 lines (596 added, 6,603 deleted)
- Simpler architecture, easier maintenance
- Better test coverage (19 passing unit tests, replacing the previous ad hoc mix)
- Cleaner separation of concerns

**User Experience**
- No notification spam during dictation
- Clean visual status via tray icon
- Full control over read-aloud (no unwanted readings)
- Better performance (fewer background processes)

**Privacy**
- No conversation data stored
- No VLLM connection needed
- All processing stays local, except the text sent to Edge-TTS for read-aloud synthesis

## Migration Notes

Users upgrading should:
1. Run `uv sync` to update dependencies
2. Restart `dictation.service` to pick up the tray icon
3. Run `scripts/setup-middle-click-reader.sh` to install the new read-aloud service
4. Remove the old `read-aloud.service` if present

See docs/MIGRATION_GUIDE.md for details.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-10 19:11:06 -07:00


#!/mnt/storage/Development/dictation-service/.venv/bin/python
import os
import sys
import queue
import json
import time
import subprocess
import threading
import logging
import asyncio
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Callable

import numpy as np
import pyttsx3
import sounddevice as sd
from vosk import Model, KaldiRecognizer
from pynput.keyboard import Controller
from openai import AsyncOpenAI

import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
from gi.repository import Gtk, GLib, Gdk

# Setup logging
logging.basicConfig(filename='/home/universal/.gemini/tmp/428d098e581799ff7817b2001dd545f7b891975897338dd78498cc16582e004f/debug.log', level=logging.DEBUG)

# Configuration
SHARED_MODELS_DIR = os.path.expanduser("~/.shared/models/vosk-models")
MODEL_NAME = "vosk-model-en-us-0.22"
MODEL_PATH = os.path.join(SHARED_MODELS_DIR, MODEL_NAME)
SAMPLE_RATE = 16000
BLOCK_SIZE = 8000

# Lock files created/removed by the toggle hotkeys; their presence drives the state machine
DICTATION_LOCK_FILE = "listening.lock"
CONVERSATION_LOCK_FILE = "conversation.lock"

# VLLM Configuration
VLLM_ENDPOINT = "http://127.0.0.1:8000/v1"
VLLM_MODEL = "qwen-7b-quant"
MAX_CONVERSATION_HISTORY = 10
TTS_ENABLED = True
class AppState(Enum):
    """Application states for dictation and conversation modes"""
    IDLE = "idle"
    DICTATION = "dictation"
    CONVERSATION = "conversation"


@dataclass
class ConversationMessage:
    """Represents a single conversation message"""
    role: str  # "user" or "assistant"
    content: str
    timestamp: float
class TTSManager:
    """Manages text-to-speech functionality"""

    def __init__(self):
        self.engine = None
        self.enabled = TTS_ENABLED
        self._init_engine()

    def _init_engine(self):
        """Initialize TTS engine"""
        if not self.enabled:
            return
        try:
            self.engine = pyttsx3.init()
            # Configure voice properties for more natural speech
            voices = self.engine.getProperty('voices')
            if voices:
                # Try to find a good voice
                for voice in voices:
                    if 'english' in voice.name.lower() or 'en_' in voice.id.lower():
                        self.engine.setProperty('voice', voice.id)
                        break
            self.engine.setProperty('rate', 150)  # Moderate speech rate
            self.engine.setProperty('volume', 0.8)
            logging.info("TTS engine initialized")
        except Exception as e:
            logging.error(f"Failed to initialize TTS: {e}")
            self.enabled = False

    def speak(self, text: str, on_start: Optional[Callable] = None, on_end: Optional[Callable] = None):
        """Speak text asynchronously"""
        if not self.enabled or not self.engine or not text.strip():
            return

        def speak_in_thread():
            try:
                if on_start:
                    GLib.idle_add(on_start)
                self.engine.say(text)
                self.engine.runAndWait()
                if on_end:
                    GLib.idle_add(on_end)
            except Exception as e:
                logging.error(f"TTS error: {e}")

        threading.Thread(target=speak_in_thread, daemon=True).start()
class VLLMClient:
    """Client for VLLM API communication"""

    def __init__(self, endpoint: str = VLLM_ENDPOINT):
        self.endpoint = endpoint
        self.client = AsyncOpenAI(
            api_key="vllm-api-key",
            base_url=endpoint
        )
        self._test_connection()

    def _test_connection(self):
        """Test connection to VLLM endpoint"""
        try:
            import requests
            response = requests.get(f"{self.endpoint}/models", timeout=2)
            if response.status_code == 200:
                logging.info(f"VLLM endpoint connected: {self.endpoint}")
            else:
                logging.warning(f"VLLM endpoint returned status: {response.status_code}")
        except Exception as e:
            logging.warning(f"VLLM endpoint test failed: {e}")

    async def get_response(self, messages: List[dict]) -> str:
        """Get AI response from VLLM"""
        try:
            response = await self.client.chat.completions.create(
                model=VLLM_MODEL,
                messages=messages,
                max_tokens=500,
                temperature=0.7
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            logging.error(f"VLLM API error: {e}")
            return "Sorry, I'm having trouble connecting right now."
class ConversationGUI:
    """Simple GUI for conversation mode"""

    def __init__(self):
        self.window = None
        self.text_buffer = None
        self.input_entry = None
        self.end_call_button = None
        self.is_active = False

    def create_window(self):
        """Create the conversation GUI window"""
        if self.window:
            return
        self.window = Gtk.Window(title="AI Conversation")
        self.window.set_default_size(400, 300)
        self.window.set_border_width(10)

        # Main container
        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
        self.window.add(vbox)

        # Conversation display
        scroll = Gtk.ScrolledWindow()
        scroll.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
        self.text_view = Gtk.TextView()
        self.text_view.set_editable(False)
        self.text_view.set_wrap_mode(Gtk.WrapMode.WORD)
        self.text_buffer = self.text_view.get_buffer()
        scroll.add(self.text_view)
        vbox.pack_start(scroll, True, True, 0)

        # Input area
        input_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.input_entry = Gtk.Entry()
        self.input_entry.set_placeholder_text("Type your message here...")
        self.input_entry.connect("key-press-event", self.on_key_press)
        send_button = Gtk.Button(label="Send")
        send_button.connect("clicked", self.on_send_clicked)
        input_box.pack_start(self.input_entry, True, True, 0)
        input_box.pack_start(send_button, False, False, 0)
        vbox.pack_start(input_box, False, False, 0)

        # Control buttons
        button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.end_call_button = Gtk.Button(label="End Call")
        self.end_call_button.connect("clicked", self.on_end_call)
        self.end_call_button.get_style_context().add_class(Gtk.STYLE_CLASS_DESTRUCTIVE_ACTION)
        button_box.pack_start(self.end_call_button, True, True, 0)
        vbox.pack_start(button_box, False, False, 0)

        # Window events
        self.window.connect("destroy", self.on_destroy)

    def show(self):
        """Show the GUI window"""
        if not self.window:
            self.create_window()
        self.window.show_all()
        self.is_active = True
        self.add_message("system", "🤖 AI Conversation Started. Speak or type your message!")

    def hide(self):
        """Hide the GUI window"""
        if self.window:
            self.window.hide()
        self.is_active = False

    def add_message(self, role: str, message: str):
        """Add a message to the conversation display"""
        def _add_message():
            if not self.text_buffer:
                return
            end_iter = self.text_buffer.get_end_iter()
            prefix = "👤 " if role == "user" else "🤖 "
            self.text_buffer.insert(end_iter, f"{prefix}{message}\n\n")
            # Auto-scroll to bottom
            end_iter = self.text_buffer.get_end_iter()
            mark = self.text_buffer.create_mark(None, end_iter, False)
            self.text_view.scroll_to_mark(mark, 0.0, False, 0.0, 0.0)
        if self.is_active:
            GLib.idle_add(_add_message)

    def on_key_press(self, widget, event):
        """Handle key press events in input"""
        if event.keyval == Gdk.KEY_Return:
            self.on_send_clicked(widget)
            return True
        return False

    def on_send_clicked(self, widget):
        """Handle send button click"""
        text = self.input_entry.get_text().strip()
        if text:
            self.input_entry.set_text("")
            # This will be handled by the conversation manager
            return text
        return None

    def on_end_call(self, widget):
        """Handle end call button click"""
        self.hide()

    def on_destroy(self, widget):
        """Handle window destroy"""
        self.is_active = False
        self.window = None
        self.text_buffer = None
class ConversationManager:
    """Manages conversation state and AI interactions with persistent context"""

    def __init__(self):
        self.conversation_history: List[ConversationMessage] = []
        self.persistent_history_file = "conversation_history.json"
        self.vllm_client = VLLMClient()
        self.tts_manager = TTSManager()
        self.gui = ConversationGUI()
        self.is_speaking = False
        self.max_history = MAX_CONVERSATION_HISTORY
        self.load_persistent_history()

    def load_persistent_history(self):
        """Load conversation history from persistent storage"""
        try:
            if os.path.exists(self.persistent_history_file):
                with open(self.persistent_history_file, 'r') as f:
                    data = json.load(f)
                for msg_data in data:
                    message = ConversationMessage(
                        msg_data['role'],
                        msg_data['content'],
                        msg_data['timestamp']
                    )
                    self.conversation_history.append(message)
                logging.info(f"Loaded {len(self.conversation_history)} messages from persistent storage")
        except Exception as e:
            logging.error(f"Error loading conversation history: {e}")
            self.conversation_history = []

    def save_persistent_history(self):
        """Save conversation history to persistent storage"""
        try:
            data = []
            for msg in self.conversation_history:
                data.append({
                    'role': msg.role,
                    'content': msg.content,
                    'timestamp': msg.timestamp
                })
            with open(self.persistent_history_file, 'w') as f:
                json.dump(data, f, indent=2)
            logging.info("Conversation history saved")
        except Exception as e:
            logging.error(f"Error saving conversation history: {e}")

    def add_message(self, role: str, content: str):
        """Add message to conversation history"""
        message = ConversationMessage(role, content, time.time())
        self.conversation_history.append(message)
        # Keep history within limits
        if len(self.conversation_history) > self.max_history:
            self.conversation_history = self.conversation_history[-self.max_history:]
        # Display in GUI
        self.gui.add_message(role, content)
        # Save to persistent storage
        self.save_persistent_history()
        logging.info(f"Added {role} message: {content[:50]}...")

    def get_messages_for_api(self) -> List[dict]:
        """Get conversation history formatted for API call"""
        messages = []
        # Add system prompt
        messages.append({
            "role": "system",
            "content": "You are a helpful AI assistant in a voice conversation. Be concise and natural in your responses."
        })
        # Add conversation history
        for msg in self.conversation_history:
            messages.append({
                "role": msg.role,
                "content": msg.content
            })
        return messages

    async def process_user_input(self, text: str):
        """Process user input and generate AI response"""
        if not text.strip():
            return
        # Add user message
        self.add_message("user", text)
        # Show GUI if not visible
        if not self.gui.is_active:
            self.gui.show()
        # Mark as speaking to prevent audio interruption
        self.is_speaking = True
        try:
            # Get AI response
            api_messages = self.get_messages_for_api()
            response = await self.vllm_client.get_response(api_messages)
            # Add AI response
            self.add_message("assistant", response)
            # Speak response
            if self.tts_manager.enabled:
                def on_tts_start():
                    logging.info("TTS started speaking")

                def on_tts_end():
                    self.is_speaking = False
                    logging.info("TTS finished speaking")

                self.tts_manager.speak(response, on_tts_start, on_tts_end)
            else:
                self.is_speaking = False
        except Exception as e:
            logging.error(f"Error processing user input: {e}")
            self.is_speaking = False

    def start_conversation(self):
        """Start a new conversation session (maintains persistent context)"""
        self.gui.show()
        logging.info(f"Conversation session started with {len(self.conversation_history)} messages of context")

    def end_conversation(self):
        """End the current conversation session (preserves context for next call)"""
        self.gui.hide()
        logging.info("Conversation session ended (context preserved for next call)")

    def clear_all_history(self):
        """Clear all conversation history (for fresh start)"""
        self.conversation_history.clear()
        try:
            if os.path.exists(self.persistent_history_file):
                os.remove(self.persistent_history_file)
        except Exception as e:
            logging.error(f"Error removing history file: {e}")
        logging.info("All conversation history cleared")
# Global State (Legacy support)
is_listening = False
keyboard = Controller()
q = queue.Queue()
last_partial_text = ""
typing_thread = None
should_type = False

# New State Management
app_state = AppState.IDLE
conversation_manager = None

# Voice Activity Detection (simple implementation)
last_audio_time = 0
speech_threshold = 0.01  # seconds of silence before considering speech ended


def send_notification(title, message, duration=2000):
    """Sends a system notification"""
    try:
        subprocess.run(["notify-send", "-t", str(duration), "-u", "low", title, message],
                       capture_output=True, check=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        pass
def download_model_if_needed():
    """Download the Vosk model into the shared models directory if it is missing"""
    if not os.path.exists(MODEL_PATH):
        logging.info(f"Model '{MODEL_NAME}' not found. Downloading...")
        try:
            os.makedirs(SHARED_MODELS_DIR, exist_ok=True)
            subprocess.check_call(["wget", "-P", SHARED_MODELS_DIR,
                                   f"https://alphacephei.com/vosk/models/{MODEL_NAME}.zip"])
            subprocess.check_call(["unzip", os.path.join(SHARED_MODELS_DIR, f"{MODEL_NAME}.zip"),
                                   "-d", SHARED_MODELS_DIR])
            logging.info("Download complete.")
        except Exception as e:
            logging.error(f"Error downloading model: {e}")
            sys.exit(1)
def audio_callback(indata, frames, time, status):
    """Enhanced audio callback with voice activity detection"""
    global last_audio_time
    if status:
        logging.warning(status)
    # Track audio activity for voice activity detection
    if app_state == AppState.CONVERSATION:
        # indata is a raw int16 buffer; convert to a normalized numpy array first
        samples = np.frombuffer(indata, dtype=np.int16).astype(np.float32) / 32768.0
        audio_level = np.abs(samples).mean()
        if audio_level > 0.01:  # Simple threshold for speech detection
            last_audio_time = time.currentTime
    if app_state in [AppState.DICTATION, AppState.CONVERSATION]:
        q.put(bytes(indata))
def process_partial_text(text):
    """Process partial text based on current mode"""
    global last_partial_text
    if text and text != last_partial_text:
        last_partial_text = text
        if app_state == AppState.DICTATION:
            logging.info(f"💭 {text}")
            # Show brief notification for longer partial text
            if len(text) > 3:
                send_notification("🎤 Speaking", text[:50] + "..." if len(text) > 50 else text, 1000)
        elif app_state == AppState.CONVERSATION:
            logging.info(f"💭 [Conversation] {text}")


async def process_final_text(text):
    """Process final text based on current mode"""
    global last_partial_text
    if not text.strip():
        return
    formatted = text.strip()
    # Filter out spurious single words that are likely false positives
    if len(formatted.split()) == 1 and formatted.lower() in ['the', 'a', 'an', 'uh', 'huh', 'um', 'hmm']:
        logging.info(f"⏭️ Filtered out spurious word: {formatted}")
        return
    # Filter out very short results that are likely noise
    if len(formatted) < 2:
        logging.info(f"⏭️ Filtered out too short: {formatted}")
        return
    formatted = formatted[0].upper() + formatted[1:] if formatted else formatted
    if app_state == AppState.DICTATION:
        logging.info(f"{formatted}")
        send_notification("✅ Said", formatted, 1500)
        # Type the text immediately
        try:
            keyboard.type(formatted + " ")
            logging.info(f"📝 Typed: {formatted}")
        except Exception as e:
            logging.error(f"Error typing: {e}")
    elif app_state == AppState.CONVERSATION:
        logging.info(f"✅ [Conversation] User said: {formatted}")
        # Process through conversation manager
        if conversation_manager and not conversation_manager.is_speaking:
            await conversation_manager.process_user_input(formatted)
    # Clear partial text
    last_partial_text = ""
def continuous_audio_processor():
    """Enhanced background thread with conversation support"""
    recognizer = None
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    while True:
        current_app_state = app_state
        if current_app_state != AppState.IDLE and recognizer is None:
            # Initialize recognizer when we start listening
            try:
                model = Model(MODEL_PATH)
                recognizer = KaldiRecognizer(model, SAMPLE_RATE)
                logging.info("Audio processor initialized")
            except Exception as e:
                logging.error(f"Failed to initialize recognizer: {e}")
                time.sleep(1)
                continue
        elif current_app_state == AppState.IDLE and recognizer is not None:
            # Clean up when we stop
            recognizer = None
            logging.info("Audio processor cleaned up")
            time.sleep(0.1)
            continue
        if current_app_state == AppState.IDLE:
            time.sleep(0.1)
            continue
        # Process audio when active
        try:
            data = q.get(timeout=0.1)
            if recognizer:
                if recognizer.AcceptWaveform(data):
                    # Final result for this utterance
                    result = json.loads(recognizer.Result())
                    final_text = result.get("text", "")
                    if final_text:
                        # Run the async handler to completion on this thread's loop
                        loop.run_until_complete(process_final_text(final_text))
                else:
                    # In-progress (partial) result
                    partial = json.loads(recognizer.PartialResult())
                    partial_text = partial.get("partial", "")
                    if partial_text:
                        process_partial_text(partial_text)
        except queue.Empty:
            continue
        except Exception as e:
            logging.error(f"Audio processing error: {e}")
            time.sleep(0.1)
def show_streaming_feedback():
    """Show visual feedback when dictation starts"""
    if app_state == AppState.DICTATION:
        send_notification("🎤 Dictation Active", "Speak now - text will appear live!", 3000)
    elif app_state == AppState.CONVERSATION:
        send_notification("🤖 Conversation Active", "Speak to talk with AI!", 3000)
def check_state_transitions():
    """Poll the lock files and handle state transitions; runs on the GLib main loop"""
    global app_state
    dictation_lock_exists = os.path.exists(DICTATION_LOCK_FILE)
    conversation_lock_exists = os.path.exists(CONVERSATION_LOCK_FILE)
    # Determine desired state
    if conversation_lock_exists:
        desired_state = AppState.CONVERSATION
    elif dictation_lock_exists:
        desired_state = AppState.DICTATION
    else:
        desired_state = AppState.IDLE
    # Handle state transitions
    if desired_state != app_state:
        old_state = app_state
        app_state = desired_state
        if app_state == AppState.DICTATION:
            logging.info("[Dictation] STARTED - Enhanced streaming mode")
            show_streaming_feedback()
        elif app_state == AppState.CONVERSATION:
            logging.info("[Conversation] STARTED - AI conversation mode")
            conversation_manager.start_conversation()
            show_streaming_feedback()
        elif old_state != AppState.IDLE:
            logging.info(f"[{old_state.value.upper()}] STOPPED")
            if old_state == AppState.CONVERSATION:
                conversation_manager.end_conversation()
            elif old_state == AppState.DICTATION:
                send_notification("🛑 Dictation Stopped", "Press Alt+D to resume", 2000)
    return True  # keep the GLib timeout active


def main():
    global app_state, conversation_manager
    try:
        logging.info("Starting enhanced AI dictation service")
        # Initialize conversation manager
        conversation_manager = ConversationManager()
        # Model Setup
        download_model_if_needed()
        logging.info("Model ready")
        # Start audio processing thread
        audio_thread = threading.Thread(target=continuous_audio_processor, daemon=True)
        audio_thread.start()
        logging.info("Audio processor thread started")
        logging.info("=== Enhanced AI Dictation Service Ready ===")
        logging.info("Features: Dictation (Alt+D) + AI Conversation (Ctrl+Alt+D)")
        # Open audio stream
        with sd.RawInputStream(samplerate=SAMPLE_RATE, blocksize=BLOCK_SIZE, dtype='int16',
                               channels=1, callback=audio_callback):
            logging.info("Audio stream opened")
            # Poll the lock files from the GTK main loop so the GUI stays responsive
            GLib.timeout_add(50, check_state_transitions)
            Gtk.main()
    except KeyboardInterrupt:
        logging.info("\nExiting...")
    except Exception as e:
        logging.error(f"Fatal error: {e}")


if __name__ == "__main__":
    main()