112 lines
3.5 KiB
Python
112 lines
3.5 KiB
Python
"""
|
||
Hybrid search — merges vector similarity + BM25 keyword results.
|
||
Direct port of OpenClaw's src/memory/hybrid.ts.
|
||
|
||
The algorithm:
|
||
1. Run vector search → ranked by cosine similarity
|
||
2. Run FTS5 keyword search → ranked by BM25
|
||
3. Merge by weighted score: 0.7 × vector + 0.3 × keyword
|
||
4. Deduplicate by chunk ID
|
||
5. Sort by combined score (descending)
|
||
"""
|
||
|
||
import math
import re

from memory.types import MemorySearchResult, MemorySource
|
||
|
||
|
||
def build_fts_query(raw: str) -> str | None:
|
||
"""
|
||
Build an FTS5 match query from raw text.
|
||
Port of OpenClaw's buildFtsQuery() — quotes each token
|
||
and joins with AND for a conjunctive match.
|
||
|
||
Example: "hello world" → '"hello" AND "world"'
|
||
"""
|
||
tokens = re.findall(r"[A-Za-z0-9_]+", raw)
|
||
if not tokens:
|
||
return None
|
||
quoted = [f'"{t}"' for t in tokens]
|
||
return " AND ".join(quoted)
|
||
|
||
|
||
def bm25_rank_to_score(rank: float) -> float:
    """
    Convert FTS5 BM25 rank (negative = better) to a 0-1 score.

    Port of OpenClaw's bm25RankToScore().

    Mapping:
      * Non-numeric or non-finite input (None, NaN, ±inf) is treated as a
        very poor match and scores 1 / (1 + 999).
      * Negative ranks — more negative means more relevant — map to
        r / (1 + r) with r = -rank, so better matches get strictly higher
        scores in (0, 1) instead of all collapsing to 1.0.
      * Non-negative ranks keep the 1 / (1 + rank) mapping.

    Args:
        rank: The raw BM25 rank value for one row.

    Returns:
        A score in the range [0, 1]; higher is better.
    """
    # NaN must be rejected explicitly: max(0.0, nan) evaluates to 0.0 in
    # Python, which would otherwise hand a broken rank the best score.
    if not isinstance(rank, (int, float)) or not math.isfinite(rank):
        return 1.0 / (1.0 + 999.0)

    if rank < 0:
        relevance = -rank
        return relevance / (1.0 + relevance)

    return 1.0 / (1.0 + rank)
|
||
|
||
|
||
def merge_hybrid_results(
    vector: list[dict],
    keyword: list[dict],
    vector_weight: float = 0.7,
    text_weight: float = 0.3,
) -> list[MemorySearchResult]:
    """
    Combine vector-search and keyword-search hits into one ranked list.

    Direct port of OpenClaw's mergeHybridResults() from hybrid.ts.

    Hits are deduplicated by their "id". A chunk found by both searches
    keeps its vector score, takes the keyword hit's text score and — when
    the keyword hit has a non-empty snippet — the keyword snippet too.
    The final score of every chunk is
    vector_weight × vector_score + text_weight × text_score.

    Each vector result dict has: id, path, start_line, end_line, source, snippet, vector_score
    Each keyword result dict has: id, path, start_line, end_line, source, snippet, text_score

    Returns the merged results sorted by score, highest first.
    """
    shared_fields = ("id", "path", "start_line", "end_line", "source", "snippet")
    combined: dict[str, dict] = {}

    # Seed the table with the vector hits; they have no text score yet.
    for hit in vector:
        record = {field: hit[field] for field in shared_fields}
        record["vector_score"] = hit.get("vector_score", 0.0)
        record["text_score"] = 0.0
        combined[hit["id"]] = record

    # Fold in the keyword hits: keyword-only chunks get a fresh record,
    # chunks already seen by the vector search are updated in place.
    for hit in keyword:
        record = combined.get(hit["id"])
        if not record:
            record = {field: hit[field] for field in shared_fields}
            record["vector_score"] = 0.0
            record["text_score"] = hit.get("text_score", 0.0)
            combined[hit["id"]] = record
            continue

        record["text_score"] = hit.get("text_score", 0.0)
        # A non-empty keyword snippet replaces the vector one (often more relevant).
        keyword_snippet = hit.get("snippet")
        if keyword_snippet:
            record["snippet"] = keyword_snippet

    def to_result(record: dict) -> MemorySearchResult:
        # Weighted blend of the two scores for this chunk.
        blended = (
            vector_weight * record["vector_score"]
            + text_weight * record["text_score"]
        )
        origin = record["source"]
        if isinstance(origin, str):
            origin = MemorySource(origin)
        return MemorySearchResult(
            path=record["path"],
            start_line=record["start_line"],
            end_line=record["end_line"],
            score=blended,
            snippet=record["snippet"],
            source=origin,
        )

    results = [to_result(record) for record in combined.values()]
    # Highest combined score first.
    return sorted(results, key=lambda item: item.score, reverse=True)
|