first commit
This commit is contained in:
214
memory/internal.py
Normal file
214
memory/internal.py
Normal file
@@ -0,0 +1,214 @@
|
||||
"""
|
||||
Internal utilities for the memory system.
|
||||
Port of OpenClaw's src/memory/internal.ts:
|
||||
• hashText — SHA-256 content hashing
|
||||
• chunkMarkdown — split markdown into overlapping chunks
|
||||
• listMemoryFiles — discover .md files in workspace
|
||||
• buildFileEntry — create MemoryFileEntry from a file
|
||||
• cosineSimilarity — vector similarity calculation
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from memory.types import MemoryChunk, MemoryFileEntry
|
||||
|
||||
|
||||
def hash_text(value: str) -> str:
    """Return the hex SHA-256 digest of *value* encoded as UTF-8.

    Mirrors OpenClaw's hashText().
    """
    hasher = hashlib.sha256()
    hasher.update(value.encode("utf-8"))
    return hasher.hexdigest()
|
||||
|
||||
|
||||
def chunk_markdown(
    content: str,
    chunk_tokens: int = 512,
    chunk_overlap: int = 50,
) -> list[MemoryChunk]:
    """
    Split markdown content into overlapping, line-aligned chunks.

    Port of OpenClaw's chunkMarkdown() from internal.ts.

    Sizes use a character-based approximation of ~4 chars per token, and
    every line is charged one extra character for its newline.  Lines are
    accumulated until adding the next one would exceed the budget; the
    chunk is then emitted and its trailing lines are carried over as the
    start of the next chunk to provide overlap.

    Args:
        content: Markdown text, split on newline characters.
        chunk_tokens: Target chunk size in tokens.  The character budget
            is ``chunk_tokens * 4`` with a floor of 32.
        chunk_overlap: Approximate number of tokens repeated from the end
            of one chunk at the start of the next.  Values <= 0 disable
            overlap.

    Returns:
        Chunks in document order.  Each carries 1-indexed ``start_line``
        and ``end_line``, the joined ``text`` and the SHA-256 ``hash`` of
        that text.  Empty content yields an empty list.
    """
    # str.split("\n") never returns an empty list (it gives [""] for empty
    # input), so the "nothing to chunk" case must be tested on the content.
    if not content:
        return []

    max_chars = max(32, chunk_tokens * 4)
    overlap_chars = max(0, chunk_overlap * 4)
    chunks: list[MemoryChunk] = []

    current: list[tuple[str, int]] = []  # (line_text, 1-indexed line_no)
    current_chars = 0

    def flush() -> None:
        """Emit the pending lines as one chunk (no-op when nothing is pending)."""
        if not current:
            return
        text = "\n".join(line_text for line_text, _ in current)
        chunks.append(MemoryChunk(
            start_line=current[0][1],
            end_line=current[-1][1],
            text=text,
            hash=hash_text(text),
        ))

    def carry_overlap() -> None:
        """Keep only the trailing lines that make up the overlap window."""
        nonlocal current, current_chars
        if overlap_chars <= 0 or not current:
            current = []
            current_chars = 0
            return
        acc = 0
        kept: list[tuple[str, int]] = []
        # Walk backwards collecting whole lines until the overlap budget is
        # met; append + reverse avoids the O(n) cost of insert(0, ...).
        for entry in reversed(current):
            acc += len(entry[0]) + 1
            kept.append(entry)
            if acc >= overlap_chars:
                break
        kept.reverse()
        current = kept
        current_chars = acc  # acc is exactly the charged size of `kept`

    for line_no, line in enumerate(content.split("\n"), start=1):
        # A line longer than the budget is cut into max_chars-sized
        # segments that all report the same line number; an empty line
        # still contributes one empty segment.
        segments = [
            line[start:start + max_chars]
            for start in range(0, len(line), max_chars)
        ] or [""]
        for segment in segments:
            line_size = len(segment) + 1
            if current and current_chars + line_size > max_chars:
                flush()
                carry_overlap()
            current.append((segment, line_no))
            current_chars += line_size

    flush()
    return chunks
|
||||
|
||||
|
||||
def list_memory_files(
|
||||
workspace_dir: str,
|
||||
extra_paths: list[str] | None = None,
|
||||
) -> list[str]:
|
||||
"""
|
||||
List all markdown files in the workspace memory directory.
|
||||
Port of OpenClaw's listMemoryFiles() from internal.ts.
|
||||
|
||||
Searches for:
|
||||
- MEMORY.md (or memory.md) in workspace root
|
||||
- All .md files in memory/ subdirectory
|
||||
- Any additional paths specified
|
||||
"""
|
||||
result: list[str] = []
|
||||
ws = Path(workspace_dir).expanduser().resolve()
|
||||
|
||||
# Check MEMORY.md and memory.md in workspace root
|
||||
for name in ("MEMORY.md", "memory.md"):
|
||||
candidate = ws / name
|
||||
if candidate.is_file() and not candidate.is_symlink():
|
||||
result.append(str(candidate))
|
||||
|
||||
# Check SOUL.md and USER.md (identity files)
|
||||
for name in ("SOUL.md", "USER.md"):
|
||||
candidate = ws / name
|
||||
if candidate.is_file() and not candidate.is_symlink():
|
||||
result.append(str(candidate))
|
||||
|
||||
# Walk memory/ subdirectory
|
||||
memory_dir = ws / "memory"
|
||||
if memory_dir.is_dir() and not memory_dir.is_symlink():
|
||||
_walk_md_files(memory_dir, result)
|
||||
|
||||
# Extra paths
|
||||
if extra_paths:
|
||||
for extra in extra_paths:
|
||||
p = Path(extra).expanduser().resolve()
|
||||
if p.is_symlink():
|
||||
continue
|
||||
if p.is_dir():
|
||||
_walk_md_files(p, result)
|
||||
elif p.is_file() and p.suffix == ".md":
|
||||
result.append(str(p))
|
||||
|
||||
# Deduplicate by resolved path
|
||||
seen: set[str] = set()
|
||||
deduped: list[str] = []
|
||||
for entry in result:
|
||||
real = os.path.realpath(entry)
|
||||
if real not in seen:
|
||||
seen.add(real)
|
||||
deduped.append(entry)
|
||||
|
||||
return deduped
|
||||
|
||||
|
||||
def _walk_md_files(directory: Path, result: list[str]) -> None:
    """Append every .md file found under *directory* to *result*.

    Children are visited in sorted order and sub-directories are descended
    into recursively.  Symlinks (to files or directories) are skipped.  A
    PermissionError raised while processing a directory ends the scan of
    that directory quietly; anything already appended is kept.
    """
    try:
        for child in sorted(directory.iterdir()):
            if child.is_symlink():
                continue
            if child.is_dir():
                _walk_md_files(child, result)
                continue
            if child.is_file() and child.suffix == ".md":
                result.append(str(child))
    except PermissionError:
        # Best effort: unreadable locations are simply left out.
        return
|
||||
|
||||
|
||||
def build_file_entry(abs_path: str, workspace_dir: str) -> MemoryFileEntry:
    """
    Describe a file on disk as a MemoryFileEntry.

    Port of OpenClaw's buildFileEntry() from internal.ts.

    The entry holds the path relative to *workspace_dir* (with forward
    slashes), the absolute path as given, the modification time in
    milliseconds, the size in bytes and the SHA-256 hash of the file's
    UTF-8 decoded text.
    """
    file_stat = os.stat(abs_path)

    with open(abs_path, encoding="utf-8") as handle:
        digest = hash_text(handle.read())

    relative = os.path.relpath(abs_path, workspace_dir)
    return MemoryFileEntry(
        path=relative.replace("\\", "/"),
        abs_path=abs_path,
        mtime_ms=file_stat.st_mtime * 1000,
        size=file_stat.st_size,
        hash=digest,
    )
|
||||
|
||||
|
||||
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """
    Return the cosine similarity of vectors *a* and *b*.

    Port of OpenClaw's cosineSimilarity() from internal.ts.

    Only the overlapping prefix of the two vectors is compared when their
    lengths differ.  The result is 0.0 when either vector is empty or has
    zero magnitude over that prefix.
    """
    if not (a and b):
        return 0.0

    dot = norm_a = norm_b = 0.0
    # zip() stops at the shorter vector, i.e. at min(len(a), len(b)).
    for x, y in zip(a, b):
        dot += x * y
        norm_a += x * x
        norm_b += y * y

    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a ** 0.5 * norm_b ** 0.5)
|
||||
|
||||
|
||||
def normalize_embedding(vec: list[float]) -> list[float]:
    """
    Sanitize and L2-normalize an embedding vector.

    Port of OpenClaw's sanitizeAndNormalizeEmbedding().

    Every component that is not a finite int/float (NaN, +/-infinity or a
    non-numeric value) is replaced with 0.0 first.  Infinities must be
    zeroed as well as NaN: an infinite component makes the magnitude
    infinite, and inf / inf would put NaN into the result.

    Args:
        vec: Raw embedding components.

    Returns:
        A new list scaled to unit L2 length.  When the sanitized vector's
        magnitude is below 1e-10 (including the empty vector) the
        sanitized values are returned unscaled.
    """
    import math  # function-scope import keeps this block self-contained

    sanitized = [
        v if isinstance(v, (int, float)) and math.isfinite(v) else 0.0
        for v in vec
    ]
    magnitude = math.sqrt(sum(v * v for v in sanitized))
    if magnitude < 1e-10:
        # Avoid dividing by (near) zero.
        return sanitized
    return [v / magnitude for v in sanitized]
|
||||
Reference in New Issue
Block a user