"""
Internal utilities for the memory system.

Port of OpenClaw's src/memory/internal.ts:
  • hashText — SHA-256 content hashing
  • chunkMarkdown — split markdown into overlapping chunks
  • listMemoryFiles — discover .md files in workspace
  • buildFileEntry — create MemoryFileEntry from a file
  • cosineSimilarity — vector similarity calculation
  • sanitizeAndNormalizeEmbedding — L2-normalize an embedding vector
"""

import hashlib
import math
import os
from pathlib import Path

from memory.types import MemoryChunk, MemoryFileEntry


def hash_text(value: str) -> str:
    """Return the hex SHA-256 digest of *value* encoded as UTF-8.

    Mirrors OpenClaw's hashText().
    """
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def chunk_markdown(
    content: str,
    chunk_tokens: int = 512,
    chunk_overlap: int = 50,
) -> list[MemoryChunk]:
    """
    Split markdown content into overlapping chunks.

    Direct port of OpenClaw's chunkMarkdown() from internal.ts.
    Uses a character-based approximation: ~4 chars per token.

    Args:
        content: Full markdown text to split.
        chunk_tokens: Target chunk size in tokens; the character budget is
            ``max(32, chunk_tokens * 4)``.
        chunk_overlap: Overlap between consecutive chunks in tokens
            (``chunk_overlap * 4`` characters, clamped to be non-negative).

    Returns:
        Chunks in document order. Each chunk records the 1-indexed start and
        end line it covers, its text, and the SHA-256 hash of that text.
        Empty content yields a single chunk with empty text, because
        ``"".split("\\n")`` is ``[""]``.
    """
    # str.split always returns at least one element, so no emptiness guard
    # is needed here.
    lines = content.split("\n")

    max_chars = max(32, chunk_tokens * 4)
    overlap_chars = max(0, chunk_overlap * 4)

    chunks: list[MemoryChunk] = []
    current: list[tuple[str, int]] = []  # (line_text, 1-indexed line_no)
    current_chars = 0

    def flush() -> None:
        """Emit the pending lines as one chunk (no-op when nothing is pending)."""
        if not current:
            return
        text = "\n".join(line_text for line_text, _ in current)
        chunks.append(MemoryChunk(
            start_line=current[0][1],
            end_line=current[-1][1],
            text=text,
            hash=hash_text(text),
        ))

    def carry_overlap() -> None:
        """Reset the pending buffer, keeping trailing lines as overlap."""
        nonlocal current, current_chars
        if overlap_chars <= 0 or not current:
            current = []
            current_chars = 0
            return
        acc = 0
        kept: list[tuple[str, int]] = []
        # Walk backwards, keeping whole lines until the overlap budget is met.
        for line_text, line_no in reversed(current):
            acc += len(line_text) + 1  # +1 accounts for the newline
            kept.append((line_text, line_no))
            if acc >= overlap_chars:
                break
        kept.reverse()  # restore document order
        current = kept
        current_chars = sum(len(line_text) + 1 for line_text, _ in kept)

    for line_no, line in enumerate(lines, start=1):
        # Handle very long lines by splitting them into max_chars segments;
        # an empty line still contributes one empty segment.
        if line:
            segments = [
                line[start:start + max_chars]
                for start in range(0, len(line), max_chars)
            ]
        else:
            segments = [""]

        for segment in segments:
            line_size = len(segment) + 1
            if current_chars + line_size > max_chars and current:
                flush()
                carry_overlap()
            current.append((segment, line_no))
            current_chars += line_size

    flush()
    return chunks


