Skip to content

PathHybridFTSSearch module

PathHybridFTSSearch

Bases: Module

Hybrid (vec + BM25) variable-length path search, AND semantics.

LM-driven wrapper around KnowledgeBase.path_hybrid_fts_search. Each side is hybrid-searched (vec + fts) independently; the path's combined rrf_score is the sum of the subject-side and object-side hybrid scores — the 4-source RRF identity. Falls back to fulltext-only when no embedding model is configured.

Parameters:

Name Type Description Default
knowledge_base KnowledgeBase

The knowledge base to search. Required.

None
subj_schema dict

JSON schema of the subject entity. One of subj_schema, subj_entity_model, or subj_label must be provided.

None
subj_entity_model Entity | SymbolicDataModel

Subject entity model.

None
subj_label str

Subject entity label.

None
obj_schema dict

JSON schema of the object entity. One of obj_schema, obj_entity_model, or obj_label must be provided.

None
obj_entity_model Entity | SymbolicDataModel

Object entity model.

None
obj_label str

Object entity label.

None
rel_label str

Optional rel-label constraint applied to every hop.

None
min_hops int

Minimum hop count, inclusive. Defaults to 1.

1
max_hops int

Maximum hop count, inclusive. Defaults to 3.

3
k int

Maximum number of results. Defaults to 10.

10
k_rank int

RRF smoothing constant. Defaults to 60.

60
similarity_threshold float

Optional vector-distance threshold.

None
fulltext_threshold float

Optional BM25 threshold.

None
ef_search int

HNSW search-time candidate-list depth.

None
conjunctive bool

If True, the BM25 branch uses AND semantics; if False, OR semantics.

False
bm25_b float

Optional override for BM25's b parameter.

None
output_format str

"json" (default) or "csv".

'json'
name str

Module name.

None
description str

Module description.

None
trainable bool

Whether the module's variables should be trainable.

