- Fix state detection priority: dictation now takes precedence over conversation - Fix critical bug: event loop was created but never started, preventing async coroutines from executing - Optimize audio processing: reorder AcceptWaveform/PartialResult checks - Switch to faster Vosk model: vosk-model-en-us-0.22-lgraph for 2-3x speed improvement - Reduce block size from 8000 to 4000 for lower latency - Add filtering to remove spurious 'the', 'a', 'an' words from start/end of transcriptions - Update toggle-dictation.sh to properly clean up conversation lock file - Improve batch audio processing for better responsiveness
454 lines
16 KiB
Python
Executable File
454 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Test Suite for Original Dictation Functionality
|
|
Tests basic voice-to-text transcription features
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import unittest
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import subprocess
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
|
|
# Add src to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
|
|
|
class TestOriginalDictation(unittest.TestCase):
|
|
"""Test the original dictation service functionality"""
|
|
|
|
def setUp(self):
|
|
"""Setup test environment"""
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
self.lock_file = os.path.join(self.temp_dir, "test_listening.lock")
|
|
|
|
# Mock environment variables that might be expected
|
|
os.environ['DISPLAY'] = ':0'
|
|
os.environ['XAUTHORITY'] = '/tmp/.Xauthority'
|
|
|
|
def tearDown(self):
|
|
"""Clean up test environment"""
|
|
if os.path.exists(self.lock_file):
|
|
os.remove(self.lock_file)
|
|
os.rmdir(self.temp_dir)
|
|
|
|
def test_enhanced_dictation_import(self):
|
|
"""Test that enhanced dictation can be imported"""
|
|
try:
|
|
from src.dictation_service.enhanced_dictation import (
|
|
send_notification, download_model_if_needed,
|
|
process_partial_text, process_final_text
|
|
)
|
|
self.assertTrue(callable(send_notification))
|
|
self.assertTrue(callable(download_model_if_needed))
|
|
except ImportError as e:
|
|
self.fail(f"Cannot import enhanced dictation functions: {e}")
|
|
|
|
def test_basic_dictation_import(self):
|
|
"""Test that basic dictation can be imported"""
|
|
try:
|
|
from src.dictation_service.vosk_dictation import main
|
|
self.assertTrue(callable(main))
|
|
except ImportError as e:
|
|
self.fail(f"Cannot import basic dictation: {e}")
|
|
|
|
def test_notification_system(self):
|
|
"""Test notification functionality"""
|
|
try:
|
|
from src.dictation_service.enhanced_dictation import send_notification
|
|
|
|
# Test with mock subprocess
|
|
with patch('subprocess.run') as mock_run:
|
|
mock_run.return_value = Mock(returncode=0)
|
|
|
|
# Test basic notification
|
|
send_notification("Test Title", "Test Message", 2000)
|
|
mock_run.assert_called_once_with(
|
|
["notify-send", "-t", "2000", "-u", "low", "Test Title", "Test Message"],
|
|
capture_output=True, check=True
|
|
)
|
|
|
|
print("✅ Notification system working correctly")
|
|
|
|
except Exception as e:
|
|
self.fail(f"Notification system test failed: {e}")
|
|
|
|
def test_text_processing_functions(self):
|
|
"""Test text processing logic"""
|
|
try:
|
|
from src.dictation_service.enhanced_dictation import process_partial_text, process_final_text
|
|
|
|
# Mock keyboard and logging for testing
|
|
with patch('src.dictation_service.enhanced_dictation.keyboard') as mock_keyboard, \
|
|
patch('src.dictation_service.enhanced_dictation.logging') as mock_logging, \
|
|
patch('src.dictation_service.enhanced_dictation.send_notification') as mock_notify:
|
|
|
|
# Test partial text processing
|
|
process_partial_text("hello world")
|
|
mock_logging.info.assert_called_with("💭 hello world")
|
|
|
|
# Test final text processing
|
|
process_final_text("hello world test")
|
|
|
|
# Should type the text
|
|
mock_keyboard.type.assert_called_once_with("Hello world test ")
|
|
|
|
except Exception as e:
|
|
self.fail(f"Text processing test failed: {e}")
|
|
|
|
def test_text_filtering_logic(self):
|
|
"""Test text filtering for dictation"""
|
|
test_cases = [
|
|
("the", True), # Should be filtered
|
|
("a", True), # Should be filtered
|
|
("uh", True), # Should be filtered
|
|
("hello", False), # Should not be filtered
|
|
("test message", False), # Should not be filtered
|
|
("x", True), # Too short
|
|
("", True), # Empty
|
|
(" ", True), # Only whitespace
|
|
]
|
|
|
|
for text, should_filter in test_cases:
|
|
with self.subTest(text=text):
|
|
# Simulate filtering logic
|
|
formatted = text.strip()
|
|
|
|
# Check if text should be filtered
|
|
will_filter = (
|
|
len(formatted.split()) == 1 and formatted.lower() in ['the', 'a', 'an', 'uh', 'huh', 'um', 'hmm'] or
|
|
len(formatted) < 2
|
|
)
|
|
|
|
self.assertEqual(will_filter, should_filter,
|
|
f"Text '{text}' filtering mismatch")
|
|
|
|
def test_audio_callback_mock(self):
|
|
"""Test audio callback with mock data"""
|
|
try:
|
|
from src.dictation_service.enhanced_dictation import audio_callback
|
|
import queue
|
|
|
|
# Mock global state
|
|
with patch('src.dictation_service.enhanced_dictation.is_listening', True), \
|
|
patch('src.dictation_service.enhanced_dictation.q', queue.Queue()) as mock_queue:
|
|
|
|
# Mock audio data
|
|
import numpy as np
|
|
audio_data = np.random.randint(-32768, 32767, size=(8000, 1), dtype=np.int16)
|
|
|
|
# Test callback
|
|
audio_callback(audio_data, 8000, None, None)
|
|
|
|
# Check that data was added to queue
|
|
self.assertFalse(mock_queue.empty())
|
|
|
|
except ImportError:
|
|
self.skipTest("numpy not available for audio testing")
|
|
except Exception as e:
|
|
self.fail(f"Audio callback test failed: {e}")
|
|
|
|
def test_lock_file_operations(self):
|
|
"""Test lock file creation and monitoring"""
|
|
# Test lock file creation
|
|
self.assertFalse(os.path.exists(self.lock_file))
|
|
|
|
# Create lock file
|
|
with open(self.lock_file, 'w') as f:
|
|
f.write("test")
|
|
|
|
self.assertTrue(os.path.exists(self.lock_file))
|
|
|
|
# Test lock file removal
|
|
os.remove(self.lock_file)
|
|
self.assertFalse(os.path.exists(self.lock_file))
|
|
|
|
def test_model_download_function(self):
|
|
"""Test model download function"""
|
|
try:
|
|
from src.dictation_service.enhanced_dictation import download_model_if_needed
|
|
|
|
# Mock subprocess calls
|
|
with patch('os.path.exists') as mock_exists, \
|
|
patch('subprocess.check_call') as mock_subprocess, \
|
|
patch('sys.exit') as mock_exit:
|
|
|
|
# Test when model doesn't exist
|
|
mock_exists.return_value = False
|
|
download_model_if_needed("test-model")
|
|
|
|
# Should attempt download
|
|
mock_subprocess.assert_called()
|
|
mock_exit.assert_not_called()
|
|
|
|
# Test when model exists
|
|
mock_exists.return_value = True
|
|
mock_subprocess.reset_mock()
|
|
download_model_if_needed("test-model")
|
|
|
|
# Should not attempt download
|
|
mock_subprocess.assert_not_called()
|
|
|
|
except Exception as e:
|
|
self.fail(f"Model download test failed: {e}")
|
|
|
|
def test_state_transitions(self):
|
|
"""Test dictation state transitions"""
|
|
# Simulate the state checking logic from main()
|
|
def check_dictation_state(lock_file_path):
|
|
if os.path.exists(lock_file_path):
|
|
return "listening"
|
|
else:
|
|
return "idle"
|
|
|
|
# Test idle state
|
|
self.assertEqual(check_dictation_state(self.lock_file), "idle")
|
|
|
|
# Test listening state
|
|
with open(self.lock_file, 'w') as f:
|
|
f.write("listening")
|
|
|
|
self.assertEqual(check_dictation_state(self.lock_file), "listening")
|
|
|
|
# Test back to idle
|
|
os.remove(self.lock_file)
|
|
self.assertEqual(check_dictation_state(self.lock_file), "idle")
|
|
|
|
def test_keyboard_output_simulation(self):
|
|
"""Test keyboard output functionality"""
|
|
try:
|
|
from pynput.keyboard import Controller
|
|
|
|
# Create keyboard controller
|
|
keyboard = Controller()
|
|
|
|
# Test that we can create controller (actual typing tests would interfere with user)
|
|
self.assertIsNotNone(keyboard)
|
|
self.assertTrue(hasattr(keyboard, 'type'))
|
|
self.assertTrue(hasattr(keyboard, 'press'))
|
|
self.assertTrue(hasattr(keyboard, 'release'))
|
|
|
|
except ImportError:
|
|
self.skipTest("pynput not available")
|
|
except Exception as e:
|
|
self.fail(f"Keyboard controller test failed: {e}")
|
|
|
|
def test_error_handling(self):
|
|
"""Test error handling in dictation functions"""
|
|
try:
|
|
from src.dictation_service.enhanced_dictation import send_notification
|
|
|
|
# Test with failing subprocess
|
|
with patch('subprocess.run') as mock_run:
|
|
mock_run.side_effect = FileNotFoundError("notify-send not found")
|
|
|
|
# Should not raise exception
|
|
try:
|
|
send_notification("Test", "Message")
|
|
except Exception:
|
|
self.fail("send_notification should handle subprocess errors gracefully")
|
|
|
|
except Exception as e:
|
|
self.fail(f"Error handling test failed: {e}")
|
|
|
|
def test_text_formatting(self):
|
|
"""Test text formatting for dictation output"""
|
|
test_cases = [
|
|
("hello world", "Hello world"),
|
|
("test", "Test"),
|
|
("CAPITALIZED", "CAPITALIZED"),
|
|
("", ""),
|
|
("a", "A"),
|
|
]
|
|
|
|
for input_text, expected in test_cases:
|
|
with self.subTest(input_text=input_text):
|
|
# Simulate text formatting logic
|
|
if input_text:
|
|
formatted = input_text.strip()
|
|
formatted = formatted[0].upper() + formatted[1:] if formatted else formatted
|
|
else:
|
|
formatted = ""
|
|
|
|
self.assertEqual(formatted, expected)
|
|
|
|
class TestDictationIntegration(unittest.TestCase):
|
|
"""Integration tests for dictation system"""
|
|
|
|
def setUp(self):
|
|
"""Setup integration test environment"""
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
self.lock_file = os.path.join(self.temp_dir, "integration_test.lock")
|
|
|
|
def tearDown(self):
|
|
"""Clean up integration test environment"""
|
|
if os.path.exists(self.lock_file):
|
|
os.remove(self.lock_file)
|
|
os.rmdir(self.temp_dir)
|
|
|
|
def test_full_dictation_flow_simulation(self):
|
|
"""Test simulated full dictation flow"""
|
|
try:
|
|
from src.dictation_service.enhanced_dictation import (
|
|
process_partial_text, process_final_text, send_notification
|
|
)
|
|
|
|
# Mock all external dependencies
|
|
with patch('src.dictation_service.enhanced_dictation.keyboard') as mock_keyboard, \
|
|
patch('src.dictation_service.enhanced_dictation.logging') as mock_logging, \
|
|
patch('src.dictation_service.enhanced_dictation.send_notification') as mock_notify:
|
|
|
|
# Simulate dictation session
|
|
print("\n🎤 Simulating Dictation Session...")
|
|
|
|
# Start dictation (would be triggered by lock file)
|
|
mock_logging.info.assert_any_call("=== Enhanced Dictation Ready ===")
|
|
mock_logging.info.assert_any_call("Features: Real-time streaming + instant typing + visual feedback")
|
|
|
|
# Simulate user speaking
|
|
test_phrases = [
|
|
"hello world",
|
|
"this is a test",
|
|
"dictation is working"
|
|
]
|
|
|
|
for phrase in test_phrases:
|
|
# Simulate partial text processing
|
|
process_partial_text(phrase[:3] + "...")
|
|
|
|
# Simulate final text processing
|
|
process_final_text(phrase)
|
|
|
|
# Verify keyboard typing calls
|
|
self.assertEqual(mock_keyboard.type.call_count, len(test_phrases))
|
|
|
|
# Verify logging calls
|
|
mock_logging.info.assert_any_call("✅ Hello world")
|
|
mock_logging.info.assert_any_call("✅ This is a test")
|
|
mock_logging.info.assert_any_call("✅ Dictation is working")
|
|
|
|
print("✅ Dictation flow simulation successful")
|
|
|
|
except Exception as e:
|
|
self.fail(f"Full dictation flow test failed: {e}")
|
|
|
|
def test_service_startup_simulation(self):
|
|
"""Test service startup sequence"""
|
|
try:
|
|
from src.dictation_service.enhanced_dictation import main
|
|
|
|
# Mock the infinite while loop to run briefly
|
|
with patch('src.dictation_service.enhanced_dictation.time.sleep') as mock_sleep, \
|
|
patch('src.dictation_service.enhanced_dictation.os.path.exists') as mock_exists, \
|
|
patch('sounddevice.RawInputStream') as mock_stream, \
|
|
patch('src.dictation_service.enhanced_dictation.download_model_if_needed') as mock_download:
|
|
|
|
# Setup mocks
|
|
mock_exists.return_value = False # No lock file initially
|
|
mock_stream.return_value.__enter__ = Mock()
|
|
mock_stream.return_value.__exit__ = Mock()
|
|
|
|
# Mock time.sleep to raise KeyboardInterrupt after a few calls
|
|
sleep_count = 0
|
|
def mock_sleep_func(duration):
|
|
nonlocal sleep_count
|
|
sleep_count += 1
|
|
if sleep_count > 3: # After 3 sleep calls, simulate KeyboardInterrupt
|
|
raise KeyboardInterrupt()
|
|
|
|
mock_sleep.side_effect = mock_sleep_func
|
|
|
|
# Run main (should exit after KeyboardInterrupt)
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
pass # Expected
|
|
|
|
# Verify initialization
|
|
mock_download.assert_called_once()
|
|
mock_stream.assert_called_once()
|
|
|
|
print("✅ Service startup simulation successful")
|
|
|
|
except Exception as e:
|
|
self.fail(f"Service startup test failed: {e}")
|
|
|
|
def test_audio_system():
    """Test actual audio system if available.

    Runs ``arecord --version`` and ``aplay --version`` (5 second timeout
    each) and prints one status line per tool.  Nothing is returned or
    raised for a missing, unlaunchable, or hanging tool; the outcome is only
    reported on stdout.
    """
    print("\n🔊 Testing Audio System...")

    # (executable, capability name used in the status line)
    tools = (
        ("arecord", "recording"),
        ("aplay", "playback"),
    )

    for tool, capability in tools:
        try:
            result = subprocess.run(
                [tool, "--version"],
                capture_output=True,
                timeout=5
            )
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers FileNotFoundError as well as e.g. PermissionError
            # for a binary that exists but cannot be executed.
            print(f"⚠️ {tool} not available")
            continue

        if result.returncode == 0:
            print(f"✅ Audio {capability} system available")
        else:
            print(f"⚠️ Audio {capability} system may have issues")