def list_memory_files(
    workspace_dir: str,
    extra_paths: list[str] | None = None,
) -> list[str]:
    """
    List the markdown files that make up the workspace memory.

    Port of OpenClaw's listMemoryFiles() from internal.ts.

    Searches, in this order:
      - MEMORY.md and memory.md in the workspace root
      - SOUL.md and USER.md (identity files) in the workspace root
      - All .md files under the memory/ subdirectory (recursively)
      - Any additional files or directories given in ``extra_paths``

    Symlinked files and directories are skipped everywhere, including the
    entries of ``extra_paths`` themselves.

    Args:
        workspace_dir: Workspace root; ``~`` is expanded and the path resolved.
        extra_paths: Optional extra files (must end in ``.md``) or
            directories (walked recursively) to include.

    Returns:
        Absolute path strings, de-duplicated by real path, first occurrence
        kept.
    """
    result: list[str] = []
    ws = Path(workspace_dir).expanduser().resolve()

    # Well-known root files: memory files first, then identity files.
    for name in ("MEMORY.md", "memory.md", "SOUL.md", "USER.md"):
        candidate = ws / name
        if candidate.is_file() and not candidate.is_symlink():
            result.append(str(candidate))

    # Walk memory/ subdirectory
    memory_dir = ws / "memory"
    if memory_dir.is_dir() and not memory_dir.is_symlink():
        _walk_md_files(memory_dir, result)

    # Extra paths
    for extra in extra_paths or ():
        raw = Path(extra).expanduser()
        # The symlink test must run BEFORE resolve(): resolve() follows
        # symlinks, so a resolved path never reports is_symlink().
        if raw.is_symlink():
            continue
        p = raw.resolve()
        if p.is_dir():
            _walk_md_files(p, result)
        elif p.is_file() and p.suffix == ".md":
            result.append(str(p))

    # Deduplicate by resolved path, preserving first-seen order.
    seen: set[str] = set()
    deduped: list[str] = []
    for entry in result:
        real = os.path.realpath(entry)
        if real not in seen:
            seen.add(real)
            deduped.append(entry)

    return deduped


def _walk_md_files(directory: Path, result: list[str]) -> None:
    """Recursively append the .md files under *directory* to *result*.

    Entries are visited in sorted order; symlinks (files or directories) are
    skipped. A PermissionError while reading a directory ends the walk of
    that directory silently (best-effort discovery).
    """
    try:
        for entry in sorted(directory.iterdir()):
            if entry.is_symlink():
                continue
            if entry.is_dir():
                _walk_md_files(entry, result)
            elif entry.is_file() and entry.suffix == ".md":
                result.append(str(entry))
    except PermissionError:
        # Deliberately ignored: unreadable directories are simply not indexed.
        pass


def build_file_entry(abs_path: str, workspace_dir: str) -> MemoryFileEntry:
    """
    Create a MemoryFileEntry from a file path.

    Port of OpenClaw's buildFileEntry() from internal.ts.

    Args:
        abs_path: Path of the file to describe.
        workspace_dir: Directory the entry's relative ``path`` is computed
            against.

    Returns:
        An entry holding the workspace-relative path (forward slashes), the
        given path, the modification time in milliseconds, the size in bytes
        and the SHA-256 hash of the file's text content.

    Raises:
        OSError: If the file cannot be stat'ed or opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    stat = os.stat(abs_path)
    with open(abs_path, "r", encoding="utf-8") as f:
        content = f.read()

    content_hash = hash_text(content)
    # Normalize Windows separators so stored paths are platform-independent.
    rel_path = os.path.relpath(abs_path, workspace_dir).replace("\\", "/")

    return MemoryFileEntry(
        path=rel_path,
        abs_path=abs_path,
        mtime_ms=stat.st_mtime * 1000,
        size=stat.st_size,
        hash=content_hash,
    )


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """
    Compute cosine similarity between two vectors.

    Port of OpenClaw's cosineSimilarity() from internal.ts.

    If the vectors differ in length, only the leading components they share
    are compared. Returns 0.0 when either vector is empty or has zero norm
    over the compared components.
    """
    if not a or not b:
        return 0.0

    dot = 0.0
    norm_a = 0.0
    norm_b = 0.0
    # zip stops at the shorter vector, matching min(len(a), len(b)).
    for av, bv in zip(a, b):
        dot += av * bv
        norm_a += av * av
        norm_b += bv * bv

    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0

    return dot / (norm_a ** 0.5 * norm_b ** 0.5)


def normalize_embedding(vec: list[float]) -> list[float]:
    """
    L2-normalize an embedding vector.

    Port of OpenClaw's sanitizeAndNormalizeEmbedding().

    Components that are not numbers, or that are not finite (NaN, +inf,
    -inf), are replaced by 0.0 before normalizing, so a single bad component
    cannot turn the whole result into NaN. If the sanitized vector's
    magnitude is below 1e-10 it is returned un-normalized.
    """
    sanitized = [
        v if isinstance(v, (int, float)) and math.isfinite(v) else 0.0
        for v in vec
    ]
    magnitude = sum(v * v for v in sanitized) ** 0.5
    if magnitude < 1e-10:
        return sanitized
    return [v / magnitude for v in sanitized]