# Synalinks
> Keras-based LM framework for neuro-symbolic applications and In-Context learning
# Usage documentation
# Introduction
______________________________________________________________________
## What is Synalinks?
Synalinks is an open-source framework that makes it easy to create, evaluate, train, and deploy industry-standard Language Model (LM) applications. Synalinks follows the principle of *progressive disclosure of complexity*, meaning that simple workflows should be quick and easy, while arbitrarily advanced ones should be possible via a clear path that builds upon what you've already learned.
Synalinks is an *adaptation of Keras 3* focused on neuro-symbolic systems and in-context reinforcement learning, an ensemble of techniques that improve an LM's predictions and accuracy without changing the weights of the model. The goal of Synalinks is to facilitate the rapid setup of simple applications while providing the flexibility for researchers and advanced users to develop sophisticated systems.
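As a rough illustration of improving predictions "without changing the weights", the sketch below treats the few-shot demonstrations placed in a prompt as the only trainable state. It keeps whichever random subset earns the highest mean reward on a validation set, in the spirit of the `RandomFewShot` optimizer that appears in the `compile()` docstring further down. This is plain Python and not Synalinks code. `toy_model` is a hypothetical stand-in for a frozen LM, and `build_prompt`, `exact_match`, and `random_few_shot_search` are illustrative names only.

```python
import random
from typing import Callable, List, Tuple

Example = Tuple[str, str]  # (question, answer)


def exact_match(y_true: str, y_pred: str) -> float:
    """Reward of 1.0 when the stripped strings are equal, else 0.0."""
    return 1.0 if y_true.strip() == y_pred.strip() else 0.0


def build_prompt(demos: List[Example], question: str) -> str:
    """Render the few-shot demonstrations, then the new question."""
    shots = "".join(f"Q: {q}\nA: {a}\n" for q, a in demos)
    return f"{shots}Q: {question}\nA:"


def random_few_shot_search(
    model: Callable[[str], str],
    pool: List[Example],
    val: List[Example],
    k: int = 2,
    trials: int = 20,
    seed: int = 0,
) -> Tuple[List[Example], float]:
    """Sample `trials` random k-subsets of `pool`; keep the best by mean reward.

    The model is never modified: only the demonstrations in the prompt change.
    """
    rng = random.Random(seed)
    best_demos: List[Example] = []
    best_reward = -1.0
    for _ in range(trials):
        demos = rng.sample(pool, k)
        rewards = [exact_match(y, model(build_prompt(demos, x))) for x, y in val]
        mean_reward = sum(rewards) / len(rewards)
        if mean_reward > best_reward:
            best_demos, best_reward = demos, mean_reward
    return best_demos, best_reward


def toy_model(prompt: str) -> str:
    """Hypothetical frozen 'LM': copies the majority casing of the demo answers."""
    question = prompt.rsplit("Q: ", 1)[1].split("\n", 1)[0]
    answers = [line[3:] for line in prompt.split("\n") if line.startswith("A: ")]
    upper_votes = sum(a.isupper() for a in answers)
    return question.upper() if upper_votes > len(answers) / 2 else question.lower()


pool = [
    ("sun", "SUN"), ("moon", "MOON"), ("star", "STAR"),
    ("tree", "tree"), ("rock", "rock"),  # noisy: wrong answer format
]
val = [("cat", "CAT"), ("dog", "DOG")]
demos, reward = random_few_shot_search(toy_model, pool, val, k=2, trials=100)
print(demos, reward)
```

Only the search strategy inside `random_few_shot_search` would change if random sampling were replaced by a smarter optimizer. The reward-driven selection loop, and the frozen model, stay the same.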
______________________________________________________________________
**Info:** You can use the [`llms.txt`](/synalinks/llms.txt) or [`llms-full.txt`](/synalinks/llms-full.txt) files to feed your favorite LMs with the Synalinks documentation.
## Who is Synalinks for?
Synalinks is designed for a diverse range of users, from professionals and AI researchers to students, independent developers, and hobbyists. It is suitable for anyone who wants to learn about AI by building and composing blocks, or to build solid foundations for enterprise-grade products. A background in Machine Learning and Deep Learning can be advantageous, since Synalinks leverages design patterns from Keras (one of the most user-friendly and popular Deep Learning frameworks), but it is not a prerequisite. Synalinks is designed to be accessible to anyone with programming skills in Python, making it a versatile and inclusive platform for AI development.
______________________________________________________________________
## Why use Synalinks?
Developing a successful LM application in a professional context, beyond stateless chatbots, is difficult and typically includes:
- **Building optimized prompts with examples/instructions at each step**: Synalinks uses advanced In-Context Reinforcement Learning techniques to optimize each prompt.
- **Pipelines that change over time**: Easily edit your pipelines, re-run your training, and you're good to go.
- **Ensuring the correctness of the LMs output**: Synalinks combines constrained structured output with In-Context RL to ensure both format and content correctness.
- **Async Optimization**: Synalinks automatically optimizes your pipelines by detecting parallel processes.
- **Assessing the performance of your application**: Synalinks provides built-in metrics and rewards to evaluate your workflows.
- **Configuring Language & Embedding Models**: Seamlessly integrate multiple LM providers like Ollama, OpenAI, Anthropic, Mistral or Groq.
- **Documenting your ML workflows**: Plot your workflows, training history, and evaluations; document everything.
- **Versioning the prompts/pipelines**: Each program is serializable into JSON so you can version it with git.
- **Deploying REST APIs**: Compatible out-of-the-box with FastAPI so your Data Scientists and Web Developers can stop tearing each other apart.
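The "Async Optimization" and "Assessing the performance" items rest on a pattern that the trainer's `predict_on_batch` and `test_on_batch` methods also use. Every input of a batch is started as a coroutine and awaited together with `asyncio.gather`, then each prediction is scored against its target and folded into a running mean. The sketch below reproduces that pattern with the standard library only. `fake_lm_call`, `MeanTracker`, `predict_batch`, and `score_batch` are hypothetical stand-ins, not Synalinks classes. In the trainer source, `update_state` is awaited; here it is synchronous for brevity.

```python
import asyncio
from typing import List


async def fake_lm_call(question: str) -> str:
    """Hypothetical stand-in for a network-bound LM request."""
    await asyncio.sleep(0.01)  # simulated I/O latency
    return question.upper()


class MeanTracker:
    """Running mean of rewards (synchronous here, unlike the trainer's tracker)."""

    def __init__(self) -> None:
        self.total = 0.0
        self.count = 0

    def update_state(self, value: float) -> None:
        self.total += value
        self.count += 1

    def result(self) -> float:
        return self.total / self.count if self.count else 0.0


async def predict_batch(batch: List[str]) -> List[str]:
    # All requests are in flight concurrently; gather keeps the input order.
    return list(await asyncio.gather(*(fake_lm_call(x) for x in batch)))


async def score_batch(
    batch: List[str], targets: List[str], tracker: MeanTracker
) -> float:
    y_pred = await predict_batch(batch)
    for y_t, y_p in zip(targets, y_pred):
        tracker.update_state(1.0 if y_t == y_p else 0.0)  # exact-match reward
    return tracker.result()


tracker = MeanTracker()
score = asyncio.run(score_batch(["cat", "dog"], ["CAT", "bird"], tracker))
print(score)  # prints 0.5
```

Because the requests overlap, a batch of network-bound calls takes roughly as long as its slowest call rather than the sum of all of them.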
Synalinks can help you simplify these tasks by leveraging decade-old practices from Deep Learning frameworks. We provide a comprehensive suite of tools and features designed to streamline the development process, making it easier to create, evaluate, train, document, and deploy robust neuro-symbolic LM applications.
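The Keras conventions in question are the ones the `Trainer` source that follows is built on. `compile()` records the optimizer, reward, and metrics. `fit()` iterates over epochs and batches, fires callback hooks around each unit, and validates every `validation_freq` epochs using a one-indexed check like the trainer's `_should_reward` helper. The synchronous sketch below keeps only that control flow. `RecordingCallback`, `should_validate`, `fit`, and the lambda train/eval steps are hypothetical, and no optimization happens. Unlike the real trainer, this sketch reports the last batch's logs instead of accumulated metric state.

```python
from typing import Callable, Dict, List, Sequence

Logs = Dict[str, float]


class RecordingCallback:
    """Records hook calls so the loop order can be inspected."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_epoch_begin(self, epoch: int) -> None:
        self.events.append(f"epoch_begin:{epoch}")

    def on_train_batch_end(self, step: int, logs: Logs) -> None:
        self.events.append(f"batch_end:{step}")

    def on_epoch_end(self, epoch: int, logs: Logs) -> None:
        self.events.append(f"epoch_end:{epoch}")


def should_validate(epoch: int, validation_freq: int) -> bool:
    # One-indexed: validation_freq=2 validates after the 2nd, 4th, ... epoch.
    return (epoch + 1) % validation_freq == 0


def fit(
    train_step: Callable[[Sequence[int]], Logs],
    evaluate: Callable[[], Logs],
    batches: List[Sequence[int]],
    epochs: int,
    callback: RecordingCallback,
    validation_freq: int = 1,
) -> List[Logs]:
    history: List[Logs] = []
    for epoch in range(epochs):
        callback.on_epoch_begin(epoch)
        logs: Logs = {}
        for step, batch in enumerate(batches):
            logs = train_step(batch)
            callback.on_train_batch_end(step, logs)
        epoch_logs = dict(logs)
        if should_validate(epoch, validation_freq):
            # Validation results are merged with a "val_" prefix.
            epoch_logs.update({"val_" + k: v for k, v in evaluate().items()})
        callback.on_epoch_end(epoch, epoch_logs)
        history.append(epoch_logs)
    return history


cb = RecordingCallback()
history = fit(
    train_step=lambda batch: {"reward": float(len(batch))},
    evaluate=lambda: {"reward": 0.5},
    batches=[[1, 2], [3]],
    epochs=2,
    callback=cb,
    validation_freq=2,
)
print(history)  # prints [{'reward': 1.0}, {'reward': 1.0, 'val_reward': 0.5}]
```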
______________________________________________________________________
Source code in `synalinks/src/trainers/trainer.py`
````
class Trainer:
    def __init__(self):
        self._lock = False
        self._run_eagerly = False
        self.compiled = False
        self.reward = None
        self.steps_per_execution = 1
        # Can be set by callbacks in on_train_begin
        self._initial_epoch = None
        self._compute_reward_has_training_arg = (
            "training" in inspect.signature(self.compute_reward).parameters
        )
        # Placeholders used in `compile`
        self._optimizer = None
        self._compile_reward = None
        self._compile_metrics = None
        self._reward_tracker = None
    @tracking.no_automatic_dependency_tracking
    def compile(
        self,
        optimizer=None,
        reward=None,
        reward_weights=None,
        metrics=None,
        run_eagerly=False,
        steps_per_execution=1,
    ):
        """Configures the program for training.
        Example:
        ```python
        program.compile(
            optimizer=synalinks.optimizers.RandomFewShot(),
            reward=synalinks.rewards.ExactMatch(),
            metrics=[
                synalinks.metrics.MeanMetricWrapper(synalinks.rewards.exact_match),
            ],
        )
        ```
        Args:
            optimizer (Optimizer): Optimizer instance. See `synalinks.optimizers`.
            reward (Reward): Reward function. A `synalinks.rewards.Reward`
                instance. See `synalinks.rewards`. A reward function is
                any callable with the signature `reward = fn(y_true, y_pred)`,
                where `y_true` are the ground truth values, and `y_pred`
                are the program's predictions.
                `y_true` should be a list of batch size length `[d0, .. dN]`.
                `y_pred` should be a list of batch size length `[d0, .. dN]`.
                The reward function should return a float.
            reward_weights (list): Optional list specifying scalar coefficients
                (Python floats) to weight the reward contributions of
                different program outputs. The reward value that will be maximized
                by the program will then be the *weighted sum* of all individual
                rewards, weighted by the `reward_weights` coefficients. It is
                expected to have a 1:1 mapping to the program's outputs.
            metrics (list): List of metrics to be evaluated by the program during
                training and testing. Each of them is a `synalinks.metrics.Metric`
                instance. See `synalinks.metrics`. A metric function is any
                callable with the signature `result = fn(y_true, y_pred)`.
            run_eagerly (bool): If `True`, this program's forward pass
                will never be compiled. It is recommended to leave this
                as `False` when training (for best performance),
                and to set it to `True` when debugging.
            steps_per_execution (int): The number of batches to run
                during a single compiled function call. Running multiple
                batches inside a single compiled function call can
                greatly improve performance on TPUs or small programs with a large
                Python overhead. At most, one full epoch will be run each
                execution. If a number larger than the size of the epoch is
                passed, the execution will be truncated to the size of the
                epoch. Note that if `steps_per_execution` is set to `N`,
                `Callback.on_batch_begin` and `Callback.on_batch_end` methods
                will only be called every `N` batches (i.e. before/after
                each compiled function execution).
        """
        self._clear_previous_trainer_metrics()
        self._optimizer = optimizer
        self._optimizer.set_program(self)
        if hasattr(self, "output_names"):
            output_names = self.output_names
        else:
            output_names = None
        if reward is not None:
            self._compile_reward = CompileReward(
                reward, reward_weights, output_names=output_names
            )
            self.reward = reward
        if metrics is not None:
            self._compile_metrics = CompileMetrics(metrics, output_names=output_names)
        self.run_eagerly = run_eagerly
        self.stop_training = False
        self.compiled = True
        self._reward_tracker = metrics_module.Mean(name="reward")
        self.steps_per_execution = steps_per_execution
        self._compile_config = serialization_lib.SerializableDict(
            optimizer=optimizer,
            reward=reward,
            reward_weights=reward_weights,
            metrics=metrics,
            run_eagerly=run_eagerly,
            steps_per_execution=steps_per_execution,
        )
    @property
    def optimizer(self):
        return self._optimizer
    @property
    def metrics(self):
        # Order: reward tracker, individual reward trackers, compiled metrics,
        # custom metrics, submodule metrics.
        metrics = []
        if self.compiled:
            if self._reward_tracker is not None:
                metrics.append(self._reward_tracker)
            if self._compile_metrics is not None:
                metrics.append(self._compile_metrics)
            if self._compile_reward is not None:
                metrics.extend(self._compile_reward.metrics)
        metrics.extend(self._metrics)
        for module in self._flatten_modules(include_self=False):
            if isinstance(module, Trainer):
                # All Trainer-related metrics in submodules should be ignored
                # because a new Trainer has been instantiated.
                continue
            metrics.extend(module.metrics)
        return metrics
    @property
    def metrics_names(self):
        return [m.name for m in self.metrics]
    def reset_metrics(self):
        for m in self.metrics:
            m.reset_state()
    def _get_own_metrics(self):
        metrics = []
        if self._reward_tracker is not None:
            metrics.append(self._reward_tracker)
        if self._compile_metrics is not None:
            metrics.append(self._compile_metrics)
        if self._compile_reward is not None:
            metrics.extend(self._compile_reward.metrics)
        metrics.extend(self._metrics)
        return metrics
    def _clear_previous_trainer_metrics(self):
        for module in self._flatten_modules(include_self=False):
            if not isinstance(module, Trainer):
                continue
            # A submodule might be a Trainer. In that case, we need to clear
            # the Trainer-related metrics, as they are not usable when a
            # new Trainer is instantiated.
            for m in self._get_own_metrics():
                module._tracker.untrack(m)
            module._reward_tracker = None
            module._compile_metrics = None
            if module._compile_reward is not None:
                module._compile_reward._metrics.clear()
            module._metrics.clear()
    @property
    def run_eagerly(self):
        return self._run_eagerly
    @run_eagerly.setter
    def run_eagerly(self, value):
        self._run_eagerly = value
    async def compute_reward(
        self,
        x=None,
        y=None,
        y_pred=None,
        training=True,
    ):
        """Compute the total reward, validate it, and return it.
        Subclasses can optionally override this method to provide custom reward
        computation logic.
        Args:
            x (list): Input data.
            y (list): Target data.
            y_pred (list): Predictions returned by the program (output of `program(x)`).
            training (bool): Whether we are training or evaluating the program.
        Returns:
            (float | None): The total reward as a scalar, or `None` if no reward results
                (which is the case when called by `Program.test_step`).
        """
        # The default implementation does not use `x` or `training`.
        del x
        del training
        rewards = []
        if self._compile_reward is not None:
            for y_t, y_p in zip(y, y_pred):
                reward = await self._compile_reward(y_t, y_p)
                if reward is not None:
                    rewards.append(reward)
        for reward in self.rewards:
            rewards.append(numpy.sum(reward))
        if len(rewards) == 1:
            total_reward = rewards[0]
        elif len(rewards) == 0:
            total_reward = numpy.zeros(())
        else:
            total_reward = numpy.mean(rewards)
        return float(total_reward)
    def stateless_compute_reward(
        self,
        trainable_variables,
        non_trainable_variables,
        metrics_variables,
        x=None,
        y=None,
        y_pred=None,
        training=True,
    ):
        var_mapping = list(zip(self.trainable_variables, trainable_variables))
        var_mapping.extend(zip(self.non_trainable_variables, non_trainable_variables))
        var_mapping.extend(zip(self.metrics_variables, metrics_variables))
        with backend.StatelessScope(state_mapping=var_mapping) as scope:
            # Note that this is needed for the regularization reward, which needs
            # the latest value of train/non-trainable variables.
            reward = self._compute_reward(
                x,
                y,
                y_pred,
                training=training,
            )
        # Update non trainable vars (may have been updated in compute_reward)
        non_trainable_variables = []
        for v in self.non_trainable_variables:
            new_v = scope.get_current_value(v)
            non_trainable_variables.append(new_v)
        # Update metrics vars (may have been updated in compute_reward)
        metrics_variables = []
        for v in self.metrics_variables:
            new_v = scope.get_current_value(v)
            metrics_variables.append(new_v)
        return reward, (
            trainable_variables,
            non_trainable_variables,
            metrics_variables,
        )
    async def compute_metrics(self, x, y, y_pred):
        """Update metric states and collect all metrics to be returned.
        Subclasses can optionally override this method to provide custom metric
        updating and collection logic. Custom metrics are not passed in
        `compile()`; they can be created in `__init__` or `build`. They are
        automatically tracked and returned by `self.metrics`.
        Args:
            x: Input data.
            y: Target data.
            y_pred: Predictions returned by the program (output of `program.call(x)`).
        Returns:
            A `dict` containing values that will be passed to
            `synalinks.callbacks.CallbackList.on_train_batch_end()`. Typically,
            the values of the metrics listed in `self.metrics` are returned.
            Example: `{'reward': 0.2, 'accuracy': 0.7}`.
        """
        del x  # The default implementation does not use `x`.
        if self._compile_metrics is not None:
            for y_t, y_p in zip(y, y_pred):
                await self._compile_metrics.update_state(y_t, y_p)
        return self.get_metrics_result()
    def get_metrics_result(self):
        """Returns the program's metrics values as a dict.
        If any of the metric results is a dict (containing multiple metrics),
        each of them gets added to the top level returned dict of this method.
        Returns:
            (dict): A `dict` containing values of the metrics listed in `self.metrics`.
                Example: `{'reward': 0.2, 'accuracy': 0.7}`.
        """
        return_metrics = {}
        for metric in self.metrics:
            result = metric.result()
            if isinstance(result, dict):
                return_metrics.update(result)
            else:
                return_metrics[metric.name] = result
        return python_utils.pythonify_logs(return_metrics)
    async def fit(
        self,
        x=None,
        y=None,
        batch_size=1,
        epochs=1,
        verbose="auto",
        callbacks=None,
        validation_split=0.1,
        validation_data=None,
        shuffle=True,
        initial_epoch=0,
        steps_per_epoch=None,
        validation_steps=None,
        validation_batch_size=32,
        validation_freq=1,
    ):
        """Trains the program for a fixed number of epochs (dataset iterations).
        Args:
            x (np.ndarray | generator): Input data. It can be:
                - A NumPy array (or array-like), or a list of `DataModel` arrays
                (in case the model has multiple inputs).
                - A list of dict mapping input names to the corresponding `DataModel`s,
                if the program has named inputs.
                - A Python generator function yielding `(inputs, targets)`.
            y (np.ndarray): Target data. Like the input data `x`, it can be either NumPy
                array(s) of `DataModel`(s). If `x` is a Python generator function,
                `y` should not be specified since targets will be obtained from
                `x`.
            batch_size (int): Integer or `None`.
                Number of samples per batch of computation.
                Defaults to 1.
                Do not specify the `batch_size` if your input data `x` is a
                Python generator function since they generate batches.
            epochs (int): Integer. Number of epochs to train the program.
                An epoch is an iteration over the entire `x` and `y`
                data provided (unless the `steps_per_epoch` flag is set to
                something other than None).
                Note that in conjunction with `initial_epoch`,
                `epochs` is to be understood as "final epoch".
                The program is not trained for a number of iterations
                given by `epochs`, but merely until the epoch
                of index `epochs` is reached.
            verbose (int): `"auto"`, 0, 1, or 2. Verbosity mode.
                0 = silent, 1 = progress bar, 2 = one line per epoch.
                "auto" becomes 1 for most cases.
                Note that the progress bar is not
                particularly useful when logged to a file,
                so `verbose=2` is recommended when not running interactively
                (e.g., in a production environment). Defaults to `"auto"`.
            callbacks (list): List of `synalinks.callbacks.Callback` instances.
                List of callbacks to apply during training.
                See `synalinks.callbacks`. Note
                `synalinks.callbacks.ProgbarLogger` and
                `synalinks.callbacks.History` callbacks are created
                automatically and need not be passed to `program.fit()`.
                `synalinks.callbacks.ProgbarLogger` is created
                or not based on the `verbose` argument in `program.fit()`.
            validation_split (float): Float between 0 and 1.
                Fraction of the training data to be used as validation data.
                The program will set apart this fraction of the training data,
                will not train on it, and will evaluate the reward and any program
                metrics on this data at the end of each epoch. The validation
                data is selected from the last samples in the `x` and `y` data
                provided, before shuffling.
                This argument is only supported when `x` and `y` are made of
                data_models.
                If both `validation_data` and `validation_split` are provided,
                `validation_data` will override `validation_split`.
            validation_data (tuple | iterator): Data on which to evaluate
                the reward and any program metrics at the end of each epoch.
                The program will not be trained on this data.
                `validation_data` will override `validation_split`.
                It can be:
                - A tuple `(x_val, y_val)` of `DataModel`s lists.
            shuffle (bool): Whether to shuffle the training data before each
                epoch. This argument is ignored when `x` is a Python generator function.
            initial_epoch (int): Integer.
                Epoch at which to start training
                (useful for resuming a previous training run).
            steps_per_epoch (int): Integer or `None`.
                Total number of steps (batches of samples) before declaring one
                epoch finished and starting the next epoch. When training with
                input data_models arrays, the default `None` means that the
                value used is the number of samples in your dataset divided by
                the batch size, or 1 if that cannot be determined.
                If `x` is a Python generator function, the
                epoch will run until the input dataset is exhausted. When
                passing an infinitely repeating dataset, you must specify the
                `steps_per_epoch` argument, otherwise the training will run
                indefinitely.
            validation_steps (int): Integer or `None`.
                Only relevant if `validation_data` is provided.
                Total number of steps (batches of samples) to draw before
                stopping when performing validation at the end of every epoch.
                If `validation_steps` is `None`, validation will run until the
                `validation_data` dataset is exhausted. In the case of an
                infinitely repeating dataset, it will run indefinitely. If
                `validation_steps` is specified and only part of the dataset
                is consumed, the evaluation will start from the beginning of the
                dataset at each epoch. This ensures that the same validation
                samples are used every time.
            validation_batch_size (int): Integer or `None`.
                Number of samples per validation batch.
                Defaults to 32; if set to `None`, `batch_size` is used.
                Do not specify the `validation_batch_size` if your data is a
                `synalinks.utils.PyDataset`, `tf.data.Dataset`,
                `torch.utils.data.DataLoader` or Python generator function
                since they generate batches.
            validation_freq (int): Only relevant if validation data is provided.
                Specifies how many training epochs to run
                before a new validation run is performed,
                e.g. `validation_freq=2` runs validation every 2 epochs.
        Returns:
            (History): A `History` object. Its `History.history` attribute is
                a record of training reward values and metrics values
                at successive epochs, as well as validation reward values
                and validation metrics values (if applicable).
        """
        self._assert_compile_called("fit")
        self._eval_epoch_iterator = None
        val_x, val_y = None, None
        if validation_split and validation_data is None:
            # Create the validation data using the training data. Only supported
            # for numpy arrays.
            (x, y), validation_data = array_slicing.train_validation_split(
                (x, y), validation_split=validation_split
            )
        if validation_data is not None:
            (val_x, val_y) = data_adapter_utils.unpack_x_y(validation_data)
        # Create an iterator that yields batches of input/target data.
        epoch_iterator = EpochIterator(
            x=x,
            y=y,
            batch_size=batch_size,
            steps_per_epoch=steps_per_epoch,
            shuffle=False,
            steps_per_execution=self.steps_per_execution,
        )
        if not all(module.built for module in self._flatten_modules()):
            # Build the model on one batch of data.
            for _, data in epoch_iterator:
                data_batch = data[0]
                self._auto_build(
                    iterator=epoch_iterator,
                    data_batch=data_batch,
                )
                break
        epoch_iterator.reset()
        # Container that configures and calls callbacks.
        if not isinstance(callbacks, callbacks_module.CallbackList):
            callbacks = callbacks_module.CallbackList(
                callbacks,
                add_history=True,
                add_progbar=verbose != 0,
                verbose=verbose,
                epochs=epochs,
                steps=steps_per_epoch,
                program=self,
            )
        self.stop_training = False
        callbacks.on_train_begin()
        training_logs = None
        logs = {}
        initial_epoch = self._initial_epoch or initial_epoch
        if self.trainable_variables and isinstance(
            self.optimizer, optimizers_module.Optimizer
        ):
            await self.optimizer.on_train_begin(
                self.trainable_variables,
            )
        for epoch in range(initial_epoch, epochs):
            self.reset_metrics()
            if self.trainable_variables and isinstance(
                self.optimizer, optimizers_module.Optimizer
            ):
                await self.optimizer.on_epoch_begin(
                    epoch,
                    self.trainable_variables,
                )
            callbacks.on_epoch_begin(epoch)
            with epoch_iterator.catch_stop_iteration():
                for step, iterator in epoch_iterator:
                    data = iterator[0]
                    x_batch, y_batch = data_adapter_utils.unpack_x_y(data)
                    if self.trainable_variables and isinstance(
                        self.optimizer, optimizers_module.Optimizer
                    ):
                        await self.optimizer.on_batch_begin(
                            step,
                            epoch,
                            self.trainable_variables,
                        )
                    callbacks.on_train_batch_begin(step)
                    logs = await self.train_on_batch(
                        step=step,
                        x=x_batch,
                        y=y_batch,
                        val_x=val_x,
                        val_y=val_y,
                        return_dict=True,
                    )
                    val_logs = await self.evaluate(
                        x=val_x,
                        y=val_y,
                        batch_size=validation_batch_size or batch_size,
                        steps=validation_steps,
                        callbacks=callbacks,
                        _use_cached_eval_dataset=False,
                    )
                    if self.trainable_variables and isinstance(
                        self.optimizer, optimizers_module.Optimizer
                    ):
                        await self.optimizer.on_batch_end(
                            step,
                            epoch,
                            self.trainable_variables,
                        )
                    callbacks.on_train_batch_end(step, logs)
                    if self.stop_training:
                        break
            # Override with model metrics instead of last step logs if needed.
            epoch_logs = dict(self._get_metrics_result_or_logs(logs))
            # Run validation.
            if validation_data is not None and self._should_eval(epoch, validation_freq):
                # Create EpochIterator for evaluation and cache it.
                if getattr(self, "_eval_epoch_iterator", None) is None:
                    self._eval_epoch_iterator = EpochIterator(
                        x=val_x,
                        y=val_y,
                        batch_size=validation_batch_size or batch_size,
                        steps_per_execution=self.steps_per_execution,
                        steps_per_epoch=validation_steps,
                        shuffle=False,
                    )
                val_logs = await self.evaluate(
                    x=val_x,
                    y=val_y,
                    batch_size=validation_batch_size or batch_size,
                    steps=validation_steps,
                    callbacks=callbacks,
                    _use_cached_eval_dataset=True,
                )
                val_logs = {"val_" + name: val for name, val in val_logs.items()}
                epoch_logs.update(val_logs)
            if self.trainable_variables and isinstance(
                self.optimizer, optimizers_module.Optimizer
            ):
                await self.optimizer.on_epoch_end(
                    epoch,
                    self.trainable_variables,
                )
            callbacks.on_epoch_end(epoch, epoch_logs)
            training_logs = epoch_logs
            if self.stop_training:
                break
        # If _eval_epoch_iterator exists, delete it after all epochs are done.
        if getattr(self, "_eval_epoch_iterator", None) is not None:
            del self._eval_epoch_iterator
        if self.trainable_variables and isinstance(
            self.optimizer, optimizers_module.Optimizer
        ):
            await self.optimizer.on_train_end(self.trainable_variables)
        callbacks.on_train_end(logs=training_logs)
        return self.history
    async def evaluate(
        self,
        x=None,
        y=None,
        batch_size=32,
        verbose="auto",
        steps=None,
        callbacks=None,
        return_dict=True,
        **kwargs,
    ):
        """Returns the reward value & metrics values for the program in test mode.
        Computation is done in batches (see the `batch_size` arg.)
        Args:
            x (np.ndarray | generator): Input data. It can be:
                - A NumPy array (or array-like), or a list of `DataModel` arrays
                (in case the model has multiple inputs).
                - A list of dict mapping input names to the corresponding `DataModel`s,
                if the program has named inputs.
                - A Python generator function yielding `(inputs, targets)`.
            y (np.ndarray): Target data. Like the input data `x`, it can be either NumPy
                array(s) of `DataModel`(s). If `x` is a Python generator function,
                `y` should not be specified since targets will be obtained from
                `x`.
            batch_size (int): Integer or `None`.
                Number of samples per batch of computation.
                If unspecified, `batch_size` will default to 32.
                Do not specify the `batch_size` if your input data `x` is a
                Python generator function since they generate batches.
            verbose (int | str): `"auto"`, 0, 1, or 2. Verbosity mode.
                0 = silent, 1 = progress bar, 2 = single line.
                `"auto"` becomes 1 for most cases.
                Note that the progress bar is not
                particularly useful when logged to a file, so `verbose=2` is
                recommended when not running interactively
                (e.g. in a production environment). Defaults to `"auto"`.
            steps (int): Integer or `None`.
                Total number of steps (batches of samples) to draw before
                declaring the evaluation round finished. If `steps` is `None`,
                it will run until `x` is exhausted. In the case of an infinitely
                repeating dataset, it will run indefinitely.
            callbacks (list): List of `synalinks.callbacks.Callback` instances.
                List of callbacks to apply during evaluation.
            return_dict (bool): If `True`, reward and metric results are returned as a
                dict, with each key being the name of the metric.
                If `False`, they are returned as a list.
        Returns:
            (float | list | dict): Scalar test reward
                (if the program has a single output and no metrics)
                or list of scalars (if the program has multiple outputs
                and/or metrics). The attribute `program.metrics_names` will give you
                the display labels for the scalar outputs.
        """
        self._assert_compile_called("evaluate")
        use_cached_eval_dataset = kwargs.pop("_use_cached_eval_dataset", False)
        if kwargs:
            raise ValueError(f"Arguments not recognized: {kwargs}")
        # Create an iterator that yields batches of input/target data.
        if use_cached_eval_dataset:
            epoch_iterator = self._eval_epoch_iterator
        else:
            epoch_iterator = EpochIterator(
                x=x,
                y=y,
                batch_size=batch_size,
                steps_per_epoch=steps,
                shuffle=False,
                steps_per_execution=self.steps_per_execution,
            )
        if not all(module.built for module in self._flatten_modules()):
            # Build the model on one batch of data.
            for _, data in epoch_iterator:
                data_batch = data[0]
                self._auto_build(
                    iterator=epoch_iterator,
                    data_batch=data_batch,
                )
                break
        epoch_iterator.reset()
        # Container that configures and calls callbacks.
        if not isinstance(callbacks, callbacks_module.CallbackList):
            callbacks = callbacks_module.CallbackList(
                callbacks,
                add_history=False,
                add_progbar=verbose != 0,
                verbose=verbose,
                epochs=1,
                steps=epoch_iterator.num_batches,
                program=self,
            )
        self.stop_evaluating = False
        callbacks.on_test_begin()
        logs = {}
        self.reset_metrics()
        for step, iterator in epoch_iterator:
            callbacks.on_test_batch_begin(step)
            data = iterator[0]
            x_batch, y_batch = data_adapter_utils.unpack_x_y(data)
            logs = await self.test_on_batch(
                x=x_batch,
                y=y_batch,
                return_dict=True,
            )
            callbacks.on_test_batch_end(step, logs)
            if self.stop_evaluating:
                break
        logs = self.get_metrics_result()
        callbacks.on_test_end(logs)
        if return_dict:
            return logs
        return self._flatten_metrics_in_order(logs)
    async def predict(
        self, x, batch_size=None, verbose="auto", steps=None, callbacks=None
    ):
        """Generates output predictions for the input samples.
        Computation is done in batches. This method is designed for batch
        processing of large numbers of inputs. It is not intended for use inside
        of loops that iterate over your data and process small numbers of inputs
        at a time.
        For small numbers of inputs that fit in one batch,
        directly use `__call__()` for faster execution, e.g.,
        `program(x)`, or `program(x, training=False)` if you have modules
        that behave differently during inference.
        Args:
            x (np.ndarray | generator): Input data. It can be:
                - A NumPy array (or array-like), or a list of `DataModel` arrays
                (in case the model has multiple inputs).
                - A list of dict mapping input names to the corresponding `DataModel`s,
                if the program has named inputs.
                - A Python generator function yielding `(inputs, targets)`.
            batch_size (int): Integer or `None`.
                Number of samples per batch of computation.
                If unspecified, `batch_size` will default to 32.
                Do not specify the `batch_size` if your input data `x` is a
                `synalinks.utils.PyDataset`, `tf.data.Dataset`,
                `torch.utils.data.DataLoader` or Python generator function
                since they generate batches.
            verbose (int): `"auto"`, 0, 1, or 2. Verbosity mode.
                0 = silent, 1 = progress bar, 2 = single line.
                `"auto"` becomes 1 for most cases. Note that the progress bar
                is not particularly useful when logged to a file,
                so `verbose=2` is recommended when not running interactively
                (e.g. in a production environment). Defaults to `"auto"`.
            steps (int): Total number of steps (batches of samples) to draw before
                declaring the prediction round finished. If `steps` is `None`,
                it will run until `x` is exhausted. In the case of an infinitely
                repeating dataset, it will run indefinitely.
            callbacks (list): List of `synalinks.callbacks.Callback` instances.
                List of callbacks to apply during prediction.
        Returns:
            (list): `JsonDataModel` array(s) of predictions.
                If the pipeline failed, `None` is added to the predictions.
        """
        # Create an iterator that yields batches of input data.
        epoch_iterator = EpochIterator(
            x=x,
            batch_size=batch_size,
            steps_per_epoch=steps,
            shuffle=False,
            steps_per_execution=self.steps_per_execution,
        )
        # Container that configures and calls callbacks.
        if not isinstance(callbacks, callbacks_module.CallbackList):
            callbacks = callbacks_module.CallbackList(
                callbacks,
                add_history=True,
                add_progbar=verbose != 0,
                verbose=verbose,
                epochs=1,
                steps=epoch_iterator.num_batches,
                program=self,
            )
        self.stop_predicting = False
        callbacks.on_predict_begin()
        outputs = []
        for step, iterator in epoch_iterator:
            callbacks.on_predict_batch_begin(step)
            data = iterator[0]
            x_batch, _ = data_adapter_utils.unpack_x_y(data)
            batch_outputs = await self.predict_on_batch(x_batch)
            outputs.extend(batch_outputs)
            callbacks.on_predict_batch_end(step, {"outputs": batch_outputs})
            if self.stop_predicting:
                break
        callbacks.on_predict_end()
        return np.array(outputs, dtype="object")
    async def train_on_batch(
        self,
        step,
        x,
        y=None,
        val_x=None,
        val_y=None,
        return_dict=False,
    ):
        """Runs a single optimization step on a single batch of data.
        Args:
            step (int): The training step.
            x (np.ndarray): Input data. Must be array-like.
            y (np.ndarray): Target data. Must be array-like.
            val_x (np.ndarray): Input validation data. Must be array-like.
            val_y (np.ndarray): Target validation data. Must be array-like.
            return_dict (bool): If `True`, reward and metric results are returned as a
                dict, with each key being the name of the metric. If `False`,
                they are returned as a list.
        Returns:
            (float | list | dict): A scalar reward value
                (when no metrics and `return_dict=False`), a list of reward
                and metric values (if there are metrics and `return_dict=False`),
                or a dict of metric and reward values (if `return_dict=True`).
        """
        if self.trainable_variables and isinstance(
            self.optimizer, optimizers_module.Optimizer
        ):
            metrics = await self.optimizer.optimize(
                step,
                self.trainable_variables,
                x=x,
                y=y,
                val_x=val_x,
                val_y=val_y,
            )
        else:
            warnings.warn("The program does not have any trainable variables.")
            y_pred = await self.predict_on_batch(val_x)
            reward = await self.compute_reward(
                x=val_x,
                y=val_y,
                y_pred=y_pred,
            )
            await self._reward_tracker.update_state(reward)
            metrics = await self.compute_metrics(val_x, val_y, y_pred)
        if return_dict:
            return metrics
        return self._flatten_metrics_in_order(metrics)
    async def test_on_batch(
        self,
        x,
        y=None,
        return_dict=False,
    ):
        """Test the program on a single batch of samples.
        Args:
            x (np.ndarray): Input data. Must be array-like.
            y (np.ndarray): Target data. Must be array-like.
            return_dict (bool): If `True`, reward and metric results are returned as a
                dict, with each key being the name of the metric. If `False`,
                they are returned as a list.
        Returns:
            (float | list | dict): A scalar reward value
                (when no metrics and `return_dict=False`), a list of reward
                and metric values (if there are metrics and `return_dict=False`),
                or a dict of metric and reward values (if `return_dict=True`).
        """
        y_pred = await self.predict_on_batch(x)
        reward = await self.compute_reward(
            x=x,
            y=y,
            y_pred=y_pred,
            training=False,
        )
        await self._reward_tracker.update_state(reward)
        metrics = await self.compute_metrics(x, y, y_pred)
        if return_dict:
            return metrics
        return self._flatten_metrics_in_order(metrics)
    async def predict_on_batch(self, x, training=False):
        """Returns predictions for a single batch of samples.
        Args:
            x (np.ndarray): Input data. Must be array-like.
            training (bool): Boolean. True if training.
        Returns:
            (list): list(s) of JsonDataModel predictions.
        """
        tasks = []
        for inputs in x:
            tasks.append(self(inputs, training=training))
        y_pred = await asyncio.gather(*tasks)
        return y_pred
    def get_compile_config(self):
        """Returns a serialized config with information for compiling the program.
        This method returns a config dictionary containing all the information
        (optimizer, reward, metrics, etc.) with which the program was compiled.
        Returns:
            (dict): A dict containing information for compiling the program.
        """
        if self.compiled and hasattr(self, "_compile_config"):
            return self._compile_config.serialize()
    def compile_from_config(self, config):
        """Compiles the program with the information given in config.
        This method uses the information in the config (optimizer, reward,
        metrics, etc.) to compile the program.
        Args:
            config (dict): Dict containing information for compiling the program.
        """
        has_overridden_compile = self.__class__.compile != Trainer.compile
        if has_overridden_compile:
            warnings.warn(
                "`compile()` was not called as part of program loading "
                "because the program's `compile()` method is custom. "
                "All subclassed Models that have `compile()` "
                "overridden should also override "
                "`get_compile_config()` and `compile_from_config(config)`. "
                "Alternatively, you can "
                "call `compile()` manually after loading.",
                stacklevel=2,
            )
            return
        config = serialization_lib.deserialize_synalinks_object(config)
        self.compile(**config)
        if hasattr(self, "optimizer") and self.built:
            # Create optimizer variables/programs.
            if not self.optimizer.built:
                run_maybe_nested(self.optimizer.build(self.trainable_variables))
    def _should_reward(self, epoch, validation_freq):
        epoch = epoch + 1  # one-index the user-facing epoch.
        if isinstance(validation_freq, int):
            return epoch % validation_freq == 0
        elif isinstance(validation_freq, list):
            return epoch in validation_freq
        else:
            raise ValueError(
                "Expected `validation_freq` to be a list or int. "
                f"Received: validation_freq={validation_freq} of the "
                f"type {type(validation_freq)}."
            )
    def _get_metrics_result_or_logs(self, logs):
        """Returns program metrics as a dict if the keys match with input logs.
        When the training / evaluation is performed with asynchronous steps,
        the last scheduled `train / test_step` may not give the latest metrics
        because it is not guaranteed to be executed last. This method gets
        metrics from the program directly instead of relying on the return from
        the last step function.
        When the user has custom train / test step functions, the metrics
        returned may be different from `Program.metrics`. In those instances,
        this function is a no-op and returns the logs passed in.
        Args:
            logs (dict): A `dict` of metrics returned by train / test step function.
        Returns:
            (dict): A `dict` containing values of the metrics listed in `self.metrics`
                when logs and program metrics keys match. Otherwise it returns input
                `logs`.
        """
        metric_logs = self.get_metrics_result()
        # Verify that train / test step logs passed and metric logs have
        # matching keys. It could be different when using custom step functions,
        # in which case we return the logs from the last step.
        if isinstance(logs, dict) and set(logs.keys()) == set(metric_logs.keys()):
            return metric_logs
        return logs
    def _flatten_metrics_in_order(self, logs):
        """Turns `logs` dict into a list as per key order of `metrics_names`."""
        metric_names = []
        for metric in self.metrics:
            if isinstance(metric, CompileMetrics):
                metric_names += [sub_metric.name for sub_metric in metric.metrics]
            else:
                metric_names.append(metric.name)
        results = []
        for name in metric_names:
            if name in logs:
                results.append(logs[name])
        for key in sorted(logs.keys()):
            if key not in metric_names:
                results.append(logs[key])
        if len(results) == 1:
            return results[0]
        return results
    def _assert_compile_called(self, method_name=None):
        if not self.compiled:
            msg = "You must call `compile()` before "
            # Use the generic message only when no method name was given.
            if method_name is None:
                msg += "using the program."
            else:
                msg += f"calling `{method_name}()`."
            raise ValueError(msg)
    def _auto_build(self, iterator=None, data_batch=None):
        program_unbuilt = not all(module.built for module in self._flatten_modules())
        compile_metrics_unbuilt = (
            self._compile_metrics is not None and not self._compile_metrics.built
        )
        compile_reward_unbuilt = (
            self._compile_reward is not None and not self._compile_reward.built
        )
        optimizer_unbuilt = self.optimizer is not None and not self.optimizer.built
        if program_unbuilt or compile_metrics_unbuilt or compile_reward_unbuilt:
            if data_batch is None:
                for _, data_or_iterator in iterator:
                    if isinstance(data_or_iterator, (list, tuple)):
                        data_batch = data_or_iterator[0]
                    else:
                        data_batch = next(data_or_iterator)
                    break
            (x, y) = data_batch
            try:
                y_pred = run_maybe_nested(self.predict_on_batch(x))
            except Exception as e:
                raise RuntimeError(
                    "Unable to automatically build the program. "
                    "Please build it yourself before calling "
                    "fit/evaluate/predict. "
                    "A program is 'built' when its variables have "
                    "been created and its `self.built` attribute "
                    "is True. Usually, calling the program on a batch "
                    "of data is the right way to build it.\n"
                    "Exception encountered:\n"
                    f"'{e}'"
                )
            if compile_metrics_unbuilt:
                # Build all metric state with `backend.compute_output_spec`.
                run_maybe_nested(
                    backend.compute_output_spec(
                        self.compute_metrics,
                        x,
                        y,
                        y_pred,
                    )
                )
            if compile_reward_unbuilt:
                # Build `CompileReward` state with `backend.compute_output_spec`.
                run_maybe_nested(
                    backend.compute_output_spec(
                        self.compute_reward,
                        x,
                        y,
                        y_pred,
                        training=False,
                    )
                )
        if optimizer_unbuilt:
            # Build optimizer
            run_maybe_nested(self.optimizer.build(self.trainable_variables))
        self._post_build()
    def _should_eval(self, epoch, validation_freq):
        epoch = epoch + 1  # one-index the user-facing epoch.
        if isinstance(validation_freq, int):
            return epoch % validation_freq == 0
        elif isinstance(validation_freq, list):
            return epoch in validation_freq
        else:
            raise ValueError(
                "Expected `validation_freq` to be a list or int. "
                f"Received: validation_freq={validation_freq} of the "
                f"type {type(validation_freq)}."
            )
````
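
`predict_on_batch()` in the listing above schedules one program call per sample and awaits them all with `asyncio.gather`, so the samples of a batch run concurrently and the predictions still come back in input order. A minimal, self-contained sketch of that pattern, where `fake_program` is a hypothetical stand-in for calling a Synalinks program:
```python
import asyncio


async def fake_program(inputs, training=False):
    """Hypothetical stand-in for `program(inputs, training=...)`."""
    # Simulate LM latency; later samples finish first to show ordering is kept.
    await asyncio.sleep(0.01 * (3 - inputs["id"]))
    return {"id": inputs["id"], "answer": inputs["question"].upper()}


async def predict_on_batch(x, training=False):
    # One task per sample, awaited together: batch latency is roughly that of
    # the slowest sample rather than the sum over all samples.
    tasks = [fake_program(inputs, training=training) for inputs in x]
    # `asyncio.gather` returns results in the order the awaitables were passed.
    return await asyncio.gather(*tasks)


batch = [{"id": i, "question": q} for i, q in enumerate(["a", "b", "c"])]
y_pred = asyncio.run(predict_on_batch(batch))
print([p["answer"] for p in y_pred])  # ['A', 'B', 'C']
```
The design trade-off is that a batch issues all of its LM requests at once, so `batch_size` also acts as a concurrency limit.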
## `compile(optimizer=None, reward=None, reward_weights=None, metrics=None, run_eagerly=False, steps_per_execution=1)`
Configures the program for training.
Example:
```python
program.compile(
    optimizer=synalinks.optimizers.RandomFewShot(),
    reward=synalinks.rewards.ExactMatch(),
    metrics=[
        synalinks.metrics.MeanMetricWrapper(synalinks.rewards.exact_match),
    ],
)
```
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `optimizer` | `Optimizer` | Optimizer instance. See synalinks.optimizers. | `None` |
| `reward` | `Reward` | Reward function. A synalinks.rewards.Reward instance. See synalinks.rewards. A reward function is any callable with the signature reward = fn(y_true, y_pred), where y_true are the ground truth values, and y_pred are the program's predictions. y_true should be a list of batch size length [d0, .. dN]. y_pred should be a list of batch size length [d0, .. dN]. The reward function should return a float. | `None` |
| `reward_weights` | `list` | Optional list specifying scalar coefficients (Python floats) to weight the reward contributions of different program outputs. The reward value that will be maximized by the program will then be the weighted sum of all individual rewards, weighted by the reward_weights coefficients. It is expected to have a 1:1 mapping to the program's outputs. | `None` |
| `metrics` | `list` | List of metrics to be evaluated by the program during training and testing. Each of them is a synalinks.metrics.Metric instance. See synalinks.metrics. A metric function is any callable with the signature result = fn(y_true, y_pred). | `None` |
| `run_eagerly` | `bool` | If True, this program's forward pass will never be compiled. It is recommended to leave this as False when training (for best performance), and to set it to True when debugging. | `False` |
| `steps_per_execution` | `int` | The number of batches to run during a single compiled function call. Running multiple batches inside a single compiled function call can greatly improve performance on TPUs or small programs with a large Python overhead. At most, one full epoch will be run each execution. If a number larger than the size of the epoch is passed, the execution will be truncated to the size of the epoch. Note that if steps_per_execution is set to N, Callback.on_batch_begin and Callback.on_batch_end methods will only be called every N batches (i.e. before/after each compiled function execution). | `1` |
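
To make the `reward_weights` description concrete: with one reward per program output and a 1:1 list of coefficients, the value the program maximizes is the weighted sum of the individual rewards. A plain-Python sketch of that arithmetic; `weighted_total_reward` is a hypothetical helper, and treating a missing `reward_weights` as all-ones is an assumption (the actual combination happens inside `CompileReward`, whose internals are not shown on this page):
```python
def weighted_total_reward(rewards, reward_weights=None):
    """Hypothetical helper: weighted sum of per-output rewards."""
    if reward_weights is None:
        # Assumption: no weights means every output counts with coefficient 1.0.
        reward_weights = [1.0] * len(rewards)
    if len(reward_weights) != len(rewards):
        raise ValueError("`reward_weights` must map 1:1 to the program's outputs.")
    return sum(w * r for w, r in zip(reward_weights, rewards))


# Two outputs: the first counts three times as much as the second.
print(weighted_total_reward([1.0, 0.5], reward_weights=[0.75, 0.25]))  # 0.875
```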
Source code in `synalinks/src/trainers/trainer.py`
````
@tracking.no_automatic_dependency_tracking
def compile(
    self,
    optimizer=None,
    reward=None,
    reward_weights=None,
    metrics=None,
    run_eagerly=False,
    steps_per_execution=1,
):
    """Configures the program for training.
    Example:
    ```python
    program.compile(
        optimizer=synalinks.optimizers.RandomFewShot(),
        reward=synalinks.rewards.ExactMatch(),
        metrics=[
            synalinks.metrics.MeanMetricWrapper(synalinks.rewards.exact_match),
        ],
    )
    ```
    Args:
        optimizer (Optimizer): Optimizer instance. See `synalinks.optimizers`.
        reward (Reward): Reward function. A `synalinks.rewards.Reward`
            instance. See `synalinks.rewards`. A reward function is
            any callable with the signature `reward = fn(y_true, y_pred)`,
            where `y_true` are the ground truth values, and `y_pred`
            are the program's predictions.
            `y_true` should be a list of batch size length `[d0, .. dN]`.
            `y_pred` should be a list of batch size length `[d0, .. dN]`.
            The reward function should return a float.
        reward_weights (list): Optional list specifying scalar coefficients
            (Python floats) to weight the reward contributions of
            different program outputs. The reward value that will be maximized
            by the program will then be the *weighted sum* of all individual
            rewards, weighted by the `reward_weights` coefficients. It is
            expected to have a 1:1 mapping to the program's outputs.
        metrics (list): List of metrics to be evaluated by the program during
            training and testing. Each of them is a `synalinks.metrics.Metric`
            instance. See `synalinks.metrics`. A metric function is any
            callable with the signature `result = fn(y_true, y_pred)`.
        run_eagerly (bool): If `True`, this program's forward pass
            will never be compiled. It is recommended to leave this
            as `False` when training (for best performance),
            and to set it to `True` when debugging.
        steps_per_execution (int): The number of batches to run
            during a single compiled function call. Running multiple
            batches inside a single compiled function call can
            greatly improve performance on TPUs or small programs with a large
            Python overhead. At most, one full epoch will be run each
            execution. If a number larger than the size of the epoch is
            passed, the execution will be truncated to the size of the
            epoch. Note that if `steps_per_execution` is set to `N`,
            `Callback.on_batch_begin` and `Callback.on_batch_end` methods
            will only be called every `N` batches (i.e. before/after
            each compiled function execution).
    """
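    self._clear_previous_trainer_metrics()
    self._optimizer = optimizer
    # `optimizer` defaults to `None`; only link an actual optimizer to the program.
    if optimizer is not None:
        self._optimizer.set_program(self)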
    if hasattr(self, "output_names"):
        output_names = self.output_names
    else:
        output_names = None
    if reward is not None:
        self._compile_reward = CompileReward(
            reward, reward_weights, output_names=output_names
        )
        self.reward = reward
    if metrics is not None:
        self._compile_metrics = CompileMetrics(metrics, output_names=output_names)
    self.run_eagerly = run_eagerly
    self.stop_training = False
    self.compiled = True
    self._reward_tracker = metrics_module.Mean(name="reward")
    self.steps_per_execution = steps_per_execution
    self._compile_config = serialization_lib.SerializableDict(
        optimizer=optimizer,
        reward=reward,
        reward_weights=reward_weights,
        metrics=metrics,
        run_eagerly=run_eagerly,
        steps_per_execution=steps_per_execution,
    )
````
## `compile_from_config(config)`
Compiles the program with the information given in config.
This method uses the information in the config (optimizer, reward, metrics, etc.) to compile the program.
Parameters:
| Name | Type | Description | Default |
| -------- | ------ | ------------------------------------------------------ | ---------- |
| `config` | `dict` | Dict containing information for compiling the program. | *required* |
Source code in `synalinks/src/trainers/trainer.py`
```
def compile_from_config(self, config):
    """Compiles the program with the information given in config.
    This method uses the information in the config (optimizer, reward,
    metrics, etc.) to compile the program.
    Args:
        config (dict): Dict containing information for compiling the program.
    """
    has_overridden_compile = self.__class__.compile != Trainer.compile
    if has_overridden_compile:
        warnings.warn(
            "`compile()` was not called as part of program loading "
            "because the program's `compile()` method is custom. "
            "All subclassed Models that have `compile()` "
            "overridden should also override "
            "`get_compile_config()` and `compile_from_config(config)`. "
            "Alternatively, you can "
            "call `compile()` manually after loading.",
            stacklevel=2,
        )
        return
    config = serialization_lib.deserialize_synalinks_object(config)
    self.compile(**config)
    if hasattr(self, "optimizer") and self.built:
        # Create optimizer variables/programs.
        if not self.optimizer.built:
            run_maybe_nested(self.optimizer.build(self.trainable_variables))
```
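
The first check in `compile_from_config()` compares the class's `compile` attribute with `Trainer.compile`. Looked up on a class, a method is a plain function object, so the comparison is true only when a subclass defines its own `compile()`; in that case the stored config is not replayed. A self-contained sketch of that detection, using hypothetical `Trainer`, `DefaultProgram`, and `CustomProgram` classes rather than the real Synalinks ones:
```python
class Trainer:
    """Hypothetical stand-in for the Synalinks `Trainer` base class."""

    def compile(self, **config):
        self.compiled_with = config

    def compile_from_config(self, config):
        # Looking up `compile` on the class yields the function object, so this
        # is True only if a subclass overrides `compile()`.
        if self.__class__.compile != Trainer.compile:
            return False  # the real method warns and returns at this point
        self.compile(**config)
        return True


class DefaultProgram(Trainer):
    pass


class CustomProgram(Trainer):
    def compile(self, **config):  # custom override: config replay is skipped
        super().compile(**config)


print(DefaultProgram().compile_from_config({"run_eagerly": False}))  # True
print(CustomProgram().compile_from_config({"run_eagerly": False}))  # False
```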
## `compute_metrics(x, y, y_pred)`
Update metric states and collect all metrics to be returned.
Subclasses can optionally override this method to provide custom metric updating and collection logic. Custom metrics are not passed in `compile()`; they can be created in `__init__` or `build`. They are automatically tracked and returned by `self.metrics`.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | | Input data. | *required* |
| `y` | | Target data. | *required* |
| `y_pred` | | Predictions returned by the program (output of program.call(x)). | *required* |
Returns:
| Type | Description |
| --- | --- |
| `dict` | A dict containing values that will be passed to synalinks.callbacks.CallbackList.on_train_batch_end(). Typically, the values of the metrics listed in self.metrics are returned. Example: {'reward': 0.2, 'accuracy': 0.7}. |
Source code in `synalinks/src/trainers/trainer.py`
```
async def compute_metrics(self, x, y, y_pred):
    """Update metric states and collect all metrics to be returned.
    Subclasses can optionally override this method to provide custom metric
    updating and collection logic. Custom metrics are not passed in
    `compile()`; they can be created in `__init__` or `build`. They are
    automatically tracked and returned by `self.metrics`.
    Args:
        x: Input data.
        y: Target data.
        y_pred: Predictions returned by the program (output of `program.call(x)`).
    Returns:
        A `dict` containing values that will be passed to
        `synalinks.callbacks.CallbackList.on_train_batch_end()`. Typically,
        the values of the metrics listed in `self.metrics` are returned.
        Example: `{'reward': 0.2, 'accuracy': 0.7}`.
    """
    del x  # The default implementation does not use `x`.
    if self._compile_metrics is not None:
        for y_t, y_p in zip(y, y_pred):
            await self._compile_metrics.update_state(y_t, y_p)
    return self.get_metrics_result()
```
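
The default `compute_metrics()` updates the compiled metrics once per `(y_true, y_pred)` pair and then reads back every tracked result. A self-contained sketch of that flow, where `MeanExactMatch` is a hypothetical async metric standing in for `CompileMetrics` (its name and methods are assumptions for illustration, not the Synalinks API):
```python
import asyncio


class MeanExactMatch:
    """Hypothetical async metric: running mean of exact matches."""

    def __init__(self, name="exact_match"):
        self.name = name
        self.total = 0.0
        self.count = 0

    async def update_state(self, y_true, y_pred):
        self.total += float(y_true == y_pred)
        self.count += 1

    def result(self):
        return self.total / self.count if self.count else 0.0


async def compute_metrics(metric, x, y, y_pred):
    del x  # mirrors the default implementation, which ignores the inputs
    # One state update per sample in the batch.
    for y_t, y_p in zip(y, y_pred):
        await metric.update_state(y_t, y_p)
    return {metric.name: metric.result()}


logs = asyncio.run(compute_metrics(MeanExactMatch(), None, ["a", "b"], ["a", "c"]))
print(logs)  # {'exact_match': 0.5}
```
Because the state accumulates across calls, the reported value is a running mean over all batches seen since the last metric reset.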
## `compute_reward(x=None, y=None, y_pred=None, training=True)`
Compute the total reward, validate it, and return it.
Subclasses can optionally override this method to provide custom reward
computation logic.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | `list` | Input data. | `None` |
| `y` | `list` | Target data. | `None` |
| `y_pred` | `list` | Predictions returned by the program (output of program(x)). | `None` |
| `training` | `bool` | Whether we are training or evaluating the program. | `True` |
Returns:
| Type | Description |
| --- | --- |
| `float | None` | The total reward as a scalar, or None if no reward results (which is the case when called by Program.test_step). |
Source code in `synalinks/src/trainers/trainer.py`
```
async def compute_reward(
    self,
    x=None,
    y=None,
    y_pred=None,
    training=True,
):
    """Compute the total reward, validate it, and return it.
    Subclasses can optionally override this method to provide custom reward
    computation logic.
    Args:
        x (list): Input data.
        y (list): Target data.
        y_pred (list): Predictions returned by the program (output of `program(x)`).
        training (bool): Whether we are training or evaluating the program.
    Returns:
        (float | None): The total reward as a scalar, or `None` if no reward results
            (which is the case when called by `Program.test_step`).
    """
    # The default implementation does not use `x` or `training`.
    del x
    del training
    rewards = []
    if self._compile_reward is not None:
        for y_t, y_p in zip(y, y_pred):
            reward = await self._compile_reward(y_t, y_p)
            if reward is not None:
                rewards.append(reward)
    for reward in self.rewards:
        rewards.append(numpy.sum(reward))
    if len(rewards) == 1:
        total_reward = rewards[0]
    elif len(rewards) == 0:
        total_reward = numpy.zeros(())
    else:
        total_reward = numpy.mean(rewards)
    return float(total_reward)
```
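
The end of `compute_reward()` reduces the collected rewards to one float: a single reward is returned as-is, no rewards yields `0.0`, and several rewards are averaged. Since each sample contributes its own entry, the default result is a per-sample mean over the batch. A minimal NumPy sketch of only that reduction step; `reduce_rewards` is a hypothetical name, and `extra_rewards` plays the role of `self.rewards` in the source:
```python
import numpy


def reduce_rewards(per_sample_rewards, extra_rewards=()):
    """Hypothetical helper mirroring the reduction in `compute_reward()`."""
    # Skip samples whose reward is None, as the default implementation does.
    rewards = [r for r in per_sample_rewards if r is not None]
    # Each additional reward (`self.rewards` in the source) is summed first.
    rewards += [numpy.sum(r) for r in extra_rewards]
    if len(rewards) == 1:
        total_reward = rewards[0]
    elif len(rewards) == 0:
        total_reward = numpy.zeros(())
    else:
        total_reward = numpy.mean(rewards)
    return float(total_reward)


print(reduce_rewards([1.0, 0.0, None, 1.0]))  # mean of [1.0, 0.0, 1.0]
print(reduce_rewards([]))  # 0.0
```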
## `evaluate(x=None, y=None, batch_size=32, verbose='auto', steps=None, callbacks=None, return_dict=True, **kwargs)`
Returns the reward value & metrics values for the program in test mode.
Computation is done in batches (see the `batch_size` argument).
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | `ndarray | generator` | Input data. It can be: - A NumPy array (or array-like), or a list of DataModel arrays (in case the program has multiple inputs). - A list of dict mapping input names to the corresponding DataModels, if the program has named inputs. - A Python generator function yielding (inputs, targets). | `None` |
| `y` | `ndarray` | Target data. Like the input data x, it can be either NumPy array(s) or DataModel(s). If x is a Python generator function, y should not be specified since targets will be obtained from x. | `None` |
| `batch_size` | `int` | Integer or None. Number of samples per batch of computation. If unspecified, batch_size will default to 32. Do not specify the batch_size if your input data x is a Python generator function since they generate batches. | `32` |
| `verbose` | `int | str` | "auto", 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so verbose=2 is recommended when not running interactively (e.g. in a production environment). Defaults to "auto". | `'auto'` |
| `steps` | `int` | Integer or None. Total number of steps (batches of samples) to draw before declaring the evaluation round finished. If steps is None, it will run until x is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. | `None` |
| `callbacks` | `list` | List of synalinks.callbacks.Callback instances. List of callbacks to apply during evaluation. | `None` |
| `return_dict` | `bool` | If True, reward and metric results are returned as a dict, with each key being the name of the metric. If False, they are returned as a list. | `True` |
Returns:
| Type | Description |
| --- | --- |
| `float | list | dict` | Scalar test reward (if the program has a single output and no metrics) or list of scalars (if the program has multiple outputs and/or metrics). The attribute program.metrics_names will give you the display labels for the scalar outputs. |
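
The `batch_size` and `steps` arguments interact as follows: the data is cut into consecutive batches of `batch_size` samples (the last one may be shorter), and if `steps` is set, the round stops after that many batches. A plain-Python sketch of that behavior; `iterate_batches` is hypothetical and only approximates what `EpochIterator` does for in-memory lists:
```python
def iterate_batches(x, y, batch_size=32, steps=None):
    """Hypothetical batching loop yielding (step, x_batch, y_batch)."""
    step = 0
    for start in range(0, len(x), batch_size):
        if steps is not None and step >= steps:
            break  # the evaluation round ends after `steps` batches
        yield step, x[start:start + batch_size], y[start:start + batch_size]
        step += 1


x = list(range(10))
y = [v % 2 for v in x]
print([len(xb) for _, xb, _ in iterate_batches(x, y, batch_size=4)])  # [4, 4, 2]
print([s for s, _, _ in iterate_batches(x, y, batch_size=4, steps=2)])  # [0, 1]
```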
Source code in `synalinks/src/trainers/trainer.py`
```
async def evaluate(
    self,
    x=None,
    y=None,
    batch_size=32,
    verbose="auto",
    steps=None,
    callbacks=None,
    return_dict=True,
    **kwargs,
):
    """Returns the reward value & metrics values for the program in test mode.
    Computation is done in batches (see the `batch_size` argument).
    Args:
        x (np.ndarray | generator): Input data. It can be:
            - A NumPy array (or array-like), or a list of `DataModel` arrays
              (in case the program has multiple inputs).
            - A list of dict mapping input names to the corresponding `DataModel`s,
              if the program has named inputs.
            - A Python generator function yielding `(inputs, targets)`.
        y (np.ndarray): Target data. Like the input data `x`, it can be either NumPy
            array(s) or `DataModel`(s). If `x` is a Python generator function,
            `y` should not be specified since targets will be obtained from
            `x`.
        batch_size (int): Integer or `None`.
            Number of samples per batch of computation.
            If unspecified, `batch_size` will default to 32.
            Do not specify the `batch_size` if your input data `x` is a
            Python generator function since they generate batches.
        verbose (int | str): `"auto"`, 0, 1, or 2. Verbosity mode.
            0 = silent, 1 = progress bar, 2 = single line.
            `"auto"` becomes 1 for most cases.
            Note that the progress bar is not
            particularly useful when logged to a file, so `verbose=2` is
            recommended when not running interactively
            (e.g. in a production environment). Defaults to `"auto"`.
        steps (int): Integer or `None`.
            Total number of steps (batches of samples) to draw before
            declaring the evaluation round finished. If `steps` is `None`,
            it will run until `x` is exhausted. In the case of an infinitely
            repeating dataset, it will run indefinitely.
        callbacks (list): List of `synalinks.callbacks.Callback` instances.
            List of callbacks to apply during evaluation.
        return_dict (bool): If `True`, reward and metric results are returned as a
            dict, with each key being the name of the metric.
            If `False`, they are returned as a list.
    Returns:
        (float | list | dict): Scalar test reward
            (if the program has a single output and no metrics)
            or list of scalars (if the program has multiple outputs
            and/or metrics). The attribute `program.metrics_names` will give you
            the display labels for the scalar outputs.
    """
    self._assert_compile_called("evaluate")
    use_cached_eval_dataset = kwargs.pop("_use_cached_eval_dataset", False)
    if kwargs:
        raise ValueError(f"Arguments not recognized: {kwargs}")
    # Create an iterator that yields batches of input/target data.
    if use_cached_eval_dataset:
        epoch_iterator = self._eval_epoch_iterator
    else:
        epoch_iterator = EpochIterator(
            x=x,
            y=y,
            batch_size=batch_size,
            steps_per_epoch=steps,
            shuffle=False,
            steps_per_execution=self.steps_per_execution,
        )
    if not all(module.built for module in self._flatten_modules()):
        # Build the program on one batch of data.
        for _, data in epoch_iterator:
            data_batch = data[0]
            self._auto_build(
                iterator=epoch_iterator,
                data_batch=data_batch,
            )
            break
    epoch_iterator.reset()
    # Container that configures and calls callbacks.
    if not isinstance(callbacks, callbacks_module.CallbackList):
        callbacks = callbacks_module.CallbackList(
            callbacks,
            add_history=False,
            add_progbar=verbose != 0,
            verbose=verbose,
            epochs=1,
            steps=epoch_iterator.num_batches,
            program=self,
        )
    self.stop_evaluating = False
    callbacks.on_test_begin()
    logs = {}
    self.reset_metrics()
    for step, iterator in epoch_iterator:
        callbacks.on_test_batch_begin(step)
        data = iterator[0]
        x_batch, y_batch = data_adapter_utils.unpack_x_y(data)
        logs = await self.test_on_batch(
            x=x_batch,
            y=y_batch,
            return_dict=True,
        )
        callbacks.on_test_batch_end(step, logs)
        if self.stop_evaluating:
            break
    logs = self.get_metrics_result()
    callbacks.on_test_end(logs)
    if return_dict:
        return logs
    return self._flatten_metrics_in_order(logs)
```
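
With `return_dict=False`, `evaluate()` passes its logs through `_flatten_metrics_in_order()`: values for known metric names come first in `self.metrics` order, any remaining keys follow in sorted order, and a single value is unwrapped to a scalar. A standalone sketch of that ordering that takes the metric names as a plain list (an assumption for illustration; the real method reads them from `self.metrics`):
```python
def flatten_metrics_in_order(logs, metric_names):
    """Standalone version of the ordering used by `_flatten_metrics_in_order()`."""
    # Known metrics first, in declaration order.
    results = [logs[name] for name in metric_names if name in logs]
    # Any other logged values next, in sorted key order.
    results += [logs[key] for key in sorted(logs) if key not in metric_names]
    # A lone value is returned as a scalar rather than a one-element list.
    return results[0] if len(results) == 1 else results


logs = {"reward": 0.8, "exact_match": 0.5, "aux": 1.0}
print(flatten_metrics_in_order(logs, ["reward", "exact_match"]))  # [0.8, 0.5, 1.0]
print(flatten_metrics_in_order({"reward": 0.8}, ["reward"]))  # 0.8
```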
## `fit(x=None, y=None, batch_size=1, epochs=1, verbose='auto', callbacks=None, validation_split=0.1, validation_data=None, shuffle=True, initial_epoch=0, steps_per_epoch=None, validation_steps=None, validation_batch_size=32, validation_freq=1)`
Trains the program for a fixed number of epochs (dataset iterations).
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | `ndarray | generator` | Input data. It can be: - A NumPy array (or array-like), or a list of DataModel arrays (in case the program has multiple inputs). - A list of dict mapping input names to the corresponding DataModels, if the program has named inputs. - A Python generator function yielding (inputs, targets). | `None` |
| `y` | `ndarray` | Target data. Like the input data x, it can be either NumPy array(s) or DataModel(s). If x is a Python generator function, y should not be specified since targets will be obtained from x. | `None` |
| `batch_size` | `int` | Integer or None. Number of samples per batch of computation. If unspecified, batch_size will default to 1. Do not specify the batch_size if your input data x is a Python generator function since they generate batches. | `1` |
| `epochs` | `int` | Integer. Number of epochs to train the program. An epoch is an iteration over the entire x and y data provided (unless the steps_per_epoch flag is set to something other than None). Note that in conjunction with initial_epoch, epochs is to be understood as "final epoch". The program is not trained for a number of iterations given by epochs, but merely until the epoch of index epochs is reached. | `1` |
| `verbose` | `int | str` | "auto", 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so verbose=2 is recommended when not running interactively (e.g., in a production environment). Defaults to "auto". | `'auto'` |
| `callbacks` | `list` | List of synalinks.callbacks.Callback instances. List of callbacks to apply during training. See synalinks.callbacks. Note synalinks.callbacks.ProgbarLogger and synalinks.callbacks.History callbacks are created automatically and need not be passed to program.fit(). synalinks.callbacks.ProgbarLogger is created or not based on the verbose argument in program.fit(). | `None` |
| `validation_split` | `float` | Float between 0 and 1. Fraction of the training data to be used as validation data. The program will set apart this fraction of the training data, will not train on it, and will evaluate the reward and any program metrics on this data at the end of each epoch. The validation data is selected from the last samples in the x and y data provided, before shuffling. This argument is only supported when x and y are made of DataModels. If both validation_data and validation_split are provided, validation_data will override validation_split. | `0.1` |
| `validation_data` | `tuple | iterator` | Data on which to evaluate the reward and any program metrics at the end of each epoch. The program will not be trained on this data. validation_data will override validation_split. It can be: - A tuple (x_val, y_val) of lists of DataModels. | `None` |
| `shuffle` | `bool` | Whether to shuffle the training data before each epoch. This argument is ignored when x is a Python generator function. | `True` |
| `initial_epoch` | `int` | Integer. Epoch at which to start training (useful for resuming a previous training run). | `0` |
| `steps_per_epoch` | `int` | Integer or None. Total number of steps (batches of samples) before declaring one epoch finished and starting the next epoch. When training with input DataModel arrays, the default None means that the value used is the number of samples in your dataset divided by the batch size, or 1 if that cannot be determined. If x is a Python generator function, the epoch will run until the input dataset is exhausted. When passing an infinitely repeating dataset, you must specify the steps_per_epoch argument, otherwise the training will run indefinitely. | `None` |
| `validation_steps` | `int` | Integer or None. Only relevant if validation_data is provided. Total number of steps (batches of samples) to draw before stopping when performing validation at the end of every epoch. If validation_steps is None, validation will run until the validation_data dataset is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. If validation_steps is specified and only part of the dataset is consumed, the evaluation will start from the beginning of the dataset at each epoch. This ensures that the same validation samples are used every time. | `None` |
| `validation_batch_size` | `int` | Integer or None. Number of samples per validation batch. If unspecified, will default to batch_size. Do not specify the validation_batch_size if your data is a synalinks.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function since they generate batches. | `32` |
| `validation_freq` | `int` | Only relevant if validation data is provided. Specifies how many training epochs to run before a new validation run is performed, e.g. validation_freq=2 runs validation every 2 epochs. | `1` |
Returns:
| Type | Description |
| --- | --- |
| `History` | A History object. Its History.history attribute is a record of training reward values and metrics values at successive epochs, as well as validation reward values and validation metrics values (if applicable). |
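
Three `fit()` arguments are easy to misread: `epochs` is the index of the final epoch, so a resumed run covers epochs `initial_epoch` up to `epochs`; `validation_freq` is checked against the one-indexed epoch, with the same rule as `_should_eval()` in the `Trainer` source listing; and `validation_split` holds out the *last* fraction of samples before any shuffling. A plain-Python sketch under those readings; the helper names are hypothetical, and the split arithmetic `int(len(x) * (1 - validation_split))` is an assumption borrowed from the Keras convention rather than something this page confirms:
```python
def split_validation(x, y, validation_split):
    """Hold out the last `validation_split` fraction of samples (assumed rounding)."""
    split_at = int(len(x) * (1 - validation_split))
    return (x[:split_at], y[:split_at]), (x[split_at:], y[split_at:])


def should_validate(epoch, validation_freq):
    """Same rule as `_should_eval()`: `epoch` is zero-indexed internally."""
    epoch = epoch + 1  # one-index the user-facing epoch
    if isinstance(validation_freq, int):
        return epoch % validation_freq == 0
    if isinstance(validation_freq, list):
        return epoch in validation_freq
    raise ValueError("Expected `validation_freq` to be a list or int.")


x = list(range(8))
y = [v * 2 for v in x]
(train_x, train_y), (val_x, val_y) = split_validation(x, y, validation_split=0.25)
print(val_x)  # [6, 7]

# Resuming with initial_epoch=2 and epochs=6 runs zero-indexed epochs 2..5.
schedule = [(epoch + 1, should_validate(epoch, 2)) for epoch in range(2, 6)]
print(schedule)  # [(3, False), (4, True), (5, False), (6, True)]
```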
Source code in `synalinks/src/trainers/trainer.py`
```
async def fit(
    self,
    x=None,
    y=None,
    batch_size=1,
    epochs=1,
    verbose="auto",
    callbacks=None,
    validation_split=0.1,
    validation_data=None,
    shuffle=True,
    initial_epoch=0,
    steps_per_epoch=None,
    validation_steps=None,
    validation_batch_size=32,
    validation_freq=1,
):
    """Trains the program for a fixed number of epochs (dataset iterations).
```
    Args:
        x (np.ndarray | generator): Input data. It can be:
            - A NumPy array (or array-like), or a list of `DataModel` arrays
              (in case the program has multiple inputs).
            - A list of dict mapping input names to the corresponding `DataModel`s,
              if the program has named inputs.
            - A Python generator function yielding `(inputs, targets)`.
        y (np.ndarray): Target data. Like the input data `x`, it can be either NumPy
            array(s) or `DataModel`(s). If `x` is a Python generator function,
            `y` should not be specified since targets will be obtained from
            `x`.
        batch_size (int): Integer or `None`.
            Number of samples per batch of computation.
            If unspecified, `batch_size` will default to 1.
            Do not specify the `batch_size` if your input data `x` is a
            Python generator function since they generate batches.
        epochs (int): Integer. Number of epochs to train the program.
            An epoch is an iteration over the entire `x` and `y`
            data provided (unless the `steps_per_epoch` flag is set to
            something other than None).
            Note that in conjunction with `initial_epoch`,
            `epochs` is to be understood as "final epoch".
            The program is not trained for a number of iterations
            given by `epochs`, but merely until the epoch
            of index `epochs` is reached.
        verbose (int | str): `"auto"`, 0, 1, or 2. Verbosity mode.
            0 = silent, 1 = progress bar, 2 = one line per epoch.
            `"auto"` becomes 1 for most cases.
            Note that the progress bar is not
            particularly useful when logged to a file,
            so `verbose=2` is recommended when not running interactively
            (e.g., in a production environment). Defaults to `"auto"`.
        callbacks (list): List of `synalinks.callbacks.Callback` instances.
            List of callbacks to apply during training.
            See `synalinks.callbacks`. Note
            `synalinks.callbacks.ProgbarLogger` and
            `synalinks.callbacks.History` callbacks are created
            automatically and need not be passed to `program.fit()`.
            `synalinks.callbacks.ProgbarLogger` is created
            or not based on the `verbose` argument in `program.fit()`.
        validation_split (float): Float between 0 and 1.
            Fraction of the training data to be used as validation data.
            The program will set apart this fraction of the training data,
            will not train on it, and will evaluate the reward and any program
            metrics on this data at the end of each epoch. The validation
            data is selected from the last samples in the `x` and `y` data
            provided, before shuffling.
            This argument is only supported when `x` and `y` are made of
            `DataModel`s.
            If both `validation_data` and `validation_split` are provided,
            `validation_data` will override `validation_split`.
        validation_data (tuple | iterator): Data on which to evaluate
            the reward and any program metrics at the end of each epoch.
            The program will not be trained on this data.
            `validation_data` will override `validation_split`.
            It can be:
            - A tuple `(x_val, y_val)` of lists of `DataModel`s.
        shuffle (bool): Whether to shuffle the training data before each
            epoch. This argument is ignored when `x` is a Python generator function.
        initial_epoch (int): Integer.
            Epoch at which to start training
            (useful for resuming a previous training run).
        steps_per_epoch (int): Integer or `None`.
            Total number of steps (batches of samples) before declaring one
            epoch finished and starting the next epoch. When training with
            input `DataModel` arrays, the default `None` means that the
            value used is the number of samples in your dataset divided by
            the batch size, or 1 if that cannot be determined.
            If `x` is a Python generator function, the
            epoch will run until the input dataset is exhausted. When
            passing an infinitely repeating dataset, you must specify the
            `steps_per_epoch` argument, otherwise the training will run
            indefinitely.
        validation_steps (int): Integer or `None`.
            Only relevant if `validation_data` is provided.
            Total number of steps (batches of samples) to draw before
            stopping when performing validation at the end of every epoch.
            If `validation_steps` is `None`, validation will run until the
            `validation_data` dataset is exhausted. In the case of an
            infinitely repeating dataset, it will run indefinitely. If
            `validation_steps` is specified and only part of the dataset
            is consumed, the evaluation will start from the beginning of the
            dataset at each epoch. This ensures that the same validation
            samples are used every time.
        validation_batch_size (int): Integer or `None`.
            Number of samples per validation batch.
            If unspecified, will default to `batch_size`.
            Do not specify the `validation_batch_size` if your data is a
            `synalinks.utils.PyDataset`, `tf.data.Dataset`,
            `torch.utils.data.DataLoader` or Python generator function
            since they generate batches.
        validation_freq (int): Only relevant if validation data is provided.
            Specifies how many training epochs to run
            before a new validation run is performed,
            e.g. `validation_freq=2` runs validation every 2 epochs.
    Returns:
        (History): A `History` object. Its `History.history` attribute is
            a record of training reward values and metrics values
            at successive epochs, as well as validation reward values
            and validation metrics values (if applicable).
    """
self._assert_compile_called("fit")
self._eval_epoch_iterator = None
val_x, val_y = None, None
if validation_split and validation_data is None:
# Create the validation data using the training data. Only supported
# for numpy arrays.
(x, y), validation_data = array_slicing.train_validation_split(
(x, y), validation_split=validation_split
)
if validation_data is not None:
(val_x, val_y) = data_adapter_utils.unpack_x_y(validation_data)
# Create an iterator that yields batches of input/target data.
epoch_iterator = EpochIterator(
x=x,
y=y,
batch_size=batch_size,
steps_per_epoch=steps_per_epoch,
shuffle=False,
steps_per_execution=self.steps_per_execution,
)
if not all(module.built for module in self._flatten_modules()):
# Build the model on one batch of data.
for _, data in epoch_iterator:
data_batch = data[0]
self._auto_build(
iterator=epoch_iterator,
data_batch=data_batch,
)
break
epoch_iterator.reset()
# Container that configures and calls callbacks.
if not isinstance(callbacks, callbacks_module.CallbackList):
callbacks = callbacks_module.CallbackList(
callbacks,
add_history=True,
add_progbar=verbose != 0,
verbose=verbose,
epochs=epochs,
steps=steps_per_epoch,
program=self,
)
self.stop_training = False
callbacks.on_train_begin()
training_logs = None
logs = {}
initial_epoch = self._initial_epoch or initial_epoch
if self.trainable_variables and isinstance(
self.optimizer, optimizers_module.Optimizer
):
await self.optimizer.on_train_begin(
self.trainable_variables,
)
for epoch in range(initial_epoch, epochs):
self.reset_metrics()
if self.trainable_variables and isinstance(
self.optimizer, optimizers_module.Optimizer
):
await self.optimizer.on_epoch_begin(
epoch,
self.trainable_variables,
)
callbacks.on_epoch_begin(epoch)
with epoch_iterator.catch_stop_iteration():
for step, iterator in epoch_iterator:
data = iterator[0]
x_batch, y_batch = data_adapter_utils.unpack_x_y(data)
if self.trainable_variables and isinstance(
self.optimizer, optimizers_module.Optimizer
):
await self.optimizer.on_batch_begin(
step,
epoch,
self.trainable_variables,
)
callbacks.on_train_batch_begin(step)
logs = await self.train_on_batch(
step=step,
x=x_batch,
y=y_batch,
val_x=val_x,
val_y=val_y,
return_dict=True,
)
val_logs = await self.evaluate(
x=val_x,
y=val_y,
batch_size=validation_batch_size or batch_size,
steps=validation_steps,
callbacks=callbacks,
_use_cached_eval_dataset=False,
)
if self.trainable_variables and isinstance(
self.optimizer, optimizers_module.Optimizer
):
await self.optimizer.on_batch_end(
step,
epoch,
self.trainable_variables,
)
callbacks.on_train_batch_end(step, logs)
if self.stop_training:
break
# Override with model metrics instead of last step logs if needed.
epoch_logs = dict(self._get_metrics_result_or_logs(logs))
# Run validation.
if validation_data is not None and self._should_eval(epoch, validation_freq):
# Create EpochIterator for evaluation and cache it.
if getattr(self, "_eval_epoch_iterator", None) is None:
self._eval_epoch_iterator = EpochIterator(
x=val_x,
y=val_y,
batch_size=validation_batch_size or batch_size,
steps_per_execution=self.steps_per_execution,
steps_per_epoch=validation_steps,
shuffle=False,
)
val_logs = await self.evaluate(
x=val_x,
y=val_y,
batch_size=validation_batch_size or batch_size,
steps=validation_steps,
callbacks=callbacks,
_use_cached_eval_dataset=True,
)
val_logs = {"val_" + name: val for name, val in val_logs.items()}
epoch_logs.update(val_logs)
if self.trainable_variables and isinstance(
self.optimizer, optimizers_module.Optimizer
):
await self.optimizer.on_epoch_end(
epoch,
self.trainable_variables,
)
callbacks.on_epoch_end(epoch, epoch_logs)
training_logs = epoch_logs
if self.stop_training:
break
# If _eval_epoch_iterator exists, delete it after all epochs are done.
if getattr(self, "_eval_epoch_iterator", None) is not None:
del self._eval_epoch_iterator
if self.trainable_variables and isinstance(
self.optimizer, optimizers_module.Optimizer
):
await self.optimizer.on_train_end(self.trainable_variables)
callbacks.on_train_end(logs=training_logs)
return self.history
```
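Neither the validation split nor the epoch loop needs a language model to understand. The following pure-Python sketch illustrates them under stated assumptions and is not Synalinks code; `split_for_validation` and `toy_fit` are hypothetical helpers. The split keeps the last `validation_split` fraction of the samples using the Keras rounding rule `floor(n * (1 - validation_split))`, which is assumed (not confirmed) to match `array_slicing.train_validation_split`. The loop records events in the order in which the `fit()` listing above calls the optimizer hooks. It defaults the step count to the number of training samples divided by the batch size, rounded up, and validates when `(epoch + 1) % validation_freq == 0`, which is the Keras convention and is assumed here for `_should_eval`. Callbacks and the per-batch `evaluate()` call are left out.

```python
import math


def split_for_validation(x, y, validation_split):
    """Hold out the last `validation_split` fraction of (x, y), before any shuffling.

    Hypothetical helper; the floor rounding is an assumption borrowed from Keras.
    """
    split_at = int(math.floor(len(x) * (1.0 - validation_split)))
    return (x[:split_at], y[:split_at]), (x[split_at:], y[split_at:])


def toy_fit(x, y, epochs, batch_size, validation_split=0.0, validation_freq=1):
    """Record the order of optimizer hooks and evaluations, as in `fit()`.

    Hypothetical helper: no optimizer, language model or callbacks are involved.
    """
    events = []
    (x, y), (val_x, val_y) = split_for_validation(x, y, validation_split)
    # Default `steps_per_epoch`: samples divided by batch size, rounded up.
    steps_per_epoch = max(1, math.ceil(len(x) / batch_size))
    events.append("on_train_begin")
    for epoch in range(epochs):
        events.append(f"on_epoch_begin({epoch})")
        for step in range(steps_per_epoch):
            x_batch = x[step * batch_size:(step + 1) * batch_size]
            events.append(f"on_batch_begin({step})")
            events.append(f"train_on_batch({step}, n={len(x_batch)})")
            events.append(f"on_batch_end({step})")
        # End-of-epoch validation, skipped on epochs excluded by `validation_freq`.
        if val_x and (epoch + 1) % validation_freq == 0:
            events.append(f"evaluate(n={len(val_x)})")
        events.append(f"on_epoch_end({epoch})")
    events.append("on_train_end")
    return events
```

For example, 8 samples with `validation_split=0.25` leave 6 for training. With `batch_size=4`, each epoch then runs two steps of sizes 4 and 2, and `validation_freq=2` validates only after the second epoch.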
## `get_compile_config()`
Returns a serialized config with information for compiling the program.
This method returns a config dictionary containing all the information
(optimizer, reward, metrics, etc.) with which the program was compiled.
Returns:
| Type | Description |
| --- | --- |
| `dict` | A dict containing information for compiling the program, or `None` if the program has not been compiled. |
Source code in `synalinks/src/trainers/trainer.py`
```
def get_compile_config(self):
    """Returns a serialized config with information for compiling the program.

    This method returns a config dictionary containing all the information
    (optimizer, reward, metrics, etc.) with which the program was compiled.

    Returns:
        (dict): A dict containing information for compiling the program.
    """
    if self.compiled and hasattr(self, "_compile_config"):
        return self._compile_config.serialize()
```
## `get_metrics_result()`
Returns the program's metrics values as a dict.
If any of the metric results is a dict (containing multiple metrics),
each of its entries is added to the top level of the returned dict.
Returns:
| Type | Description |
| --- | --- |
| `dict` | A dict containing the values of the metrics listed in `self.metrics`. Example: `{'reward': 0.2, 'accuracy': 0.7}`. |
Source code in `synalinks/src/trainers/trainer.py`
```
def get_metrics_result(self):
    """Returns the program's metrics values as a dict.

    If any of the metric result is a dict (containing multiple metrics),
    each of them gets added to the top level returned dict of this method.

    Returns:
        (dict): A `dict` containing values of the metrics listed in `self.metrics`.
            Example: `{'reward': 0.2, 'accuracy': 0.7}`.
    """
    return_metrics = {}
    for metric in self.metrics:
        result = metric.result()
        if isinstance(result, dict):
            return_metrics.update(result)
        else:
            return_metrics[metric.name] = result
    return python_utils.pythonify_logs(return_metrics)
```
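The flattening rule can be checked in isolation. The sketch below copies the merge loop from the listing above into a standalone function. `ToyMetric` and `flatten_metric_results` are hypothetical stand-ins, and the final `python_utils.pythonify_logs()` conversion is left out.

```python
class ToyMetric:
    """Hypothetical stand-in exposing only `name` and `result()`."""

    def __init__(self, name, value):
        self.name = name
        self._value = value

    def result(self):
        return self._value


def flatten_metric_results(metrics):
    """Same merge loop as `get_metrics_result()`, minus `pythonify_logs()`."""
    return_metrics = {}
    for metric in metrics:
        result = metric.result()
        if isinstance(result, dict):
            # A multi-value metric contributes each entry at the top level.
            return_metrics.update(result)
        else:
            return_metrics[metric.name] = result
    return return_metrics


# prints {'reward': 0.2, 'accuracy': 0.7}
print(flatten_metric_results([ToyMetric("reward", 0.2), ToyMetric("scores", {"accuracy": 0.7})]))
```

Dict results are merged with `dict.update()`, so if two metrics report the same key, the value from the later metric wins.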
## `predict(x, batch_size=None, verbose='auto', steps=None, callbacks=None)`
Generates output predictions for the input samples.
Computation is done in batches. This method is designed for batch
processing of large numbers of inputs. It is not intended for use inside
loops that iterate over your data and process small numbers of inputs
at a time.
For small numbers of inputs that fit in one batch,
directly use `__call__()` for faster execution, e.g.,
`program(x)`, or `program(x, training=False)` if you have modules
that behave differently during inference.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | `ndarray \| generator` | Input data. It can be: a NumPy array (or array-like), or a list of `DataModel` arrays (if the program has multiple inputs); a list of dicts mapping input names to the corresponding `DataModel`s, if the program has named inputs; or a Python generator function yielding `(inputs, targets)`. | *required* |
| `batch_size` | `int` | Integer or `None`. Number of samples per batch of computation. If unspecified, `batch_size` defaults to 32. Do not specify `batch_size` if your input data `x` is a `synalinks.utils.PyDataset`, `tf.data.Dataset`, `torch.utils.data.DataLoader` or Python generator function, since they generate batches. | `None` |
| `verbose` | `int` | `"auto"`, 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. `"auto"` becomes 1 for most cases. The progress bar is not particularly useful when logged to a file, so `verbose=2` is recommended when not running interactively (e.g. in a production environment). Defaults to `"auto"`. | `'auto'` |
| `steps` | `int` | Total number of steps (batches of samples) to draw before declaring the prediction round finished. If `steps` is `None`, it runs until `x` is exhausted. With an infinitely repeating dataset, it runs indefinitely. | `None` |
| `callbacks` | `list` | List of `synalinks.callbacks.Callback` instances to apply during prediction. | `None` |
Returns:
| Type | Description |
| --- | --- |
| `ndarray` | NumPy object array of `JsonDataModel` predictions. If the pipeline fails for a sample, `None` is added to the predictions in its place. |
Source code in `synalinks/src/trainers/trainer.py`
```
async def predict(
    self, x, batch_size=None, verbose="auto", steps=None, callbacks=None
):
    """Generates output predictions for the input samples.

    Computation is done in batches. This method is designed for batch
    processing of large numbers of inputs. It is not intended for use inside
    of loops that iterate over your data and process small numbers of inputs
    at a time.

    For small numbers of inputs that fit in one batch,
    directly use `__call__()` for faster execution, e.g.,
    `program(x)`, or `program(x, training=False)` if you have modules
    that behave differently during inference.

    Args:
        x (np.ndarray | generator): Input data. It can be:
            - A NumPy array (or array-like), or a list of `DataModel` arrays
                (in case the model has multiple inputs).
            - A list of dict mapping input names to the corresponding `DataModel`s,
                if the program has named inputs.
            - A Python generator function yielding `(inputs, targets)`.
        batch_size (int): Integer or `None`.
            Number of samples per batch of computation.
            If unspecified, `batch_size` will default to 32.
            Do not specify the `batch_size` if your input data `x` is a
            `synalinks.utils.PyDataset`, `tf.data.Dataset`,
            `torch.utils.data.DataLoader` or Python generator function
            since they generate batches.
        verbose (int): `"auto"`, 0, 1, or 2. Verbosity mode.
            0 = silent, 1 = progress bar, 2 = single line.
            `"auto"` becomes 1 for most cases. Note that the progress bar
            is not particularly useful when logged to a file,
            so `verbose=2` is recommended when not running interactively
            (e.g. in a production environment). Defaults to `"auto"`.
        steps (int): Total number of steps (batches of samples) to draw before
            declaring the prediction round finished. If `steps` is `None`,
            it will run until `x` is exhausted. In the case of an infinitely
            repeating dataset, it will run indefinitely.
        callbacks (list): List of `synalinks.callbacks.Callback` instances.
            List of callbacks to apply during prediction.

    Returns:
        (np.ndarray): Object array of `JsonDataModel` predictions.
            If the pipeline failed, a `None` is added to the predictions.
    """
    # Create an iterator that yields batches of input data.
    epoch_iterator = EpochIterator(
        x=x,
        batch_size=batch_size,
        steps_per_epoch=steps,
        shuffle=False,
        steps_per_execution=self.steps_per_execution,
    )

    # Container that configures and calls callbacks.
    if not isinstance(callbacks, callbacks_module.CallbackList):
        callbacks = callbacks_module.CallbackList(
            callbacks,
            add_history=True,
            add_progbar=verbose != 0,
            verbose=verbose,
            epochs=1,
            steps=epoch_iterator.num_batches,
            program=self,
        )

    self.stop_predicting = False
    callbacks.on_predict_begin()
    outputs = []
    for step, iterator in epoch_iterator:
        callbacks.on_predict_batch_begin(step)
        data = iterator[0]
        x_batch, _ = data_adapter_utils.unpack_x_y(data)
        batch_outputs = await self.predict_on_batch(x_batch)
        outputs.extend(batch_outputs)
        callbacks.on_predict_batch_end(step, {"outputs": batch_outputs})
        if self.stop_predicting:
            break
    callbacks.on_predict_end()
    return np.array(outputs, dtype="object")
```
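The batching contract of `predict()` can be sketched without a language model. In this hypothetical example, `toy_program` stands in for `program(inputs)` and returns `None` when it cannot produce an answer, which models a failed pipeline. `predict_sketch` slices the inputs into batches (32 samples by default, as documented) and awaits each batch concurrently, as `predict_on_batch()` does. Unlike the real method, it returns a plain list instead of a NumPy object array and ignores callbacks and `steps`.

```python
import asyncio


async def toy_program(inputs):
    """Hypothetical stand-in for `program(inputs)`; returns None on failure."""
    await asyncio.sleep(0)  # yield control, as a real language model call would
    if inputs.get("query") is None:
        return None  # models a failed pipeline for this sample
    return {"answer": inputs["query"].upper()}


async def predict_sketch(x, batch_size=None, batch_log=None):
    """Slice `x` into batches and run each batch concurrently (hypothetical)."""
    batch_size = batch_size or 32  # documented default
    outputs = []
    for start in range(0, len(x), batch_size):
        batch = x[start:start + batch_size]
        if batch_log is not None:
            batch_log.append(len(batch))
        # Like `predict_on_batch()`: one task per sample, awaited together.
        outputs.extend(await asyncio.gather(*(toy_program(item) for item in batch)))
    return outputs
```

Calling `asyncio.run(predict_sketch(samples))` with 70 samples processes batches of 32, 32 and 6. A failed sample keeps its position in the output as `None`.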
## `predict_on_batch(x, training=False)`
Returns predictions for a single batch of samples.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | `ndarray` | Input data. Must be array-like. | *required* |
| `training` | `bool` | Whether to call the program in training mode. | `False` |
Returns:
| Type | Description |
| --- | --- |
| `list` | List of `JsonDataModel` predictions, one per sample and in input order. The samples are processed concurrently with `asyncio.gather()`. |
Source code in `synalinks/src/trainers/trainer.py`
```
async def predict_on_batch(self, x, training=False):
    """Returns predictions for a single batch of samples.

    Args:
        x (np.ndarray): Input data. Must be array-like.
        training (bool): Boolean. True if training.

    Returns:
        (list): list(s) of JsonDataModel predictions.
    """
    tasks = []
    for inputs in x:
        tasks.append(self(inputs, training=training))
    y_pred = await asyncio.gather(*tasks)
    return y_pred
```
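`predict_on_batch()` schedules one call per sample and awaits them together with `asyncio.gather()`. That function returns results in input order regardless of which call finishes first. The hypothetical `slow_echo` coroutine below stands in for a program call with a per-sample latency, so the sketch can show both the concurrency and the ordering.

```python
import asyncio


async def slow_echo(inputs, training=False):
    """Hypothetical stand-in for `program(inputs, training=training)`."""
    await asyncio.sleep(inputs["delay"])  # simulated per-sample latency
    return {"echo": inputs["id"], "training": training}


async def predict_on_batch_sketch(x, training=False, completed=None):
    """One task per sample, awaited together, as in `predict_on_batch()`."""

    async def run(inputs):
        result = await slow_echo(inputs, training=training)
        if completed is not None:
            completed.append(inputs["id"])  # record completion order
        return result

    return await asyncio.gather(*(run(inputs) for inputs in x))
```

With delays of 0.05 s, 0 s and 0.01 s, the samples finish in the order 1, 2, 0. The returned list still follows the input order 0, 1, 2, and the batch takes about as long as its slowest sample rather than the sum of all delays.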
## `test_on_batch(x, y=None, return_dict=False)`
Test the program on a single batch of samples.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x` | `ndarray` | Input data. Must be array-like. | *required* |
| `y` | `ndarray` | Target data. Must be array-like. | `None` |
| `return_dict` | `bool` | If `True`, reward and metric results are returned as a dict, with each key being the name of the metric. If `False`, they are returned as a list. | `False` |
Returns:
| Type | Description |
| --- | --- |
| `float \| list \| dict` | A scalar reward value (when there are no metrics and `return_dict=False`), a list of reward and metric values (if there are metrics and `return_dict=False`), or a dict of metric and reward values (if `return_dict=True`). |
Source code in `synalinks/src/trainers/trainer.py`
```
async def test_on_batch(
    self,
    x,
    y=None,
    return_dict=False,
):
    """Test the program on a single batch of samples.

    Args:
        x (np.ndarray): Input data. Must be array-like.
        y (np.ndarray): Target data. Must be array-like.
        return_dict (bool): If `True`, reward and metric results are returned as a
            dict, with each key being the name of the metric. If `False`,
            they are returned as a list.

    Returns:
        (float | list | dict): A scalar reward value
            (when no metrics and `return_dict=False`), a list of reward
            and metric values (if there are metrics and `return_dict=False`),
            or a dict of metric and reward values (if `return_dict=True`).
    """
    y_pred = await self.predict_on_batch(x)

    reward = await self.compute_reward(
        x=x,
        y=y,
        y_pred=y_pred,
        training=False,
    )

    await self._reward_tracker.update_state(reward)

    metrics = await self.compute_metrics(x, y, y_pred)

    if return_dict:
        return metrics
    return self._flatten_metrics_in_order(metrics)
```
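When `return_dict=False`, the shape of the batch results depends on the private helper `_flatten_metrics_in_order()`, whose source is not shown here. The sketch below is an assumption modeled on the Keras helper of the same name: known metric names come first, in order, then any remaining keys sorted alphabetically, and a single value collapses to a scalar. `format_batch_results` is hypothetical.

```python
def format_batch_results(logs, metric_names, return_dict=False):
    """Shape batch results as `test_on_batch()` documents (hypothetical helper).

    Ordering is an assumption borrowed from Keras's `_flatten_metrics_in_order`.
    """
    if return_dict:
        return dict(logs)
    # Known metrics first, in declaration order...
    results = [logs[name] for name in metric_names if name in logs]
    # ...then any other logged values, sorted by key.
    results += [logs[key] for key in sorted(logs) if key not in metric_names]
    # A lone value (typically the reward) is returned as a scalar.
    return results[0] if len(results) == 1 else results
```

A program compiled with a reward only would yield a bare float, while adding one metric turns the result into a two-element list.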
## `train_on_batch(step, x, y=None, val_x=None, val_y=None, return_dict=False)`
Runs a single optimization step on a single batch of data.
If the program has no trainable variables, or its optimizer is not a Synalinks `Optimizer`, a warning is emitted and the step only evaluates the program: predictions are made on `val_x` and scored against `val_y`.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `step` | `int` | The training step. | *required* |
| `x` | `ndarray` | Input data. Must be array-like. | *required* |
| `y` | `ndarray` | Target data. Must be array-like. | `None` |
| `val_x` | `ndarray` | Input validation data. Must be array-like. | `None` |
| `val_y` | `ndarray` | Target validation data. Must be array-like. | `None` |
| `return_dict` | `bool` | If `True`, reward and metric results are returned as a dict, with each key being the name of the metric. If `False`, they are returned as a list. | `False` |
Returns:
| Type | Description |
| --- | --- |
| `float \| list \| dict` | A scalar reward value (when there are no metrics and `return_dict=False`), a list of reward and metric values (if there are metrics and `return_dict=False`), or a dict of metric and reward values (if `return_dict=True`). |
Source code in `synalinks/src/trainers/trainer.py`
```
async def train_on_batch(
    self,
    step,
    x,
    y=None,
    val_x=None,
    val_y=None,
    return_dict=False,
):
    """Runs a single optimization step on a single batch of data.

    Args:
        step (int): The training step.
        x (np.ndarray): Input data. Must be array-like.
        y (np.ndarray): Target data. Must be array-like.
        val_x (np.ndarray): Input validation data. Must be array-like.
        val_y (np.ndarray): Target validation data. Must be array-like.
        return_dict (bool): If `True`, reward and metric results are returned as a
            dict, with each key being the name of the metric. If `False`,
            they are returned as a list.

    Returns:
        (float | list | dict): A scalar reward value
            (when no metrics and `return_dict=False`), a list of reward
            and metric values (if there are metrics and `return_dict=False`),
            or a dict of metric and reward values (if `return_dict=True`).
    """
    if self.trainable_variables and isinstance(
        self.optimizer, optimizers_module.Optimizer
    ):
        metrics = await self.optimizer.optimize(
            step,
            self.trainable_variables,
            x=x,
            y=y,
            val_x=val_x,
            val_y=val_y,
        )
    else:
        warnings.warn("The program does not have any trainable variables.")
        y_pred = await self.predict_on_batch(val_x)
        reward = await self.compute_reward(
            x=val_x,
            y=val_y,
            y_pred=y_pred,
        )
        await self._reward_tracker.update_state(reward)
        metrics = await self.compute_metrics(val_x, val_y, y_pred)

    if return_dict:
        return metrics
    return self._flatten_metrics_in_order(metrics)
```
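The branch in `train_on_batch()` can be reduced to a few lines. In this hypothetical sketch, `optimize` and `evaluate` are caller-supplied coroutines. `optimize` stands in for `self.optimizer.optimize()`, and `evaluate` stands in for the predict, reward and metrics sequence. Passing `optimize=None` models a missing Synalinks optimizer. The two `fake_*` coroutines are illustrative only.

```python
import asyncio
import warnings


async def train_on_batch_sketch(trainable_variables, optimize, evaluate, x, y, val_x, val_y):
    """Mirror the control flow of `train_on_batch()` (hypothetical helper)."""
    if trainable_variables and optimize is not None:
        # Delegate the whole step to the optimizer, which sees both batches.
        return await optimize(x, y, val_x, val_y)
    warnings.warn("The program does not have any trainable variables.")
    # Fallback: only evaluate on the validation batch.
    return await evaluate(val_x, val_y)


async def fake_optimize(x, y, val_x, val_y):
    # Hypothetical optimizer step that pretends the batch earned a perfect reward.
    return {"reward": 1.0}


async def fake_evaluate(val_x, val_y):
    # Hypothetical evaluation of an untrained program.
    return {"reward": 0.0}


if __name__ == "__main__":
    print(asyncio.run(train_on_batch_sketch(["v"], fake_optimize, fake_evaluate, [1], [1], [2], [2])))
```

Calling the sketch with an empty variable list returns the evaluation result and emits the same warning text as the listing.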
# The Program class
Bases: `Trainer`, `Module`
A program grouping modules into an object with training/inference features.
There are four ways to instantiate a `Program`:
### With the "Functional API"
You start from `Input`, chain module calls to specify the program's structure, and finally create your program from inputs and outputs:
```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class AnswerWithThinking(synalinks.DataModel):
    thinking: str = synalinks.Field(
        description="Your step by step thinking process",
    )
    answer: float = synalinks.Field(
        description="The correct numerical answer",
    )

async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=Query)
    x1 = await synalinks.Generator(
        data_model=AnswerWithThinking,
        language_model=language_model,
    )(x0)

    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="chain_of_thought",
        description="Useful to answer in a step by step manner.",
    )

if __name__ == "__main__":
    asyncio.run(main())
```
Note: Only dicts, lists, and tuples of input data models are supported. Nested inputs are not supported (e.g. lists of lists or dicts of dicts).
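To make "nested" concrete, here is a hypothetical checker that is not part of Synalinks. It accepts a single input or a flat dict, list, or tuple of inputs and rejects anything deeper. Strings stand in for the symbolic data models returned by `synalinks.Input`.

```python
def is_supported_inputs(inputs):
    """Accept one input, or a flat dict/list/tuple of inputs (hypothetical checker)."""
    if isinstance(inputs, dict):
        items = list(inputs.values())
    elif isinstance(inputs, (list, tuple)):
        items = list(inputs)
    else:
        return True  # a single input data model
    # Any container inside the container makes the structure nested.
    return not any(isinstance(item, (dict, list, tuple)) for item in items)
```

Under this rule, `["x0", "x1"]` and `{"query": "x0"}` are accepted, while `[["x0"], "x1"]` and `{"a": {"b": "x0"}}` are rejected.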
### By subclassing the `Program` class
In that case, define your modules in `__init__()` and implement the program's structure in `call()`.
```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class AnswerWithThinking(synalinks.DataModel):
    thinking: str = synalinks.Field(
        description="Your step by step thinking process",
    )
    answer: float = synalinks.Field(
        description="The correct numerical answer",
    )

class ChainOfThought(synalinks.Program):
    """Useful to answer in a step by step manner.

    The first line of the docstring is provided as description
    for the program if not provided in the `super().__init__()`.
    In a similar way the name is automatically inferred based on
    the class name if not provided.
    """

    def __init__(
        self,
        language_model=None,
        name=None,
        description=None,
        trainable=True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        # Keep a reference so that `get_config()` can serialize it.
        self.language_model = language_model
        self.answer = synalinks.Generator(
            data_model=AnswerWithThinking,
            language_model=language_model,
            name="generator_" + self.name,
        )

    async def call(self, inputs, training=False):
        if not inputs:
            return None
        x = await self.answer(inputs, training=training)
        return x

    def get_config(self):
        config = {
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": synalinks.saving.serialize_synalinks_object(
                self.language_model
            )
        }
        return {**config, **language_model_config}

    @classmethod
    def from_config(cls, config):
        language_model = synalinks.saving.deserialize_synalinks_object(
            config.pop("language_model")
        )
        return cls(language_model=language_model, **config)

async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    program = ChainOfThought(
        language_model=language_model,
    )

if __name__ == "__main__":
    asyncio.run(main())
```
If you subclass `Program`, you can optionally have a `training` argument (boolean) in `call()`, which you can use to specify a different behavior in training and inference.
Once the program is created, you can configure it with rewards and metrics using `program.compile()`, train it with `program.fit()`, or use it for prediction with `program.predict()` or `program()`.
To understand the difference between `program.predict()` and `program()`, read the [FAQ](https://synalinks.github.io/synalinks/FAQ/#whats-the-difference-between-program-methods-predict-and-__call__).
### Mixing the subclassing and the `Functional` API
This way of programming is recommended for most users. It encapsulates your application behind an easy-to-use setup and saves you from building your programs/agents from scratch. In that case, implement only the `__init__()` and `build()` methods.
```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class AnswerWithThinking(synalinks.DataModel):
    thinking: str = synalinks.Field(
        description="Your step by step thinking process",
    )
    answer: float = synalinks.Field(
        description="The correct numerical answer",
    )

async def main():

    class ChainOfThought(synalinks.Program):
        """Useful to answer in a step by step manner."""

        def __init__(
            self,
            language_model=None,
            name=None,
            description=None,
            trainable=True,
        ):
            super().__init__(
                name=name,
                description=description,
                trainable=trainable,
            )
            self.language_model = language_model

        async def build(self, inputs):
            outputs = await synalinks.Generator(
                data_model=AnswerWithThinking,
                language_model=self.language_model,
            )(inputs)

            # Create your program using the functional API
            super().__init__(
                inputs=inputs,
                outputs=outputs,
                name=self.name,
                description=self.description,
                trainable=self.trainable,
            )

    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    program = ChainOfThought(
        language_model=language_model,
    )

if __name__ == "__main__":
    asyncio.run(main())
```
This way, you do not need to implement `call()` or the serialization methods (`get_config()` and `from_config()`). The program is built the first time it is called, for whatever inputs it receives.
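Building on the first call follows a common lazy-initialization pattern. The toy class below is a hypothetical illustration, not the Synalinks implementation. Its `build()` method runs once, on the first call, and later calls reuse what it created.

```python
import asyncio


class LazyProgram:
    """Toy "build on first call" object (hypothetical, not the Synalinks class)."""

    def __init__(self):
        self.built = False
        self.build_calls = 0
        self._graph = None

    async def build(self, inputs):
        # A Synalinks program would create its functional graph here.
        self.build_calls += 1
        self._graph = lambda x: {"answer": f"processed {x['query']}"}
        self.built = True

    async def __call__(self, inputs):
        if not self.built:
            await self.build(inputs)  # runs only on the first call
        return self._graph(inputs)


if __name__ == "__main__":
    program = LazyProgram()
    print(asyncio.run(program({"query": "a"})))
```

After two calls, `build_calls` is still 1, because the second call skips `build()`.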
### With the `Sequential` class
In addition, `synalinks.Sequential` is a special case of program where the program is purely a stack of single-input, single-output modules.
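Conceptually, running such a stack amounts to passing each module's single output to the next module. The following pure-Python sketch illustrates only that idea. `run_sequential` and the two toy modules are hypothetical, and they skip the language model calls and data model handling that real Synalinks modules perform.

```python
import asyncio


async def run_sequential(modules, inputs):
    """Pass each module's single output to the next module (hypothetical helper)."""
    x = inputs
    for module in modules:
        x = await module(x)
    return x


async def add_thinking(x):
    # Toy module: adds a reasoning field to the incoming dict.
    return {**x, "thinking": f"Reasoning about {x['query']}"}


async def add_answer(x):
    # Toy module: adds a fixed numerical answer.
    return {**x, "answer": 42.0}


if __name__ == "__main__":
    print(asyncio.run(run_sequential([add_thinking, add_answer], {"query": "q"})))
```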
```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class AnswerWithThinking(synalinks.DataModel):
    thinking: str = synalinks.Field(
        description="Your step by step thinking process",
    )
    answer: float = synalinks.Field(
        description="The correct numerical answer",
    )

async def main():
    language_model = synalinks.LanguageModel(model="ollama/mistral")

    program = synalinks.Sequential(
        [
            synalinks.Input(
                data_model=Query,
            ),
            synalinks.Generator(
                data_model=AnswerWithThinking,
                language_model=language_model,
            ),
        ],
        name="chain_of_thought",
        description="Useful to answer in a step by step manner.",
    )

if __name__ == "__main__":
    asyncio.run(main())
```
Source code in `synalinks/src/programs/program.py`
````
@synalinks_export(["synalinks.Program", "synalinks.programs.Program"])
class Program(Trainer, Module):
"""A program grouping modules into an object with training/inference features.
There are four ways to instantiate a `Program`:
## With the "Functional API"
You start from `Input`, you chain module calls to specify the program's structure,
and finally, you create your program from inputs and outputs:
```python
import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class AnswerWithThinking(synalinks.DataModel):
thinking: str = synalinks.Field(
description="Your step by step thinking process",
)
answer: float = synalinks.Field(
description="The correct numerical answer",
)
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
x0 = synalinks.Input(data_model=Query)
x1 = await synalinks.Generator(
data_model=AnswerWithThinking,
language_model=language_model,
)(x0)
program = synalinks.Program(
inputs=x0,
outputs=x1,
name="chain_of_thought",
description="Useful to answer in a step by step manner.",
)
if __name__ == "__main__":
asyncio.run(main())
```
Note: Only dicts, lists, and tuples of input data models are supported. Nested
inputs are not supported (e.g. lists of lists or dicts of dicts).
## By subclassing the `Program` class
In that case, you should define your
modules in `__init__()` and you should implement the program's structure
in `call()`.
```python
import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class AnswerWithThinking(synalinks.DataModel):
thinking: str = synalinks.Field(
description="Your step by step thinking process",
)
answer: float = synalinks.Field(
description="The correct numerical answer",
)
class ChainOfThought(synalinks.Program):
\"\"\"Useful to answer in a step by step manner.
The first line of the docstring is provided as description
for the program if not provided in the `super().__init__()`.
In a similar way the name is automatically inferred based on
the class name if not provided.
\"\"\"
def __init__(
self,
language_model=None,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
self.language_model = language_model
self.answer = synalinks.Generator(
data_model=AnswerWithThinking,
language_model=language_model,
name="generator_"+self.name,
)
async def call(self, inputs, training=False):
if not inputs:
return None
x = await self.answer(inputs, training=training)
return x
def get_config(self):
config = {
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
language_model_config = \
{
"language_model": synalinks.saving.serialize_synalinks_object(
self.language_model
)
}
return {**config, **language_model_config}
@classmethod
def from_config(cls, config):
language_model = synalinks.saving.deserialize_synalinks_object(
config.pop("language_model")
)
return cls(language_model=language_model, **config)
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
program = ChainOfThought(
language_model=language_model,
)
```
If you subclass `Program`, you can optionally have
a `training` argument (boolean) in `call()`, which you can use to specify
a different behavior in training and inference.
Once the program is created, you can configure the program with rewards and metrics
with `program.compile()`, train the program with `program.fit()`, or use the program
to do prediction with `program.predict()` or `program()`.
To understand the difference between `program.predict()` and `program()`, read the
[FAQ](https://synalinks.github.io/synalinks/FAQ/#whats-the-difference-between-program-methods-predict-and-__call__).
## Mixing the subclassing and the `Functional` API
This way of programming is recommended to encapsulate your application while
providing an easy to use setup. It is the recommended way for most users as
it saves you from building your programs/agents from scratch.
In that case, you should implement only the `__init__()` and `build()` methods.
```python
import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class AnswerWithThinking(synalinks.DataModel):
thinking: str = synalinks.Field(
description="Your step by step thinking process",
)
answer: float = synalinks.Field(
description="The correct numerical answer",
)
async def main():
class ChainOfThought(synalinks.Program):
\"\"\"Useful to answer in a step by step manner.\"\"\"
def __init__(
self,
language_model=None,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
self.language_model = language_model
async def build(self, inputs):
outputs = await synalinks.Generator(
data_model=AnswerWithThinking,
language_model=self.language_model,
)(inputs)
# Create your program using the functional API
super().__init__(
inputs=inputs,
outputs=outputs,
name=self.name,
description=self.description,
trainable=self.trainable,
)
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
program = ChainOfThought(
language_model=language_model,
)
if __name__ == "__main__":
asyncio.run(main())
```
This way, you do not need to implement `call()` or the serialization methods
(`get_config()` and `from_config()`). The program is built the first time it
is called, for whatever inputs it receives.
## With the `Sequential` class
In addition, `synalinks.Sequential` is a special case of program where
the program is purely a stack of single-input, single-output modules.
```python
import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class AnswerWithThinking(synalinks.DataModel):
thinking: str = synalinks.Field(
description="Your step by step thinking process",
)
answer: float = synalinks.Field(
description="The correct numerical answer",
)
async def main():
language_model = synalinks.LanguageModel(model="ollama/mistral")
program = synalinks.Sequential(
[
synalinks.Input(
data_model=Query,
),
synalinks.Generator(
data_model=AnswerWithThinking,
language_model=language_model,
),
],
name="chain_of_thought",
description="Useful to answer in a step by step manner.",
)
if __name__ == "__main__":
asyncio.run(main())
```
"""
def __new__(cls, *args, **kwargs):
# Signature detection for usage of `Program` as a `Functional`
if functional_init_arguments(args, kwargs) and cls == Program:
from synalinks.src.programs.functional import Functional
return Functional.__new__(Functional, *args, **kwargs)
return typing.cast(cls, super().__new__(cls))
def __init__(self, *args, **kwargs):
Trainer.__init__(self)
from synalinks.src.programs import functional
# Signature detection for usage of a `Program` subclass
# as a `Functional` subclass
if functional_init_arguments(args, kwargs):
inject_functional_program_class(self.__class__)
functional.Functional.__init__(self, *args, **kwargs)
else:
Module.__init__(self, *args, **kwargs)
async def call(self, *args, **kwargs):
raise NotImplementedError(
f"Program {self.__class__.__name__} does not have a `call()` "
"method implemented."
)
@property
def modules(self):
return list(self._flatten_modules(include_self=False, recursive=False))
@modules.setter
def modules(self, _):
raise AttributeError(
"`Program.modules` attribute is reserved and should not be used. "
"Please use another name."
)
def get_module(self, name=None, index=None):
"""Retrieves a module based on either its name (unique) or index.
Exactly one of `name` or `index` must be provided; passing both raises a `ValueError`.
Indices are based on order of horizontal graph traversal (bottom-up).
Args:
name (str): String, name of module.
index (int): Integer, index of module.
Returns:
(Module): A module instance.
"""
if index is not None and name is not None:
raise ValueError(
"Provide only a module name or a module index. Received: "
f"index={index}, name={name}."
)
if index is not None:
if len(self.modules) <= index:
raise ValueError(
f"Was asked to retrieve module at index {index}"
f" but program only has {len(self.modules)}"
" modules."
)
else:
return self.modules[index]
if name is not None:
for module in self.modules:
if module.name == name:
return module
raise ValueError(
f"No such module: {name}. Existing modules are: "
f"{list(module.name for module in self.modules)}."
)
raise ValueError("Provide either a module name or module index at `get_module`.")
def summary(
self,
line_length=None,
positions=None,
print_fn=None,
expand_nested=False,
show_trainable=False,
module_range=None,
):
"""Prints a string summary of the program.
Args:
line_length (int): Total length of printed lines
(e.g. set this to adapt the display to different
terminal window sizes).
positions (list): Relative or absolute positions of log elements
in each line. If not provided, becomes
`[0.3, 0.6, 0.70, 1.]`. Defaults to `None`.
print_fn (Callable): Print function to use. By default, prints to `stdout`.
If `stdout` doesn't work in your environment, change to `print`.
It will be called on each line of the summary.
You can set it to a custom function
in order to capture the string summary.
expand_nested (bool): Whether to expand the nested programs.
Defaults to `False`.
show_trainable (bool): Whether to show if a module is trainable.
Defaults to `False`.
module_range (list | tuple): a list or tuple of 2 strings,
which is the starting module name and ending module name
(both inclusive) indicating the range of modules to be printed
in summary. It also accepts regex patterns instead of exact
names. In this case, the start predicate will be
the first element that matches `module_range[0]`
and the end predicate will be the last element
that matches `module_range[1]`.
By default `None` considers all modules of the program.
Raises:
ValueError: if `summary()` is called before the program is built.
"""
summary_utils.print_summary(
self,
line_length=line_length,
positions=positions,
print_fn=print_fn,
expand_nested=expand_nested,
show_trainable=show_trainable,
module_range=module_range,
)
def save(self, filepath, overwrite=True, **kwargs):
"""Saves a program as a `.json` file.
Example:
```python
import synalinks

class Query(synalinks.DataModel):
    query: str

class AnswerWithRationale(synalinks.DataModel):
    rationale: str
    answer: str

language_model = synalinks.LanguageModel(model="ollama/mistral")

program = synalinks.Sequential(
    [
        synalinks.Input(data_model=Query),
        synalinks.Generator(
            data_model=AnswerWithRationale,
            language_model=language_model,
        ),
    ],
)

program.save("program.json")
loaded_program = synalinks.Program.load("program.json")
```
The saved `.json` file contains:
- The program's configuration (architecture)
- The program's variables
- The program's optimizer's state (if any)
- The program's reward's state (if any)
Thus programs can be reinstantiated in the exact same state.
Args:
filepath (str | os.PathLike): `str` or `os.PathLike` object.
The path where to save the program. Must end in `.json`.
overwrite (bool): Whether to overwrite any existing file at
the target location, or instead ask the user via
an interactive prompt. Defaults to `True`.
"""
from synalinks.src.saving import serialization_lib
filepath = file_utils.path_to_string(filepath)
if not filepath.endswith(".json"):
raise ValueError(
f"The filepath should end with '.json', received filepath={filepath}"
)
program_config = serialization_lib.serialize_synalinks_object(self)
variables_config = self.get_state_tree()
program_config.update({"variables": variables_config})
program_config_string = json.dumps(program_config, indent=2)
if file_utils.exists(filepath) and not overwrite:
proceed = io_utils.ask_to_proceed_with_overwrite(filepath)
if not proceed:
return
with open(filepath, "w") as f:
f.write(program_config_string)
async def build_from_config(self, config):
if not config:
return
status = False
if "input_schema" in config:
# Case: all inputs are in the first arg (possibly nested).
if utils.is_default(self.build):
status = self._build_by_run_for_single_pos_arg(config["input_schema"])
else:
try:
await self.build(config["input_schema"])
status = True
except:
pass
self._build_schemas_dict = config
elif "schemas_dict" in config:
# Case: inputs were recorded as multiple keyword arguments.
if utils.is_default(self.build):
status = self._build_by_run_for_kwargs(config["schemas_dict"])
else:
try:
await self.build(**config["schemas_dict"])
status = True
except:
pass
self._build_schemas_dict = config["schemas_dict"]
if not status:
warnings.warn(
f"Program '{self.name}' had a build config, but the program "
"cannot be built automatically in "
"`build_from_config(config)`. "
"You should implement "
"`def build_from_config(self, config)`, "
"and you might also want to implement the method "
"that generates the config at saving time, "
"`def get_build_config(self)`. "
"The method `build_from_config()` is meant to "
"create the state of the program (i.e. its variables) "
"upon deserialization.",
stacklevel=2,
)
def to_json(self, **kwargs):
"""Returns a JSON string containing the program configuration.
```python
json_string = program.to_json()
```
To rebuild a program from this JSON string, use
`synalinks.programs.program_from_json(json_string, custom_objects={...})`.
Args:
**kwargs (keyword arguments): Additional keyword arguments to be passed to
`json.dumps()`.
Returns:
(str): A JSON string.
"""
from synalinks.src.saving import serialization_lib
program_config = serialization_lib.serialize_synalinks_object(self)
return json.dumps(program_config, **kwargs)
@classmethod
def from_config(cls, config, custom_objects=None):
from synalinks.src.programs.functional import Functional
functional_config_keys = [
"name",
"modules",
"input_modules",
"output_modules",
]
is_functional_config = all(key in config for key in functional_config_keys)
argspec = inspect.getfullargspec(cls.__init__)
functional_init_args = inspect.getfullargspec(Functional.__init__).args[1:]
revivable_as_functional = (
cls in {Functional, Program}
or argspec.args[1:] == functional_init_args
or (argspec.varargs == "args" and argspec.varkw == "kwargs")
)
if is_functional_config and revivable_as_functional:
# Revive Functional model
# (but not Functional subclasses with a custom __init__)
from synalinks.src.programs.functional import functional_from_config
return functional_from_config(cls, config, custom_objects=custom_objects)
# Either the model has a custom __init__, or the config
# does not contain all the information necessary to
# revive a Functional model. This happens when the user creates
# subclassed models where `get_config()` is returning
# insufficient information to be considered a Functional model.
# In this case, we fall back to provide all config into the
# constructor of the class.
try:
return cls(**config)
except TypeError as e:
raise TypeError(
"Unable to revive program from config. When overriding "
"the `get_config()` method, make sure that the "
"returned config contains all items used as arguments "
f"in the constructor to {cls}, "
"which is the default behavior. "
"You can override this default behavior by defining a "
"`from_config(cls, config)` class method to specify "
"how to create an "
f"instance of {cls.__name__} from its config.\n\n"
f"Received config={config}\n\n"
f"Error encountered during deserialization: {e}"
)
def get_state_tree(self):
"""Retrieves tree-like structure of program variables.
This method allows retrieval of different program variables (trainable,
non-trainable, optimizer, and metrics). The variables are returned in a
nested dictionary format, where the keys correspond to the variable
names and the values are the nested representations of the variables.
Example:
```python
program.compile(
    optimizer=synalinks.optimizers.RandomFewShot(),
    reward=synalinks.rewards.ExactMatch(),
)
await program.fit(x=x_train, y=y_train)
state_tree = program.get_state_tree()
```
Returns:
(dict): A dictionary containing the nested representations of the
requested variables. The keys are the variable names, and the
values are the corresponding nested dictionaries.
"""
variables = {}
variables["trainable_variables"] = self._create_nested_dict(
self.trainable_variables
)
variables["non_trainable_variables"] = self._create_nested_dict(
self.non_trainable_variables
)
if self.optimizer:
variables["optimizer_trainable_variables"] = self._create_nested_dict(
self.optimizer.trainable_variables
)
variables["optimizer_non_trainable_variables"] = self._create_nested_dict(
self.optimizer.non_trainable_variables
)
variables["metrics_variables"] = self._create_nested_dict(self.metrics_variables)
return variables
def _create_nested_dict(self, variables):
flat_dict = {}
for v in variables:
if v.path in flat_dict:
raise ValueError(
"The following variable path is found twice in the program: "
f"'{v.path}'. `get_state_tree()` can only be called when "
"all variable paths are unique. Make sure to give unique "
"names to your modules (and other objects)."
)
flat_dict[v.path] = v.get_json()
nested_dict = {}
for path, value in flat_dict.items():
parts = path.split("/")
current_dict = nested_dict
for part in parts[:-1]:
if part not in current_dict:
current_dict[part] = {}
current_dict = current_dict[part]
current_dict[parts[-1]] = value
return nested_dict
def set_state_tree(self, state_tree):
"""Assigns values to variables of the program.
This method takes a dictionary of nested variable values, which
represents the state tree of the program, and assigns them to the
corresponding variables of the program. The dictionary keys represent the
variable names (e.g., `'trainable_variables'`, `'optimizer_trainable_variables'`),
and the values are nested dictionaries containing the variable
paths and their corresponding values.
Args:
state_tree (dict): A dictionary representing the state tree of the program.
The keys are the variable names, and the values are nested
dictionaries representing the variable paths and their values.
"""
for k, v in state_tree.items():
path_value_dict = self._flatten_nested_dict(v)
if k == "trainable_variables":
self._assign_variable_values(self.trainable_variables, path_value_dict)
elif k == "non_trainable_variables":
self._assign_variable_values(
self.non_trainable_variables, path_value_dict
)
elif k == "optimizer_trainable_variables":
if self.optimizer:
self._assign_variable_values(
self.optimizer.trainable_variables, path_value_dict
)
elif k == "optimizer_non_trainable_variables":
if self.optimizer:
self._assign_variable_values(
self.optimizer.non_trainable_variables, path_value_dict
)
elif k == "metrics_variables":
self._assign_variable_values(self.metrics_variables, path_value_dict)
else:
raise ValueError(f"Unknown variable name: {k}")
def _assign_variable_values(self, variables, path_value_dict):
for full_path, value in path_value_dict.items():
path = "/".join(full_path.split("/")[:-1])
field_name = full_path.split("/")[-1]
for variable in variables:
if variable.path == path:
variable.get_json()[field_name] = value
def _flatten_nested_dict(self, nested_dict):
flat_dict = {}
def _flatten(current_dict, prefix=""):
for key, value in current_dict.items():
if isinstance(value, dict):
_flatten(value, prefix + key + "/")
else:
flat_dict[prefix + key] = value
_flatten(nested_dict)
return flat_dict
def save_variables(self, filepath, overwrite=True):
"""Saves all module variables to a `.variables.json` file.
Args:
filepath (str | pathlib.Path): `str` or `pathlib.Path` object.
Path where to save the variables. Must end in `.variables.json`.
overwrite (bool): Whether to overwrite any existing file
at the target location, or instead ask the user
via an interactive prompt.
"""
filepath = file_utils.path_to_string(filepath)
if not filepath.endswith(".variables.json"):
raise ValueError(
"The filepath should end with '.variables.json', "
f"received filepath={filepath}"
)
config = self.get_state_tree()
config_string = json.dumps(config, indent=2)
if file_utils.exists(filepath) and not overwrite:
proceed = io_utils.ask_to_proceed_with_overwrite(filepath)
if not proceed:
return
with open(filepath, "w") as f:
f.write(config_string)
def load_variables(self, filepath):
"""Load all module variables from a `.variables.json` file.
Args:
filepath (str | pathlib.Path): `str` or `pathlib.Path` object.
Path to load the program's variables from.
Must end in `.variables.json`.
"""
filepath = file_utils.path_to_string(filepath)
if not filepath.endswith(".variables.json"):
raise ValueError(
"The filepath should end with '.variables.json', "
f"received filepath={filepath}"
)
with open(filepath, "r") as f:
state_tree_config = json.loads(f.read())
self.set_state_tree(state_tree_config)
@classmethod
def load(cls, filepath, custom_objects=None):
"""Load a program from a JSON file.
Example:
```python
import synalinks
loaded_program = synalinks.Program.load("program.json")
```
Args:
filepath (str | pathlib.Path): `str` or `pathlib.Path` object.
Path to the `.json` file to load the program from.
Must end in `.json`.
custom_objects (dict): Optional dictionary mapping names
(strings) to custom classes or functions to be
considered during deserialization.
Returns:
(Program): A Synalinks program instance (uncompiled).
"""
filepath = file_utils.path_to_string(filepath)
if not filepath.endswith(".json"):
raise ValueError(
f"The filepath should end with '.json', received filepath={filepath}"
)
with open(filepath, "r") as f:
json_config = f.read()
return program_from_json(json_config, custom_objects=custom_objects)
````
## `get_module(name=None, index=None)`
Retrieves a module based on either its name (unique) or index.
Exactly one of `name` or `index` must be provided; passing both raises a `ValueError`. Indices are based on order of horizontal graph traversal (bottom-up).
Parameters:
| Name | Type | Description | Default |
| ------- | ----- | ------------------------- | ------- |
| `name` | `str` | String, name of module. | `None` |
| `index` | `int` | Integer, index of module. | `None` |
Returns:
| Type | Description |
| -------- | ------------------ |
| `Module` | A module instance. |
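The selection rules are easiest to see in isolation. The following is a minimal, self-contained sketch, not Synalinks code: `FakeModule` and the free function `get_module` are hypothetical stand-ins that mirror the checks in the `Program` source listing (exactly one selector, an upper-bound check on `index`, and a `ValueError` for an unknown name).

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeModule:
    # Hypothetical stand-in for a Synalinks module: only the name matters here.
    name: str


def get_module(
    modules: List[FakeModule], name: Optional[str] = None, index: Optional[int] = None
) -> FakeModule:
    # Passing both selectors is rejected rather than resolved in favor of one.
    if index is not None and name is not None:
        raise ValueError("Provide only a module name or a module index.")
    if index is not None:
        # Only the upper bound is checked, as in the source listing.
        if len(modules) <= index:
            raise ValueError(f"Index {index} out of range for {len(modules)} modules.")
        return modules[index]
    if name is not None:
        for module in modules:
            if module.name == name:
                return module
        raise ValueError(f"No such module: {name}.")
    raise ValueError("Provide either a module name or module index.")


modules = [FakeModule("input"), FakeModule("generator")]
print(get_module(modules, index=1).name)  # generator
print(get_module(modules, name="input").name)  # input
```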
## `get_state_tree()`
Retrieves tree-like structure of program variables.
This method allows retrieval of different program variables (trainable, non-trainable, optimizer, and metrics). The variables are returned in a nested dictionary format, where the keys correspond to the variable names and the values are the nested representations of the variables.
Example:
```
program.compile(
    optimizer=synalinks.optimizers.RandomFewShot(),
    reward=synalinks.rewards.ExactMatch(),
)
await program.fit(x=x_train, y=y_train)
state_tree = program.get_state_tree()
```
Returns:
| Type | Description |
| ------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `dict` | A dictionary containing the nested representations of the requested variables. The keys are the variable names, and the values are the corresponding nested dictionaries. |
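To make the returned structure concrete, here is a sketch of how flat, `/`-separated variable paths are folded into nested dictionaries, following the `_create_nested_dict` helper in the `Program` source. `FakeVariable` and the example paths are hypothetical; real paths come from your program and module names.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeVariable:
    # Hypothetical stand-in: a "/"-separated path plus a JSON-serializable value.
    path: str
    value: Dict[str, Any] = field(default_factory=dict)

    def get_json(self) -> Dict[str, Any]:
        return self.value


def create_nested_dict(variables: List[FakeVariable]) -> Dict[str, Any]:
    flat: Dict[str, Any] = {}
    for v in variables:
        # Duplicate paths would silently overwrite each other, so they are rejected.
        if v.path in flat:
            raise ValueError(f"Variable path found twice: '{v.path}'")
        flat[v.path] = v.get_json()
    nested: Dict[str, Any] = {}
    for path, value in flat.items():
        parts = path.split("/")
        current = nested
        # Walk (and create) one dict level per path component except the last.
        for part in parts[:-1]:
            current = current.setdefault(part, {})
        current[parts[-1]] = value
    return nested


tree = create_nested_dict([
    FakeVariable("chain_of_thought/generator/state", {"instructions": "Be concise."}),
    FakeVariable("chain_of_thought/decision/state", {"instructions": "Pick one label."}),
])
print(tree)
```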
## `load(filepath, custom_objects=None)`
Load a program from a JSON file.
Example:
```
import synalinks
loaded_program = synalinks.Program.load("program.json")
```
Parameters:
| Name | Type | Description | Default |
| ---------------- | ------ | ------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------- |
| `filepath` | `str \| Path` | `str` or `pathlib.Path` object. Path to the `.json` file to load the program from. Must end in `.json`. | *required* |
| `custom_objects` | `dict` | Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization. | `None` |
Returns:
| Type | Description |
| --------- | ------------------------------------------ |
| `Program` | A Synalinks program instance (uncompiled). |
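As a rough illustration of the file handling only, the sketch below applies the same `.json` suffix check and accepts either a `str` or a `pathlib.Path`. `read_program_json` is a hypothetical helper: the listed method hands the file contents to `program_from_json` for deserialization, whereas this sketch simply parses them, and the `class_name` entry is made-up sample content.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Union


def read_program_json(filepath: Union[str, os.PathLike]) -> Dict[str, Any]:
    # Hypothetical helper: normalize str or PathLike to a string path first.
    filepath = os.fspath(filepath)
    if not filepath.endswith(".json"):
        raise ValueError(
            f"The filepath should end with '.json', received filepath={filepath}"
        )
    with open(filepath, "r") as f:
        # The listed method passes this text to program_from_json();
        # this sketch only parses it.
        return json.loads(f.read())


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "program.json"
    # Made-up sample content standing in for a saved program.
    path.write_text(json.dumps({"class_name": "Sequential", "variables": {}}))
    config = read_program_json(path)

print(config["class_name"])  # Sequential
```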
## `load_variables(filepath)`
Load all module variables from a `.variables.json` file.
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------- | ----------- | ---------- |
| `filepath` | `str \| Path` | `str` or `pathlib.Path` object. Path to load the program's variables from. Must end in `.variables.json`. | *required* |
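Loading variables reverses the nesting done by `get_state_tree()`. The minimal sketch below assumes a plain dict (`store`, hypothetical) that maps variable paths to their JSON fields, and mirrors `_flatten_nested_dict` and `_assign_variable_values` from the `Program` source: the state tree is flattened into `/`-joined keys, and each key is split into a variable path plus a trailing field name.

```python
from typing import Any, Dict


def flatten_nested_dict(nested: Dict[str, Any]) -> Dict[str, Any]:
    flat: Dict[str, Any] = {}

    def _flatten(current: Dict[str, Any], prefix: str = "") -> None:
        for key, value in current.items():
            if isinstance(value, dict):
                # Recurse into sub-dicts, extending the "/"-separated path.
                _flatten(value, prefix + key + "/")
            else:
                flat[prefix + key] = value

    _flatten(nested)
    return flat


def assign(store: Dict[str, Dict[str, Any]], flat: Dict[str, Any]) -> None:
    # Hypothetical store: variable path -> dict of that variable's JSON fields.
    for full_path, value in flat.items():
        path, _, field_name = full_path.rpartition("/")
        if path in store:
            store[path][field_name] = value


state = {"chain_of_thought": {"generator": {"instructions": "Be concise.", "examples": []}}}
flat = flatten_nested_dict(state)
store = {"chain_of_thought/generator": {"instructions": "", "examples": None}}
assign(store, flat)
print(flat)
print(store)
```

As in the listed `_flatten_nested_dict`, every nested dict is recursed into, so this sketch only restores fields whose values are not themselves dictionaries.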
## `save(filepath, overwrite=True, **kwargs)`
Saves a program as a `.json` file.
Example:
```
import synalinks

class Query(synalinks.DataModel):
    query: str

class AnswerWithRationale(synalinks.DataModel):
    rationale: str
    answer: str

language_model = synalinks.LanguageModel(model="ollama/mistral")

program = synalinks.Sequential(
    [
        synalinks.Input(data_model=Query),
        synalinks.Generator(
            data_model=AnswerWithRationale,
            language_model=language_model,
        ),
    ],
)

program.save("program.json")
loaded_program = synalinks.Program.load("program.json")
```
The saved `.json` file contains:
- The program's configuration (architecture)
- The program's variables
- The program's optimizer's state (if any)
- The program's reward's state (if any)
Thus programs can be reinstantiated in the exact same state.
Parameters:
| Name | Type | Description | Default |
| ----------- | ------ | -------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------- |
| `filepath` | `str \| PathLike` | `str` or `os.PathLike` object. The path where to save the program. Must end in `.json`. | *required* |
| `overwrite` | `bool` | Whether to overwrite any existing file at the target location, or instead ask the user via an interactive prompt. Defaults to `True`. | `True` |
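The saved file is the serialized configuration with the state tree added under a `"variables"` key. The sketch below reproduces that layout with plain dictionaries; `save_program_json` is hypothetical, the config contents are made up, and where the listed method calls `io_utils.ask_to_proceed_with_overwrite` for an existing file with `overwrite=False`, this sketch raises `FileExistsError` to stay non-interactive.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_program_json(
    config: Dict[str, Any], state_tree: Dict[str, Any], filepath: str, overwrite: bool = True
) -> None:
    if not filepath.endswith(".json"):
        raise ValueError(f"The filepath should end with '.json', received filepath={filepath}")
    # Architecture and variables travel together in one document.
    payload = dict(config)
    payload["variables"] = state_tree
    if os.path.exists(filepath) and not overwrite:
        # Simplification: the listed method prompts the user instead of raising.
        raise FileExistsError(filepath)
    with open(filepath, "w") as f:
        f.write(json.dumps(payload, indent=2))


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "program.json")
    config = {"class_name": "Sequential", "config": {"name": "chain_of_thought"}}
    state = {"trainable_variables": {}, "non_trainable_variables": {}, "metrics_variables": {}}
    save_program_json(config, state, path)
    with open(path) as f:
        saved = json.load(f)
    try:
        save_program_json(config, state, path, overwrite=False)
        refused = False
    except FileExistsError:
        refused = True

print(sorted(saved))  # ['class_name', 'config', 'variables']
```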
## `save_variables(filepath, overwrite=True)`
Saves all module variables to a `.variables.json` file.
Parameters:
| Name | Type | Description | Default |
| ----------- | ------ | --------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------- |
| `filepath` | `str \| Path` | `str` or `pathlib.Path` object. Path where to save the variables. Must end in `.variables.json`. | *required* |
| `overwrite` | `bool` | Whether we should overwrite any existing program at the target location, or instead ask the user via an interactive prompt. | `True` |
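`save_variables()` writes only the state tree, not the architecture, so the file is meant to be loaded back into a program whose variable paths already match. The round trip below is a hedged sketch with hypothetical helper names (`check_variables_path`, `save_state_tree`, `load_state_tree`); it keeps the stricter `.variables.json` suffix rule from the listed methods.

```python
import json
import os
import tempfile
from typing import Any, Dict


def check_variables_path(filepath: str) -> str:
    # Same suffix rule as save_variables()/load_variables() in the listing.
    if not filepath.endswith(".variables.json"):
        raise ValueError(
            f"The filepath should end with '.variables.json', received filepath={filepath}"
        )
    return filepath


def save_state_tree(state_tree: Dict[str, Any], filepath: str) -> None:
    with open(check_variables_path(filepath), "w") as f:
        json.dump(state_tree, f, indent=2)


def load_state_tree(filepath: str) -> Dict[str, Any]:
    with open(check_variables_path(filepath)) as f:
        return json.load(f)


state = {"trainable_variables": {"chain_of_thought": {"generator": {"instructions": "Be concise."}}}}
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "checkpoint.variables.json")
    save_state_tree(state, path)
    restored = load_state_tree(path)

print(restored == state)  # True
```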
## `set_state_tree(state_tree)`
Assigns values to variables of the program.
This method takes a dictionary of nested variable values, which represents the state tree of the program, and assigns them to the corresponding variables of the program. The dictionary keys represent the variable names (e.g., `'trainable_variables'`, `'optimizer_trainable_variables'`), and the values are nested dictionaries containing the variable paths and their corresponding values.
Parameters:
| Name | Type | Description | Default |
| ------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------- |
| `state_tree` | `dict` | A dictionary representing the state tree of the program. The keys are the variable names, and the values are nested dictionaries representing the variable paths and their values. | *required* |
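The top-level keys accepted by `set_state_tree()` are the same five that `get_state_tree()` produces. The sketch below, a hypothetical helper rather than Synalinks API, shows which keys would be applied: unknown keys raise `ValueError`, and optimizer entries are skipped when the program has no optimizer.

```python
from typing import Any, Dict, List

# Top-level keys that the listed set_state_tree() dispatches on.
KNOWN_KEYS = {
    "trainable_variables",
    "non_trainable_variables",
    "optimizer_trainable_variables",
    "optimizer_non_trainable_variables",
    "metrics_variables",
}
OPTIMIZER_KEYS = {"optimizer_trainable_variables", "optimizer_non_trainable_variables"}


def keys_to_apply(state_tree: Dict[str, Any], has_optimizer: bool) -> List[str]:
    applied = []
    for key in state_tree:
        if key not in KNOWN_KEYS:
            raise ValueError(f"Unknown variable name: {key}")
        # Optimizer entries are skipped silently when the program has no optimizer.
        if key in OPTIMIZER_KEYS and not has_optimizer:
            continue
        applied.append(key)
    return applied


tree = {"trainable_variables": {}, "optimizer_trainable_variables": {}}
print(keys_to_apply(tree, has_optimizer=False))  # ['trainable_variables']
```

Note that the listed method assigns entries in iteration order, so any keys that precede an unknown key have already been applied when the error is raised.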
## `summary(line_length=None, positions=None, print_fn=None, expand_nested=False, show_trainable=False, module_range=None)`
Prints a string summary of the program.
Parameters:
| Name | Type | Description | Default |
| ---------------- | ---------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `line_length` | `int` | Total length of printed lines (e.g. set this to adapt the display to different terminal window sizes). | `None` |
| `positions` | `list` | Relative or absolute positions of log elements in each line. If not provided, becomes [0.3, 0.6, 0.70, 1.]. Defaults to None. | `None` |
| `print_fn` | `Callable` | Print function to use. By default, prints to stdout. If stdout doesn't work in your environment, change to print. It will be called on each line of the summary. You can set it to a custom function in order to capture the string summary. | `None` |
| `expand_nested` | `bool` | Whether to expand the nested programs. Defaults to False. | `False` |
| `show_trainable` | `bool` | Whether to show if a module is trainable. Defaults to False. | `False` |
| `module_range` | `list \| tuple` | A list or tuple of 2 strings, which is the starting module name and ending module name (both inclusive) indicating the range of modules to be printed in summary. It also accepts regex patterns instead of exact names. In this case, the start predicate will be the first element that matches module_range[0] and the end predicate will be the last element that matches module_range[1]. By default None considers all modules of the program. | `None` |
Raises:
| Type | Description |
| ------------ | ------------------------------------------------- |
| `ValueError` | if summary() is called before the program is built. |
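The `module_range` selection can be sketched over a list of module names. This is an assumption-laden illustration rather than the `summary_utils` code: it assumes the patterns are applied with `re.match` (anchored at the start of the name) and takes the first match of `module_range[0]` and the last match of `module_range[1]`, both inclusive, as described in the parameter table.

```python
import re
from typing import List, Optional, Sequence


def select_module_range(
    names: Sequence[str], module_range: Optional[Sequence[str]] = None
) -> List[str]:
    if module_range is None:
        # No range: every module is printed.
        return list(names)
    start_pattern, end_pattern = module_range
    # Assumption: patterns are applied with re.match (anchored at the start of the name).
    starts = [i for i, n in enumerate(names) if re.match(start_pattern, n)]
    ends = [i for i, n in enumerate(names) if re.match(end_pattern, n)]
    if not starts or not ends:
        raise ValueError(f"No module matches module_range={module_range}")
    # First match opens the range, last match closes it; both ends are inclusive.
    return list(names[min(starts): max(ends) + 1])


names = ["input", "generator_1", "generator_2", "decision"]
print(select_module_range(names, ("generator", "generator")))  # ['generator_1', 'generator_2']
```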
## `to_json(**kwargs)`
Returns a JSON string containing the program configuration.
```
json_string = program.to_json()
```
To rebuild a program from this JSON string, use `synalinks.programs.program_from_json(json_string, custom_objects={...})`.
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------------- | ---------------------------------------------------------- | ------- |
| `**kwargs` | `keyword arguments` | Additional keyword arguments to be passed to json.dumps(). | `{}` |
Returns:
| Type | Description |
| ----- | -------------- |
| `str` | A JSON string. |
Source code in `synalinks/src/programs/program.py`
````
def to_json(self, **kwargs):
    """Returns a JSON string containing the network configuration.
    ```python
    json_string = program.to_json()
    ```
    To load a network from a JSON save file, use
    `synalinks.programs.program_from_json(json_string, custom_objects={...})`.
    Args:
        **kwargs (keyword arguments): Additional keyword arguments to be passed to
            `json.dumps()`.
    Returns:
        (str): A JSON string.
    """
    from synalinks.src.saving import serialization_lib
    program_config = serialization_lib.serialize_synalinks_object(self)
    return json.dumps(program_config, **kwargs)
````
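Since `to_json()` hands every keyword argument unchanged to `json.dumps()`, standard formatting options such as `indent` or `sort_keys` apply directly. The stdlib sketch below mirrors that forwarding; `program_config` is a hypothetical stand-in for what `serialize_synalinks_object()` returns, and its keys are illustrative only.

```python
import json

# Hypothetical stand-in for serialization_lib.serialize_synalinks_object(program);
# the real configuration contains more fields.
program_config = {
    "class_name": "Sequential",
    "config": {
        "name": "chain_of_thought",
        "description": "Useful to answer in a step by step manner.",
    },
}


def to_json(config, **kwargs):
    """Mirror Program.to_json(): keyword arguments go straight to json.dumps()."""
    return json.dumps(config, **kwargs)


compact = to_json(program_config)
pretty = to_json(program_config, indent=2, sort_keys=True)

# Formatting options change the text, not the data it encodes.
assert json.loads(compact) == json.loads(pretty)
print(pretty)
```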
# The Sequential class
Bases: `Program`
`Sequential` groups a linear stack of modules into a `Program`.
Examples:
```
import synalinks

# Illustrative data models and language model for this example.
class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class AnswerWithRationale(synalinks.DataModel):
    rationale: str = synalinks.Field(
        description="The step by step rationale",
    )
    answer: str = synalinks.Field(
        description="The correct answer",
    )

language_model = synalinks.LanguageModel(model="ollama/mistral")

program = synalinks.Sequential(
    name="chain_of_thought",
    description="Useful to answer in a step by step manner.",
)
program.add(
    synalinks.Input(
        data_model=Query,
    )
)
program.add(
    synalinks.Generator(
        data_model=AnswerWithRationale,
        language_model=language_model,
    )
)

# Note that you can also omit the initial `Input`.
# In that case the program doesn't have any variables until the first call
# to a training/evaluation method (since it isn't yet built):
program = synalinks.Sequential(
    name="chain_of_thought",
    description="Useful to answer in a step by step manner.",
)
program.add(
    synalinks.Generator(
        data_model=AnswerWithRationale,
        language_model=language_model,
    )
)
# program.variables not created yet

# Whereas if you specify an `Input`, the program gets built
# continuously as you are adding modules:
program = synalinks.Sequential(
    name="chain_of_thought",
    description="Useful to answer in a step by step manner.",
)
program.add(
    synalinks.Input(
        data_model=Query,
    )
)
program.add(
    synalinks.Generator(
        data_model=AnswerWithRationale,
        language_model=language_model,
    )
)

# Note that when using the delayed-build pattern (no input specified),
# the program gets built the first time you call `fit`, `evaluate`, or `predict`,
# or the first time you call the program on some input data.
```
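The difference between eager and delayed building comes down to when `_maybe_rebuild()` is able to call `build()`. The toy below sketches only that timing: `ToyInput` and `ToySequential` are hypothetical classes, a plain function stands in for a `Generator`, no language model is involved, and the real Functional graph is not reproduced.

```python
class ToyInput:
    """Hypothetical stand-in for an input module: it only records a schema."""

    def __init__(self, schema):
        self.schema = schema


class ToySequential:
    """Hypothetical toy mimicking *when* a Sequential program builds itself."""

    def __init__(self):
        self.modules = []
        self.built = False

    def add(self, module):
        self.modules.append(module)
        self.built = False
        # Like _maybe_rebuild(): build eagerly only when the stack starts with
        # an input and contains at least one other module.
        if isinstance(self.modules[0], ToyInput) and len(self.modules) > 1:
            self.built = True

    def __call__(self, data):
        if not self.built:
            # Delayed build: infer the input from the first data seen and
            # prepend an input module, as build() does with InputModule.
            self.modules.insert(0, ToyInput(schema=sorted(data)))
            self.built = True
        for module in self.modules[1:]:
            data = module(data)
        return data


def shout(data):
    """Stand-in 'module': upper-cases every string field."""
    return {key: value.upper() for key, value in data.items()}


eager = ToySequential()
eager.add(ToyInput(schema=["query"]))
eager.add(shout)
print(eager.built)  # True: built as soon as a module follows the input

lazy = ToySequential()
lazy.add(shout)
print(lazy.built)  # False: nothing to infer the input from yet
print(lazy({"query": "hi"}))  # {'query': 'HI'}
print(lazy.built, lazy.modules[0].schema)  # True ['query']
```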
Source code in `synalinks/src/programs/sequential.py`
````
@synalinks_export(["synalinks.Sequential", "synalinks.programs.Sequential"])
class Sequential(Program):
"""`Sequential` groups a linear stack of modules into a `Program`.
Examples:
```python
program = synalinks.Sequential(
name="chain_of_thought",
description="Useful to answer in a step by step manner."
)
program.add(
synalinks.Input(
data_model=Query,
)
)
program.add(
synalinks.Generator(
data_model=AnswerWithRationale,
language_model=language_model,
)
)
# Note that you can also omit the initial `Input`.
# In that case the program doesn't have any variables until the first call
# to a training/evaluation method (since it isn't yet built):
program = synalinks.Sequential(
name="chain_of_thought",
description="Useful to answer in a step by step manner."
)
program.add(
synalinks.Generator(
data_model=AnswerWithRationale,
language_model=language_model,
)
)
# program.variables not created yet
# Whereas if you specify an `Input`, the program gets built
# continuously as you are adding modules:
program = synalinks.Sequential(
name="chain_of_thought",
description="Useful to answer in a step by step manner."
)
program.add(
synalinks.Input(
data_model=Query,
)
)
program.add(
synalinks.Generator(
data_model=AnswerWithRationale,
language_model=language_model,
)
)
# Note that when using the delayed-build pattern (no input specified),
# the program gets built the first time you call `fit`, `evaluate`, or `predict`,
# or the first time you call the program on some input data.
```
"""
def __new__(cls, *args, **kwargs):
return typing.cast(cls, super().__new__(cls))
def __init__(self, modules=None, trainable=True, name=None, description=None):
if description is None:
raise ValueError(
"All Sequential programs must have a `description`, "
"please add it to the constructor arguments"
)
super().__init__(trainable=trainable, name=name, description=description)
self._functional = None
self._modules = []
if modules:
for module in modules:
self.add(module, rebuild=False)
run_maybe_nested(self._maybe_rebuild())
def add(self, module, rebuild=True):
"""Adds a module instance on top of the module stack.
Args:
module (Module): Module instance.
rebuild (bool): If `True` rebuild the program.
"""
# If we are passed a SymbolicDataModel created by synalinks.Input(), we
# extract the input module from its synalinks history and use that.
if hasattr(module, "_synalinks_history"):
origin_module = module._synalinks_history[0]
if isinstance(origin_module, InputModule):
module = origin_module
if not isinstance(module, Module):
raise ValueError(
"Only instances of `synalinks.Module` can be "
f"added to a Sequential program. Received: {module} "
f"(of type {type(module)})"
)
if not self._is_module_name_unique(module):
raise ValueError(
"All modules added to a Sequential program "
f"should have unique names. Name '{module.name}' is already "
"the name of a module in this program. Update the `name` argument "
"to pass a unique name."
)
if (
isinstance(module, InputModule)
and self._modules
and isinstance(self._modules[0], InputModule)
):
raise ValueError(
f"Sequential program '{self.name}' has already been configured "
f"to use input schema {self._modules[0].input_schema}. You cannot "
f"add a different Input module to it."
)
self._modules.append(module)
if rebuild:
run_maybe_nested(self._maybe_rebuild())
else:
self.built = False
self._functional = None
def pop(self, rebuild=True):
"""Removes the last module in the program.
Args:
rebuild (bool): If `True` rebuild the program.
"""
module = self._modules.pop()
self.built = False
self._functional = None
if rebuild:
run_maybe_nested(self._maybe_rebuild())
return module
async def _maybe_rebuild(self):
self.built = False
self._functional = None
if isinstance(self._modules[0], InputModule) and len(self._modules) > 1:
input_schema = self._modules[0].get_schema()
await self.build(Input(schema=input_schema))
elif hasattr(self._modules[0], "input_schema") and len(self._modules) > 1:
# We can build the Sequential program if the first module has the
# `input_schema` property. This is most commonly found in Functional
# program.
input_schema = self._modules[0].input_schema
await self.build(Input(schema=input_schema))
def _lock_state(self):
# Unlike other modules, Sequential is mutable after build.
pass
def _obj_type(self):
return "Sequential"
async def build(self, inputs):
try:
input_schema = standardize_schema(inputs.get_schema())
except Exception:
# Do not attempt to build if the program does not have a single
# input.
return
if not self._modules:
raise ValueError(
f"Sequential program {self.name} cannot be built because it has "
"no modules. Call `program.add(module)`."
)
if isinstance(self._modules[0], InputModule):
if self._modules[0].get_schema() != input_schema:
raise ValueError(
f"Sequential program '{self.name}' has already been "
"configured to use input schema "
f"{self._modules[0].get_schema()}. You cannot build it "
f"with input_schema {input_schema}"
)
else:
self._modules = [InputModule(schema=input_schema)] + self._modules
# Build functional program
inputs = self._modules[0].output
x = inputs
for module in self._modules[1:]:
try:
x = await module(x)
except NotImplementedError:
# Can happen if spec inference is not implemented.
# TODO: consider reverting inbound nodes on modules processed.
return
except TypeError as e:
signature = inspect.signature(module.call)
positional_args = [
param
for param in signature.parameters.values()
if param.default == inspect.Parameter.empty
]
if len(positional_args) != 1:
raise ValueError(
"Modules added to a Sequential program "
"can only have a single positional argument, "
f"the input data model. Module {module.__class__.__name__} "
f"has multiple positional arguments: {positional_args}"
)
raise e
outputs = x
self._functional = Functional(inputs=inputs, outputs=outputs)
self.built = True
async def call(self, inputs, training=None):
if self._functional:
return await self._functional.call(inputs, training=training)
# Fallback: Just apply the module sequence.
# This typically happens if `inputs` is a nested struct.
for module in self.modules:
# During each iteration, `inputs` are the inputs to `module`, and
# `outputs` are the outputs of `module` applied to `inputs`. At the
# end of each iteration `inputs` is set to `outputs` to prepare for
# the next module.
kwargs = {}
if module._call_has_training_arg and training is not None:
kwargs["training"] = training
outputs = await module(inputs, **kwargs)
inputs = outputs
return outputs
@property
def modules(self):
"""Returns the modules of the program, including the auto-generated `InputModule` if any (unlike Keras)."""
return self._modules
@modules.setter
def modules(self, _):
raise AttributeError(
"`Sequential.modules` attribute is reserved and should not be used. "
"Use `add()` and `pop()` to change the modules in this program."
)
async def compute_output_spec(self, inputs, training=None):
if self._functional:
return await self._functional.compute_output_spec(
inputs,
training=training,
)
# Direct application
for module in self.modules:
outputs = await module.compute_output_spec(inputs, training=training)
inputs = outputs
return outputs
@property
def input_schema(self):
if self._functional:
return self._functional.input_schema
raise AttributeError(
f"Sequential program '{self.name}' has no defined input schema yet."
)
@property
def output_schema(self):
if self._functional:
return self._functional.output_schema
raise AttributeError(
f"Sequential program '{self.name}' has no defined output schema yet."
)
@property
def inputs(self):
if self._functional:
return self._functional.inputs
raise AttributeError(
f"Sequential program '{self.name}' has no defined inputs yet."
)
@property
def outputs(self):
if self._functional:
return self._functional.outputs
raise AttributeError(
f"Sequential program '{self.name}' has no defined outputs yet."
)
def _is_module_name_unique(self, module):
for ref_module in self._modules:
if module.name == ref_module.name and ref_module is not module:
return False
return True
def get_config(self):
serialize_fn = serialization_lib.serialize_synalinks_object
module_configs = []
for module in self.modules:
module_configs.append(serialize_fn(module))
config = Program.get_config(self)
config["name"] = self.name
config["description"] = self.description
config["modules"] = copy.deepcopy(module_configs)
if self._functional is not None:
config["build_input_schema"] = self._modules[0].input_schema
return config
@classmethod
def from_config(cls, config, custom_objects=None):
if "name" in config:
name = config["name"]
build_input_schema = config.get("build_input_schema")
module_configs = config["modules"]
else:
name = None
module_configs = config
if "description" in config:
description = config["description"]
else:
description = None
program = cls(name=name, description=description)
for module_config in module_configs:
module = serialization_lib.deserialize_synalinks_object(
module_config,
custom_objects=custom_objects,
)
program.add(module)
if (
not program._functional
and "build_input_schema" in locals()
and build_input_schema
and isinstance(build_input_schema, (tuple, list))
):
program.build(build_input_schema)
return program
````
## `modules`
Returns the modules of the program. Unlike in Keras, the list also includes the `InputModule` that may have been generated automatically.
## `add(module, rebuild=True)`
Adds a module instance on top of the module stack.
Parameters:
| Name      | Type     | Description                     | Default    |
| --------- | -------- | ------------------------------- | ---------- |
| `module`  | `Module` | Module instance.                | *required* |
| `rebuild` | `bool`   | If `True`, rebuild the program. | `True`     |
Source code in `synalinks/src/programs/sequential.py`
```
def add(self, module, rebuild=True):
    """Adds a module instance on top of the module stack.
    Args:
        module (Module): Module instance.
        rebuild (bool): If `True` rebuild the program.
    """
    # If we are passed a SymbolicDataModel created by synalinks.Input(), we
    # extract the input module from its synalinks history and use that.
    if hasattr(module, "_synalinks_history"):
        origin_module = module._synalinks_history[0]
        if isinstance(origin_module, InputModule):
            module = origin_module
    if not isinstance(module, Module):
        raise ValueError(
            "Only instances of `synalinks.Module` can be "
            f"added to a Sequential program. Received: {module} "
            f"(of type {type(module)})"
        )
    if not self._is_module_name_unique(module):
        raise ValueError(
            "All modules added to a Sequential program "
            f"should have unique names. Name '{module.name}' is already "
            "the name of a module in this program. Update the `name` argument "
            "to pass a unique name."
        )
    if (
        isinstance(module, InputModule)
        and self._modules
        and isinstance(self._modules[0], InputModule)
    ):
        raise ValueError(
            f"Sequential program '{self.name}' has already been configured "
            f"to use input schema {self._modules[0].input_schema}. You cannot "
            f"add a different Input module to it."
        )
    self._modules.append(module)
    if rebuild:
        run_maybe_nested(self._maybe_rebuild())
    else:
        self.built = False
        self._functional = None
```
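`add()` enforces three rules before appending: the object must be a module, its name must not clash with a *different* module already in the stack, and an input module can only be added if the stack does not already start with one. The stdlib sketch below restates those checks on a hypothetical `ToyModule` class with a hypothetical `add_module` helper; it does not call Synalinks. As in the listed `_is_module_name_unique()`, re-adding the very same instance is not caught by the name check.

```python
class ToyModule:
    """Hypothetical stand-in for a Synalinks module: a name and an input flag."""

    def __init__(self, name, is_input=False):
        self.name = name
        self.is_input = is_input


def add_module(stack, module):
    """Append `module` to `stack` using the same checks as Sequential.add()."""
    if not isinstance(module, ToyModule):
        raise ValueError(f"Only modules can be added. Received: {module!r}")
    # Only a *different* module with the same name counts as a clash.
    if any(m.name == module.name and m is not module for m in stack):
        raise ValueError(f"Name '{module.name}' is already used in this program.")
    # A second input module is rejected when the stack already starts with one.
    if module.is_input and stack and stack[0].is_input:
        raise ValueError("This program is already configured with an Input module.")
    stack.append(module)
    return stack


stack = []
add_module(stack, ToyModule("input", is_input=True))
add_module(stack, ToyModule("generator"))
print([m.name for m in stack])  # ['input', 'generator']
```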
## `pop(rebuild=True)`
Removes the last module in the program.
Parameters:
| Name      | Type   | Description                     | Default |
| --------- | ------ | ------------------------------- | ------- |
| `rebuild` | `bool` | If `True`, rebuild the program. | `True`  |
Source code in `synalinks/src/programs/sequential.py`
```
def pop(self, rebuild=True):
    """Removes the last module in the program.
    Args:
        rebuild (bool): If `True` rebuild the program.
    """
    module = self._modules.pop()
    self.built = False
    self._functional = None
    if rebuild:
        run_maybe_nested(self._maybe_rebuild())
    return module
```
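`pop()` returns the removed module and marks the program as unbuilt; with `rebuild=True` it then re-runs `_maybe_rebuild()`, which only builds when an input module and at least one other module remain. In the listed source, `_maybe_rebuild()` reads `self._modules[0]` without checking for an empty list, so popping the last remaining module with `rebuild=True` appears to raise `IndexError` after the module has been removed; `rebuild=False` avoids that path. The toy below, with a hypothetical `ToyStack` class and strings in place of modules, reproduces only this bookkeeping.

```python
class ToyStack:
    """Hypothetical toy reproducing the bookkeeping of Sequential.pop()."""

    def __init__(self, modules):
        self.modules = list(modules)  # plain strings stand in for modules
        self.built = False

    def _maybe_rebuild(self):
        self.built = False
        # Same condition as the listed _maybe_rebuild(): an input module plus at
        # least one other module. Like the source, it reads modules[0] directly.
        if self.modules[0] == "input" and len(self.modules) > 1:
            self.built = True

    def pop(self, rebuild=True):
        module = self.modules.pop()  # IndexError if there is nothing to pop
        self.built = False
        if rebuild:
            self._maybe_rebuild()
        return module


program = ToyStack(["input", "generator", "decision"])
print(program.pop(), program.built)  # decision True
print(program.pop(), program.built)  # generator False
print(program.pop(rebuild=False), program.modules)  # input []
```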
## `FunctionCallingAgent`
Bases: `Module`
A trainable parallel function calling agent.
The agent has two modes:
- Autonomous: it executes tools as soon as they are called.
- Non-autonomous: it returns the tool calls, with their arguments, as a `ChatMessage`.
In *autonomous* mode, the agent accepts **any kind of data model input** and, if a `data_model` or `schema` is provided, performs a final inference to format its final answer.
Example:
```
import synalinks
import asyncio


class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )


class NumericalFinalAnswer(synalinks.DataModel):
    final_answer: float = synalinks.Field(
        description="The correct final numerical answer",
    )


async def calculate(expression: str):
    """Calculate the result of a mathematical expression.
    Args:
        expression (str): The mathematical expression to calculate, such as
            '2 + 2'. The expression can contain numbers, operators (+, -, *, /),
            parentheses, and spaces.
    """
    if not all(char in "0123456789+-*/(). " for char in expression):
        return {
            "result": None,
            "log": (
                "Error: invalid characters in expression. "
                "The expression can only contain numbers, operators (+, -, *, /),"
                " parentheses, and spaces NOT letters."
            ),
        }
    try:
        # Evaluate with builtins disabled; the character check above is what
        # keeps names and function calls out of the expression.
        result = round(float(eval(expression, {"__builtins__": None}, {})), 2)
        return {
            "result": result,
            "log": "Successfully executed",
        }
    except Exception as e:
        return {
            "result": None,
            "log": f"Error: {e}",
        }


async def main():
    language_model = synalinks.LanguageModel(model="ollama/mistral")
    tools = [
        synalinks.Tool(calculate),
    ]
    inputs = synalinks.Input(data_model=Query)
    outputs = await synalinks.FunctionCallingAgent(
        data_model=NumericalFinalAnswer,
        tools=tools,
        language_model=language_model,
        max_iterations=5,
        autonomous=True,
    )(inputs)
    agent = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="math_agent",
        description="A math agent",
    )
    input_query = Query(query="How much is 152648 + 485?")
    response = await agent(input_query)
    print(response.prettify_json())


if __name__ == "__main__":
    asyncio.run(main())
```
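Example output (tool-call `id` values are random UUIDs and message contents depend on the language model, so your run will differ):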
```
{
  "query": "How much is 152648 + 485?",
  "messages": [
    {
      "role": "assistant",
      "content": "Performing simple addition",
      "tool_calls": [
        {
          "id": "92a3657c-1a45-46e6-8173-df4255b8423b",
          "name": "calculate",
          "arguments": {
            "expression": "152648 + 485"
          }
        }
      ]
    },
    {
      "role": "tool",
      "content": {
        "result": 153133.0,
        "log": "Successfully executed"
      },
      "tool_call_id": "92a3657c-1a45-46e6-8173-df4255b8423b"
    },
    {
      "role": "assistant",
      "content": "The user has asked for a simple addition calculation. The assistant used the 'calculate' tool to perform this task, and the result was returned successfully."
    }
  ],
  "final_answer": 153133.0
}
```
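The autonomous loop in the `call()` source listed further down is easier to follow without the language model in the way. The dependency-free sketch below reproduces its core under stated assumptions: a hypothetical `scripted_planner` replaces the LM-backed tool-call generator, messages are plain dicts shaped like the output above, and the "something went wrong" fallback and the final formatting step are omitted. It shows the two properties that matter: all tool calls of one step run concurrently through `asyncio.gather(..., return_exceptions=True)`, and a failing tool becomes an `error: ...` tool message instead of aborting the run.

```python
import asyncio
import uuid


async def add(a: float, b: float):
    return {"result": a + b}


async def divide(a: float, b: float):
    return {"result": a / b}


def scripted_planner(steps):
    """Hypothetical stand-in for the LM: returns pre-written plans in order."""
    remaining = iter(steps)

    async def planner(messages):
        return next(remaining)

    return planner


async def run_autonomous(planner, tools, messages, max_iterations=5):
    for _ in range(max_iterations):
        plan = await planner(messages)
        assistant = {
            "role": "assistant",
            "content": plan.get("thinking", ""),
            "tool_calls": [],
        }
        if not plan.get("tool_calls"):
            # No tool requested: the agent considers the task done.
            messages.append(assistant)
            break
        tasks, call_ids = [], []
        for call in plan["tool_calls"]:
            name = call["tool_name"]
            arguments = {k: v for k, v in call.items() if k != "tool_name"}
            call_id = str(uuid.uuid4())
            call_ids.append(call_id)
            assistant["tool_calls"].append(
                {"id": call_id, "name": name, "arguments": arguments}
            )
            tasks.append(tools[name](**arguments))
        messages.append(assistant)
        # Run every tool call of this step concurrently; exceptions become results.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for call_id, result in zip(call_ids, results):
            content = f"error: {result}" if isinstance(result, Exception) else result
            messages.append({"role": "tool", "tool_call_id": call_id, "content": content})
    return messages


planner = scripted_planner([
    {"thinking": "Two independent calls", "tool_calls": [
        {"tool_name": "add", "a": 152648, "b": 485},
        {"tool_name": "divide", "a": 1, "b": 0},
    ]},
    {"thinking": "Done"},
])
messages = asyncio.run(run_autonomous(
    planner,
    {"add": add, "divide": divide},
    [{"role": "user", "content": "How much is 152648 + 485?"}],
))
for message in messages:
    print(message["role"], message["content"])
```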
In *non-autonomous* mode (also called human-in-the-loop or interactive mode), the user needs to validate or edit the tool arguments and send them back to the agent. In this mode, the agent requires a `ChatMessages` data model as input and outputs a `ChatMessage` (or `ChatMessages` if `return_inputs_with_trajectory` is `True`, which is the default) back to the user. When the last input message is an assistant message carrying tool calls, the agent first executes those calls and appends their results, then proposes the next step. The agent therefore ignores the `max_iterations` argument, as it only performs one **step at a time**.
Example:
```
import synalinks
import asyncio
MAX_ITERATIONS = 5
async def calculate(expression: str):
"""Calculate the result of a mathematical expression.
Args:
expression (str): The mathematical expression to calculate, such as
'2 + 2'. The expression can contain numbers, operators (+, -, *, /),
parentheses, and spaces.
"""
if not all(char in "0123456789+-*/(). " for char in expression):
return {
"result": None,
"log": (
"Error: invalid characters in expression. "
"The expression can only contain numbers, operators (+, -, *, /),"
" parentheses, and spaces NOT letters."
),
}
try:
# Evaluate the mathematical expression safely
result = round(float(eval(expression, {"__builtins__": None}, {})), 2)
return {
"result": result,
"log": "Successfully executed",
}
except Exception as e:
return {
"result": None,
"log": f"Error: {e}",
}
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
tools = [
synalinks.Tool(calculate),
]
inputs = synalinks.Input(data_model=synalinks.ChatMessages)
outputs = await synalinks.FunctionCallingAgent(
tools=tools,
language_model=language_model,
return_inputs_with_trajectory=True,
autonomous=False,
)(inputs)
agent = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="math_agent",
description="A math agent",
)
input_messages = synalinks.ChatMessages(
messages=[
synalinks.ChatMessage(
role="user",
content="How much is 152648 + 485?",
)
]
)
for i in range(MAX_ITERATIONS):
response = await agent(input_messages)
print("Assistant response (with trajectory):")
print(response.prettify_json())
assistant_message = response.get("messages")[-1]
if not assistant_message.get("tool_calls"):
break # We stop the loop if the agent didn't call any tool
# Validate the tool calls arguments (with an UI or CLI)
# Then re-inject the validated assistant response in the input_messages
# The corresponding tools will be called by the agent
# Here we assume everything is okay for the purpose of the demo.
input_messages.messages.append(assistant_message)
if __name__ == "__main__":
asyncio.run(main())
```
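The example leaves the validation step to a UI or CLI. One simple automated pre-check, before a human reviews the arguments, is to confirm that each proposed call targets a known tool and that its arguments bind to that tool's Python signature. The sketch below assumes you keep a dict from tool name to the underlying Python function; `find_tool_call_problems` is a hypothetical helper built on `inspect.signature(...).bind`, not a Synalinks API, and the message layout follows the example output shown earlier.

```python
import inspect


async def calculate(expression: str):
    """Placeholder with the same signature as the example's `calculate` tool."""
    return {"result": None, "log": "not executed in this sketch"}


def find_tool_call_problems(assistant_message, tools):
    """Return human-readable problems with the tool calls of an assistant message.

    An empty list means every call targets a known tool and its arguments
    bind to that tool's signature.
    """
    problems = []
    for call in assistant_message.get("tool_calls") or []:
        name = call.get("name")
        tool = tools.get(name)
        if tool is None:
            problems.append(f"unknown tool: {name!r}")
            continue
        try:
            inspect.signature(tool).bind(**(call.get("arguments") or {}))
        except TypeError as error:
            problems.append(f"{name}: {error}")
    return problems


tools = {"calculate": calculate}
message = {
    "role": "assistant",
    "content": "Performing simple addition",
    "tool_calls": [
        {"id": "1", "name": "calculate", "arguments": {"expression": "152648 + 485"}},
        {"id": "2", "name": "calculate", "arguments": {"expr": "1 + 1"}},
        {"id": "3", "name": "search", "arguments": {"query": "1 + 1"}},
    ],
}
for problem in find_tool_call_problems(message, tools):
    print(problem)
```

Signature binding only checks argument names and arity, not values, so it complements rather than replaces human review.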
`FunctionCallingAgent` is compatible with MCP tools. Here is an example of how to use it:
```
import synalinks
import asyncio
import litellm
class Query(synalinks.DataModel):
"""Input query data model"""
query: str = synalinks.Field(
description="The user query",
)
class FinalAnswer(synalinks.DataModel):
"""Final answer data model"""
answer: str = synalinks.Field(
description="The correct final answer",
)
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
mcp_client = synalinks.MultiServerMCPClient(
{
"math": {
"url": "http://localhost:8183/mcp/",
"transport": "streamable_http",
},
}
)
tools = await mcp_client.get_tools()
inputs = synalinks.Input(data_model=Query)
outputs = await synalinks.FunctionCallingAgent(
data_model=FinalAnswer,
tools=tools,
language_model=language_model,
max_iterations=5,
autonomous=True,
)(inputs)
agent = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="mcp_math_agent",
description="A math agent that can use an external calculator",
)
input_query = Query(query="How much is 152648 + 485?")
response = await agent(input_query)
print(response.prettify_json())
if __name__ == "__main__":
asyncio.run(main())
```
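The `calculate` tool in the first two examples filters characters and then calls `eval` with builtins disabled. That filter still admits `**` (two `*` characters), so an input such as `9**9**9` can keep the tool busy for a very long time. If the tool may receive untrusted input, one possible alternative, swapping `eval` for an explicit walk over Python's `ast` module, is sketched below. It keeps the same signature, docstring and return shape and only supports `+`, `-`, `*`, `/`, unary signs and parentheses; it is a suggestion, not part of Synalinks.

```python
import ast
import asyncio
import operator

# Only these operators are allowed; anything else (including **) is rejected.
_BINARY_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPERATORS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def _evaluate(node):
    """Recursively evaluate a parsed arithmetic expression."""
    if isinstance(node, ast.Expression):
        return _evaluate(node.body)
    if (
        isinstance(node, ast.Constant)
        and isinstance(node.value, (int, float))
        and not isinstance(node.value, bool)
    ):
        return node.value
    if isinstance(node, ast.BinOp):
        op = _BINARY_OPERATORS.get(type(node.op))
        if op is None:
            raise ValueError(f"unsupported operator: {type(node.op).__name__}")
        return op(_evaluate(node.left), _evaluate(node.right))
    if isinstance(node, ast.UnaryOp):
        op = _UNARY_OPERATORS.get(type(node.op))
        if op is None:
            raise ValueError(f"unsupported operator: {type(node.op).__name__}")
        return op(_evaluate(node.operand))
    # Names, calls, attributes, strings, etc. are all rejected here.
    raise ValueError(f"unsupported element: {type(node).__name__}")


async def calculate(expression: str):
    """Calculate the result of a mathematical expression.
    Args:
        expression (str): The mathematical expression to calculate, such as
            '2 + 2'. The expression can contain numbers, operators (+, -, *, /),
            parentheses, and spaces.
    """
    try:
        result = round(float(_evaluate(ast.parse(expression, mode="eval"))), 2)
        return {"result": result, "log": "Successfully executed"}
    except Exception as e:
        return {"result": None, "log": f"Error: {e}"}


print(asyncio.run(calculate("152648 + 485")))
# {'result': 153133.0, 'log': 'Successfully executed'}
print(asyncio.run(calculate("9 ** 9 ** 9")))
# {'result': None, 'log': 'Error: unsupported operator: Pow'}
```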
Source code in `synalinks/src/modules/agents/function_calling_agent.py`
````
@synalinks_export(
[
"synalinks.modules.FunctionCallingAgent",
"synalinks.FunctionCallingAgent",
]
)
class FunctionCallingAgent(Module):
"""A trainable parallel function calling agent.
The agent has 2 different modes:
- Autonomous: It will execute tools as soon as called.
- Non-autonomous: It will return the tool arguments as a ChatMessage.
In *autonomous* mode, the agent accepts **any kind of data model input** and, if a `data_model` or `schema`
is provided, performs a final inference to format its final answer.
Example:
```python
import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class NumericalFinalAnswer(synalinks.DataModel):
final_answer: float = synalinks.Field(
description="The correct final numerical answer",
)
async def calculate(expression: str):
\"""Calculate the result of a mathematical expression.
Args:
expression (str): The mathematical expression to calculate, such as
'2 + 2'. The expression can contain numbers, operators (+, -, *, /),
parentheses, and spaces.
\"""
if not all(char in "0123456789+-*/(). " for char in expression):
return {
"result": None,
"log": (
"Error: invalid characters in expression. "
"The expression can only contain numbers, operators (+, -, *, /),"
" parentheses, and spaces NOT letters."
),
}
try:
# Evaluate the mathematical expression safely
result = round(float(eval(expression, {"__builtins__": None}, {})), 2)
return {
"result": result,
"log": "Successfully executed",
}
except Exception as e:
return {
"result": None,
"log": f"Error: {e}",
}
async def main():
language_model = synalinks.LanguageModel(model="ollama/mistral")
tools = [
synalinks.Tool(calculate),
]
inputs = synalinks.Input(data_model=Query)
outputs = await synalinks.FunctionCallingAgent(
data_model=NumericalFinalAnswer,
tools=tools,
language_model=language_model,
max_iterations=5,
autonomous=True,
)(inputs)
agent = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="math_agent",
description="A math agent",
)
input_query = Query(query="How much is 152648 + 485?")
response = await agent(input_query)
print(response.prettify_json())
if __name__ == "__main__":
asyncio.run(main())
```
Result:
```json
{
"query": "How much is 152648 + 485?",
"messages": [
{
"role": "assistant",
"content": "Performing simple addition",
"tool_calls": [
{
"id": "92a3657c-1a45-46e6-8173-df4255b8423b",
"name": "calculate",
"arguments": {
"expression": "152648 + 485"
}
}
]
},
{
"role": "tool",
"content": {
"result": 153133.0,
"log": "Successfully executed"
},
"tool_call_id": "92a3657c-1a45-46e6-8173-df4255b8423b"
},
{
"role": "assistant",
"content": "The user has asked for a simple addition calculation. The assistant used the 'calculate' tool to perform this task, and the result was returned successfully."
}
],
"final_answer": 153133.0
}
```
In *non-autonomous* mode (also called human-in-the-loop or interactive mode), the
user needs to validate or edit the tool arguments and send them back to the agent. In
this mode, the agent requires a `ChatMessages` data model as input and outputs a
`ChatMessage` (or `ChatMessages` if `return_inputs_with_trajectory` is `True`)
back to the user. In that case, the agent ignores the `max_iterations` argument,
as it only performs one **step at a time**.
Example:
```python
import synalinks
import asyncio
MAX_ITERATIONS = 5
async def calculate(expression: str):
\"""Calculate the result of a mathematical expression.
Args:
expression (str): The mathematical expression to calculate, such as
'2 + 2'. The expression can contain numbers, operators (+, -, *, /),
parentheses, and spaces.
\"""
if not all(char in "0123456789+-*/(). " for char in expression):
return {
"result": None,
"log": (
"Error: invalid characters in expression. "
"The expression can only contain numbers, operators (+, -, *, /),"
" parentheses, and spaces NOT letters."
),
}
try:
# Evaluate the mathematical expression safely
result = round(float(eval(expression, {"__builtins__": None}, {})), 2)
return {
"result": result,
"log": "Successfully executed",
}
except Exception as e:
return {
"result": None,
"log": f"Error: {e}",
}
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
tools = [
synalinks.Tool(calculate),
]
inputs = synalinks.Input(data_model=synalinks.ChatMessages)
outputs = await synalinks.FunctionCallingAgent(
tools=tools,
language_model=language_model,
return_inputs_with_trajectory=True,
autonomous=False,
)(inputs)
agent = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="math_agent",
description="A math agent",
)
input_messages = synalinks.ChatMessages(
messages=[
synalinks.ChatMessage(
role="user",
content="How much is 152648 + 485?",
)
]
)
for i in range(MAX_ITERATIONS):
response = await agent(input_messages)
print("Assistant response (with trajectory):")
print(response.prettify_json())
assistant_message = response.get("messages")[-1]
if not assistant_message.get("tool_calls"):
break # We stop the loop if the agent didn't call any tool
# Validate the tool calls arguments (with an UI or CLI)
# Then re-inject the validated assistant response in the input_messages
# The corresponding tools will be called by the agent
# Here we assume everything is okay for the purpose of the demo.
input_messages.messages.append(assistant_message)
if __name__ == "__main__":
asyncio.run(main())
```
The FunctionCallingAgent is compatible with MCP tools.
Here is an example of how to use it:
```python
import synalinks
import asyncio
class Query(synalinks.DataModel):
\"""Input query data model\"""
query: str = synalinks.Field(
description="The user query",
)
class FinalAnswer(synalinks.DataModel):
\"""Final answer data model\"""
answer: str = synalinks.Field(
description="The correct final answer",
)
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
mcp_client = synalinks.MultiServerMCPClient(
{
"math": {
"url": "http://localhost:8183/mcp/",
"transport": "streamable_http",
},
}
)
tools = await mcp_client.get_tools()
inputs = synalinks.Input(data_model=Query)
outputs = await synalinks.FunctionCallingAgent(
data_model=FinalAnswer,
tools=tools,
language_model=language_model,
max_iterations=5,
autonomous=True,
)(inputs)
agent = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="mcp_math_agent",
description="A math agent that can use an external calculator",
)
input_query = Query(query="How much is 152648 + 485?")
response = await agent(input_query)
print(response.prettify_json())
if __name__ == "__main__":
asyncio.run(main())
```
"""
def __init__(
self,
schema=None,
data_model=None,
language_model=None,
prompt_template=None,
examples=None,
instructions=None,
use_inputs_schema=False,
use_outputs_schema=False,
tools=None,
autonomous=True,
return_inputs_with_trajectory=True,
max_iterations=5,
name=None,
description=None,
):
super().__init__(
name=name,
description=description,
)
if not schema and data_model:
schema = data_model.get_schema()
self.schema = schema
self.prompt_template = prompt_template
if not instructions:
instructions = get_default_instructions()
self.instructions = instructions
self.examples = examples
self.use_inputs_schema = use_inputs_schema
self.use_outputs_schema = use_outputs_schema
self.language_model = language_model
self.tools = {}
if not tools:
raise ValueError("You must set the `tools` argument")
for tool in tools:
self.tools[tool.name] = tool
tool_calls_schema = dynamic_tool_calls(tools=tools)
self.autonomous = autonomous
self.return_inputs_with_trajectory = return_inputs_with_trajectory
self.max_iterations = max_iterations
self.tool_calls_generator = ChainOfThought(
schema=tool_calls_schema,
prompt_template=self.prompt_template,
examples=self.examples,
instructions=self.instructions,
use_inputs_schema=self.use_inputs_schema,
use_outputs_schema=self.use_outputs_schema,
language_model=self.language_model,
name="tool_calls_generator_"+self.name,
)
if self.schema and self.autonomous:
self.final_generator = ChainOfThought(
schema=self.schema,
language_model=self.language_model,
instructions=self.instructions,
return_inputs=self.return_inputs_with_trajectory,
name="final_generator_"+self.name,
)
async def call(self, inputs, training=False):
if not inputs:
return None
if self.autonomous:
if not is_chat_messages(inputs):
trajectory = await ops.concat(
inputs,
ChatMessages(),
name="trajectory_" + self.name,
)
else:
trajectory = inputs
else:
if not is_chat_messages(inputs):
raise ValueError(
"In interactive mode, the FunctionCallingAgent "
"needs an ChatMessages-like data model as inputs"
)
trajectory = inputs
agent_messages = trajectory.get("messages")
if self.autonomous:
for i in range(self.max_iterations):
tool_calls = await self.tool_calls_generator(trajectory)
if not tool_calls:
assistant_message = ChatMessage(
role=ChatRole.ASSISTANT,
content="Something went wrong while trying to decide the next action.",
)
agent_messages.append(assistant_message.get_json())
break
assistant_message = ChatMessage(
role=ChatRole.ASSISTANT,
content=tool_calls.get("thinking", ""),
)
if not tool_calls.get("tool_calls"):
agent_messages.append(assistant_message.get_json())
break
tasks = []
tool_calls_ids = []
for tool_call in tool_calls.get("tool_calls"):
tool_name = tool_call.get("tool_name")
tools_arguments = out_mask_json(tool_call, mask=["tool_name"])
tool_call_id = str(uuid.uuid4())
tool_calls_ids.append(tool_call_id)
assistant_message.tool_calls.append(
ToolCall(
id=tool_call_id,
name=tool_name,
arguments=tools_arguments,
)
)
tasks.append(self.tools[tool_name](**tools_arguments))
agent_messages.append(assistant_message.get_json())
tool_results = await asyncio.gather(*tasks, return_exceptions=True)
for j, tool_result in enumerate(tool_results):
tool_call_id = tool_calls_ids[j]
if isinstance(tool_result, Exception):
agent_messages.append(
ChatMessage(
role=ChatRole.TOOL,
tool_call_id=tool_call_id,
content="error: %s" % str(tool_result),
).get_json()
)
else:
agent_messages.append(
ChatMessage(
role=ChatRole.TOOL,
tool_call_id=tool_call_id,
content=tool_result,
).get_json()
)
trajectory.update({"messages": agent_messages})
if self.schema:
return await self.final_generator(trajectory)
else:
return trajectory
else:
if len(agent_messages) > 0:
if agent_messages[-1].get("role") == ChatRole.ASSISTANT:
tasks = []
tool_calls_ids = []
tool_calls = agent_messages[-1].get("tool_calls")
for tool_call in tool_calls:
tool_name = tool_call.get("name")
tools_arguments = tool_call.get("arguments")
tool_call_id = tool_call.get("id")
tool_calls_ids.append(tool_call_id)
tasks.append(self.tools[tool_name](**tools_arguments))
tool_results = await asyncio.gather(*tasks, return_exceptions=True)
for j, tool_result in enumerate(tool_results):
tool_call_id = tool_calls_ids[j]
if isinstance(tool_result, Exception):
agent_messages.append(
ChatMessage(
role=ChatRole.TOOL,
tool_call_id=tool_call_id,
content="error: %s" % str(tool_result),
).get_json()
)
else:
agent_messages.append(
ChatMessage(
role=ChatRole.TOOL,
tool_call_id=tool_call_id,
content=tool_result,
).get_json()
)
trajectory.update({"messages": agent_messages})
tool_calls = await self.tool_calls_generator(trajectory)
assistant_message = ChatMessage(
role=ChatRole.ASSISTANT,
content=tool_calls.get("thinking", ""),
tool_calls=[],
)
for tool_call in tool_calls.get("tool_calls", []):
tool_name = tool_call.get("tool_name")
tools_arguments = out_mask_json(tool_call, mask=["tool_name"])
tool_call_id = str(uuid.uuid4())
assistant_message.tool_calls.append(
ToolCall(
id=tool_call_id,
name=tool_name,
arguments=tools_arguments,
)
)
agent_messages.append(assistant_message.get_json())
trajectory.update({"messages": agent_messages})
if self.return_inputs_with_trajectory:
return JsonDataModel(
json=trajectory.get_json(),
schema=ChatMessages.get_schema(),
name=self.name,
)
else:
return JsonDataModel(
json=assistant_message.get_json(),
schema=ChatMessage.get_schema(),
name=self.name,
)
async def compute_output_spec(self, inputs, training=False):
if self.autonomous:
_ = await self.tool_calls_generator(inputs)
if self.schema:
return await self.final_generator(inputs)
else:
return SymbolicDataModel(
schema=ChatMessages.get_schema(),
name=self.name,
)
else:
if not is_chat_messages(inputs):
raise ValueError(
"In interactive mode, the FunctionCallingAgent "
"needs an ChatMessages-like data model as inputs"
)
_ = await self.tool_calls_generator(inputs)
if self.return_inputs_with_trajectory:
return SymbolicDataModel(
schema=ChatMessages.get_schema(),
name=self.name,
)
else:
return SymbolicDataModel(
schema=ChatMessage.get_schema(),
name=self.name,
)
def get_config(self):
config = {
"schema": self.schema,
"prompt_template": self.prompt_template,
"examples": self.examples,
"instructions": self.instructions,
"use_inputs_schema": self.use_inputs_schema,
"use_outputs_schema": self.use_outputs_schema,
"autonomous": self.autonomous,
"max_iterations": self.max_iterations,
"return_inputs_with_trajectory": self.return_inputs_with_trajectory,
"name": self.name,
"description": self.description,
}
language_model_config = {
"language_model": serialization_lib.serialize_synalinks_object(
self.language_model,
)
}
tools_config = {
"tools": [
serialization_lib.serialize_synalinks_object(tool)
for tool in self.tools.values()
]
}
return {**config, **language_model_config, **tools_config}
@classmethod
def from_config(cls, config):
tools = [
serialization_lib.deserialize_synalinks_object(tool)
for tool in config.pop("tools")
]
language_model = serialization_lib.deserialize_synalinks_object(
config.pop("language_model")
)
return cls(
language_model=language_model,
tools=tools,
**config,
)
````
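The tool-execution loop in the listing above runs every requested tool concurrently with `asyncio.gather(..., return_exceptions=True)`. A failing tool therefore becomes an `"error: ..."` tool message instead of aborting the whole turn. Below is a minimal, library-free sketch of that pattern. It assumes plain dicts in place of `ChatMessage` and `ToolCall`, and it generates its own call ids. The function `run_tool_calls` and the `add`/`divide` tools are hypothetical.

```python
import asyncio
import uuid


async def run_tool_calls(tools, tool_calls):
    """Run tool calls concurrently and return one tool message per call.

    `tools` maps a tool name to an async callable; `tool_calls` is a list of
    {"name": ..., "arguments": {...}} dicts (a simplified, hypothetical shape).
    """
    ids = [str(uuid.uuid4()) for _ in tool_calls]
    # Calling an async function only creates a coroutine; nothing runs yet
    tasks = [tools[call["name"]](**call["arguments"]) for call in tool_calls]
    # return_exceptions=True returns exceptions as results instead of raising
    results = await asyncio.gather(*tasks, return_exceptions=True)
    messages = []
    for tool_call_id, result in zip(ids, results):
        if isinstance(result, Exception):
            content = "error: %s" % str(result)
        else:
            content = result
        messages.append(
            {"role": "tool", "tool_call_id": tool_call_id, "content": content}
        )
    return messages


async def add(a, b):
    return {"result": a + b}


async def divide(a, b):
    return {"result": a / b}


messages = asyncio.run(
    run_tool_calls(
        {"add": add, "divide": divide},
        [
            {"name": "add", "arguments": {"a": 1, "b": 2}},
            {"name": "divide", "arguments": {"a": 1, "b": 0}},
        ],
    )
)
```

Here the division by zero is reported in the second tool message, and the result of `add` is still delivered in the first.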
## `get_default_instructions()`
The default parallel function calling agent instructions.
Source code in `synalinks/src/modules/agents/function_calling_agent.py`
```
def get_default_instructions():
    """The default parallel function calling agent instructions."""
    return """
Think step by step: Use the thinking field to elaborate what you observe and what do you need to accomplish next.
Reflect on prior steps: Review your previous actions and their outcomes to avoid unnecessary repetition.
Avoid unnecessary actions: If you already have enough information to complete the user task, return an empty tool calls array.
""".strip()
```
## `Action`
Bases: `Module`
Use a `LanguageModel` to perform a function call given the input data model.
This module uses structured output to call a given Python function. It can be used seamlessly in agents or traditional workflows, as it uses the input data model to infer the function parameters.
The output of this module contains the inputs inferred by the language model as well as the outputs of the function call.
Note: The function **MUST** be asynchronous and return a JSON object (a Python `dict`).
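Judging from `Action.call` in the source listing below, the module awaits `fn(**inferred_arguments)`, turns any exception into `{"error": ...}`, and packages the result as `{"action": {"inputs": ..., "outputs": ...}}`. The following sketch reproduces that flow without a language model. It assumes the inferred arguments are already available as a dict. The helper `run_action` and the `get_weather` and `sync_tool` tools are hypothetical.

```python
import asyncio


async def run_action(fn, inferred_args):
    """Call `fn` with (pre-computed) LM-inferred arguments.

    Returns the {"action": {"inputs": ..., "outputs": ...}} shape built by
    `Action.call`; the language model step is replaced by `inferred_args`.
    """
    try:
        outputs = await fn(**inferred_args)
    except Exception as e:
        # Bad arguments, non-awaitable results and tool failures all land here
        outputs = {"error": str(e)}
    return {"action": {"inputs": inferred_args, "outputs": outputs}}


async def get_weather(city: str):
    """Hypothetical tool: asynchronous and returns a JSON-serializable dict."""
    return {"city": city, "forecast": "sunny"}


def sync_tool(city: str):
    """Hypothetical tool that breaks the contract by not being async."""
    return {"city": city}


ok = asyncio.run(run_action(get_weather, {"city": "Paris"}))
bad = asyncio.run(run_action(get_weather, {"town": "Paris"}))
not_async = asyncio.run(run_action(sync_tool, {"city": "Paris"}))
```

The last call shows why the function must be asynchronous. A plain `dict` cannot be awaited, so the call ends up as an error payload instead of a result.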
Example:
```
import synalinks
import asyncio

async def main():

    class Query(synalinks.DataModel):
        query: str

    async def calculate(expression: str):
        """Calculate the result of a mathematical expression.

        Args:
            expression (str): The mathematical expression to calculate, such as
                '2 + 2'. The expression can contain numbers, operators (+, -, *, /),
                parentheses, and spaces.
        """
        if not all(char in "0123456789+-*/(). " for char in expression):
            return {
                "result": None,
                "log": "Error: invalid characters in expression",
            }
        try:
            # Evaluate the whitelisted expression with builtins disabled
            result = round(float(eval(expression, {"__builtins__": None}, {})), 2)
            return {
                "result": result,
                "log": "Successfully executed",
            }
        except Exception as e:
            return {
                "result": None,
                "log": f"Error: {e}",
            }

    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=Query)
    x1 = await synalinks.Action(
        fn=calculate,
        language_model=language_model,
    )(x0)

    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="calculator",
        description="This program performs the calculation of an expression",
    )

if __name__ == "__main__":
    asyncio.run(main())
```
Parameters:
| Name | Type | Description | Default |
| -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | ---------- |
| `fn` | `Callable` | The function to call. | *required* |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` |
| `examples` | `list` | The default examples to use in the prompt (see Generator). | `None` |
| `instructions` | `list` | The default instructions to use (see Generator). | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema`  | `bool`          | Optional. Whether or not to use the inputs schema in the prompt (see Generator).                                             | `False`    |
| `use_outputs_schema` | `bool`          | Optional. Whether or not to use the outputs schema in the prompt (see Generator).                                            | `False`    |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
Source code in `synalinks/src/modules/core/action.py`
````
@synalinks_export(
    [
        "synalinks.modules.Action",
        "synalinks.Action",
    ]
)
class Action(Module):
    """Use a `LanguageModel` to perform a function call given the input data model.

    This module uses structured output to call a given Python function.
    It can be used seamlessly in agents or traditional workflows,
    as it uses the input data model to infer the function parameters.

    The output of this module contains the inputs inferred by the language model
    as well as the outputs of the function call.

    Note: The function **MUST** be asynchronous and return a JSON object (a Python `dict`).

    Example:

    ```python
    import synalinks
    import asyncio

    async def main():

        class Query(synalinks.DataModel):
            query: str

        async def calculate(expression: str):
            \"""Calculate the result of a mathematical expression.

            Args:
                expression (str): The mathematical expression to calculate, such as
                    '2 + 2'. The expression can contain numbers, operators (+, -, *, /),
                    parentheses, and spaces.
            \"""
            if not all(char in "0123456789+-*/(). " for char in expression):
                return {
                    "result": None,
                    "log": "Error: invalid characters in expression",
                }
            try:
                # Evaluate the whitelisted expression with builtins disabled
                result = round(float(eval(expression, {"__builtins__": None}, {})), 2)
                return {
                    "result": result,
                    "log": "Successfully executed",
                }
            except Exception as e:
                return {
                    "result": None,
                    "log": f"Error: {e}",
                }

        language_model = synalinks.LanguageModel(
            model="ollama/mistral",
        )

        x0 = synalinks.Input(data_model=Query)
        x1 = await synalinks.Action(
            fn=calculate,
            language_model=language_model,
        )(x0)

        program = synalinks.Program(
            inputs=x0,
            outputs=x1,
            name="calculator",
            description="This program performs the calculation of an expression",
        )

    if __name__ == "__main__":
        asyncio.run(main())
    ```

    Args:
        fn (Callable): The function to call.
        language_model (LanguageModel): The language model to use.
        prompt_template (str): The default jinja2 prompt template
            to use (see `Generator`).
        examples (list): The default examples to use in the prompt
            (see `Generator`).
        instructions (list): The default instructions to use (see `Generator`).
        seed_instructions (list): Optional. A list of instructions to use as seed for the
            optimization. If not provided, use the default instructions as seed.
        temperature (float): Optional. The temperature for the LM call.
        use_inputs_schema (bool): Optional. Whether or not to use the inputs schema in
            the prompt (defaults to False) (see `Generator`).
        use_outputs_schema (bool): Optional. Whether or not to use the outputs schema in
            the prompt (defaults to False) (see `Generator`).
        name (str): Optional. The name of the module.
        description (str): Optional. The description of the module.
        trainable (bool): Whether the module's variables should be trainable.
    """

    def __init__(
        self,
        fn,
        language_model=None,
        prompt_template=None,
        examples=None,
        instructions=None,
        seed_instructions=None,
        temperature=0.0,
        use_inputs_schema=False,
        use_outputs_schema=False,
        name=None,
        description=None,
        trainable=True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        self.fn = fn
        schema = tool_utils.Tool(fn).get_tool_schema()
        self.language_model = language_model
        self.prompt_template = prompt_template
        self.examples = examples
        self.instructions = instructions
        self.seed_instructions = seed_instructions
        self.temperature = temperature
        self.use_inputs_schema = use_inputs_schema
        self.use_outputs_schema = use_outputs_schema
        self.action = Generator(
            schema=schema,
            language_model=self.language_model,
            prompt_template=self.prompt_template,
            examples=self.examples,
            instructions=self.instructions,
            seed_instructions=self.seed_instructions,
            temperature=self.temperature,
            use_inputs_schema=self.use_inputs_schema,
            use_outputs_schema=self.use_outputs_schema,
            name="generator_" + self.name,
        )

    async def call(self, inputs, training=False):
        if not inputs:
            return None
        fn_inputs = await self.action(inputs, training=training)
        try:
            fn_outputs = await self.fn(**fn_inputs.get_json())
        except Exception as e:
            fn_outputs = {"error": str(e)}
        generic_io = GenericIO(inputs=fn_inputs.get_json(), outputs=fn_outputs)
        return JsonDataModel(
            json=GenericAction(action=generic_io.get_json()).get_json(),
            schema=GenericAction.get_schema(),
            name=self.name,
        )

    async def compute_output_spec(self, inputs, training=False):
        _ = await self.action(inputs)
        return SymbolicDataModel(schema=GenericAction.get_schema(), name=self.name)

    def get_config(self):
        config = {
            "fn": self.fn,
            "prompt_template": self.prompt_template,
            "examples": self.examples,
            "instructions": self.instructions,
            "seed_instructions": self.seed_instructions,
            "temperature": self.temperature,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": serialization_lib.serialize_synalinks_object(
                self.language_model
            )
        }
        return {**config, **language_model_config}

    @classmethod
    def from_config(cls, config):
        language_model = serialization_lib.deserialize_synalinks_object(
            config.pop("language_model")
        )
        return cls(language_model=language_model, **config)
````
## `GenericAction`
Bases: `DataModel`
A generic action with inputs/outputs
Source code in `synalinks/src/modules/core/action.py`
```
class GenericAction(DataModel):
    """A generic action with inputs/outputs"""

    action: GenericIO = Field(description="An action already performed")
```
## `Branch`
Bases: `Module`
Use a `LanguageModel` to select which module to call based on an arbitrary input, a question and a list of labels.
The selected branch outputs the data model computed from the inputs by that branch's module, while the other branches output `None`.
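Following `Branch.call` in the source listing below, the decision's `choice` can be a single label or a list or set of labels. Every label still yields one output slot, so unselected branches return `None`. The sketch below reproduces that dispatch with plain async functions in place of modules. The names `dispatch`, `easy` and `difficult` are hypothetical.

```python
import asyncio


async def dispatch(labels, branches, choice, inputs):
    """Run only the branch(es) whose label matches `choice`; others yield None.

    `branches` maps each label to an async callable; `choice` is a single
    label (str) or a list/set of labels.
    """

    async def skip():
        return None

    tasks = []
    for label in labels:
        if isinstance(choice, str):
            selected = label == choice
        elif isinstance(choice, (list, set)):
            selected = label in choice
        else:
            selected = False
        tasks.append(branches[label](inputs) if selected else skip())
    # One result per label, in label order
    return tuple(await asyncio.gather(*tasks))


async def easy(inputs):
    return {"answer": "short answer to " + inputs["query"]}


async def difficult(inputs):
    return {"thinking": "...", "critique": "...", "answer": "long answer"}


branches = {"easy": easy, "difficult": difficult}
outputs = asyncio.run(dispatch(["easy", "difficult"], branches, "easy", {"query": "2+2?"}))
multi = asyncio.run(
    dispatch(["easy", "difficult"], branches, ["easy", "difficult"], {"query": "2+2?"})
)
```

With `choice="easy"`, `outputs` is `({"answer": "short answer to 2+2?"}, None)`. Passing both labels runs both branches concurrently.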
Example:
```
import synalinks
import asyncio

async def main():

    class Query(synalinks.DataModel):
        query: str

    class Answer(synalinks.DataModel):
        answer: str

    class AnswerWithCritique(synalinks.DataModel):
        thinking: str
        critique: str
        answer: str

    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=Query)
    (x1, x2) = await synalinks.Branch(
        question="What is the difficulty level of the above query?",
        labels=["easy", "difficult"],
        branches=[
            synalinks.Generator(
                data_model=Answer,
                language_model=language_model,
            ),
            synalinks.Generator(
                data_model=AnswerWithCritique,
                language_model=language_model,
            ),
        ],
        language_model=language_model,
    )(x0)
    x3 = x1 | x2

    program = synalinks.Program(
        inputs=x0,
        outputs=x3,
        name="adaptative_chain_of_thought",
        description="Useful to answer step by step only when needed",
    )

if __name__ == "__main__":
    asyncio.run(main())
```
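The example merges the two branch outputs with `x1 | x2`. With `return_decision=True`, each branch output is also combined with the decision through `ops.logical_and`. As we understand these operators, `&` yields `None` when either side is `None` and otherwise concatenates both data models. `|` returns whichever side is present and concatenates them when both are. The sketch below models that assumed behavior with plain dicts. `logical_and` and `logical_or` here are hypothetical stand-ins, and the simple dict merge does not model how the real concatenation handles duplicate field names.

```python
def logical_and(a, b):
    """None if either side is missing, otherwise the merged fields."""
    if a is None or b is None:
        return None
    return {**a, **b}


def logical_or(a, b):
    """Whichever side is present; merged fields if both are."""
    if a is None:
        return b
    if b is None:
        return a
    return {**a, **b}


decision = {"choice": "easy"}
x1 = logical_and(decision, {"answer": "4"})  # selected branch
x2 = logical_and(decision, None)             # unselected branch output None
x3 = logical_or(x1, x2)
```

This is why `x3` in the example always holds the output of whichever branch actually ran.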
Parameters:
| Name | Type | Description | Default |
| -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | ---------- |
| `question` | `str` | The question to ask. | `None` |
| `labels` | `list` | The list of labels to choose from (strings). | `None` |
| `branches` | `list` | The list of modules or programs to select from. | `None` |
| `inject_decision`    | `bool`          | If True, inject the decision into the branch inputs.                                                                         | `True`     |
| `return_decision`    | `bool`          | If True, return the decision along with the branch outputs.                                                                  | `True`     |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` |
| `examples` | `list` | The default examples to use in the prompt (see Decision). | `None` |
| `instructions` | `list` | The default instructions to use (see Decision). | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema`  | `bool`          | Optional. Whether or not to use the inputs schema in the decision prompt (see Decision).                                     | `False`    |
| `use_outputs_schema` | `bool`          | Optional. Whether or not to use the outputs schema in the decision prompt (see Decision).                                    | `False`    |
| `decision_type`      | `type`          | Optional. The decision module class to instantiate.                                                                          | `Decision` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
Source code in `synalinks/src/modules/core/branch.py`
````
@synalinks_export(["synalinks.modules.Branch", "synalinks.Branch"])
class Branch(Module):
    """Use a `LanguageModel` to select which module to call based on an arbitrary
    input, a question and a list of labels.

    The selected branch outputs the data model computed from
    the inputs by that branch's module, while the other branches output `None`.

    Example:

    ```python
    import synalinks
    import asyncio

    async def main():

        class Query(synalinks.DataModel):
            query: str

        class Answer(synalinks.DataModel):
            answer: str

        class AnswerWithCritique(synalinks.DataModel):
            thinking: str
            critique: str
            answer: str

        language_model = synalinks.LanguageModel(
            model="ollama/mistral",
        )

        x0 = synalinks.Input(data_model=Query)
        (x1, x2) = await synalinks.Branch(
            question="What is the difficulty level of the above query?",
            labels=["easy", "difficult"],
            branches=[
                synalinks.Generator(
                    data_model=Answer,
                    language_model=language_model,
                ),
                synalinks.Generator(
                    data_model=AnswerWithCritique,
                    language_model=language_model,
                ),
            ],
            language_model=language_model,
        )(x0)
        x3 = x1 | x2

        program = synalinks.Program(
            inputs=x0,
            outputs=x3,
            name="adaptative_chain_of_thought",
            description="Useful to answer step by step only when needed",
        )

    if __name__ == "__main__":
        asyncio.run(main())
    ```

    Args:
        question (str): The question to ask.
        labels (list): The list of labels to choose from (strings).
        branches (list): The list of modules or programs to select from.
        inject_decision (bool): If True, inject the decision into the branch inputs
            (defaults to True).
        return_decision (bool): If True, return the decision along with the branch
            outputs (defaults to True).
        language_model (LanguageModel): The language model to use.
        prompt_template (str): The default jinja2 prompt template
            to use (see `Generator`).
        examples (list): The default examples to use in the prompt
            (see `Decision`).
        instructions (list): The default instructions to use (see `Decision`).
        seed_instructions (list): Optional. A list of instructions to use as seed for the
            optimization. If not provided, use the default instructions as seed.
        temperature (float): Optional. The temperature for the LM call.
        use_inputs_schema (bool): Optional. Whether or not to use the inputs
            schema in the decision prompt (defaults to False) (see `Decision`).
        use_outputs_schema (bool): Optional. Whether or not to use the outputs
            schema in the decision prompt (defaults to False) (see `Decision`).
        decision_type (type): Optional. The decision module class to instantiate.
        name (str): Optional. The name of the module.
        description (str): Optional. The description of the module.
        trainable (bool): Whether the module's variables should be trainable.
    """

    def __init__(
        self,
        question=None,
        labels=None,
        branches=None,
        inject_decision=True,
        return_decision=True,
        language_model=None,
        prompt_template=None,
        examples=None,
        instructions=None,
        seed_instructions=None,
        temperature=0.0,
        use_inputs_schema=False,
        use_outputs_schema=False,
        decision_type=Decision,
        name=None,
        description=None,
        trainable=True,
        **kwargs,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        if not branches:
            raise ValueError("The `branches` argument must be provided.")
        if not isinstance(branches, list):
            raise ValueError("The `branches` must be a list of `Module` or `Program`.")
        if len(labels) != len(branches):
            raise ValueError("The `labels` and `branches` must have the same length.")
        self.question = question
        self.labels = labels
        self.branches = {labels[i]: m for i, m in enumerate(branches)}
        self.inject_decision = inject_decision
        self.return_decision = return_decision
        self.language_model = language_model
        self.prompt_template = prompt_template
        self.examples = examples
        self.instructions = instructions
        self.seed_instructions = seed_instructions
        self.temperature = temperature
        self.use_inputs_schema = use_inputs_schema
        self.use_outputs_schema = use_outputs_schema
        self.decision = decision_type(
            question=self.question,
            labels=self.labels,
            language_model=self.language_model,
            prompt_template=self.prompt_template,
            examples=self.examples,
            instructions=self.instructions,
            seed_instructions=self.seed_instructions,
            temperature=self.temperature,
            use_inputs_schema=self.use_inputs_schema,
            use_outputs_schema=self.use_outputs_schema,
            name="decision_" + self.name,
        )

    async def call(self, inputs, training=False):
        outputs = [None] * len(self.branches)
        if not inputs:
            return tuple(outputs)
        decision = await self.decision(
            inputs,
            training=training,
        )
        choice = decision.get("choice", decision.get("choices"))
        if not choice:
            return tuple(outputs)
        if self.inject_decision:
            inputs = await ops.concat(
                inputs,
                decision,
                name="inputs_with_decision_" + self.name,
            )
        tasks = []

        async def execute_branch(
            inputs, module=None, decision=None, return_decision=False
        ):
            if not inputs:
                return None
            if return_decision:
                return await ops.logical_and(
                    decision,
                    await module(inputs),
                )
            else:
                return await module(inputs)

        for label in self.labels:
            module = self.branches[label]
            selected = False
            if isinstance(choice, str):
                if label == choice:
                    selected = True
            elif isinstance(choice, (list, set)):
                if label in choice:
                    selected = True
            if selected and module:
                tasks.append(
                    execute_branch(
                        inputs,
                        module,
                        decision,
                        return_decision=self.return_decision,
                    )
                )
            else:
                tasks.append(execute_branch(None))
        outputs = await asyncio.gather(*tasks)
        return tuple(outputs)

    async def compute_output_spec(self, inputs, training=False):
        outputs = []
        decision = await self.decision(
            inputs,
            training=training,
        )
        if self.inject_decision:
            inputs = await ops.concat(
                inputs,
                decision,
                name="inputs_with_decision_" + self.name,
            )
        for label in self.labels:
            module = self.branches[label]
            if self.return_decision:
                outputs.append(
                    await ops.logical_and(
                        decision,
                        await module(
                            inputs,
                            training=training,
                        ),
                        name="with_decision_" + self.name,
                    )
                )
            else:
                outputs.append(
                    await module(
                        inputs,
                        training=training,
                    )
                )
        return tuple(outputs)

    def get_config(self):
        config = {
            "question": self.question,
            "labels": self.labels,
            "inject_decision": self.inject_decision,
            "return_decision": self.return_decision,
            "prompt_template": self.prompt_template,
            "examples": self.examples,
            "instructions": self.instructions,
            "seed_instructions": self.seed_instructions,
            "temperature": self.temperature,
            "use_inputs_schema": self.use_inputs_schema,
            "use_outputs_schema": self.use_outputs_schema,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": serialization_lib.serialize_synalinks_object(
                self.language_model
            )
        }
        branches_config = {
            "branches": [
                serialization_lib.serialize_synalinks_object(branch)
                for branch in self.branches.values()
            ]
        }
        return {**config, **language_model_config, **branches_config}

    @classmethod
    def from_config(cls, config, custom_objects=None):
        language_model = serialization_lib.deserialize_synalinks_object(
            config.pop("language_model")
        )
        branches = [
            serialization_lib.deserialize_synalinks_object(
                branch_config, custom_objects=custom_objects
            )
            for branch_config in config.pop("branches")
        ]
        return cls(language_model=language_model, branches=branches, **config)
````
## `Decision`
Bases: `Module`
Perform a decision on the given input based on a question and a list of labels.
This module dynamically creates an `Enum` schema from the given labels and uses it to generate the answer with structured output.
This ensures that the LM answer is **always** one of the provided labels.
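The mechanism can be pictured as follows: the `choice` property of the answer schema is restricted to an `enum` of the labels, so a structured-output backend cannot produce any other value. This is a minimal sketch of that idea. It assumes a simplified base schema with only a `choice` field. The helpers `add_enum_constraint` and `is_valid_choice` are hypothetical, and the real `dynamic_enum` and `DecisionAnswer` schema may differ, for example by encoding the enum through `$defs`.

```python
import copy


def add_enum_constraint(schema, prop, labels):
    """Return a copy of a JSON schema whose `prop` property only accepts `labels`."""
    new_schema = copy.deepcopy(schema)  # leave the base schema untouched
    new_schema["properties"][prop]["enum"] = list(labels)
    return new_schema


def is_valid_choice(schema, answer):
    """Check an answer against the enum, as a constrained decoder would enforce."""
    return answer.get("choice") in schema["properties"]["choice"]["enum"]


base = {
    "type": "object",
    "properties": {"choice": {"type": "string"}},
    "required": ["choice"],
}
schema = add_enum_constraint(base, "choice", ["low", "medium", "high"])
```

An answer such as `{"choice": "high"}` passes the check, while `{"choice": "critical"}` does not. That matches treating the module as a single-label classifier.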
Example:
```
import synalinks
import asyncio

async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=synalinks.ChatMessages)
    x1 = await synalinks.Decision(
        question="What is the danger level of the discussion?",
        labels=["low", "medium", "high"],
        language_model=language_model,
    )(x0)

    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="discussion_danger_assessment",
        description="This program assesses the level of danger in a discussion.",
    )

if __name__ == "__main__":
    asyncio.run(main())
```
You can think of this module as performing single-label classification on the input.
Parameters:
| Name | Type | Description | Default |
| -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | ------- |
| `question` | `str` | The question to ask. | `None` |
| `labels` | `list` | The list of labels to choose from (strings). | `None` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` |
| `examples` | `list` | The default examples to use in the prompt (see Generator). | `None` |
| `instructions` | `list` | The default instructions to use (see Generator). | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema`  | `bool`          | Optional. Whether or not to use the inputs schema in the prompt (see Generator).                                             | `False` |
| `use_outputs_schema` | `bool`          | Optional. Whether or not to use the outputs schema in the prompt (see Generator).                                            | `False` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
Source code in `synalinks/src/modules/core/decision.py`
````
@synalinks_export(["synalinks.modules.Decision", "synalinks.Decision"])
class Decision(Module):
    """Perform a decision on the given input based on a question and a list of labels.

    This module dynamically creates an `Enum` schema from the given labels and
    uses it to generate the answer with structured output.

    This ensures that the LM answer is **always** one of the provided labels.

    Example:

    ```python
    import synalinks
    import asyncio

    async def main():
        language_model = synalinks.LanguageModel(
            model="ollama/mistral",
        )

        x0 = synalinks.Input(data_model=synalinks.ChatMessages)
        x1 = await synalinks.Decision(
            question="What is the danger level of the discussion?",
            labels=["low", "medium", "high"],
            language_model=language_model,
        )(x0)

        program = synalinks.Program(
            inputs=x0,
            outputs=x1,
            name="discussion_danger_assessment",
            description="This program assesses the level of danger in a discussion.",
        )

    if __name__ == "__main__":
        asyncio.run(main())
    ```

    You can think of this module as performing single-label classification on the input.

    Args:
        question (str): The question to ask.
        labels (list): The list of labels to choose from (strings).
        language_model (LanguageModel): The language model to use.
        prompt_template (str): The default jinja2 prompt template
            to use (see `Generator`).
        examples (list): The default examples to use in the prompt
            (see `Generator`).
        instructions (list): The default instructions to use (see `Generator`).
        seed_instructions (list): Optional. A list of instructions to use as seed for the
            optimization. If not provided, use the default instructions as seed.
        temperature (float): Optional. The temperature for the LM call.
        use_inputs_schema (bool): Optional. Whether or not to use the inputs schema in
            the prompt (defaults to False) (see `Generator`).
        use_outputs_schema (bool): Optional. Whether or not to use the outputs schema in
            the prompt (defaults to False) (see `Generator`).
        name (str): Optional. The name of the module.
        description (str): Optional. The description of the module.
        trainable (bool): Whether the module's variables should be trainable.
    """

    def __init__(
        self,
        question=None,
        labels=None,
        language_model=None,
        prompt_template=None,
        examples=None,
        instructions=None,
        seed_instructions=None,
        temperature=0.0,
        use_inputs_schema=False,
        use_outputs_schema=False,
        name=None,
        description=None,
        trainable=True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        if not question:
            raise ValueError("The `question` argument must be provided.")
        if not labels:
            raise ValueError("The `labels` argument must be provided.")
        if not isinstance(labels, list):
            raise ValueError("The `labels` parameter must be a list of string.")
        schema = dynamic_enum(DecisionAnswer.get_schema(), "choice", labels)
        self.schema = schema
        self.question = question
        self.labels = labels
        self.language_model = language_model
        self.prompt_template = prompt_template
        self.examples = examples
        if not instructions:
            instructions = default_decision_instructions(self.labels)
        self.instructions = instructions
        self.temperature = temperature
        self.use_inputs_schema = use_inputs_schema
        self.use_outputs_schema = use_outputs_schema
        self.decision = Generator(
            schema=self.schema,
            language_model=self.language_model,
            prompt_template=self.prompt_template,
            examples=self.examples,
            instructions=self.instructions,
            temperature=self.temperature,
            use_inputs_schema=self.use_inputs_schema,
            use_outputs_schema=self.use_outputs_schema,
            name="generator_" + self.name,
        )

    async def call(self, inputs, training=False):
        if not inputs:
            return None
        inputs = await ops.concat(
            inputs,
            Question(question=self.question),
            name="inputs_with_question_" + self.name,
        )
        result = await self.decision(inputs, training=training)
        return result

    def get_config(self):
        config = {
            "question": self.question,
            "labels": self.labels,
            "prompt_template": self.prompt_template,
            "examples": self.examples,
            "instructions": self.instructions,
            "seed_instructions": self.seed_instructions,
            "temperature": self.temperature,
            "use_inputs_schema": self.use_inputs_schema,
            "use_outputs_schema": self.use_outputs_schema,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": serialization_lib.serialize_synalinks_object(
                self.language_model
            )
        }
        return {**config, **language_model_config}

    @classmethod
    def from_config(cls, config):
        language_model = serialization_lib.deserialize_synalinks_object(
            config.pop("language_model")
        )
        return cls(language_model=language_model, **config)
````
## `default_decision_instructions(labels)`
The decision default instructions
Source code in `synalinks/src/modules/core/decision.py`
```
def default_decision_instructions(labels):
    """The decision default instructions"""
    return f"""
You will be given a question, your task is to answer step-by-step to choose one the following labels: {labels}
""".strip()
```
## `Generator`
Bases: `Module`
Use a `LanguageModel` to generate a data model from an arbitrary input data model.
Example:
```
import synalinks
import asyncio

async def main():

    class Query(synalinks.DataModel):
        query: str = synalinks.Field(
            description="The user query",
        )

    class AnswerWithCritique(synalinks.DataModel):
        thinking: str = synalinks.Field(
            description="Your step by step thinking",
        )
        critique: str = synalinks.Field(
            description="The critique of the above thinking",
        )
        answer: str = synalinks.Field(
            description="The correct answer",
        )

    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=Query)
    x1 = await synalinks.Generator(
        data_model=AnswerWithCritique,
        language_model=language_model,
    )(x0)

    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="chain_of_thought_with_critique",
        description="Useful to answer step by step and evaluate your answer",
    )

if __name__ == "__main__":
    asyncio.run(main())
```
Parameters:
| Name | Type | Description | Default |
| -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | --------------- |
| `schema` | `dict` | The target JSON schema. If not provided use the data_model to infer it. | `None` |
| `data_model`         | `DataModel \| SymbolicDataModel \| JsonDataModel` | The target data model for structured output.                                                               | `None`          |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The jinja2 prompt template. | `None` |
| `examples` | `list` | The default list of examples, the examples are a list of tuples containing input/output JSON pairs. | `None` |
| `instructions` | `str` | The default instructions being a string containing instructions for the language model. | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `use_inputs_schema`  | `bool`          | Optional. Whether or not to use the inputs schema in the prompt.                                                             | `False`         |
| `use_outputs_schema` | `bool`          | Optional. Whether or not to use the outputs schema in the prompt.                                                            | `False`         |
| `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (Default to False). | `False` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `streaming`          | `bool`          | Optional. If True, stream the LM response; enabled only if `schema` is None and only during inference (not during training). | `False`         |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
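Per the table above, `examples` is a list of tuples containing input/output JSON pairs. The sketch below shows how such a list might look for the program that maps `Query` to `AnswerWithCritique`. The example content and the `check_examples` helper are illustrative assumptions, not part of Synalinks.

```python
import json

# Hypothetical few-shot examples: each entry is an (input JSON, output JSON) pair
examples = [
    (
        {"query": "What is 12 * 3?"},
        {
            "thinking": "12 * 3 is 12 + 12 + 12, which is 36.",
            "critique": "The repeated addition is correct.",
            "answer": "36",
        },
    ),
]


def check_examples(examples, output_fields):
    """Return True if every pair is JSON-serializable and each output has exactly the given fields."""
    for inputs, outputs in examples:
        json.dumps(inputs)  # raises TypeError if not serializable
        json.dumps(outputs)
        if set(outputs) != set(output_fields):
            return False
    return True
```

The list would then be passed as `synalinks.Generator(..., examples=examples)`. Keeping each output aligned with the target data model's fields avoids teaching the LM an inconsistent format.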
Source code in `synalinks/src/modules/core/generator.py`
````
@synalinks_export(["synalinks.modules.Generator", "synalinks.Generator"])
class Generator(Module):
"""
Use a `LanguageModel` to generate a data model from an arbitrary input data model.
Example:
```python
import synalinks
import asyncio
async def main():
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class AnswerWithCritique(synalinks.DataModel):
thinking: str = synalinks.Field(
description="Your step by step thinking",
)
critique: str = synalinks.Field(
description="The critique of the above thinking",
)
answer: str = synalinks.Field(
description="The correct answer",
)
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
x0 = synalinks.Input(data_model=Query)
x1 = await synalinks.Generator(
data_model=AnswerWithCritique,
language_model=language_model,
)(x0)
program = synalinks.Program(
inputs=x0,
outputs=x1,
name="chain_of_thought_with_critique",
description="Useful to answer step by step and evaluate your answer",
)
if __name__ == "__main__":
asyncio.run(main())
```
Args:
schema (dict): The target JSON schema.
If not provided use the `data_model` to infer it.
data_model (DataModel | SymbolicDataModel | JsonDataModel): The target data
model for structured output.
language_model (LanguageModel): The language model to use.
prompt_template (str): The jinja2 prompt template.
examples (list): The default list of examples, the examples
are a list of tuples containing input/output JSON pairs.
instructions (str): The default instructions being a string containing
instructions for the language model.
seed_instructions (list): Optional. A list of instructions to use as seed for the
optimization. If not provided, use the default instructions as seed.
use_inputs_schema (bool): Optional. Whether or not use the inputs schema in
the prompt (Default to False).
use_outputs_schema (bool): Optional. Whether or not use the outputs schema in
the prompt (Default to False).
return_inputs (bool): Optional. Whether or not to concatenate the inputs to
the outputs (Default to False).
temperature (float): Optional. The temperature for the LM call.
streaming (str): Optional. If true stream the LM response, enabled only if
`schema` is `None` and only during inference (not during training).
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
schema=None,
data_model=None,
language_model=None,
prompt_template=None,
examples=None,
instructions=None,
seed_instructions=None,
use_inputs_schema=False,
use_outputs_schema=False,
return_inputs=False,
temperature=0.0,
streaming=False,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
if not schema and data_model:
schema = data_model.get_schema()
self.schema = schema
if not language_model:
raise ValueError("You should provide `language_model` parameter.")
self.language_model = language_model
if not prompt_template:
prompt_template = default_prompt_template()
self.prompt_template = prompt_template
if not examples:
examples = []
self.examples = examples
if not instructions and self.schema:
data_model_keys = list(self.schema["properties"].keys())
instructions = default_instructions(data_model_keys)
self.instructions = instructions
self.return_inputs = return_inputs
self.temperature = temperature
self.use_inputs_schema = use_inputs_schema
self.use_outputs_schema = use_outputs_schema
if schema and streaming:
streaming = False
self.streaming = streaming
predictions = [
Prediction(
inputs=example[0],
outputs=example[1],
reward=None,
).get_json()
for example in examples
]
if not seed_instructions:
seed_instructions = []
self.seed_instructions = seed_instructions
seed_candidates = [
{
"instructions": seed_instruction,
}
for seed_instruction in self.seed_instructions
]
self.state = self.add_variable(
initializer=Instructions(
instructions=instructions,
examples=predictions,
seed_candidates=seed_candidates,
).get_json(),
data_model=Instructions,
name="state_" + self.name,
)
async def call(self, inputs, training=False):
if not inputs:
return None
msgs = self.format_messages(inputs)
if self.streaming and not training:
streaming = True
else:
streaming = False
result = await ops.predict(
msgs,
schema=self.schema,
language_model=self.language_model,
streaming=streaming,
name="prediction_" + self.name,
temperature=self.temperature,
)
if streaming:
return result
if result:
if training:
predictions = self.state.get("current_predictions")
predictions.append(
{
"inputs": inputs.get_json(),
"outputs": result.get_json(),
"reward": None,
}
)
if self.return_inputs:
return await ops.concat(
inputs,
result,
name="with_inputs_" + self.name,
)
else:
return result
return None
async def compute_output_spec(self, inputs, training=False):
if self.schema:
if self.return_inputs:
return await ops.concat(
inputs,
SymbolicDataModel(
schema=self.schema,
name=self.name,
),
name="with_inputs_" + self.name,
)
else:
return SymbolicDataModel(
schema=self.schema,
name=self.name,
)
else:
if self.return_inputs:
return await ops.concat(
inputs,
SymbolicDataModel(
schema=ChatMessage.get_schema(),
name=self.name,
),
name="with_inputs_" + self.name,
)
else:
return SymbolicDataModel(
schema=ChatMessage.get_schema(),
name=self.name,
)
def format_messages(self, inputs=None):
template = jinja2.Template(self.prompt_template)
rendered_prompt = template.render(
inputs_schema=inputs.get_schema() if self.use_inputs_schema else None,
outputs_schema=self.schema if self.use_outputs_schema else None,
examples=[
(pred.get("inputs"), pred.get("outputs"))
for pred in self.state.get("examples")
],
instructions=self.state.get("instructions"),
inputs=inputs.get_json() if inputs else None,
)
matches = XML_TAGS_REGEX.findall(rendered_prompt)
extracted_tags = [(match[0], match[1].strip()) for match in matches]
msgs = ChatMessages()
for message in extracted_tags:
role, content = message
if content:
msgs.messages.append(ChatMessage(role=role, content=content))
return msgs
def get_config(self):
config = {
"schema": self.schema,
"prompt_template": self.prompt_template,
"examples": self.examples,
"instructions": self.instructions,
"seed_instructions": self.seed_instructions,
"use_inputs_schema": self.use_inputs_schema,
"use_outputs_schema": self.use_outputs_schema,
"return_inputs": self.return_inputs,
"temperature": self.temperature,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
language_model_config = {
"language_model": serialization_lib.serialize_synalinks_object(
self.language_model,
)
}
return {
**config,
**language_model_config,
}
@classmethod
def from_config(cls, config):
language_model = serialization_lib.deserialize_synalinks_object(
config.pop("language_model"),
)
return cls(
language_model=language_model,
**config,
)
````
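`format_messages` renders `prompt_template` with Jinja2 and then uses `XML_TAGS_REGEX` to split the result into chat messages: the tag name becomes the message role, the stripped body becomes the content, and empty messages are dropped. A custom `prompt_template` is therefore expected to wrap each message in role tags. The sketch below reproduces only that splitting step with the standard library; the regex pattern and the `system`/`user`/`assistant` role names are assumptions, since the actual `XML_TAGS_REGEX` is not shown in this section, and `split_messages` is a hypothetical helper.

```python
import re

# Assumed stand-in for Synalinks' XML_TAGS_REGEX: group 1 is the role tag name,
# group 2 is everything up to the matching closing tag (across newlines).
XML_TAGS_REGEX = re.compile(r"<(system|user|assistant)>(.*?)</\1>", re.DOTALL)


def split_messages(rendered_prompt):
    """Turn a rendered prompt into role/content dicts, skipping empty messages."""
    messages = []
    for role, content in XML_TAGS_REGEX.findall(rendered_prompt):
        content = content.strip()
        if content:
            messages.append({"role": role, "content": content})
    return messages


rendered = """<system>
Answer the query.
</system>
<user>
{"query": "What is 2+2?"}
</user>
<assistant>
</assistant>"""

messages = split_messages(rendered)
```

Here the empty `assistant` block is skipped, mirroring the `if content:` check in `format_messages`.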
## `default_prompt_template()`
Returns the default prompt template.
Returns:
| Type | Description |
| ----- | ---------------------------- |
| `str` | The default prompt template. |
Source code in `synalinks/src/modules/core/generator.py`
```
@synalinks_export("synalinks.default_prompt_template")
def default_prompt_template():
    """Returns the default prompt template.
    Returns:
        (str): The default prompt template.
    """
    return """
{{ instructions }}
{% if inputs_schema %}
{{ inputs_schema }}
{% endif %}{% if outputs_schema %}
{{ outputs_schema }}
{% endif %}{% if examples %}
{% for example in examples %}
Input:
{{ example[0] }}
Output:
{{ example[1] }}
{% endfor %}
{% endif %}
{% if inputs %}
Input:
{{ inputs }}
Output:
{% endif %}
""".strip()
```
## `Identity`
Bases: `Module`
Identity module.
This module should be used as a placeholder when no operation is to be performed. The module just returns its `inputs` argument as output.
This module can be really useful during the development process, to implement the whole program architecture before the individual modules.
It avoids data model naming issues by simply forwarding its inputs; that way you can implement the general program architecture, validate it, and implement the individual modules later.
Example:
```
import synalinks
class MyAwesomeModule(synalinks.Program):
    def __init__(
        self,
        name=None,
        description=None,
        trainable=True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
    async def build(self, inputs):
        outputs = await synalinks.Identity()(inputs)
        super().__init__(
            inputs=inputs,
            outputs=outputs,
            name=self.name,
            description=self.description,
            trainable=self.trainable,
        )
```
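At runtime, `Identity.call` returns a clone of a single data model, or clones every element of a nested structure via `tree.map_structure`, so downstream modules never share the caller's object. The stand-alone sketch below models that behavior with plain dicts and `copy.deepcopy`; the `identity` function is hypothetical (not part of Synalinks) and handles only one level of list or tuple nesting.

```python
import copy


def identity(inputs):
    """Hypothetical model of Identity.call on JSON-like data."""
    if isinstance(inputs, dict):
        # A single data model: return an independent copy.
        return copy.deepcopy(inputs)
    # A list or tuple of data models: copy each one, keep the container type.
    return type(inputs)(copy.deepcopy(x) for x in inputs)


query = {"query": "What is Synalinks?"}
forwarded = identity(query)
pair = identity(({"a": 1}, {"b": 2}))
```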
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------------- | ------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | The default module's arguments | `{}` |
Source code in `synalinks/src/modules/core/identity.py`
````
@synalinks_export(["synalinks.modules.Identity", "synalinks.Identity"])
class Identity(Module):
    """Identity module.
    This module should be used as a placeholder when no operation is to be
    performed. The module just returns its `inputs` argument as output.
    This module can be really useful during the development process, to
    implement the whole program architecture before the individual modules.
    It avoids data model naming issues by simply forwarding its inputs;
    that way you can implement the general program architecture,
    validate it, and implement the individual modules later.
    Example:
    ```python
    import synalinks
    class MyAwesomeModule(synalinks.Program):
        def __init__(
            self,
            name=None,
            description=None,
            trainable=True,
        ):
            super().__init__(
                name=name,
                description=description,
                trainable=trainable,
            )
        async def build(self, inputs):
            outputs = await synalinks.Identity()(inputs)
            super().__init__(
                inputs=inputs,
                outputs=outputs,
                name=self.name,
                description=self.description,
                trainable=self.trainable,
            )
    ```
    Args:
        **kwargs (keyword arguments): The default module's arguments
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.built = True
    async def call(self, inputs):
        if isinstance(inputs, (JsonDataModel, SymbolicDataModel)):
            return inputs.clone()
        return tree.map_structure(
            lambda x: x.clone(),
            inputs,
        )
````
## `Input(schema=None, data_model=None, optional=False, name=None)`
Used to instantiate a `SymbolicDataModel`.
A `SymbolicDataModel` is a symbolic data model-like object, which we augment with certain attributes that allow us to build a Synalinks `Program` just by knowing the inputs and outputs of the program (similar to a Keras symbolic tensor).
Example:
```
import synalinks
class Query(synalinks.DataModel):
    query: str
inputs = synalinks.Input(data_model=Query)
# You can also create it using a JSON schema like this:
inputs = synalinks.Input(schema=Query.get_schema())
# Or using a symbolic datamodel:
inputs = synalinks.Input(data_model=Query.to_symbolic_data_model())
```
Parameters:
| Name | Type | Description | Default |
| ------------ | ----------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `schema` | `dict` | A JSON schema of the data model. If not provided, the `data_model` argument is used. | `None` |
| `data_model` | `DataModel` | Optional existing data model to wrap into the Input layer. If set, the module will use this data_model rather than creating a new placeholder data model. | `None` |
| `optional` | `bool` | Whether the input is optional or not. An optional input can accept None values. | `False` |
| `name` | `string` | Optional name string for the module. Should be unique in a program (do not reuse the same name twice). It will be autogenerated if it isn't provided. | `None` |
Returns:
| Type | Description |
| ------------------- | --------------------------------------------------------------------- |
| `SymbolicDataModel` | The symbolic data model corresponding to the given data model/schema. |
Source code in `synalinks/src/modules/core/input_module.py`
````
@synalinks_export(["synalinks.modules.Input", "synalinks.Input"])
def Input(
    schema=None,
    data_model=None,
    optional=False,
    name=None,
):
    """Used to instantiate a `SymbolicDataModel`.
    A `SymbolicDataModel` is a symbolic data model-like object, which we augment with
    certain attributes that allow us to build a Synalinks `Program` just by knowing the
    inputs and outputs of the program (similar to a Keras symbolic tensor).
    Example:
    ```python
    import synalinks
    class Query(synalinks.DataModel):
        query: str
    inputs = synalinks.Input(data_model=Query)
    # You can also create it using a JSON schema like this:
    inputs = synalinks.Input(schema=Query.get_schema())
    # Or using a symbolic datamodel:
    inputs = synalinks.Input(data_model=Query.to_symbolic_data_model())
    ```
    Args:
        schema (dict): A JSON schema of the data model.
            If not provided, the `data_model` argument is used.
        data_model (DataModel): Optional existing data model to wrap into
            the `Input` layer. If set, the module will use this data_model rather
            than creating a new placeholder data model.
        optional (bool): Whether the input is optional or not.
            An optional input can accept `None` values.
        name (string): Optional name string for the module.
            Should be unique in a program (do not reuse the same name twice).
            It will be autogenerated if it isn't provided.
    Returns:
        (SymbolicDataModel): The symbolic data model corresponding to
            the given data model/schema.
    """
    module = InputModule(
        schema=schema,
        input_data_model=data_model.to_symbolic_data_model() if data_model else None,
        optional=optional,
        name=name,
    )
    return module.output
````
## `Not`
Bases: `Module`
Not module.
This module should be used as a placeholder when no operation is to be performed and the output should be `None`.
This module is useful to implement stop conditions when combined with a conditional branch, or as a placeholder (like `Identity`) before implementing guards that leverage the xor operation.
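Note the asymmetry in the source listed below: `call` returns `None` (or a structure of `None` values), while `compute_output_spec` returns a clone of the inputs, so the symbolic program graph keeps a well-defined output while the runtime value is empty. The runtime side can be modeled in plain Python as follows; `not_module` is a hypothetical helper operating on dicts, not part of Synalinks.

```python
def not_module(inputs):
    """Hypothetical model of Not.call on JSON-like data."""
    if isinstance(inputs, dict):
        # A single data model always becomes None.
        return None
    # A list or tuple of data models becomes a same-shaped container of None.
    return type(inputs)(None for _ in inputs)


single = not_module({"query": "stop here"})
several = not_module([{"a": 1}, {"b": 2}])
```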
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------------- | ------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | The default module's arguments | `{}` |
Source code in `synalinks/src/modules/core/not_module.py`
```
@synalinks_export(["synalinks.modules.Not", "synalinks.Not"])
class Not(Module):
    """Not module.
    This module should be used as a placeholder when no operation is to be
    performed and the output should be `None`.
    This module is useful to implement stop conditions when combined with a conditional
    branch, or as a placeholder (like `Identity`) before implementing guards that
    leverage the xor operation.
    Args:
        **kwargs (keyword arguments): The default module's arguments
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.built = True
    async def call(self, inputs):
        if isinstance(inputs, (JsonDataModel, SymbolicDataModel)):
            return None
        return tree.map_structure(
            lambda x: None,
            inputs,
        )
    async def compute_output_spec(self, inputs):
        if isinstance(inputs, (JsonDataModel, SymbolicDataModel)):
            return inputs.clone()
        return tree.map_structure(
            lambda x: x.clone(),
            inputs,
        )
```
## `Embedding`
Bases: `Module`
Extracts and updates the embedding vector of entities.
This module is designed to work with `Entity`, `Relation`, `Entities`, `Relations` or `KnowledgeGraph` data models. It supports masking entity fields in order to keep **only one** field to embed per entity.
**Note**: Each entity should have the *same field* to compute the embedding from, like a `name` or `description` field, selected using `in_mask`. **Or** every entity should have *only one field left* after masking with the `out_mask` argument. If both masks are given, `out_mask` takes precedence.
```
import synalinks
import asyncio
from typing import Literal
class Document(synalinks.Entity):
    label: Literal["Document"]
    text: str = synalinks.Field(
        description="The document content",
    )
async def main():
    # Assumes a local Ollama server serving this embedding model;
    # replace it with the embedding model you actually use.
    embedding_model = synalinks.EmbeddingModel(
        model="ollama/mxbai-embed-large",
    )
    inputs = synalinks.Input(data_model=Document)
    outputs = await synalinks.Embedding(
        embedding_model=embedding_model,
        in_mask=["text"],
    )(inputs)
    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="embed_document",
        description="Embed the given documents",
    )
    doc = Document(
        label="Document",
        text="my document",
    )
    result = await program(doc)
if __name__ == "__main__":
    asyncio.run(main())
```
To process a batch asynchronously, use `program.predict()` instead. See the [FAQ](https://synalinks.github.io/synalinks/FAQ/#whats-the-difference-between-program-methods-predict-and-__call__) to understand the difference between `program()` and `program.predict()`.
Here is an example:
```
import synalinks
import asyncio
import numpy as np
from typing import Literal
class Document(synalinks.Entity):
    label: Literal["Document"]
    text: str = synalinks.Field(
        description="The document content",
    )
async def main():
    # Assumes a local Ollama server serving this embedding model;
    # replace it with the embedding model you actually use.
    embedding_model = synalinks.EmbeddingModel(
        model="ollama/mxbai-embed-large",
    )
    inputs = synalinks.Input(data_model=Document)
    outputs = await synalinks.Embedding(
        embedding_model=embedding_model,
        in_mask=["text"],
    )(inputs)
    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="embed_document",
        description="Embed the given documents",
    )
    doc1 = Document(label="Document", text="my document 1")
    doc2 = Document(label="Document", text="my document 2")
    doc3 = Document(label="Document", text="my document 3")
    docs = np.array([doc1, doc2, doc3], dtype="object")
    embedded_docs = await program.predict(docs)
if __name__ == "__main__":
    asyncio.run(main())
```
Parameters:
| Name | Type | Description | Default |
| ----------------- | ---------------- | --------------------------------------------------- | ------- |
| `embedding_model` | `EmbeddingModel` | The embedding model to use. | `None` |
| `in_mask` | `list` | A mask applied to keep specific entity fields. | `None` |
| `out_mask` | `list` | A mask applied to remove specific entity fields. | `None` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `False` |
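The masking rule can be checked before wiring a program. The sketch below models it on a flat entity dict: `out_mask` is applied first if given (mirroring the module's `if self.out_mask ... elif self.in_mask`), otherwise `in_mask`, and the entity is embeddable only if exactly one field remains. The helpers `mask_entity` and `can_embed` are hypothetical, and treating `label` as an ordinary field that a mask can remove is an assumption about `ops.in_mask` and `ops.out_mask`.

```python
def mask_entity(entity, in_mask=None, out_mask=None):
    """Hypothetical model of the Embedding module's masking step."""
    if out_mask:
        # out_mask takes precedence: drop the listed fields.
        return {k: v for k, v in entity.items() if k not in out_mask}
    if in_mask:
        # Keep only the listed fields.
        return {k: v for k, v in entity.items() if k in in_mask}
    return dict(entity)


def can_embed(entity, in_mask=None, out_mask=None):
    """True when exactly one field is left, i.e. one embedding vector per entity."""
    return len(mask_entity(entity, in_mask=in_mask, out_mask=out_mask)) == 1


doc = {"label": "Document", "text": "my document"}
```

With no mask, both `label` and `text` remain, so the module would warn and skip the entity.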
Source code in `synalinks/src/modules/knowledge/embedding.py`
````
@synalinks_export(
    [
        "synalinks.modules.Embedding",
        "synalinks.Embedding",
    ]
)
class Embedding(Module):
    """Extracts and updates the embedding vector of entities.
    This module is designed to work with `Entity`, `Relation`, `Entities`,
    `Relations` or `KnowledgeGraph` data models. It supports masking
    entity fields in order to keep **only one** field to embed per entity.
    **Note**: Each entity should have the *same field* to compute the embedding
    from, like a `name` or `description` field, selected using `in_mask`.
    **Or** every entity should have *only one field left* after masking with
    the `out_mask` argument.
    ```python
    import synalinks
    import asyncio
    from typing import Literal
    class Document(synalinks.Entity):
        label: Literal["Document"]
        text: str = synalinks.Field(
            description="The document content",
        )
    async def main():
        embedding_model = synalinks.EmbeddingModel(
            model="ollama/mxbai-embed-large",
        )
        inputs = synalinks.Input(data_model=Document)
        outputs = await synalinks.Embedding(
            embedding_model=embedding_model,
            in_mask=["text"],
        )(inputs)
        program = synalinks.Program(
            inputs=inputs,
            outputs=outputs,
            name="embed_document",
            description="Embed the given documents",
        )
        doc = Document(
            label="Document",
            text="my document",
        )
        result = await program(doc)
    if __name__ == "__main__":
        asyncio.run(main())
    ```
    To process a batch asynchronously, use `program.predict()` instead.
    See the [FAQ](https://synalinks.github.io/synalinks/FAQ/#whats-the-difference-between-program-methods-predict-and-__call__)
    to understand the difference between `program()` and `program.predict()`.
    Here is an example:
    ```python
    import synalinks
    import asyncio
    import numpy as np
    from typing import Literal
    class Document(synalinks.Entity):
        label: Literal["Document"]
        text: str = synalinks.Field(
            description="The document content",
        )
    async def main():
        embedding_model = synalinks.EmbeddingModel(
            model="ollama/mxbai-embed-large",
        )
        inputs = synalinks.Input(data_model=Document)
        outputs = await synalinks.Embedding(
            embedding_model=embedding_model,
            in_mask=["text"],
        )(inputs)
        program = synalinks.Program(
            inputs=inputs,
            outputs=outputs,
            name="embed_document",
            description="Embed the given documents",
        )
        doc1 = Document(label="Document", text="my document 1")
        doc2 = Document(label="Document", text="my document 2")
        doc3 = Document(label="Document", text="my document 3")
        docs = np.array([doc1, doc2, doc3], dtype="object")
        embedded_docs = await program.predict(docs)
    if __name__ == "__main__":
        asyncio.run(main())
    ```
    Args:
        embedding_model (EmbeddingModel): The embedding model to use.
        in_mask (list): A mask applied to keep specific entity fields.
        out_mask (list): A mask applied to remove specific entity fields.
        name (str): Optional. The name of the module.
        description (str): Optional. The description of the module.
        trainable (bool): Whether the module's variables should be trainable.
    """
    def __init__(
        self,
        embedding_model=None,
        in_mask=None,
        out_mask=None,
        name=None,
        description=None,
        trainable=False,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        self.embedding_model = embedding_model
        self.in_mask = in_mask
        self.out_mask = out_mask
    async def _embed_entity(self, entity):
        # Check if entity is already embedded and has valid embeddings
        embeddings = entity.get("embeddings")
        if embeddings:
            warnings.warn(
                "Embeddings already generated for entity. Returning original entity."
            )
            return JsonDataModel(
                json=entity.get_json(),
                schema=entity.get_schema(),
                name=entity.name + "_embedded",
            )
        # Apply masking to the entity
        filtered_entity = entity  # Default to original entity
        if self.out_mask:
            filtered_entity = await ops.out_mask(
                entity,
                mask=self.out_mask,
                recursive=False,
                name=entity.name + "_out_mask",
            )
        elif self.in_mask:
            filtered_entity = await ops.in_mask(
                entity,
                mask=self.in_mask,
                recursive=False,
                name=entity.name + "_in_mask",
            )
        # Generate embeddings
        embeddings = await ops.embedding(
            filtered_entity,
            embedding_model=self.embedding_model,
            name=entity.name + "_embedding",
        )
        # Validate embeddings
        if not embeddings or not embeddings.get("embeddings"):
            warnings.warn(
                f"No embeddings generated for entity {entity.name}. "
                "Please check that your schema is correct."
            )
            return None
        embedding_list = embeddings.get("embeddings")
        if len(embedding_list) != 1:
            warnings.warn(
                "Entities can only have one embedding vector per entity, "
                "adjust `Embedding` module's `in_mask` or `out_mask` "
                "to keep only one field. Skipping embedding."
            )
            return None
        # Add embedding to entity
        vector = embedding_list[0]
        return await ops.concat(
            entity,
            EmbeddingVector(embedding=vector),
            name=entity.name + "_embedded",
        )
    async def _embed_relation(self, relation):
        subj = relation.get_nested_entity("subj")
        obj = relation.get_nested_entity("obj")
        if not subj or not obj:
            return None
        embedded_subj = await self._embed_entity(subj)
        embedded_obj = await self._embed_entity(obj)
        relation_json = copy.deepcopy(relation.get_json())
        relation_json.update(
            {
                "subj": embedded_subj.get_json(),
                "obj": embedded_obj.get_json(),
            }
        )
        outputs_schema = copy.deepcopy(relation.get_schema())
        # Update schema definitions for embedded entities
        if outputs_schema.get("$defs"):
            subj_label = subj.get("label")
            obj_label = obj.get("label")
            if subj_label and subj_label in outputs_schema["$defs"]:
                embedded_subj_schema = embedded_subj.get_schema()
                if embedded_subj_schema.get("properties"):
                    outputs_schema["$defs"][subj_label]["properties"].update(
                        embedded_subj_schema["properties"]
                    )
            if obj_label and obj_label in outputs_schema["$defs"]:
                embedded_obj_schema = embedded_obj.get_schema()
                if embedded_obj_schema.get("properties"):
                    outputs_schema["$defs"][obj_label]["properties"].update(
                        embedded_obj_schema["properties"]
                    )
        return JsonDataModel(
            json=relation_json,
            schema=outputs_schema,
            name=relation.name + "_embedded",
        )
    async def call(self, inputs):
        if not inputs:
            return None
        if is_knowledge_graph(inputs):
            entities_json = []
            relations_json = []
            outputs_schema = copy.deepcopy(inputs.get_schema())
            # Process entities
            for entity in inputs.get_nested_entity_list("entities"):
                embedded_entity = await self._embed_entity(entity)
                if embedded_entity:
                    entities_json.append(embedded_entity.get_json())
                    # Update schema definitions
                    if outputs_schema.get("$defs"):
                        entity_label = entity.get("label")
                        if entity_label and entity_label in outputs_schema["$defs"]:
                            embedded_schema = embedded_entity.get_schema()
                            if embedded_schema.get("properties"):
                                outputs_schema["$defs"][entity_label][
                                    "properties"
                                ].update(embedded_schema["properties"])
            # Process relations
            for relation in inputs.get_nested_entity_list("relations"):
                embedded_relation = await self._embed_relation(relation)
                if embedded_relation:
                    relations_json.append(embedded_relation.get_json())
                    embedded_schema = embedded_relation.get_schema()
                    if embedded_schema.get("$defs") and outputs_schema.get("$defs"):
                        for def_key, def_value in embedded_schema["$defs"].items():
                            if def_key in outputs_schema["$defs"]:
                                # Merge properties if they exist
                                if def_value.get("properties") and outputs_schema[
                                    "$defs"
                                ][def_key].get("properties"):
                                    outputs_schema["$defs"][def_key]["properties"].update(
                                        def_value["properties"]
                                    )
                            else:
                                outputs_schema["$defs"][def_key] = def_value
            # Update output JSON
            outputs_json = inputs.get_json()
            outputs_json.update({"entities": entities_json, "relations": relations_json})
            return JsonDataModel(
                json=outputs_json,
                schema=outputs_schema,
                name=inputs.name + "_embedded",
            )
        elif is_entities(inputs):
            entities_json = []
            outputs_schema = copy.deepcopy(inputs.get_schema())
            # Process all entities and collect schema updates
            for entity in inputs.get_nested_entity_list("entities"):
                embedded_entity = await self._embed_entity(entity)
                if embedded_entity:
                    entities_json.append(embedded_entity.get_json())
                    # Update schema definitions
                    if outputs_schema.get("$defs"):
                        entity_label = entity.get("label")
                        if entity_label and entity_label in outputs_schema["$defs"]:
                            embedded_schema = embedded_entity.get_schema()
                            if embedded_schema.get("properties"):
                                outputs_schema["$defs"][entity_label][
                                    "properties"
                                ].update(embedded_schema["properties"])
            # Update output JSON with embedded entities
            outputs_json = inputs.get_json()
            outputs_json.update({"entities": entities_json})
            return JsonDataModel(
                json=outputs_json,
                schema=outputs_schema,
                name=inputs.name + "_embedded",
            )
        elif is_relations(inputs):
            relations_json = []
            outputs_schema = copy.deepcopy(inputs.get_schema())
            # Process all relations
            for relation in inputs.get_nested_entity_list("relations"):
                embedded_relation = await self._embed_relation(relation)
                if embedded_relation:
                    relations_json.append(embedded_relation.get_json())
                    # Merge schema definitions from embedded relation
                    embedded_schema = embedded_relation.get_schema()
                    if embedded_schema.get("$defs") and outputs_schema.get("$defs"):
                        for def_key, def_value in embedded_schema["$defs"].items():
                            if def_key in outputs_schema["$defs"]:
                                # Merge properties if they exist
                                if def_value.get("properties") and outputs_schema[
                                    "$defs"
                                ][def_key].get("properties"):
                                    outputs_schema["$defs"][def_key]["properties"].update(
                                        def_value["properties"]
                                    )
                            else:
                                outputs_schema["$defs"][def_key] = def_value
            # Update output JSON
            outputs_json = inputs.get_json()
            outputs_json.update({"relations": relations_json})
            return JsonDataModel(
                json=outputs_json,
                schema=outputs_schema,
                name=inputs.name + "_embedded",
            )
        elif is_relation(inputs):
            return await self._embed_relation(inputs)
        elif is_entity(inputs):
            return await self._embed_entity(inputs)
        else:
            return None
    async def compute_output_spec(self, inputs):
        return inputs.clone(name=inputs.name + "_embedded")
    def get_config(self):
        config = {
            "in_mask": self.in_mask,
            "out_mask": self.out_mask,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        embedding_model_config = {
            "embedding_model": serialization_lib.serialize_synalinks_object(
                self.embedding_model
            )
        }
        return {**embedding_model_config, **config}
    @classmethod
    def from_config(cls, config):
        embedding_model = serialization_lib.deserialize_synalinks_object(
            config.pop("embedding_model")
        )
        return cls(embedding_model=embedding_model, **config)
````
## `UpdateKnowledge`
Bases: `Module`
Update the given knowledge base.
This module requires an `Entity`, `Relation`, `Entities`, `Relations` or `KnowledgeGraph` data model as input.
This module performs alignment (also called deduplication) automatically, using the similarity search of the knowledge base. This is more efficient than a linear alignment algorithm because it relies on the knowledge base's hierarchical navigable small world (HNSW) index.
However, the entities must first be embedded using the `Embedding` module before updating the knowledge base.
Parameters:
| Name | Type | Description | Default |
| ---------------- | --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `knowledge_base` | `KnowledgeBase` | The knowledge base to update. | `None` |
| `threshold` | `float` | Similarity threshold for entity alignment. Entities with similarity above this threshold may be merged. Should be between 0.0 and 1.0 (Defaults to 0.8). | `0.8` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `False` |
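To illustrate what `threshold` controls, here is a brute-force sketch of threshold-based alignment: an incoming entity vector is merged with the most similar stored entity if their similarity is above `threshold`, and is otherwise treated as a new entity. It assumes cosine similarity and uses a linear scan purely for illustration (the module itself delegates to the knowledge base's similarity search); `find_merge_target` and the id-to-vector dictionary are hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def find_merge_target(vector, stored, threshold=0.8):
    """Return the id of the stored entity to merge with, or None to insert a new one."""
    best_id, best_score = None, threshold
    for entity_id, stored_vector in stored.items():
        score = cosine_similarity(vector, stored_vector)
        # Only similarities strictly above the threshold qualify for a merge.
        if score > best_score:
            best_id, best_score = entity_id, score
    return best_id


stored = {"paris": [1.0, 0.0], "london": [0.0, 1.0]}
```

Raising `threshold` makes merges rarer (fewer false duplicates); lowering it merges more aggressively.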
Source code in `synalinks/src/modules/knowledge/update_knowledge.py`
```
@synalinks_export(
[
"synalinks.modules.UpdateKnowledge",
"synalinks.UpdateKnowledge",
]
)
class UpdateKnowledge(Module):
"""Update the given knowledge base.
This module requires an `Entity`, `Relation`, `Entities`,
`Relations` or `KnowledgeGraph` data model as input.
This module perform alignment automatically, also called deduplication,
by using the similarity search of the knowledge base. This way of performing
alignment is more performant than using a linear alignement algorithm as it use
the hierarchical small world neighbors (HSWN) algorithm of the knowledge base.
It however needs to have the entities embeded using the `Embedding` module before
updating the knwoledge base.
Args:
knowledge_base (KnowledgeBase): The knowledge base to update.
threshold (float): Similarity threshold for entity alignment.
Entities with similarity above this threshold may be merged.
Should be between 0.0 and 1.0 (Defaults to 0.8).
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
knowledge_base=None,
threshold=0.8,
name=None,
description=None,
trainable=False,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
self.knowledge_base = knowledge_base
self.threshold = threshold
async def call(self, inputs):
if not inputs:
return None
if is_knowledge_graph(inputs):
for entity in inputs.get_nested_entity_list("entities"):
_ = await ops.update_knowledge(
entity,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=entity.name + "_updated",
)
for relation in inputs.get_nested_entity_list("relations"):
subj = relation.get_nested_entity("subj")
if not subj:
continue
_ = await ops.update_knowledge(
subj,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=subj.name + "_updated",
)
obj = relation.get_nested_entity("obj")
if not obj:
continue
_ = await ops.update_knowledge(
obj,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=obj.name + "_updated",
)
_ = await ops.update_knowledge(
relation,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=relation.name + "_updated",
)
return inputs.clone(name=inputs.name + "_updated")
elif is_entities(inputs):
for entity in inputs.get_nested_entity_list("entities"):
_ = await ops.update_knowledge(
entity,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=entity.name + "_updated",
)
return inputs.clone(name=inputs.name + "_updated")
elif is_relations(inputs):
for relation in inputs.get_nested_entity_list("relations"):
subj = relation.get_nested_entity("subj")
if not subj:
continue
_ = await ops.update_knowledge(
subj,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=subj.name + "_updated",
)
obj = relation.get_nested_entity("obj")
if not obj:
continue
_ = await ops.update_knowledge(
obj,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=obj.name + "_updated",
)
_ = await ops.update_knowledge(
relation,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=relation.name + "_updated",
)
return inputs.clone(name=inputs.name + "_updated")
elif is_relation(inputs):
subj = inputs.get_nested_entity("subj")
_ = await ops.update_knowledge(
subj,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=subj.name + "_updated",
)
obj = inputs.get_nested_entity("obj")
_ = await ops.update_knowledge(
obj,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=obj.name + "_updated",
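)
_ = await ops.update_knowledge(
inputs,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=inputs.name + "_updated",
)
return inputs.clone(name=inputs.name + "_updated")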
elif is_entity(inputs):
_ = await ops.update_knowledge(
inputs,
knowledge_base=self.knowledge_base,
threshold=self.threshold,
name=inputs.name + "_updated",
)
return inputs.clone(name=inputs.name + "_updated")
else:
return None
async def compute_output_spec(self, inputs):
return inputs.clone()
def get_config(self):
config = {
"threshold": self.threshold,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
knowledge_base_config = {
"knowledge_base": serialization_lib.serialize_synalinks_object(
self.knowledge_base
)
}
return {**knowledge_base_config, **config}
@classmethod
def from_config(cls, config):
knowledge_base = serialization_lib.deserialize_synalinks_object(
config.pop("knowledge_base")
)
return cls(knowledge_base=knowledge_base, **config)
```
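The `threshold` argument decides when an incoming entity counts as a duplicate of one already stored. The sketch below models that decision in plain Python. It assumes entities are dicts carrying an `embedding` list and that duplicates are found with cosine similarity among entities of the same label. `upsert_entity` is a hypothetical helper for illustration, not the `ops.update_knowledge` implementation.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def upsert_entity(store, entity, threshold=0.8):
    """Hypothetical upsert: merge `entity` into the most similar stored entity
    of the same label when similarity is above `threshold`, else insert it."""
    best, best_score = None, threshold
    for stored in store:
        if stored["label"] != entity["label"]:
            continue  # only entities with the same label are merge candidates
        score = cosine_similarity(stored["embedding"], entity["embedding"])
        if score > best_score:
            best, best_score = stored, score
    if best is None:
        store.append(dict(entity))  # no close match: insert a new node
        return store[-1]
    # Merge: overwrite the stored properties but keep the stored embedding
    best.update({k: v for k, v in entity.items() if k != "embedding"})
    return best


store = []
upsert_entity(store, {"label": "City", "name": "Paris", "embedding": [1.0, 0.0]})
upsert_entity(store, {"label": "City", "name": "Paris, France", "embedding": [0.99, 0.05]})
upsert_entity(store, {"label": "City", "name": "Berlin", "embedding": [0.0, 1.0]})
print([e["name"] for e in store])  # ['Paris, France', 'Berlin']
```

Raising `threshold` toward 1.0 makes merging stricter, so near-duplicates survive as separate nodes. Lowering it merges more aggressively.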
## `And`
Bases: `Module`
Perform a logical And operation.
It takes a list of data models as input and returns their concatenation.
If any input is None, the output is None.
Table:
| `x1` | `x2` | Logical And (`&`) |
| ------ | ------ | ----------------- |
| `x1` | `x2` | `x1 + x2` |
| `x1` | `None` | `None` |
| `None` | `x2` | `None` |
| `None` | `None` | `None` |
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------------- | ------------------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | Standard keyword arguments for the module. | `{}` |
Source code in `synalinks/src/modules/merging/logical_and.py`
```
@synalinks_export(
    [
        "synalinks.And",
        "synalinks.modules.And",
    ]
)
class And(Module):
    """Perform a logical And operation.

    It takes a list of data models as input
    and returns their concatenation.
    If any input is None, the output is None.

    Table:

    | `x1`   | `x2`   | Logical And (`&`) |
    | ------ | ------ | ----------------- |
    | `x1`   | `x2`   | `x1 + x2`         |
    | `x1`   | `None` | `None`            |
    | `None` | `x2`   | `None`            |
    | `None` | `None` | `None`            |

    Args:
        **kwargs (keyword arguments): Standard keyword arguments for the module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def call(self, inputs, training=False):
        output = inputs[0]
        for i in range(1, len(inputs)):
            output = await ops.logical_and(
                output,
                inputs[i],
                name=f"module_and_{i}_" + self.name,
            )
        return output
```
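The truth table above can be reproduced in a few lines of plain Python. This is a sketch under two assumptions: a `dict` stands in for a data model, and concatenation is a dict merge over non-colliding field names. How Synalinks handles duplicate field names is not modelled here. `logical_and` and `and_module` are illustrative names, not the `ops.logical_and` implementation.

```python
from functools import reduce


def logical_and(x1, x2):
    """Concatenate two data models (dicts here), or return None if either is None."""
    if x1 is None or x2 is None:
        return None
    return {**x1, **x2}


def and_module(inputs):
    """Fold `logical_and` left to right over the inputs, like `And.call`."""
    return reduce(logical_and, inputs)


print(and_module([{"query": "q"}, {"answer": "a"}]))  # {'query': 'q', 'answer': 'a'}
print(and_module([{"query": "q"}, None]))  # None
```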
## `Concat`
Bases: `Module`
Perform a concatenation operation.
It takes as input a list of data models, and returns a concatenation of them.
If any input is None, an exception is raised.
Table:
| `x1` | `x2` | Concat (`+`) |
| ------ | ------ | ------------ |
| `x1` | `x2` | `x1 + x2` |
| `x1` | `None` | `Exception` |
| `None` | `x2` | `Exception` |
| `None` | `None` | `Exception` |
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------------- | ------------------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | Standard keyword arguments for the module. | `{}` |
Source code in `synalinks/src/modules/merging/concat.py`
```
@synalinks_export(
    [
        "synalinks.Concat",
        "synalinks.Concatenate",
        "synalinks.modules.Concat",
        "synalinks.modules.Concatenate",
    ]
)
class Concat(Module):
    """Perform a concatenation operation.

    It takes a list of data models as input
    and returns their concatenation.
    If any input is None, an exception is raised.

    Table:

    | `x1`   | `x2`   | Concat (`+`) |
    | ------ | ------ | ------------ |
    | `x1`   | `x2`   | `x1 + x2`    |
    | `x1`   | `None` | `Exception`  |
    | `None` | `x2`   | `Exception`  |
    | `None` | `None` | `Exception`  |

    Args:
        **kwargs (keyword arguments): Standard keyword arguments for the module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def call(self, inputs, training=False):
        output = inputs[0]
        for i in range(1, len(inputs)):
            output = await ops.concat(
                output,
                inputs[i],
                name=f"module_concat_{i}_" + self.name,
            )
        return output
```
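A matching sketch for `Concat`, under the same dict-based assumptions. The only difference from `And` is that a None input raises instead of propagating. The `ValueError` type is an assumption, since the documentation only says that an exception is raised.

```python
def concat(x1, x2):
    """Concatenate two data models (dicts here); a None input is an error."""
    if x1 is None or x2 is None:
        raise ValueError("Concat received a None input")
    return {**x1, **x2}


def concat_module(inputs):
    """Left fold over the inputs, mirroring `Concat.call`."""
    output = inputs[0]
    for x in inputs[1:]:
        output = concat(output, x)
    return output


print(concat_module([{"query": "q"}, {"answer": "a"}]))  # {'query': 'q', 'answer': 'a'}
```

Reach for `Concat` when a missing input is a bug you want surfaced immediately, and for `And` when a missing input should quietly propagate None.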
## `Or`
Bases: `Module`
Perform a logical Or operation.
It takes a list of data models as input and returns the concatenation of the inputs that are not None.
Inputs that are None are ignored; if every input is None, the output is None.
Table:
| `x1`   | `x2`   | Logical Or (`\|`) |
| ------ | ------ | ----------------- |
| `x1`   | `x2`   | `x1 + x2`         |
| `x1`   | `None` | `x1`              |
| `None` | `x2`   | `x2`              |
| `None` | `None` | `None`            |
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------------- | ------------------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | Standard keyword arguments for the module. | `{}` |
Source code in `synalinks/src/modules/merging/logical_or.py`
```
@synalinks_export(
    [
        "synalinks.Or",
        "synalinks.modules.Or",
    ]
)
class Or(Module):
    """Perform a logical Or operation.

    It takes a list of data models as input and returns the
    concatenation of the inputs that are not None.
    Inputs that are None are ignored; if every input is None,
    the output is None.

    Table:

    | `x1`   | `x2`   | Logical Or (`|`) |
    | ------ | ------ | ---------------- |
    | `x1`   | `x2`   | `x1 + x2`        |
    | `x1`   | `None` | `x1`             |
    | `None` | `x2`   | `x2`             |
    | `None` | `None` | `None`           |

    Args:
        **kwargs (keyword arguments): Standard keyword arguments for the module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def call(self, inputs, training=False):
        output = inputs[0]
        for i in range(1, len(inputs)):
            output = await ops.logical_or(
                output,
                inputs[i],
                name=f"module_or_{i}_" + self.name,
            )
        return output
```
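Under the same dict-based assumptions (a `dict` per data model, merging without field-name collisions), `Or` is a left fold that skips None inputs. The names below are illustrative, not the `ops.logical_or` implementation.

```python
from functools import reduce


def logical_or(x1, x2):
    """Concatenate when both are present, otherwise keep whichever is not None."""
    if x1 is None:
        return x2
    if x2 is None:
        return x1
    return {**x1, **x2}


def or_module(inputs):
    """Fold `logical_or` left to right over the inputs, like `Or.call`."""
    return reduce(logical_or, inputs)


print(or_module([{"query": "q"}, None]))  # {'query': 'q'}
```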
## `Xor`
Bases: `Module`
Perform a logical Xor operation.
It takes a list of data models as input. If two or more of them are not None, it outputs None; otherwise it outputs the one that is not None (or None if every input is None).
Table:
| `x1` | `x2` | Logical Xor (`^`) |
| ------ | ------ | ----------------- |
| `x1` | `x2` | `None` |
| `x1` | `None` | `x1` |
| `None` | `x2` | `x2` |
| `None` | `None` | `None` |
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------------- | ------------------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | Standard keyword arguments for the module. | `{}` |
Source code in `synalinks/src/modules/merging/logical_xor.py`
```
@synalinks_export(
    [
        "synalinks.Xor",
        "synalinks.modules.Xor",
    ]
)
class Xor(Module):
    """Perform a logical Xor operation.

    It takes a list of data models as input.
    If two or more of them are not None, it outputs None;
    otherwise it outputs the one that is not None
    (or None if every input is None).

    Table:

    | `x1`   | `x2`   | Logical Xor (`^`) |
    | ------ | ------ | ----------------- |
    | `x1`   | `x2`   | `None`            |
    | `x1`   | `None` | `x1`              |
    | `None` | `x2`   | `x2`              |
    | `None` | `None` | `None`            |

    Args:
        **kwargs (keyword arguments): Standard keyword arguments for the module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def compute_output_spec(self, inputs, training=False):
        return inputs[0].clone()

    async def call(self, inputs, training=False):
        output = inputs[0]
        for i in range(1, len(inputs)):
            if inputs[i]:
                if not output:
                    output = inputs[i]
                else:
                    # Two or more inputs are set: Xor is not satisfied
                    return None
        if not output:
            # Every input is None: avoid calling .clone() on None
            return None
        return output.clone(name=self.name)
```
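The Xor rule reduces to counting how many inputs are present. This sketch keeps the dict stand-ins from the previous examples and tests for `is not None`, whereas the module source above tests truthiness; `xor_module` is an illustrative name.

```python
def xor_module(inputs):
    """Return the only input that is not None; None if zero or several are set."""
    present = [x for x in inputs if x is not None]
    return present[0] if len(present) == 1 else None


print(xor_module([None, {"answer": "a"}]))  # {'answer': 'a'}
print(xor_module([{"query": "q"}, {"answer": "a"}]))  # None
```

Because two present inputs yield None, `Xor` can serve as a check that exactly one upstream branch produced an output.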
## `EntityRetriever`
Bases: `Module`
Retrieve entities from a knowledge base, based on the embedding vector.
This module is useful for implementing vector-only RAG (retrieval-augmented generation) systems; for KAG (knowledge-augmented generation) systems, see the `KnowledgeRetriever` module.
If you give multiple entity models to this module, the LM will select the most suitable one to perform the search. Having multiple entity models to search for is an easy way to enhance the performance of your RAG system, since it gives you multiple indexes (one per entity model type).
```
import synalinks
import asyncio
from typing import Literal


class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )


class Answer(synalinks.DataModel):
    answer: str = synalinks.Field(
        description="The answer to the user query",
    )


class Document(synalinks.Entity):
    label: Literal["Document"]
    filename: str = synalinks.Field(
        description="The document's filename",
    )
    text: str = synalinks.Field(
        description="The document's text",
    )


class Chunk(synalinks.Entity):
    label: Literal["Chunk"]
    text: str = synalinks.Field(
        description="The chunk's text",
    )


class IsPartOf(synalinks.Relation):
    subj: Chunk
    label: Literal["IsPartOf"]
    obj: Document


language_model = synalinks.LanguageModel(
    model="ollama/mistral",
)

embedding_model = synalinks.EmbeddingModel(
    model="ollama/mxbai-embed-large",
)

knowledge_base = synalinks.KnowledgeBase(
    uri="neo4j://localhost:7687",
    entity_models=[Document, Chunk],
    relation_models=[IsPartOf],
    embedding_model=embedding_model,
    metric="cosine",
)


async def main():
    inputs = synalinks.Input(data_model=Query)
    x = await synalinks.EntityRetriever(
        entity_models=[Chunk],
        language_model=language_model,
        knowledge_base=knowledge_base,
    )(inputs)
    outputs = await synalinks.Generator(
        data_model=Answer,
        language_model=language_model,
    )(x)

    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="rag_program",
        description="A naive RAG program",
    )


if __name__ == "__main__":
    asyncio.run(main())
```
Parameters:
| Name | Type | Description | Default |
| -------------------- | --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `knowledge_base` | `KnowledgeBase` | The knowledge base to use. | `None` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `entity_models`      | `list`          | The list of entity models to search for (a list of Entity data models).                                                                            | `None`  |
| `k` | `int` | Maximum number of similar entities to return (Defaults to 10). | `10` |
| `threshold` | `float` | Minimum similarity score for results. Entities with similarity below this threshold are excluded. Should be between 0.0 and 1.0 (Defaults to 0.5). | `0.5` |
| `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` |
| `examples` | `list` | The default list of examples, the examples are a list of tuples containing input/output JSON pairs. | `None` |
| `instructions` | `str` | The default instructions being a string containing instructions for the language model. | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema`  | `bool`          | Optional. Whether or not to use the inputs schema in the prompt (Defaults to False) (see Generator).                                                | `False` |
| `use_outputs_schema` | `bool`          | Optional. Whether or not to use the outputs schema in the prompt (Defaults to False) (see Generator).                                               | `False` |
| `return_inputs`      | `bool`          | Optional. Whether or not to concatenate the inputs to the outputs (Defaults to True).                                                              | `True`  |
| `return_query`       | `bool`          | Optional. Whether or not to concatenate the search query to the outputs (Defaults to True).                                                        | `True`  |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
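`k` and `threshold` act as two independent filters on the search results: `threshold` drops weak matches, then `k` caps how many of the remaining matches are returned, best first. The sketch below shows that interplay in plain Python. It assumes cosine similarity (matching `metric="cosine"` in the example) and in-memory dicts with an `embedding` list. The real search runs inside the knowledge base and is not reproduced here, and `similarity_search` is a hypothetical stand-in for `ops.similarity_search`.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def similarity_search(query_vector, entities, k=10, threshold=0.5):
    """Return at most `k` entities, best first, dropping those below `threshold`."""
    scored = [(cosine_similarity(query_vector, e["embedding"]), e) for e in entities]
    kept = [(s, e) for s, e in scored if s >= threshold]  # below threshold: excluded
    kept.sort(key=lambda pair: pair[0], reverse=True)
    return [e for _, e in kept[:k]]


chunks = [
    {"label": "Chunk", "text": "Paris is the capital of France.", "embedding": [1.0, 0.0]},
    {"label": "Chunk", "text": "France is in Europe.", "embedding": [0.8, 0.6]},
    {"label": "Chunk", "text": "Bananas are yellow.", "embedding": [0.0, 1.0]},
]
print([c["text"] for c in similarity_search([1.0, 0.0], chunks)])
# ['Paris is the capital of France.', 'France is in Europe.']
```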
Source code in `synalinks/src/modules/retrievers/entity_retriever.py`
````
@synalinks_export(
[
"synalinks.modules.EntityRetriever",
"synalinks.EntityRetriever",
]
)
class EntityRetriever(Module):
"""Retrieve entities from a knowledge base, based on the embedding vector.
This module is useful for implementing vector-only RAG (retrieval-augmented generation)
systems; for KAG (knowledge-augmented generation) systems, see the
`KnowledgeRetriever` module.
If you give multiple entity models to this module, the LM will select the most
suitable one to perform the search. Having multiple entity models to search
for is an easy way to enhance the performance of your RAG system, since it
gives you multiple indexes (one per entity model type).
```python
import synalinks
import asyncio
from typing import Literal
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class Answer(synalinks.DataModel):
answer: str = synalinks.Field(
description="The answer to the user query",
)
class Document(synalinks.Entity):
label: Literal["Document"]
filename: str = synalinks.Field(
description="The document's filename",
)
text: str = synalinks.Field(
description="The document's text",
)
class Chunk(synalinks.Entity):
label: Literal["Chunk"]
text: str = synalinks.Field(
description="The chunk's text",
)
class IsPartOf(synalinks.Relation):
subj: Chunk
label: Literal["IsPartOf"]
obj: Document
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
embedding_model = synalinks.EmbeddingModel(
model="ollama/mxbai-embed-large"
)
knowledge_base = synalinks.KnowledgeBase(
uri="neo4j://localhost:7687",
entity_models=[Document, Chunk],
relation_models=[IsPartOf],
embedding_model=embedding_model,
metric="cosine",
)
async def main():
inputs = synalinks.Input(data_model=Query)
x = await synalinks.EntityRetriever(
entity_models=[Chunk],
language_model=language_model,
knowledge_base=knowledge_base,
)(inputs)
outputs = await synalinks.Generator(
data_model=Answer,
language_model=language_model,
)(x)
program = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="rag_program",
description="A naive RAG program",
)
if __name__ == "__main__":
asyncio.run(main())
```
Args:
knowledge_base (KnowledgeBase): The knowledge base to use.
language_model (LanguageModel): The language model to use.
entity_models (list): The list of entity models to search for
(a list of `Entity` data models).
k (int): Maximum number of similar entities to return
(Defaults to 10).
threshold (float): Minimum similarity score for results.
Entities with similarity below this threshold are excluded.
Should be between 0.0 and 1.0 (Defaults to 0.5).
prompt_template (str): The default jinja2 prompt template
to use (see `Generator`).
examples (list): The default list of examples, the examples
are a list of tuples containing input/output JSON pairs.
instructions (str): The default instructions being a string containing
instructions for the language model.
seed_instructions (list): Optional. A list of instructions to use as seed for the
optimization. If not provided, use the default instructions as seed.
temperature (float): Optional. The temperature for the LM call.
use_inputs_schema (bool): Optional. Whether or not to use the inputs schema in
the prompt (Defaults to False) (see `Generator`).
use_outputs_schema (bool): Optional. Whether or not to use the outputs schema in
the prompt (Defaults to False) (see `Generator`).
return_inputs (bool): Optional. Whether or not to concatenate the inputs to
the outputs (Defaults to True).
return_query (bool): Optional. Whether or not to concatenate the search query to
the outputs (Defaults to True).
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
knowledge_base=None,
language_model=None,
entity_models=None,
k=10,
threshold=0.5,
prompt_template=None,
examples=None,
instructions=None,
seed_instructions=None,
temperature=0.0,
use_inputs_schema=False,
use_outputs_schema=False,
return_inputs=True,
return_query=True,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
self.knowledge_base = knowledge_base
self.language_model = language_model
self.k = k
self.threshold = threshold
self.prompt_template = prompt_template
self.examples = examples
self.entity_models = entity_models if entity_models else []
if entity_models:
node_labels = [
entity_model.get_schema().get("title") for entity_model in entity_models
]
else:
node_labels = []
if not instructions:
instructions = default_entity_retriever_instructions(node_labels)
self.instructions = instructions
self.seed_instructions = seed_instructions
self.temperature = temperature
self.use_inputs_schema = use_inputs_schema
self.use_outputs_schema = use_outputs_schema
self.return_inputs = return_inputs
self.return_query = return_query
self.schema = SimilaritySearch.get_schema()
self.schema = dynamic_enum(
schema=self.schema,
prop_to_update="entity_label",
labels=node_labels,
description="The entity label to search for",
)
self.query_generator = Generator(
schema=self.schema,
language_model=self.language_model,
prompt_template=self.prompt_template,
examples=self.examples,
instructions=self.instructions,
seed_instructions=self.seed_instructions,
temperature=self.temperature,
use_inputs_schema=self.use_inputs_schema,
use_outputs_schema=self.use_outputs_schema,
return_inputs=False,
name="query_generator_" + self.name,
)
async def call(self, inputs, training=False):
if not inputs:
return None
similarity_search_query = await self.query_generator(
inputs,
training=training,
)
if self.return_inputs:
if self.return_query:
return await ops.concat(
inputs,
await ops.concat(
similarity_search_query,
await ops.similarity_search(
similarity_search_query,
knowledge_base=self.knowledge_base,
k=self.k,
threshold=self.threshold,
name="similarity_search_" + self.name,
),
name="similarity_search_with_query_and_inputs_" + self.name,
),
)
else:
return await ops.concat(
inputs,
await ops.similarity_search(
similarity_search_query,
knowledge_base=self.knowledge_base,
k=self.k,
threshold=self.threshold,
name="similarity_search_" + self.name,
),
name="similarity_search_with_inputs_" + self.name,
)
else:
if self.return_query:
return await ops.concat(
similarity_search_query,
await ops.similarity_search(
similarity_search_query,
knowledge_base=self.knowledge_base,
k=self.k,
threshold=self.threshold,
name="similarity_search_" + self.name,
),
name="similarity_search_with_query_" + self.name,
)
else:
return await ops.similarity_search(
similarity_search_query,
knowledge_base=self.knowledge_base,
k=self.k,
threshold=self.threshold,
name="similarity_search_" + self.name,
)
def get_config(self):
config = {
"k": self.k,
"threshold": self.threshold,
"prompt_template": self.prompt_template,
"examples": self.examples,
"instructions": self.instructions,
"seed_instructions": self.seed_instructions,
"temperature": self.temperature,
"use_inputs_schema": self.use_inputs_schema,
"use_outputs_schema": self.use_outputs_schema,
"return_inputs": self.return_inputs,
"return_query": self.return_query,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
knowledge_base_config = {
"knowledge_base": serialization_lib.serialize_synalinks_object(
self.knowledge_base,
)
}
language_model_config = {
"language_model": serialization_lib.serialize_synalinks_object(
self.language_model,
)
}
entity_models_config = {
"entity_models": [
(
serialization_lib.serialize_synalinks_object(
entity_model.to_symbolic_data_model(
name="entity_model" + (f"_{i}_" if i > 0 else "_") + self.name
)
)
if not is_symbolic_data_model(entity_model)
else serialization_lib.serialize_synalinks_object(entity_model)
)
for i, entity_model in enumerate(self.entity_models)
]
}
return {
**config,
**knowledge_base_config,
**language_model_config,
**entity_models_config,
}
@classmethod
def from_config(cls, config):
knowledge_base = serialization_lib.deserialize_synalinks_object(
config.pop("knowledge_base"),
)
language_model = serialization_lib.deserialize_synalinks_object(
config.pop("language_model"),
)
entity_models_config = config.pop("entity_models")
entity_models = [
serialization_lib.deserialize_synalinks_object(entity_model)
for entity_model in entity_models_config
]
return cls(
knowledge_base=knowledge_base,
entity_models=entity_models,
language_model=language_model,
**config,
)
````
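The four branches of `call` differ only in which pieces are concatenated in front of the search results: the original inputs, the generated search query, both, or neither. Modelling data models as dicts merged in order (an assumption that ignores field-name collisions), the output composition can be sketched as follows. `assemble_output` is a hypothetical helper, and the field names `similarity_search` and `result` are illustrative; only `entity_label` appears in the source above.

```python
def assemble_output(inputs, query, results, return_inputs=True, return_query=True):
    """Mirror the branching in `EntityRetriever.call`: optionally prepend the
    original inputs and the generated search query to the search results."""
    parts = []
    if return_inputs:
        parts.append(inputs)
    if return_query:
        parts.append(query)
    parts.append(results)
    output = {}
    for part in parts:
        output.update(part)  # concatenation, in order
    return output


inputs = {"query": "What is the capital of France?"}
query = {"entity_label": "Chunk", "similarity_search": "capital of France"}  # hypothetical fields
results = {"result": ["Paris is the capital of France."]}  # hypothetical field

print(sorted(assemble_output(inputs, query, results)))
# ['entity_label', 'query', 'result', 'similarity_search']
print(assemble_output(inputs, query, results, return_inputs=False, return_query=False))
# {'result': ['Paris is the capital of France.']}
```

Keeping the default `return_inputs=True` means the downstream `Generator` in the RAG example sees the original question alongside the retrieved entities.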
## `default_entity_retriever_instructions(entity_labels)`
The default instructions for the entity retriever
Source code in `synalinks/src/modules/retrievers/entity_retriever.py`
```
def default_entity_retriever_instructions(entity_labels):
    """The default instructions for the entity retriever"""
    return f"""
Your task is to retrieve entities among the following entity labels: {entity_labels}.
First, decide step-by-step which entity label you need, then describe the entities you are looking for.
The `similarity search` field should be a short description of the entities to match.
""".strip()
```
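The labels are interpolated using Python's default list formatting. Because `EntityRetriever.__init__` falls back to an empty `node_labels` list when `entity_models` is omitted, the default instructions then advertise no labels at all. That is worth checking first when the retriever's searches look wrong. The function is copied here so the check runs standalone.

```python
def default_entity_retriever_instructions(entity_labels):
    """Copy of the function above, for a standalone check."""
    return f"""
Your task is to retrieve entities among the following entity labels: {entity_labels}.
First, decide step-by-step which entity label you need, then describe the entities you are looking for.
The `similarity search` field should be a short description of the entities to match.
""".strip()


print(default_entity_retriever_instructions(["Document", "Chunk"]).splitlines()[0])
# Your task is to retrieve entities among the following entity labels: ['Document', 'Chunk'].
print(default_entity_retriever_instructions([]).splitlines()[0])
# Your task is to retrieve entities among the following entity labels: [].
```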
## `TripletRetriever`
Bases: `Module`
Retrieve triplets using a hybrid neuro-symbolic approach.
Unlike the Text2Cypher approach, this retriever is guaranteed to generate a valid Cypher query **every time**.
It does not need the graph schema in the prompt, which avoids confusing the language model: the node and relation labels are enforced dynamically using **constrained structured output** (similar to the `Decision` module).
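Constrained structured output boils down to narrowing a JSON schema so that a field can only take one of the allowed labels. The sketch below shows the idea on a plain dict schema. `restrict_to_labels` and `validate_labels` are hypothetical helpers, not Synalinks' `dynamic_enum`; the property names mirror the `prop_to_update` values used in the source further down. In the real module the constraint is applied while the LM generates, whereas this sketch can only validate after the fact.

```python
import copy


def restrict_to_labels(schema, prop, labels, description):
    """Return a copy of `schema` where `prop` must be one of `labels`."""
    new_schema = copy.deepcopy(schema)
    new_schema["properties"][prop] = {
        "type": "string",
        "enum": list(labels),
        "description": description,
    }
    return new_schema


def validate_labels(schema, data):
    """Reject any value outside an `enum` (a constrained decoder never emits one)."""
    for prop, spec in schema["properties"].items():
        if "enum" in spec and data.get(prop) not in spec["enum"]:
            raise ValueError(f"{prop}={data.get(prop)!r} not in {spec['enum']}")


schema = {
    "type": "object",
    "properties": {
        "subject_label": {"type": "string"},
        "relation_label": {"type": "string"},
        "object_label": {"type": "string"},
    },
}
schema = restrict_to_labels(schema, "subject_label", ["Country", "City"], "The subject label to match")
schema = restrict_to_labels(schema, "relation_label", ["IsCapitalOf", "IsCityOf"], "The relation label to match")
schema = restrict_to_labels(schema, "object_label", ["Country", "City"], "The object label to match")

# Passes: every label is one of the allowed values
validate_labels(schema, {"subject_label": "City", "relation_label": "IsCapitalOf", "object_label": "Country"})
```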
It works by using the language model to infer the subject, relation and object labels to search for, along with similarity search fields for the subject and the object of the triplet.
These parameters are then used to *programmatically create a valid Cypher query*.
This approach not only ensures the syntactic correctness of the Cypher query but also protects the graph database from Cypher injections that could arise from an adversarial prompt injection.
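Here is a minimal sketch of how such a query could be assembled safely. It assumes a Neo4j backend with the `vector.similarity.cosine` function and an `embedding` property on every node; both are assumptions, since the actual query built by `ops.triplet_search` is not shown in this document. In most Neo4j versions, node and relationship labels are not ordinary Cypher parameters, so the sketch checks them against the allowed lists before interpolating them. Everything derived from free text (the embedded search strings, `k`, `threshold`) travels as a `$parameter`, which leaves an adversarial prompt nothing to inject into. `build_triplet_query` is hypothetical.

```python
def build_triplet_query(subject_label, relation_label, object_label,
                        node_labels, relation_labels):
    """Hypothetical builder: validate labels, then return a parameterized query."""
    checks = [
        (subject_label, node_labels),
        (relation_label, relation_labels),
        (object_label, node_labels),
    ]
    for label, allowed in checks:
        if label not in allowed:
            raise ValueError(f"Unknown label: {label!r}")
    # Only allow-listed labels are interpolated; every other value is a $parameter
    return (
        f"MATCH (s:{subject_label})-[r:{relation_label}]->(o:{object_label})\n"
        "WITH s, r, o,\n"
        "  vector.similarity.cosine(s.embedding, $subject_vector) AS s_score,\n"
        "  vector.similarity.cosine(o.embedding, $object_vector) AS o_score\n"
        "WHERE s_score >= $threshold AND o_score >= $threshold\n"
        "RETURN s, r, o, (s_score + o_score) / 2 AS score\n"
        "ORDER BY score DESC\n"
        "LIMIT $k"
    )


query = build_triplet_query(
    "City", "IsCapitalOf", "Country",
    node_labels=["Country", "City"],
    relation_labels=["IsCapitalOf", "IsCityOf"],
)
print(query.splitlines()[0])  # MATCH (s:City)-[r:IsCapitalOf]->(o:Country)
```

The parameter values would then be sent separately from the query text, for example as the parameters of a Neo4j driver's `session.run` call.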
```
import synalinks
import asyncio
from typing import Literal


class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )


class Answer(synalinks.DataModel):
    answer: str = synalinks.Field(
        description="The answer to the user query",
    )


class Country(synalinks.Entity):
    label: Literal["Country"]
    name: str = synalinks.Field(
        description="The country's name",
    )


class City(synalinks.Entity):
    label: Literal["City"]
    name: str = synalinks.Field(
        description="The city's name",
    )


class IsCapitalOf(synalinks.Relation):
    subj: City
    label: Literal["IsCapitalOf"]
    obj: Country


class IsCityOf(synalinks.Relation):
    subj: City
    label: Literal["IsCityOf"]
    obj: Country


language_model = synalinks.LanguageModel(
    model="ollama/mistral",
)

embedding_model = synalinks.EmbeddingModel(
    model="ollama/mxbai-embed-large",
)

knowledge_base = synalinks.KnowledgeBase(
    uri="neo4j://localhost:7687",
    entity_models=[Country, City],
    relation_models=[IsCapitalOf, IsCityOf],
    embedding_model=embedding_model,
    metric="cosine",
)


async def main():
    inputs = synalinks.Input(data_model=Query)
    x = await synalinks.TripletRetriever(
        entity_models=[Country, City],
        relation_models=[IsCityOf, IsCapitalOf],
        language_model=language_model,
        knowledge_base=knowledge_base,
    )(inputs)
    outputs = await synalinks.Generator(
        data_model=Answer,
        language_model=language_model,
    )(x)

    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="kag_program",
        description="A simple KAG program",
    )


if __name__ == "__main__":
    asyncio.run(main())
```
Parameters:
| Name | Type | Description | Default |
| -------------------- | --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `knowledge_base` | `KnowledgeBase` | The knowledge base to use. | `None` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `entity_models`      | `list`          | The list of entity models to search for (a list of Entity data models).                                                                            | `None`  |
| `relation_models`    | `list`          | The list of relation models to search for (a list of Relation data models).                                                                        | `None`  |
| `k` | `int` | Maximum number of similar entities to return (Defaults to 10). | `10` |
| `threshold` | `float` | Minimum similarity score for results. Entities with similarity below this threshold are excluded. Should be between 0.0 and 1.0 (Defaults to 0.5). | `0.5` |
| `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` |
| `examples` | `list` | The default list of examples, the examples are a list of tuples containing input/output JSON pairs. | `None` |
| `instructions` | `str` | The default instructions being a string containing instructions for the language model. | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema`  | `bool`          | Optional. Whether or not to use the inputs schema in the prompt (Defaults to False) (see Generator).                                                | `False` |
| `use_outputs_schema` | `bool`          | Optional. Whether or not to use the outputs schema in the prompt (Defaults to False) (see Generator).                                               | `False` |
| `return_inputs`      | `bool`          | Optional. Whether or not to concatenate the inputs to the outputs (Defaults to True).                                                              | `True`  |
| `return_query`       | `bool`          | Optional. Whether or not to concatenate the search query to the outputs (Defaults to True).                                                        | `True`  |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
Source code in `synalinks/src/modules/retrievers/triplet_retriever.py`
````
@synalinks_export(
[
"synalinks.modules.TripletRetriever",
"synalinks.TripletRetriever",
]
)
class TripletRetriever(Module):
"""Retrieve triplets using a hybrid neuro-symbolic approach.
Unlike the Text2Cypher approach, this retriever is guaranteed
to generate a valid Cypher query **every time**.
It does not need the graph schema in the prompt, which avoids
confusing the language model: the node and relation labels are
enforced dynamically using **constrained structured output**
(similar to the `Decision` module).
It works by using the language model to infer the subject, relation and
object labels to search for, along with similarity search
fields for the subject and the object of the triplet.
These parameters are then used to *programmatically create a valid Cypher query*.
This approach not only ensures the syntactic correctness of the
Cypher query but also protects the graph database from Cypher
injections that could arise from an adversarial prompt injection.
```python
import synalinks
import asyncio
from typing import Literal
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class Answer(synalinks.DataModel):
answer: str = synalinks.Field(
description="The answer to the user query",
)
class Country(synalinks.Entity):
label: Literal["Country"]
name: str = synalinks.Field(
description="The country's name",
)
class City(synalinks.Entity):
label: Literal["City"]
name: str = synalinks.Field(
description="The city's name",
)
class IsCapitalOf(synalinks.Relation):
subj: City
label: Literal["IsCapitalOf"]
obj: Country
class IsCityOf(synalinks.Relation):
subj: City
label: Literal["IsCityOf"]
obj: Country
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
embedding_model = synalinks.EmbeddingModel(
model="ollama/mxbai-embed-large"
)
knowledge_base = synalinks.KnowledgeBase(
uri="neo4j://localhost:7687",
entity_models=[Country, City],
relation_models=[IsCapitalOf, IsCityOf],
embedding_model=embedding_model,
metric="cosine",
)
async def main():
inputs = synalinks.Input(data_model=Query)
x = await synalinks.TripletRetriever(
entity_models=[Country, City],
relation_models=[IsCityOf, IsCapitalOf],
language_model=language_model,
knowledge_base=knowledge_base,
)(inputs)
outputs = await synalinks.Generator(
data_model=Answer,
language_model=language_model,
)(x)
program = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="kag_program",
description="A simple KAG program",
)
if __name__ == "__main__":
asyncio.run(main())
```
Args:
knowledge_base (KnowledgeBase): The knowledge base to use.
language_model (LanguageModel): The language model to use.
entity_models (list): The list of entity models to search for
(a list of `Entity` data models).
relation_models (list): The list of relation models to search for
(a list of `Relation` data models).
k (int): Maximum number of similar entities to return
(Defaults to 10).
threshold (float): Minimum similarity score for results.
Entities with similarity below this threshold are excluded.
Should be between 0.0 and 1.0 (Defaults to 0.5).
prompt_template (str): The default jinja2 prompt template
to use (see `Generator`).
examples (list): The default list of examples, the examples
are a list of tuples containing input/output JSON pairs.
instructions (str): The default instructions being a string containing
instructions for the language model.
seed_instructions (list): Optional. A list of instructions to use as seed for the
optimization. If not provided, use the default instructions as seed.
temperature (float): Optional. The temperature for the LM call.
use_inputs_schema (bool): Optional. Whether or not to use the inputs schema in
the prompt (Defaults to False) (see `Generator`).
use_outputs_schema (bool): Optional. Whether or not to use the outputs schema in
the prompt (Defaults to False) (see `Generator`).
return_inputs (bool): Optional. Whether or not to concatenate the inputs to
the outputs (Defaults to True).
return_query (bool): Optional. Whether or not to concatenate the search query to
the outputs (Defaults to True).
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
knowledge_base=None,
language_model=None,
entity_models=None,
relation_models=None,
k=10,
threshold=0.5,
prompt_template=None,
examples=None,
instructions=None,
seed_instructions=None,
temperature=0.0,
use_inputs_schema=False,
use_outputs_schema=False,
return_inputs=True,
return_query=True,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
self.knowledge_base = knowledge_base
self.language_model = language_model
self.k = k
self.threshold = threshold
self.prompt_template = prompt_template
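self.examples = examples
self.entity_models = entity_models if entity_models else []
self.relation_models = relation_models if relation_models else []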
if entity_models:
node_labels = [
entity_model.get_schema().get("title") for entity_model in entity_models
]
else:
node_labels = []
if relation_models:
relation_labels = [
relation_model.get_schema().get("title")
for relation_model in relation_models
]
else:
relation_labels = []
if not instructions:
instructions = default_triplet_retriever_instructions(
node_labels, relation_labels
)
self.instructions = instructions
self.seed_instructions = seed_instructions
self.temperature = temperature
self.use_inputs_schema = use_inputs_schema
self.use_outputs_schema = use_outputs_schema
self.return_inputs = return_inputs
self.return_query = return_query
self.schema = TripletSearch.get_schema()
self.schema = dynamic_enum(
schema=self.schema,
prop_to_update="subject_label",
labels=node_labels,
description="The subject label to match",
)
self.schema = dynamic_enum(
schema=self.schema,
prop_to_update="relation_label",
labels=relation_labels,
description="The relation label to match",
)
self.schema = dynamic_enum(
schema=self.schema,
prop_to_update="object_label",
labels=node_labels,
description="The object label to match",
)
self.query_generator = ChainOfThought(
schema=self.schema,
language_model=self.language_model,
prompt_template=self.prompt_template,
examples=self.examples,
instructions=self.instructions,
seed_instructions=self.seed_instructions,
temperature=self.temperature,
use_inputs_schema=self.use_inputs_schema,
use_outputs_schema=self.use_outputs_schema,
return_inputs=False,
name="query_generator_" + self.name,
)
async def call(self, inputs, training=False):
if not inputs:
return None
triplet_search_query = await self.query_generator(
inputs,
training=training,
)
if self.return_inputs:
if self.return_query:
return await ops.logical_and(
inputs,
await ops.logical_and(
triplet_search_query,
await ops.triplet_search(
triplet_search_query,
knowledge_base=self.knowledge_base,
k=self.k,
threshold=self.threshold,
name="similarity_search_" + self.name,
),
name="similarity_search_with_query_and_inputs_" + self.name,
),
)
else:
return await ops.logical_and(
inputs,
await ops.triplet_search(
triplet_search_query,
knowledge_base=self.knowledge_base,
k=self.k,
threshold=self.threshold,
name="similarity_search_" + self.name,
),
name="similarity_search_with_inputs_" + self.name,
)
else:
if self.return_query:
return await ops.logical_and(
triplet_search_query,
await ops.triplet_search(
triplet_search_query,
knowledge_base=self.knowledge_base,
k=self.k,
threshold=self.threshold,
name="similarity_search_" + self.name,
),
name="similarity_search_with_query_" + self.name,
)
else:
return await ops.triplet_search(
triplet_search_query,
knowledge_base=self.knowledge_base,
k=self.k,
threshold=self.threshold,
name="similarity_search_" + self.name,
)
def get_config(self):
config = {
"k": self.k,
"threshold": self.threshold,
"prompt_template": self.prompt_template,
"examples": self.examples,
"instructions": self.instructions,
"seed_instructions": self.seed_instructions,
"temperature": self.temperature,
"use_inputs_schema": self.use_inputs_schema,
"use_outputs_schema": self.use_outputs_schema,
"return_inputs": self.return_inputs,
"return_query": self.return_query,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
knowledge_base_config = {
"knowledge_base": serialization_lib.serialize_synalinks_object(
self.knowledge_base,
)
}
language_model_config = {
"language_model": serialization_lib.serialize_synalinks_object(
self.language_model,
)
}
entity_models_config = {
"entity_models": [
(
serialization_lib.serialize_synalinks_object(
entity_model.to_symbolic_data_model(
name="entity_model" + (f"_{i}_" if i > 0 else "_") + self.name
)
)
if not is_symbolic_data_model(entity_model)
else serialization_lib.serialize_synalinks_object(entity_model)
)
for i, entity_model in enumerate(self.entity_models)
]
}
relation_models_config = {
"relation_models": [
(
serialization_lib.serialize_synalinks_object(
relation_model.to_symbolic_data_model(
name="relation_model" + (f"_{i}_" if i > 0 else "_") + self.name
)
)
if not is_symbolic_data_model(relation_model)
else serialization_lib.serialize_synalinks_object(relation_model)
)
for i, relation_model in enumerate(self.relation_models)
]
}
return {
**config,
**knowledge_base_config,
**language_model_config,
**entity_models_config,
**relation_models_config,
}
@classmethod
def from_config(cls, config):
knowledge_base = serialization_lib.deserialize_synalinks_object(
config.pop("knowledge_base"),
)
language_model = serialization_lib.deserialize_synalinks_object(
config.pop("language_model"),
)
entity_models_config = config.pop("entity_models")
entity_models = [
serialization_lib.deserialize_synalinks_object(entity_model)
for entity_model in entity_models_config
]
relation_models_config = config.pop("relation_models")
relation_models = [
serialization_lib.deserialize_synalinks_object(relation_model)
for relation_model in relation_models_config
]
return cls(
knowledge_base=knowledge_base,
entity_models=entity_models,
relation_models=relation_models,
language_model=language_model,
**config,
)
````
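The four branches of `call` above differ only in which pieces are combined with the search results. As a rough illustration, the sketch below replays that branching on plain dictionaries. It assumes that `ops.logical_and` returns `None` when either side is `None` and otherwise concatenates the two data models. The dictionary merge, the helper names `logical_and` and `assemble_output`, and the sample field names are hypothetical stand-ins, not Synalinks APIs.

```python
from typing import Optional


def logical_and(a: Optional[dict], b: Optional[dict]) -> Optional[dict]:
    """Stand-in for `ops.logical_and`: None if either side is None, else both merged."""
    if a is None or b is None:
        return None
    # Plain dict merge; a real data model concatenation may treat duplicate fields differently
    return {**a, **b}


def assemble_output(inputs, query, results, return_inputs=True, return_query=True):
    """Replay the four branches of `TripletRetriever.call` on dictionaries."""
    output = results
    if return_query:
        output = logical_and(query, output)
    if return_inputs:
        output = logical_and(inputs, output)
    return output


inputs = {"question": "Who founded Acme?"}
query = {"subject_label": "Company", "relation_label": "FoundedBy"}
results = {"triplets": [["Acme", "FoundedBy", "Jane Doe"]]}

print(assemble_output(inputs, query, results))
print(assemble_output(inputs, query, results, return_inputs=False, return_query=False))
```

Nesting the calls this way preserves the field order of the original branches: inputs first, then the generated query, then the retrieved triplets.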
## `default_triplet_retriever_instructions(entity_labels, relation_labels)`
The default instructions for the triplet retriever
Source code in `synalinks/src/modules/retrievers/triplet_retriever.py`
```
def default_triplet_retriever_instructions(entity_labels, relation_labels):
"""The default instructions for the triplet retriever"""
return f"""
Your task is to retrieve triplets among the following entity labels: {entity_labels} and relation labels: {relation_labels}.
Think about the triplet you are looking for, which relation label do you need, then the subject and object label.
The similarity search parameters should be a short natural language string describing the entities to match.
Remember to replace the similarity search with `?` for the entity you are looking for.
""".strip()
```
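The retriever's constructor builds the label lists passed to this function from the `title` of each entity and relation model schema. It uses the same lists to constrain the `subject_label`, `relation_label` and `object_label` fields of the generated query. The sketch below mimics both steps with plain dictionaries. `Person`, `Company`, `WorksFor` and `restrict_to_labels` are hypothetical, and `restrict_to_labels` is only a simplified stand-in for `dynamic_enum`, whose exact schema output may differ.

```python
import copy


class Person:
    """Hypothetical stand-in for an entity data model."""

    @classmethod
    def get_schema(cls):
        return {"title": "Person", "type": "object"}


class Company:
    """Hypothetical stand-in for an entity data model."""

    @classmethod
    def get_schema(cls):
        return {"title": "Company", "type": "object"}


class WorksFor:
    """Hypothetical stand-in for a relation data model."""

    @classmethod
    def get_schema(cls):
        return {"title": "WorksFor", "type": "object"}


def labels_from_models(models):
    # Same rule as the constructor: one label per model, read from its schema title
    return [model.get_schema().get("title") for model in models or []]


def restrict_to_labels(schema, prop, labels, description):
    # Simplified stand-in for `dynamic_enum`: limit one string property to `labels`
    updated = copy.deepcopy(schema)
    updated["properties"][prop] = {
        "type": "string",
        "enum": list(labels),
        "description": description,
    }
    return updated


query_schema = {
    "type": "object",
    "properties": {
        "subject_label": {"type": "string"},
        "relation_label": {"type": "string"},
        "object_label": {"type": "string"},
    },
}

node_labels = labels_from_models([Person, Company])
relation_labels = labels_from_models([WorksFor])
query_schema = restrict_to_labels(
    query_schema, "subject_label", node_labels, "The subject label to match"
)
query_schema = restrict_to_labels(
    query_schema, "relation_label", relation_labels, "The relation label to match"
)
query_schema = restrict_to_labels(
    query_schema, "object_label", node_labels, "The object label to match"
)
print(query_schema["properties"]["subject_label"]["enum"])  # ['Person', 'Company']
```

Because `default_triplet_retriever_instructions` interpolates the lists with an f-string, the prompt shows them in Python list syntax, for example `['Person', 'Company']`.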
## `PythonScript`
Bases: `Trainable`
The Python code to transform a JSON object into another JSON object
Source code in `synalinks/src/modules/synthesis/python_synthesis.py`
```
class PythonScript(Trainable):
"""The python code to transform a JSON object into another JSON object"""
python_script: str = Field(
description="The python script to transform a JSON object into another object"
)
```
## `PythonSynthesis`
Bases: `Module`
A Python code transformation on JSON data.
**Note**: This module is **NOT** completely safe (yet) for business applications. It is only provided for research purposes on program synthesis. That said, the code does not change during inference, so it cannot be prompt-injected.
This module features a Python script as a trainable variable, allowing the optimizers to refine the code during the training loop based on iterative feedback and automatic selection of the best script.
This module works **ONLY** with advanced optimizers (**NOT** the `RandomFewShot` optimizer).
The module executes the entire Python script and expects the result to be stored in a variable named `result` at the end of execution.
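The contract described above (run the whole script, then read a variable named `result`) can be sketched in-process with `exec`. This is only an illustration under stated assumptions. The helper `run_script` is hypothetical, and it assumes the input JSON is exposed to the script as `inputs`, as in the example script. Unlike the module, which runs scripts in a separate process with a timeout, this sketch offers no isolation. Console output is captured so it can be returned alongside the result, mirroring the `stdout`/`stderr` fields the module adds to its output.

```python
import contextlib
import io
import traceback


def run_script(python_script, inputs, default_return_value):
    """Hypothetical helper: run a script that must assign `result`.

    Runs in-process with no isolation or timeout, so it is for illustration only.
    """
    namespace = {"inputs": inputs}  # the script sees the input JSON as `inputs`
    stdout, stderr = io.StringIO(), io.StringIO()
    result = None
    try:
        with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr):
            exec(python_script, namespace)
        result = namespace.get("result")
    except Exception:
        stderr.write(traceback.format_exc())
    if not isinstance(result, dict):
        # Fall back when the script raised or never assigned a JSON object to `result`
        result = default_return_value
    return {**result, "stdout": stdout.getvalue(), "stderr": stderr.getvalue()}


script = """
def transform(inputs):
    print("rows:", len(inputs["input_grid"]))
    return {"output_grid": inputs.get("input_grid")}

result = transform(inputs)
"""

print(run_script(script, {"input_grid": [[0, 1], [1, 0]]}, {"output_grid": [[]]}))
print(run_script("result = 1 / 0", {}, {"output_grid": [[]]})["output_grid"])  # [[]]
```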
Example:
```
import synalinks
import asyncio
default_python_script = """
def transform(inputs):
    # TODO implement the code to transform the input grid into the output grid
    return {"output_grid": inputs.get("input_grid")}
result = transform(inputs)
"""
async def main():
    inputs = synalinks.Input(
        data_model=synalinks.datasets.arcagi.get_input_data_model(),
    )
    outputs = await synalinks.PythonSynthesis(
        data_model=synalinks.datasets.arcagi.get_output_data_model(),
        python_script=default_python_script,
        default_return_value={"output_grid": [[]]},
    )(inputs)
    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="python_script_synthesis",
        description="A program to solve ARCAGI with python code",
    )
if __name__ == "__main__":
    asyncio.run(main())
```
If you want to explore the future of neuro-symbolic self-evolving systems, contact us. While these systems are not "hard" to code thanks to Synalinks, they require technical knowledge and a deep understanding of multiple AI paradigms.
Parameters:
| Name | Type | Description | Default |
| ---------------------- | ----------- | ----------------------------------------------------------------------------------------------------------------------------------- | --------------- |
| `schema` | `dict` | The target JSON schema. If not provided use the data_model to infer it. | `None` |
| `data_model` | `DataModel \| SymbolicDataModel \| JsonDataModel` | The target data model for structured output. | `None` |
| `python_script` | `str` | The default Python script. | `None` |
| `seed_scripts` | `list` | Optional. A list of Python scripts to use as seed for the evolution. If not provided, create a seed from the default configuration. | `None` |
| `default_return_value` | `dict` | Default return value. | `None` |
| `timeout` | `int` | Maximum execution time in seconds. (Default 5 seconds). | `5` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
Source code in `synalinks/src/modules/synthesis/python_synthesis.py`
````
@synalinks_export(
[
"synalinks.modules.PythonSynthesis",
"synalinks.PythonSynthesis",
]
)
class PythonSynthesis(Module):
"""A Python code transformation on JSON data.
**Note**: This module is **NOT** completely safe (yet) for business applications.
It is only provided for research purposes on program synthesis.
That said, the code does not change during inference, so it cannot be prompt-injected.
This module features a Python script as a trainable variable, allowing the optimizers
to refine the code during the training loop based on iterative feedback and
automatic selection of the best script.
This module works **ONLY** with advanced optimizers (**NOT** the `RandomFewShot` optimizer).
The module executes the entire Python script and expects the result to be stored
in a variable named 'result' at the end of execution.
Example:
```python
import synalinks
import asyncio
default_python_script = \\
\"\"\"
def transform(inputs):
# TODO implement the code to transform the input grid into the output grid
return {"output_grid": inputs.get("input_grid")}
result = transform(inputs)
\"\"\"
async def main():
inputs = synalinks.Input(
data_model=synalinks.datasets.arcagi.get_input_data_model(),
)
outputs = await synalinks.PythonSynthesis(
data_model=synalinks.datasets.arcagi.get_output_data_model(),
python_script=default_python_script,
default_return_value={"output_grid": [[]]},
)(inputs)
program = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="python_script_synthesis",
description="A program to solve ARCAGI with python code",
)
if __name__ == "__main__":
asyncio.run(main())
```
If you want to explore the future of neuro-symbolic self-evolving systems, contact us.
While these systems are not "hard" to code thanks to Synalinks, they require
technical knowledge and a deep understanding of multiple AI paradigms.
Args:
schema (dict): The target JSON schema.
If not provided use the `data_model` to infer it.
data_model (DataModel | SymbolicDataModel | JsonDataModel): The target data
model for structured output.
python_script (str): The default Python script.
seed_scripts (list): Optional. A list of Python scripts to use as seed for the evolution.
If not provided, create a seed from the default configuration.
default_return_value (dict): Default return value.
timeout (int): Maximum execution time in seconds. (Default 5 seconds).
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
schema=None,
data_model=None,
python_script=None,
seed_scripts=None,
default_return_value=None,
timeout=5,
sandbox=False,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
if not schema and data_model:
schema = data_model.get_schema()
self.schema = schema
if not python_script:
raise ValueError("You should provide the `python_script` argument")
self.python_script = python_script
if not default_return_value:
raise ValueError("You should provide the `default_return_value` argument")
try:
jsonschema.validate(default_return_value, self.schema)
except ValidationError as e:
raise ValueError(
f"`default_return_value` parameter does not conform to schema: {e}"
)
self.default_return_value = default_return_value
self.timeout = timeout
if not seed_scripts:
seed_scripts = []
self.seed_scripts = seed_scripts
seed_candidates = [
{"python_script": seed_script} for seed_script in self.seed_scripts
]
self.state = self.add_variable(
initializer=PythonScript(
python_script=self.python_script,
seed_candidates=seed_candidates,
).get_json(),
data_model=PythonScript,
name="state_" + self.name,
)
async def execute(self, inputs, python_script):
"""Execute the Python script with timeout using multiprocessing.
"""
result_queue = Queue()
process = Process(
target=_execute_script_in_process,
args=(python_script, inputs.get_json(), self.schema, result_queue)
)
process.start()
start_time = asyncio.get_event_loop().time()
timeout_remaining = self.timeout
while process.is_alive() and timeout_remaining > 0:
await asyncio.sleep(0.1)
elapsed = asyncio.get_event_loop().time() - start_time
timeout_remaining = self.timeout - elapsed
if process.is_alive():
process.terminate()
process.join(timeout=1)
if process.is_alive():
process.kill()
process.join()
return None, "", f"Timeout Error: Script execution exceeded {self.timeout} second(s)\n"
process.join()
if not result_queue.empty():
result, stdout, stderr = result_queue.get()
return result, stdout, stderr
else:
return None, "", "Error: Process terminated unexpectedly\n"
async def call(self, inputs, training=False):
if not inputs:
return None
python_script = self.state.get("python_script")
result, stdout, stderr = await self.execute(inputs, python_script)
if training:
predictions = self.state.get("current_predictions")
if result:
predictions.append(
{
"inputs": {
**inputs.get_json(),
},
"outputs": {
**result,
"stdout": stdout,
"stderr": stderr,
},
"reward": None,
}
)
else:
predictions.append(
{
"inputs": {
**inputs.get_json(),
},
"outputs": {
"stdout": stdout,
"stderr": stderr,
},
"reward": None,
}
)
if result:
return JsonDataModel(
json={
**result,
"stdout": stdout,
"stderr": stderr,
},
schema=self.schema,
name=self.name,
)
else:
return JsonDataModel(
json={
**self.default_return_value,
"stdout": stdout,
"stderr": stderr,
},
schema=self.schema,
name=self.name,
)
async def compute_output_spec(self, inputs, training=False):
return await ops.concat(
SymbolicDataModel(schema=self.schema),
PythonConsoleLog,
name=self.name,
)
def get_config(self):
config = {
"schema": self.schema,
"python_script": self.python_script,
"seed_scripts": self.seed_scripts,
"default_return_value": self.default_return_value,
"timeout": self.timeout,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
return config
@classmethod
def from_config(cls, config):
return cls(**config)
````
### `execute(inputs, python_script)`
Execute the Python script with timeout using multiprocessing.
Source code in `synalinks/src/modules/synthesis/python_synthesis.py`
```
async def execute(self, inputs, python_script):
"""Execute the Python script with timeout using multiprocessing.
"""
result_queue = Queue()
process = Process(
target=_execute_script_in_process,
args=(python_script, inputs.get_json(), self.schema, result_queue)
)
process.start()
start_time = asyncio.get_event_loop().time()
timeout_remaining = self.timeout
while process.is_alive() and timeout_remaining > 0:
await asyncio.sleep(0.1)
elapsed = asyncio.get_event_loop().time() - start_time
timeout_remaining = self.timeout - elapsed
if process.is_alive():
process.terminate()
process.join(timeout=1)
if process.is_alive():
process.kill()
process.join()
return None, "", f"Timeout Error: Script execution exceeded {self.timeout} second(s)\n"
process.join()
if not result_queue.empty():
result, stdout, stderr = result_queue.get()
return result, stdout, stderr
else:
return None, "", "Error: Process terminated unexpectedly\n"
```
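The method above polls a `multiprocessing.Process` and terminates it once the timeout elapses. The sketch below implements the same idea with `subprocess.run` instead. This is a different technique, used here because its `timeout` argument kills the child interpreter for us. `run_with_timeout`, the `__RESULT__` marker and the stdin payload format are all hypothetical. Only the `(result, stdout, stderr)` return shape and the timeout message mirror `execute`.

```python
import json
import subprocess
import sys

# Program run by the child interpreter: read the script and its inputs from stdin,
# execute the script, then print `result` as JSON after a marker.
_CHILD = (
    "import json, sys\n"
    "payload = json.loads(sys.stdin.read())\n"
    "namespace = {'inputs': payload['inputs']}\n"
    "exec(payload['script'], namespace)\n"
    "print('\\n__RESULT__' + json.dumps(namespace.get('result')))\n"
)
_MARKER = "\n__RESULT__"


def run_with_timeout(python_script, inputs, timeout=5.0):
    """Return (result, stdout, stderr), like `execute`, using a child interpreter."""
    payload = json.dumps({"script": python_script, "inputs": inputs})
    try:
        completed = subprocess.run(
            [sys.executable, "-c", _CHILD],
            input=payload,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and reaped the child at this point
        return None, "", f"Timeout Error: Script execution exceeded {timeout} second(s)\n"
    stdout, marker, tail = completed.stdout.rpartition(_MARKER)
    if not marker:
        # The script raised before `result` was printed; the traceback is in stderr
        return None, completed.stdout, completed.stderr
    return json.loads(tail), stdout, completed.stderr


print(run_with_timeout("print('hi')\nresult = {'n': inputs['n'] + 1}", {"n": 1}))
print(run_with_timeout("while True:\n    pass", {}, timeout=1.0)[2])
```

Running the script in a separate interpreter means an infinite loop or a crash in generated code cannot take down the caller. To call it from async code without blocking the event loop, the call could be wrapped in `asyncio.to_thread`.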
## `TimeoutException`
Bases: `Exception`
Exception raised when script execution times out
Source code in `synalinks/src/modules/synthesis/python_synthesis.py`
```
class TimeoutException(Exception):
"""Exception raised when script execution times out"""
pass
```
## `SequentialPlan`
Bases: `Trainable`
The sequential step by step plan to achieve the task
Source code in `synalinks/src/modules/synthesis/sequential_plan_synthesis.py`
```
class SequentialPlan(Trainable):
"""The sequential step by step plan to achieve the task"""
steps: List[str] = Field(
description="The list of steps",
)
```
## `SequentialPlanSynthesis`
Bases: `Module`
A module that executes a sequential plan of steps.
This module features a sequential plan as a trainable variable, allowing optimizers to refine the plan during the training loop based on iterative feedback.
In other words, it learns to plan from iterative feedback and automatic selection of the best plan.
The module executes each step in the plan sequentially, passing the output of each step as input to the next step. The runner is responsible for executing each individual step. The most common runners are `FunctionCallingAgent`, `ChainOfThought` or `Generator` modules, but you can use any Module or Program. Once all steps have run, a final `ChainOfThought` generator produces the target data model from the inputs and the step outputs.
By default, this module starts without any plan, so it is equivalent to a single runner call.
This module works **ONLY** with advanced optimizers (**NOT** the `RandomFewShot` optimizer).
**Note**: The inputs are forwarded to the runner at each step by concatenating them with the outputs of the previous steps. So **ensure that the runner does not return its inputs**: use `return_inputs=False` or `return_inputs_with_trajectory=False` when configuring your runner.
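The control flow described above can be modelled with plain coroutines and dictionaries. This is a simplified sketch, not the library's code. `run_plan`, the toy `runner` and `final_generator`, and the `step_{i}`/`step_{i}_output` keys are hypothetical. Each step's text is added to the context before the runner is called, each output is stored under its own key, and an empty plan reduces to a single runner call.

```python
import asyncio


async def run_plan(inputs, steps, runner, final_generator):
    """Simplified model of the plan loop on plain dicts (not the library's code)."""
    context = dict(inputs)
    if not steps:
        # No plan yet: equivalent to a single runner call
        context["runner_output"] = await runner(context)
    for i, step in enumerate(steps):
        context[f"step_{i}"] = step  # the instruction for this step
        # The runner sees the inputs, all previous outputs and the current step
        context[f"step_{i}_output"] = await runner(context)
    # A final generator turns the accumulated context into the target output
    return await final_generator(context)


seen = []


async def runner(context):
    # Toy runner: records which fields it received
    seen.append(list(context))
    return {"summary": f"handled {len(context)} fields"}


async def final_generator(context):
    # Toy final step: lists the outputs that were available to it
    return {"report": [key for key in context if key.endswith("_output")]}


plan = ["find flights", "book a hotel"]
print(asyncio.run(run_plan({"query": "Plan a trip"}, plan, runner, final_generator)))
# {'report': ['step_0_output', 'step_1_output']}
```

Because every runner call sees the accumulated context, a runner that also echoed its inputs would copy them into each step output. That is why the note above asks for `return_inputs=False`.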
Example:
```
import synalinks
import asyncio
class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )
class FinalReport(synalinks.DataModel):
    report: str = synalinks.Field(
        description="The final report",
    )
class TaskSummary(synalinks.DataModel):
    summary: str = synalinks.Field(
        description="The summary of the executed task",
    )
async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )
    tools = ...  # replace with your tools definition (see `FunctionCallingAgent`)
    inputs = synalinks.Input(data_model=Query)
    outputs = await synalinks.SequentialPlanSynthesis(
        data_model=FinalReport,
        language_model=language_model,
        runner=synalinks.FunctionCallingAgent(
            data_model=TaskSummary,
            language_model=language_model,
            tools=tools,
            return_inputs_with_trajectory=False,
        ),
    )(inputs)
    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="planner_agent",
        description="An agent that learns a step-by-step plan to achieve a task",
    )
if __name__ == "__main__":
    asyncio.run(main())
```
Parameters:
| Name | Type | Description | Default |
| ---------------- | --------------- | -------------------------------------------------------------------------------------------------------------- | --------------------------------------------- |
| `schema` | `dict` | The target JSON schema. If not provided use the data_model to infer it. | `None` |
| `data_model` | `DataModel \| SymbolicDataModel \| JsonDataModel` | The target data model for structured output. | `None` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `steps` | `list` | Optional. The default list of steps being a list of strings. | `None` |
| `seed_steps` | `list` | Optional. A list of steps to use as seed for the optimization. If not provided, an empty plan (`[[]]`) is used as seed. | `None` |
| `runner` | `Module \| Program` | Required. The runner that executes each step. | `None` |
| `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (defaults to True). | `True` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
Source code in `synalinks/src/modules/synthesis/sequential_plan_synthesis.py`
````
class SequentialPlanSynthesis(Module):
"""A module that executes a sequential plan of steps.
This module features a sequential plan as a trainable variable, allowing optimizers
to refine the plan during the training loop based on iterative feedback.
In other words, it learns to plan from iterative feedback and automatic selection of the best plan.
The module executes each step in the plan sequentially, passing the output of each
step as input to the next step. The runner is responsible for executing
each individual step. The most common runners are `FunctionCallingAgent`,
`ChainOfThought` or `Generator` modules, but you can use any Module or Program.
By default, this module starts without any plan, so it is equivalent to a single runner call.
This module works **ONLY** with advanced optimizers (**NOT** the `RandomFewShot` optimizer).
**Note**: The inputs are forwarded to the runner each time by concatenating them with
the outputs of the previous steps. So **ensure that the runner does not return its inputs**: use
`return_inputs=False` or `return_inputs_with_trajectory=False` when configuring your runner.
Example:
```python
import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class FinalReport(synalinks.DataModel):
report: str = synalinks.Field(
description="The final report",
)
class TaskSummary(synalinks.DataModel):
summary: str = synalinks.Field(
description="The summary of the executed task",
)
async def main():
language_model = synalinks.LanguageModel(model="ollama/mistral")
tools = ...  # tools definition (see `FunctionCallingAgent`)
inputs = synalinks.Input(data_model=Query)
outputs = await synalinks.SequentialPlanSynthesis(
data_model=FinalReport,
language_model=language_model,
runner=synalinks.FunctionCallingAgent(
data_model=TaskSummary,
language_model=language_model,
tools=tools,
return_inputs_with_trajectory=False,
),
)(inputs)
program = synalinks.Program(
inputs=inputs,
outputs=outputs,
name="planner_agent",
description="An agent that learns a step-by-step plan to achieve a task",
)
if __name__ == "__main__":
asyncio.run(main())
```
Args:
schema (dict): The target JSON schema.
If not provided use the `data_model` to infer it.
data_model (DataModel | SymbolicDataModel | JsonDataModel): The target data
model for structured output.
language_model (LanguageModel): The language model to use.
steps (list): Optional. The default list of steps being a list of strings.
seed_steps (list): Optional. A list of steps to use as seed for the
optimization. If not provided, an empty plan (`[[]]`) is used as seed.
runner (Module | Program): Required. The runner that executes each step.
return_inputs (bool): Optional. Whether or not to concatenate the inputs to
the outputs (defaults to True).
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
schema=None,
data_model=None,
language_model=None,
steps=None,
seed_steps=None,
runner=None,
return_inputs=True,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
if not schema and data_model:
schema = data_model.get_schema()
self.schema = schema
if not steps:
steps = []
self.steps = steps
if not seed_steps:
seed_steps = [[]]
self.seed_steps = seed_steps
if not runner:
raise ValueError("The `runner` parameter is required.")
if not isinstance(runner, Module):
raise ValueError("The `runner` parameter should be a `Module` or `Program`.")
self.language_model = language_model
self.runner = runner
self.return_inputs = return_inputs
self.state = self.add_variable(
initializer=SequentialPlan(
steps=self.steps,
seed_candidates=self.seed_steps,
).get_json(),
data_model=SequentialPlan,
name="state"+self.name,
)
self.final_generator = ChainOfThought(
schema=self.schema,
language_model=self.language_model,
return_inputs=self.return_inputs,
name="final_generator_"+self.name,
)
async def call(self, inputs, training=False):
if not inputs:
return None
steps = self.state.get("steps")
previous_steps = None
if steps:
for i, step in enumerate(steps):
step_result = await self.runner(inputs, training=training)
if not previous_steps:
previous_steps = step_result
else:
previous_steps = await ops.concat(
previous_steps,
step_result,
name=f"previous_steps_{i}_"+self.name,
)
inputs = await ops.concat(
inputs,
await ops.concat(
previous_steps,
Step(step=step),
name=f"step_{i}_"+self.name,
),
name=f"step_{i}_with_inputs_"+self.name,
)
else:
result = await self.runner(inputs, training=training)
inputs = await ops.concat(
inputs,
result,
name="with_inputs_"+self.name,
)
return await self.final_generator(inputs, training=training)
async def compute_output_spec(self, inputs, training=False):
_ = await self.runner(inputs)
return await self.final_generator(inputs)
def get_config(self):
config = {
"schema": self.schema,
"steps": self.steps,
"seed_steps": self.seed_steps,
"return_inputs": self.return_inputs,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
language_model_config = {
"language_model": serialization_lib.serialize_synalinks_object(
self.language_model,
)
}
runner_config = {
"runner": serialization_lib.serialize_synalinks_object(
self.runner,
)
}
return {
**config,
**language_model_config,
**runner_config,
}
@classmethod
def from_config(cls, config):
language_model = serialization_lib.deserialize_synalinks_object(
config.pop("language_model"),
)
runner = serialization_lib.deserialize_synalinks_object(
config.pop("runner"),
)
return cls(
language_model=language_model,
runner=runner,
**config,
)
````
## `Step`
Bases: `DataModel`
The individual step to execute
Source code in `synalinks/src/modules/synthesis/sequential_plan_synthesis.py`
```
class Step(DataModel):
"""The individual step to execute"""
step: str = Field(
description="The step to execute",
)
```
## `ChainOfThought`
Bases: `Module`
Useful to answer in a step-by-step manner.
This component concatenates thinking fields to your data model/schema and generates a prediction, allowing the LM to think step by step before answering.
The parameter `k` specifies the number of thinking fields to add (defaults to 1).
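To picture what the extra fields do to the target schema, the sketch below prepends `k` string fields to a JSON object schema so that the model fills them in before the answer. The field names `thinking`, `thinking_1`, ... and their description are assumptions made for illustration. The actual names come from Synalinks' `Thinking` data model and its concatenation rules, which may name them differently.

```python
import copy


def add_thinking_fields(schema, k=1):
    """Return a copy of an object schema with `k` thinking fields placed first."""
    if k < 1:
        raise ValueError("k must be at least 1")
    names = ["thinking"] + [f"thinking_{i}" for i in range(1, k)]
    thinking = {
        name: {"type": "string", "description": "Your step by step thinking"}
        for name in names
    }
    updated = copy.deepcopy(schema)
    # Thinking fields come first, so the LM writes its reasoning before the answer
    updated["properties"] = {**thinking, **updated["properties"]}
    updated["required"] = names + list(schema.get("required", []))
    return updated


answer_schema = {
    "title": "Answer",
    "type": "object",
    "properties": {"answer": {"type": "string", "description": "The correct answer"}},
    "required": ["answer"],
}
print(list(add_thinking_fields(answer_schema, k=3)["properties"]))
# ['thinking', 'thinking_1', 'thinking_2', 'answer']
```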
Example:
```
import synalinks
import asyncio
class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )
class Answer(synalinks.DataModel):
    answer: str = synalinks.Field(
        description="The correct answer",
    )
async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )
    x0 = synalinks.Input(data_model=Query)
    x1 = await synalinks.ChainOfThought(
        data_model=Answer,
        language_model=language_model,
        k=3,
    )(x0)
    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="answer_with_chain_of_thought",
        description="Useful to answer step by step",
    )
if __name__ == "__main__":
    asyncio.run(main())
```
References
- [Chain-of-Thought Prompting Elicits Reasoning in Large Language Models](https://arxiv.org/abs/2201.11903)
Parameters:
| Name | Type | Description | Default |
| -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | --------------- |
| `schema` | `dict` | The target JSON schema. If not provided use the data_model to infer it. | `None` |
| `data_model` | `DataModel \| SymbolicDataModel \| JsonDataModel` | The target data model. | `None` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The jinja2 prompt template (see Generator). | `None` |
| `examples` | `list` | The default list of examples, the examples are a list of tuples containing input/output JSON pairs. | `None` |
| `instructions` | `str` | The default instructions being a string containing instructions for the language model. | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema` | `bool` | Optional. Whether or not to use the inputs schema in the prompt (defaults to False) (see Generator). | `False` |
| `use_outputs_schema` | `bool` | Optional. Whether or not to use the outputs schema in the prompt (defaults to False) (see Generator). | `False` |
| `k` | `int` | The number of thinking fields to add. | `1` |
| `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (Default to False) (see Generator). | `False` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
Source code in `synalinks/src/modules/ttc/chain_of_thought.py`
````
@synalinks_export(
[
"synalinks.modules.ChainOfThought",
"synalinks.ChainOfThought",
]
)
class ChainOfThought(Module):
"""Useful to answer in a step by step manner.
This component concatenates thinking fields to your data model/schema and generates
a prediction, allowing the LM to think step by step before answering.
The parameter `k` specifies the number of thinking fields to add (defaults to 1).
Example:
```python
import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class Answer(synalinks.DataModel):
answer: str = synalinks.Field(
description="The correct answer",
)
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
x0 = synalinks.Input(data_model=Query)
x1 = await synalinks.ChainOfThought(
data_model=Answer,
language_model=language_model,
k=3,
)(x0)
program = synalinks.Program(
inputs=x0,
outputs=x1,
name="answer_with_chain_of_thought",
description="Useful to answer step by step",
)
if __name__ == "__main__":
asyncio.run(main())
```
References:
- [Chain-of-Thought Prompting Elicits Reasoning in Large Language Models](https://arxiv.org/abs/2201.11903)
Args:
schema (dict): The target JSON schema.
If not provided use the `data_model` to infer it.
data_model (DataModel | SymbolicDataModel | JsonDataModel): The target data model.
language_model (LanguageModel): The language model to use.
prompt_template (str): The jinja2 prompt template (see `Generator`).
examples (list): The default list of examples, the examples
are a list of tuples containing input/output JSON pairs.
instructions (str): The default instructions being a string containing
instructions for the language model.
seed_instructions (list): Optional. A list of instructions to use as seed for the
optimization. If not provided, use the default instructions as seed.
temperature (float): Optional. The temperature for the LM call.
use_inputs_schema (bool): Optional. Whether or not to use the inputs schema in
the prompt (defaults to False) (see `Generator`).
use_outputs_schema (bool): Optional. Whether or not to use the outputs schema in
the prompt (defaults to False) (see `Generator`).
k (int): The number of thinking fields to add.
return_inputs (bool): Optional. Whether or not to concatenate the inputs to
the outputs (Default to False) (see `Generator`).
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
schema=None,
data_model=None,
language_model=None,
prompt_template=None,
examples=None,
instructions=None,
seed_instructions=None,
temperature=0.0,
use_inputs_schema=False,
use_outputs_schema=False,
k=1,
return_inputs=False,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
if not schema and data_model:
schema = data_model.get_schema()
self.schema = schema
self.language_model = language_model
self.prompt_template = prompt_template
self.examples = examples
self.instructions = instructions
self.seed_instructions = seed_instructions
self.temperature = temperature
self.use_inputs_schema = use_inputs_schema
self.use_outputs_schema = use_outputs_schema
self.return_inputs = return_inputs
self.k = k
thinking_data_model = Thinking
if k > 1:
for _ in range(k - 1):
thinking_data_model += Thinking
final_data_model = thinking_data_model + SymbolicDataModel(schema=self.schema)
self.generator = Generator(
data_model=final_data_model,
language_model=self.language_model,
prompt_template=self.prompt_template,
examples=self.examples,
instructions=self.instructions,
seed_instructions=self.seed_instructions,
temperature=self.temperature,
use_inputs_schema=self.use_inputs_schema,
use_outputs_schema=self.use_outputs_schema,
return_inputs=self.return_inputs,
name="generator_"+self.name,
)
async def call(self, inputs, training=False):
return await self.generator(inputs, training=training)
def get_config(self):
config = {
"schema": self.schema,
"prompt_template": self.prompt_template,
"examples": self.examples,
"instructions": self.instructions,
"seed_instructions": self.seed_instructions,
"temperature": self.temperature,
"use_inputs_schema": self.use_inputs_schema,
"use_outputs_schema": self.use_outputs_schema,
"return_inputs": self.return_inputs,
"k": self.k,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
language_model_config = {
"language_model": serialization_lib.serialize_synalinks_object(
self.language_model,
)
}
return {
**config,
**language_model_config,
}
@classmethod
def from_config(cls, config):
language_model = serialization_lib.deserialize_synalinks_object(
config.pop("language_model"),
)
return cls(
language_model=language_model,
**config,
)
````
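The `get_config()`/`from_config()` pair above follows a simple pattern. Plain hyperparameters are stored as-is, the language model is serialized separately, and on load it is popped back out and deserialized before the module is rebuilt. The sketch below reproduces that round-trip in plain Python. `FakeLanguageModel`, `FakeModule`, `serialize` and `deserialize` are hypothetical stand-ins, not Synalinks APIs; the real code uses `serialization_lib.serialize_synalinks_object` and `serialization_lib.deserialize_synalinks_object`.

```python
# Hypothetical stand-ins: these are NOT Synalinks classes, only plain Python
# objects that illustrate the get_config()/from_config() round-trip.


class FakeLanguageModel:
    def __init__(self, model):
        self.model = model

    def get_config(self):
        return {"model": self.model}


def serialize(obj):
    # Stand-in for serialization_lib.serialize_synalinks_object
    return {"class_name": type(obj).__name__, "config": obj.get_config()}


def deserialize(payload):
    # Stand-in for serialization_lib.deserialize_synalinks_object
    registry = {"FakeLanguageModel": FakeLanguageModel}
    return registry[payload["class_name"]](**payload["config"])


class FakeModule:
    def __init__(self, language_model=None, temperature=0.0, k=1, name=None):
        self.language_model = language_model
        self.temperature = temperature
        self.k = k
        self.name = name

    def get_config(self):
        # Plain hyperparameters are stored as-is...
        config = {"temperature": self.temperature, "k": self.k, "name": self.name}
        # ...while the nested language model is serialized separately
        return {**config, "language_model": serialize(self.language_model)}

    @classmethod
    def from_config(cls, config):
        config = dict(config)  # copy so the caller's dict is not mutated
        language_model = deserialize(config.pop("language_model"))
        return cls(language_model=language_model, **config)


lm = FakeLanguageModel(model="ollama/mistral")
module = FakeModule(language_model=lm, temperature=0.2, k=3, name="cot")
restored = FakeModule.from_config(module.get_config())
print(restored.k, restored.language_model.model)
```

Copying `config` before calling `pop()` is a small departure from the source, which pops from the dict it receives. The copy keeps the caller's config reusable.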
## `SelfCritique`
Bases: `Module`
Useful to critique the given inputs.
This component critiques the given inputs and can optionally generate an intermediate reward between 0.0 and 1.0.
You can enable or disable the intermediate reward computation with the `return_reward` flag (defaults to True).
For more accurate results, make sure the inputs are passed along with the output to evaluate by setting `return_inputs=True` in your upstream modules.
Example:
```
import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class Answer(synalinks.DataModel):
answer: str = synalinks.Field(
description="The correct answer",
)
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
x0 = synalinks.Input(data_model=Query)
x1 = await synalinks.ChainOfThought(
data_model=Answer,
language_model=language_model,
return_inputs=True,
)(x0)
x2 = await synalinks.SelfCritique(
language_model=language_model,
)(x1)
program = synalinks.Program(
inputs=x0,
outputs=x2,
name="answer_with_cot_and_self_critique",
description="Useful to answer accurately",
)
if __name__ == "__main__":
asyncio.run(main())
```
Parameters:
| Name | Type | Description | Default |
| -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | ------- |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The jinja2 prompt template (see Generator). | `None` |
| `examples` | `list` | The default list of examples, given as a list of tuples containing input/output JSON pairs. | `None` |
| `instructions` | `str` | The default instructions: a string containing instructions for the language model. | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seeds for the optimization. If not provided, the default instructions are used as the seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema` | `bool` | Optional. Whether or not to use the inputs schema in the prompt (defaults to False) (see Generator). | `False` |
| `use_outputs_schema` | `bool` | Optional. Whether or not to use the outputs schema in the prompt (defaults to False) (see Generator). | `False` |
| `return_reward` | `bool` | Optional. Whether or not to compute an intermediate reward. | `True` |
| `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (defaults to True) (see Generator). | `True` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |
Source code in `synalinks/src/modules/ttc/self_critique.py`
````
@synalinks_export(
[
"synalinks.modules.SelfCritique",
"synalinks.SelfCritique",
]
)
class SelfCritique(Module):
"""Useful to critique the given inputs.
    This component critiques the given inputs and can optionally generate
    an intermediate reward between 0.0 and 1.0.
    You can enable or disable the intermediate reward computation by
    using the `return_reward` flag (defaults to True).
    For more accurate results, make sure the inputs are passed along
    with the output to evaluate by setting `return_inputs=True` in your modules.
Example:
```python
    import synalinks
import asyncio
class Query(synalinks.DataModel):
query: str = synalinks.Field(
description="The user query",
)
class Answer(synalinks.DataModel):
answer: str = synalinks.Field(
description="The correct answer",
)
async def main():
language_model = synalinks.LanguageModel(
model="ollama/mistral",
)
x0 = synalinks.Input(data_model=Query)
x1 = await synalinks.ChainOfThought(
data_model=Answer,
language_model=language_model,
return_inputs=True,
)(x0)
x2 = await synalinks.SelfCritique(
language_model=language_model,
)(x1)
program = synalinks.Program(
inputs=x0,
outputs=x2,
name="answer_with_cot_and_self_critique",
description="Useful to answer accurately",
)
if __name__ == "__main__":
asyncio.run(main())
```
Args:
language_model (LanguageModel): The language model to use.
prompt_template (str): The jinja2 prompt template (see `Generator`).
examples (list): The default list of examples, the examples
are a list of tuples containing input/output JSON pairs.
instructions (str): The default instructions being a string containing
instructions for the language model.
        seed_instructions (list): Optional. A list of instructions to use as seeds for the
            optimization. If not provided, the default instructions are used as the seed.
        temperature (float): Optional. The temperature for the LM call.
        use_inputs_schema (bool): Optional. Whether or not to use the inputs schema in
            the prompt (defaults to False) (see `Generator`).
        use_outputs_schema (bool): Optional. Whether or not to use the outputs schema in
            the prompt (defaults to False) (see `Generator`).
        return_reward (bool): Optional. Whether or not to compute an intermediate reward.
        return_inputs (bool): Optional. Whether or not to concatenate the inputs to
            the outputs (defaults to True) (see `Generator`).
name (str): Optional. The name of the module.
description (str): Optional. The description of the module.
trainable (bool): Whether the module's variables should be trainable.
"""
def __init__(
self,
language_model=None,
prompt_template=None,
examples=None,
instructions=None,
seed_instructions=None,
temperature=0.0,
use_inputs_schema=False,
use_outputs_schema=False,
return_reward=True,
return_inputs=True,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
self.language_model = language_model
self.prompt_template = prompt_template
self.examples = examples
self.instructions = instructions
self.seed_instructions = seed_instructions
self.temperature = temperature
self.use_inputs_schema = use_inputs_schema
self.use_outputs_schema = use_outputs_schema
self.return_reward = return_reward
self.return_inputs = return_inputs
if self.return_reward:
schema = CritiqueWithReward.get_schema()
else:
schema = Critique.get_schema()
self.generator = Generator(
schema=schema,
language_model=self.language_model,
prompt_template=self.prompt_template,
examples=self.examples,
instructions=self.instructions,
seed_instructions=self.seed_instructions,
temperature=self.temperature,
use_inputs_schema=self.use_inputs_schema,
use_outputs_schema=self.use_outputs_schema,
return_inputs=self.return_inputs,
name="generator_" + self.name,
)
async def call(self, inputs, training=False):
return await self.generator(inputs, training=training)
def get_config(self):
config = {
"prompt_template": self.prompt_template,
"examples": self.examples,
"instructions": self.instructions,
"seed_instructions": self.seed_instructions,
"temperature": self.temperature,
"use_inputs_schema": self.use_inputs_schema,
"use_outputs_schema": self.use_outputs_schema,
"return_reward": self.return_reward,
"return_inputs": self.return_inputs,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
language_model_config = {
"language_model": serialization_lib.serialize_synalinks_object(
self.language_model,
)
}
return {
**config,
**language_model_config,
}
@classmethod
def from_config(cls, config):
language_model = serialization_lib.deserialize_synalinks_object(
config.pop("language_model"),
)
return cls(
language_model=language_model,
**config,
)
````
## `CosineSimilarity`
Bases: `RewardFunctionWrapper`
Computes the cosine similarity between `y_true` and `y_pred`.
Formula:
```
reward = (sum(l2_norm(y_true) * l2_norm(y_pred))+1) / 2
```
The formula is similar to the classic cosine similarity used in deep learning, but rescaled from [-1.0, 1.0] to [0.0, 1.0]. The reward tends towards 1.0 when the two objects are similar. It equals 0.5 when their embeddings are orthogonal, and tends towards 0.0 when they point in opposite directions.
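To check the formula quickly, the sketch below applies it to plain Python vectors that stand in for embeddings. In Synalinks, those vectors come from the embedding model. `scaled_cosine` is a hypothetical helper and assumes non-zero vectors. The three calls hit the top, middle and bottom of the reward range.

```python
import math


def scaled_cosine(u, v):
    """Cosine similarity of two non-zero vectors, rescaled from [-1, 1] to [0, 1]."""
    norm_u = math.sqrt(sum(x * x for x in u))
    norm_v = math.sqrt(sum(x * x for x in v))
    # l2-normalize both vectors, then take their dot product (classic cosine)
    cosine = sum((a / norm_u) * (b / norm_v) for a, b in zip(u, v))
    # Shift and scale so that the reward lives in [0.0, 1.0]
    return (cosine + 1) / 2


print(scaled_cosine([1.0, 0.0], [2.0, 0.0]))   # same direction → 1.0
print(scaled_cosine([1.0, 0.0], [0.0, 3.0]))   # orthogonal → 0.5
print(scaled_cosine([1.0, 0.0], [-1.0, 0.0]))  # opposite direction → 0.0
```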
Example:
```
program.compile(
reward=synalinks.rewards.CosineSimilarity(
embedding_model=embedding_model
    ),
optimizer=synalinks.optimizers.RandomFewShot(),
)
```
Parameters:
| Name | Type | Description | Default |
| ----------------- | ---------------- | --------------------------------------------------------------------------------------- | --------------------- |
| `embedding_model` | `EmbeddingModel` | The embedding model to use to compute the cosine similarity. | `None` |
| `axis` | `int` | (Optional) Defaults to -1. The dimension along which the cosine similarity is computed. | `-1` |
| `name` | `str` | (Optional) string name of the reward instance. | `'cosine_similarity'` |
| `in_mask` | `list` | (Optional) list of keys to keep to compute the reward. | `None` |
| `out_mask` | `list` | (Optional) list of keys to remove to compute the reward. | `None` |
Source code in `synalinks/src/rewards/cosine_similarity.py`
````
@synalinks_export(
[
"synalinks.CosineSimilarity",
"synalinks.rewards.CosineSimilarity",
]
)
class CosineSimilarity(RewardFunctionWrapper):
"""
Computes the cosine similarity between `y_true` and `y_pred`.
Formula:
```
reward = (sum(l2_norm(y_true) * l2_norm(y_pred))+1) / 2
```
    The formula is similar to the classic cosine similarity used in deep learning,
    but rescaled from [-1.0, 1.0] to [0.0, 1.0]. The reward tends towards 1.0 when
    the two objects are similar. It equals 0.5 for orthogonal embeddings and tends
    towards 0.0 for opposite ones.
Example:
```python
program.compile(
reward=synalinks.rewards.CosineSimilarity(
embedding_model=embedding_model
        ),
optimizer=synalinks.optimizers.RandomFewShot(),
)
```
Args:
embedding_model (EmbeddingModel): The embedding model to use to compute the
cosine similarity.
axis (int): (Optional) Defaults to `-1`. The dimension along which the cosine
similarity is computed.
name (str): (Optional) string name of the reward instance.
in_mask (list): (Optional) list of keys to keep to compute the reward.
out_mask (list): (Optional) list of keys to remove to compute the reward.
"""
def __init__(
self,
embedding_model=None,
axis=-1,
name="cosine_similarity",
in_mask=None,
out_mask=None,
):
super().__init__(
fn=cosine_similarity,
name=name,
in_mask=in_mask,
out_mask=out_mask,
axis=axis,
embedding_model=embedding_model,
)
def get_config(self):
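        # Call the base Reward implementation directly (with `self`) so that
        # `fn` is not serialized, since this subclass always uses `cosine_similarity`
        config = Reward.get_config(self)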
from synalinks.src.saving.serialization_lib import serialize_synalinks_object
embedding_model_config = {
"embedding_model": serialize_synalinks_object(self.embedding_model)
}
return {**config, **embedding_model_config}
@classmethod
def from_config(cls, config):
from synalinks.saving.serialization_lib import deserialize_synalinks_object
embedding_model = deserialize_synalinks_object(config.pop("embedding_model"))
return cls(embedding_model=embedding_model, **config)
````
## `cosine_similarity(y_true, y_pred, embedding_model=None, axis=-1)`
Computes the cosine similarity between `y_true` and `y_pred`.
Formula:
```
reward = (sum(l2_norm(y_true) * l2_norm(y_pred))+1) / 2
```
The formula is similar to the classic cosine similarity used in deep learning, but rescaled from [-1.0, 1.0] to [0.0, 1.0]. The reward tends towards 1.0 when the two objects are similar. It equals 0.5 when their embeddings are orthogonal, and tends towards 0.0 when they point in opposite directions.
Parameters:
| Name | Type | Description | Default |
| ----------------- | ---------------- | --------------------------------------------------------------------------------------- | ---------- |
| `y_true` | `JsonDataModel` | The ground truth JSON data_model. | *required* |
| `y_pred` | `JsonDataModel` | The predicted JSON data_model. | *required* |
| `embedding_model` | `EmbeddingModel` | The embedding model to use to compute the cosine similarity. | `None` |
| `axis` | `int` | (Optional) Defaults to -1. The dimension along which the cosine similarity is computed. | `-1` |
Returns:
| Type | Description |
| ------- | ----------------------------------------------------------------------------------------- |
| `float` | The reward value, which tends towards 1.0 as the values become similar and towards 0.0 as they diverge. |
Source code in `synalinks/src/rewards/cosine_similarity.py`
````
@synalinks_export("synalinks.rewards.cosine_similarity")
async def cosine_similarity(y_true, y_pred, embedding_model=None, axis=-1):
"""
Computes the cosine similarity between `y_true` and `y_pred`.
Formula:
```
reward = (sum(l2_norm(y_true) * l2_norm(y_pred))+1) / 2
```
    The formula is similar to the classic cosine similarity used in deep learning,
    but rescaled from [-1.0, 1.0] to [0.0, 1.0]. The reward tends towards 1.0 when
    the two objects are similar. It equals 0.5 for orthogonal embeddings and tends
    towards 0.0 for opposite ones.
Args:
y_true (JsonDataModel): The ground truth JSON data_model.
y_pred (JsonDataModel): The predicted JSON data_model.
embedding_model (EmbeddingModel): The embedding model to use to compute the
cosine similarity.
axis (int): (Optional) Defaults to `-1`. The dimension along which the cosine
similarity is computed.
Returns:
        (float): The reward value, which tends towards 1.0 as the values become
            similar and towards 0.0 as they diverge.
"""
reward = 0.0
if y_pred is not None:
y_true = await ops.embedding(y_true, embedding_model=embedding_model)
y_pred = await ops.embedding(y_pred, embedding_model=embedding_model)
y_true = np.convert_to_tensor(y_true.get("embeddings"))
y_pred = np.convert_to_tensor(y_pred.get("embeddings"))
y_true, y_pred = squeeze_or_expand_to_same_rank(y_true, y_pred)
y_pred = np.normalize(y_pred, axis=axis)
y_true = np.normalize(y_true, axis=axis)
reward = (np.sum(y_true * y_pred, axis=axis) + 1) / 2
return reward
````
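The function body follows a fixed sequence. It embeds both sides, l2-normalizes them, takes the dot product along `axis` and rescales the result. When there is no prediction, it returns 0.0 outright. The dependency-free sketch below reproduces those steps for the simple case of one vector per side. `toy_embed` is a hypothetical bag-of-letters embedder standing in for `ops.embedding()`. The zero-norm guard is an addition that does not appear in the source.

```python
import asyncio
import math


async def toy_embed(text):
    # Hypothetical stand-in for ops.embedding(): counts of "a", "b", "c".
    # Real embedding models return dense vectors with many dimensions.
    return [float(text.count(letter)) for letter in "abc"]


def l2_normalize(vector):
    norm = math.sqrt(sum(x * x for x in vector))
    # Guard against all-zero vectors (an addition, not in the source)
    return [x / norm for x in vector] if norm else vector


async def cosine_reward(y_true, y_pred, embed=toy_embed):
    reward = 0.0
    if y_pred is not None:  # a missing prediction keeps the 0.0 reward
        t = l2_normalize(await embed(y_true))
        p = l2_normalize(await embed(y_pred))
        # Dot product along the last axis, rescaled from [-1, 1] to [0, 1]
        reward = (sum(a * b for a, b in zip(t, p)) + 1) / 2
    return reward


print(asyncio.run(cosine_reward("abc", "cab")))  # approximately 1.0
print(asyncio.run(cosine_reward("aab", "c")))    # no shared letters → 0.5
print(asyncio.run(cosine_reward("abc", None)))   # no prediction → 0.0
```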
## `ExactMatch`
Bases: `RewardFunctionWrapper`
Computes the exact match between `y_true` and `y_pred`.
Example:
```
program.compile(
reward=synalinks.rewards.ExactMatch(),
optimizer=synalinks.optimizers.RandomFewShot(),
)
```
Parameters:
| Name | Type | Description | Default |
| ---------- | ------ | ------------------------------------------------------- | --------------- |
| `name` | `str` | Optional. string name of the reward instance. | `'exact_match'` |
| `in_mask` | `list` | Optional. list of keys to keep to compute the reward. | `None` |
| `out_mask` | `list` | Optional. list of keys to remove to compute the reward. | `None` |
Source code in `synalinks/src/rewards/exact_match.py`
````
@synalinks_export(
[
"synalinks.ExactMatch",
"synalinks.rewards.ExactMatch",
]
)
class ExactMatch(RewardFunctionWrapper):
"""Computes the exact match between `y_true` and `y_pred`.
Example:
```python
program.compile(
reward=synalinks.rewards.ExactMatch(),
optimizer=synalinks.optimizers.RandomFewShot(),
)
```
Args:
name (str): Optional. string name of the reward instance.
in_mask (list): Optional. list of keys to keep to compute the reward.
out_mask (list): Optional. list of keys to remove to compute the reward.
"""
def __init__(
self,
name="exact_match",
in_mask=None,
out_mask=None,
):
super().__init__(
fn=exact_match,
name=name,
in_mask=in_mask,
out_mask=out_mask,
)
def get_config(self):
return {
"name": self.name,
"in_mask": self.in_mask,
"out_mask": self.out_mask,
}
@classmethod
def from_config(cls, config):
return cls(**config)
````
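`in_mask` and `out_mask` help when only some fields should count. For example, you can ignore the reasoning produced by `ChainOfThought` and score only the final answer. The sketch below shows the idea on plain dicts. `apply_masks` and `masked_exact_match` are hypothetical helpers. Applying `in_mask` before `out_mask` is an assumed ordering, not something this page specifies.

```python
def apply_masks(data, in_mask=None, out_mask=None):
    # Hypothetical helper: keep only `in_mask` keys, then drop `out_mask` keys
    if in_mask is not None:
        data = {k: v for k, v in data.items() if k in in_mask}
    if out_mask is not None:
        data = {k: v for k, v in data.items() if k not in out_mask}
    return data


def masked_exact_match(y_true, y_pred, in_mask=None, out_mask=None):
    if y_pred is None:
        return 0.0
    y_true = apply_masks(y_true, in_mask, out_mask)
    y_pred = apply_masks(y_pred, in_mask, out_mask)
    return 1.0 if y_true == y_pred else 0.0


gold = {"thinking": "2 + 2 is 4", "answer": "4"}
pred = {"thinking": "two plus two", "answer": "4"}
print(masked_exact_match(gold, pred))                         # → 0.0
print(masked_exact_match(gold, pred, in_mask=["answer"]))     # → 1.0
print(masked_exact_match(gold, pred, out_mask=["thinking"]))  # → 1.0
```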
## `exact_match(y_true, y_pred)`
Computes the exact match between `y_true` and `y_pred`.
If their values are equal, it returns a reward of 1.0; otherwise, it returns 0.0.
Parameters:
| Name | Type | Description | Default |
| -------- | --------------- | --------------------------------- | ---------- |
| `y_true` | `JsonDataModel` | The ground truth JSON data_model. | *required* |
| `y_pred` | `JsonDataModel` | The predicted JSON data_model. | *required* |
Returns:
| Type | Description |
| ------- | ------------------------------------------------------------------------------ |
| `float` | The reward value, which is 1.0 if the values match exactly, and 0.0 otherwise. |
Source code in `synalinks/src/rewards/exact_match.py`
```
@synalinks_export("synalinks.rewards.exact_match")
async def exact_match(y_true, y_pred):
"""
Computes the exact match between `y_true` and `y_pred`.
If their values are equal, it returns a reward of 1.0; otherwise, it returns 0.0.
Args:
y_true (JsonDataModel): The ground truth JSON data_model.
y_pred (JsonDataModel): The predicted JSON data_model.
Returns:
(float): The reward value, which is 1.0 if the values match exactly,
and 0.0 otherwise.
"""
reward = 0.0
if y_pred is not None:
if y_pred.get_json() == y_true.get_json():
reward = 1.0
return reward
```
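The comparison is plain equality of the two JSON objects, so it is strict. The sketch below assumes `get_json()` returns an ordinary Python dict. Key order is ignored, but types and letter case are not. If near-misses should count, normalize values upstream or use a softer reward such as `CosineSimilarity`. `exact_match_json` is a hypothetical helper.

```python
# Hypothetical stand-in: plain dicts play the role of `get_json()` results.


def exact_match_json(y_true_json, y_pred_json):
    if y_pred_json is None:
        return 0.0
    return 1.0 if y_pred_json == y_true_json else 0.0


print(exact_match_json({"a": 1, "b": "x"}, {"b": "x", "a": 1}))   # key order ignored → 1.0
print(exact_match_json({"answer": "4"}, {"answer": 4}))            # str vs int → 0.0
print(exact_match_json({"answer": "Paris"}, {"answer": "paris"}))  # case-sensitive → 0.0
```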
## `LMAsJudge`
Bases: `ProgramAsJudge`
Evaluate the output of a program using a `LanguageModel`.
Example:
```
async def main():
# ... program definition
program.compile(
reward=synalinks.rewards.LMAsJudge(
language_model=language_model,
        ),
optimizer=synalinks.optimizers.RandomFewShot(),
)
history = await program.fit(...)
```
Parameters:
| Name | Type | Description | Default |
| ----------------- | --------------- | ---------------------------------------------------------- | --------------- |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` |
| `instructions` | `list` | The default instructions to use (see Generator). | `None` |
| `examples` | `list` | The default examples to use in the prompt (see Generator). | `None` |
| `name` | `str` | Optional. string name of the reward instance. | `'lm_as_judge'` |
| `in_mask` | `list` | Optional. list of keys to keep to compute the reward. | `None` |
| `out_mask` | `list` | Optional. list of keys to remove to compute the reward. | `None` |
Source code in `synalinks/src/rewards/lm_as_judge.py`
````
@synalinks_export(
[
"synalinks.LMAsJudge",
"synalinks.rewards.LMAsJudge",
]
)
class LMAsJudge(ProgramAsJudge):
"""Evaluate the output of a program using a `LanguageModel`.
Example:
```python
async def main():
# ... program definition
program.compile(
reward=synalinks.rewards.LMAsJudge(
language_model=language_model,
            ),
optimizer=synalinks.optimizers.RandomFewShot(),
)
history = await program.fit(...)
```
Args:
language_model (LanguageModel): The language model to use.
prompt_template (str): The default jinja2 prompt template
to use (see `Generator`).
instructions (list): The default instructions to use (see `Generator`).
examples (list): The default examples to use in the prompt
(see `Generator`).
name (str): Optional. string name of the reward instance.
in_mask (list): Optional. list of keys to keep to compute the reward.
out_mask (list): Optional. list of keys to remove to compute the reward.
"""
def __init__(
self,
language_model=None,
prompt_template=None,
examples=None,
instructions=None,
name="lm_as_judge",
in_mask=None,
out_mask=None,
):
program = LMAsJudgeProgram(
language_model=language_model,
prompt_template=prompt_template,
examples=examples,
instructions=instructions,
)
super().__init__(
program=program,
name=name,
in_mask=in_mask,
out_mask=out_mask,
)
````
## `LMAsJudgeProgram`
Bases: `Program`
Evaluate the output of a program using a `LanguageModel`.
Parameters:
| Name | Type | Description | Default |
| ----------------- | --------------- | ---------------------------------------------------------- | ------- |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` |
| `examples` | `list` | The default examples to use in the prompt (see Generator). | `None` |
| `instructions` | `list` | The default instructions to use (see Generator). | `None` |
| `name` | `str` | Optional. The name of the program. | `None` |
| `description` | `str` | Optional. The description of the program. | `None` |
| `trainable` | `bool` | Whether the program's variables should be trainable. | `True` |
Source code in `synalinks/src/rewards/lm_as_judge.py`
```
class LMAsJudgeProgram(Program):
"""Evaluate the output of a program using a `LanguageModel`.
Args:
language_model (LanguageModel): The language model to use.
prompt_template (str): The default jinja2 prompt template
to use (see `Generator`).
examples (list): The default examples to use in the prompt
(see `Generator`).
instructions (list): The default instructions to use (see `Generator`).
name (str): Optional. The name of the program.
description (str): Optional. The description of the program.
trainable (bool): Whether the program's variables should be trainable.
"""
def __init__(
self,
language_model=None,
prompt_template=None,
examples=None,
instructions=None,
name=None,
description=None,
trainable=True,
):
super().__init__(
name=name,
description=description,
trainable=trainable,
)
self.critique = SelfCritique(
language_model=language_model,
prompt_template=prompt_template,
examples=examples,
instructions=instructions,
name="self_critique_" + self.name,
)
self.language_model = language_model
self.prompt_template = prompt_template
self.examples = examples
self.instructions = instructions
async def call(self, inputs):
if not isinstance(inputs, (list, tuple)):
raise ValueError("The inputs should be a list or tuple.")
if len(inputs) != 2:
raise ValueError("The inputs of the program should have a length of 2.")
y_true = inputs[0]
y_pred = inputs[1]
if y_true:
y_true = await ops.prefix(
y_true,
prefix="gold",
name="gold_y_true",
)
return await self.critique(
await ops.concat(
y_true,
y_pred,
name="y_true_with_y_pred",
)
)
else:
return await self.critique(y_pred)
def get_config(self):
config = {
"prompt_template": self.prompt_template,
"examples": self.examples,
"instructions": self.instructions,
"name": self.name,
"description": self.description,
"trainable": self.trainable,
}
language_model_config = {
"language_model": serialization_lib.serialize_synalinks_object(
self.language_model
)
}
return {**language_model_config, **config}
@classmethod
def from_config(cls, config):
language_model = serialization_lib.deserialize_synalinks_object(
config.pop("language_model")
)
return cls(language_model=language_model, **config)
```
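`LMAsJudgeProgram.call()` first checks that it received a `[y_true, y_pred]` pair. It then prefixes the ground-truth fields with `gold`, presumably so they cannot collide with the prediction's fields, concatenates both and critiques the result. Without a ground truth, it critiques the prediction alone. The dict-based sketch below mirrors that flow. `build_judge_input` is hypothetical. The key format produced by `ops.prefix(..., prefix="gold")`, written here as `gold_<key>`, is an assumption.

```python
# Hypothetical sketch with plain dicts. The "gold_" key prefix is an
# assumption about what ops.prefix(..., prefix="gold") produces.


def build_judge_input(inputs):
    if not isinstance(inputs, (list, tuple)):
        raise ValueError("The inputs should be a list or tuple.")
    if len(inputs) != 2:
        raise ValueError("The inputs of the program should have a length of 2.")
    y_true, y_pred = inputs
    if y_true:
        # Prefix ground-truth keys, then concatenate with the prediction
        gold = {f"gold_{key}": value for key, value in y_true.items()}
        return {**gold, **y_pred}
    # Without a reference answer, only the prediction is critiqued
    return dict(y_pred)


print(build_judge_input([{"answer": "4"}, {"answer": "5"}]))
# → {'gold_answer': '4', 'answer': '5'}
print(build_judge_input([None, {"answer": "5"}]))
# → {'answer': '5'}
```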
## `ProgramAsJudge`
Bases: `Reward`
Wrap a `Program` into a `Reward`.
You can use this to create advanced reward functions that use a Synalinks `Program`. The program receives `[y_true, y_pred]` as its two inputs and must return one output.
**Note:** The output data model/schema should have a field named `reward`. If that field is missing, the reward defaults to 0.0.
Example:
```
# ... your judge program declaration (two inputs: y_true and y_pred)
judge_program = synalinks.Program(
    inputs=[x0, x1],
    outputs=xn,
)
program.compile(
    reward=synalinks.rewards.ProgramAsJudge(program=judge_program),
optimizer=synalinks.optimizers.RandomFewShot(),
)
```
Parameters:
| Name | Type | Description | Default |
| ---------- | --------- | ------------------------------------------------------- | ---------- |
| `program` | `Program` | The reward program to wrap. | *required* |
| `name` | `str` | Optional. string name of the reward instance. | `None` |
| `in_mask` | `list` | Optional. list of keys to keep to compute the reward. | `None` |
| `out_mask` | `list` | Optional. list of keys to remove to compute the reward. | `None` |
Source code in `synalinks/src/rewards/reward_wrappers.py`
````
@synalinks_export(
[
"synalinks.ProgramAsJudge",
"synalinks.rewards.ProgramAsJudge",
]
)
class ProgramAsJudge(Reward):
"""Wrap a `Program` into a `Reward`.
You can use this to create advanced reward functions that use a Synalinks `Program`.
    The program receives `[y_true, y_pred]` as its two inputs and should return one output.
    **Note:** The output data model/schema should have a field named `reward`.
    Example:
    ```python
    # ... your judge program declaration (two inputs: y_true and y_pred)
    judge_program = synalinks.Program(
        inputs=[x0, x1],
        outputs=xn,
    )
    program.compile(
        reward=synalinks.rewards.ProgramAsJudge(program=judge_program),
optimizer=synalinks.optimizers.RandomFewShot(),
)
```
Args:
program (Program): The reward program to wrap.
name (str): Optional. string name of the reward instance.
in_mask (list): Optional. list of keys to keep to compute the reward.
out_mask (list): Optional. list of keys to remove to compute the reward.
"""
def __init__(
self,
program,
reduction="mean",
name=None,
in_mask=None,
out_mask=None,
):
super().__init__(
name=name,
reduction=reduction,
in_mask=in_mask,
out_mask=out_mask,
)
self.program = program
async def call(self, y_true, y_pred):
result = await self.program([y_true, y_pred])
return float(result.get("reward", 0.0))
def get_config(self):
config = super().get_config()
config.update({"program": self.program})
return config
def __repr__(self):
return f""
````
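`ProgramAsJudge.call()` runs the wrapped program on `[y_true, y_pred]` and converts its `reward` field to a float. If the field is missing, it falls back to 0.0. The asyncio sketch below illustrates that contract, with hypothetical `toy_judge` and `silent_judge` coroutines in place of a real judge `Program`. Averaging the per-example rewards mimics the default `reduction="mean"`. How `Reward` actually applies the reduction is an assumption here.

```python
import asyncio


async def toy_judge(inputs):
    # Hypothetical judge program: returns a dict with a "reward" field,
    # here 1.0 when the prediction equals the reference and 0.0 otherwise
    y_true, y_pred = inputs
    return {"critique": "compared answers", "reward": 1.0 if y_true == y_pred else 0.0}


async def silent_judge(inputs):
    # Hypothetical judge that omits the "reward" field
    return {"critique": "no score given"}


async def judge_reward(program, y_true, y_pred):
    # Mirrors ProgramAsJudge.call(): run the program on [y_true, y_pred]
    # and read its "reward" field, defaulting to 0.0 when it is missing
    result = await program([y_true, y_pred])
    return float(result.get("reward", 0.0))


async def main():
    pairs = [({"answer": "4"}, {"answer": "4"}), ({"answer": "4"}, {"answer": "5"})]
    rewards = [await judge_reward(toy_judge, t, p) for t, p in pairs]
    # reduction="mean": average the per-example rewards (assumed semantics)
    return sum(rewards) / len(rewards)


print(asyncio.run(main()))                              # → 0.5
print(asyncio.run(judge_reward(silent_judge, {}, {})))  # → 0.0
```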
## `RewardFunctionWrapper`
Bases: `Reward`
Wrap a stateless function into a `Reward`.
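You can use this to quickly build a reward from a function. The function must be asynchronous, because it is awaited when the reward is computed. Its signature must be `fn(y_true, y_pred, **kwargs)`, where `**kwargs` are the extra keyword arguments given to the wrapper.
Example:
```
async def my_reward(y_true, y_pred):
    # The wrapper awaits `fn`, so the reward function must be a coroutine
    reward = 0.0
    # ... compute a float reward from y_true and y_pred
    return reward
program.compile(
    reward=synalinks.rewards.RewardFunctionWrapper(fn=my_reward),
    optimizer=synalinks.optimizers.RandomFewShot(),
)
```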
Parameters:
| Name | Type | Description | Default |
| ---------- | ------------------- | --------------------------------------------------------------------------- | ---------- |
| `fn` | `callable` | The async reward function to wrap, with signature fn(y_true, y_pred, \*\*kwargs). | *required* |
| `name` | `str` | Optional. string name of the reward instance. | `None` |
| `in_mask` | `list` | Optional. list of keys to keep to compute the reward. | `None` |
| `out_mask` | `list` | Optional. list of keys to remove to compute the reward. | `None` |
| `**kwargs` | `keyword arguments` | Keyword arguments to pass on to fn. | `{}` |
Source code in `synalinks/src/rewards/reward_wrappers.py`
````
@synalinks_export("synalinks.rewards.RewardFunctionWrapper")
class RewardFunctionWrapper(Reward):
"""Wrap a stateless function into a `Reward`.
    You can use this to quickly build a reward from a function. The function
    must be asynchronous, because it is awaited in `call()`. Its signature
    must be `fn(y_true, y_pred, **kwargs)`.
    Example:
    ```python
    async def my_reward(y_true, y_pred):
        reward = 0.0
        # ... compute a float reward from y_true and y_pred
        return reward
    program.compile(
        reward=synalinks.rewards.RewardFunctionWrapper(fn=my_reward),
        optimizer=synalinks.optimizers.RandomFewShot(),
    )
```
Args:
fn (callable): The reward function to wrap, with signature
`fn(y_true, y_pred, **kwargs)`.
name (str): Optional. string name of the reward instance.
in_mask (list): Optional. list of keys to keep to compute the reward.
out_mask (list): Optional. list of keys to remove to compute the reward.
**kwargs (keyword arguments): Keyword arguments to pass on to `fn`.
"""
def __init__(
self,
fn,
reduction="mean",
name=None,
in_mask=None,
out_mask=None,
**kwargs,
):
super().__init__(
name=name,
reduction=reduction,
in_mask=in_mask,
out_mask=out_mask,
)
self.fn = fn
self._fn_kwargs = kwargs
async def call(self, y_true, y_pred):
return await self.fn(y_true, y_pred, **self._fn_kwargs)
def get_config(self):
config = super().get_config()
config.update({"fn": serialization_lib.serialize_synalinks_object(self.fn)})
config.update(serialization_lib.serialize_synalinks_object(self._fn_kwargs))
return config
@classmethod
def from_config(cls, config):
if "fn" in config:
config = serialization_lib.deserialize_synalinks_object(config)
return cls(**config)
def __repr__(self):
return f""
````
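`RewardFunctionWrapper` stores `fn` together with any extra keyword arguments, forwards them on every call and awaits the result. The stripped-down sketch below omits masking and reduction. It shows why `fn` must be a coroutine and how `**kwargs` reach it. `MiniRewardWrapper` and `contains_reference` are hypothetical names used only for illustration.

```python
import asyncio


class MiniRewardWrapper:
    """Hypothetical stand-in for RewardFunctionWrapper (no masking or reduction)."""

    def __init__(self, fn, **kwargs):
        self.fn = fn
        self._fn_kwargs = kwargs  # forwarded to fn on every call

    async def call(self, y_true, y_pred):
        # fn is awaited, so it must be an async function
        return await self.fn(y_true, y_pred, **self._fn_kwargs)


async def contains_reference(y_true, y_pred, field="answer", case_sensitive=False):
    # Example reward: 1.0 if the reference value appears in the predicted value
    expected, actual = str(y_true[field]), str(y_pred[field])
    if not case_sensitive:
        expected, actual = expected.lower(), actual.lower()
    return 1.0 if expected in actual else 0.0


lenient = MiniRewardWrapper(contains_reference, case_sensitive=False)
strict = MiniRewardWrapper(contains_reference, case_sensitive=True)
print(asyncio.run(lenient.call({"answer": "Paris"}, {"answer": "It is paris."})))  # → 1.0
print(asyncio.run(strict.call({"answer": "Paris"}, {"answer": "It is paris."})))   # → 0.0
```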