#!/mnt/storage/Development/dictation-service/.venv/bin/python import os import sys import queue import json import time import subprocess import threading import sounddevice as sd from vosk import Model, KaldiRecognizer from pynput.keyboard import Controller import logging import asyncio import aiohttp from openai import AsyncOpenAI from enum import Enum from dataclasses import dataclass from typing import List, Optional, Callable import gi gi.require_version('Gtk', '3.0') gi.require_version('Gdk', '3.0') from gi.repository import Gtk, GLib, Gdk import pyttsx3 # Setup logging logging.basicConfig(filename='/home/universal/.gemini/tmp/428d098e581799ff7817b2001dd545f7b891975897338dd78498cc16582e004f/debug.log', level=logging.DEBUG) # Configuration SHARED_MODELS_DIR = os.path.expanduser("~/.shared/models/vosk-models") MODEL_NAME = "vosk-model-en-us-0.22" MODEL_PATH = os.path.join(SHARED_MODELS_DIR, MODEL_NAME) SAMPLE_RATE = 16000 BLOCK_SIZE = 8000 DICTATION_LOCK_FILE = "listening.lock" CONVERSATION_LOCK_FILE = "conversation.lock" # VLLM Configuration VLLM_ENDPOINT = "http://127.0.0.1:8000/v1" VLLM_MODEL = "qwen-7b-quant" MAX_CONVERSATION_HISTORY = 10 TTS_ENABLED = True class AppState(Enum): """Application states for dictation and conversation modes""" IDLE = "idle" DICTATION = "dictation" CONVERSATION = "conversation" @dataclass class ConversationMessage: """Represents a single conversation message""" role: str # "user" or "assistant" content: str timestamp: float class TTSManager: """Manages text-to-speech functionality""" def __init__(self): self.engine = None self.enabled = TTS_ENABLED self._init_engine() def _init_engine(self): """Initialize TTS engine""" if not self.enabled: return try: self.engine = pyttsx3.init() # Configure voice properties for more natural speech voices = self.engine.getProperty('voices') if voices: # Try to find a good voice for voice in voices: if 'english' in voice.name.lower() or 'en_' in voice.id.lower(): self.engine.setProperty('voice', voice.id) break self.engine.setProperty('rate', 150) # Moderate speech rate self.engine.setProperty('volume', 0.8) logging.info("TTS engine initialized") except Exception as e: logging.error(f"Failed to initialize TTS: {e}") self.enabled = False def speak(self, text: str, on_start: Optional[Callable] = None, on_end: Optional[Callable] = None): """Speak text asynchronously""" if not self.enabled or not self.engine or not text.strip(): return def speak_in_thread(): try: if on_start: GLib.idle_add(on_start) self.engine.say(text) self.engine.runAndWait() if on_end: GLib.idle_add(on_end) except Exception as e: logging.error(f"TTS error: {e}") threading.Thread(target=speak_in_thread, daemon=True).start() class VLLMClient: """Client for VLLM API communication""" def __init__(self, endpoint: str = VLLM_ENDPOINT): self.endpoint = endpoint self.client = AsyncOpenAI( api_key="vllm-api-key", base_url=endpoint ) self._test_connection() def _test_connection(self): """Test connection to VLLM endpoint""" try: import requests response = requests.get(f"{self.endpoint}/models", timeout=2) if response.status_code == 200: logging.info(f"VLLM endpoint connected: {self.endpoint}") else: logging.warning(f"VLLM endpoint returned status: {response.status_code}") except Exception as e: logging.warning(f"VLLM endpoint test failed: {e}") async def get_response(self, messages: List[dict]) -> str: """Get AI response from VLLM""" try: response = await self.client.chat.completions.create( model=VLLM_MODEL, messages=messages, max_tokens=500, temperature=0.7 ) return response.choices[0].message.content.strip() except Exception as e: logging.error(f"VLLM API error: {e}") return "Sorry, I'm having trouble connecting right now." class ConversationGUI: """Simple GUI for conversation mode""" def __init__(self): self.window = None self.text_buffer = None self.input_entry = None self.end_call_button = None self.is_active = False def create_window(self): """Create the conversation GUI window""" if self.window: return self.window = Gtk.Window(title="AI Conversation") self.window.set_default_size(400, 300) self.window.set_border_width(10) # Main container vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) self.window.add(vbox) # Conversation display scroll = Gtk.ScrolledWindow() scroll.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) self.text_view = Gtk.TextView() self.text_view.set_editable(False) self.text_view.set_wrap_mode(Gtk.WrapMode.WORD) self.text_buffer = self.text_view.get_buffer() scroll.add(self.text_view) vbox.pack_start(scroll, True, True, 0) # Input area input_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6) self.input_entry = Gtk.Entry() self.input_entry.set_placeholder_text("Type your message here...") self.input_entry.connect("key-press-event", self.on_key_press) send_button = Gtk.Button(label="Send") send_button.connect("clicked", self.on_send_clicked) input_box.pack_start(self.input_entry, True, True, 0) input_box.pack_start(send_button, False, False, 0) vbox.pack_start(input_box, False, False, 0) # Control buttons button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6) self.end_call_button = Gtk.Button(label="End Call") self.end_call_button.connect("clicked", self.on_end_call) self.end_call_button.get_style_context().add_class(Gtk.STYLE_CLASS_DESTRUCTIVE_ACTION) button_box.pack_start(self.end_call_button, True, True, 0) vbox.pack_start(button_box, False, False, 0) # Window events self.window.connect("destroy", self.on_destroy) def show(self): """Show the GUI window""" if not self.window: self.create_window() self.window.show_all() self.is_active = True self.add_message("system", "🤖 AI Conversation Started. Speak or type your message!") def hide(self): """Hide the GUI window""" if self.window: self.window.hide() self.is_active = False def add_message(self, role: str, message: str): """Add a message to the conversation display""" def _add_message(): if not self.text_buffer: return end_iter = self.text_buffer.get_end_iter() prefix = "👤 " if role == "user" else "🤖 " self.text_buffer.insert(end_iter, f"{prefix}{message}\n\n") # Auto-scroll to bottom end_iter = self.text_buffer.get_end_iter() mark = self.text_buffer.create_mark(None, end_iter, False) self.text_view.scroll_to_mark(mark, 0.0, False, 0.0, 0.0) if self.is_active: GLib.idle_add(_add_message) def on_key_press(self, widget, event): """Handle key press events in input""" if event.keyval == Gdk.KEY_Return: self.on_send_clicked(widget) return True return False def on_send_clicked(self, widget): """Handle send button click""" text = self.input_entry.get_text().strip() if text: self.input_entry.set_text("") # This will be handled by the conversation manager return text return None def on_end_call(self, widget): """Handle end call button click""" self.hide() def on_destroy(self, widget): """Handle window destroy""" self.is_active = False self.window = None self.text_buffer = None class ConversationManager: """Manages conversation state and AI interactions with persistent context""" def __init__(self): self.conversation_history: List[ConversationMessage] = [] self.persistent_history_file = "conversation_history.json" self.vllm_client = VLLMClient() self.tts_manager = TTSManager() self.gui = ConversationGUI() self.is_speaking = False self.max_history = MAX_CONVERSATION_HISTORY self.load_persistent_history() def load_persistent_history(self): """Load conversation history from persistent storage""" try: if os.path.exists(self.persistent_history_file): with open(self.persistent_history_file, 'r') as f: data = json.load(f) for msg_data in data: message = ConversationMessage( msg_data['role'], msg_data['content'], msg_data['timestamp'] ) self.conversation_history.append(message) logging.info(f"Loaded {len(self.conversation_history)} messages from persistent storage") except Exception as e: logging.error(f"Error loading conversation history: {e}") self.conversation_history = [] def save_persistent_history(self): """Save conversation history to persistent storage""" try: data = [] for msg in self.conversation_history: data.append({ 'role': msg.role, 'content': msg.content, 'timestamp': msg.timestamp }) with open(self.persistent_history_file, 'w') as f: json.dump(data, f, indent=2) logging.info("Conversation history saved") except Exception as e: logging.error(f"Error saving conversation history: {e}") def add_message(self, role: str, content: str): """Add message to conversation history""" message = ConversationMessage(role, content, time.time()) self.conversation_history.append(message) # Keep history within limits if len(self.conversation_history) > self.max_history: self.conversation_history = self.conversation_history[-self.max_history:] # Display in GUI self.gui.add_message(role, content) # Save to persistent storage self.save_persistent_history() logging.info(f"Added {role} message: {content[:50]}...") def get_messages_for_api(self) -> List[dict]: """Get conversation history formatted for API call""" messages = [] # Add system prompt messages.append({ "role": "system", "content": "You are a helpful AI assistant in a voice conversation. Be concise and natural in your responses." }) # Add conversation history for msg in self.conversation_history: messages.append({ "role": msg.role, "content": msg.content }) return messages async def process_user_input(self, text: str): """Process user input and generate AI response""" if not text.strip(): return # Add user message self.add_message("user", text) # Show GUI if not visible if not self.gui.is_active: self.gui.show() # Mark as speaking to prevent audio interruption self.is_speaking = True try: # Get AI response api_messages = self.get_messages_for_api() response = await self.vllm_client.get_response(api_messages) # Add AI response self.add_message("assistant", response) # Speak response if self.tts_manager.enabled: def on_tts_start(): logging.info("TTS started speaking") def on_tts_end(): self.is_speaking = False logging.info("TTS finished speaking") self.tts_manager.speak(response, on_tts_start, on_tts_end) else: self.is_speaking = False except Exception as e: logging.error(f"Error processing user input: {e}") self.is_speaking = False def start_conversation(self): """Start a new conversation session (maintains persistent context)""" self.gui.show() logging.info(f"Conversation session started with {len(self.conversation_history)} messages of context") def end_conversation(self): """End the current conversation session (preserves context for next call)""" self.gui.hide() logging.info("Conversation session ended (context preserved for next call)") def clear_all_history(self): """Clear all conversation history (for fresh start)""" self.conversation_history.clear() try: if os.path.exists(self.persistent_history_file): os.remove(self.persistent_history_file) except Exception as e: logging.error(f"Error removing history file: {e}") logging.info("All conversation history cleared") # Global State (Legacy support) is_listening = False keyboard = Controller() q = queue.Queue() last_partial_text = "" typing_thread = None should_type = False # New State Management app_state = AppState.IDLE conversation_manager = None # Voice Activity Detection (simple implementation) last_audio_time = 0 speech_threshold = 0.01 # seconds of silence before considering speech ended def send_notification(title, message, duration=2000): """Sends a system notification""" try: subprocess.run(["notify-send", "-t", str(duration), "-u", "low", title, message], capture_output=True, check=True) except (FileNotFoundError, subprocess.CalledProcessError): pass def download_model_if_needed(): """Download model if needed""" if not os.path.exists(MODEL_NAME): logging.info(f"Model '{MODEL_NAME}' not found. Downloading...") try: subprocess.check_call(["wget", f"https://alphacephei.com/vosk/models/{MODEL_NAME}.zip"]) subprocess.check_call(["unzip", f"{MODEL_NAME}.zip"]) logging.info("Download complete.") except Exception as e: logging.error(f"Error downloading model: {e}") sys.exit(1) def audio_callback(indata, frames, time, status): """Enhanced audio callback with voice activity detection""" global last_audio_time if status: logging.warning(status) # Track audio activity for voice activity detection if app_state == AppState.CONVERSATION: audio_level = abs(indata).mean() if audio_level > 0.01: # Simple threshold for speech detection last_audio_time = time.currentTime if app_state in [AppState.DICTATION, AppState.CONVERSATION]: q.put(bytes(indata)) def process_partial_text(text): """Process partial text based on current mode""" global last_partial_text if text and text != last_partial_text: last_partial_text = text if app_state == AppState.DICTATION: logging.info(f"💭 {text}") # Show brief notification for longer partial text if len(text) > 3: send_notification("🎤 Speaking", text[:50] + "..." if len(text) > 50 else text, 1000) elif app_state == AppState.CONVERSATION: logging.info(f"💭 [Conversation] {text}") async def process_final_text(text): """Process final text based on current mode""" global last_partial_text if not text.strip(): return formatted = text.strip() # Filter out spurious single words that are likely false positives if len(formatted.split()) == 1 and formatted.lower() in ['the', 'a', 'an', 'uh', 'huh', 'um', 'hmm']: logging.info(f"⏭️ Filtered out spurious word: {formatted}") return # Filter out very short results that are likely noise if len(formatted) < 2: logging.info(f"⏭️ Filtered out too short: {formatted}") return formatted = formatted[0].upper() + formatted[1:] if formatted else formatted if app_state == AppState.DICTATION: logging.info(f"✅ {formatted}") send_notification("✅ Said", formatted, 1500) # Type the text immediately try: keyboard.type(formatted + " ") logging.info(f"📝 Typed: {formatted}") except Exception as e: logging.error(f"Error typing: {e}") elif app_state == AppState.CONVERSATION: logging.info(f"✅ [Conversation] User said: {formatted}") # Process through conversation manager if conversation_manager and not conversation_manager.is_speaking: await conversation_manager.process_user_input(formatted) # Clear partial text last_partial_text = "" def continuous_audio_processor(): """Enhanced background thread with conversation support""" recognizer = None loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) while True: current_app_state = app_state if current_app_state != AppState.IDLE and recognizer is None: # Initialize recognizer when we start listening try: model = Model(MODEL_NAME) recognizer = KaldiRecognizer(model, SAMPLE_RATE) logging.info("Audio processor initialized") except Exception as e: logging.error(f"Failed to initialize recognizer: {e}") time.sleep(1) continue elif current_app_state == AppState.IDLE and recognizer is not None: # Clean up when we stop recognizer = None logging.info("Audio processor cleaned up") time.sleep(0.1) continue if current_app_state == AppState.IDLE: time.sleep(0.1) continue # Process audio when active try: data = q.get(timeout=0.1) if recognizer: # Process partial results if recognizer.PartialResult(): partial = json.loads(recognizer.PartialResult()) partial_text = partial.get("partial", "") if partial_text: process_partial_text(partial_text) # Process final results if recognizer.AcceptWaveform(data): result = json.loads(recognizer.Result()) final_text = result.get("text", "") if final_text: # Run async processing asyncio.run_coroutine_threadsafe(process_final_text(final_text), loop) except queue.Empty: continue except Exception as e: logging.error(f"Audio processing error: {e}") time.sleep(0.1) def show_streaming_feedback(): """Show visual feedback when dictation starts""" if app_state == AppState.DICTATION: send_notification("🎤 Dictation Active", "Speak now - text will appear live!", 3000) elif app_state == AppState.CONVERSATION: send_notification("🤖 Conversation Active", "Speak to talk with AI!", 3000) def main(): global app_state, conversation_manager try: logging.info("Starting enhanced AI dictation service") # Initialize conversation manager conversation_manager = ConversationManager() # Model Setup download_model_if_needed() logging.info("Model ready") # Start audio processing thread audio_thread = threading.Thread(target=continuous_audio_processor, daemon=True) audio_thread.start() logging.info("Audio processor thread started") logging.info("=== Enhanced AI Dictation Service Ready ===") logging.info("Features: Dictation (Alt+D) + AI Conversation (Ctrl+Alt+D)") # Open audio stream with sd.RawInputStream(samplerate=SAMPLE_RATE, blocksize=BLOCK_SIZE, dtype='int16', channels=1, callback=audio_callback): logging.info("Audio stream opened") while True: # Check lock files for state changes dictation_lock_exists = os.path.exists(DICTATION_LOCK_FILE) conversation_lock_exists = os.path.exists(CONVERSATION_LOCK_FILE) # Determine desired state if conversation_lock_exists: desired_state = AppState.CONVERSATION elif dictation_lock_exists: desired_state = AppState.DICTATION else: desired_state = AppState.IDLE # Handle state transitions if desired_state != app_state: old_state = app_state app_state = desired_state if app_state == AppState.DICTATION: logging.info("[Dictation] STARTED - Enhanced streaming mode") show_streaming_feedback() elif app_state == AppState.CONVERSATION: logging.info("[Conversation] STARTED - AI conversation mode") conversation_manager.start_conversation() show_streaming_feedback() elif old_state != AppState.IDLE: logging.info(f"[{old_state.value.upper()}] STOPPED") if old_state == AppState.CONVERSATION: conversation_manager.end_conversation() elif old_state == AppState.DICTATION: send_notification("🛑 Dictation Stopped", "Press Alt+D to resume", 2000) # Sleep to prevent busy waiting time.sleep(0.05) except KeyboardInterrupt: logging.info("\nExiting...") except Exception as e: logging.error(f"Fatal error: {e}") if __name__ == "__main__": main()