PROJECT:
A dual-manifold cognitive architecture that combines individual expertise with collective knowledge for scientific discovery through constraint optimization.
SUMMARY:
The system models individual researchers' cognitive evolution (episodic→semantic→persona layers) and community knowledge as separate manifolds, then performs braided optimization to find novel research directions at their intersection using a repulsive novelty force and collective validation.
STEPS:
- Parse researcher documents into timestamped chunks.
- Build hybrid search with vector and BM25 indexes.
- Distill cognitive trajectory via temporal analysis.
- Construct persona graph with centrality measures.
- Calculate gravity well via kernel density estimation.
- Fetch community knowledge from OpenAlex API.
- Compute individual resonance (alpha) scores.
- Calculate collective feasibility (beta) scores.
- Apply braiding formula with gated fusion.
- Filter hallucinations and noise.
- Generate optimal research direction P*.
- Create linearized context for LLM prompting.
- Coordinate multiple domain agents.
- Validate suggestions with constraint checking.
- Output personalized research proposals.
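A minimal end-to-end sketch of these steps, using the classes defined in the CODE section below (the OpenAlex, agent, and optimization stages are omitted here, and the alpha/beta values are illustrative placeholders):

```python
from src.episodic_memory.document_parser import DocumentParser
from src.episodic_memory.hybrid_index import HybridIndex
from src.semantic_memory.temporal_distiller import ConceptEvolutionAnalyzer
from src.persona_layer.knowledge_graph import KnowledgeGraphBuilder
from src.braiding_processor.braiding_kernel import BraidingKernel

# Parse researcher documents and build the hybrid (vector + BM25) index
parser = DocumentParser(chunk_size=1000, chunk_overlap=200)
chunks = [c.to_dict() for c in parser.parse_directory("data/raw_documents")]
index = HybridIndex()
index.build_indexes(chunks)

# Distill the cognitive trajectory and build the persona graph
analyzer = ConceptEvolutionAnalyzer()
trajectory = analyzer.analyze_trajectory(chunks, researcher_id="researcher_001")
summary = analyzer.generate_trajectory_summary(trajectory)
graph_builder = KnowledgeGraphBuilder()
graph_builder.build_from_trajectory(summary)

# Braid individual resonance (alpha) against collective feasibility (beta)
kernel = BraidingKernel()
result = kernel.braid_scores(alpha=0.7, beta=0.6, query_text="example research direction")
print(result["result_type"], result["braided_score"])
```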
STRUCTURE:
dual-manifold-ai/
├── config/
│ ├── __init__.py
│ ├── settings.py
│ └── constants.py
├── data/
│ ├── raw_documents/
│ ├── processed/
│ └── indices/
├── src/
│ ├── episodic_memory/
│ │ ├── document_parser.py
│ │ ├── hybrid_index.py
│ │ └── chunk_manager.py
│ ├── semantic_memory/
│ │ ├── temporal_distiller.py
│ │ ├── cognitive_trajectory.py
│ │ └── evolution_analyzer.py
│ ├── persona_layer/
│ │ ├── knowledge_graph.py
│ │ ├── centrality_calculator.py
│ │ └── gravity_well.py
│ ├── collective_manifold/
│ │ ├── openalex_client.py
│ │ ├── community_graph.py
│ │ └── wireframe_builder.py
│ ├── braiding_processor/
│ │ ├── individual_resonance.py
│ │ ├── collective_feasibility.py
│ │ └── braiding_kernel.py
│ ├── agents/
│ │ ├── coordinator.py
│ │ ├── domain_agent.py
│ │ └── author_agent.py
│ ├── optimization/
│ │ ├── constraint_solver.py
│ │ ├── novelty_repulsor.py
│ │ └── goldilocks_finder.py
│ └── utils/
│ ├── embeddings.py
│ ├── graph_utils.py
│ └── linearizer.py
├── tests/
│ ├── test_episodic.py
│ ├── test_semantic.py
│ └── test_braiding.py
├── api/
│ ├── server.py
│ └── endpoints.py
├── notebooks/
│ ├── exploration.ipynb
│ └── visualization.ipynb
├── requirements.txt
├── docker-compose.yml
├── Dockerfile
└── README.md
DETAILED EXPLANATION:
- config/settings.py: Central configuration management for API keys and paths.
- config/constants.py: Mathematical constants and thresholds for algorithms.
- data/raw_documents/: Storage for researcher PDFs and text documents.
- data/processed/: Cleaned and timestamped document chunks.
- data/indices/: Persistent search indexes for fast retrieval.
- src/episodic_memory/document_parser.py: Extracts text with metadata and timestamps.
- src/episodic_memory/hybrid_index.py: Combines dense vectors with sparse BM25.
- src/episodic_memory/chunk_manager.py: Creates semantic chunks with IDs.
- src/semantic_memory/temporal_distiller.py: Analyzes evolution using LLM.
- src/semantic_memory/cognitive_trajectory.py: Builds time series of concepts.
- src/semantic_memory/evolution_analyzer.py: Detects shifts in research focus.
- src/persona_layer/knowledge_graph.py: Constructs weighted graph from concepts.
- src/persona_layer/centrality_calculator.py: Computes node importance metrics.
- src/persona_layer/gravity_well.py: Creates kernel density estimation field.
- src/collective_manifold/openalex_client.py: Fetches community publications.
- src/collective_manifold/community_graph.py: Builds domain knowledge networks.
- src/collective_manifold/wireframe_builder.py: Creates manifold estimation points.
- src/braiding_processor/individual_resonance.py: Calculates alpha scores.
- src/braiding_processor/collective_feasibility.py: Computes beta scores.
- src/braiding_processor/braiding_kernel.py: Implements gated fusion formula.
- src/agents/coordinator.py: Orchestrates multi-agent interactions.
- src/agents/domain_agent.py: Specializes in specific scientific domains.
- src/agents/author_agent.py: Models individual researcher persona.
- src/optimization/constraint_solver.py: Solves dual constraint optimization.
- src/optimization/novelty_repulsor.py: Implements repulsive force logic.
- src/optimization/goldilocks_finder.py: Locates optimal intersection zones.
- src/utils/embeddings.py: Handles text vectorization operations.
- src/utils/graph_utils.py: Provides graph algorithms and traversals.
- src/utils/linearizer.py: Converts complex structures to LLM prompts.
- tests/test_episodic.py: Validates document parsing and indexing.
- tests/test_semantic.py: Tests cognitive trajectory analysis.
- tests/test_braiding.py: Verifies braiding algorithm correctness.
- api/server.py: FastAPI server for system interaction.
- api/endpoints.py: REST endpoints for research suggestions.
- notebooks/exploration.ipynb: Interactive system exploration.
- notebooks/visualization.ipynb: Gravity well and graph visualization.
- requirements.txt: Python dependencies and versions.
- docker-compose.yml: Service orchestration for deployment.
- Dockerfile: Containerization configuration.
- README.md: Comprehensive setup and usage guide.
CODE:
config/settings.py
"""
Central configuration for the dual-manifold cognitive architecture.
Manages API keys, file paths, and system parameters.
"""
import os
from typing import Dict, Any
from dataclasses import dataclass, field
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Settings:
"""System configuration settings."""
# API Keys
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
OPENALEX_API_KEY: str = os.getenv("OPENALEX_API_KEY", "")
HUGGINGFACE_TOKEN: str = os.getenv("HUGGINGFACE_TOKEN", "")
# Paths
DATA_DIR: str = os.getenv("DATA_DIR", "./data")
RAW_DOCS_DIR: str = os.path.join(DATA_DIR, "raw_documents")
PROCESSED_DIR: str = os.path.join(DATA_DIR, "processed")
INDICES_DIR: str = os.path.join(DATA_DIR, "indices")
LOGS_DIR: str = os.getenv("LOGS_DIR", "./logs")
# Model configurations
EMBEDDING_MODEL: str = "sentence-transformers/all-MiniLM-L6-v2"
LLM_MODEL: str = "gpt-4-turbo-preview"
CHUNK_SIZE: int = 1000
CHUNK_OVERLAP: int = 200
# Search parameters
HYBRID_SEARCH_WEIGHT: float = 0.5 # Balance between dense and sparse
TOP_K_RESULTS: int = 10
RECIPROCAL_RANK_K: int = 60
# Graph parameters
CENTRALITY_MEASURE: str = "pagerank"
MIN_EDGE_WEIGHT: float = 0.1
MAX_GRAPH_NODES: int = 1000
# Braiding parameters
ALPHA_WEIGHT: float = 0.4 # Individual resonance
BETA_WEIGHT: float = 0.4 # Collective feasibility
GAMMA: float = 0.2 # Interaction term
NOVELTY_THRESHOLD: float = 0.7
# Server settings
API_HOST: str = "0.0.0.0"
API_PORT: int = 8000
DEBUG_MODE: bool = os.getenv("DEBUG", "False").lower() == "true"
# Cache settings
CACHE_TTL: int = 3600 # 1 hour
ENABLE_CACHE: bool = True
def validate(self) -> None:
"""Validate configuration settings."""
required_keys = ["OPENAI_API_KEY", "OPENALEX_API_KEY"]
missing = [key for key in required_keys if not getattr(self, key)]
if missing:
raise ValueError(f"Missing required environment variables: {missing}")
# Create directories if they don't exist
for dir_path in [self.DATA_DIR, self.RAW_DOCS_DIR,
self.PROCESSED_DIR, self.INDICES_DIR, self.LOGS_DIR]:
os.makedirs(dir_path, exist_ok=True)
def to_dict(self) -> Dict[str, Any]:
"""Convert settings to dictionary."""
return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
# Global settings instance
settings = Settings()
src/episodic_memory/document_parser.py
"""
Document parsing module for episodic memory layer.
Extracts text with metadata, timestamps, and creates semantic chunks.
"""
import os
import re
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
import hashlib
from pathlib import Path
import pdfplumber
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document as LangchainDocument
@dataclass
class DocumentChunk:
"""Represents a semantically coherent chunk of text."""
id: str
text: str
source_file: str
chunk_index: int
timestamp: datetime
metadata: Dict[str, Any] = field(default_factory=dict)
embeddings: Optional[List[float]] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert chunk to dictionary for storage."""
return {
"id": self.id,
"text": self.text,
"source_file": self.source_file,
"chunk_index": self.chunk_index,
"timestamp": self.timestamp.isoformat(),
"metadata": self.metadata
}
class DocumentParser:
"""Parses documents into timestamped chunks with metadata."""
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
"""Initialize parser with chunking parameters."""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""]
)
def extract_text_from_pdf(self, pdf_path: str) -> Tuple[str, Dict[str, Any]]:
"""Extract text from PDF file with metadata."""
text_parts = []
metadata = {
"file_name": os.path.basename(pdf_path),
"file_size": os.path.getsize(pdf_path),
"page_count": 0,
"extraction_date": datetime.now().isoformat()
}
try:
with pdfplumber.open(pdf_path) as pdf:
metadata["page_count"] = len(pdf.pages)
for page_num, page in enumerate(pdf.pages):
page_text = page.extract_text()
if page_text:
text_parts.append(f"Page {page_num + 1}:\n{page_text}")
# Try to extract creation date from metadata
if pdf.metadata:
if 'CreationDate' in pdf.metadata:
metadata["creation_date"] = pdf.metadata['CreationDate']
if 'Title' in pdf.metadata:
metadata["title"] = pdf.metadata['Title']
if 'Author' in pdf.metadata:
metadata["author"] = pdf.metadata['Author']
except Exception as e:
raise ValueError(f"Failed to parse PDF {pdf_path}: {str(e)}")
return "\n\n".join(text_parts), metadata
def extract_text_from_txt(self, txt_path: str) -> Tuple[str, Dict[str, Any]]:
"""Extract text from plain text file."""
try:
with open(txt_path, 'r', encoding='utf-8') as f:
text = f.read()
except UnicodeDecodeError:
with open(txt_path, 'r', encoding='latin-1') as f:
text = f.read()
metadata = {
"file_name": os.path.basename(txt_path),
"file_size": os.path.getsize(txt_path),
"extraction_date": datetime.now().isoformat()
}
return text, metadata
def extract_timestamp(self, file_path: str, metadata: Dict[str, Any]) -> datetime:
"""Extract timestamp from file and metadata."""
# First try metadata
if "creation_date" in metadata:
try:
# Handle PDF creation date format: D:20250101120000
date_str = metadata["creation_date"]
if date_str.startswith("D:"):
date_str = date_str[2:]
                return datetime.strptime(date_str[:14], "%Y%m%d%H%M%S")
            except (ValueError, TypeError):
                pass
# Try file modification time
file_mtime = os.path.getmtime(file_path)
return datetime.fromtimestamp(file_mtime)
def create_chunk_id(self, source_file: str, chunk_index: int, text: str) -> str:
"""Create unique ID for chunk."""
content_hash = hashlib.md5(text.encode()).hexdigest()[:8]
file_hash = hashlib.md5(source_file.encode()).hexdigest()[:8]
return f"chunk_{file_hash}_{chunk_index}_{content_hash}"
def parse_document(self, file_path: str) -> List[DocumentChunk]:
"""Parse a document into timestamped chunks."""
# Determine file type and extract text
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.pdf':
text, metadata = self.extract_text_from_pdf(file_path)
elif file_ext in ['.txt', '.md', '.csv']:
text, metadata = self.extract_text_from_txt(file_path)
else:
raise ValueError(f"Unsupported file format: {file_ext}")
# Extract timestamp
timestamp = self.extract_timestamp(file_path, metadata)
# Split into chunks
langchain_docs = self.text_splitter.create_documents([text])
# Convert to our chunk format
chunks = []
for idx, doc in enumerate(langchain_docs):
chunk_id = self.create_chunk_id(file_path, idx, doc.page_content)
chunk_metadata = metadata.copy()
chunk_metadata.update({
"chunk_size": len(doc.page_content),
"word_count": len(doc.page_content.split())
})
chunk = DocumentChunk(
id=chunk_id,
text=doc.page_content,
source_file=file_path,
chunk_index=idx,
timestamp=timestamp,
metadata=chunk_metadata
)
chunks.append(chunk)
return chunks
def parse_directory(self, directory_path: str) -> List[DocumentChunk]:
"""Parse all documents in a directory."""
all_chunks = []
supported_extensions = ['.pdf', '.txt', '.md', '.csv']
for root, _, files in os.walk(directory_path):
for file in files:
file_ext = os.path.splitext(file)[1].lower()
if file_ext in supported_extensions:
file_path = os.path.join(root, file)
try:
chunks = self.parse_document(file_path)
all_chunks.extend(chunks)
print(f"Parsed {file_path}: {len(chunks)} chunks")
except Exception as e:
print(f"Error parsing {file_path}: {str(e)}")
# Sort chunks by timestamp
all_chunks.sort(key=lambda x: x.timestamp)
return all_chunks
src/episodic_memory/hybrid_index.py
"""
Hybrid search index combining dense vector embeddings and sparse BM25.
Implements reciprocal rank fusion for result merging.
"""
import json
import pickle
from typing import List, Dict, Any, Tuple, Optional
from pathlib import Path
import numpy as np
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer
import faiss
class HybridIndex:
"""Combines dense vector index and sparse BM25 index for hybrid search."""
def __init__(self, embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
"""Initialize hybrid index with embedding model."""
self.embedding_model = SentenceTransformer(embedding_model_name)
self.bm25_index: Optional[BM25Okapi] = None
self.vector_index: Optional[faiss.IndexFlatIP] = None
self.chunks: List[Dict[str, Any]] = []
self.tokenized_corpus: List[List[str]] = []
def create_tokenized_corpus(self, chunks: List[Dict[str, Any]]) -> List[List[str]]:
"""Tokenize text for BM25 indexing."""
tokenized = []
for chunk in chunks:
# Simple tokenization - split by whitespace and lowercase
tokens = chunk["text"].lower().split()
# Remove very short tokens
tokens = [t for t in tokens if len(t) > 2]
tokenized.append(tokens)
return tokenized
def build_indexes(self, chunks: List[Dict[str, Any]]) -> None:
"""Build both dense and sparse indexes from chunks."""
self.chunks = chunks
print(f"Building indexes for {len(chunks)} chunks...")
# Build BM25 index
print("Building BM25 index...")
self.tokenized_corpus = self.create_tokenized_corpus(chunks)
self.bm25_index = BM25Okapi(self.tokenized_corpus)
# Build dense vector index
print("Building dense vector index...")
texts = [chunk["text"] for chunk in chunks]
embeddings = self.embedding_model.encode(texts, show_progress_bar=True)
embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
# Initialize FAISS index
dimension = embeddings.shape[1]
self.vector_index = faiss.IndexFlatIP(dimension)
self.vector_index.add(embeddings.astype('float32'))
print("Indexes built successfully!")
def dense_search(self, query: str, k: int = 10) -> List[Tuple[int, float]]:
"""Search using dense vector similarity."""
if self.vector_index is None:
raise ValueError("Vector index not built. Call build_indexes first.")
# Encode query
query_embedding = self.embedding_model.encode([query])[0]
query_embedding = query_embedding / np.linalg.norm(query_embedding)
query_embedding = query_embedding.reshape(1, -1).astype('float32')
# Search
distances, indices = self.vector_index.search(query_embedding, k)
# Convert to list of (index, score)
results = []
for idx, dist in zip(indices[0], distances[0]):
if idx != -1: # FAISS returns -1 for missing results
results.append((int(idx), float(dist)))
return results
def sparse_search(self, query: str, k: int = 10) -> List[Tuple[int, float]]:
"""Search using BM25."""
if self.bm25_index is None:
raise ValueError("BM25 index not built. Call build_indexes first.")
# Tokenize query
query_tokens = query.lower().split()
query_tokens = [t for t in query_tokens if len(t) > 2]
# Get scores
scores = self.bm25_index.get_scores(query_tokens)
# Get top k results
top_indices = np.argsort(scores)[::-1][:k]
# Convert to list of (index, score)
results = []
for idx in top_indices:
if scores[idx] > 0: # Only include positive scores
results.append((int(idx), float(scores[idx])))
return results
    def reciprocal_rank_fusion(self, ranked_lists: List[List[Tuple[int, float]]],
                               k: int = 60,
                               weights: Optional[List[float]] = None) -> List[Tuple[int, float]]:
        """Combine multiple ranked lists using (optionally weighted) reciprocal rank fusion."""
        if weights is None:
            weights = [1.0] * len(ranked_lists)
        fused_scores: Dict[int, float] = {}
        for weight, rank_list in zip(weights, ranked_lists):
            for rank, (doc_id, _) in enumerate(rank_list):
                if doc_id not in fused_scores:
                    fused_scores[doc_id] = 0.0
                # Weighted RRF formula: weight / (k + rank)
                fused_scores[doc_id] += weight / (k + rank + 1)
        # Sort by fused score
        sorted_results = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
        return [(doc_id, score) for doc_id, score in sorted_results]
def hybrid_search(self, query: str, top_k: int = 10,
dense_weight: float = 0.5, sparse_weight: float = 0.5) -> List[Dict[str, Any]]:
"""Perform hybrid search combining dense and sparse results."""
# Get results from both indexes
dense_results = self.dense_search(query, k=top_k*2)
sparse_results = self.sparse_search(query, k=top_k*2)
        # Combine using weighted reciprocal rank fusion. RRF uses only rank
        # positions, so dense_weight and sparse_weight scale each list's
        # contribution to the fused score rather than rescaling raw scores.
        fused_results = self.reciprocal_rank_fusion(
            [dense_results, sparse_results],
            weights=[dense_weight, sparse_weight]
        )
# Get top k results
top_results = fused_results[:top_k]
# Format results with chunk information
formatted_results = []
for doc_id, score in top_results:
if doc_id < len(self.chunks):
result = self.chunks[doc_id].copy()
result["score"] = score
result["chunk_id"] = result.get("id", f"chunk_{doc_id}")
formatted_results.append(result)
return formatted_results
def save_indexes(self, save_dir: str) -> None:
"""Save indexes to disk."""
save_path = Path(save_dir)
save_path.mkdir(parents=True, exist_ok=True)
# Save chunks
with open(save_path / "chunks.json", "w") as f:
json.dump(self.chunks, f, indent=2, default=str)
# Save BM25 index
if self.bm25_index:
with open(save_path / "bm25_index.pkl", "wb") as f:
pickle.dump({
"bm25": self.bm25_index,
"tokenized_corpus": self.tokenized_corpus
}, f)
# Save FAISS index
if self.vector_index:
faiss.write_index(self.vector_index, str(save_path / "vector_index.faiss"))
print(f"Indexes saved to {save_dir}")
def load_indexes(self, load_dir: str) -> None:
"""Load indexes from disk."""
load_path = Path(load_dir)
# Load chunks
with open(load_path / "chunks.json", "r") as f:
self.chunks = json.load(f)
# Load BM25 index
bm25_path = load_path / "bm25_index.pkl"
if bm25_path.exists():
with open(bm25_path, "rb") as f:
bm25_data = pickle.load(f)
self.bm25_index = bm25_data["bm25"]
self.tokenized_corpus = bm25_data["tokenized_corpus"]
# Load FAISS index
faiss_path = load_path / "vector_index.faiss"
if faiss_path.exists():
self.vector_index = faiss.read_index(str(faiss_path))
print(f"Indexes loaded from {load_dir}")
src/semantic_memory/temporal_distiller.py
"""
Temporal distillation module for semantic memory layer.
Analyzes cognitive evolution and extracts research trajectories.
"""
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
import statistics
from collections import defaultdict
import numpy as np
from dataclasses import dataclass, field
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
@dataclass
class TemporalConcept:
"""Represents a concept with temporal evolution data."""
name: str
occurrences: List[datetime] = field(default_factory=list)
contexts: List[str] = field(default_factory=list)
strength: float = 0.0 # Frequency normalized by time
trend: float = 0.0 # Positive = increasing, Negative = decreasing
def add_occurrence(self, timestamp: datetime, context: str) -> None:
"""Add a new occurrence of this concept."""
self.occurrences.append(timestamp)
self.contexts.append(context)
self._update_stats()
def _update_stats(self) -> None:
"""Update statistical measures."""
if len(self.occurrences) < 2:
self.strength = len(self.occurrences)
self.trend = 0
return
# Sort occurrences
sorted_occurrences = sorted(self.occurrences)
# Calculate strength (frequency normalized by recency)
total_days = (sorted_occurrences[-1] - sorted_occurrences[0]).days + 1
self.strength = len(self.occurrences) / max(1, total_days / 30) # Per month
# Calculate trend (linear regression slope)
if len(sorted_occurrences) >= 3:
# Convert dates to numeric values (days since first occurrence)
first_date = sorted_occurrences[0]
x = np.array([(d - first_date).days for d in sorted_occurrences])
y = np.arange(len(x)) # Cumulative count
# Simple linear regression
if len(set(x)) > 1: # Need at least 2 unique x values
slope, _ = np.polyfit(x, y, 1)
self.trend = slope
@dataclass
class ResearchTrajectory:
"""Represents a researcher's cognitive trajectory over time."""
researcher_id: str
time_periods: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict) # Monthly buckets
concepts: Dict[str, TemporalConcept] = field(default_factory=dict)
focus_shifts: List[Dict[str, Any]] = field(default_factory=list)
methodology_changes: List[Dict[str, Any]] = field(default_factory=list)
def add_chunk(self, chunk: Dict[str, Any], extracted_concepts: List[str]) -> None:
"""Add a document chunk to the trajectory."""
timestamp = datetime.fromisoformat(chunk["timestamp"]) if isinstance(chunk["timestamp"], str) else chunk["timestamp"]
# Add to time period bucket (monthly)
period_key = timestamp.strftime("%Y-%m")
if period_key not in self.time_periods:
self.time_periods[period_key] = []
self.time_periods[period_key].append({
"chunk_id": chunk["id"],
"text": chunk["text"],
"concepts": extracted_concepts,
"timestamp": timestamp.isoformat()
})
# Update concept occurrences
for concept in extracted_concepts:
if concept not in self.concepts:
self.concepts[concept] = TemporalConcept(name=concept)
self.concepts[concept].add_occurrence(timestamp, chunk["text"][:200]) # First 200 chars as context
class ConceptEvolutionAnalyzer:
"""Analyzes how concepts evolve over time in research documents."""
def __init__(self, llm_model: str = "gpt-4-turbo-preview"):
"""Initialize analyzer with LLM for concept extraction."""
self.llm = ChatOpenAI(model=llm_model, temperature=0.1)
self.concept_cache = {} # Cache for concept extraction
def extract_concepts(self, text: str, max_concepts: int = 10) -> List[str]:
"""Extract key concepts from text using LLM."""
# Check cache first
cache_key = hash(text)
if cache_key in self.concept_cache:
return self.concept_cache[cache_key]
prompt = ChatPromptTemplate.from_messages([
("system", """You are a scientific concept extractor. Extract the key technical concepts,
methodologies, and research topics from the following text. Return only the concepts as a
comma-separated list. Be precise with technical terminology."""),
("human", "Text: {text}")
])
chain = prompt | self.llm
response = chain.invoke({"text": text[:3000]}) # Limit text length
# Parse response
concepts = [c.strip() for c in response.content.split(",")]
concepts = [c for c in concepts if c and len(c) > 2][:max_concepts]
# Cache result
self.concept_cache[cache_key] = concepts
return concepts
def analyze_trajectory(self, chunks: List[Dict[str, Any]], researcher_id: str = "default") -> ResearchTrajectory:
"""Analyze cognitive trajectory from document chunks."""
trajectory = ResearchTrajectory(researcher_id=researcher_id)
print(f"Analyzing trajectory for {len(chunks)} chunks...")
# Process chunks in chronological order
sorted_chunks = sorted(chunks, key=lambda x: x["timestamp"])
for i, chunk in enumerate(sorted_chunks):
if i % 10 == 0:
print(f"Processed {i}/{len(sorted_chunks)} chunks...")
# Extract concepts
concepts = self.extract_concepts(chunk["text"])
# Add to trajectory
trajectory.add_chunk(chunk, concepts)
# Analyze focus shifts
self._detect_focus_shifts(trajectory)
# Analyze methodology changes
self._detect_methodology_changes(trajectory)
return trajectory
def _detect_focus_shifts(self, trajectory: ResearchTrajectory) -> None:
"""Detect significant shifts in research focus."""
if len(trajectory.concepts) < 2:
return
# Get concepts sorted by occurrence count
sorted_concepts = sorted(
trajectory.concepts.items(),
key=lambda x: len(x[1].occurrences),
reverse=True
)
# Analyze temporal patterns
periods = sorted(trajectory.time_periods.keys())
if len(periods) < 3:
return
# Look for concepts that appear/disappear
concept_period_presence = {}
for concept_name, concept in trajectory.concepts.items():
periods_with_concept = set()
for occurrence in concept.occurrences:
period_key = occurrence.strftime("%Y-%m")
periods_with_concept.add(period_key)
concept_period_presence[concept_name] = periods_with_concept
# Detect shifts (concept appears or disappears significantly)
for i in range(1, len(periods)):
current_period = periods[i]
prev_period = periods[i-1]
# Concepts that appeared in current period but not previous
new_concepts = []
for concept_name, presence in concept_period_presence.items():
if current_period in presence and prev_period not in presence:
# Check if this is a significant new focus
concept = trajectory.concepts[concept_name]
if concept.strength > 0.5: # Threshold
new_concepts.append(concept_name)
if new_concepts:
trajectory.focus_shifts.append({
"period": current_period,
"type": "new_focus",
"concepts": new_concepts,
"description": f"Started focusing on {', '.join(new_concepts[:3])}"
})
def _detect_methodology_changes(self, trajectory: ResearchTrajectory) -> None:
"""Detect changes in research methodology."""
methodology_keywords = {
"experimental", "theoretical", "computational", "simulation",
"analysis", "modeling", "framework", "algorithm", "protocol",
"statistical", "qualitative", "quantitative", "case_study",
"survey", "interview", "observation", "longitudinal"
}
periods = sorted(trajectory.time_periods.keys())
for period in periods:
period_chunks = trajectory.time_periods[period]
period_text = " ".join([c["text"] for c in period_chunks])
period_text_lower = period_text.lower()
methodologies = []
for method in methodology_keywords:
if method in period_text_lower:
methodologies.append(method)
if methodologies:
trajectory.methodology_changes.append({
"period": period,
"methodologies": methodologies,
"count": len(methodologies)
})
def generate_trajectory_summary(self, trajectory: ResearchTrajectory) -> Dict[str, Any]:
"""Generate a summary of the research trajectory."""
# Get top concepts
top_concepts = sorted(
trajectory.concepts.items(),
key=lambda x: x[1].strength,
reverse=True
)[:10]
# Calculate trajectory metrics
total_periods = len(trajectory.time_periods)
concept_diversity = len(trajectory.concepts)
focus_shifts_count = len(trajectory.focus_shifts)
summary = {
"researcher_id": trajectory.researcher_id,
"time_span": {
"start": min(trajectory.time_periods.keys()),
"end": max(trajectory.time_periods.keys()),
"total_periods": total_periods
},
"concept_analysis": {
"total_concepts": concept_diversity,
"top_concepts": [
{
"name": name,
"strength": round(concept.strength, 2),
"trend": round(concept.trend, 3),
"occurrences": len(concept.occurrences)
}
for name, concept in top_concepts
]
},
"dynamics": {
"focus_shifts": trajectory.focus_shifts,
"methodology_changes": trajectory.methodology_changes,
"total_shifts": focus_shifts_count
},
"trajectory_score": round(
(concept_diversity * 0.3 +
focus_shifts_count * 0.4 +
total_periods * 0.3) / max(1, total_periods),
2
)
}
return summary
src/persona_layer/knowledge_graph.py
"""
Knowledge graph construction for persona layer.
Builds weighted graph from temporal concepts with centrality measures.
"""
from typing import List, Dict, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
import networkx as nx
import numpy as np
from collections import defaultdict
@dataclass
class GraphNode:
"""Represents a node in the knowledge graph."""
id: str
name: str
type: str # "concept", "methodology", "topic"
weight: float = 1.0
centrality: float = 0.0
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert node to dictionary."""
return {
"id": self.id,
"name": self.name,
"type": self.type,
"weight": self.weight,
"centrality": self.centrality,
"metadata": self.metadata
}
@dataclass
class GraphEdge:
"""Represents an edge in the knowledge graph."""
source: str
target: str
weight: float = 1.0
relation_type: str = "related_to"
co_occurrence_count: int = 0
def to_dict(self) -> Dict[str, Any]:
"""Convert edge to dictionary."""
return {
"source": self.source,
"target": self.target,
"weight": self.weight,
"relation_type": self.relation_type,
"co_occurrence_count": self.co_occurrence_count
}
class KnowledgeGraphBuilder:
"""Builds and manages the persona knowledge graph."""
def __init__(self):
"""Initialize graph builder."""
self.graph = nx.Graph()
self.nodes: Dict[str, GraphNode] = {}
self.edges: Dict[Tuple[str, str], GraphEdge] = {}
self.node_counter = 0
def build_from_trajectory(self, trajectory_summary: Dict[str, Any]) -> None:
"""Build knowledge graph from research trajectory."""
print("Building knowledge graph from trajectory...")
# Add concept nodes
for concept_data in trajectory_summary["concept_analysis"]["top_concepts"]:
node_id = f"concept_{concept_data['name'].replace(' ', '_').lower()}"
node = GraphNode(
id=node_id,
name=concept_data["name"],
type="concept",
weight=concept_data["strength"],
metadata={
"trend": concept_data["trend"],
"occurrences": concept_data["occurrences"]
}
)
self.add_node(node)
# Add methodology nodes from trajectory
for method_change in trajectory_summary["dynamics"]["methodology_changes"]:
for method in method_change["methodologies"]:
node_id = f"method_{method}"
if node_id not in self.nodes:
node = GraphNode(
id=node_id,
name=method,
type="methodology",
weight=method_change["count"] / len(trajectory_summary["dynamics"]["methodology_changes"])
)
self.add_node(node)
# Create edges based on co-occurrence in focus shifts
self._create_edges_from_shifts(trajectory_summary["dynamics"]["focus_shifts"])
# Calculate centrality measures
self.calculate_centrality()
print(f"Graph built with {len(self.nodes)} nodes and {len(self.edges)} edges")
def _create_edges_from_shifts(self, focus_shifts: List[Dict[str, Any]]) -> None:
"""Create edges between concepts that appear together in focus shifts."""
for shift in focus_shifts:
concepts = shift.get("concepts", [])
if len(concepts) >= 2:
# Create edges between all pairs of concepts in this shift
for i in range(len(concepts)):
for j in range(i + 1, len(concepts)):
node1_id = f"concept_{concepts[i].replace(' ', '_').lower()}"
node2_id = f"concept_{concepts[j].replace(' ', '_').lower()}"
if node1_id in self.nodes and node2_id in self.nodes:
edge_key = tuple(sorted([node1_id, node2_id]))
if edge_key in self.edges:
# Update existing edge
self.edges[edge_key].co_occurrence_count += 1
self.edges[edge_key].weight += 0.2 # Increase weight
else:
# Create new edge
edge = GraphEdge(
source=node1_id,
target=node2_id,
weight=1.0,
relation_type="co_occurrence",
co_occurrence_count=1
)
self.add_edge(edge)
def add_node(self, node: GraphNode) -> None:
"""Add a node to the graph."""
self.nodes[node.id] = node
self.graph.add_node(node.id, **node.to_dict())
def add_edge(self, edge: GraphEdge) -> None:
"""Add an edge to the graph."""
edge_key = (edge.source, edge.target)
self.edges[edge_key] = edge
self.graph.add_edge(
edge.source,
edge.target,
weight=edge.weight,
relation_type=edge.relation_type,
co_occurrence_count=edge.co_occurrence_count
)
def calculate_centrality(self, method: str = "pagerank") -> None:
"""Calculate centrality measures for all nodes."""
if method == "pagerank":
centrality_scores = nx.pagerank(self.graph, weight='weight')
elif method == "betweenness":
centrality_scores = nx.betweenness_centrality(self.graph, weight='weight')
elif method == "eigenvector":
centrality_scores = nx.eigenvector_centrality(self.graph, weight='weight', max_iter=1000)
else:
raise ValueError(f"Unknown centrality method: {method}")
# Update node centrality values
for node_id, score in centrality_scores.items():
if node_id in self.nodes:
self.nodes[node_id].centrality = score
# Also update graph node attributes
nx.set_node_attributes(self.graph, centrality_scores, 'centrality')
def get_subgraph(self, node_ids: List[str], depth: int = 2) -> nx.Graph:
"""Get subgraph around specified nodes up to given depth."""
subgraph_nodes = set()
for node_id in node_ids:
if node_id in self.graph:
# Add nodes within specified distance
for other_node in nx.single_source_shortest_path_length(self.graph, node_id, cutoff=depth):
subgraph_nodes.add(other_node)
return self.graph.subgraph(subgraph_nodes)
def find_connected_components(self) -> List[List[str]]:
"""Find connected components in the graph."""
components = []
for component in nx.connected_components(self.graph):
components.append(list(component))
return components
def get_node_neighbors(self, node_id: str, max_neighbors: int = 10) -> List[Dict[str, Any]]:
"""Get neighbors of a node with their edge weights."""
if node_id not in self.graph:
return []
neighbors = []
for neighbor in self.graph.neighbors(node_id):
edge_data = self.graph.get_edge_data(node_id, neighbor)
neighbor_node = self.nodes.get(neighbor)
if neighbor_node:
neighbors.append({
"node": neighbor_node.to_dict(),
"edge_weight": edge_data.get("weight", 1.0),
"relation_type": edge_data.get("relation_type", "related_to")
})
# Sort by edge weight
neighbors.sort(key=lambda x: x["edge_weight"], reverse=True)
return neighbors[:max_neighbors]
def to_networkx(self) -> nx.Graph:
"""Get the underlying NetworkX graph."""
return self.graph
def to_dict(self) -> Dict[str, Any]:
"""Convert graph to dictionary representation."""
return {
"nodes": [node.to_dict() for node in self.nodes.values()],
"edges": [edge.to_dict() for edge in self.edges.values()],
"metrics": {
"node_count": len(self.nodes),
"edge_count": len(self.edges),
"density": nx.density(self.graph),
"average_degree": sum(dict(self.graph.degree()).values()) / len(self.nodes) if self.nodes else 0
}
}
def save_to_file(self, filepath: str) -> None:
"""Save graph to file."""
import json
graph_data = self.to_dict()
with open(filepath, 'w') as f:
json.dump(graph_data, f, indent=2)
print(f"Graph saved to {filepath}")
def load_from_file(self, filepath: str) -> None:
"""Load graph from file."""
import json
with open(filepath, 'r') as f:
graph_data = json.load(f)
# Clear existing graph
self.graph = nx.Graph()
self.nodes = {}
self.edges = {}
# Load nodes
for node_data in graph_data["nodes"]:
node = GraphNode(
id=node_data["id"],
name=node_data["name"],
type=node_data["type"],
weight=node_data["weight"],
centrality=node_data["centrality"],
metadata=node_data.get("metadata", {})
)
self.add_node(node)
# Load edges
for edge_data in graph_data["edges"]:
edge = GraphEdge(
source=edge_data["source"],
target=edge_data["target"],
weight=edge_data["weight"],
relation_type=edge_data["relation_type"],
co_occurrence_count=edge_data["co_occurrence_count"]
)
self.add_edge(edge)
print(f"Graph loaded from {filepath} with {len(self.nodes)} nodes")
src/braiding_processor/braiding_kernel.py
"""
Braiding kernel implementation for dual-manifold fusion.
Combines individual resonance and collective feasibility scores.
"""
from typing import Dict, List, Any, Tuple, Optional
import numpy as np
from dataclasses import dataclass, field
from enum import Enum
class FusionGateType(Enum):
"""Types of fusion gates for braiding."""
LINEAR = "linear"
GEOMETRIC = "geometric"
STRUCTURAL = "structural"
GATED = "gated"
@dataclass
class BraidingParameters:
"""Parameters for the braiding algorithm."""
alpha_weight: float = 0.4 # Individual resonance weight
beta_weight: float = 0.4 # Collective feasibility weight
gamma: float = 0.2 # Interaction term coefficient
novelty_threshold: float = 0.7
hallucination_threshold: float = 0.1 # Minimum beta for valid ideas
noise_threshold: float = 0.1 # Minimum alpha for relevant ideas
fusion_gate: FusionGateType = FusionGateType.STRUCTURAL
def validate(self) -> None:
"""Validate parameter values."""
if not (0 <= self.alpha_weight <= 1):
raise ValueError("alpha_weight must be between 0 and 1")
if not (0 <= self.beta_weight <= 1):
raise ValueError("beta_weight must be between 0 and 1")
if not (0 <= self.gamma <= 1):
raise ValueError("gamma must be between 0 and 1")
if self.alpha_weight + self.beta_weight + self.gamma > 1.5:
print("Warning: Sum of weights exceeds 1.5, may produce large scores")
class BraidingKernel:
"""
    Implements the braiding formula for combining individual and collective scores:
        S_braid = w_α * α + w_β * β + γ * (α * β)
    subject to a structural gate G(α, β) that filters hallucinations and noise.
"""
def __init__(self, parameters: Optional[BraidingParameters] = None):
"""Initialize braiding kernel with parameters."""
self.params = parameters or BraidingParameters()
self.params.validate()
def calculate_individual_resonance(self,
query_embedding: np.ndarray,
persona_graph: Any, # Would be KnowledgeGraph type
gravity_well: Any, # Would be GravityWell type
alpha_cache: Dict[str, float] = None) -> float:
"""
Calculate alpha score: individual resonance.
Measures how well the query aligns with researcher's established history.
"""
if alpha_cache and query_embedding.tobytes() in alpha_cache:
return alpha_cache[query_embedding.tobytes()]
# This is a simplified calculation
# In practice, this would involve:
# 1. Semantic similarity with graph nodes
# 2. Distance from gravity well center
# 3. Historical frequency of similar concepts
# Placeholder calculation
alpha = 0.5 # Base value
# Adjust based on gravity well distance (closer = higher alpha)
# distance = gravity_well.calculate_distance(query_embedding)
# alpha *= np.exp(-distance) # Exponential decay
# Adjust based on graph centrality of similar nodes
# similar_nodes = persona_graph.find_similar_nodes(query_embedding)
# if similar_nodes:
# avg_centrality = np.mean([n.centrality for n in similar_nodes])
# alpha *= (0.5 + avg_centrality)
# Cache result
if alpha_cache is not None:
alpha_cache[query_embedding.tobytes()] = alpha
return alpha
def calculate_collective_feasibility(self,
query_embedding: np.ndarray,
community_graph: Any, # Would be CommunityGraph type
wireframe: Any, # Would be WireframeBuilder type
beta_cache: Dict[str, float] = None) -> float:
"""
Calculate beta score: collective feasibility.
Measures how strongly the query is supported by community knowledge.
"""
if beta_cache and query_embedding.tobytes() in beta_cache:
return beta_cache[query_embedding.tobytes()]
# This is a simplified calculation
# In practice, this would involve:
# 1. Random walk probability in community graph
# 2. Citation network support
# 3. Publication frequency of related concepts
# Placeholder calculation
beta = 0.5 # Base value
# Adjust based on community graph connectivity
# connected_nodes = community_graph.find_connected_nodes(query_embedding)
# if connected_nodes:
# beta *= (0.3 + 0.7 * len(connected_nodes) / 100) # Normalized
# Adjust based on wireframe support
# support = wireframe.calculate_support(query_embedding)
# beta *= (0.5 + 0.5 * support)
# Cache result
if beta_cache is not None:
beta_cache[query_embedding.tobytes()] = beta
return beta
def apply_structural_gate(self, alpha: float, beta: float) -> float:
"""
Apply structural gate function G(α, β).
Filters hallucinations and irrelevant noise.
"""
gate_type = self.params.fusion_gate
if gate_type == FusionGateType.LINEAR:
# Simple linear combination
return self.params.alpha_weight * alpha + self.params.beta_weight * beta
elif gate_type == FusionGateType.GEOMETRIC:
# Geometric mean emphasizes balanced scores
if alpha > 0 and beta > 0:
return (alpha * beta) ** 0.5
return 0
elif gate_type == FusionGateType.STRUCTURAL:
# Structural gate from the paper
# Filters hallucinations (high alpha, low beta) and noise (low alpha, high beta)
# Check for hallucinations
if alpha > self.params.novelty_threshold and beta < self.params.hallucination_threshold:
return -alpha * 0.5 # Penalize hallucinations
# Check for irrelevant noise
if alpha < self.params.noise_threshold and beta > self.params.novelty_threshold:
return -beta * 0.3 # Penalize irrelevant concepts
# Valid combination
interaction = alpha * beta
linear_component = self.params.alpha_weight * alpha + self.params.beta_weight * beta
return linear_component + self.params.gamma * interaction
elif gate_type == FusionGateType.GATED:
# Gated fusion with sigmoid activation
gate = 1 / (1 + np.exp(-10 * (alpha * beta - 0.5))) # Sigmoid gate
return gate * (alpha + beta) / 2
else:
raise ValueError(f"Unknown fusion gate type: {gate_type}")
def braid_scores(self,
alpha: float,
beta: float,
query_text: Optional[str] = None) -> Dict[str, Any]:
"""
Calculate braided score using the full formula.
Returns detailed scoring breakdown.
"""
# Apply structural gate
gate_value = self.apply_structural_gate(alpha, beta)
# Calculate final braided score
if self.params.fusion_gate == FusionGateType.STRUCTURAL:
# For structural gate, gate_value is already the final score
braided_score = gate_value
else:
# For other gates, combine with interaction term
interaction = alpha * beta
braided_score = gate_value + self.params.gamma * interaction
        # Clamp score to the [-1, 1] range (negative values flag invalid ideas)
        braided_score = max(-1, min(1, braided_score))
# Determine validity
is_valid = (
braided_score > 0 and
alpha > self.params.noise_threshold and
beta > self.params.hallucination_threshold
)
# Classify result type
if braided_score < 0:
result_type = "invalid"
elif alpha > self.params.novelty_threshold and beta < self.params.hallucination_threshold:
result_type = "hallucination"
elif alpha < self.params.noise_threshold and beta > self.params.novelty_threshold:
result_type = "noise"
elif braided_score > self.params.novelty_threshold:
result_type = "novel"
else:
result_type = "conventional"
# Calculate novelty score (how different from existing knowledge)
novelty_score = alpha * (1 - beta) # High individual, low collective
return {
"braided_score": round(braided_score, 4),
"alpha": round(alpha, 4),
"beta": round(beta, 4),
"gate_value": round(gate_value, 4),
"interaction": round(alpha * beta, 4),
"is_valid": is_valid,
"result_type": result_type,
"novelty_score": round(novelty_score, 4),
"parameters": {
"alpha_weight": self.params.alpha_weight,
"beta_weight": self.params.beta_weight,
"gamma": self.params.gamma,
"fusion_gate": self.params.fusion_gate.value
}
}
def braid_multiple_queries(self,
queries: List[Tuple[np.ndarray, str]],
persona_graph: Any,
community_graph: Any,
gravity_well: Any,
wireframe: Any) -> List[Dict[str, Any]]:
"""
Braid multiple queries and return sorted results.
"""
results = []
alpha_cache = {}
beta_cache = {}
for query_embedding, query_text in queries:
# Calculate individual and collective scores
alpha = self.calculate_individual_resonance(
query_embedding, persona_graph, gravity_well, alpha_cache
)
beta = self.calculate_collective_feasibility(
query_embedding, community_graph, wireframe, beta_cache
)
# Braid scores
braiding_result = self.braid_scores(alpha, beta, query_text)
braiding_result["query"] = query_text
braiding_result["query_embedding"] = query_embedding.tolist()
results.append(braiding_result)
# Sort by braided score (descending)
results.sort(key=lambda x: x["braided_score"], reverse=True)
return results
def find_optimal_ideas(self,
candidate_ideas: List[Dict[str, Any]],
persona_graph: Any,
community_graph: Any,
top_k: int = 5) -> List[Dict[str, Any]]:
"""
Find optimal research ideas from candidate list.
"""
# Extract queries from candidate ideas
queries = []
for idea in candidate_ideas:
query_embedding = np.array(idea.get("embedding", [0] * 384)) # Default dimension
query_text = idea.get("description", "")
queries.append((query_embedding, query_text))
# Braid all queries
braided_results = self.braid_multiple_queries(
queries, persona_graph, community_graph,
gravity_well=None, wireframe=None # Would need actual instances
)
# Filter valid and novel ideas
optimal_ideas = []
for result in braided_results:
if result["is_valid"] and result["result_type"] == "novel":
# Find original idea data
original_idea = next(
(idea for idea in candidate_ideas
if idea.get("description") == result["query"]),
None
)
if original_idea:
optimal_idea = original_idea.copy()
optimal_idea.update({
"braiding_scores": result,
"overall_score": result["braided_score"]
})
optimal_ideas.append(optimal_idea)
# Return top k ideas
return optimal_ideas[:top_k]
def optimize_parameters(self,
training_data: List[Dict[str, Any]],
validation_data: List[Dict[str, Any]]) -> BraidingParameters:
"""
Optimize braiding parameters using training data.
This is a placeholder for actual optimization logic.
"""
print("Optimizing braiding parameters...")
# Simple grid search (would be more sophisticated in practice)
best_params = None
best_score = -float('inf')
for alpha_weight in [0.3, 0.4, 0.5]:
for beta_weight in [0.3, 0.4, 0.5]:
for gamma in [0.1, 0.2, 0.3]:
params = BraidingParameters(
alpha_weight=alpha_weight,
beta_weight=beta_weight,
gamma=gamma
)
# Evaluate on validation data
score = self._evaluate_parameters(params, validation_data)
if score > best_score:
best_score = score
best_params = params
print(f"Best score: {best_score}")
return best_params
def _evaluate_parameters(self,
params: BraidingParameters,
validation_data: List[Dict[str, Any]]) -> float:
"""
Evaluate parameters on validation data.
Returns average score.
"""
self.params = params
scores = []
for data_point in validation_data:
alpha = data_point.get("alpha", 0.5)
beta = data_point.get("beta", 0.5)
expected_score = data_point.get("expected_score", 0)
result = self.braid_scores(alpha, beta)
predicted_score = result["braided_score"]
# Calculate error (would use more sophisticated metric in practice)
error = abs(predicted_score - expected_score)
scores.append(1 - error) # Higher is better
return np.mean(scores) if scores else 0
README.md
# Dual Manifold Cognitive Architecture
An advanced AI system that models individual researcher cognition and community knowledge as separate manifolds, then performs braided optimization to discover novel research directions.
## Overview
This system implements the architecture described in the "AI Dual Manifold Cognitive Architecture" video, creating a cognitive digital twin of researchers that can:
- Parse and analyze research documents over time
- Build weighted knowledge graphs of expertise
- Create gravity well representations of comfort zones
- Access collective scientific knowledge via OpenAlex
- Perform braided optimization to find novel research directions
- Generate personalized research proposals
## Architecture
### Core Components
1. **Episodic Memory Layer**
- Hybrid search (dense vectors + BM25)
- Timestamped document chunks
- Reciprocal rank fusion
2. **Semantic Memory Layer**
- Temporal concept extraction
- Cognitive trajectory analysis
- Research focus shift detection
3. **Persona Layer**
- Weighted knowledge graph construction
- Centrality measure calculation
   - Gravity well/KDE representation (see the sketch after this list)
4. **Collective Manifold**
- OpenAlex API integration
- Community knowledge graph
- Wireframe manifold estimation
5. **Braiding Processor**
- Individual resonance (alpha) scoring
- Collective feasibility (beta) scoring
- Structural gate fusion
- Novelty optimization
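The persona layer's gravity well (`src/persona_layer/gravity_well.py`) is not included in the code excerpts below; the following is a minimal sketch of the KDE idea it implements, assuming scikit-learn and a hypothetical `GravityWell` wrapper:

```python
import numpy as np
from sklearn.neighbors import KernelDensity

class GravityWell:
    """Kernel density field over a researcher's concept embeddings (illustrative sketch)."""

    def __init__(self, bandwidth: float = 0.5):
        self.kde = KernelDensity(kernel="gaussian", bandwidth=bandwidth)

    def fit(self, concept_embeddings: np.ndarray) -> "GravityWell":
        # Fit the density field over the embeddings of established concepts
        self.kde.fit(concept_embeddings)
        return self

    def pull(self, query_embedding: np.ndarray) -> float:
        # Higher density means the query sits closer to the researcher's comfort zone
        log_density = self.kde.score_samples(query_embedding.reshape(1, -1))[0]
        return float(np.exp(log_density))

# Usage: GravityWell().fit(concept_vectors).pull(query_vector)
```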
## Installation
### Prerequisites
- Python 3.10+
- Docker (optional)
- OpenAI API key
- OpenAlex API key
### Quick Start
```bash
# Clone repository
git clone https://github.com/yourusername/dual-manifold-ai.git
cd dual-manifold-ai
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Set up environment variables
cp .env.example .env
# Edit .env with your API keys
# Create data directories
mkdir -p data/raw_documents
mkdir -p data/processed
mkdir -p data/indices
# Run tests
python -m pytest tests/
```

### Docker Installation

```bash
# Build and run with Docker Compose
docker-compose up --build

# Or build individually
docker build -t dual-manifold-ai .
docker run -p 8000:8000 dual-manifold-ai
```

### Configuration

Edit `config/settings.py` or set environment variables:

```bash
export OPENAI_API_KEY="your-key-here"
export OPENALEX_API_KEY="your-key-here"
export DATA_DIR="./data"
export DEBUG="True"
```

## Usage
### 1. Import Research Documents

Place your research documents (PDFs, text files) in `data/raw_documents/`:

```bash
cp ~/research_papers/*.pdf data/raw_documents/
```

### 2. Parse and Index Documents

```python
from src.episodic_memory.document_parser import DocumentParser
from src.episodic_memory.hybrid_index import HybridIndex
from config.settings import settings

# Parse documents
parser = DocumentParser(
    chunk_size=settings.CHUNK_SIZE,
    chunk_overlap=settings.CHUNK_OVERLAP
)
chunks = parser.parse_directory(settings.RAW_DOCS_DIR)

# Build hybrid index
index = HybridIndex(embedding_model_name=settings.EMBEDDING_MODEL)
index.build_indexes([chunk.to_dict() for chunk in chunks])

# Save indexes
index.save_indexes(settings.INDICES_DIR)
```
### 3. Analyze Cognitive Trajectory

```python
from src.semantic_memory.temporal_distiller import ConceptEvolutionAnalyzer

analyzer = ConceptEvolutionAnalyzer(llm_model=settings.LLM_MODEL)
trajectory = analyzer.analyze_trajectory(
    [chunk.to_dict() for chunk in chunks],
    researcher_id="researcher_001"
)
summary = analyzer.generate_trajectory_summary(trajectory)
print(f"Trajectory score: {summary['trajectory_score']}")
```
### 4. Build Persona Knowledge Graph

```python
from src.persona_layer.knowledge_graph import KnowledgeGraphBuilder

graph_builder = KnowledgeGraphBuilder()
graph_builder.build_from_trajectory(summary)

# Calculate centrality
graph_builder.calculate_centrality(method=settings.CENTRALITY_MEASURE)

# Save graph
graph_builder.save_to_file("data/persona_graph.json")
```
### 5. Perform Braided Search

```python
from src.braiding_processor.braiding_kernel import BraidingKernel
from src.utils.embeddings import EmbeddingGenerator

# Initialize components
braiding_kernel = BraidingKernel()
embedding_generator = EmbeddingGenerator(model_name=settings.EMBEDDING_MODEL)

# Example research query
query = "neural networks for drug discovery"
query_embedding = embedding_generator.encode(query)

# Calculate scores (simplified - would need actual graph instances)
alpha = 0.7  # Individual resonance
beta = 0.6   # Collective feasibility

# Braid scores
result = braiding_kernel.braid_scores(alpha, beta, query)
print(f"Braided score: {result['braided_score']}")
print(f"Result type: {result['result_type']}")
```
### 6. Use the API Server

```bash
# Start the API server
uvicorn api.server:app --reload --host 0.0.0.0 --port 8000
```

Then access the API at http://localhost:8000/docs for Swagger UI.
## API Endpoints

- `POST /api/analyze/researcher` - Analyze researcher documents
- `GET /api/trajectory/{researcher_id}` - Get cognitive trajectory
- `POST /api/braid/suggest` - Get research suggestions
- `GET /api/graph/{researcher_id}` - Get persona knowledge graph
- `POST /api/optimize/parameters` - Optimize braiding parameters
## Example Research Proposal Generation

```python
import requests

# Example API call to get research suggestions
response = requests.post(
    "http://localhost:8000/api/braid/suggest",
    json={
        "researcher_id": "researcher_001",
        "query": "quantum machine learning applications",
        "max_suggestions": 3
    }
)

suggestions = response.json()
for suggestion in suggestions:
    print(f"Title: {suggestion['title']}")
    print(f"Novelty Score: {suggestion['novelty_score']}")
    print(f"Description: {suggestion['description']}")
    print("---")
```
## Configuration Parameters

### Braiding Parameters

- `alpha_weight`: Weight for individual resonance (default: 0.4)
- `beta_weight`: Weight for collective feasibility (default: 0.4)
- `gamma`: Interaction term coefficient (default: 0.2)
- `novelty_threshold`: Minimum score for novel ideas (default: 0.7)
- `fusion_gate`: Type of fusion (linear, geometric, structural, gated)
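For example, a kernel can be configured with non-default weights and a different gate (the values here are illustrative):

```python
from src.braiding_processor.braiding_kernel import (
    BraidingKernel, BraidingParameters, FusionGateType
)

# Emphasize collective feasibility and switch to the geometric gate
params = BraidingParameters(
    alpha_weight=0.3,
    beta_weight=0.5,
    gamma=0.2,
    fusion_gate=FusionGateType.GEOMETRIC
)
kernel = BraidingKernel(parameters=params)
print(kernel.braid_scores(alpha=0.8, beta=0.2)["result_type"])
```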
### Search Parameters

- `HYBRID_SEARCH_WEIGHT`: Balance between dense/sparse search (0.5)
- `TOP_K_RESULTS`: Number of search results (10)
- `CHUNK_SIZE`: Document chunk size (1000)
- `CHUNK_OVERLAP`: Chunk overlap (200)
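A minimal sketch of how these settings drive a hybrid query, assuming indexes were built and saved as in step 2 above:

```python
from src.episodic_memory.hybrid_index import HybridIndex
from config.settings import settings

index = HybridIndex(embedding_model_name=settings.EMBEDDING_MODEL)
index.load_indexes(settings.INDICES_DIR)

results = index.hybrid_search(
    "graph neural networks for molecules",
    top_k=settings.TOP_K_RESULTS,
    dense_weight=settings.HYBRID_SEARCH_WEIGHT,
    sparse_weight=1.0 - settings.HYBRID_SEARCH_WEIGHT,
)
for r in results:
    print(round(r["score"], 3), r["chunk_id"])
```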
## Advanced Features

### Custom Embedding Models

Edit `config/settings.py`:

```python
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
```
### Multi-Researcher Analysis

```python
# Analyze multiple researchers (sketch: assumes `parser` and `analyzer` from the
# usage steps above, and one document sub-directory per researcher)
researchers = ["researcher_001", "researcher_002"]
summaries = {}
for researcher in researchers:
    # Load researcher-specific documents and build the individual persona
    chunks = parser.parse_directory(f"data/raw_documents/{researcher}")
    trajectory = analyzer.analyze_trajectory(
        [c.to_dict() for c in chunks], researcher_id=researcher
    )
    summaries[researcher] = analyzer.generate_trajectory_summary(trajectory)
# Compare trajectories
print({r: s["trajectory_score"] for r, s in summaries.items()})
```
### Real-time Updates

```python
# Watch the raw documents directory for new files
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from config.settings import settings

class ResearchUpdateHandler(FileSystemEventHandler):
    def on_created(self, event):
        if event.src_path.endswith('.pdf'):
            print(f"New document: {event.src_path}")
            # Trigger re-analysis

observer = Observer()
observer.schedule(ResearchUpdateHandler(), settings.RAW_DOCS_DIR, recursive=True)
observer.start()
```
## Performance Tuning

### Index Optimization

```python
# Adjust FAISS index parameters (sketch: an IVF index for faster approximate search;
# `embeddings` refers to the chunk embedding matrix built earlier)
import faiss

dimension = 384  # embedding dimension of all-MiniLM-L6-v2
nlist = 100      # number of inverted-list clusters
quantizer = faiss.IndexFlatIP(dimension)
index = faiss.IndexIVFFlat(
    quantizer,
    dimension,
    nlist,
    faiss.METRIC_INNER_PRODUCT
)
index.train(embeddings.astype('float32'))  # IVF indexes must be trained before adding vectors
```
### Cache Configuration

```python
# Enable Redis caching (note: REDIS_URL is not a field on the default Settings
# dataclass; add it to config/settings.py or set it dynamically as shown here)
settings.ENABLE_CACHE = True
settings.REDIS_URL = "redis://localhost:6379"
```
## Troubleshooting

### Common Issues

1. **Memory Issues**
   - Reduce `CHUNK_SIZE`
   - Use smaller embedding models
   - Enable disk-based caching

2. **API Rate Limits**
   - Implement exponential backoff (see the sketch after this list)
   - Use request pooling
   - Cache API responses

3. **Slow Performance**
   - Enable GPU acceleration
   - Use batch processing
   - Optimize graph algorithms
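A minimal sketch of the exponential-backoff pattern for rate-limited API calls; the `with_backoff` helper and the example OpenAlex query are illustrative, not part of the codebase:

```python
import random
import time

import requests

def with_backoff(call, max_retries: int = 5, base_delay: float = 1.0):
    """Retry a zero-argument callable with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return call()
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise
            # Wait 1s, 2s, 4s, ... plus random jitter before retrying
            time.sleep(base_delay * (2 ** attempt) + random.random())

# Example: fetch community works from OpenAlex with retries
works = with_backoff(lambda: requests.get(
    "https://api.openalex.org/works",
    params={"search": "graph neural networks"},
    timeout=30,
).json())
```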
### Debug Mode

```bash
export DEBUG=True
python -m debugpy --listen 5678 --wait-for-client main.py
```
## Contributing
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
### Development Setup

```bash
# Install development dependencies
pip install -r requirements-dev.txt

# Run tests with coverage
pytest --cov=src --cov-report=html

# Type checking
mypy src/

# Code formatting
black src/
isort src/
```
## Citation

If you use this system in your research, please cite:

```
Zeng, Q., Fan, B., Chen, Z., Ren, S., & Zhang, Z. (2025).
MirrorMind: Empowering OmniScientist with Expert Perspectives.
arXiv preprint arXiv:2511.XXXXX.
```
## License
MIT License - see LICENSE file for details.
## Support
- Issues: GitHub Issues
- Documentation: ReadTheDocs
- Email: support@dualmanifold.ai
## Acknowledgments
- Inspired by research from Tsinghua University
- OpenAlex for scientific publication data
- The AI research community for foundational work
## SETUP:
```bash
#!/bin/bash
# setup.sh - Complete setup script for Dual Manifold Cognitive Architecture
set -e # Exit on error
echo "Setting up Dual Manifold Cognitive Architecture..."
echo "=================================================="
# Check Python version
echo "Checking Python version..."
python -c 'import sys; sys.exit(0 if sys.version_info >= (3, 10) else 1)' || echo "Warning: Python 3.10+ recommended"
# Create project structure
echo "Creating project structure..."
mkdir -p config data/{raw_documents,processed,indices} src/{episodic_memory,semantic_memory,persona_layer,collective_manifold,braiding_processor,agents,optimization,utils} tests api notebooks logs
# Create virtual environment
echo "Creating virtual environment..."
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Upgrade pip
echo "Upgrading pip..."
pip install --upgrade pip
# Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt
# Install development dependencies
echo "Installing development dependencies..."
pip install -r requirements-dev.txt
# Create environment file
echo "Creating environment configuration..."
cat > .env << EOL
# API Keys (replace with your actual keys)
OPENAI_API_KEY=your_openai_api_key_here
OPENALEX_API_KEY=your_openalex_api_key_here
HUGGINGFACE_TOKEN=your_huggingface_token_here
# Paths
DATA_DIR=./data
LOGS_DIR=./logs
# Settings
DEBUG=False
ENABLE_CACHE=True
EOL
echo "Please edit .env file with your actual API keys!"
# Create example configuration
echo "Creating example configuration files..."
# Create example document
mkdir -p examples/documents
cat > examples/documents/example_paper.txt << EOL
Title: Advances in Graph Neural Networks for Molecular Modeling
Author: Researcher A
Date: 2024-01-15
Abstract: This paper explores the application of graph neural networks to molecular property prediction. We introduce a novel attention mechanism that improves prediction accuracy by 15% compared to baseline methods.
Introduction: Molecular representation learning has been a challenging problem in computational chemistry. Traditional methods like Morgan fingerprints have limitations in capturing complex molecular structures.
Methodology: We propose GNN-Mol, a graph neural network architecture with multi-head attention. The model processes molecular graphs where atoms are nodes and bonds are edges.
Results: Our method achieves state-of-the-art results on the QM9 dataset, with particular improvements in predicting molecular dipole moments.
Conclusion: Graph neural networks show great promise for molecular modeling, especially when combined with attention mechanisms.
EOL
# Create Docker configuration
cat > Dockerfile << 'EOL'  # quoted delimiter preserves the backslash line continuations below
FROM python:3.10-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
software-properties-common \
git \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Create non-root user
RUN useradd -m -u 1000 user
RUN chown -R user:user /app
USER user
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK CMD curl --fail http://localhost:8000/health || exit 1
# Run application
CMD ["uvicorn", "api.server:app", "--host", "0.0.0.0", "--port", "8000"]
EOL
# Create docker-compose file
cat > docker-compose.yml << EOL
version: '3.8'
services:
dual-manifold-ai:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=\${OPENAI_API_KEY}
- OPENALEX_API_KEY=\${OPENALEX_API_KEY}
- DEBUG=False
volumes:
- ./data:/app/data
- ./logs:/app/logs
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
restart: unless-stopped
postgres:
image: postgres:15-alpine
environment:
POSTGRES_USER: dualmanifold
POSTGRES_PASSWORD: securepassword
POSTGRES_DB: dualmanifold_db
ports:
- "5432:5432"
volumes:
- postgres-data:/var/lib/postgresql/data
restart: unless-stopped
volumes:
redis-data:
postgres-data:
EOL
# Create test script
cat > test_system.py << EOL
#!/usr/bin/env python3
"""
Test script for the Dual Manifold Cognitive Architecture.
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config.settings import settings
def test_environment():
"""Test basic environment setup."""
print("Testing environment setup...")
# Check directories
required_dirs = [
settings.DATA_DIR,
settings.RAW_DOCS_DIR,
settings.PROCESSED_DIR,
settings.INDICES_DIR,
settings.LOGS_DIR
]
for dir_path in required_dirs:
if os.path.exists(dir_path):
print(f"✓ Directory exists: {dir_path}")
else:
print(f"✗ Missing directory: {dir_path}")
return False
# Check environment variables
if settings.OPENAI_API_KEY == "your_openai_api_key_here":
print("⚠ Warning: Using default OpenAI API key")
print("Environment test passed!")
return True
def test_imports():
"""Test that all modules can be imported."""
print("\nTesting module imports...")
modules = [
"config.settings",
"src.episodic_memory.document_parser",
"src.episodic_memory.hybrid_index",
"src.semantic_memory.temporal_distiller",
"src.persona_layer.knowledge_graph",
"src.braiding_processor.braiding_kernel"
]
for module in modules:
try:
__import__(module)
print(f"✓ Imported: {module}")
except ImportError as e:
print(f"✗ Failed to import {module}: {e}")
return False
print("Import test passed!")
return True
def main():
"""Run all tests."""
print("=" * 50)
print("Dual Manifold Cognitive Architecture - System Test")
print("=" * 50)
tests = [test_environment, test_imports]
all_passed = True
for test in tests:
try:
if not test():
all_passed = False
except Exception as e:
print(f"✗ Test failed with exception: {e}")
all_passed = False
print("\n" + "=" * 50)
if all_passed:
print("✅ All tests passed! System is ready.")
print("\nNext steps:")
print("1. Add your research documents to data/raw_documents/")
print("2. Update API keys in .env file")
print("3. Run: python examples/analyze_researcher.py")
print("4. Start API server: uvicorn api.server:app --reload")
else:
print("❌ Some tests failed. Please check the errors above.")
sys.exit(1)
if __name__ == "__main__":
main()
EOL
chmod +x test_system.py
# Create example analysis script
mkdir -p examples
cat > examples/analyze_researcher.py << EOL
#!/usr/bin/env python3
"""
Example script to analyze a researcher's documents.
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config.settings import settings
from src.episodic_memory.document_parser import DocumentParser
from src.episodic_memory.hybrid_index import HybridIndex
from src.semantic_memory.temporal_distiller import ConceptEvolutionAnalyzer
from src.persona_layer.knowledge_graph import KnowledgeGraphBuilder
def main():
"""Main analysis pipeline."""
print("Starting researcher analysis pipeline...")
# Step 1: Parse documents
print("\n1. Parsing documents...")
parser = DocumentParser(
chunk_size=settings.CHUNK_SIZE,
chunk_overlap=settings.CHUNK_OVERLAP
)
# Copy example document to data directory
import shutil
example_doc = "examples/documents/example_paper.txt"
if os.path.exists(example_doc):
shutil.copy(example_doc, settings.RAW_DOCS_DIR)
chunks = parser.parse_directory(settings.RAW_DOCS_DIR)
if not chunks:
print("No documents found. Please add documents to data/raw_documents/")
return
print(f"Parsed {len(chunks)} chunks from documents")
# Step 2: Build search index
print("\n2. Building hybrid search index...")
index = HybridIndex(embedding_model_name=settings.EMBEDDING_MODEL)
index.build_indexes([chunk.to_dict() for chunk in chunks])
index.save_indexes(settings.INDICES_DIR)
# Test search
test_query = "graph neural networks"
results = index.hybrid_search(test_query, top_k=3)
print(f"Test search for '{test_query}' found {len(results)} results")
# Step 3: Analyze cognitive trajectory
print("\n3. Analyzing cognitive trajectory...")
analyzer = ConceptEvolutionAnalyzer(llm_model=settings.LLM_MODEL)
trajectory = analyzer.analyze_trajectory(
[chunk.to_dict() for chunk in chunks],
researcher_id="example_researcher"
)
summary = analyzer.generate_trajectory_summary(trajectory)
print(f"Trajectory score: {summary['trajectory_score']}")
print(f"Total concepts: {summary['concept_analysis']['total_concepts']}")
print(f"Focus shifts: {summary['dynamics']['total_shifts']}")
# Step 4: Build knowledge graph
print("\n4. Building knowledge graph...")
graph_builder = KnowledgeGraphBuilder()
graph_builder.build_from_trajectory(summary)
graph_builder.calculate_centrality(method=settings.CENTRALITY_MEASURE)
graph_data = graph_builder.to_dict()
print(f"Graph built with {graph_data['metrics']['node_count']} nodes")
print(f"Graph density: {graph_data['metrics']['density']:.3f}")
# Save results
import json
with open("data/researcher_analysis.json", "w") as f:
json.dump({
"summary": summary,
"graph": graph_data
}, f, indent=2)
print("\n✅ Analysis complete!")
print(f"Results saved to data/researcher_analysis.json")
if __name__ == "__main__":
main()
EOL
chmod +x examples/analyze_researcher.py
# Create requirements.txt
cat > requirements.txt << EOL
# Core dependencies
python-dotenv>=1.0.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
# Data processing
pandas>=2.0.0
numpy>=1.24.0
scipy>=1.10.0
scikit-learn>=1.3.0
# Document parsing
pdfplumber>=0.10.0
langchain>=0.1.0
langchain-openai>=0.0.1
pypdf>=3.17.0
# Embeddings and search
sentence-transformers>=2.2.0
faiss-cpu>=1.7.0
rank-bm25>=0.2.2
chromadb>=0.4.0
# Graph processing
networkx>=3.0
python-igraph>=0.10.0
# Machine learning
torch>=2.0.0
transformers>=4.30.0
# API and web
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
httpx>=0.25.0
aiohttp>=3.9.0
# Utilities
tqdm>=4.65.0
loguru>=0.7.0
cachetools>=5.3.0
redis>=5.0.0
# Development
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
black>=23.0.0
isort>=5.12.0
mypy>=1.5.0
pre-commit>=3.5.0
EOL
# Create requirements-dev.txt
cat > requirements-dev.txt << EOL
# Testing
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
pytest-mock>=3.11.0
pytest-xdist>=3.3.0
# Code quality
black>=23.0.0
isort>=5.12.0
flake8>=6.0.0
mypy>=1.5.0
pre-commit>=3.5.0
bandit>=1.7.0
safety>=2.3.0
# Documentation
mkdocs>=1.5.0
mkdocs-material>=9.0.0
mkdocstrings[python]>=0.23.0
# Monitoring
sentry-sdk>=1.35.0
prometheus-client>=0.18.0
# Notebooks
jupyter>=1.0.0
jupyterlab>=4.0.0
ipywidgets>=8.0.0
EOL
# Create pre-commit config
cat > .pre-commit-config.yaml << EOL
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-merge-conflict
- id: check-case-conflict
- id: check-toml
- repo: https://github.com/psf/black
rev: 23.11.0
hooks:
- id: black
language_version: python3.10
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
- id: flake8
args: ["--max-line-length=88", "--extend-ignore=E203,W503"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.5.1
hooks:
- id: mypy
additional_dependencies:
- types-requests
- types-pyyaml
- types-redis
- pydantic
args: ["--ignore-missing-imports", "--strict"]
- repo: https://github.com/PyCQA/bandit
rev: 1.7.5
hooks:
- id: bandit
args: ["-c", "pyproject.toml"]
- repo: https://github.com/python-poetry/poetry
rev: 1.6.0
hooks:
- id: poetry-check
- id: poetry-lock
EOL
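# Install dependencies now that the requirements files exist
echo "Installing dependencies..."
pip install -r requirements.txt
echo "Installing development dependencies..."
pip install -r requirements-dev.txt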
echo "Setup complete!"
echo ""
echo "To get started:"
echo "1. Activate virtual environment: source venv/bin/activate"
echo "2. Edit .env file with your API keys"
echo "3. Run system test: ./test_system.py"
echo "4. Try example analysis: python examples/analyze_researcher.py"
echo ""
echo "For API development:"
echo "uvicorn api.server:app --reload --host 0.0.0.0 --port 8000"
```
TAKEAWAYS:
- Dual manifolds separate individual and collective knowledge spaces.
- Braiding combines scores through gated structural fusion (see the first sketch after this list).
- Hybrid search ensures precise technical term matching.
- Temporal analysis reveals cognitive evolution patterns.
- Gravity wells represent expertise comfort zones.
- Novelty repulsors push researchers beyond existing knowledge.
- Structural gates filter hallucinations and noise effectively.
- Centrality measures quantify concept importance dynamically.
- Linearization prepares complex graphs for LLM consumption.
- Constraint optimization finds Goldilocks zone intersections.
- Multi-agent coordination enables interdisciplinary discovery.
- Non-parametric structures shift intelligence out of model weights and into explicit, inspectable indexes and graphs.
- The Markov assumption breaks down for research trajectories with long-range historical dependencies.
- Reciprocal rank fusion balances semantic and lexical search (sketched below).
- Kernel density estimation creates smooth manifold representations (sketched below).
- Research trajectories provide personalized cognitive models.
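To make the braiding takeaway concrete, the sketch below fuses alpha and beta scores with a hard structural gate; the geometric-mean weighting and the 0.2 threshold are illustrative assumptions, not the exact formula implemented in src/braiding_processor/braiding_kernel.py:
```python
import numpy as np

def braid(alpha: np.ndarray, beta: np.ndarray, gate_threshold: float = 0.2) -> np.ndarray:
    """Fuse individual resonance (alpha) with collective feasibility (beta).

    Candidates whose collective support falls below the gate are zeroed out,
    which is one way a structural gate can suppress hallucinated directions.
    """
    gate = (beta >= gate_threshold).astype(float)   # hard structural gate
    fused = gate * np.sqrt(alpha * beta)            # geometric mean rewards balanced scores
    return fused / (fused.max() + 1e-9)             # normalize for ranking

alpha = np.array([0.9, 0.4, 0.7])  # resonance with the researcher's manifold
beta = np.array([0.1, 0.8, 0.6])   # feasibility on the collective manifold
print(braid(alpha, beta))          # the first candidate is gated out despite high alpha
```
A soft gate (for example, a sigmoid over beta) would be a drop-in alternative if hard cutoffs prove too aggressive.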
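The reciprocal rank fusion takeaway corresponds to the standard RRF formula, score(d) = Σ_i 1 / (k + rank_i(d)); the sketch below uses the conventional k = 60, which is not necessarily the constant chosen in src/episodic_memory/hybrid_index.py:
```python
from collections import defaultdict

def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Merge ranked result lists (e.g. dense vector and BM25) into a single ranking."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)

dense = ["chunk_3", "chunk_1", "chunk_7"]   # semantic (vector) ranking
sparse = ["chunk_1", "chunk_9", "chunk_3"]  # lexical (BM25) ranking
print(reciprocal_rank_fusion([dense, sparse]))  # chunk_1 and chunk_3 rise to the top
```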
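Likewise, the gravity-well takeaway can be sketched with a kernel density estimate over projected concept embeddings; gaussian_kde from SciPy is one plausible choice, and the 2-D synthetic data here is only for illustration:
```python
import numpy as np
from scipy.stats import gaussian_kde

# Assume concept embeddings have been projected to 2-D (e.g. via PCA or UMAP)
rng = np.random.default_rng(0)
concept_points = rng.normal(loc=[0.0, 0.0], scale=0.5, size=(200, 2))

# Fit the density field; high density marks the researcher's comfort zone
kde = gaussian_kde(concept_points.T)

candidate = np.array([[1.5], [1.5]])  # a direction far from the well's center
print("density at candidate:", kde(candidate)[0])
print("density at center:", kde(np.array([[0.0], [0.0]]))[0])
# A novelty repulsor would favor candidates where this density is low but nonzero.
```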
SUGGESTIONS:
- Implement Riemannian manifold learning for curved spaces.
- Add real-time document ingestion with filesystem monitoring.
- Create interactive visualization dashboard for gravity wells.
- Integrate with academic search engines beyond OpenAlex.
- Develop federated learning for multi-researcher collaboration.
- Add reinforcement learning for parameter optimization.
- Implement quantum-inspired algorithms for complex optimization.
- Create browser extension for seamless research integration.
- Develop mobile app for on-the-go research suggestions.
- Add multilingual support for international research.
- Implement differential privacy for sensitive research data.
- Create plugin system for custom domain agents.
- Add blockchain for research provenance tracking.
- Develop simulation environment for hypothesis testing.
- Implement transfer learning between researcher personas.
- Create API marketplace for specialized domain modules.