True
Source code in synalinks/src/modules/retrievers/path_hybrid_fts_search.py
@synalinks_export(
    [
        "synalinks.modules.PathHybridFTSSearch",
        "synalinks.PathHybridFTSSearch",
    ]
)
class PathHybridFTSSearch(Module):
    """Hybrid (vec + BM25) variable-length path search, AND semantics.

    LM-driven wrapper around
    `KnowledgeBase.path_hybrid_fts_search`. Each side is
    hybrid-searched (vec + fts) independently; the path's combined
    ``rrf_score`` is the sum of the subject-side and object-side
    hybrid scores — the 4-source RRF identity. Falls back to
    fulltext-only when no embedding model is configured.

    Args:
        knowledge_base (KnowledgeBase): The knowledge base to search.
            Required.
        language_model: Language model driving the internal query
            ``Generator``.
        subj_schema (dict): JSON schema of the subject entity. One of
            ``subj_schema``, ``subj_entity_model``, or ``subj_label``
            must be provided.
        subj_entity_model (Entity | SymbolicDataModel): Subject entity model.
        subj_label (str): Subject entity label.
        obj_schema (dict): JSON schema of the object entity. One of
            ``obj_schema``, ``obj_entity_model``, or ``obj_label``
            must be provided.
        obj_entity_model (Entity | SymbolicDataModel): Object entity model.
        obj_label (str): Object entity label.
        rel_label (str): Optional rel-label constraint applied to
            every hop.
        min_hops (int): Minimum hop count, inclusive. Defaults to 1.
        max_hops (int): Maximum hop count, inclusive. Defaults to 3.
        k (int): Maximum number of results. Defaults to 10.
        k_rank (int): RRF smoothing constant. Defaults to 60.
        similarity_threshold (float): Optional vector-distance threshold.
        fulltext_threshold (float): Optional BM25 threshold.
        ef_search (int): HNSW search-time candidate-list depth.
        conjunctive (bool): If True, the BM25 branch uses AND
            semantics; if False (default), OR semantics.
        bm25_b (float): Optional override for BM25's ``b`` parameter.
        output_format (str): ``"json"`` (default) or ``"csv"``.
        prompt_template (str): Forwarded to the internal query
            ``Generator``.
        examples (list): Forwarded to the internal query ``Generator``.
        instructions (str): Forwarded to the internal query
            ``Generator``.
        seed_instructions (str): Forwarded to the internal query
            ``Generator``.
        temperature (float): Forwarded to the internal query
            ``Generator``. Defaults to 0.0.
        use_inputs_schema (bool): Forwarded to the internal query
            ``Generator``. Defaults to False.
        use_outputs_schema (bool): Forwarded to the internal query
            ``Generator``. Defaults to False.
        return_inputs (bool): Whether the module inputs are combined
            (via ``ops.logical_and``) into the returned results.
            Defaults to True.
        return_query (bool): Whether the generated search query is
            combined (via ``ops.logical_and``) into the returned
            results. Defaults to True.
        name (str): Module name.
        description (str): Module description.
        trainable (bool): Whether the module's variables should be
            trainable.

    Raises:
        ValueError: If the hop range is invalid (requires
            ``1 <= min_hops <= max_hops``), if ``output_format`` is not
            ``"json"`` or ``"csv"``, or if ``k`` is not a positive
            integer.
    """

    def __init__(
        self,
        *,
        knowledge_base=None,
        language_model=None,
        subj_schema=None,
        subj_entity_model=None,
        subj_label: Optional[str] = None,
        obj_schema=None,
        obj_entity_model=None,
        obj_label: Optional[str] = None,
        rel_label: Optional[str] = None,
        min_hops: int = 1,
        max_hops: int = 3,
        k: int = 10,
        k_rank: int = 60,
        similarity_threshold: Optional[float] = None,
        fulltext_threshold: Optional[float] = None,
        ef_search: Optional[int] = None,
        conjunctive: bool = False,
        bm25_b: Optional[float] = None,
        output_format: str = "json",
        prompt_template: Optional[str] = None,
        examples: Optional[list] = None,
        instructions: Optional[str] = None,
        seed_instructions: Optional[str] = None,
        temperature: float = 0.0,
        use_inputs_schema: bool = False,
        use_outputs_schema: bool = False,
        return_inputs: bool = True,
        return_query: bool = True,
        name: Optional[str] = None,
        description: Optional[str] = None,
        trainable: bool = True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        self.knowledge_base = _get_kb(knowledge_base)
        self.language_model = _get_lm(language_model)

        # Each endpoint is resolved to a (schema, label) pair. The raw
        # entity models are kept as given so `get_config` can serialize
        # them again.
        self.subj_schema, self.subj_label = resolve_endpoint(
            subj_schema, subj_entity_model, subj_label, "subj"
        )
        self.subj_entity_model = subj_entity_model
        self.obj_schema, self.obj_label = resolve_endpoint(
            obj_schema, obj_entity_model, obj_label, "obj"
        )
        self.obj_entity_model = obj_entity_model
        self.rel_label = rel_label

        if min_hops < 1 or max_hops < min_hops:
            raise ValueError(
                f"Invalid hop range: min_hops={min_hops}, "
                f"max_hops={max_hops}. Require 1 <= min_hops <= max_hops."
            )
        self.min_hops = min_hops
        self.max_hops = max_hops

        if output_format not in ("json", "csv"):
            raise ValueError(
                f"`output_format` must be 'json' or 'csv', got {output_format!r}"
            )
        self.output_format = output_format

        if not isinstance(k, int) or k < 1:
            raise ValueError(f"`k` must be a positive integer, got {k!r}")
        self.k = k
        self.k_rank = k_rank
        self.similarity_threshold = similarity_threshold
        self.fulltext_threshold = fulltext_threshold
        self.ef_search = ef_search
        self.conjunctive = conjunctive
        self.bm25_b = bm25_b

        self.prompt_template = prompt_template
        self.examples = examples
        self.instructions = instructions
        self.seed_instructions = seed_instructions
        self.temperature = temperature
        self.use_inputs_schema = use_inputs_schema
        self.use_outputs_schema = use_outputs_schema
        self.return_inputs = return_inputs
        self.return_query = return_query

        # The generator always runs with `return_inputs=False`; whether
        # the module inputs appear in the final output is decided by
        # `self.return_inputs` in `_attach_query_and_inputs`.
        self.query_generator = Generator(
            data_model=PathHybridFTSSearchInput,
            language_model=self.language_model,
            prompt_template=self.prompt_template,
            examples=self.examples,
            instructions=self.instructions,
            seed_instructions=self.seed_instructions,
            temperature=self.temperature,
            use_inputs_schema=self.use_inputs_schema,
            use_outputs_schema=self.use_outputs_schema,
            return_inputs=False,
            name="path_hybrid_fts_search_query_generator_" + self.name,
        )

    async def _attach_query_and_inputs(self, inputs, query, results):
        """Combine `results` with the query and/or inputs, per the flags.

        Shared by `call` and `compute_output_spec` so that both build
        their output through the same sequence of `ops.logical_and`
        calls with the same names.
        """
        if self.return_query:
            results = await ops.logical_and(
                query,
                results,
                name="results_with_query_" + self.name,
            )
        if self.return_inputs:
            results = await ops.logical_and(
                inputs,
                results,
                name="results_with_inputs_" + self.name,
            )
        return results

    async def call(self, inputs, training=False):
        """Generate a search query from `inputs` and run the path search.

        Returns:
            The search results wrapped in a `JsonDataModel` (combined
            with the query and/or inputs depending on `return_query` /
            `return_inputs`), or `None` when `inputs` is falsy, the
            query generator yields nothing, or either similarity-search
            list is missing or empty.
        """
        if not inputs:
            return None

        query = await self.query_generator(inputs, training=training)
        if not query:
            return None
        payload = query.get_json()
        subj_queries = payload.get("subj_similarity_search", [])
        obj_queries = payload.get("obj_similarity_search", [])
        # Keywords are optional and are passed through as-is (possibly None).
        subj_keywords = payload.get("subj_keywords")
        obj_keywords = payload.get("obj_keywords")
        if not subj_queries or not obj_queries:
            return None

        rows = await self.knowledge_base.path_hybrid_fts_search(
            subj_text_or_texts=subj_queries,
            obj_text_or_texts=obj_queries,
            subj_keywords=subj_keywords,
            obj_keywords=obj_keywords,
            subj_label=self.subj_label,
            obj_label=self.obj_label,
            label=self.rel_label,
            min_hops=self.min_hops,
            max_hops=self.max_hops,
            k=self.k,
            k_rank=self.k_rank,
            similarity_threshold=self.similarity_threshold,
            fulltext_threshold=self.fulltext_threshold,
            ef_search=self.ef_search,
            conjunctive=self.conjunctive,
            bm25_b=self.bm25_b,
            output_format=self.output_format,
        )
        results = JsonDataModel(
            json={"result": rows},
            schema=GenericResult.get_schema(),
            name=self.name,
        )
        return await self._attach_query_and_inputs(inputs, query, results)

    async def compute_output_spec(self, inputs, training=False):
        """Build the symbolic output: same combination steps as `call`,
        with a `SymbolicDataModel` in place of the actual search results.
        """
        query = await self.query_generator(inputs, training=training)
        results = SymbolicDataModel(
            schema=GenericResult.get_schema(),
            name=self.name,
        )
        return await self._attach_query_and_inputs(inputs, query, results)

    def get_config(self):
        """Return the constructor arguments as a config dict.

        The knowledge base and language model go through
        `serialization_lib.serialize_synalinks_object`; the endpoint
        entity models go through `serialize_entity_model`.
        """
        config = {
            "subj_schema": self.subj_schema,
            "subj_label": self.subj_label,
            "obj_schema": self.obj_schema,
            "obj_label": self.obj_label,
            "rel_label": self.rel_label,
            "min_hops": self.min_hops,
            "max_hops": self.max_hops,
            "k": self.k,
            "k_rank": self.k_rank,
            "similarity_threshold": self.similarity_threshold,
            "fulltext_threshold": self.fulltext_threshold,
            "ef_search": self.ef_search,
            "conjunctive": self.conjunctive,
            "bm25_b": self.bm25_b,
            "output_format": self.output_format,
            "prompt_template": self.prompt_template,
            "examples": self.examples,
            "instructions": self.instructions,
            "seed_instructions": self.seed_instructions,
            "temperature": self.temperature,
            "use_inputs_schema": self.use_inputs_schema,
            "use_outputs_schema": self.use_outputs_schema,
            "return_inputs": self.return_inputs,
            "return_query": self.return_query,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        knowledge_base_config = {
            "knowledge_base": serialization_lib.serialize_synalinks_object(
                self.knowledge_base,
            )
        }
        language_model_config = {
            "language_model": serialization_lib.serialize_synalinks_object(
                self.language_model,
            )
        }
        endpoint_models_config = {
            "subj_entity_model": serialize_entity_model(
                self.subj_entity_model, "subj_entity_model_" + self.name
            ),
            "obj_entity_model": serialize_entity_model(
                self.obj_entity_model, "obj_entity_model_" + self.name
            ),
        }
        return {
            **config,
            **knowledge_base_config,
            **language_model_config,
            **endpoint_models_config,
        }

    @classmethod
    def from_config(cls, config):
        """Rebuild a module from a dict produced by `get_config`.

        Raises:
            KeyError: If `config` lacks `"knowledge_base"` or
                `"language_model"`.
        """
        # Work on a shallow copy: the pops below would otherwise strip
        # keys from the caller's dict, so reusing the same config for a
        # second `from_config` call would fail with KeyError.
        config = dict(config)
        knowledge_base = serialization_lib.deserialize_synalinks_object(
            config.pop("knowledge_base")
        )
        language_model = serialization_lib.deserialize_synalinks_object(
            config.pop("language_model")
        )
        subj_entity_model = deserialize_entity_model(
            config.pop("subj_entity_model", None)
        )
        obj_entity_model = deserialize_entity_model(config.pop("obj_entity_model", None))
        return cls(
            knowledge_base=knowledge_base,
            language_model=language_model,
            subj_entity_model=subj_entity_model,
            obj_entity_model=obj_entity_model,
            **config,
        )

PathHybridFTSSearchInput

Bases: DataModel

Input shape for PathHybridFTSSearch.

The *_keywords lists are optional — when omitted, the similarity-search text is reused for BM25 scoring on that side.

Source code in synalinks/src/modules/retrievers/path_hybrid_fts_search.py
class PathHybridFTSSearchInput(DataModel):
    """Input shape for `PathHybridFTSSearch`.

    The ``*_keywords`` lists are optional — when omitted, the
    similarity-search text is reused for BM25 scoring on that side.
    """

    # NOTE(review): the docstring above and the `description=` strings below
    # are presumably emitted into the schema given to the language model;
    # treat them as runtime text, not just documentation — confirm before
    # rewording.

    # Subject-side query texts; declared without a default.
    subj_similarity_search: List[str] = Field(
        description="Natural-language queries for the subject vector branch",
    )
    # Object-side query texts; declared without a default.
    obj_similarity_search: List[str] = Field(
        description="Natural-language queries for the object vector branch",
    )
    # Optional subject-side keyword queries; defaults to None.
    subj_keywords: Optional[List[str]] = Field(
        description="Optional keyword queries for the subject BM25 branch",
        default=None,
    )
    # Optional object-side keyword queries; defaults to None.
    obj_keywords: Optional[List[str]] = Field(
        description="Optional keyword queries for the object BM25 branch",
        default=None,
    )