215 lines
6.4 KiB
Python
215 lines
6.4 KiB
Python
"""
|
|
Internal utilities for the memory system.
|
|
Port of OpenClaw's src/memory/internal.ts:
|
|
• hashText — SHA-256 content hashing
|
|
• chunkMarkdown — split markdown into overlapping chunks
|
|
• listMemoryFiles — discover .md files in workspace
|
|
• buildFileEntry — create MemoryFileEntry from a file
|
|
• cosineSimilarity — vector similarity calculation
|
|
"""
|
|
|
|
import hashlib
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from memory.types import MemoryChunk, MemoryFileEntry
|
|
|
|
|
|
def hash_text(value: str) -> str:
|
|
"""SHA-256 hash of text content. Mirrors OpenClaw's hashText()."""
|
|
return hashlib.sha256(value.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def chunk_markdown(
|
|
content: str,
|
|
chunk_tokens: int = 512,
|
|
chunk_overlap: int = 50,
|
|
) -> list[MemoryChunk]:
|
|
"""
|
|
Split markdown content into overlapping chunks.
|
|
Direct port of OpenClaw's chunkMarkdown() from internal.ts.
|
|
|
|
Uses character-based approximation: ~4 chars per token.
|
|
"""
|
|
lines = content.split("\n")
|
|
if not lines:
|
|
return []
|
|
|
|
max_chars = max(32, chunk_tokens * 4)
|
|
overlap_chars = max(0, chunk_overlap * 4)
|
|
chunks: list[MemoryChunk] = []
|
|
|
|
current: list[tuple[str, int]] = [] # (line_text, 1-indexed line_no)
|
|
current_chars = 0
|
|
|
|
def flush() -> None:
|
|
nonlocal current, current_chars
|
|
if not current:
|
|
return
|
|
text = "\n".join(line for line, _ in current)
|
|
start_line = current[0][1]
|
|
end_line = current[-1][1]
|
|
chunks.append(MemoryChunk(
|
|
start_line=start_line,
|
|
end_line=end_line,
|
|
text=text,
|
|
hash=hash_text(text),
|
|
))
|
|
|
|
def carry_overlap() -> None:
|
|
nonlocal current, current_chars
|
|
if overlap_chars <= 0 or not current:
|
|
current = []
|
|
current_chars = 0
|
|
return
|
|
acc = 0
|
|
kept: list[tuple[str, int]] = []
|
|
for line_text, line_no in reversed(current):
|
|
acc += len(line_text) + 1
|
|
kept.insert(0, (line_text, line_no))
|
|
if acc >= overlap_chars:
|
|
break
|
|
current = kept
|
|
current_chars = sum(len(lt) + 1 for lt, _ in kept)
|
|
|
|
for i, line in enumerate(lines):
|
|
line_no = i + 1
|
|
# Handle very long lines by splitting into segments
|
|
segments = [""] if not line else [
|
|
line[start:start + max_chars]
|
|
for start in range(0, len(line), max_chars)
|
|
]
|
|
for segment in segments:
|
|
line_size = len(segment) + 1
|
|
if current_chars + line_size > max_chars and current:
|
|
flush()
|
|
carry_overlap()
|
|
current.append((segment, line_no))
|
|
current_chars += line_size
|
|
|
|
flush()
|
|
return chunks
|
|
|
|
|
|
def list_memory_files(
|
|
workspace_dir: str,
|
|
extra_paths: list[str] | None = None,
|
|
) -> list[str]:
|
|
"""
|
|
List all markdown files in the workspace memory directory.
|
|
Port of OpenClaw's listMemoryFiles() from internal.ts.
|
|
|
|
Searches for:
|
|
- MEMORY.md (or memory.md) in workspace root
|
|
- All .md files in memory/ subdirectory
|
|
- Any additional paths specified
|
|
"""
|
|
result: list[str] = []
|
|
ws = Path(workspace_dir).expanduser().resolve()
|
|
|
|
# Check MEMORY.md and memory.md in workspace root
|
|
for name in ("MEMORY.md", "memory.md"):
|
|
candidate = ws / name
|
|
if candidate.is_file() and not candidate.is_symlink():
|
|
result.append(str(candidate))
|
|
|
|
# Check SOUL.md and USER.md (identity files)
|
|
for name in ("SOUL.md", "USER.md"):
|
|
candidate = ws / name
|
|
if candidate.is_file() and not candidate.is_symlink():
|
|
result.append(str(candidate))
|
|
|
|
# Walk memory/ subdirectory
|
|
memory_dir = ws / "memory"
|
|
if memory_dir.is_dir() and not memory_dir.is_symlink():
|
|
_walk_md_files(memory_dir, result)
|
|
|
|
# Extra paths
|
|
if extra_paths:
|
|
for extra in extra_paths:
|
|
p = Path(extra).expanduser().resolve()
|
|
if p.is_symlink():
|
|
continue
|
|
if p.is_dir():
|
|
_walk_md_files(p, result)
|
|
elif p.is_file() and p.suffix == ".md":
|
|
result.append(str(p))
|
|
|
|
# Deduplicate by resolved path
|
|
seen: set[str] = set()
|
|
deduped: list[str] = []
|
|
for entry in result:
|
|
real = os.path.realpath(entry)
|
|
if real not in seen:
|
|
seen.add(real)
|
|
deduped.append(entry)
|
|
|
|
return deduped
|
|
|
|
|
|
def _walk_md_files(directory: Path, result: list[str]) -> None:
|
|
"""Recursively collect .md files from a directory."""
|
|
try:
|
|
for entry in sorted(directory.iterdir()):
|
|
if entry.is_symlink():
|
|
continue
|
|
if entry.is_dir():
|
|
_walk_md_files(entry, result)
|
|
elif entry.is_file() and entry.suffix == ".md":
|
|
result.append(str(entry))
|
|
except PermissionError:
|
|
pass
|
|
|
|
|
|
def build_file_entry(abs_path: str, workspace_dir: str) -> MemoryFileEntry:
|
|
"""
|
|
Create a MemoryFileEntry from a file path.
|
|
Port of OpenClaw's buildFileEntry() from internal.ts.
|
|
"""
|
|
stat = os.stat(abs_path)
|
|
with open(abs_path, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
content_hash = hash_text(content)
|
|
rel_path = os.path.relpath(abs_path, workspace_dir).replace("\\", "/")
|
|
return MemoryFileEntry(
|
|
path=rel_path,
|
|
abs_path=abs_path,
|
|
mtime_ms=stat.st_mtime * 1000,
|
|
size=stat.st_size,
|
|
hash=content_hash,
|
|
)
|
|
|
|
|
|
def cosine_similarity(a: list[float], b: list[float]) -> float:
|
|
"""
|
|
Compute cosine similarity between two vectors.
|
|
Port of OpenClaw's cosineSimilarity() from internal.ts.
|
|
"""
|
|
if not a or not b:
|
|
return 0.0
|
|
length = min(len(a), len(b))
|
|
dot = 0.0
|
|
norm_a = 0.0
|
|
norm_b = 0.0
|
|
for i in range(length):
|
|
av = a[i]
|
|
bv = b[i]
|
|
dot += av * bv
|
|
norm_a += av * av
|
|
norm_b += bv * bv
|
|
if norm_a == 0.0 or norm_b == 0.0:
|
|
return 0.0
|
|
return dot / (norm_a ** 0.5 * norm_b ** 0.5)
|
|
|
|
|
|
def normalize_embedding(vec: list[float]) -> list[float]:
|
|
"""
|
|
L2-normalize an embedding vector.
|
|
Port of OpenClaw's sanitizeAndNormalizeEmbedding().
|
|
"""
|
|
sanitized = [v if isinstance(v, (int, float)) and v == v else 0.0 for v in vec]
|
|
magnitude = sum(v * v for v in sanitized) ** 0.5
|
|
if magnitude < 1e-10:
|
|
return sanitized
|
|
return [v / magnitude for v in sanitized]
|