- Fix state detection priority: dictation now takes precedence over conversation - Fix critical bug: event loop was created but never started, preventing async coroutines from executing - Optimize audio processing: reorder AcceptWaveform/PartialResult checks - Switch to faster Vosk model: vosk-model-en-us-0.22-lgraph for 2-3x speed improvement - Reduce block size from 8000 to 4000 for lower latency - Add filtering to remove spurious 'the', 'a', 'an' words from start/end of transcriptions - Update toggle-dictation.sh to properly clean up conversation lock file - Improve batch audio processing for better responsiveness
379 lines
12 KiB
Python
379 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
End-to-End Test Suite for Dictation Service
|
|
Tests the complete dictation pipeline from keybindings to audio processing
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
import queue
|
|
import json
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import sounddevice as sd
|
|
import numpy as np
|
|
from vosk import Model, KaldiRecognizer
|
|
|
|
AUDIO_DEPS_AVAILABLE = True
|
|
except ImportError:
|
|
AUDIO_DEPS_AVAILABLE = False
|
|
|
|
# Test configuration
|
|
TEST_DIR = Path("/mnt/storage/Development/dictation-service")
|
|
LOCK_FILES = {
|
|
"dictation": TEST_DIR / "listening.lock",
|
|
"conversation": TEST_DIR / "conversation.lock",
|
|
}
|
|
|
|
|
|
class DictationServiceTester:
|
|
def __init__(self):
|
|
self.results = []
|
|
self.errors = []
|
|
|
|
def log(self, message, level="INFO"):
|
|
"""Log test results"""
|
|
timestamp = time.strftime("%H:%M:%S")
|
|
print(f"[{timestamp}] {level}: {message}")
|
|
self.results.append(f"{level}: {message}")
|
|
|
|
def error(self, message):
|
|
"""Log errors"""
|
|
self.log(message, "ERROR")
|
|
self.errors.append(message)
|
|
|
|
def test_lock_file_operations(self):
|
|
"""Test 1: Lock file creation and removal"""
|
|
self.log("Testing lock file operations...")
|
|
|
|
# Test dictation lock
|
|
dictation_lock = LOCK_FILES["dictation"]
|
|
|
|
# Ensure clean state
|
|
if dictation_lock.exists():
|
|
dictation_lock.unlink()
|
|
|
|
# Test creation
|
|
dictation_lock.touch()
|
|
if dictation_lock.exists():
|
|
self.log("✓ Dictation lock file creation works")
|
|
else:
|
|
self.error("✗ Dictation lock file creation failed")
|
|
|
|
# Test removal
|
|
dictation_lock.unlink()
|
|
if not dictation_lock.exists():
|
|
self.log("✓ Dictation lock file removal works")
|
|
else:
|
|
self.error("✗ Dictation lock file removal failed")
|
|
|
|
# Test conversation lock
|
|
conv_lock = LOCK_FILES["conversation"]
|
|
|
|
# Ensure clean state
|
|
if conv_lock.exists():
|
|
conv_lock.unlink()
|
|
|
|
# Test creation
|
|
conv_lock.touch()
|
|
if conv_lock.exists():
|
|
self.log("✓ Conversation lock file creation works")
|
|
else:
|
|
self.error("✗ Conversation lock file creation failed")
|
|
|
|
conv_lock.unlink()
|
|
|
|
def test_toggle_scripts(self):
|
|
"""Test 2: Toggle script functionality"""
|
|
self.log("Testing toggle scripts...")
|
|
|
|
# Test dictation toggle
|
|
toggle_script = TEST_DIR / "scripts" / "toggle-dictation.sh"
|
|
|
|
# Ensure clean state
|
|
if LOCK_FILES["dictation"].exists():
|
|
LOCK_FILES["dictation"].unlink()
|
|
|
|
# Run toggle script
|
|
result = subprocess.run([str(toggle_script)], capture_output=True, text=True)
|
|
if result.returncode == 0:
|
|
self.log("✓ Dictation toggle script executed successfully")
|
|
if LOCK_FILES["dictation"].exists():
|
|
self.log("✓ Dictation lock file created by script")
|
|
else:
|
|
self.error("✗ Dictation lock file not created by script")
|
|
else:
|
|
self.error(f"✗ Dictation toggle script failed: {result.stderr}")
|
|
|
|
# Toggle again to remove lock
|
|
result = subprocess.run([str(toggle_script)], capture_output=True, text=True)
|
|
if result.returncode == 0 and not LOCK_FILES["dictation"].exists():
|
|
self.log("✓ Dictation toggle script properly removes lock file")
|
|
else:
|
|
self.error("✗ Dictation toggle script failed to remove lock file")
|
|
|
|
def test_service_status(self):
|
|
"""Test 3: Service status and responsiveness"""
|
|
self.log("Testing service status...")
|
|
|
|
# Check if dictation service is running
|
|
result = subprocess.run(
|
|
["systemctl", "--user", "is-active", "dictation.service"],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip() == "active":
|
|
self.log("✓ Dictation service is active")
|
|
else:
|
|
self.error(f"✗ Dictation service not active: {result.stdout.strip()}")
|
|
|
|
# Check keybinding listener service
|
|
result = subprocess.run(
|
|
["systemctl", "--user", "is-active", "keybinding-listener.service"],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip() == "active":
|
|
self.log("✓ Keybinding listener service is active")
|
|
else:
|
|
self.error(
|
|
f"✗ Keybinding listener service not active: {result.stdout.strip()}"
|
|
)
|
|
|
|
def test_audio_devices(self):
|
|
"""Test 4: Audio device availability"""
|
|
self.log("Testing audio devices...")
|
|
|
|
if not AUDIO_DEPS_AVAILABLE:
|
|
self.error("✗ Audio dependencies not available")
|
|
return
|
|
|
|
try:
|
|
devices = sd.query_devices()
|
|
input_devices = []
|
|
|
|
# Handle different sounddevice API versions
|
|
if isinstance(devices, list):
|
|
for i, device in enumerate(devices):
|
|
try:
|
|
if (
|
|
hasattr(device, "get")
|
|
and device.get("max_input_channels", 0) > 0
|
|
):
|
|
input_devices.append(device)
|
|
elif (
|
|
hasattr(device, "__getitem__")
|
|
and len(device) > 2
|
|
and device[2] > 0
|
|
):
|
|
input_devices.append(device)
|
|
except:
|
|
continue
|
|
|
|
if input_devices:
|
|
self.log(f"✓ Found {len(input_devices)} audio input device(s)")
|
|
try:
|
|
default_input = sd.query_devices(kind="input")
|
|
if default_input:
|
|
device_name = (
|
|
default_input.get("name", "Unknown")
|
|
if hasattr(default_input, "get")
|
|
else str(default_input)
|
|
)
|
|
self.log(f"✓ Default input device available")
|
|
else:
|
|
self.error("✗ No default input device found")
|
|
except:
|
|
self.log("✓ Audio devices found (default device check skipped)")
|
|
else:
|
|
self.error("✗ No audio input devices found")
|
|
|
|
except Exception as e:
|
|
self.error(f"✗ Audio device test failed: {e}")
|
|
|
|
def test_vosk_model(self):
|
|
"""Test 5: Vosk model loading and recognition"""
|
|
self.log("Testing Vosk model...")
|
|
|
|
if not AUDIO_DEPS_AVAILABLE:
|
|
self.error("✗ Audio dependencies not available for Vosk testing")
|
|
return
|
|
|
|
try:
|
|
model_path = (
|
|
TEST_DIR / "src" / "dictation_service" / "vosk-model-small-en-us-0.15"
|
|
)
|
|
if model_path.exists():
|
|
self.log("✓ Vosk model directory exists")
|
|
|
|
# Try to load model
|
|
model = Model(str(model_path))
|
|
self.log("✓ Vosk model loaded successfully")
|
|
|
|
# Test recognizer
|
|
rec = KaldiRecognizer(model, 16000)
|
|
self.log("✓ Vosk recognizer created successfully")
|
|
|
|
# Test with dummy audio data
|
|
dummy_audio = np.random.randint(-32768, 32767, 1600, dtype=np.int16)
|
|
if rec.AcceptWaveform(dummy_audio.tobytes()):
|
|
result = json.loads(rec.Result())
|
|
self.log(
|
|
f"✓ Vosk recognition test passed: {result.get('text', 'no text')}"
|
|
)
|
|
else:
|
|
self.log("✓ Vosk recognition accepts audio data")
|
|
|
|
else:
|
|
self.error("✗ Vosk model directory not found")
|
|
|
|
except Exception as e:
|
|
self.error(f"✗ Vosk model test failed: {e}")
|
|
|
|
def test_keybinding_simulation(self):
|
|
"""Test 6: Keybinding simulation"""
|
|
self.log("Testing keybinding simulation...")
|
|
|
|
# Test direct script execution
|
|
toggle_script = TEST_DIR / "scripts" / "toggle-dictation.sh"
|
|
|
|
# Clean state
|
|
if LOCK_FILES["dictation"].exists():
|
|
LOCK_FILES["dictation"].unlink()
|
|
|
|
# Simulate keybinding by running script
|
|
result = subprocess.run(
|
|
[str(toggle_script)],
|
|
capture_output=True,
|
|
text=True,
|
|
env={"DISPLAY": ":1", "XAUTHORITY": "/run/user/1000/gdm/Xauthority"},
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
self.log("✓ Keybinding simulation (script execution) works")
|
|
if LOCK_FILES["dictation"].exists():
|
|
self.log("✓ Lock file created via simulated keybinding")
|
|
else:
|
|
self.error("✗ Lock file not created via simulated keybinding")
|
|
else:
|
|
self.error(f"✗ Keybinding simulation failed: {result.stderr}")
|
|
|
|
def test_service_logs(self):
|
|
"""Test 7: Check service logs for errors"""
|
|
self.log("Checking service logs...")
|
|
|
|
# Check dictation service logs
|
|
result = subprocess.run(
|
|
[
|
|
"journalctl",
|
|
"--user",
|
|
"-u",
|
|
"dictation.service",
|
|
"-n",
|
|
"10",
|
|
"--no-pager",
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
if "error" in result.stdout.lower() or "exception" in result.stdout.lower():
|
|
self.error("✗ Errors found in dictation service logs")
|
|
self.log(f"Log excerpt: {result.stdout[-500:]}")
|
|
else:
|
|
self.log("✓ No obvious errors in dictation service logs")
|
|
|
|
# Check keybinding listener logs
|
|
result = subprocess.run(
|
|
[
|
|
"journalctl",
|
|
"--user",
|
|
"-u",
|
|
"keybinding-listener.service",
|
|
"-n",
|
|
"10",
|
|
"--no-pager",
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
if "error" in result.stdout.lower() or "exception" in result.stdout.lower():
|
|
self.error("✗ Errors found in keybinding listener logs")
|
|
self.log(f"Log excerpt: {result.stdout[-500:]}")
|
|
else:
|
|
self.log("✓ No obvious errors in keybinding listener logs")
|
|
|
|
def test_end_to_end_flow(self):
|
|
"""Test 8: End-to-end dictation flow"""
|
|
self.log("Testing end-to-end dictation flow...")
|
|
|
|
# This is a simplified e2e test - in a real scenario we'd need to:
|
|
# 1. Start dictation mode
|
|
# 2. Send audio data
|
|
# 3. Check if text is generated
|
|
# 4. Stop dictation mode
|
|
|
|
# For now, just test the basic flow
|
|
self.log("Note: Full e2e audio processing test requires manual testing")
|
|
self.log("Basic components tested above should enable manual e2e testing")
|
|
|
|
def run_all_tests(self):
|
|
"""Run all tests"""
|
|
self.log("Starting Dictation Service E2E Test Suite")
|
|
self.log("=" * 50)
|
|
|
|
test_methods = [
|
|
self.test_lock_file_operations,
|
|
self.test_toggle_scripts,
|
|
self.test_service_status,
|
|
self.test_audio_devices,
|
|
self.test_vosk_model,
|
|
self.test_keybinding_simulation,
|
|
self.test_service_logs,
|
|
self.test_end_to_end_flow,
|
|
]
|
|
|
|
for test_method in test_methods:
|
|
try:
|
|
test_method()
|
|
self.log("-" * 30)
|
|
except Exception as e:
|
|
self.error(f"Test {test_method.__name__} crashed: {e}")
|
|
self.log("-" * 30)
|
|
|
|
# Summary
|
|
self.log("=" * 50)
|
|
self.log("TEST SUMMARY")
|
|
self.log(f"Total tests: {len(test_methods)}")
|
|
self.log(f"Errors: {len(self.errors)}")
|
|
|
|
if self.errors:
|
|
self.log("FAILED TESTS:")
|
|
for error in self.errors:
|
|
self.log(f" - {error}")
|
|
return False
|
|
else:
|
|
self.log("ALL TESTS PASSED ✓")
|
|
return True
|
|
|
|
|
|
def main():
|
|
tester = DictationServiceTester()
|
|
success = tester.run_all_tests()
|
|
|
|
# Print full results
|
|
print("\n" + "=" * 50)
|
|
print("FULL TEST RESULTS:")
|
|
for result in tester.results:
|
|
print(result)
|
|
|
|
return 0 if success else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|