|
|
|
|
def test_vosk_models():
    """Print, for each known Vosk model, whether its directory exists.

    The model names are checked with ``os.path.exists`` exactly as written,
    i.e. relative to the current working directory.  Nothing is returned.
    """
    print("\n🧠 Testing Vosk Models...")

    # Directory name -> human-readable label, in reporting order.
    known_models = {
        "vosk-model-small-en-us-0.15": "Small model (fast)",
        "vosk-model-en-us-0.22-lgraph": "Medium model",
        "vosk-model-en-us-0.22": "Large model (accurate)",
    }

    for directory, label in known_models.items():
        if not os.path.exists(directory):
            print(f"⚠️ {label}: Not found (will download if needed)")
            continue
        print(f"✅ {label}: Found")
|
|
|
|
def main():
    """Main test runner for original dictation.

    Runs the unit tests, then the audio and Vosk model system checks, and
    prints a summary that reflects whether the unit tests actually passed.

    Returns:
        int: 0 when every unit test passed, 1 otherwise, suitable for
        passing to ``sys.exit``.
    """
    print("🎤 Original Dictation Service - Test Suite")
    print("=" * 50)

    # Run unit tests
    print("\n📋 Running Original Dictation Unit Tests...")
    # exit=False keeps the interpreter alive for the system checks below; the
    # returned TestProgram exposes the TestResult needed for the exit status.
    program = unittest.main(argv=[''], exit=False, verbosity=2)
    outcome = program.result
    tests_passed = outcome.wasSuccessful()

    print("\n" + "=" * 50)
    print("🔍 System Checks...")

    # Audio system test
    test_audio_system()

    # Vosk model test
    test_vosk_models()

    print("\n" + "=" * 50)
    if tests_passed:
        print("✅ Original Dictation Tests Complete!")
    else:
        print("❌ Original Dictation Tests Complete - failures detected")

    print("\n📊 Summary:")
    if tests_passed:
        print("- All core dictation functions tested")
        print("- Audio system availability verified")
        print("- Vosk model status checked")
        print("- Error handling and state management verified")
    else:
        print(f"- Unit test failures: {len(outcome.failures)}")
        print(f"- Unit test errors: {len(outcome.errors)}")
        print("- Audio system availability verified")
        print("- Vosk model status checked")

    return 0 if tests_passed else 1


if __name__ == "__main__":
    # Propagate the unit-test outcome to the shell / CI instead of always
    # exiting with status 0.
    sys.exit(main())