# PROJECT: A dual-manifold cognitive architecture that combines individual expertise with collective knowledge for scientific discovery through constraint optimization. # SUMMARY: The system models individual researchers' cognitive evolution (episodic→semantic→persona layers) and community knowledge as separate manifolds, then performs braided optimization to find novel research directions at their intersection using a repulsive novelty force and collective validation. # STEPS: 1. Parse researcher documents into timestamped chunks. 2. Build hybrid search with vector and BM25 indexes. 3. Distill cognitive trajectory via temporal analysis. 4. Construct persona graph with centrality measures. 5. Calculate gravity well via kernel density estimation. 6. Fetch community knowledge from OpenAlex API. 7. Compute individual resonance (alpha) scores. 8. Calculate collective feasibility (beta) scores. 9. Apply braiding formula with gated fusion. 10. Filter hallucinations and noise. 11. Generate optimal research direction P*. 12. Create linearized context for LLM prompting. 13. Coordinate multiple domain agents. 14. Validate suggestions with constraint checking. 15. Output personalized research proposals. # STRUCTURE: ``` dual-manifold-ai/ ├── config/ │ ├── __init__.py │ ├── settings.py │ └── constants.py ├── data/ │ ├── raw_documents/ │ ├── processed/ │ └── indices/ ├── src/ │ ├── episodic_memory/ │ │ ├── document_parser.py │ │ ├── hybrid_index.py │ │ └── chunk_manager.py │ ├── semantic_memory/ │ │ ├── temporal_distiller.py │ │ ├── cognitive_trajectory.py │ │ └── evolution_analyzer.py │ ├── persona_layer/ │ │ ├── knowledge_graph.py │ │ ├── centrality_calculator.py │ │ └── gravity_well.py │ ├── collective_manifold/ │ │ ├── openalex_client.py │ │ ├── community_graph.py │ │ └── wireframe_builder.py │ ├── braiding_processor/ │ │ ├── individual_resonance.py │ │ ├── collective_feasibility.py │ │ └── braiding_kernel.py │ ├── agents/ │ │ ├── coordinator.py │ │ ├── domain_agent.py │ │ └── author_agent.py │ ├── optimization/ │ │ ├── constraint_solver.py │ │ ├── novelty_repulsor.py │ │ └── goldilocks_finder.py │ └── utils/ │ ├── embeddings.py │ ├── graph_utils.py │ └── linearizer.py ├── tests/ │ ├── test_episodic.py │ ├── test_semantic.py │ └── test_braiding.py ├── api/ │ ├── server.py │ └── endpoints.py ├── notebooks/ │ ├── exploration.ipynb │ └── visualization.ipynb ├── requirements.txt ├── docker-compose.yml ├── Dockerfile └── README.md ``` # DETAILED EXPLANATION: 1. `config/settings.py`: Central configuration management for API keys and paths. 2. `config/constants.py`: Mathematical constants and thresholds for algorithms. 3. `data/raw_documents/`: Storage for researcher PDFs and text documents. 4. `data/processed/`: Cleaned and timestamped document chunks. 5. `data/indices/`: Persistent search indexes for fast retrieval. 6. `src/episodic_memory/document_parser.py`: Extracts text with metadata and timestamps. 7. `src/episodic_memory/hybrid_index.py`: Combines dense vectors with sparse BM25. 8. `src/episodic_memory/chunk_manager.py`: Creates semantic chunks with IDs. 9. `src/semantic_memory/temporal_distiller.py`: Analyzes evolution using LLM. 10. `src/semantic_memory/cognitive_trajectory.py`: Builds time series of concepts. 11. `src/semantic_memory/evolution_analyzer.py`: Detects shifts in research focus. 12. `src/persona_layer/knowledge_graph.py`: Constructs weighted graph from concepts. 13. `src/persona_layer/centrality_calculator.py`: Computes node importance metrics. 14. 
`src/persona_layer/gravity_well.py`: Creates kernel density estimation field. 15. `src/collective_manifold/openalex_client.py`: Fetches community publications. 16. `src/collective_manifold/community_graph.py`: Builds domain knowledge networks. 17. `src/collective_manifold/wireframe_builder.py`: Creates manifold estimation points. 18. `src/braiding_processor/individual_resonance.py`: Calculates alpha scores. 19. `src/braiding_processor/collective_feasibility.py`: Computes beta scores. 20. `src/braiding_processor/braiding_kernel.py`: Implements gated fusion formula. 21. `src/agents/coordinator.py`: Orchestrates multi-agent interactions. 22. `src/agents/domain_agent.py`: Specializes in specific scientific domains. 23. `src/agents/author_agent.py`: Models individual researcher persona. 24. `src/optimization/constraint_solver.py`: Solves dual constraint optimization. 25. `src/optimization/novelty_repulsor.py`: Implements repulsive force logic. 26. `src/optimization/goldilocks_finder.py`: Locates optimal intersection zones. 27. `src/utils/embeddings.py`: Handles text vectorization operations. 28. `src/utils/graph_utils.py`: Provides graph algorithms and traversals. 29. `src/utils/linearizer.py`: Converts complex structures to LLM prompts. 30. `tests/test_episodic.py`: Validates document parsing and indexing. 31. `tests/test_semantic.py`: Tests cognitive trajectory analysis. 32. `tests/test_braiding.py`: Verifies braiding algorithm correctness. 33. `api/server.py`: FastAPI server for system interaction. 34. `api/endpoints.py`: REST endpoints for research suggestions. 35. `notebooks/exploration.ipynb`: Interactive system exploration. 36. `notebooks/visualization.ipynb`: Gravity well and graph visualization. 37. `requirements.txt`: Python dependencies and versions. 38. `docker-compose.yml`: Service orchestration for deployment. 39. `Dockerfile`: Containerization configuration. 40. `README.md`: Comprehensive setup and usage guide. # CODE: ## config/settings.py ```python """ Central configuration for the dual-manifold cognitive architecture. Manages API keys, file paths, and system parameters. 
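Example usage (a minimal sketch; assumes a populated .env file):

    from config.settings import settings

    settings.validate()  # checks required API keys and creates data/log directories
    print(settings.EMBEDDING_MODEL, settings.CHUNK_SIZE)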
""" import os from typing import Dict, Any from dataclasses import dataclass, field from dotenv import load_dotenv load_dotenv() @dataclass class Settings: """System configuration settings.""" # API Keys OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "") OPENALEX_API_KEY: str = os.getenv("OPENALEX_API_KEY", "") HUGGINGFACE_TOKEN: str = os.getenv("HUGGINGFACE_TOKEN", "") # Paths DATA_DIR: str = os.getenv("DATA_DIR", "./data") RAW_DOCS_DIR: str = os.path.join(DATA_DIR, "raw_documents") PROCESSED_DIR: str = os.path.join(DATA_DIR, "processed") INDICES_DIR: str = os.path.join(DATA_DIR, "indices") LOGS_DIR: str = os.getenv("LOGS_DIR", "./logs") # Model configurations EMBEDDING_MODEL: str = "sentence-transformers/all-MiniLM-L6-v2" LLM_MODEL: str = "gpt-4-turbo-preview" CHUNK_SIZE: int = 1000 CHUNK_OVERLAP: int = 200 # Search parameters HYBRID_SEARCH_WEIGHT: float = 0.5 # Balance between dense and sparse TOP_K_RESULTS: int = 10 RECIPROCAL_RANK_K: int = 60 # Graph parameters CENTRALITY_MEASURE: str = "pagerank" MIN_EDGE_WEIGHT: float = 0.1 MAX_GRAPH_NODES: int = 1000 # Braiding parameters ALPHA_WEIGHT: float = 0.4 # Individual resonance BETA_WEIGHT: float = 0.4 # Collective feasibility GAMMA: float = 0.2 # Interaction term NOVELTY_THRESHOLD: float = 0.7 # Server settings API_HOST: str = "0.0.0.0" API_PORT: int = 8000 DEBUG_MODE: bool = os.getenv("DEBUG", "False").lower() == "true" # Cache settings CACHE_TTL: int = 3600 # 1 hour ENABLE_CACHE: bool = True def validate(self) -> None: """Validate configuration settings.""" required_keys = ["OPENAI_API_KEY", "OPENALEX_API_KEY"] missing = [key for key in required_keys if not getattr(self, key)] if missing: raise ValueError(f"Missing required environment variables: {missing}") # Create directories if they don't exist for dir_path in [self.DATA_DIR, self.RAW_DOCS_DIR, self.PROCESSED_DIR, self.INDICES_DIR, self.LOGS_DIR]: os.makedirs(dir_path, exist_ok=True) def to_dict(self) -> Dict[str, Any]: """Convert settings to dictionary.""" return {k: v for k, v in self.__dict__.items() if not k.startswith('_')} # Global settings instance settings = Settings() ``` ## src/episodic_memory/document_parser.py ```python """ Document parsing module for episodic memory layer. Extracts text with metadata, timestamps, and creates semantic chunks. 
""" import os import re from datetime import datetime from typing import List, Dict, Any, Optional, Tuple from dataclasses import dataclass, field import hashlib from pathlib import Path import pdfplumber from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.schema import Document as LangchainDocument @dataclass class DocumentChunk: """Represents a semantically coherent chunk of text.""" id: str text: str source_file: str chunk_index: int timestamp: datetime metadata: Dict[str, Any] = field(default_factory=dict) embeddings: Optional[List[float]] = None def to_dict(self) -> Dict[str, Any]: """Convert chunk to dictionary for storage.""" return { "id": self.id, "text": self.text, "source_file": self.source_file, "chunk_index": self.chunk_index, "timestamp": self.timestamp.isoformat(), "metadata": self.metadata } class DocumentParser: """Parses documents into timestamped chunks with metadata.""" def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200): """Initialize parser with chunking parameters.""" self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len, separators=["\n\n", "\n", ". ", " ", ""] ) def extract_text_from_pdf(self, pdf_path: str) -> Tuple[str, Dict[str, Any]]: """Extract text from PDF file with metadata.""" text_parts = [] metadata = { "file_name": os.path.basename(pdf_path), "file_size": os.path.getsize(pdf_path), "page_count": 0, "extraction_date": datetime.now().isoformat() } try: with pdfplumber.open(pdf_path) as pdf: metadata["page_count"] = len(pdf.pages) for page_num, page in enumerate(pdf.pages): page_text = page.extract_text() if page_text: text_parts.append(f"Page {page_num + 1}:\n{page_text}") # Try to extract creation date from metadata if pdf.metadata: if 'CreationDate' in pdf.metadata: metadata["creation_date"] = pdf.metadata['CreationDate'] if 'Title' in pdf.metadata: metadata["title"] = pdf.metadata['Title'] if 'Author' in pdf.metadata: metadata["author"] = pdf.metadata['Author'] except Exception as e: raise ValueError(f"Failed to parse PDF {pdf_path}: {str(e)}") return "\n\n".join(text_parts), metadata def extract_text_from_txt(self, txt_path: str) -> Tuple[str, Dict[str, Any]]: """Extract text from plain text file.""" try: with open(txt_path, 'r', encoding='utf-8') as f: text = f.read() except UnicodeDecodeError: with open(txt_path, 'r', encoding='latin-1') as f: text = f.read() metadata = { "file_name": os.path.basename(txt_path), "file_size": os.path.getsize(txt_path), "extraction_date": datetime.now().isoformat() } return text, metadata def extract_timestamp(self, file_path: str, metadata: Dict[str, Any]) -> datetime: """Extract timestamp from file and metadata.""" # First try metadata if "creation_date" in metadata: try: # Handle PDF creation date format: D:20250101120000 date_str = metadata["creation_date"] if date_str.startswith("D:"): date_str = date_str[2:] return datetime.strm%d%H%M%S")str[:14], "%Y% except: pass # Try file modification time file_mtime = os.path.getmtime(file_path) return datetime.fromtimestamp(file_mtime) def create_chunk_id(self, source_file: str, chunk_index: int, text: str) -> str: """Create unique ID for chunk.""" content_hash = hashlib.md5(text.encode()).hexdigest()[:8] file_hash = hashlib.md5(source_file.encode()).hexdigest()[:8] return f"chunk_{file_hash}_{chunk_index}_{content_hash}" def parse_document(self, file_path: str) -> List[DocumentChunk]: """Parse 
a document into timestamped chunks.""" # Determine file type and extract text file_ext = os.path.splitext(file_path)[1].lower() if file_ext == '.pdf': text, metadata = self.extract_text_from_pdf(file_path) elif file_ext in ['.txt', '.md', '.csv']: text, metadata = self.extract_text_from_txt(file_path) else: raise ValueError(f"Unsupported file format: {file_ext}") # Extract timestamp timestamp = self.extract_timestamp(file_path, metadata) # Split into chunks langchain_docs = self.text_splitter.create_documents([text]) # Convert to our chunk format chunks = [] for idx, doc in enumerate(langchain_docs): chunk_id = self.create_chunk_id(file_path, idx, doc.page_content) chunk_metadata = metadata.copy() chunk_metadata.update({ "chunk_size": len(doc.page_content), "word_count": len(doc.page_content.split()) }) chunk = DocumentChunk( id=chunk_id, text=doc.page_content, source_file=file_path, chunk_index=idx, timestamp=timestamp, metadata=chunk_metadata ) chunks.append(chunk) return chunks def parse_directory(self, directory_path: str) -> List[DocumentChunk]: """Parse all documents in a directory.""" all_chunks = [] supported_extensions = ['.pdf', '.txt', '.md', '.csv'] for root, _, files in os.walk(directory_path): for file in files: file_ext = os.path.splitext(file)[1].lower() if file_ext in supported_extensions: file_path = os.path.join(root, file) try: chunks = self.parse_document(file_path) all_chunks.extend(chunks) print(f"Parsed {file_path}: {len(chunks)} chunks") except Exception as e: print(f"Error parsing {file_path}: {str(e)}") # Sort chunks by timestamp all_chunks.sort(key=lambda x: x.timestamp) return all_chunks ``` ## src/episodic_memory/hybrid_index.py ```python """ Hybrid search index combining dense vector embeddings and sparse BM25. Implements reciprocal rank fusion for result merging. 
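Example usage (a minimal sketch; assumes chunks is a list of dicts with a "text" key, e.g. produced by DocumentChunk.to_dict()):

    index = HybridIndex()
    index.build_indexes(chunks)
    for hit in index.hybrid_search("graph neural networks for molecules", top_k=5):
        print(round(hit["score"], 3), hit["text"][:80])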
""" import json import pickle from typing import List, Dict, Any, Tuple, Optional from pathlib import Path import numpy as np from rank_bm25 import BM25Okapi from sentence_transformers import SentenceTransformer import faiss class HybridIndex: """Combines dense vector index and sparse BM25 index for hybrid search.""" def __init__(self, embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"): """Initialize hybrid index with embedding model.""" self.embedding_model = SentenceTransformer(embedding_model_name) self.bm25_index: Optional[BM25Okapi] = None self.vector_index: Optional[faiss.IndexFlatIP] = None self.chunks: List[Dict[str, Any]] = [] self.tokenized_corpus: List[List[str]] = [] def create_tokenized_corpus(self, chunks: List[Dict[str, Any]]) -> List[List[str]]: """Tokenize text for BM25 indexing.""" tokenized = [] for chunk in chunks: # Simple tokenization - split by whitespace and lowercase tokens = chunk["text"].lower().split() # Remove very short tokens tokens = [t for t in tokens if len(t) > 2] tokenized.append(tokens) return tokenized def build_indexes(self, chunks: List[Dict[str, Any]]) -> None: """Build both dense and sparse indexes from chunks.""" self.chunks = chunks print(f"Building indexes for {len(chunks)} chunks...") # Build BM25 index print("Building BM25 index...") self.tokenized_corpus = self.create_tokenized_corpus(chunks) self.bm25_index = BM25Okapi(self.tokenized_corpus) # Build dense vector index print("Building dense vector index...") texts = [chunk["text"] for chunk in chunks] embeddings = self.embedding_model.encode(texts, show_progress_bar=True) embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) # Initialize FAISS index dimension = embeddings.shape[1] self.vector_index = faiss.IndexFlatIP(dimension) self.vector_index.add(embeddings.astype('float32')) print("Indexes built successfully!") def dense_search(self, query: str, k: int = 10) -> List[Tuple[int, float]]: """Search using dense vector similarity.""" if self.vector_index is None: raise ValueError("Vector index not built. Call build_indexes first.") # Encode query query_embedding = self.embedding_model.encode([query])[0] query_embedding = query_embedding / np.linalg.norm(query_embedding) query_embedding = query_embedding.reshape(1, -1).astype('float32') # Search distances, indices = self.vector_index.search(query_embedding, k) # Convert to list of (index, score) results = [] for idx, dist in zip(indices[0], distances[0]): if idx != -1: # FAISS returns -1 for missing results results.append((int(idx), float(dist))) return results def sparse_search(self, query: str, k: int = 10) -> List[Tuple[int, float]]: """Search using BM25.""" if self.bm25_index is None: raise ValueError("BM25 index not built. 
Call build_indexes first.") # Tokenize query query_tokens = query.lower().split() query_tokens = [t for t in query_tokens if len(t) > 2] # Get scores scores = self.bm25_index.get_scores(query_tokens) # Get top k results top_indices = np.argsort(scores)[::-1][:k] # Convert to list of (index, score) results = [] for idx in top_indices: if scores[idx] > 0: # Only include positive scores results.append((int(idx), float(scores[idx]))) return results def reciprocal_rank_fusion(self, ranked_lists: List[List[Tuple[int, float]]], k: int = 60) -> List[Tuple[int, float]]: """Combine multiple ranked lists using reciprocal rank fusion.""" fused_scores = {} for rank_list in ranked_lists: for rank, (doc_id, _) in enumerate(rank_list): if doc_id not in fused_scores: fused_scores[doc_id] = 0.0 # RRF formula: 1 / (k + rank) fused_scores[doc_id] += 1.0 / (k + rank + 1) # Sort by fused score sorted_results = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True) return [(doc_id, score) for doc_id, score in sorted_results] def hybrid_search(self, query: str, top_k: int = 10, dense_weight: float = 0.5, sparse_weight: float = 0.5) -> List[Dict[str, Any]]: """Perform hybrid search combining dense and sparse results.""" # Get results from both indexes dense_results = self.dense_search(query, k=top_k*2) sparse_results = self.sparse_search(query, k=top_k*2) # Normalize scores if dense_results: max_dense = max(score for _, score in dense_results) dense_results = [(idx, score/max_dense if max_dense > 0 else 0) for idx, score in dense_results] if sparse_results: max_sparse = max(score for _, score in sparse_results) sparse_results = [(idx, score/max_sparse if max_sparse > 0 else 0) for idx, score in sparse_results] # Apply weights weighted_dense = [(idx, score * dense_weight) for idx, score in dense_results] weighted_sparse = [(idx, score * sparse_weight) for idx, score in sparse_results] # Combine using reciprocal rank fusion fused_results = self.reciprocal_rank_fusion([weighted_dense, weighted_sparse]) # Get top k results top_results = fused_results[:top_k] # Format results with chunk information formatted_results = [] for doc_id, score in top_results: if doc_id < len(self.chunks): result = self.chunks[doc_id].copy() result["score"] = score result["chunk_id"] = result.get("id", f"chunk_{doc_id}") formatted_results.append(result) return formatted_results def save_indexes(self, save_dir: str) -> None: """Save indexes to disk.""" save_path = Path(save_dir) save_path.mkdir(parents=True, exist_ok=True) # Save chunks with open(save_path / "chunks.json", "w") as f: json.dump(self.chunks, f, indent=2, default=str) # Save BM25 index if self.bm25_index: with open(save_path / "bm25_index.pkl", "wb") as f: pickle.dump({ "bm25": self.bm25_index, "tokenized_corpus": self.tokenized_corpus }, f) # Save FAISS index if self.vector_index: faiss.write_index(self.vector_index, str(save_path / "vector_index.faiss")) print(f"Indexes saved to {save_dir}") def load_indexes(self, load_dir: str) -> None: """Load indexes from disk.""" load_path = Path(load_dir) # Load chunks with open(load_path / "chunks.json", "r") as f: self.chunks = json.load(f) # Load BM25 index bm25_path = load_path / "bm25_index.pkl" if bm25_path.exists(): with open(bm25_path, "rb") as f: bm25_data = pickle.load(f) self.bm25_index = bm25_data["bm25"] self.tokenized_corpus = bm25_data["tokenized_corpus"] # Load FAISS index faiss_path = load_path / "vector_index.faiss" if faiss_path.exists(): self.vector_index = faiss.read_index(str(faiss_path)) print(f"Indexes 
loaded from {load_dir}") ``` ## src/semantic_memory/temporal_distiller.py ```python """ Temporal distillation module for semantic memory layer. Analyzes cognitive evolution and extracts research trajectories. """ from typing import List, Dict, Any, Optional, Tuple from datetime import datetime, timedelta import statistics from collections import defaultdict import numpy as np from dataclasses import dataclass, field from langchain.chat_models import ChatOpenAI from langchain.prompts import ChatPromptTemplate from langchain.output_parsers import PydanticOutputParser from pydantic import BaseModel, Field @dataclass class TemporalConcept: """Represents a concept with temporal evolution data.""" name: str occurrences: List[datetime] = field(default_factory=list) contexts: List[str] = field(default_factory=list) strength: float = 0.0 # Frequency normalized by time trend: float = 0.0 # Positive = increasing, Negative = decreasing def add_occurrence(self, timestamp: datetime, context: str) -> None: """Add a new occurrence of this concept.""" self.occurrences.append(timestamp) self.contexts.append(context) self._update_stats() def _update_stats(self) -> None: """Update statistical measures.""" if len(self.occurrences) < 2: self.strength = len(self.occurrences) self.trend = 0 return # Sort occurrences sorted_occurrences = sorted(self.occurrences) # Calculate strength (frequency normalized by recency) total_days = (sorted_occurrences[-1] - sorted_occurrences[0]).days + 1 self.strength = len(self.occurrences) / max(1, total_days / 30) # Per month # Calculate trend (linear regression slope) if len(sorted_occurrences) >= 3: # Convert dates to numeric values (days since first occurrence) first_date = sorted_occurrences[0] x = np.array([(d - first_date).days for d in sorted_occurrences]) y = np.arange(len(x)) # Cumulative count # Simple linear regression if len(set(x)) > 1: # Need at least 2 unique x values slope, _ = np.polyfit(x, y, 1) self.trend = slope @dataclass class ResearchTrajectory: """Represents a researcher's cognitive trajectory over time.""" researcher_id: str time_periods: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict) # Monthly buckets concepts: Dict[str, TemporalConcept] = field(default_factory=dict) focus_shifts: List[Dict[str, Any]] = field(default_factory=list) methodology_changes: List[Dict[str, Any]] = field(default_factory=list) def add_chunk(self, chunk: Dict[str, Any], extracted_concepts: List[str]) -> None: """Add a document chunk to the trajectory.""" timestamp = datetime.fromisoformat(chunk["timestamp"]) if isinstance(chunk["timestamp"], str) else chunk["timestamp"] # Add to time period bucket (monthly) period_key = timestamp.strftime("%Y-%m") if period_key not in self.time_periods: self.time_periods[period_key] = [] self.time_periods[period_key].append({ "chunk_id": chunk["id"], "text": chunk["text"], "concepts": extracted_concepts, "timestamp": timestamp.isoformat() }) # Update concept occurrences for concept in extracted_concepts: if concept not in self.concepts: self.concepts[concept] = TemporalConcept(name=concept) self.concepts[concept].add_occurrence(timestamp, chunk["text"][:200]) # First 200 chars as context class ConceptEvolutionAnalyzer: """Analyzes how concepts evolve over time in research documents.""" def __init__(self, llm_model: str = "gpt-4-turbo-preview"): """Initialize analyzer with LLM for concept extraction.""" self.llm = ChatOpenAI(model=llm_model, temperature=0.1) self.concept_cache = {} # Cache for concept extraction def 
extract_concepts(self, text: str, max_concepts: int = 10) -> List[str]: """Extract key concepts from text using LLM.""" # Check cache first cache_key = hash(text) if cache_key in self.concept_cache: return self.concept_cache[cache_key] prompt = ChatPromptTemplate.from_messages([ ("system", """You are a scientific concept extractor. Extract the key technical concepts, methodologies, and research topics from the following text. Return only the concepts as a comma-separated list. Be precise with technical terminology."""), ("human", "Text: {text}") ]) chain = prompt | self.llm response = chain.invoke({"text": text[:3000]}) # Limit text length # Parse response concepts = [c.strip() for c in response.content.split(",")] concepts = [c for c in concepts if c and len(c) > 2][:max_concepts] # Cache result self.concept_cache[cache_key] = concepts return concepts def analyze_trajectory(self, chunks: List[Dict[str, Any]], researcher_id: str = "default") -> ResearchTrajectory: """Analyze cognitive trajectory from document chunks.""" trajectory = ResearchTrajectory(researcher_id=researcher_id) print(f"Analyzing trajectory for {len(chunks)} chunks...") # Process chunks in chronological order sorted_chunks = sorted(chunks, key=lambda x: x["timestamp"]) for i, chunk in enumerate(sorted_chunks): if i % 10 == 0: print(f"Processed {i}/{len(sorted_chunks)} chunks...") # Extract concepts concepts = self.extract_concepts(chunk["text"]) # Add to trajectory trajectory.add_chunk(chunk, concepts) # Analyze focus shifts self._detect_focus_shifts(trajectory) # Analyze methodology changes self._detect_methodology_changes(trajectory) return trajectory def _detect_focus_shifts(self, trajectory: ResearchTrajectory) -> None: """Detect significant shifts in research focus.""" if len(trajectory.concepts) < 2: return # Get concepts sorted by occurrence count sorted_concepts = sorted( trajectory.concepts.items(), key=lambda x: len(x[1].occurrences), reverse=True ) # Analyze temporal patterns periods = sorted(trajectory.time_periods.keys()) if len(periods) < 3: return # Look for concepts that appear/disappear concept_period_presence = {} for concept_name, concept in trajectory.concepts.items(): periods_with_concept = set() for occurrence in concept.occurrences: period_key = occurrence.strftime("%Y-%m") periods_with_concept.add(period_key) concept_period_presence[concept_name] = periods_with_concept # Detect shifts (concept appears or disappears significantly) for i in range(1, len(periods)): current_period = periods[i] prev_period = periods[i-1] # Concepts that appeared in current period but not previous new_concepts = [] for concept_name, presence in concept_period_presence.items(): if current_period in presence and prev_period not in presence: # Check if this is a significant new focus concept = trajectory.concepts[concept_name] if concept.strength > 0.5: # Threshold new_concepts.append(concept_name) if new_concepts: trajectory.focus_shifts.append({ "period": current_period, "type": "new_focus", "concepts": new_concepts, "description": f"Started focusing on {', '.join(new_concepts[:3])}" }) def _detect_methodology_changes(self, trajectory: ResearchTrajectory) -> None: """Detect changes in research methodology.""" methodology_keywords = { "experimental", "theoretical", "computational", "simulation", "analysis", "modeling", "framework", "algorithm", "protocol", "statistical", "qualitative", "quantitative", "case_study", "survey", "interview", "observation", "longitudinal" } periods = sorted(trajectory.time_periods.keys()) for 
period in periods: period_chunks = trajectory.time_periods[period] period_text = " ".join([c["text"] for c in period_chunks]) period_text_lower = period_text.lower() methodologies = [] for method in methodology_keywords: if method in period_text_lower: methodologies.append(method) if methodologies: trajectory.methodology_changes.append({ "period": period, "methodologies": methodologies, "count": len(methodologies) }) def generate_trajectory_summary(self, trajectory: ResearchTrajectory) -> Dict[str, Any]: """Generate a summary of the research trajectory.""" # Get top concepts top_concepts = sorted( trajectory.concepts.items(), key=lambda x: x[1].strength, reverse=True )[:10] # Calculate trajectory metrics total_periods = len(trajectory.time_periods) concept_diversity = len(trajectory.concepts) focus_shifts_count = len(trajectory.focus_shifts) summary = { "researcher_id": trajectory.researcher_id, "time_span": { "start": min(trajectory.time_periods.keys()), "end": max(trajectory.time_periods.keys()), "total_periods": total_periods }, "concept_analysis": { "total_concepts": concept_diversity, "top_concepts": [ { "name": name, "strength": round(concept.strength, 2), "trend": round(concept.trend, 3), "occurrences": len(concept.occurrences) } for name, concept in top_concepts ] }, "dynamics": { "focus_shifts": trajectory.focus_shifts, "methodology_changes": trajectory.methodology_changes, "total_shifts": focus_shifts_count }, "trajectory_score": round( (concept_diversity * 0.3 + focus_shifts_count * 0.4 + total_periods * 0.3) / max(1, total_periods), 2 ) } return summary ``` ## src/persona_layer/knowledge_graph.py ```python """ Knowledge graph construction for persona layer. Builds weighted graph from temporal concepts with centrality measures. """ from typing import List, Dict, Any, Optional, Tuple, Set from dataclasses import dataclass, field import networkx as nx import numpy as np from collections import defaultdict @dataclass class GraphNode: """Represents a node in the knowledge graph.""" id: str name: str type: str # "concept", "methodology", "topic" weight: float = 1.0 centrality: float = 0.0 metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Convert node to dictionary.""" return { "id": self.id, "name": self.name, "type": self.type, "weight": self.weight, "centrality": self.centrality, "metadata": self.metadata } @dataclass class GraphEdge: """Represents an edge in the knowledge graph.""" source: str target: str weight: float = 1.0 relation_type: str = "related_to" co_occurrence_count: int = 0 def to_dict(self) -> Dict[str, Any]: """Convert edge to dictionary.""" return { "source": self.source, "target": self.target, "weight": self.weight, "relation_type": self.relation_type, "co_occurrence_count": self.co_occurrence_count } class KnowledgeGraphBuilder: """Builds and manages the persona knowledge graph.""" def __init__(self): """Initialize graph builder.""" self.graph = nx.Graph() self.nodes: Dict[str, GraphNode] = {} self.edges: Dict[Tuple[str, str], GraphEdge] = {} self.node_counter = 0 def build_from_trajectory(self, trajectory_summary: Dict[str, Any]) -> None: """Build knowledge graph from research trajectory.""" print("Building knowledge graph from trajectory...") # Add concept nodes for concept_data in trajectory_summary["concept_analysis"]["top_concepts"]: node_id = f"concept_{concept_data['name'].replace(' ', '_').lower()}" node = GraphNode( id=node_id, name=concept_data["name"], type="concept", weight=concept_data["strength"], 
metadata={ "trend": concept_data["trend"], "occurrences": concept_data["occurrences"] } ) self.add_node(node) # Add methodology nodes from trajectory for method_change in trajectory_summary["dynamics"]["methodology_changes"]: for method in method_change["methodologies"]: node_id = f"method_{method}" if node_id not in self.nodes: node = GraphNode( id=node_id, name=method, type="methodology", weight=method_change["count"] / len(trajectory_summary["dynamics"]["methodology_changes"]) ) self.add_node(node) # Create edges based on co-occurrence in focus shifts self._create_edges_from_shifts(trajectory_summary["dynamics"]["focus_shifts"]) # Calculate centrality measures self.calculate_centrality() print(f"Graph built with {len(self.nodes)} nodes and {len(self.edges)} edges") def _create_edges_from_shifts(self, focus_shifts: List[Dict[str, Any]]) -> None: """Create edges between concepts that appear together in focus shifts.""" for shift in focus_shifts: concepts = shift.get("concepts", []) if len(concepts) >= 2: # Create edges between all pairs of concepts in this shift for i in range(len(concepts)): for j in range(i + 1, len(concepts)): node1_id = f"concept_{concepts[i].replace(' ', '_').lower()}" node2_id = f"concept_{concepts[j].replace(' ', '_').lower()}" if node1_id in self.nodes and node2_id in self.nodes: edge_key = tuple(sorted([node1_id, node2_id])) if edge_key in self.edges: # Update existing edge self.edges[edge_key].co_occurrence_count += 1 self.edges[edge_key].weight += 0.2 # Increase weight else: # Create new edge edge = GraphEdge( source=node1_id, target=node2_id, weight=1.0, relation_type="co_occurrence", co_occurrence_count=1 ) self.add_edge(edge) def add_node(self, node: GraphNode) -> None: """Add a node to the graph.""" self.nodes[node.id] = node self.graph.add_node(node.id, **node.to_dict()) def add_edge(self, edge: GraphEdge) -> None: """Add an edge to the graph.""" edge_key = (edge.source, edge.target) self.edges[edge_key] = edge self.graph.add_edge( edge.source, edge.target, weight=edge.weight, relation_type=edge.relation_type, co_occurrence_count=edge.co_occurrence_count ) def calculate_centrality(self, method: str = "pagerank") -> None: """Calculate centrality measures for all nodes.""" if method == "pagerank": centrality_scores = nx.pagerank(self.graph, weight='weight') elif method == "betweenness": centrality_scores = nx.betweenness_centrality(self.graph, weight='weight') elif method == "eigenvector": centrality_scores = nx.eigenvector_centrality(self.graph, weight='weight', max_iter=1000) else: raise ValueError(f"Unknown centrality method: {method}") # Update node centrality values for node_id, score in centrality_scores.items(): if node_id in self.nodes: self.nodes[node_id].centrality = score # Also update graph node attributes nx.set_node_attributes(self.graph, centrality_scores, 'centrality') def get_subgraph(self, node_ids: List[str], depth: int = 2) -> nx.Graph: """Get subgraph around specified nodes up to given depth.""" subgraph_nodes = set() for node_id in node_ids: if node_id in self.graph: # Add nodes within specified distance for other_node in nx.single_source_shortest_path_length(self.graph, node_id, cutoff=depth): subgraph_nodes.add(other_node) return self.graph.subgraph(subgraph_nodes) def find_connected_components(self) -> List[List[str]]: """Find connected components in the graph.""" components = [] for component in nx.connected_components(self.graph): components.append(list(component)) return components def get_node_neighbors(self, node_id: str, 
max_neighbors: int = 10) -> List[Dict[str, Any]]: """Get neighbors of a node with their edge weights.""" if node_id not in self.graph: return [] neighbors = [] for neighbor in self.graph.neighbors(node_id): edge_data = self.graph.get_edge_data(node_id, neighbor) neighbor_node = self.nodes.get(neighbor) if neighbor_node: neighbors.append({ "node": neighbor_node.to_dict(), "edge_weight": edge_data.get("weight", 1.0), "relation_type": edge_data.get("relation_type", "related_to") }) # Sort by edge weight neighbors.sort(key=lambda x: x["edge_weight"], reverse=True) return neighbors[:max_neighbors] def to_networkx(self) -> nx.Graph: """Get the underlying NetworkX graph.""" return self.graph def to_dict(self) -> Dict[str, Any]: """Convert graph to dictionary representation.""" return { "nodes": [node.to_dict() for node in self.nodes.values()], "edges": [edge.to_dict() for edge in self.edges.values()], "metrics": { "node_count": len(self.nodes), "edge_count": len(self.edges), "density": nx.density(self.graph), "average_degree": sum(dict(self.graph.degree()).values()) / len(self.nodes) if self.nodes else 0 } } def save_to_file(self, filepath: str) -> None: """Save graph to file.""" import json graph_data = self.to_dict() with open(filepath, 'w') as f: json.dump(graph_data, f, indent=2) print(f"Graph saved to {filepath}") def load_from_file(self, filepath: str) -> None: """Load graph from file.""" import json with open(filepath, 'r') as f: graph_data = json.load(f) # Clear existing graph self.graph = nx.Graph() self.nodes = {} self.edges = {} # Load nodes for node_data in graph_data["nodes"]: node = GraphNode( id=node_data["id"], name=node_data["name"], type=node_data["type"], weight=node_data["weight"], centrality=node_data["centrality"], metadata=node_data.get("metadata", {}) ) self.add_node(node) # Load edges for edge_data in graph_data["edges"]: edge = GraphEdge( source=edge_data["source"], target=edge_data["target"], weight=edge_data["weight"], relation_type=edge_data["relation_type"], co_occurrence_count=edge_data["co_occurrence_count"] ) self.add_edge(edge) print(f"Graph loaded from {filepath} with {len(self.nodes)} nodes") ``` ## src/braiding_processor/braiding_kernel.py ```python """ Braiding kernel implementation for dual-manifold fusion. Combines individual resonance and collective feasibility scores. 
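Example usage (a minimal sketch with hand-picked scores; uses the default structural gate and weights 0.4/0.4/0.2):

    kernel = BraidingKernel()
    result = kernel.braid_scores(alpha=0.7, beta=0.6)
    print(result["braided_score"], result["result_type"])  # 0.604 conventional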
""" from typing import Dict, List, Any, Tuple, Optional import numpy as np from dataclasses import dataclass, field from enum import Enum class FusionGateType(Enum): """Types of fusion gates for braiding.""" LINEAR = "linear" GEOMETRIC = "geometric" STRUCTURAL = "structural" GATED = "gated" @dataclass class BraidingParameters: """Parameters for the braiding algorithm.""" alpha_weight: float = 0.4 # Individual resonance weight beta_weight: float = 0.4 # Collective feasibility weight gamma: float = 0.2 # Interaction term coefficient novelty_threshold: float = 0.7 hallucination_threshold: float = 0.1 # Minimum beta for valid ideas noise_threshold: float = 0.1 # Minimum alpha for relevant ideas fusion_gate: FusionGateType = FusionGateType.STRUCTURAL def validate(self) -> None: """Validate parameter values.""" if not (0 <= self.alpha_weight <= 1): raise ValueError("alpha_weight must be between 0 and 1") if not (0 <= self.beta_weight <= 1): raise ValueError("beta_weight must be between 0 and 1") if not (0 <= self.gamma <= 1): raise ValueError("gamma must be between 0 and 1") if self.alpha_weight + self.beta_weight + self.gamma > 1.5: print("Warning: Sum of weights exceeds 1.5, may produce large scores") class BraidingKernel: """ Implements the braiding formula for combining individual and collective scores. S_braid = λ * α + (1-λ) * β + γ * (α * β) * G(α, β) where G is a structural gate function. """ def __init__(self, parameters: Optional[BraidingParameters] = None): """Initialize braiding kernel with parameters.""" self.params = parameters or BraidingParameters() self.params.validate() def calculate_individual_resonance(self, query_embedding: np.ndarray, persona_graph: Any, # Would be KnowledgeGraph type gravity_well: Any, # Would be GravityWell type alpha_cache: Dict[str, float] = None) -> float: """ Calculate alpha score: individual resonance. Measures how well the query aligns with researcher's established history. """ if alpha_cache and query_embedding.tobytes() in alpha_cache: return alpha_cache[query_embedding.tobytes()] # This is a simplified calculation # In practice, this would involve: # 1. Semantic similarity with graph nodes # 2. Distance from gravity well center # 3. Historical frequency of similar concepts # Placeholder calculation alpha = 0.5 # Base value # Adjust based on gravity well distance (closer = higher alpha) # distance = gravity_well.calculate_distance(query_embedding) # alpha *= np.exp(-distance) # Exponential decay # Adjust based on graph centrality of similar nodes # similar_nodes = persona_graph.find_similar_nodes(query_embedding) # if similar_nodes: # avg_centrality = np.mean([n.centrality for n in similar_nodes]) # alpha *= (0.5 + avg_centrality) # Cache result if alpha_cache is not None: alpha_cache[query_embedding.tobytes()] = alpha return alpha def calculate_collective_feasibility(self, query_embedding: np.ndarray, community_graph: Any, # Would be CommunityGraph type wireframe: Any, # Would be WireframeBuilder type beta_cache: Dict[str, float] = None) -> float: """ Calculate beta score: collective feasibility. Measures how strongly the query is supported by community knowledge. """ if beta_cache and query_embedding.tobytes() in beta_cache: return beta_cache[query_embedding.tobytes()] # This is a simplified calculation # In practice, this would involve: # 1. Random walk probability in community graph # 2. Citation network support # 3. 
Publication frequency of related concepts # Placeholder calculation beta = 0.5 # Base value # Adjust based on community graph connectivity # connected_nodes = community_graph.find_connected_nodes(query_embedding) # if connected_nodes: # beta *= (0.3 + 0.7 * len(connected_nodes) / 100) # Normalized # Adjust based on wireframe support # support = wireframe.calculate_support(query_embedding) # beta *= (0.5 + 0.5 * support) # Cache result if beta_cache is not None: beta_cache[query_embedding.tobytes()] = beta return beta def apply_structural_gate(self, alpha: float, beta: float) -> float: """ Apply structural gate function G(α, β). Filters hallucinations and irrelevant noise. """ gate_type = self.params.fusion_gate if gate_type == FusionGateType.LINEAR: # Simple linear combination return self.params.alpha_weight * alpha + self.params.beta_weight * beta elif gate_type == FusionGateType.GEOMETRIC: # Geometric mean emphasizes balanced scores if alpha > 0 and beta > 0: return (alpha * beta) ** 0.5 return 0 elif gate_type == FusionGateType.STRUCTURAL: # Structural gate from the paper # Filters hallucinations (high alpha, low beta) and noise (low alpha, high beta) # Check for hallucinations if alpha > self.params.novelty_threshold and beta < self.params.hallucination_threshold: return -alpha * 0.5 # Penalize hallucinations # Check for irrelevant noise if alpha < self.params.noise_threshold and beta > self.params.novelty_threshold: return -beta * 0.3 # Penalize irrelevant concepts # Valid combination interaction = alpha * beta linear_component = self.params.alpha_weight * alpha + self.params.beta_weight * beta return linear_component + self.params.gamma * interaction elif gate_type == FusionGateType.GATED: # Gated fusion with sigmoid activation gate = 1 / (1 + np.exp(-10 * (alpha * beta - 0.5))) # Sigmoid gate return gate * (alpha + beta) / 2 else: raise ValueError(f"Unknown fusion gate type: {gate_type}") def braid_scores(self, alpha: float, beta: float, query_text: Optional[str] = None) -> Dict[str, Any]: """ Calculate braided score using the full formula. Returns detailed scoring breakdown. 
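Worked example (structural gate, default weights alpha_weight=0.4, beta_weight=0.4, gamma=0.2):

    alpha = 0.8, beta = 0.7
    linear      = 0.4*0.8 + 0.4*0.7 = 0.60
    interaction = 0.8*0.7           = 0.56
    braided     = 0.60 + 0.2*0.56   = 0.712  -> result_type "novel" (score > novelty_threshold 0.7)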
""" # Apply structural gate gate_value = self.apply_structural_gate(alpha, beta) # Calculate final braided score if self.params.fusion_gate == FusionGateType.STRUCTURAL: # For structural gate, gate_value is already the final score braided_score = gate_value else: # For other gates, combine with interaction term interaction = alpha * beta braided_score = gate_value + self.params.gamma * interaction # Normalize score to [0, 1] range (can be negative for invalid ideas) braided_score = max(-1, min(1, braided_score)) # Determine validity is_valid = ( braided_score > 0 and alpha > self.params.noise_threshold and beta > self.params.hallucination_threshold ) # Classify result type if braided_score < 0: result_type = "invalid" elif alpha > self.params.novelty_threshold and beta < self.params.hallucination_threshold: result_type = "hallucination" elif alpha < self.params.noise_threshold and beta > self.params.novelty_threshold: result_type = "noise" elif braided_score > self.params.novelty_threshold: result_type = "novel" else: result_type = "conventional" # Calculate novelty score (how different from existing knowledge) novelty_score = alpha * (1 - beta) # High individual, low collective return { "braided_score": round(braided_score, 4), "alpha": round(alpha, 4), "beta": round(beta, 4), "gate_value": round(gate_value, 4), "interaction": round(alpha * beta, 4), "is_valid": is_valid, "result_type": result_type, "novelty_score": round(novelty_score, 4), "parameters": { "alpha_weight": self.params.alpha_weight, "beta_weight": self.params.beta_weight, "gamma": self.params.gamma, "fusion_gate": self.params.fusion_gate.value } } def braid_multiple_queries(self, queries: List[Tuple[np.ndarray, str]], persona_graph: Any, community_graph: Any, gravity_well: Any, wireframe: Any) -> List[Dict[str, Any]]: """ Braid multiple queries and return sorted results. """ results = [] alpha_cache = {} beta_cache = {} for query_embedding, query_text in queries: # Calculate individual and collective scores alpha = self.calculate_individual_resonance( query_embedding, persona_graph, gravity_well, alpha_cache ) beta = self.calculate_collective_feasibility( query_embedding, community_graph, wireframe, beta_cache ) # Braid scores braiding_result = self.braid_scores(alpha, beta, query_text) braiding_result["query"] = query_text braiding_result["query_embedding"] = query_embedding.tolist() results.append(braiding_result) # Sort by braided score (descending) results.sort(key=lambda x: x["braided_score"], reverse=True) return results def find_optimal_ideas(self, candidate_ideas: List[Dict[str, Any]], persona_graph: Any, community_graph: Any, top_k: int = 5) -> List[Dict[str, Any]]: """ Find optimal research ideas from candidate list. 
""" # Extract queries from candidate ideas queries = [] for idea in candidate_ideas: query_embedding = np.array(idea.get("embedding", [0] * 384)) # Default dimension query_text = idea.get("description", "") queries.append((query_embedding, query_text)) # Braid all queries braided_results = self.braid_multiple_queries( queries, persona_graph, community_graph, gravity_well=None, wireframe=None # Would need actual instances ) # Filter valid and novel ideas optimal_ideas = [] for result in braided_results: if result["is_valid"] and result["result_type"] == "novel": # Find original idea data original_idea = next( (idea for idea in candidate_ideas if idea.get("description") == result["query"]), None ) if original_idea: optimal_idea = original_idea.copy() optimal_idea.update({ "braiding_scores": result, "overall_score": result["braided_score"] }) optimal_ideas.append(optimal_idea) # Return top k ideas return optimal_ideas[:top_k] def optimize_parameters(self, training_data: List[Dict[str, Any]], validation_data: List[Dict[str, Any]]) -> BraidingParameters: """ Optimize braiding parameters using training data. This is a placeholder for actual optimization logic. """ print("Optimizing braiding parameters...") # Simple grid search (would be more sophisticated in practice) best_params = None best_score = -float('inf') for alpha_weight in [0.3, 0.4, 0.5]: for beta_weight in [0.3, 0.4, 0.5]: for gamma in [0.1, 0.2, 0.3]: params = BraidingParameters( alpha_weight=alpha_weight, beta_weight=beta_weight, gamma=gamma ) # Evaluate on validation data score = self._evaluate_parameters(params, validation_data) if score > best_score: best_score = score best_params = params print(f"Best score: {best_score}") return best_params def _evaluate_parameters(self, params: BraidingParameters, validation_data: List[Dict[str, Any]]) -> float: """ Evaluate parameters on validation data. Returns average score. """ self.params = params scores = [] for data_point in validation_data: alpha = data_point.get("alpha", 0.5) beta = data_point.get("beta", 0.5) expected_score = data_point.get("expected_score", 0) result = self.braid_scores(alpha, beta) predicted_score = result["braided_score"] # Calculate error (would use more sophisticated metric in practice) error = abs(predicted_score - expected_score) scores.append(1 - error) # Higher is better return np.mean(scores) if scores else 0 ``` ## README.md ```markdown # Dual Manifold Cognitive Architecture An advanced AI system that models individual researcher cognition and community knowledge as separate manifolds, then performs braided optimization to discover novel research directions. ## Overview This system implements the architecture described in the "AI Dual Manifold Cognitive Architecture" video, creating a cognitive digital twin of researchers that can: - Parse and analyze research documents over time - Build weighted knowledge graphs of expertise - Create gravity well representations of comfort zones - Access collective scientific knowledge via OpenAlex - Perform braided optimization to find novel research directions - Generate personalized research proposals ## Architecture ### Core Components 1. **Episodic Memory Layer** - Hybrid search (dense vectors + BM25) - Timestamped document chunks - Reciprocal rank fusion 2. **Semantic Memory Layer** - Temporal concept extraction - Cognitive trajectory analysis - Research focus shift detection 3. **Persona Layer** - Weighted knowledge graph construction - Centrality measure calculation - Gravity well/KDE representation 4. 
**Collective Manifold** - OpenAlex API integration - Community knowledge graph - Wireframe manifold estimation 5. **Braiding Processor** - Individual resonance (alpha) scoring - Collective feasibility (beta) scoring - Structural gate fusion - Novelty optimization ## Installation ### Prerequisites - Python 3.10+ - Docker (optional) - OpenAI API key - OpenAlex API key ### Quick Start ```bash # Clone repository git clone https://github.com/yourusername/dual-manifold-ai.git cd dual-manifold-ai # Create virtual environment python -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate # Install dependencies pip install -r requirements.txt # Set up environment variables cp .env.example .env # Edit .env with your API keys # Create data directories mkdir -p data/raw_documents mkdir -p data/processed mkdir -p data/indices # Run tests python -m pytest tests/ ``` ### Docker Installation ```bash # Build and run with Docker Compose docker-compose up --build # Or build individually docker build -t dual-manifold-ai . docker run -p 8000:8000 dual-manifold-ai ``` ## Configuration Edit `config/settings.py` or set environment variables: ```bash export OPENAI_API_KEY="your-key-here" export OPENALEX_API_KEY="your-key-here" export DATA_DIR="./data" export DEBUG="True" ``` ## Usage ### 1. Import Research Documents Place your research documents (PDFs, text files) in `data/raw_documents/`: ```bash cp ~/research_papers/*.pdf data/raw_documents/ ``` ### 2. Parse and Index Documents ```python from src.episodic_memory.document_parser import DocumentParser from src.episodic_memory.hybrid_index import HybridIndex from config.settings import settings # Parse documents parser = DocumentParser( chunk_size=settings.CHUNK_SIZE, chunk_overlap=settings.CHUNK_OVERLAP ) chunks = parser.parse_directory(settings.RAW_DOCS_DIR) # Build hybrid index index = HybridIndex(embedding_model_name=settings.EMBEDDING_MODEL) index.build_indexes([chunk.to_dict() for chunk in chunks]) # Save indexes index.save_indexes(settings.INDICES_DIR) ``` ### 3. Analyze Cognitive Trajectory ```python from src.semantic_memory.temporal_distiller import ConceptEvolutionAnalyzer analyzer = ConceptEvolutionAnalyzer(llm_model=settings.LLM_MODEL) trajectory = analyzer.analyze_trajectory( [chunk.to_dict() for chunk in chunks], researcher_id="researcher_001" ) summary = analyzer.generate_trajectory_summary(trajectory) print(f"Trajectory score: {summary['trajectory_score']}") ``` ### 4. Build Persona Knowledge Graph ```python from src.persona_layer.knowledge_graph import KnowledgeGraphBuilder graph_builder = KnowledgeGraphBuilder() graph_builder.build_from_trajectory(summary) # Calculate centrality graph_builder.calculate_centrality(method=settings.CENTRALITY_MEASURE) # Save graph graph_builder.save_to_file("data/persona_graph.json") ``` ### 5. 
Perform Braided Search ```python from src.braiding_processor.braiding_kernel import BraidingKernel from src.utils.embeddings import EmbeddingGenerator # Initialize components braiding_kernel = BraidingKernel() embedding_generator = EmbeddingGenerator(model_name=settings.EMBEDDING_MODEL) # Example research query query = "neural networks for drug discovery" query_embedding = embedding_generator.encode(query) # Calculate scores (simplified - would need actual graph instances) alpha = 0.7 # Individual resonance beta = 0.6 # Collective feasibility # Braid scores result = braiding_kernel.braid_scores(alpha, beta, query) print(f"Braided score: {result['braided_score']}") print(f"Result type: {result['result_type']}") ``` ### 6. Use the API Server ```bash # Start the API server uvicorn api.server:app --reload --host 0.0.0.0 --port 8000 ``` Then access the API at `http://localhost:8000/docs` for Swagger UI. ## API Endpoints - `POST /api/analyze/researcher` - Analyze researcher documents - `GET /api/trajectory/{researcher_id}` - Get cognitive trajectory - `POST /api/braid/suggest` - Get research suggestions - `GET /api/graph/{researcher_id}` - Get persona knowledge graph - `POST /api/optimize/parameters` - Optimize braiding parameters ## Example Research Proposal Generation ```python import requests # Example API call to get research suggestions response = requests.post( "http://localhost:8000/api/braid/suggest", json={ "researcher_id": "researcher_001", "query": "quantum machine learning applications", "max_suggestions": 3 } ) suggestions = response.json() for suggestion in suggestions: print(f"Title: {suggestion['title']}") print(f"Novelty Score: {suggestion['novelty_score']}") print(f"Description: {suggestion['description']}") print("---") ``` ## Configuration Parameters ### Braiding Parameters - `alpha_weight`: Weight for individual resonance (default: 0.4) - `beta_weight`: Weight for collective feasibility (default: 0.4) - `gamma`: Interaction term coefficient (default: 0.2) - `novelty_threshold`: Minimum score for novel ideas (default: 0.7) - `fusion_gate`: Type of fusion (linear, geometric, structural, gated) ### Search Parameters - `HYBRID_SEARCH_WEIGHT`: Balance between dense/sparse search (0.5) - `TOP_K_RESULTS`: Number of search results (10) - `CHUNK_SIZE`: Document chunk size (1000) - `CHUNK_OVERLAP`: Chunk overlap (200) ## Advanced Features ### Custom Embedding Models Edit `config/settings.py`: ```python EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2" ``` ### Multi-Researcher Analysis ```python # Analyze multiple researchers researchers = ["researcher_001", "researcher_002"] for researcher in researchers: # Load researcher-specific documents # Build individual persona # Compare trajectories ``` ### Real-time Updates ```python # Watch directory for new documents from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler class ResearchUpdateHandler(FileSystemEventHandler): def on_created(self, event): if event.src_path.endswith('.pdf'): print(f"New document: {event.src_path}") # Trigger re-analysis ``` ## Performance Tuning ### Index Optimization ```python # Adjust FAISS index parameters index = faiss.IndexIVFFlat( quantizer, dimension, nlist, faiss.METRIC_INNER_PRODUCT ) ``` ### Cache Configuration ```python # Enable Redis caching settings.ENABLE_CACHE = True settings.REDIS_URL = "redis://localhost:6379" ``` ## Troubleshooting ### Common Issues 1. 
**Memory Issues** - Reduce `CHUNK_SIZE` - Use smaller embedding models - Enable disk-based caching 2. **API Rate Limits** - Implement exponential backoff - Use request pooling - Cache API responses 3. **Slow Performance** - Enable GPU acceleration - Use batch processing - Optimize graph algorithms ### Debug Mode ```bash export DEBUG=True python -m debugpy --listen 5678 --wait-for-client main.py ``` ## Contributing 1. Fork the repository 2. Create a feature branch 3. Add tests for new functionality 4. Ensure all tests pass 5. Submit a pull request ### Development Setup ```bash # Install development dependencies pip install -r requirements-dev.txt # Run tests with coverage pytest --cov=src --cov-report=html # Type checking mypy src/ # Code formatting black src/ isort src/ ``` ## Citation If you use this system in your research, please cite: ``` Zeng, Q., Fan, B., Chen, Z., Ren, S., & Zhang, Z. (2025). MirrorMind: Empowering OmniScientist with Expert Perspectives. arXiv preprint arXiv:2511.XXXXX. ``` ## License MIT License - see LICENSE file for details. ## Support - Issues: [GitHub Issues](https://github.com/yourusername/dual-manifold-ai/issues) - Documentation: [ReadTheDocs](https://dual-manifold-ai.readthedocs.io) - Email: support@dualmanifold.ai ## Acknowledgments - Inspired by research from Tsinghua University - OpenAlex for scientific publication data - The AI research community for foundational work ``` ## SETUP: ```bash #!/bin/bash # setup.sh - Complete setup script for Dual Manifold Cognitive Architecture set -e # Exit on error echo "Setting up Dual Manifold Cognitive Architecture..." echo "==================================================" # Check Python version echo "Checking Python version..." python --version | grep -q "3.10" || echo "Warning: Python 3.10+ recommended" # Create project structure echo "Creating project structure..." mkdir -p config data/{raw_documents,processed,indices} src/{episodic_memory,semantic_memory,persona_layer,collective_manifold,braiding_processor,agents,optimization,utils} tests api notebooks logs # Create virtual environment echo "Creating virtual environment..." python -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate # Upgrade pip echo "Upgrading pip..." pip install --upgrade pip # Install dependencies echo "Installing dependencies..." pip install -r requirements.txt # Install development dependencies echo "Installing development dependencies..." pip install -r requirements-dev.txt # Create environment file echo "Creating environment configuration..." cat > .env << EOL # API Keys (replace with your actual keys) OPENAI_API_KEY=your_openai_api_key_here OPENALEX_API_KEY=your_openalex_api_key_here HUGGINGFACE_TOKEN=your_huggingface_token_here # Paths DATA_DIR=./data LOGS_DIR=./logs # Settings DEBUG=False ENABLE_CACHE=True EOL echo "Please edit .env file with your actual API keys!" # Create example configuration echo "Creating example configuration files..." # Create example document mkdir -p examples/documents cat > examples/documents/example_paper.txt << EOL Title: Advances in Graph Neural Networks for Molecular Modeling Author: Researcher A Date: 2024-01-15 Abstract: This paper explores the application of graph neural networks to molecular property prediction. We introduce a novel attention mechanism that improves prediction accuracy by 15% compared to baseline methods. Introduction: Molecular representation learning has been a challenging problem in computational chemistry. 
# SETUP:

```bash
#!/bin/bash
# setup.sh - Complete setup script for Dual Manifold Cognitive Architecture

set -e  # Exit on error

echo "Setting up Dual Manifold Cognitive Architecture..."
echo "=================================================="

# Check Python version
echo "Checking Python version..."
python --version | grep -q "3.10" || echo "Warning: Python 3.10+ recommended"

# Create project structure
echo "Creating project structure..."
mkdir -p config data/{raw_documents,processed,indices} src/{episodic_memory,semantic_memory,persona_layer,collective_manifold,braiding_processor,agents,optimization,utils} tests api notebooks logs

# Create virtual environment
echo "Creating virtual environment..."
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Upgrade pip
echo "Upgrading pip..."
pip install --upgrade pip

# Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt

# Install development dependencies
echo "Installing development dependencies..."
pip install -r requirements-dev.txt

# Create environment file
echo "Creating environment configuration..."
cat > .env << EOL
# API Keys (replace with your actual keys)
OPENAI_API_KEY=your_openai_api_key_here
OPENALEX_API_KEY=your_openalex_api_key_here
HUGGINGFACE_TOKEN=your_huggingface_token_here

# Paths
DATA_DIR=./data
LOGS_DIR=./logs

# Settings
DEBUG=False
ENABLE_CACHE=True
EOL

echo "Please edit .env file with your actual API keys!"

# Create example configuration
echo "Creating example configuration files..."

# Create example document
mkdir -p examples/documents
cat > examples/documents/example_paper.txt << EOL
Title: Advances in Graph Neural Networks for Molecular Modeling
Author: Researcher A
Date: 2024-01-15

Abstract:
This paper explores the application of graph neural networks to molecular property prediction. We introduce a novel attention mechanism that improves prediction accuracy by 15% compared to baseline methods.

Introduction:
Molecular representation learning has been a challenging problem in computational chemistry. Traditional methods like Morgan fingerprints have limitations in capturing complex molecular structures.

Methodology:
We propose GNN-Mol, a graph neural network architecture with multi-head attention. The model processes molecular graphs where atoms are nodes and bonds are edges.

Results:
Our method achieves state-of-the-art results on the QM9 dataset, with particular improvements in predicting molecular dipole moments.

Conclusion:
Graph neural networks show great promise for molecular modeling, especially when combined with attention mechanisms.
EOL

# Create Docker configuration
cat > Dockerfile << EOL
FROM python:3.10-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    software-properties-common \
    git \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m -u 1000 user
RUN chown -R user:user /app
USER user

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK CMD curl --fail http://localhost:8000/health || exit 1

# Run application
CMD ["uvicorn", "api.server:app", "--host", "0.0.0.0", "--port", "8000"]
EOL

# Create docker-compose file
cat > docker-compose.yml << EOL
version: '3.8'

services:
  dual-manifold-ai:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=\${OPENAI_API_KEY}
      - OPENALEX_API_KEY=\${OPENALEX_API_KEY}
      - DEBUG=False
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    restart: unless-stopped

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: dualmanifold
      POSTGRES_PASSWORD: securepassword
      POSTGRES_DB: dualmanifold_db
    ports:
      - "5432:5432"
    volumes:
      - postgres-data:/var/lib/postgresql/data
    restart: unless-stopped

volumes:
  redis-data:
  postgres-data:
EOL

# Create test script
cat > test_system.py << EOL
#!/usr/bin/env python3
"""
Test script for the Dual Manifold Cognitive Architecture.
""" import sys import os sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config.settings import settings def test_environment(): """Test basic environment setup.""" print("Testing environment setup...") # Check directories required_dirs = [ settings.DATA_DIR, settings.RAW_DOCS_DIR, settings.PROCESSED_DIR, settings.INDICES_DIR, settings.LOGS_DIR ] for dir_path in required_dirs: if os.path.exists(dir_path): print(f"✓ Directory exists: {dir_path}") else: print(f"✗ Missing directory: {dir_path}") return False # Check environment variables if settings.OPENAI_API_KEY == "your_openai_api_key_here": print("⚠ Warning: Using default OpenAI API key") print("Environment test passed!") return True def test_imports(): """Test that all modules can be imported.""" print("\nTesting module imports...") modules = [ "config.settings", "src.episodic_memory.document_parser", "src.episodic_memory.hybrid_index", "src.semantic_memory.temporal_distiller", "src.persona_layer.knowledge_graph", "src.braiding_processor.braiding_kernel" ] for module in modules: try: __import__(module) print(f"✓ Imported: {module}") except ImportError as e: print(f"✗ Failed to import {module}: {e}") return False print("Import test passed!") return True def main(): """Run all tests.""" print("=" * 50) print("Dual Manifold Cognitive Architecture - System Test") print("=" * 50) tests = [test_environment, test_imports] all_passed = True for test in tests: try: if not test(): all_passed = False except Exception as e: print(f"✗ Test failed with exception: {e}") all_passed = False print("\n" + "=" * 50) if all_passed: print("✅ All tests passed! System is ready.") print("\nNext steps:") print("1. Add your research documents to data/raw_documents/") print("2. Update API keys in .env file") print("3. Run: python examples/analyze_researcher.py") print("4. Start API server: uvicorn api.server:app --reload") else: print("❌ Some tests failed. Please check the errors above.") sys.exit(1) if __name__ == "__main__": main() EOL chmod +x test_system.py # Create example analysis script mkdir -p examples cat > examples/analyze_researcher.py << EOL #!/usr/bin/env python3 """ Example script to analyze a researcher's documents. """ import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from config.settings import settings from src.episodic_memory.document_parser import DocumentParser from src.episodic_memory.hybrid_index import HybridIndex from src.semantic_memory.temporal_distiller import ConceptEvolutionAnalyzer from src.persona_layer.knowledge_graph import KnowledgeGraphBuilder def main(): """Main analysis pipeline.""" print("Starting researcher analysis pipeline...") # Step 1: Parse documents print("\n1. Parsing documents...") parser = DocumentParser( chunk_size=settings.CHUNK_SIZE, chunk_overlap=settings.CHUNK_OVERLAP ) # Copy example document to data directory import shutil example_doc = "examples/documents/example_paper.txt" if os.path.exists(example_doc): shutil.copy(example_doc, settings.RAW_DOCS_DIR) chunks = parser.parse_directory(settings.RAW_DOCS_DIR) if not chunks: print("No documents found. Please add documents to data/raw_documents/") return print(f"Parsed {len(chunks)} chunks from documents") # Step 2: Build search index print("\n2. 
    print("\n2. Building hybrid search index...")
    index = HybridIndex(embedding_model_name=settings.EMBEDDING_MODEL)
    index.build_indexes([chunk.to_dict() for chunk in chunks])
    index.save_indexes(settings.INDICES_DIR)

    # Test search
    test_query = "graph neural networks"
    results = index.hybrid_search(test_query, top_k=3)
    print(f"Test search for '{test_query}' found {len(results)} results")

    # Step 3: Analyze cognitive trajectory
    print("\n3. Analyzing cognitive trajectory...")
    analyzer = ConceptEvolutionAnalyzer(llm_model=settings.LLM_MODEL)
    trajectory = analyzer.analyze_trajectory(
        [chunk.to_dict() for chunk in chunks],
        researcher_id="example_researcher"
    )
    summary = analyzer.generate_trajectory_summary(trajectory)
    print(f"Trajectory score: {summary['trajectory_score']}")
    print(f"Total concepts: {summary['concept_analysis']['total_concepts']}")
    print(f"Focus shifts: {summary['dynamics']['total_shifts']}")

    # Step 4: Build knowledge graph
    print("\n4. Building knowledge graph...")
    graph_builder = KnowledgeGraphBuilder()
    graph_builder.build_from_trajectory(summary)
    graph_builder.calculate_centrality(method=settings.CENTRALITY_MEASURE)
    graph_data = graph_builder.to_dict()
    print(f"Graph built with {graph_data['metrics']['node_count']} nodes")
    print(f"Graph density: {graph_data['metrics']['density']:.3f}")

    # Save results
    import json
    with open("data/researcher_analysis.json", "w") as f:
        json.dump({
            "summary": summary,
            "graph": graph_data
        }, f, indent=2)

    print("\n✅ Analysis complete!")
    print(f"Results saved to data/researcher_analysis.json")


if __name__ == "__main__":
    main()
EOL

chmod +x examples/analyze_researcher.py

# Create requirements.txt
cat > requirements.txt << EOL
# Core dependencies
python-dotenv>=1.0.0
pydantic>=2.0.0
pydantic-settings>=2.0.0

# Data processing
pandas>=2.0.0
numpy>=1.24.0
scipy>=1.10.0
scikit-learn>=1.3.0

# Document parsing
pdfplumber>=0.10.0
langchain>=0.1.0
langchain-openai>=0.0.1
pypdf>=3.17.0

# Embeddings and search
sentence-transformers>=2.2.0
faiss-cpu>=1.7.0
rank-bm25>=0.2.2
chromadb>=0.4.0

# Graph processing
networkx>=3.0
python-igraph>=0.10.0

# Machine learning
torch>=2.0.0
transformers>=4.30.0

# API and web
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
httpx>=0.25.0
aiohttp>=3.9.0

# Utilities
tqdm>=4.65.0
loguru>=0.7.0
cachetools>=5.3.0
redis>=5.0.0

# Development
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
black>=23.0.0
isort>=5.12.0
mypy>=1.5.0
pre-commit>=3.5.0
EOL

# Create requirements-dev.txt
cat > requirements-dev.txt << EOL
# Testing
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
pytest-mock>=3.11.0
pytest-xdist>=3.3.0

# Code quality
black>=23.0.0
isort>=5.12.0
flake8>=6.0.0
mypy>=1.5.0
pre-commit>=3.5.0
bandit>=1.7.0
safety>=2.3.0

# Documentation
mkdocs>=1.5.0
mkdocs-material>=9.0.0
mkdocstrings[python]>=0.23.0

# Monitoring
sentry-sdk>=1.35.0
prometheus-client>=0.18.0

# Notebooks
jupyter>=1.0.0
jupyterlab>=4.0.0
ipywidgets>=8.0.0
EOL

# Create pre-commit config
cat > .pre-commit-config.yaml << EOL
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
      - id: check-merge-conflict
      - id: check-case-conflict
      - id: check-toml

  - repo: https://github.com/psf/black
    rev: 23.11.0
    hooks:
      - id: black
        language_version: python3.10

  - repo: https://github.com/pycqa/isort
    rev: 5.12.0
    hooks:
      - id: isort
        args: ["--profile", "black"]

  - repo: https://github.com/pycqa/flake8
    rev: 6.1.0
    hooks:
      - id: flake8
        args: ["--max-line-length=88", "--extend-ignore=E203,W503"]
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.5.1
    hooks:
      - id: mypy
        additional_dependencies:
          - types-requests
          - types-pyyaml
          - types-redis
          - pydantic
        args: ["--ignore-missing-imports", "--strict"]

  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.5
    hooks:
      - id: bandit
        args: ["-c", "pyproject.toml"]

  - repo: https://github.com/python-poetry/poetry
    rev: 1.6.0
    hooks:
      - id: poetry-check
      - id: poetry-lock
EOL

echo "Setup complete!"
echo ""
echo "To get started:"
echo "1. Activate virtual environment: source venv/bin/activate"
echo "2. Edit .env file with your API keys"
echo "3. Run system test: ./test_system.py"
echo "4. Try example analysis: python examples/analyze_researcher.py"
echo ""
echo "For API development:"
echo "uvicorn api.server:app --reload --host 0.0.0.0 --port 8000"
```

# TAKEAWAYS:

1. Dual manifolds separate individual and collective knowledge spaces.
2. Braiding combines scores through gated structural fusion.
3. Hybrid search ensures precise technical term matching.
4. Temporal analysis reveals cognitive evolution patterns.
5. Gravity wells represent expertise comfort zones.
6. Novelty repulsors push researchers beyond existing knowledge.
7. Structural gates filter hallucinations and noise effectively.
8. Centrality measures quantify concept importance dynamically.
9. Linearization prepares complex graphs for LLM consumption.
10. Constraint optimization finds Goldilocks-zone intersections.
11. Multi-agent coordination enables interdisciplinary discovery.
12. Non-parametric structures shift intelligence from model weights into the structures themselves.
13. The Markovian assumption breaks down when historical dependencies matter.
14. Reciprocal rank fusion balances semantic and lexical search (a minimal sketch follows the suggestions list).
15. Kernel density estimation creates smooth manifold representations.
16. Research trajectories provide personalized cognitive models.

# SUGGESTIONS:

1. Implement Riemannian manifold learning for curved spaces.
2. Add real-time document ingestion with filesystem monitoring.
3. Create interactive visualization dashboard for gravity wells.
4. Integrate with academic search engines beyond OpenAlex.
5. Develop federated learning for multi-researcher collaboration.
6. Add reinforcement learning for parameter optimization.
7. Implement quantum-inspired algorithms for complex optimization.
8. Create browser extension for seamless research integration.
9. Develop mobile app for on-the-go research suggestions.
10. Add multilingual support for international research.
11. Implement differential privacy for sensitive research data.
12. Create plugin system for custom domain agents.
13. Add blockchain for research provenance tracking.
14. Develop simulation environment for hypothesis testing.
15. Implement transfer learning between researcher personas.
16. Create API marketplace for specialized domain modules.
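Takeaway 14 refers to reciprocal rank fusion. The sketch below applies the standard RRF formula to two hypothetical result lists; the constant `k = 60` is the value commonly used in the RRF literature, and the helper is illustrative rather than the implementation in `src/episodic_memory/hybrid_index.py`:

```python
from collections import defaultdict

def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse several ranked lists of chunk IDs into one ranking.

    Each document's score is the sum over rankings of 1 / (k + rank),
    so items ranked highly by either the dense or the BM25 index rise to the top.
    """
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)

# Usage with hypothetical result lists from the two indexes
dense_hits = ["chunk_12", "chunk_07", "chunk_33"]
bm25_hits = ["chunk_07", "chunk_19", "chunk_12"]
print(reciprocal_rank_fusion([dense_hits, bm25_hits])[:3])
```

Because RRF operates on ranks rather than raw scores, the dense and BM25 scores never need to be normalized against each other.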