Bases: Module
Update (insert or upsert) data models in the given knowledge base.
Dispatches on the input's shape:
KnowledgeGraph (is_knowledge_graph) →
KnowledgeBase.update_knowledge_graph (bulk entities + relations).
KnowledgeGraphs wrapper (is_knowledge_graphs) →
KnowledgeBase.update_knowledge_graph once per wrapped KG
(the KB has no bulk multi-graph endpoint).
Entities wrapper (is_entities) →
KnowledgeBase.update_entities with the wrapped list (bulk nodes).
Relations wrapper (is_relations) →
KnowledgeBase.update_relations with the wrapped list (bulk edges).
Relation (is_relation) →
KnowledgeBase.update_relations (single edge; endpoints upserted
as needed).
Entity (is_entity) →
KnowledgeBase.update_entities (single node).
- Anything else →
KnowledgeBase.update (SQL row/table store);
the first declared field is used as the primary key for upsert.
To pass a Python list of entities or relations rather than a wrapper
data model, tree.map_structure flattens the inputs so each item
is dispatched individually — wrap as Entities / Relations
for a single bulk round-trip instead of N per-item ones.
The output is a name-mangled clone of the input ("updated_" +
name), passed through unchanged. The KB methods' return values
(assigned ids) are ignored — use the KB directly if you need them.
Parameters:
| Name |
Type |
Description |
Default |
knowledge_base
|
KnowledgeBase
|
The knowledge base to update.
|
None
|
name
|
str
|
Optional. The name of the module.
|
None
|
description
|
str
|
Optional. The description of the module.
|
None
|
trainable
|
bool
|
Whether the module's variables should be trainable.
|
False
|
Source code in synalinks/src/modules/knowledge/update_knowledge.py
| @synalinks_export(
[
"synalinks.modules.UpdateKnowledge",
"synalinks.UpdateKnowledge",
]
)
class UpdateKnowledge(Module):
"""Update (insert or upsert) data models in the given knowledge base.
Dispatches on the input's shape:
* ``KnowledgeGraph`` (``is_knowledge_graph``) →
`KnowledgeBase.update_knowledge_graph` (bulk entities + relations).
* ``KnowledgeGraphs`` wrapper (``is_knowledge_graphs``) →
`KnowledgeBase.update_knowledge_graph` once per wrapped KG
(the KB has no bulk multi-graph endpoint).
* ``Entities`` wrapper (``is_entities``) →
`KnowledgeBase.update_entities` with the wrapped list (bulk nodes).
* ``Relations`` wrapper (``is_relations``) →
`KnowledgeBase.update_relations` with the wrapped list (bulk edges).
* ``Relation`` (``is_relation``) →
`KnowledgeBase.update_relations` (single edge; endpoints upserted
as needed).
* ``Entity`` (``is_entity``) →
`KnowledgeBase.update_entities` (single node).
* Anything else → `KnowledgeBase.update` (SQL row/table store);
the first declared field is used as the primary key for upsert.
To pass a Python list of entities or relations rather than a wrapper
data model, ``tree.map_structure`` flattens the inputs so each item
is dispatched individually — wrap as ``Entities`` / ``Relations``
for a single bulk round-trip instead of N per-item ones.
The output is a name-mangled clone of the input (``"updated_" +
name``), passed through unchanged. The KB methods' return values
(assigned ids) are ignored — use the KB directly if you need them.
Args:
knowledge_base (KnowledgeBase): The knowledge base to update.
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
*,
knowledge_base=None,
name=None,
description=None,
trainable=False,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
self.knowledge_base = _get_kb(knowledge_base)
async def _update(self, data_model):
# Order matters:
# * KnowledgeGraph passes is_entities AND is_relations
# (it has both fields) — check it FIRST.
# * is_relation also passes is_entity (a Relation has a
# `label` field) — relation check must come before entity.
if is_knowledge_graph(data_model):
await self.knowledge_base.update_knowledge_graph(data_model)
elif is_knowledge_graphs(data_model):
# No bulk multi-graph endpoint — iterate per KG.
for kg in data_model.get_nested_entity_list("knowledge_graphs") or []:
await self.knowledge_base.update_knowledge_graph(kg)
elif is_entities(data_model):
# Bulk wrapper — pass the underlying list to update_entities
# so the adapter sees one batched call instead of N per-entity
# round-trips.
await self.knowledge_base.update_entities(data_model.get("entities") or [])
elif is_relations(data_model):
await self.knowledge_base.update_relations(data_model.get("relations") or [])
elif is_relation(data_model):
await self.knowledge_base.update_relations(data_model)
elif is_entity(data_model):
await self.knowledge_base.update_entities(data_model)
else:
await self.knowledge_base.update(data_model)
return data_model.clone(name="updated_" + data_model.name)
async def call(self, inputs):
if not inputs:
return None
# Await each update on the current event loop, sequentially (avoids the
# transient thread-loops `run_maybe_nested` created — which orphaned
# litellm's global client and serialized DB writes anyway). flatten/pack
# mirrors `map_structure` since data models are tree leaves.
leaves = tree.flatten(inputs)
outputs = [await self._update(leaf) for leaf in leaves]
return tree.pack_sequence_as(inputs, outputs)
async def compute_output_spec(self, inputs):
return tree.map_structure(
lambda x: x.clone(name="updated_" + x.name),
inputs,
)
def get_config(self):
config = {
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
knowledge_base_config = {
"knowledge_base": serialization_lib.serialize_synalinks_object(
self.knowledge_base
)
}
return {**knowledge_base_config, **config}
@classmethod
def from_config(cls, config):
knowledge_base = serialization_lib.deserialize_synalinks_object(
config.pop("knowledge_base")
)
return cls(knowledge_base=knowledge_base, **config)
|