# Synalinks

> Keras-based LM framework for neuro-symbolic applications and in-context learning

# Usage documentation

# Introduction

______________________________________________________________________

## What is Synalinks?

Synalinks is an open-source framework that makes it easy to create, evaluate, train, and deploy industry-standard Language Model (LM) applications. Synalinks follows the principle of *progressive disclosure of complexity*: simple workflows should be quick and easy, while arbitrarily advanced ones should be possible via a clear path that builds upon what you've already learned.

Synalinks is an *adaptation of Keras 3* focused on neuro-symbolic systems and in-context reinforcement learning, an ensemble of techniques that improve an LM's predictions and accuracy without changing the weights of the model.

The goal of Synalinks is to facilitate the rapid setup of simple applications while providing the flexibility for researchers and advanced users to develop sophisticated systems.

______________________________________________________________________

Info

You can use the [`llms.txt`](/synalinks/llms.txt) or [`llms-full.txt`](/synalinks/llms-full.txt) to feed your favorite LMs with the Synalinks documentation.

## Who is Synalinks for?

Synalinks is designed for a diverse range of users, from professionals and AI researchers to students, independent developers, and hobbyists. It is suitable for anyone who wants to learn about AI by composing building blocks, or to build solid foundations for enterprise-grade products.

A background in Machine Learning and Deep Learning can be advantageous, since Synalinks leverages design patterns from Keras, one of the most user-friendly and popular Deep Learning frameworks, but it is not a prerequisite. Synalinks is designed to be accessible to anyone with programming skills in Python, making it a versatile and inclusive platform for AI development.
______________________________________________________________________

## Why use Synalinks?

Developing a successful LM application in a professional context, beyond stateless chatbots, is difficult and typically includes:

- **Building optimized prompts with examples/instructions at each step**: Synalinks uses advanced In-Context Reinforcement Learning techniques to optimize each prompt.
- **Pipelines that change over time**: Easily edit your pipelines, re-run your training, and you're good to go.
- **Ensuring the correctness of the LM's output**: Synalinks combines constrained structured output with In-Context RL to ensure both format and content correctness.
- **Async Optimization**: Synalinks automatically optimizes your pipelines by detecting parallel processes.
- **Assessing the performance of your application**: Synalinks provides built-in metrics and rewards to evaluate your workflows.
- **Configuring Language & Embedding Models**: Seamlessly integrate multiple LM providers like Ollama, OpenAI, Anthropic, Mistral or Groq.
- **Documenting your ML workflows**: Plot your workflows, training history, and evaluations; document everything.
- **Versioning the prompts/pipelines**: Each program is serializable into JSON so you can version it with git.
- **Deploying REST APIs**: Compatible out-of-the-box with FastAPI so your Data Scientists and Web Developers can stop tearing each other apart.

Synalinks can help you simplify these tasks by leveraging decade-old practices from Deep Learning frameworks. We provide a comprehensive suite of tools and features designed to streamline the development process, making it easier to create, evaluate, train, document, and deploy robust neuro-symbolic LM applications.
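To make the async and reward points above a little more concrete, here is a small, library-free sketch. It is not Synalinks code: `toy_program`, `predict_batch`, `exact_match`, and `mean_reward` are hypothetical names invented for illustration. It assumes that the independent calls of a batch can be awaited concurrently with `asyncio.gather` (the standard-library call that `predict_on_batch` uses in the trainer source on this page) and that a reward is a callable of the form `fn(y_true, y_pred)` returning a float, as the `compile` docstring describes.

```python
import asyncio


async def toy_program(question: str) -> str:
    """Hypothetical stand-in for one LM-backed program call."""
    await asyncio.sleep(0.01)  # simulates the I/O latency of a model call
    return question.upper()


async def predict_batch(inputs):
    """Start one call per input and await them all concurrently."""
    tasks = [toy_program(item) for item in inputs]
    return await asyncio.gather(*tasks)


def exact_match(y_true: str, y_pred: str) -> float:
    """Toy reward with the `fn(y_true, y_pred)` shape: 1.0 on equality."""
    return 1.0 if y_true == y_pred else 0.0


def mean_reward(targets, predictions) -> float:
    """Average the per-sample rewards; an empty batch yields 0.0."""
    rewards = [exact_match(t, p) for t, p in zip(targets, predictions)]
    return sum(rewards) / len(rewards) if rewards else 0.0


if __name__ == "__main__":
    preds = asyncio.run(predict_batch(["a", "b", "c"]))
    print(preds)
    print(mean_reward(["A", "B", "x"], preds))
```

`asyncio.gather` returns results in the order the awaitables were passed, so predictions stay aligned with their targets even when the underlying calls finish in a different order.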
______________________________________________________________________

Source code in `synalinks/src/trainers/trainer.py`

````
class Trainer:
    def __init__(self):
        self._lock = False
        self._run_eagerly = False
        self.compiled = False
        self.reward = None
        self.steps_per_execution = 1
        # Can be set by callbacks in on_train_begin
        self._initial_epoch = None
        self._compute_reward_has_training_arg = (
            "training" in inspect.signature(self.compute_reward).parameters
        )

        # Placeholders used in `compile`
        self._optimizer = None
        self._compile_reward = None
        self._compile_metrics = None
        self._reward_tracker = None

    @tracking.no_automatic_dependency_tracking
    def compile(
        self,
        optimizer=None,
        reward=None,
        reward_weights=None,
        metrics=None,
        run_eagerly=False,
        steps_per_execution=1,
    ):
        """Configures the program for training.

        Example:

        ```python
        program.compile(
            optimizer=synalinks.optimizers.RandomFewShot(),
            reward=synalinks.rewards.ExactMatch(),
            metrics=[
                synalinks.metrics.MeanMetricWrapper(synalinks.rewards.exact_match),
            ],
        )
        ```

        Args:
            optimizer (Optimizer): Optimizer instance. See `synalinks.optimizers`.
            reward (Reward): Reward function. A `synalinks.rewards.Reward`
                instance. See `synalinks.rewards`. A reward function is
                any callable with the signature `reward = fn(y_true, y_pred)`,
                where `y_true` are the ground truth values, and `y_pred`
                are the program's predictions.
                `y_true` should be a list of batch size length `[d0, .. dN]`.
                `y_pred` should be a list of batch size length `[d0, .. dN]`.
                The reward function should return a float.
            reward_weights (list): Optional list specifying scalar coefficients
                (Python floats) to weight the reward contributions of
                different program outputs. The reward value that will be
                maximized by the program will then be the *weighted sum* of
                all individual rewards, weighted by the `reward_weights`
                coefficients. It is expected to have a 1:1 mapping to the
                program's outputs.
            metrics (list): List of metrics to be evaluated by the program
                during training and testing.
                Each of them is a `synalinks.metrics.Metric` instance.
                See `synalinks.metrics`. A function is any callable with the
                signature `result = fn(y_true, y_pred)`.
            run_eagerly (bool): If `True`, this program's forward pass
                will never be compiled. It is recommended to leave this
                as `False` when training (for best performance),
                and to set it to `True` when debugging.
            steps_per_execution (int): The number of batches to run
                during a single compiled function call. Running multiple
                batches inside a single compiled function call can
                greatly improve performance on TPUs or small programs with a
                large Python overhead. At most, one full epoch will be run each
                execution. If a number larger than the size of the epoch is
                passed, the execution will be truncated to the size of the
                epoch. Note that if `steps_per_execution` is set to `N`,
                `Callback.on_batch_begin` and `Callback.on_batch_end` methods
                will only be called every `N` batches (i.e. before/after
                each compiled function execution).
        """
        self._clear_previous_trainer_metrics()
        self._optimizer = optimizer
        self._optimizer.set_program(self)

        if hasattr(self, "output_names"):
            output_names = self.output_names
        else:
            output_names = None
        if reward is not None:
            self._compile_reward = CompileReward(
                reward, reward_weights, output_names=output_names
            )
            self.reward = reward
        if metrics is not None:
            self._compile_metrics = CompileMetrics(metrics, output_names=output_names)
        self.run_eagerly = run_eagerly
        self.stop_training = False
        self.compiled = True
        self._reward_tracker = metrics_module.Mean(name="reward")
        self.steps_per_execution = steps_per_execution

        self._compile_config = serialization_lib.SerializableDict(
            optimizer=optimizer,
            reward=reward,
            reward_weights=reward_weights,
            metrics=metrics,
            run_eagerly=run_eagerly,
            steps_per_execution=steps_per_execution,
        )

    @property
    def optimizer(self):
        return self._optimizer

    @property
    def metrics(self):
        # Order: reward tracker, individual reward trackers, compiled metrics,
        # custom metrics, submodule metrics.
metrics = [] if self.compiled: if self._reward_tracker is not None: metrics.append(self._reward_tracker) if self._compile_metrics is not None: metrics.append(self._compile_metrics) if self._compile_reward is not None: metrics.extend(self._compile_reward.metrics) metrics.extend(self._metrics) for module in self._flatten_modules(include_self=False): if isinstance(module, Trainer): # All Trainer-related metrics in submodules should be ignored # because a new Trainer has been instantiated. continue metrics.extend(module.metrics) return metrics @property def metrics_names(self): return [m.name for m in self.metrics] def reset_metrics(self): for m in self.metrics: m.reset_state() def _get_own_metrics(self): metrics = [] if self._reward_tracker is not None: metrics.append(self._reward_tracker) if self._compile_metrics is not None: metrics.append(self._compile_metrics) if self._compile_reward is not None: metrics.extend(self._compile_reward.metrics) metrics.extend(self._metrics) return metrics def _clear_previous_trainer_metrics(self): for module in self._flatten_modules(include_self=False): if not isinstance(module, Trainer): continue # A submodule might be a Trainer. In that case, we need to clear # the Trainer-related metrics, as they are not usable when a # new Trainer is instantiated. for m in self._get_own_metrics(): module._tracker.untrack(m) module._reward_tracker = None module._compile_metrics = None if module._compile_reward is not None: module._compile_reward._metrics.clear() module._metrics.clear() @property def run_eagerly(self): return self._run_eagerly @run_eagerly.setter def run_eagerly(self, value): self._run_eagerly = value async def compute_reward( self, x=None, y=None, y_pred=None, training=True, ): """Compute the total reward, validate it, and return it. Subclasses can optionally override this method to provide custom reward computation logic. Args: x (list): Input data. y (list): Target data. 
y_pred (list): Predictions returned by the program (output of `program(x)`). training (bool): Whether we are training or evaluating the program. Returns: (float | None): The total reward as a scalar, or `None` if no reward results (which is the case when called by `Program.test_step`). """ # The default implementation does not use `x` or `training`. del x del training rewards = [] if self._compile_reward is not None: for y_t, y_p in zip(y, y_pred): reward = await self._compile_reward(y_t, y_p) if reward is not None: rewards.append(reward) for reward in self.rewards: rewards.append(numpy.sum(reward)) if len(rewards) == 1: total_reward = rewards[0] elif len(rewards) == 0: total_reward = numpy.zeros(()) else: total_reward = numpy.mean(rewards) return float(total_reward) def stateless_compute_reward( self, trainable_variables, non_trainable_variables, metrics_variables, x=None, y=None, y_pred=None, training=True, ): var_mapping = list(zip(self.trainable_variables, trainable_variables)) var_mapping.extend(zip(self.non_trainable_variables, non_trainable_variables)) var_mapping.extend(zip(self.metrics_variables, metrics_variables)) with backend.StatelessScope(state_mapping=var_mapping) as scope: # Note that this is needed for the regularization reward, which need # the latest value of train/non-trainable variables. reward = self._compute_reward( x, y, y_pred, training=training, ) # Update non trainable vars (may have been updated in compute_reward) non_trainable_variables = [] for v in self.non_trainable_variables: new_v = scope.get_current_value(v) non_trainable_variables.append(new_v) # Update metrics vars (may have been updated in compute_reward) metrics_variables = [] for v in self.metrics_variables: new_v = scope.get_current_value(v) metrics_variables.append(new_v) return reward, ( trainable_variables, non_trainable_variables, metrics_variables, ) async def compute_metrics(self, x, y, y_pred): """Update metric states and collect all metrics to be returned. 

        Subclasses can optionally override this method to provide custom metric
        updating and collection logic. Custom metrics are not passed in
        `compile()`; they can be created in `__init__` or `build`. They are
        automatically tracked and returned by `self.metrics`.

        Args:
            x: Input data.
            y: Target data.
            y_pred: Predictions returned by the program
                (output of `program.call(x)`).

        Returns:
            A `dict` containing values that will be passed to
                `synalinks.callbacks.CallbackList.on_train_batch_end()`.
                Typically, the values of the metrics listed in `self.metrics`
                are returned. Example: `{'reward': 0.2, 'accuracy': 0.7}`.
        """
        del x  # The default implementation does not use `x`.
        if self._compile_metrics is not None:
            for y_t, y_p in zip(y, y_pred):
                await self._compile_metrics.update_state(y_t, y_p)
        return self.get_metrics_result()

    def get_metrics_result(self):
        """Returns the program's metrics values as a dict.

        If any metric result is a dict (containing multiple metrics), each of
        its entries is added to the top-level dict returned by this method.

        Returns:
            (dict): A `dict` containing values of the metrics listed in
                `self.metrics`. Example: `{'reward': 0.2, 'accuracy': 0.7}`.
        """
        return_metrics = {}
        for metric in self.metrics:
            result = metric.result()
            if isinstance(result, dict):
                return_metrics.update(result)
            else:
                return_metrics[metric.name] = result
        return python_utils.pythonify_logs(return_metrics)

    async def fit(
        self,
        x=None,
        y=None,
        batch_size=1,
        epochs=1,
        verbose="auto",
        callbacks=None,
        validation_split=0.1,
        validation_data=None,
        shuffle=True,
        initial_epoch=0,
        steps_per_epoch=None,
        validation_steps=None,
        validation_batch_size=32,
        validation_freq=1,
    ):
        """Trains the program for a fixed number of epochs (dataset iterations).

        Args:
            x (np.ndarray | generator): Input data. It can be:
                - A NumPy array (or array-like), or a list of `DataModel` arrays
                    (in case the program has multiple inputs).
- A list of dict mapping input names to the corresponding `DataModel`s, if the program has named inputs. - A Python generator function yielding `(inputs, targets)`. y (np.ndarray): Target data. Like the input data `x`, it can be either NumPy array(s) of `DataModel`(s). If `x` is a Python generator function, `y` should not be specified since targets will be obtained from `x`. batch_size (int): Integer or `None`. Number of samples per batch of computation. If unspecified, `batch_size` will default to 32. Do not specify the `batch_size` if your input data `x` is a Python generator function since they generate batches. epochs (int): Integer. Number of epochs to train the program. An epoch is an iteration over the entire `x` and `y` data provided (unless the `steps_per_epoch` flag is set to something other than None). Note that in conjunction with `initial_epoch`, `epochs` is to be understood as "final epoch". The program is not trained for a number of iterations given by `epochs`, but merely until the epoch of index `epochs` is reached. verbose (int): `"auto"`, 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so `verbose=2` is recommended when not running interactively (e.g., in a production environment). Defaults to `"auto"`. callbacks (list): List of `synalinks.callbacks.Callback` instances. List of callbacks to apply during training. See `synalinks.callbacks`. Note `synalinks.callbacks.ProgbarLogger` and `synalinks.callbacks.History` callbacks are created automatically and need not be passed to `program.fit()`. `synalinks.callbacks.ProgbarLogger` is created or not based on the `verbose` argument in `program.fit()`. validation_split (float): Float between 0 and 1. Fraction of the training data to be used as validation data. 
The program will set apart this fraction of the training data, will not train on it, and will evaluate the reward and any program metrics on this data at the end of each epoch. The validation data is selected from the last samples in the `x` and `y` data provided, before shuffling. This argument is only supported when `x` and `y` are made of data_models. If both `validation_data` and `validation_split` are provided, `validation_data` will override `validation_split`. validation_data (tuple | iterator): Data on which to evaluate the reward and any program metrics at the end of each epoch. The program will not be trained on this data. `validation_data` will override `validation_split`. It can be: - A tuple `(x_val, y_val)` of `DataModel`s lists. shuffle (bool): Whether to shuffle the training data before each epoch. This argument is ignored when `x` is a Python generator function. initial_epoch (int): Integer. Epoch at which to start training (useful for resuming a previous training run). steps_per_epoch (int): Integer or `None`. Total number of steps (batches of samples) before declaring one epoch finished and starting the next epoch. When training with input data_models arrays, the default `None` means that the value used is the number of samples in your dataset divided by the batch size, or 1 if that cannot be determined. If `x` is a Python generator function, the epoch will run until the input dataset is exhausted. When passing an infinitely repeating dataset, you must specify the `steps_per_epoch` argument, otherwise the training will run indefinitely. validation_steps (int): Integer or `None`. Only relevant if `validation_data` is provided. Total number of steps (batches of samples) to draw before stopping when performing validation at the end of every epoch. If `validation_steps` is `None`, validation will run until the `validation_data` dataset is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. 
                If `validation_steps` is specified and only part of the dataset
                is consumed, the evaluation will start from the beginning of the
                dataset at each epoch. This ensures that the same validation
                samples are used every time.
            validation_batch_size (int): Integer or `None`.
                Number of samples per validation batch.
                If unspecified, will default to `batch_size`.
                Do not specify the `validation_batch_size` if your data is a
                `synalinks.utils.PyDataset`, `tf.data.Dataset`,
                `torch.utils.data.DataLoader` or Python generator function
                since they generate batches.
            validation_freq (int): Only relevant if validation data is provided.
                Specifies how many training epochs to run
                before a new validation run is performed,
                e.g. `validation_freq=2` runs validation every 2 epochs.

        Returns:
            (History): A `History` object. Its `History.history` attribute is
                a record of training reward values and metrics values
                at successive epochs, as well as validation reward values
                and validation metrics values (if applicable).
        """
        self._assert_compile_called("fit")
        self._eval_epoch_iterator = None
        # Both validation placeholders must exist even without validation data,
        # because they are passed to `train_on_batch` later on.
        val_x, val_y = None, None

        if validation_split and validation_data is None:
            # Create the validation data using the training data. Only supported
            # for numpy arrays.
            (x, y), validation_data = array_slicing.train_validation_split(
                (x, y), validation_split=validation_split
            )

        if validation_data is not None:
            (val_x, val_y) = data_adapter_utils.unpack_x_y(validation_data)

        # Create an iterator that yields batches of input/target data.
        epoch_iterator = EpochIterator(
            x=x,
            y=y,
            batch_size=batch_size,
            steps_per_epoch=steps_per_epoch,
            shuffle=False,
            steps_per_execution=self.steps_per_execution,
        )

        if not all(module.built for module in self._flatten_modules()):
            # Build the model on one batch of data.
            for _, data in epoch_iterator:
                data_batch = data[0]
                self._auto_build(
                    iterator=epoch_iterator,
                    data_batch=data_batch,
                )
                break
        epoch_iterator.reset()

        # Container that configures and calls callbacks.
if not isinstance(callbacks, callbacks_module.CallbackList): callbacks = callbacks_module.CallbackList( callbacks, add_history=True, add_progbar=verbose != 0, verbose=verbose, epochs=epochs, steps=steps_per_epoch, program=self, ) self.stop_training = False callbacks.on_train_begin() training_logs = None logs = {} initial_epoch = self._initial_epoch or initial_epoch if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_train_begin( self.trainable_variables, ) for epoch in range(initial_epoch, epochs): self.reset_metrics() if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_epoch_begin( epoch, self.trainable_variables, ) callbacks.on_epoch_begin(epoch) with epoch_iterator.catch_stop_iteration(): for step, iterator in epoch_iterator: data = iterator[0] x_batch, y_batch = data_adapter_utils.unpack_x_y(data) if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_batch_begin( step, epoch, self.trainable_variables, ) callbacks.on_train_batch_begin(step) logs = await self.train_on_batch( step=step, x=x_batch, y=y_batch, val_x=val_x, val_y=val_y, return_dict=True, ) val_logs = await self.evaluate( x=val_x, y=val_y, batch_size=validation_batch_size or batch_size, steps=validation_steps, callbacks=callbacks, _use_cached_eval_dataset=False, ) if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_batch_end( step, epoch, self.trainable_variables, ) callbacks.on_train_batch_end(step, logs) if self.stop_training: break # Override with model metrics instead of last step logs if needed. epoch_logs = dict(self._get_metrics_result_or_logs(logs)) # Run validation. if validation_data is not None and self._should_eval(epoch, validation_freq): # Create EpochIterator for evaluation and cache it. 
if getattr(self, "_eval_epoch_iterator", None) is None: self._eval_epoch_iterator = EpochIterator( x=val_x, y=val_y, batch_size=validation_batch_size or batch_size, steps_per_execution=self.steps_per_execution, steps_per_epoch=validation_steps, shuffle=False, ) val_logs = await self.evaluate( x=val_x, y=val_y, batch_size=validation_batch_size or batch_size, steps=validation_steps, callbacks=callbacks, _use_cached_eval_dataset=True, ) val_logs = {"val_" + name: val for name, val in val_logs.items()} epoch_logs.update(val_logs) if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_epoch_end( epoch, self.trainable_variables, ) callbacks.on_epoch_end(epoch, epoch_logs) training_logs = epoch_logs if self.stop_training: break # If _eval_epoch_iterator exists, delete it after all epochs are done. if getattr(self, "_eval_epoch_iterator", None) is not None: del self._eval_epoch_iterator if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_train_end(self.trainable_variables) callbacks.on_train_end(logs=training_logs) return self.history async def evaluate( self, x=None, y=None, batch_size=32, verbose="auto", steps=None, callbacks=None, return_dict=True, **kwargs, ): """Returns the reward value & metrics values for the program in test mode. Computation is done in batches (see the `batch_size` arg.) Args: x (np.ndarray | generator): Input data. It can be: - A NumPy array (or array-like), or a list of `DataModel` arrays (in case the model has multiple inputs). - A list of dict mapping input names to the corresponding `DataModel`s, if the program has named inputs. - A Python generator function yielding `(inputs, targets)`. y (np.ndarray): Target data. Like the input data `x`, it can be either NumPy array(s) of `DataModel`(s). If `x` is a Python generator function, `y` should not be specified since targets will be obtained from `x`. 
batch_size (int): Integer or `None`. Number of samples per batch of computation. If unspecified, `batch_size` will default to 32. Do not specify the `batch_size` if your input data `x` is a Python generator function since they generate batches. verbose (int | str): `"auto"`, 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. `"auto"` becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so `verbose=2` is recommended when not running interactively (e.g. in a production environment). Defaults to `"auto"`. steps (int): Integer or `None`. Total number of steps (batches of samples) to draw before declaring the evaluation round finished. If `steps` is `None`, it will run until `x` is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. callbacks (list): List of `synalinks.callbacks.Callback` instances. List of callbacks to apply during evaluation. return_dict (bool): If `True`, reward and metric results are returned as a dict, with each key being the name of the metric. If `False`, they are returned as a list. Returns: (float | list | dict): Scalar test reward (if the program has a single output and no metrics) or list of scalars (if the program has multiple outputs and/or metrics). The attribute `program.metrics_names` will give you the display labels for the scalar outputs. """ self._assert_compile_called("evaluate") use_cached_eval_dataset = kwargs.pop("_use_cached_eval_dataset", False) if kwargs: raise ValueError(f"Arguments not recognized: {kwargs}") # Create an iterator that yields batches of input/target data. if use_cached_eval_dataset: epoch_iterator = self._eval_epoch_iterator else: epoch_iterator = EpochIterator( x=x, y=y, batch_size=batch_size, steps_per_epoch=steps, shuffle=False, steps_per_execution=self.steps_per_execution, ) if not all(module.built for module in self._flatten_modules()): # Build the model on one batch of data. 
for _, data in epoch_iterator: data_batch = data[0] self._auto_build( iterator=epoch_iterator, data_batch=data_batch, ) break epoch_iterator.reset() # Container that configures and calls callbacks. if not isinstance(callbacks, callbacks_module.CallbackList): callbacks = callbacks_module.CallbackList( callbacks, add_history=False, add_progbar=verbose != 0, verbose=verbose, epochs=1, steps=epoch_iterator.num_batches, program=self, ) self.stop_evaluating = False callbacks.on_test_begin() logs = {} self.reset_metrics() for step, iterator in epoch_iterator: callbacks.on_test_batch_begin(step) data = iterator[0] x_batch, y_batch = data_adapter_utils.unpack_x_y(data) logs = await self.test_on_batch( x=x_batch, y=y_batch, return_dict=True, ) callbacks.on_test_batch_end(step, logs) if self.stop_evaluating: break logs = self.get_metrics_result() callbacks.on_test_end(logs) if return_dict: return logs return self._flatten_metrics_in_order(logs) async def predict( self, x, batch_size=None, verbose="auto", steps=None, callbacks=None ): """Generates output predictions for the input samples. Computation is done in batches. This method is designed for batch processing of large numbers of inputs. It is not intended for use inside of loops that iterate over your data and process small numbers of inputs at a time. For small numbers of inputs that fit in one batch, directly use `__call__()` for faster execution, e.g., `program(x)`, or `program(x, training=False)` if you have modules that behave differently during inference. Args: x (np.ndarray | generator): Input data. It can be: - A NumPy array (or array-like), or a list of `DataModel` arrays (in case the model has multiple inputs). - A list of dict mapping input names to the corresponding `DataModel`s, if the program has named inputs. - A Python generator function yielding `(inputs, targets)`. batch_size (int): Integer or `None`. Number of samples per batch of computation. If unspecified, `batch_size` will default to 32. 
Do not specify the `batch_size` if your input data `x` is a `synalinks.utils.PyDataset`, `tf.data.Dataset`, `torch.utils.data.DataLoader` or Python generator function since they generate batches. verbose (int): `"auto"`, 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. `"auto"` becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so `verbose=2` is recommended when not running interactively (e.g. in a production environment). Defaults to `"auto"`. steps (int): Total number of steps (batches of samples) to draw before declaring the prediction round finished. If `steps` is `None`, it will run until `x` is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. callbacks (list): List of `synalinks.callbacks.Callback` instances. List of callbacks to apply during prediction. Returns: (list): `JsonDataModel` array(s) of predictions. If the pipeline failed, a None is added to the predictions. """ # Create an iterator that yields batches of input data. epoch_iterator = EpochIterator( x=x, batch_size=batch_size, steps_per_epoch=steps, shuffle=False, steps_per_execution=self.steps_per_execution, ) # Container that configures and calls callbacks. 
        if not isinstance(callbacks, callbacks_module.CallbackList):
            callbacks = callbacks_module.CallbackList(
                callbacks,
                add_history=True,
                add_progbar=verbose != 0,
                verbose=verbose,
                epochs=1,
                steps=epoch_iterator.num_batches,
                program=self,
            )

        self.stop_predicting = False
        # Pairs with `on_predict_end()` below.
        callbacks.on_predict_begin()
        outputs = []
        for step, iterator in epoch_iterator:
            callbacks.on_predict_batch_begin(step)
            data = iterator[0]
            x_batch, _ = data_adapter_utils.unpack_x_y(data)
            batch_outputs = await self.predict_on_batch(x_batch)
            outputs.extend(batch_outputs)
            callbacks.on_predict_batch_end(step, {"outputs": batch_outputs})
            if self.stop_predicting:
                break
        callbacks.on_predict_end()
        return numpy.array(outputs, dtype="object")

    async def train_on_batch(
        self,
        step,
        x,
        y=None,
        val_x=None,
        val_y=None,
        return_dict=False,
    ):
        """Runs a single optimization step on a single batch of data.

        Args:
            step (int): The training step.
            x (np.ndarray): Input data. Must be array-like.
            y (np.ndarray): Target data. Must be array-like.
            val_x (np.ndarray): Input validation data. Must be array-like.
            val_y (np.ndarray): Target validation data. Must be array-like.
            return_dict (bool): If `True`, reward and metric results are
                returned as a dict, with each key being the name of the metric.
                If `False`, they are returned as a list.

        Returns:
            (float | list | dict): A scalar reward value
                (when no metrics and `return_dict=False`), a list of reward
                and metric values (if there are metrics and
                `return_dict=False`), or a dict of metric and reward values
                (if `return_dict=True`).
""" if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): metrics = await self.optimizer.optimize( step, self.trainable_variables, x=x, y=y, val_x=val_x, val_y=val_y, ) else: warnings.warn("The program does not have any trainable variables.") y_pred = await self.predict_on_batch(val_x) reward = await self.compute_reward( x=val_x, y=val_y, y_pred=y_pred, ) await self._reward_tracker.update_state(reward) metrics = await self.compute_metrics(val_x, val_y, y_pred) if return_dict: return metrics return self._flatten_metrics_in_order(metrics) async def test_on_batch( self, x, y=None, return_dict=False, ): """Test the program on a single batch of samples. Args: x (np.ndarray): Input data. Must be array-like. y (np.ndarray): Target data. Must be array-like. return_dict (bool): If `True`, reward and metric results are returned as a dict, with each key being the name of the metric. If `False`, they are returned as a list. Returns: (float | list | dict): A scalar reward value (when no metrics and `return_dict=False`), a list of reward and metric values (if there are metrics and `return_dict=False`), or a dict of metric and reward values (if `return_dict=True`). """ y_pred = await self.predict_on_batch(x) reward = await self.compute_reward( x=x, y=y, y_pred=y_pred, training=False, ) await self._reward_tracker.update_state(reward) metrics = await self.compute_metrics(x, y, y_pred) if return_dict: return metrics return self._flatten_metrics_in_order(metrics) async def predict_on_batch(self, x, training=False): """Returns predictions for a single batch of samples. Args: x (np.ndarray): Input data. Must be array-like. training (bool): Boolean. True if training. Returns: (list): list(s) of JsonDataModel predictions. 
""" tasks = [] for inputs in x: tasks.append(self(inputs, training=training)) y_pred = await asyncio.gather(*tasks) return y_pred def get_compile_config(self): """Returns a serialized config with information for compiling the program. This method returns a config dictionary containing all the information (optimizer, reward, metrics, etc.) with which the program was compiled. Returns: (dict): A dict containing information for compiling the program. """ if self.compiled and hasattr(self, "_compile_config"): return self._compile_config.serialize() def compile_from_config(self, config): """Compiles the program with the information given in config. This method uses the information in the config (optimizer, reward, metrics, etc.) to compile the program. Args: config (dict): Dict containing information for compiling the program. """ has_overridden_compile = self.__class__.compile != Trainer.compile if has_overridden_compile: warnings.warn( "`compile()` was not called as part of program loading " "because the program's `compile()` method is custom. " "All subclassed Models that have `compile()` " "overridden should also override " "`get_compile_config()` and `compile_from_config(config)`. " "Alternatively, you can " "call `compile()` manually after loading.", stacklevel=2, ) return config = serialization_lib.deserialize_synalinks_object(config) self.compile(**config) if hasattr(self, "optimizer") and self.built: # Create optimizer variables/programs. if not self.optimizer.built: run_maybe_nested(self.optimizer.build(self.trainable_variables)) def _should_reward(self, epoch, validation_freq): epoch = epoch + 1 # one-index the user-facing epoch. if isinstance(validation_freq, int): return epoch % validation_freq == 0 elif isinstance(validation_freq, list): return epoch in validation_freq else: raise ValueError( "Expected `validation_freq` to be a list or int. " f"Received: validation_freq={validation_freq} of the " f"type {type(validation_freq)}." 
) def _get_metrics_result_or_logs(self, logs): """Returns program metrics as a dict if the keys match with input logs. When the training / evaluation is performed with asynchronous steps, the last scheduled `train / test_step` may not give the latest metrics because it is not guaranteed to be executed last. This method gets metrics from the program directly instead of relying on the return value of the last step function. When the user has custom train / test step functions, the metrics returned may be different from `Program.metrics`. In those instances, this function will be a no-op and return the logs passed in. Args: logs (dict): A `dict` of metrics returned by train / test step function. Returns: (dict): A `dict` containing values of the metrics listed in `self.metrics` when logs and program metrics keys match. Otherwise it returns input `logs`. """ metric_logs = self.get_metrics_result() # Verify that train / test step logs passed and metric logs have # matching keys. It could be different when using custom step functions, # in which case we return the logs from the last step. if isinstance(logs, dict) and set(logs.keys()) == set(metric_logs.keys()): return metric_logs return logs def _flatten_metrics_in_order(self, logs): """Turns `logs` dict into a list as per key order of `metrics_names`.""" metric_names = [] for metric in self.metrics: if isinstance(metric, CompileMetrics): metric_names += [sub_metric.name for sub_metric in metric.metrics] else: metric_names.append(metric.name) results = [] for name in metric_names: if name in logs: results.append(logs[name]) for key in sorted(logs.keys()): if key not in metric_names: results.append(logs[key]) if len(results) == 1: return results[0] return results def _assert_compile_called(self, method_name=None): if not self.compiled: msg = "You must call `compile()` before " if metrics_module: msg += "using the program." else: msg += f"calling `{method_name}()`."
raise ValueError(msg) def _auto_build(self, iterator=None, data_batch=None): program_unbuilt = not all(module.built for module in self._flatten_modules()) compile_metrics_unbuilt = ( self._compile_metrics is not None and not self._compile_metrics.built ) compile_reward_unbuilt = ( self._compile_reward is not None and not self._compile_reward.built ) optimizer_unbuilt = self.optimizer is not None and not self.optimizer.built if program_unbuilt or compile_metrics_unbuilt or compile_reward_unbuilt: if data_batch is None: for _, data_or_iterator in iterator: if isinstance(data_or_iterator, (list, tuple)): data_batch = data_or_iterator[0] else: data_batch = next(data_or_iterator) break (x, y) = data_batch try: y_pred = run_maybe_nested(self.predict_on_batch(x)) except Exception as e: raise RuntimeError( "Unable to automatically build the program. " "Please build it yourself before calling " "fit/evaluate/predict. " "A program is 'built' when its variables have " "been created and its `self.built` attribute " "is True. Usually, calling the program on a batch " "of data is the right way to build it.\n" "Exception encountered:\n" f"'{e}'" ) if compile_metrics_unbuilt: # Build all metric state with `backend.compute_output_spec`. run_maybe_nested( backend.compute_output_spec( self.compute_metrics, x, y, y_pred, ) ) if compile_reward_unbuilt: # Build `CompileReward` state with `backend.compute_output_spec`. run_maybe_nested( backend.compute_output_spec( self.compute_reward, x, y, y_pred, training=False, ) ) if optimizer_unbuilt: # Build optimizer run_maybe_nested(self.optimizer.build(self.trainable_variables)) self._post_build() def _assert_compile_called(self, method_name=None): if not self.compiled: msg = "You must call `compile()` before " if metrics_module: msg += "using the model." else: msg += f"calling `{method_name}()`." raise ValueError(msg) def _should_eval(self, epoch, validation_freq): epoch = epoch + 1 # one-index the user-facing epoch. 
if isinstance(validation_freq, int): return epoch % validation_freq == 0 elif isinstance(validation_freq, list): return epoch in validation_freq else: raise ValueError( "Expected `validation_freq` to be a list or int. " f"Received: validation_freq={validation_freq} of the " f"type {type(validation_freq)}." ) ```` ## `compile(optimizer=None, reward=None, reward_weights=None, metrics=None, run_eagerly=False, steps_per_execution=1)` Configures the program for training. Example: ``` program.compile( optimizer=synalinks.optimizers.RandomFewShot(), reward=synalinks.rewards.ExactMatch(), metrics=[ synalinks.metrics.MeanMetricWrapper(synalinks.rewards.exact_match), ], ) ``` Parameters: | Name | Type | Description | Default | | --------------------- | ----------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | | `optimizer` | `Optimizer` | Optimizer instance. See synalinks.optimizers. | `None` | | `reward` | `Reward` | Reward function. A synalinks.rewards.Reward instance. See synalinks.rewards. A reward function is any callable with the signature reward = fn(y_true, y_pred), where y_true are the ground truth values, and y_pred are the program's predictions. y_true should be a list of batch size length [d0, .. dN]. y_pred should be a list of batch size length [d0, .. dN]. The reward function should return a float. 
| `None` | | `reward_weights` | `list` | Optional list specifying scalar coefficients (Python floats) to weight the reward contributions of different program outputs. The reward value that will be maximized by the program will then be the weighted sum of all individual rewards, weighted by the reward_weights coefficients. It is expected to have a 1:1 mapping to the program's outputs. | `None` | | `metrics` | `list` | List of metrics to be evaluated by the program during training and testing. Each of them is a synalinks.metrics.Metric instance. See synalinks.metrics. A metric function is any callable with the signature result = fn(y_true, y_pred). | `None` | | `run_eagerly` | `bool` | If True, this program's forward pass will never be compiled. It is recommended to leave this as False when training (for best performance), and to set it to True when debugging. | `False` | | `steps_per_execution` | `int` | The number of batches to run during a single compiled function call. Running multiple batches inside a single compiled function call can greatly improve performance on TPUs or small programs with a large Python overhead. At most, one full epoch will be run each execution. If a number larger than the size of the epoch is passed, the execution will be truncated to the size of the epoch. Note that if steps_per_execution is set to N, Callback.on_batch_begin and Callback.on_batch_end methods will only be called every N batches (i.e. before/after each compiled function execution). | `1` | Source code in `synalinks/src/trainers/trainer.py` ```` @tracking.no_automatic_dependency_tracking def compile( self, optimizer=None, reward=None, reward_weights=None, metrics=None, run_eagerly=False, steps_per_execution=1, ): """Configures the program for training.
Example: ```python program.compile( optimizer=synalinks.optimizers.RandomFewShot(), reward=synalinks.rewards.ExactMatch(), metrics=[ synalinks.metrics.MeanMetricWrapper(synalinks.rewards.exact_match), ], ) ``` Args: optimizer (Optimizer): Optimizer instance. See `synalinks.optimizers`. reward (Reward): Reward function. A `synalinks.rewards.Reward` instance. See `synalinks.rewards`. A reward function is any callable with the signature `reward = fn(y_true, y_pred)`, where `y_true` are the ground truth values, and `y_pred` are the program's predictions. `y_true` should be a list of batch size length `[d0, .. dN]`. `y_pred` should be a list of batch size length `[d0, .. dN]`. The reward function should return a float. reward_weights (list): Optional list specifying scalar coefficients (Python floats) to weight the reward contributions of different program outputs. The reward value that will be maximized by the program will then be the *weighted sum* of all individual rewards, weighted by the `reward_weights` coefficients. It is expected to have a 1:1 mapping to the program's outputs. metrics (list): List of metrics to be evaluated by the program during training and testing. Each of them is a `synalinks.metrics.Metric` instance. See `synalinks.metrics`. A metric function is any callable with the signature `result = fn(y_true, y_pred)`. run_eagerly (bool): If `True`, this program's forward pass will never be compiled. It is recommended to leave this as `False` when training (for best performance), and to set it to `True` when debugging. steps_per_execution (int): The number of batches to run during a single compiled function call. Running multiple batches inside a single compiled function call can greatly improve performance on TPUs or small programs with a large Python overhead. At most, one full epoch will be run each execution. If a number larger than the size of the epoch is passed, the execution will be truncated to the size of the epoch.
Note that if `steps_per_execution` is set to `N`, `Callback.on_batch_begin` and `Callback.on_batch_end` methods will only be called every `N` batches (i.e. before/after each compiled function execution). """ self._clear_previous_trainer_metrics() self._optimizer = optimizer self._optimizer.set_program(self) if hasattr(self, "output_names"): output_names = self.output_names else: output_names = None if reward is not None: self._compile_reward = CompileReward( reward, reward_weights, output_names=output_names ) self.reward = reward if metrics is not None: self._compile_metrics = CompileMetrics(metrics, output_names=output_names) self.run_eagerly = run_eagerly self.stop_training = False self.compiled = True self._reward_tracker = metrics_module.Mean(name="reward") self.steps_per_execution = steps_per_execution self._compile_config = serialization_lib.SerializableDict( optimizer=optimizer, reward=reward, reward_weights=reward_weights, metrics=metrics, run_eagerly=run_eagerly, steps_per_execution=steps_per_execution, ) ```` ## `compile_from_config(config)` Compiles the program with the information given in config. This method uses the information in the config (optimizer, reward, metrics, etc.) to compile the program. Parameters: | Name | Type | Description | Default | | -------- | ------ | ------------------------------------------------------ | ---------- | | `config` | `dict` | Dict containing information for compiling the program. | *required* | Source code in `synalinks/src/trainers/trainer.py` ``` def compile_from_config(self, config): """Compiles the program with the information given in config. This method uses the information in the config (optimizer, reward, metrics, etc.) to compile the program. Args: config (dict): Dict containing information for compiling the program. 
""" has_overridden_compile = self.__class__.compile != Trainer.compile if has_overridden_compile: warnings.warn( "`compile()` was not called as part of program loading " "because the program's `compile()` method is custom. " "All subclassed Models that have `compile()` " "overridden should also override " "`get_compile_config()` and `compile_from_config(config)`. " "Alternatively, you can " "call `compile()` manually after loading.", stacklevel=2, ) return config = serialization_lib.deserialize_synalinks_object(config) self.compile(**config) if hasattr(self, "optimizer") and self.built: # Create optimizer variables/programs. if not self.optimizer.built: run_maybe_nested(self.optimizer.build(self.trainable_variables)) ``` ## `compute_metrics(x, y, y_pred)` Update metric states and collect all metrics to be returned. Subclasses can optionally override this method to provide custom metric updating and collection logic. Custom metrics are not passed in `compile()`, they can be created in `__init__` or `build`. They are automatically tracked and returned by `self.metrics`. ``` Args: x: Input data. y: Target data. y_pred: Predictions returned by the program output of `program.call(x)`. Returns: A `dict` containing values that will be passed to `synalinks.callbacks.CallbackList.on_train_batch_end()`. Typically, the values of the metrics listed in `self.metrics` are returned. Example: `{'reward': 0.2, 'accuracy': 0.7}`. Source code in `synalinks/src/trainers/trainer.py` ``` async def compute_metrics(self, x, y, y_pred): """Update metric states and collect all metrics to be returned. ```` Subclasses can optionally override this method to provide custom metric updating and collection logic. Custom metrics are not passed in `compile()`, they can be created in `__init__` or `build`. They are automatically tracked and returned by `self.metrics`. ``` Args: x: Input data. y: Target data. y_pred: Predictions returned by the program output of `program.call(x)`. 
Returns: A `dict` containing values that will be passed to `synalinks.callbacks.CallbackList.on_train_batch_end()`. Typically, the values of the metrics listed in `self.metrics` are returned. Example: `{'reward': 0.2, 'accuracy': 0.7}`. """ del x # The default implementation does not use `x`. if self._compile_metrics is not None: for y_t, y_p in zip(y, y_pred): await self._compile_metrics.update_state(y_t, y_p) return self.get_metrics_result() ```` ``` ## `compute_reward(x=None, y=None, y_pred=None, training=True)` Compute the total reward, validate it, and return it. Subclasses can optionally override this method to provide custom reward computation logic. Parameters: | Name | Type | Description | Default | | --- | --- | --- | --- | | `x` | `list` | Input data. | `None` | | `y` | `list` | Target data. | `None` | | `y_pred` | `list` | Predictions returned by the program (output of program(x)). | `None` | | `training` | `bool` | Whether we are training or evaluating the program. | `True` | Returns: | Type | Description | | --- | --- | | `float | None` | The total reward as a scalar, or None if no reward results (which is the case when called by Program.test_step). | Source code in `synalinks/src/trainers/trainer.py` ``` async def compute_reward( self, x=None, y=None, y_pred=None, training=True, ): """Compute the total reward, validate it, and return it. ``` Subclasses can optionally override this method to provide custom reward computation logic. Args: x (list): Input data. y (list): Target data. y_pred (list): Predictions returned by the program (output of `program(x)`). training (bool): Whether we are training or evaluating the program. Returns: (float | None): The total reward as a scalar, or `None` if no reward results (which is the case when called by `Program.test_step`). """ # The default implementation does not use `x` or `training`. 
del x del training rewards = [] if self._compile_reward is not None: for y_t, y_p in zip(y, y_pred): reward = await self._compile_reward(y_t, y_p) if reward is not None: rewards.append(reward) for reward in self.rewards: rewards.append(numpy.sum(reward)) if len(rewards) == 1: total_reward = rewards[0] elif len(rewards) == 0: total_reward = numpy.zeros(()) else: total_reward = numpy.mean(rewards) return float(total_reward) ``` ``` ## `evaluate(x=None, y=None, batch_size=32, verbose='auto', steps=None, callbacks=None, return_dict=True, **kwargs)` Returns the reward value & metrics values for the program in test mode. Computation is done in batches (see the `batch_size` arg.) Parameters: | Name | Type | Description | Default | | --- | --- | --- | --- | | `x` | `ndarray | generator` | Input data. It can be: - A NumPy array (or array-like), or a list of DataModel arrays (in case the model has multiple inputs). - A list of dict mapping input names to the corresponding DataModels, if the program has named inputs. - A Python generator function yielding (inputs, targets). | `None` | | `y` | `ndarray` | Target data. Like the input data x, it can be either NumPy array(s) of DataModel(s). If x is a Python generator function, y should not be specified since targets will be obtained from x. | `None` | | `batch_size` | `int` | Integer or None. Number of samples per batch of computation. If unspecified, batch_size will default to 32. Do not specify the batch_size if your input data x is a Python generator function since they generate batches. | `32` | | `verbose` | `int | str` | "auto", 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so verbose=2 is recommended when not running interactively (e.g. in a production environment). Defaults to "auto". | `'auto'` | | `steps` | `int` | Integer or None. 
Total number of steps (batches of samples) to draw before declaring the evaluation round finished. If steps is None, it will run until x is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. | `None` | | `callbacks` | `list` | List of synalinks.callbacks.Callback instances. List of callbacks to apply during evaluation. | `None` | | `return_dict` | `bool` | If True, reward and metric results are returned as a dict, with each key being the name of the metric. If False, they are returned as a list. | `True` | Returns: | Type | Description | | --- | --- | | `float | list | dict` | Scalar test reward (if the program has a single output and no metrics), list of scalars (if the program has multiple outputs and/or metrics), or a dict of reward and metric values when return_dict=True (the default). The attribute program.metrics_names will give you the display labels for the scalar outputs. | Source code in `synalinks/src/trainers/trainer.py` ``` async def evaluate( self, x=None, y=None, batch_size=32, verbose="auto", steps=None, callbacks=None, return_dict=True, **kwargs, ): """Returns the reward value & metrics values for the program in test mode. Computation is done in batches (see the `batch_size` argument). Args: x (np.ndarray | generator): Input data. It can be: - A NumPy array (or array-like), or a list of `DataModel` arrays (in case the model has multiple inputs). - A list of dict mapping input names to the corresponding `DataModel`s, if the program has named inputs. - A Python generator function yielding `(inputs, targets)`. y (np.ndarray): Target data. Like the input data `x`, it can be NumPy array(s) of `DataModel`(s). If `x` is a Python generator function, `y` should not be specified since targets will be obtained from `x`. batch_size (int): Integer or `None`. Number of samples per batch of computation. If unspecified, `batch_size` will default to 32. Do not specify the `batch_size` if your input data `x` is a Python generator function since they generate batches.
verbose (int | str): `"auto"`, 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. `"auto"` becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so `verbose=2` is recommended when not running interactively (e.g. in a production environment). Defaults to `"auto"`. steps (int): Integer or `None`. Total number of steps (batches of samples) to draw before declaring the evaluation round finished. If `steps` is `None`, it will run until `x` is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. callbacks (list): List of `synalinks.callbacks.Callback` instances. List of callbacks to apply during evaluation. return_dict (bool): If `True`, reward and metric results are returned as a dict, with each key being the name of the metric. If `False`, they are returned as a list. Returns: (float | list | dict): Scalar test reward (if the program has a single output and no metrics), list of scalars (if the program has multiple outputs and/or metrics), or a dict of reward and metric values when `return_dict=True` (the default). The attribute `program.metrics_names` will give you the display labels for the scalar outputs. """ self._assert_compile_called("evaluate") use_cached_eval_dataset = kwargs.pop("_use_cached_eval_dataset", False) if kwargs: raise ValueError(f"Arguments not recognized: {kwargs}") # Create an iterator that yields batches of input/target data. if use_cached_eval_dataset: epoch_iterator = self._eval_epoch_iterator else: epoch_iterator = EpochIterator( x=x, y=y, batch_size=batch_size, steps_per_epoch=steps, shuffle=False, steps_per_execution=self.steps_per_execution, ) if not all(module.built for module in self._flatten_modules()): # Build the model on one batch of data. for _, data in epoch_iterator: data_batch = data[0] self._auto_build( iterator=epoch_iterator, data_batch=data_batch, ) break epoch_iterator.reset() # Container that configures and calls callbacks.
if not isinstance(callbacks, callbacks_module.CallbackList): callbacks = callbacks_module.CallbackList( callbacks, add_history=False, add_progbar=verbose != 0, verbose=verbose, epochs=1, steps=epoch_iterator.num_batches, program=self, ) self.stop_evaluating = False callbacks.on_test_begin() logs = {} self.reset_metrics() for step, iterator in epoch_iterator: callbacks.on_test_batch_begin(step) data = iterator[0] x_batch, y_batch = data_adapter_utils.unpack_x_y(data) logs = await self.test_on_batch( x=x_batch, y=y_batch, return_dict=True, ) callbacks.on_test_batch_end(step, logs) if self.stop_evaluating: break logs = self.get_metrics_result() callbacks.on_test_end(logs) if return_dict: return logs return self._flatten_metrics_in_order(logs) ``` ## `fit(x=None, y=None, batch_size=1, epochs=1, verbose='auto', callbacks=None, validation_split=0.1, validation_data=None, shuffle=True, initial_epoch=0, steps_per_epoch=None, validation_steps=None, validation_batch_size=32, validation_freq=1)` Trains the program for a fixed number of epochs (dataset iterations). Parameters: | Name | Type | Description | Default | | --- | --- | --- | --- | | `x` | `ndarray | generator` | Input data. It can be: - A NumPy array (or array-like), or a list of DataModel arrays (in case the model has multiple inputs). - A list of dict mapping input names to the corresponding DataModels, if the program has named inputs. - A Python generator function yielding (inputs, targets). | `None` | | `y` | `ndarray` | Target data. Like the input data x, it can be NumPy array(s) of DataModel(s). If x is a Python generator function, y should not be specified since targets will be obtained from x. | `None` | | `batch_size` | `int` | Integer or None. Number of samples per batch of computation. If unspecified, batch_size will default to 1. Do not specify the batch_size if your input data x is a Python generator function since they generate batches. | `1` | | `epochs` | `int` | Integer.
Number of epochs to train the program. An epoch is an iteration over the entire x and y data provided (unless the steps_per_epoch flag is set to something other than None). Note that in conjunction with initial_epoch, epochs is to be understood as "final epoch". The program is not trained for a number of iterations given by epochs, but merely until the epoch of index epochs is reached. | `1` | | `verbose` | `int` | "auto", 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so verbose=2 is recommended when not running interactively (e.g., in a production environment). Defaults to "auto". | `'auto'` | | `callbacks` | `list` | List of synalinks.callbacks.Callback instances. List of callbacks to apply during training. See synalinks.callbacks. Note synalinks.callbacks.ProgbarLogger and synalinks.callbacks.History callbacks are created automatically and need not be passed to program.fit(). synalinks.callbacks.ProgbarLogger is created or not based on the verbose argument in program.fit(). | `None` | | `validation_split` | `float` | Float between 0 and 1. Fraction of the training data to be used as validation data. The program will set apart this fraction of the training data, will not train on it, and will evaluate the reward and any program metrics on this data at the end of each epoch. The validation data is selected from the last samples in the x and y data provided, before shuffling. This argument is only supported when x and y are made of data_models. If both validation_data and validation_split are provided, validation_data will override validation_split. | `0.1` | | `validation_data` | `tuple | iterator` | Data on which to evaluate the reward and any program metrics at the end of each epoch. The program will not be trained on this data. validation_data will override validation_split. 
It can be: - A tuple (x_val, y_val) of DataModels lists. | `None` | | `shuffle` | `bool` | Whether to shuffle the training data before each epoch. This argument is ignored when x is a Python generator function. | `True` | | `initial_epoch` | `int` | Integer. Epoch at which to start training (useful for resuming a previous training run). | `0` | | `steps_per_epoch` | `int` | Integer or None. Total number of steps (batches of samples) before declaring one epoch finished and starting the next epoch. When training with input data_models arrays, the default None means that the value used is the number of samples in your dataset divided by the batch size, or 1 if that cannot be determined. If x is a Python generator function, the epoch will run until the input dataset is exhausted. When passing an infinitely repeating dataset, you must specify the steps_per_epoch argument, otherwise the training will run indefinitely. | `None` | | `validation_steps` | `int` | Integer or None. Only relevant if validation_data is provided. Total number of steps (batches of samples) to draw before stopping when performing validation at the end of every epoch. If validation_steps is None, validation will run until the validation_data dataset is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. If validation_steps is specified and only part of the dataset is consumed, the evaluation will start from the beginning of the dataset at each epoch. This ensures that the same validation samples are used every time. | `None` | | `validation_batch_size` | `int` | Integer or None. Number of samples per validation batch. If unspecified, will default to batch_size. Do not specify the validation_batch_size if your data is a synalinks.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function since they generate batches. | `32` | | `validation_freq` | `int` | Only relevant if validation data is provided. 
Specifies how many training epochs to run before a new validation run is performed, e.g. validation_freq=2 runs validation every 2 epochs. | `1` | Returns: | Type | Description | | --- | --- | | `History` | A History object. Its History.history attribute is a record of training reward values and metrics values at successive epochs, as well as validation reward values and validation metrics values (if applicable). | Source code in `synalinks/src/trainers/trainer.py` ``` async def fit( self, x=None, y=None, batch_size=1, epochs=1, verbose="auto", callbacks=None, validation_split=0.1, validation_data=None, shuffle=True, initial_epoch=0, steps_per_epoch=None, validation_steps=None, validation_batch_size=32, validation_freq=1, ): """Trains the program for a fixed number of epochs (dataset iterations). ``` Args: x (np.ndarray | generator): Input data. It can be: - A NumPy array (or array-like), or a list of `DataModel` arrays (in case the model has multiple inputs). - A list of dict mapping input names to the corresponding `DataModel`s, if the program has named inputs. - A Python generator function yielding `(inputs, targets)`. y (np.ndarray): Target data. Like the input data `x`, it can be NumPy array(s) of `DataModel`(s). If `x` is a Python generator function, `y` should not be specified since targets will be obtained from `x`. batch_size (int): Integer or `None`. Number of samples per batch of computation. If unspecified, `batch_size` will default to 1. Do not specify the `batch_size` if your input data `x` is a Python generator function since they generate batches. epochs (int): Integer. Number of epochs to train the program. An epoch is an iteration over the entire `x` and `y` data provided (unless the `steps_per_epoch` flag is set to something other than None). Note that in conjunction with `initial_epoch`, `epochs` is to be understood as "final epoch".
The program is not trained for a number of iterations given by `epochs`, but merely until the epoch of index `epochs` is reached. verbose (int): `"auto"`, 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so `verbose=2` is recommended when not running interactively (e.g., in a production environment). Defaults to `"auto"`. callbacks (list): List of `synalinks.callbacks.Callback` instances. List of callbacks to apply during training. See `synalinks.callbacks`. Note `synalinks.callbacks.ProgbarLogger` and `synalinks.callbacks.History` callbacks are created automatically and need not be passed to `program.fit()`. `synalinks.callbacks.ProgbarLogger` is created or not based on the `verbose` argument in `program.fit()`. validation_split (float): Float between 0 and 1. Fraction of the training data to be used as validation data. The program will set apart this fraction of the training data, will not train on it, and will evaluate the reward and any program metrics on this data at the end of each epoch. The validation data is selected from the last samples in the `x` and `y` data provided, before shuffling. This argument is only supported when `x` and `y` are made of data_models. If both `validation_data` and `validation_split` are provided, `validation_data` will override `validation_split`. validation_data (tuple | iterator): Data on which to evaluate the reward and any program metrics at the end of each epoch. The program will not be trained on this data. `validation_data` will override `validation_split`. It can be: - A tuple `(x_val, y_val)` of `DataModel`s lists. shuffle (bool): Whether to shuffle the training data before each epoch. This argument is ignored when `x` is a Python generator function. initial_epoch (int): Integer. Epoch at which to start training (useful for resuming a previous training run). 
steps_per_epoch (int): Integer or `None`. Total number of steps (batches of samples) before declaring one epoch finished and starting the next epoch. When training with input data_models arrays, the default `None` means that the value used is the number of samples in your dataset divided by the batch size, or 1 if that cannot be determined. If `x` is a Python generator function, the epoch will run until the input dataset is exhausted. When passing an infinitely repeating dataset, you must specify the `steps_per_epoch` argument, otherwise the training will run indefinitely. validation_steps (int): Integer or `None`. Only relevant if `validation_data` is provided. Total number of steps (batches of samples) to draw before stopping when performing validation at the end of every epoch. If `validation_steps` is `None`, validation will run until the `validation_data` dataset is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. If `validation_steps` is specified and only part of the dataset is consumed, the evaluation will start from the beginning of the dataset at each epoch. This ensures that the same validation samples are used every time. validation_batch_size (int): Integer or `None`. Number of samples per validation batch. If unspecified, will default to `batch_size`. Do not specify the `validation_batch_size` if your data is a `synalinks.utils.PyDataset`, `tf.data.Dataset`, `torch.utils.data.DataLoader` or Python generator function since they generate batches. validation_freq (int): Only relevant if validation data is provided. Specifies how many training epochs to run before a new validation run is performed, e.g. `validation_freq=2` runs validation every 2 epochs. Returns: (History): A `History` object. Its `History.history` attribute is a record of training reward values and metrics values at successive epochs, as well as validation reward values and validation metrics values (if applicable). 
""" self._assert_compile_called("fit") self._eval_epoch_iterator = None val_y, val_y = None, None if validation_split and validation_data is None: # Create the validation data using the training data. Only supported # for numpy arrays. (x, y), validation_data = array_slicing.train_validation_split( (x, y), validation_split=validation_split ) if validation_data is not None: (val_x, val_y) = data_adapter_utils.unpack_x_y(validation_data) # Create an iterator that yields batches of input/target data. epoch_iterator = EpochIterator( x=x, y=y, batch_size=batch_size, steps_per_epoch=steps_per_epoch, shuffle=False, steps_per_execution=self.steps_per_execution, ) if not all(module.built for module in self._flatten_modules()): # Build the model on one batch of data. for _, data in epoch_iterator: data_batch = data[0] self._auto_build( iterator=epoch_iterator, data_batch=data_batch, ) break epoch_iterator.reset() # Container that configures and calls callbacks. if not isinstance(callbacks, callbacks_module.CallbackList): callbacks = callbacks_module.CallbackList( callbacks, add_history=True, add_progbar=verbose != 0, verbose=verbose, epochs=epochs, steps=steps_per_epoch, program=self, ) self.stop_training = False callbacks.on_train_begin() training_logs = None logs = {} initial_epoch = self._initial_epoch or initial_epoch if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_train_begin( self.trainable_variables, ) for epoch in range(initial_epoch, epochs): self.reset_metrics() if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_epoch_begin( epoch, self.trainable_variables, ) callbacks.on_epoch_begin(epoch) with epoch_iterator.catch_stop_iteration(): for step, iterator in epoch_iterator: data = iterator[0] x_batch, y_batch = data_adapter_utils.unpack_x_y(data) if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): 
await self.optimizer.on_batch_begin( step, epoch, self.trainable_variables, ) callbacks.on_train_batch_begin(step) logs = await self.train_on_batch( step=step, x=x_batch, y=y_batch, val_x=val_x, val_y=val_y, return_dict=True, ) val_logs = await self.evaluate( x=val_x, y=val_y, batch_size=validation_batch_size or batch_size, steps=validation_steps, callbacks=callbacks, _use_cached_eval_dataset=False, ) if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_batch_end( step, epoch, self.trainable_variables, ) callbacks.on_train_batch_end(step, logs) if self.stop_training: break # Override with model metrics instead of last step logs if needed. epoch_logs = dict(self._get_metrics_result_or_logs(logs)) # Run validation. if validation_data is not None and self._should_eval(epoch, validation_freq): # Create EpochIterator for evaluation and cache it. if getattr(self, "_eval_epoch_iterator", None) is None: self._eval_epoch_iterator = EpochIterator( x=val_x, y=val_y, batch_size=validation_batch_size or batch_size, steps_per_execution=self.steps_per_execution, steps_per_epoch=validation_steps, shuffle=False, ) val_logs = await self.evaluate( x=val_x, y=val_y, batch_size=validation_batch_size or batch_size, steps=validation_steps, callbacks=callbacks, _use_cached_eval_dataset=True, ) val_logs = {"val_" + name: val for name, val in val_logs.items()} epoch_logs.update(val_logs) if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_epoch_end( epoch, self.trainable_variables, ) callbacks.on_epoch_end(epoch, epoch_logs) training_logs = epoch_logs if self.stop_training: break # If _eval_epoch_iterator exists, delete it after all epochs are done. 
if getattr(self, "_eval_epoch_iterator", None) is not None: del self._eval_epoch_iterator if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): await self.optimizer.on_train_end(self.trainable_variables) callbacks.on_train_end(logs=training_logs) return self.history ``` ``` ## `get_compile_config()` Returns a serialized config with information for compiling the program. This method returns a config dictionary containing all the information (optimizer, reward, metrics, etc.) with which the program was compiled. Returns: | Type | Description | | --- | --- | | `dict` | A dict containing information for compiling the program. | Source code in `synalinks/src/trainers/trainer.py` ``` def get_compile_config(self): """Returns a serialized config with information for compiling the program. ``` This method returns a config dictionary containing all the information (optimizer, reward, metrics, etc.) with which the program was compiled. Returns: (dict): A dict containing information for compiling the program. """ if self.compiled and hasattr(self, "_compile_config"): return self._compile_config.serialize() ``` ``` ## `get_metrics_result()` Returns the program's metrics values as a dict. If any of the metric result is a dict (containing multiple metrics), each of them gets added to the top level returned dict of this method. Returns: | Type | Description | | --- | --- | | `dict` | A dict containing values of the metrics listed in self.metrics. Example: {'reward': 0.2, 'accuracy': 0.7}. | Source code in `synalinks/src/trainers/trainer.py` ``` def get_metrics_result(self): """Returns the program's metrics values as a dict. ``` If any of the metric result is a dict (containing multiple metrics), each of them gets added to the top level returned dict of this method. Returns: (dict): A `dict` containing values of the metrics listed in `self.metrics`. Example: `{'reward': 0.2, 'accuracy': 0.7}`. 
""" return_metrics = {} for metric in self.metrics: result = metric.result() if isinstance(result, dict): return_metrics.update(result) else: return_metrics[metric.name] = result return python_utils.pythonify_logs(return_metrics) ``` ``` ## `predict(x, batch_size=None, verbose='auto', steps=None, callbacks=None)` Generates output predictions for the input samples. Computation is done in batches. This method is designed for batch processing of large numbers of inputs. It is not intended for use inside of loops that iterate over your data and process small numbers of inputs at a time. For small numbers of inputs that fit in one batch, directly use `__call__()` for faster execution, e.g., `program(x)`, or `program(x, training=False)` if you have modules that behave differently during inference. Parameters: | Name | Type | Description | Default | | --- | --- | --- | --- | | `x` | `ndarray | generator` | Input data. It can be: - A NumPy array (or array-like), or a list of DataModel arrays (in case the model has multiple inputs). - A list of dict mapping input names to the corresponding DataModels, if the program has named inputs. - A Python generator function yielding (inputs, targets). | *required* | | `batch_size` | `int` | Integer or None. Number of samples per batch of computation. If unspecified, batch_size will default to 32. Do not specify the batch_size if your input data x is a synalinks.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function since they generate batches. | `None` | | `verbose` | `int` | "auto", 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so verbose=2 is recommended when not running interactively (e.g. in a production environment). Defaults to "auto". 
| `'auto'` | | `steps` | `int` | Total number of steps (batches of samples) to draw before declaring the prediction round finished. If steps is None, it will run until x is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. | `None` | | `callbacks` | `list` | List of synalinks.callbacks.Callback instances. List of callbacks to apply during prediction. | `None` | Returns: | Type | Description | | --- | --- | | `list` | JsonDataModel array(s) of predictions. If the pipeline failed, a None is added to the predictions. | Source code in `synalinks/src/trainers/trainer.py` ``` async def predict( self, x, batch_size=None, verbose="auto", steps=None, callbacks=None ): """Generates output predictions for the input samples. ``` Computation is done in batches. This method is designed for batch processing of large numbers of inputs. It is not intended for use inside of loops that iterate over your data and process small numbers of inputs at a time. For small numbers of inputs that fit in one batch, directly use `__call__()` for faster execution, e.g., `program(x)`, or `program(x, training=False)` if you have modules that behave differently during inference. Args: x (np.ndarray | generator): Input data. It can be: - A NumPy array (or array-like), or a list of `DataModel` arrays (in case the model has multiple inputs). - A list of dict mapping input names to the corresponding `DataModel`s, if the program has named inputs. - A Python generator function yielding `(inputs, targets)`. batch_size (int): Integer or `None`. Number of samples per batch of computation. If unspecified, `batch_size` will default to 32. Do not specify the `batch_size` if your input data `x` is a `synalinks.utils.PyDataset`, `tf.data.Dataset`, `torch.utils.data.DataLoader` or Python generator function since they generate batches. verbose (int): `"auto"`, 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. `"auto"` becomes 1 for most cases. 
Note that the progress bar is not particularly useful when logged to a file, so `verbose=2` is recommended when not running interactively (e.g. in a production environment). Defaults to `"auto"`. steps (int): Total number of steps (batches of samples) to draw before declaring the prediction round finished. If `steps` is `None`, it will run until `x` is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. callbacks (list): List of `synalinks.callbacks.Callback` instances. List of callbacks to apply during prediction. Returns: (list): `JsonDataModel` array(s) of predictions. If the pipeline failed, a None is added to the predictions. """ # Create an iterator that yields batches of input data. epoch_iterator = EpochIterator( x=x, batch_size=batch_size, steps_per_epoch=steps, shuffle=False, steps_per_execution=self.steps_per_execution, ) # Container that configures and calls callbacks. if not isinstance(callbacks, callbacks_module.CallbackList): callbacks = callbacks_module.CallbackList( callbacks, add_history=True, add_progbar=verbose != 0, verbose=verbose, epochs=1, steps=epoch_iterator.num_batches, model=self, ) self.stop_predicting = False callbacks.on_test_begin() outputs = [] for step, iterator in epoch_iterator: callbacks.on_predict_batch_begin(step) data = iterator[0] x_batch, _ = data_adapter_utils.unpack_x_y(data) batch_outputs = await self.predict_on_batch(x_batch) outputs.extend(batch_outputs) callbacks.on_predict_batch_end(step, {"outputs": batch_outputs}) if self.stop_predicting: break callbacks.on_predict_end() return np.array(outputs, dtype="object") ``` ``` ## `predict_on_batch(x, training=False)` Returns predictions for a single batch of samples. Parameters: | Name | Type | Description | Default | | --- | --- | --- | --- | | `x` | `ndarray` | Input data. Must be array-like. | *required* | | `training` | `bool` | Boolean. True if training. 
| `False` | Returns: | Type | Description | | --- | --- | | `list` | list(s) of JsonDataModel predictions. | Source code in `synalinks/src/trainers/trainer.py` ``` async def predict_on_batch(self, x, training=False): """Returns predictions for a single batch of samples. ``` Args: x (np.ndarray): Input data. Must be array-like. training (bool): Boolean. True if training. Returns: (list): list(s) of JsonDataModel predictions. """ tasks = [] for inputs in x: tasks.append(self(inputs, training=training)) y_pred = await asyncio.gather(*tasks) return y_pred ``` ``` ## `test_on_batch(x, y=None, return_dict=False)` Test the program on a single batch of samples. Parameters: | Name | Type | Description | Default | | --- | --- | --- | --- | | `x` | `ndarray` | Input data. Must be array-like. | *required* | | `y` | `ndarray` | Target data. Must be array-like. | `None` | | `return_dict` | `bool` | If True, reward and metric results are returned as a dict, with each key being the name of the metric. If False, they are returned as a list. | `False` | Returns: | Type | Description | | --- | --- | | `float | list | dict` | A scalar reward value (when no metrics and return_dict=False), a list of reward and metric values (if there are metrics and return_dict=False), or a dict of metric and reward values (if return_dict=True). | Source code in `synalinks/src/trainers/trainer.py` ``` async def test_on_batch( self, x, y=None, return_dict=False, ): """Test the program on a single batch of samples. ``` Args: x (np.ndarray): Input data. Must be array-like. y (np.ndarray): Target data. Must be array-like. return_dict (bool): If `True`, reward and metric results are returned as a dict, with each key being the name of the metric. If `False`, they are returned as a list. 
Returns: (float | list | dict): A scalar reward value (when no metrics and `return_dict=False`), a list of reward and metric values (if there are metrics and `return_dict=False`), or a dict of metric and reward values (if `return_dict=True`). """ y_pred = await self.predict_on_batch(x) reward = await self.compute_reward( x=x, y=y, y_pred=y_pred, training=False, ) await self._reward_tracker.update_state(reward) metrics = await self.compute_metrics(x, y, y_pred) if return_dict: return metrics return self._flatten_metrics_in_order(metrics) ``` ``` ## `train_on_batch(step, x, y=None, val_x=None, val_y=None, return_dict=False)` Runs a single optimization step on a single batch of data. Parameters: | Name | Type | Description | Default | | --- | --- | --- | --- | | `step` | `int` | The training step. | *required* | | `x` | `ndarray` | Input data. Must be array-like. | *required* | | `y` | `ndarray` | Target data. Must be array-like. | `None` | | `val_x` | `ndarray` | Input validation data. Must be array-like. | `None` | | `val_y` | `ndarray` | Target validation data. Must be array-like. | `None` | | `return_dict` | `bool` | If True, reward and metric results are returned as a dict, with each key being the name of the metric. If False, they are returned as a list. | `False` | Returns: | Type | Description | | --- | --- | | `float | list | dict` | A scalar reward value (when no metrics and return_dict=False), a list of reward and metric values (if there are metrics and return_dict=False), or a dict of metric and reward values (if return_dict=True). | Source code in `synalinks/src/trainers/trainer.py` ``` async def train_on_batch( self, step, x, y=None, val_x=None, val_y=None, return_dict=False, ): """Runs a single optimization step on a single batch of data. ``` Args: step (int): The training step. x (np.ndarray): Input data. Must be array-like. y (np.ndarray): Target data. Must be array-like. val_x (np.ndarray): Input validation data. Must be array-like. 
val_y (np.ndarray): Target validation data. Must be array-like. return_dict (bool): If `True`, reward and metric results are returned as a dict, with each key being the name of the metric. If `False`, they are returned as a list. Returns: (float | list | dict): A scalar reward value (when no metrics and `return_dict=False`), a list of reward and metric values (if there are metrics and `return_dict=False`), or a dict of metric and reward values (if `return_dict=True`). """ if self.trainable_variables and isinstance( self.optimizer, optimizers_module.Optimizer ): metrics = await self.optimizer.optimize( step, self.trainable_variables, x=x, y=y, val_x=val_x, val_y=val_y, ) else: warnings.warn("The program does not have any trainable variables.") y_pred = await self.predict_on_batch(val_x) reward = await self.compute_reward( x=val_x, y=val_y, y_pred=y_pred, ) await self._reward_tracker.update_state(reward) metrics = await self.compute_metrics(val_x, val_y, y_pred) if return_dict: return metrics return self._flatten_metrics_in_order(metrics) ``` ``` ``` # The Program class Bases: `Trainer`, `Module` A program grouping modules into an object with training/inference features. 
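The inference methods inherited from `Trainer` are coroutines and must be awaited. As the `predict_on_batch()` source above shows, each input of a batch becomes one awaitable and the whole batch is collected with `asyncio.gather`. The sketch below reproduces only that pattern with the standard library; `fake_program` is a hypothetical stand-in for a real program call and is not part of Synalinks.

```python
import asyncio


async def fake_program(inputs, training=False):
    # Hypothetical stand-in for `program(inputs, training=training)`.
    await asyncio.sleep(0.01)  # simulate the latency of a language model call
    return {"answer": inputs["query"].upper()}


async def predict_on_batch(x, training=False):
    # One awaitable per input, gathered together so the calls overlap.
    tasks = [fake_program(inputs, training=training) for inputs in x]
    # `asyncio.gather` returns results in the same order as the inputs.
    return await asyncio.gather(*tasks)


batch = [{"query": "a"}, {"query": "b"}, {"query": "c"}]
predictions = asyncio.run(predict_on_batch(batch))
```

Because the calls overlap, the batch takes roughly as long as its slowest input rather than the sum of all of them, which is why batching matters for LM-backed programs.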
There are four ways to instantiate a `Program`:

### With the "Functional API"

You start from `Input`, you chain module calls to specify the program's structure, and finally, you create your program from inputs and outputs:

```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class AnswerWithThinking(synalinks.DataModel):
    thinking: str = synalinks.Field(
        description="Your step by step thinking process",
    )
    answer: float = synalinks.Field(
        description="The correct numerical answer",
    )

async def main():

    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=Query)
    x1 = await synalinks.Generator(
        data_model=AnswerWithThinking,
        language_model=language_model,
    )(x0)

    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="chain_of_thought",
        description="Useful to answer in a step by step manner.",
    )

if __name__ == "__main__":
    asyncio.run(main())
```

Note: Only dicts, lists, and tuples of input data models are supported. Nested inputs are not supported (e.g. lists of lists or dicts of dicts).

### By subclassing the `Program` class

In that case, you should define your modules in `__init__()` and implement the program's structure in `call()`.

```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class AnswerWithThinking(synalinks.DataModel):
    thinking: str = synalinks.Field(
        description="Your step by step thinking process",
    )
    answer: float = synalinks.Field(
        description="The correct numerical answer",
    )

class ChainOfThought(synalinks.Program):
    """Useful to answer in a step by step manner.

    The first line of the docstring is provided as description
    for the program if not provided in the `super().__init__()`.
    In a similar way the name is automatically inferred based on
    the class name if not provided.
    """

    def __init__(
        self,
        language_model=None,
        name=None,
        description=None,
        trainable=True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        # Keep a reference so that `get_config()` can serialize it.
        self.language_model = language_model
        self.answer = synalinks.Generator(
            data_model=AnswerWithThinking,
            language_model=language_model,
            name="generator_"+self.name,
        )

    async def call(self, inputs, training=False):
        if not inputs:
            return None
        x = await self.answer(inputs, training=training)
        return x

    def get_config(self):
        config = {
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": synalinks.saving.serialize_synalinks_object(
                self.language_model
            )
        }
        return {**config, **language_model_config}

    @classmethod
    def from_config(cls, config):
        language_model = synalinks.saving.deserialize_synalinks_object(
            config.pop("language_model")
        )
        return cls(language_model=language_model, **config)

async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    program = ChainOfThought(
        language_model=language_model,
    )

if __name__ == "__main__":
    asyncio.run(main())
```

If you subclass `Program`, you can optionally have a `training` argument (boolean) in `call()`, which you can use to specify a different behavior in training and inference.

Once the program is created, you can configure the program with rewards and metrics with `program.compile()`, train the program with `program.fit()`, or use the program to do prediction with `program.predict()` or `program()`.

To understand the difference between `program.predict()` and `program()`, read the [FAQ](https://synalinks.github.io/synalinks/FAQ/#whats-the-difference-between-program-methods-predict-and-__call__).

### Mixing the subclassing and the `Functional` API

This approach encapsulates your application while keeping the setup easy to use. It is the recommended way for most users, as it avoids building your programs/agents from scratch. In that case, you should implement only the `__init__()` and `build()` methods.
```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class AnswerWithThinking(synalinks.DataModel):
    thinking: str = synalinks.Field(
        description="Your step by step thinking process",
    )
    answer: float = synalinks.Field(
        description="The correct numerical answer",
    )

async def main():

    class ChainOfThought(synalinks.Program):
        """Useful to answer in a step by step manner."""

        def __init__(
            self,
            language_model=None,
            name=None,
            description=None,
            trainable=True,
        ):
            super().__init__(
                name=name,
                description=description,
                trainable=trainable,
            )
            self.language_model = language_model

        async def build(self, inputs):
            outputs = await synalinks.Generator(
                data_model=AnswerWithThinking,
                language_model=self.language_model,
            )(inputs)

            # Create your program using the functional API
            super().__init__(
                inputs=inputs,
                outputs=outputs,
                name=self.name,
                description=self.description,
                trainable=self.trainable,
            )

    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    program = ChainOfThought(
        language_model=language_model,
    )

if __name__ == "__main__":
    asyncio.run(main())
```

This way, you do not have to implement the `call()` and serialization methods (`get_config()` and `from_config()`). The program is built for the given inputs the first time it is called.

### With the `Sequential` class

In addition, `synalinks.Sequential` is a special case of program where the program is purely a stack of single-input, single-output modules.
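To make "a stack of single-input, single-output modules" concrete, here is a framework-free sketch in which every stage is an async callable that takes one value and returns one value. The names `run_stack`, `strip_query`, and `wrap_answer` are hypothetical and exist only for this illustration; the actual `synalinks.Sequential` usage follows.

```python
import asyncio


async def strip_query(data):
    # First stage: normalise the incoming query (one input, one output).
    return {"query": data["query"].strip()}


async def wrap_answer(data):
    # Second stage: produce an answer from the cleaned query.
    return {"answer": f"echo: {data['query']}"}


async def run_stack(stages, inputs):
    # Feed the output of each stage into the next one, in order.
    x = inputs
    for stage in stages:
        x = await stage(x)
    return x


result = asyncio.run(run_stack([strip_query, wrap_answer], {"query": "  hi  "}))
```

A linear stack like this cannot express branches or multiple inputs, which is the case the functional API covers.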
``` import synalinks import asyncio class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class AnswerWithThinking(synalinks.DataModel): thinking: str = synalinks.Field( description="Your step by step thinking process", ) answer: float = synalinks.Field( description="The correct numerical answer", ) async def main(): language_model = synalinks.LanguageModel(model="ollama/mistral") program = synalinks.Sequential( [ synalinks.Input( data_model=Query, ), synalinks.Generator( data_model=AnswerWithThinking, language_model=language_model, ), ], name="chain_of_thought", description="Useful to answer in a step by step manner.", ) if __name__ == "__main__": asyncio.run(main()) ``` Source code in `synalinks/src/programs/program.py` ```` @synalinks_export(["synalinks.Program", "synalinks.programs.Program"]) class Program(Trainer, Module): """A program grouping modules into an object with training/inference features. There is four ways to instantiate a `Program`: ## With the "Functional API" You start from `Input`, you chain modules calls to specify the program's structure, and finally, you create your program from inputs and outputs: ```python import synalinks import asyncio class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class AnswerWithThinking(synalinks.DataModel): thinking: str = synalinks.Field( description="Your step by step thinking process", ) answer: float = synalinks.Field( description="The correct numerical answer", ) async def main(): language_model = synalinks.LanguageModel( model="ollama/mistral", ) x0 = synalinks.Input(data_model=Query) x1 = await synalinks.Generator( data_model=AnswerWithThinking, language_model=language_model, )(x0) program = synalinks.Program( inputs=x0, outputs=x1, name="chain_of_thought", description="Useful to answer in a step by step manner.", ) if __name__ == "__main__": asyncio.run(main()) ``` Note: Only dicts, lists, and tuples of input data 
models are supported. Nested inputs are not supported (e.g. lists of list or dicts of dict). ## By subclassing the `Program` class In that case, you should define your modules in `__init__()` and you should implement the program's structure in `call()` . ```python import synalinks import asyncio class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class AnswerWithThinking(synalinks.DataModel): thinking: str = synalinks.Field( description="Your step by step thinking process", ) answer: float = synalinks.Field( description="The correct numerical answer", ) class ChainOfThought(synalinks.Program): \"\"\"Useful to answer in a step by step manner. The first line of the docstring is provided as description for the program if not provided in the `super().__init__()`. In a similar way the name is automatically infered based on the class name if not provided. \"\"\" def __init__( self, language_model=None, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) self.answer = synalinks.Generator( data_model=AnswerWithThinking, language_model=language_model, name="generator_"+self.name, ) async def call(self, inputs, training=False): if not inputs: return None x = await self.answer(inputs, training=training) return x def get_config(self): config = { "name": self.name, "description": self.description, "trainable": self.trainable, } language_model_config = \ { "language_model": synalinks.saving.serialize_synalinks_object( self.language_model ) } return {**config, **language_model_config} @classmethod def from_config(cls, config): language_model = synalinks.saving.deserialize_synalinks_object( config.pop("language_model") ) return cls(language_model=language_model, **config) async def main(): language_model = synalinks.LanguageModel( model="ollama/mistral", ) program = ChainOfThought( language_model=language_model, ) ``` If you subclass `Program`, you can optionally 
have a `training` argument (boolean) in `call()`, which you can use to specify a different behavior in training and inference. Once the program is created, you can config the program with rewards and metrics with `program.compile()`, train the program with `program.fit()`, or use the program to do prediction with `program.predict()` or `program()`. To understand the difference between `program.predict()` or `program()`, read the [FAQ](https://synalinks.github.io/synalinks/FAQ/#whats-the-difference-between-program-methods-predict-and-__call__). ## Mixing the subclassing and the `Functional` API This way of programming is recommended to encapsulate your application while providing an easy to use setup. It is the recommended way for most users as it avoid making your program/agents from scratch. In that case, you should implement only the `__init__()` and `build()` methods. ```python import synalinks import asyncio class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class AnswerWithThinking(synalinks.DataModel): thinking: str = synalinks.Field( description="Your step by step thinking process", ) answer: float = synalinks.Field( description="The correct numerical answer", ) async def main(): class ChainOfThought(synalinks.Program): \"\"\"Useful to answer in a step by step manner.\"\"\" def __init__( self, language_model=None, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) self.language_model = language_model async def build(self, inputs): outputs = await synalinks.Generator( data_model=AnswerWithThinking, language_model=self.language_model, )(inputs) # Create your program using the functional API super().__init__( inputs=inputs, outputs=outputs, name=self.name, description=self.description, trainable=self.trainable, ) language_model = synalinks.LanguageModel( model="ollama/mistral", ) program = ChainOfThought( language_model=language_model, ) if 
__name__ == "__main__": asyncio.run(main()) ``` This allows you to not have to implement the `call()` and serialization methods (`get_config()` and `from_config()`). The program will be built for any inputs the first time called. ## With the `Sequential` class In addition, `synalinks.Sequential` is a special case of program where the program is purely a stack of single-input, single-output modules. ```python import synalinks import asyncio class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class AnswerWithThinking(synalinks.DataModel): thinking: str = synalinks.Field( description="Your step by step thinking process", ) answer: float = synalinks.Field( description="The correct numerical answer", ) async def main(): language_model = synalinks.LanguageModel(model="ollama/mistral") program = synalinks.Sequential( [ synalinks.Input( data_model=Query, ), synalinks.Generator( data_model=AnswerWithThinking, language_model=language_model, ), ], name="chain_of_thought", description="Useful to answer in a step by step manner.", ) if __name__ == "__main__": asyncio.run(main()) ``` """ def __new__(cls, *args, **kwargs): # Signature detection for usage of `Program` as a `Functional` if functional_init_arguments(args, kwargs) and cls == Program: from synalinks.src.programs.functional import Functional return Functional.__new__(Functional, *args, **kwargs) return typing.cast(cls, super().__new__(cls)) def __init__(self, *args, **kwargs): Trainer.__init__(self) from synalinks.src.programs import functional # Signature detection for usage of a `Program` subclass # as a `Functional` subclass if functional_init_arguments(args, kwargs): inject_functional_program_class(self.__class__) functional.Functional.__init__(self, *args, **kwargs) else: Module.__init__(self, *args, **kwargs) async def call(self, *args, **kwargs): raise NotImplementedError( f"Program {self.__class__.__name__} does not have a `call()` " "method implemented." 
) @property def modules(self): return list(self._flatten_modules(include_self=False, recursive=False)) @modules.setter def modules(self, _): raise AttributeError( "`Program.modules` attribute is reserved and should not be used. " "Please use another name." ) def get_module(self, name=None, index=None): """Retrieves a module based on either its name (unique) or index. Provide either `name` or `index`, but not both: a `ValueError` is raised if both are given. Indices are based on order of horizontal graph traversal (bottom-up). Args: name (str): String, name of module. index (int): Integer, index of module. Returns: (Module): A module instance. """ if index is not None and name is not None: raise ValueError( "Provide only a module name or a module index. Received: " f"index={index}, name={name}." ) if index is not None: if len(self.modules) <= index: raise ValueError( f"Was asked to retrieve module at index {index}" f" but program only has {len(self.modules)}" " modules." ) else: return self.modules[index] if name is not None: for module in self.modules: if module.name == name: return module raise ValueError( f"No such module: {name}. Existing modules are: " f"{list(module.name for module in self.modules)}." ) raise ValueError("Provide either a module name or module index at `get_module`.") def summary( self, line_length=None, positions=None, print_fn=None, expand_nested=False, show_trainable=False, module_range=None, ): """Prints a string summary of the program. Args: line_length (int): Total length of printed lines (e.g. set this to adapt the display to different terminal window sizes). positions (list): Relative or absolute positions of log elements in each line. If not provided, becomes `[0.3, 0.6, 0.70, 1.]`. Defaults to `None`. print_fn (Callable): Print function to use. By default, prints to `stdout`. If `stdout` doesn't work in your environment, change to `print`. It will be called on each line of the summary. 
You can set it to a custom function in order to capture the string summary. expand_nested (bool): Whether to expand the nested models. Defaults to `False`. show_trainable (bool): Whether to show if a module is trainable. Defaults to `False`. module_range (list | tuple): a list or tuple of 2 strings, which is the starting module name and ending module name (both inclusive) indicating the range of modules to be printed in summary. It also accepts regex patterns instead of exact names. In this case, the start predicate will be the first element that matches `module_range[0]` and the end predicate will be the last element that matches `module_range[1]`. By default `None` considers all modules of the model. Raises: ValueError: if `summary()` is called before the model is built. """ summary_utils.print_summary( self, line_length=line_length, positions=positions, print_fn=print_fn, expand_nested=expand_nested, show_trainable=show_trainable, module_range=module_range, ) def save(self, filepath, overwrite=True, **kwargs): """Saves a program as a `.json` file. Example: ```python import synalinks class Query(synalinks.DataModel): query: str class AnswerWithRationale(synalinks.DataModel): rationale: str answer: str language_model = synalinks.LanguageModel(model="ollama/mistral") program = synalinks.Sequential( [ synalinks.Input(data_model=Query), synalinks.Generator( data_model=AnswerWithRationale, language_model=language_model, ), ], ) program.save("program.json") loaded_program = synalinks.programs.program_from_json("program.json") ``` The saved `.json` file contains: - The program's configuration (architecture) - The program's variables - The program's optimizer's state (if any) - The program's reward's state (if any) Thus programs can be reinstantiated in the exact same state. Args: filepath (str | os.PathLike): `str` or `os.PathLike` object. The path where to save the program. Must end in `.json`. 
overwrite (bool): Whether we should overwrite any existing program at the target location, or instead ask the user via an interactive prompt. Default to `True`. """ from synalinks.src.saving import serialization_lib filepath = file_utils.path_to_string(filepath) if not filepath.endswith(".json"): raise ValueError( f"The filepath should ends with '.json', received filepath={filepath}" ) program_config = serialization_lib.serialize_synalinks_object(self) variables_config = self.get_state_tree() program_config.update({"variables": variables_config}) program_config_string = json.dumps(program_config, indent=2) if file_utils.exists(filepath) and not overwrite: io_utils.ask_to_proceed_with_overwrite(filepath) with open(filepath, "w") as f: f.write(program_config_string) async def build_from_config(self, config): if not config: return status = False if "input_schema" in config: # Case: all inputs are in the first arg (possibly nested). if utils.is_default(self.build): status = self._build_by_run_for_single_pos_arg(config["input_schema"]) else: try: await self.build(config["input_schema"]) status = True except: pass self._build_schemas_dict = config elif "schemas_dict" in config: # Case: inputs were recorded as multiple keyword arguments. if utils.is_default(self.build): status = self._build_by_run_for_kwargs(config["schemas_dict"]) else: try: await self.build(**config["schemas_dict"]) status = True except: pass self._build_schemas_dict = config["schemas_dict"] if not status: warnings.warn( f"Program '{self.name}' had a build config, but the program " "cannot be built automatically in " "`build_from_config(config)`. " "You should implement " "`def build_from_config(self, config)`, " "and you might also want to implement the method " " that generates the config at saving time, " "`def get_build_config(self)`. " "The method `build_from_config()` is meant to " "create the state of the model (i.e. 
its variables) " "upon deserialization.", stacklevel=2, ) def to_json(self, **kwargs): """Returns a JSON string containing the network configuration. ```python json_string = program.to_json() ``` To load a network from a JSON save file, use `synalinks.programs.program_from_json(json_string, custom_objects={...})`. Args: **kwargs (keyword arguments): Additional keyword arguments to be passed to `json.dumps()`. Returns: (str): A JSON string. """ from synalinks.src.saving import serialization_lib program_config = serialization_lib.serialize_synalinks_object(self) return json.dumps(program_config, **kwargs) @classmethod def from_config(cls, config, custom_objects=None): from synalinks.src.programs.functional import Functional functional_config_keys = [ "name", "modules", "input_modules", "output_modules", ] is_functional_config = all(key in config for key in functional_config_keys) argspec = inspect.getfullargspec(cls.__init__) functional_init_args = inspect.getfullargspec(Functional.__init__).args[1:] revivable_as_functional = ( cls in {Functional, Program} or argspec.args[1:] == functional_init_args or (argspec.varargs == "args" and argspec.varkw == "kwargs") ) if is_functional_config and revivable_as_functional: # Revive Functional model # (but not Functional subclasses with a custom __init__) from synalinks.src.programs.functional import functional_from_config return functional_from_config(cls, config, custom_objects=custom_objects) # Either the model has a custom __init__, or the config # does not contain all the information necessary to # revive a Functional model. This happens when the user creates # subclassed models where `get_config()` is returning # insufficient information to be considered a Functional model. # In this case, we fall back to provide all config into the # constructor of the class. try: return cls(**config) except TypeError as e: raise TypeError( "Unable to revive program from config. 
When overriding " "the `get_config()` method, make sure that the " "returned config contains all items used as arguments " f"in the constructor to {cls}, " "which is the default behavior. " "You can override this default behavior by defining a " "`from_config(cls, config)` class method to specify " "how to create an " f"instance of {cls.__name__} from its config.\n\n" f"Received config={config}\n\n" f"Error encountered during deserialization: {e}" ) def get_state_tree(self): """Retrieves tree-like structure of program variables. This method allows retrieval of different program variables (trainable, non-trainable, optimizer, and metrics). The variables are returned in a nested dictionary format, where the keys correspond to the variable names and the values are the nested representations of the variables. Example: ```python program.compile( optimizer=synalinks.optimizers.RandomFewShot(), reward=synalinks.rewards.ExactMatch(), ) program.fit(x=x_train, y=y_train) state_tree = program.get_state_tree() ``` Returns: (dict): A dictionary containing the nested representations of the requested variables. The keys are the variable names, and the values are the corresponding nested dictionaries. """ variables = {} variables["trainable_variables"] = self._create_nested_dict( self.trainable_variables ) variables["non_trainable_variables"] = self._create_nested_dict( self.non_trainable_variables ) if self.optimizer: variables["optimizer_trainable_variables"] = self._create_nested_dict( self.optimizer.trainable_variables ) variables["optimizer_non_trainable_variables"] = self._create_nested_dict( self.optimizer.non_trainable_variables ) variables["metrics_variables"] = self._create_nested_dict(self.metrics_variables) return variables def _create_nested_dict(self, variables): flat_dict = {} for v in variables: if v.path in flat_dict: raise ValueError( "The following variable path is found twice in the program: " f"'{v.path}'. 
`get_state_tree()` can only be called when " "all variable paths are unique. Make sure to give unique " "names to your modules (and other objects)." ) flat_dict[v.path] = v.get_json() nested_dict = {} for path, value in flat_dict.items(): parts = path.split("/") current_dict = nested_dict for part in parts[:-1]: if part not in current_dict: current_dict[part] = {} current_dict = current_dict[part] current_dict[parts[-1]] = value return nested_dict def set_state_tree(self, state_tree): """Assigns values to variables of the program. This method takes a dictionary of nested variable values, which represents the state tree of the program, and assigns them to the corresponding variables of the program. The dictionary keys represent the variable names (e.g., `'trainable_variables'`, `'optimizer_variables'`), and the values are nested dictionaries containing the variable paths and their corresponding values. Args: state_tree (dict): A dictionary representing the state tree of the program. The keys are the variable names, and the values are nested dictionaries representing the variable paths and their values. 
""" for k, v in state_tree.items(): path_value_dict = self._flatten_nested_dict(v) if k == "trainable_variables": self._assign_variable_values(self.trainable_variables, path_value_dict) elif k == "non_trainable_variables": self._assign_variable_values( self.non_trainable_variables, path_value_dict ) elif k == "optimizer_trainable_variables": if self.optimizer: self._assign_variable_values( self.optimizer.trainable_variables, path_value_dict ) elif k == "optimizer_non_trainable_variables": if self.optimizer: self._assign_variable_values( self.optimizer.non_trainable_variables, path_value_dict ) elif k == "metrics_variables": self._assign_variable_values(self.metrics_variables, path_value_dict) else: raise ValueError(f"Unknown variable name: {k}") def _assign_variable_values(self, variables, path_value_dict): for full_path, value in path_value_dict.items(): path = "/".join(full_path.split("/")[:-1]) field_name = full_path.split("/")[-1] for variable in variables: if variable.path == path: variable.get_json()[field_name] = value def _flatten_nested_dict(self, nested_dict): flat_dict = {} def _flatten(current_dict, prefix=""): for key, value in current_dict.items(): if isinstance(value, dict): _flatten(value, prefix + key + "/") else: flat_dict[prefix + key] = value _flatten(nested_dict) return flat_dict def save_variables(self, filepath, overwrite=True): """Saves all module variables to a `.variables.json` file. Args: filepath (str | pathlib.Path): `str` or `pathlib.Path` object. Path where to save the program. Must end in `.variables.json`. overwrite (bool): Whether we should overwrite any existing program at the target location, or instead ask the user via an interactive prompt. 
""" filepath = file_utils.path_to_string(filepath) if not filepath.endswith(".variables.json"): raise ValueError( "The filepath should ends with '.variables.json', " f"received filepath={filepath}" ) config = self.get_state_tree() config_string = json.dumps(config, indent=2) if file_utils.exists(filepath) and not overwrite: io_utils.ask_to_proceed_with_overwrite(filepath) with open(filepath, "w") as f: f.write(config_string) def load_variables(self, filepath): """Load all module variables from a `.variable.json` file. Args: filepath (str | pathlib.Path): `str` or `pathlib.Path` object. Path to load the program's variables from. Must end in `.variables.json`. """ filepath = file_utils.path_to_string(filepath) if not filepath.endswith(".variables.json"): raise ValueError( "The filepath should ends with '.variables.json', " f"received filepath={filepath}" ) with open(filepath, "r") as f: state_tree_config = json.loads(f.read()) self.set_state_tree(state_tree_config) @classmethod def load(cls, filepath, custom_objects=None): """Load a program from a JSON file. Example: ```python import synalinks loaded_program = synalinks.Program.load("program.json") ``` Args: filepath (str | pathlib.Path): `str` or `pathlib.Path` object. Path to load the program's variables from. Must end in `.variables.json`. custom_objects (dict): Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization. Returns: (Program): A Synalinks program instance (uncompiled). """ filepath = file_utils.path_to_string(filepath) if not filepath.endswith(".json"): raise ValueError( f"The filepath should ends with '.json', received filepath={filepath}" ) with open(filepath, "r") as f: json_config = f.read() return program_from_json(json_config, custom_objects=custom_objects) ```` ## `get_module(name=None, index=None)` Retrieves a module based on either its name (unique) or index. If `name` and `index` are both provided, `index` will take precedence. 
In the source shown below, however, passing both `name` and `index` raises a `ValueError`, so provide exactly one of them.
Indices are based on order of horizontal graph traversal (bottom-up).

Parameters:

| Name    | Type  | Description          | Default |
| ------- | ----- | -------------------- | ------- |
| `name`  | `str` | Name of the module.  | `None`  |
| `index` | `int` | Index of the module. | `None`  |

Returns:

| Type     | Description        |
| -------- | ------------------ |
| `Module` | A module instance. |

Source code in `synalinks/src/programs/program.py`

```
def get_module(self, name=None, index=None):
    """Retrieves a module based on either its name (unique) or index.

    Exactly one of `name` and `index` must be provided; passing both,
    or neither, raises a `ValueError`.
    Indices are based on order of horizontal graph traversal (bottom-up).

    Args:
        name (str): String, name of module.
        index (int): Integer, index of module.

    Returns:
        (Module): A module instance.
    """
    if index is not None and name is not None:
        raise ValueError(
            "Provide only a module name or a module index. Received: "
            f"index={index}, name={name}."
        )
    if index is not None:
        if len(self.modules) <= index:
            raise ValueError(
                f"Was asked to retrieve module at index {index}"
                f" but program only has {len(self.modules)}"
                " modules."
            )
        else:
            return self.modules[index]

    if name is not None:
        for module in self.modules:
            if module.name == name:
                return module
        raise ValueError(
            f"No such module: {name}. Existing modules are: "
            f"{list(module.name for module in self.modules)}."
        )
    raise ValueError("Provide either a module name or module index at `get_module`.")
```

## `get_state_tree()`

Retrieves tree-like structure of program variables.

This method allows retrieval of different program variables (trainable, non-trainable, optimizer, and metrics). The result is a nested dictionary. Its top-level keys name the variable groups (`trainable_variables`, `non_trainable_variables` and `metrics_variables`, plus `optimizer_trainable_variables` and `optimizer_non_trainable_variables` when the program has an optimizer). Within each group, the `/`-separated variable paths are expanded into nested dictionaries whose leaves are the variables' JSON values.
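To picture the nested layout, here is a minimal standalone sketch of how `/`-separated variable paths can be expanded into a tree. It does not import Synalinks; the helper name `nest_by_path` and the sample paths and values are hypothetical and only illustrate the idea.

```python
def nest_by_path(flat):
    """Expand {"a/b/c": value} into {"a": {"b": {"c": value}}}."""
    nested = {}
    for path, value in flat.items():
        parts = path.split("/")
        node = nested
        # Walk (and create) the intermediate dictionaries.
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        # The last path component holds the value itself.
        node[parts[-1]] = value
    return nested


# Hypothetical variable paths and JSON values, for illustration only.
flat = {
    "generator/instructions": {"text": "Answer step by step."},
    "generator/examples": {"items": []},
}
tree = nest_by_path(flat)
print(tree)
```

Each group returned by `get_state_tree()` can be read the same way: follow the path components down the dictionary to reach one variable's JSON value.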
Example:

```
program.compile(
    optimizer=synalinks.optimizers.RandomFewShot(),
    reward=synalinks.rewards.ExactMatch(),
)
program.fit(x=x_train, y=y_train)
state_tree = program.get_state_tree()
```

Returns:

| Type   | Description |
| ------ | ----------- |
| `dict` | A dictionary containing the nested representations of the program variables. The top-level keys are the variable group names, and the values are the corresponding nested dictionaries. |

Source code in `synalinks/src/programs/program.py`

````
def get_state_tree(self):
    """Retrieves tree-like structure of program variables.

    This method allows retrieval of different program variables (trainable,
    non-trainable, optimizer, and metrics). The variables are returned in a
    nested dictionary format, where the keys correspond to the variable
    names and the values are the nested representations of the variables.

    Example:

    ```python
    program.compile(
        optimizer=synalinks.optimizers.RandomFewShot(),
        reward=synalinks.rewards.ExactMatch(),
    )
    program.fit(x=x_train, y=y_train)
    state_tree = program.get_state_tree()
    ```

    Returns:
        (dict): A dictionary containing the nested representations of the
            requested variables. The keys are the variable names, and the
            values are the corresponding nested dictionaries.
    """
    variables = {}
    variables["trainable_variables"] = self._create_nested_dict(
        self.trainable_variables
    )
    variables["non_trainable_variables"] = self._create_nested_dict(
        self.non_trainable_variables
    )
    if self.optimizer:
        variables["optimizer_trainable_variables"] = self._create_nested_dict(
            self.optimizer.trainable_variables
        )
        variables["optimizer_non_trainable_variables"] = self._create_nested_dict(
            self.optimizer.non_trainable_variables
        )
    variables["metrics_variables"] = self._create_nested_dict(self.metrics_variables)
    return variables
````

## `load(filepath, custom_objects=None)`

Load a program from a JSON file.

Example:

```
import synalinks

loaded_program = synalinks.Program.load("program.json")
```

Parameters:

| Name             | Type          | Description | Default |
| ---------------- | ------------- | ----------- | ------- |
| `filepath`       | `str \| Path` | `str` or `pathlib.Path` object. Path to the saved program file. Must end in `.json`. | *required* |
| `custom_objects` | `dict`        | Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization. | `None` |

Returns:

| Type      | Description                                |
| --------- | ------------------------------------------ |
| `Program` | A Synalinks program instance (uncompiled). |

Source code in `synalinks/src/programs/program.py`

````
@classmethod
def load(cls, filepath, custom_objects=None):
    """Load a program from a JSON file.

    Example:

    ```python
    import synalinks

    loaded_program = synalinks.Program.load("program.json")
    ```

    Args:
        filepath (str | pathlib.Path): `str` or `pathlib.Path` object.
            Path to the saved program file. Must end in `.json`.
        custom_objects (dict): Optional dictionary mapping names
            (strings) to custom classes or functions to be
            considered during deserialization.

    Returns:
        (Program): A Synalinks program instance (uncompiled).
    """
    filepath = file_utils.path_to_string(filepath)
    if not filepath.endswith(".json"):
        raise ValueError(
            f"The filepath should ends with '.json', received filepath={filepath}"
        )
    with open(filepath, "r") as f:
        json_config = f.read()
    return program_from_json(json_config, custom_objects=custom_objects)
````

## `load_variables(filepath)`

Load all module variables from a `.variables.json` file.

Parameters:

| Name       | Type          | Description | Default |
| ---------- | ------------- | ----------- | ------- |
| `filepath` | `str \| Path` | `str` or `pathlib.Path` object. Path to load the program's variables from. Must end in `.variables.json`. | *required* |

Source code in `synalinks/src/programs/program.py`

```
def load_variables(self, filepath):
    """Load all module variables from a `.variables.json` file.

    Args:
        filepath (str | pathlib.Path): `str` or `pathlib.Path` object.
            Path to load the program's variables from.
            Must end in `.variables.json`.
    """
    filepath = file_utils.path_to_string(filepath)
    if not filepath.endswith(".variables.json"):
        raise ValueError(
            "The filepath should ends with '.variables.json', "
            f"received filepath={filepath}"
        )
    with open(filepath, "r") as f:
        state_tree_config = json.loads(f.read())
    self.set_state_tree(state_tree_config)
```

## `save(filepath, overwrite=True, **kwargs)`

Saves a program as a `.json` file.
Example:

```
import synalinks

class Query(synalinks.DataModel):
    query: str

class AnswerWithRationale(synalinks.DataModel):
    rationale: str
    answer: str

language_model = synalinks.LanguageModel(model="ollama/mistral")

# `Sequential` requires a `description` (see its constructor).
program = synalinks.Sequential(
    [
        synalinks.Input(data_model=Query),
        synalinks.Generator(
            data_model=AnswerWithRationale,
            language_model=language_model,
        ),
    ],
    name="chain_of_thought",
    description="Useful to answer in a step by step manner.",
)

program.save("program.json")
# `program_from_json` expects a JSON string, so reload from the file with `load`.
loaded_program = synalinks.Program.load("program.json")
```

The saved `.json` file contains:

- The program's configuration (architecture)
- The program's variables
- The program's optimizer's state (if any)
- The program's reward's state (if any)

Thus programs can be reinstantiated in the exact same state.

Parameters:

| Name        | Type                 | Description | Default |
| ----------- | -------------------- | ----------- | ------- |
| `filepath`  | `str \| os.PathLike` | `str` or `os.PathLike` object. The path where to save the program. Must end in `.json`. | *required* |
| `overwrite` | `bool`               | Whether to overwrite any existing program at the target location, or instead ask the user via an interactive prompt. | `True` |

Source code in `synalinks/src/programs/program.py`

````
def save(self, filepath, overwrite=True, **kwargs):
    """Saves a program as a `.json` file.
Example: ```python import synalinks class Query(synalinks.DataModel): query: str class AnswerWithRationale(synalinks.DataModel): rationale: str answer: str language_model = LanguageModel("ollama/mistral") program = synalinks.Sequential( [ synalinks.Input(data_model=Query), synalinks.Generator( data_model=AnswerWithRationale, language_model=language_model, ), ], ) program.save("program.json") loaded_program = synalinks.programs.program_from_json("program.json") ``` The saved `.json` file contains: - The program's configuration (architecture) - The program's variables - The program's optimizer's state (if any) - The program's reward's state (if any) Thus programs can be reinstantiated in the exact same state. Args: filepath (str | os.PathLike): `str` or `os.PathLike` object. The path where to save the model. Must end in `.json`. overwrite (bool): Whether we should overwrite any existing program at the target location, or instead ask the user via an interactive prompt. Default to `True`. """ from synalinks.src.saving import serialization_lib filepath = file_utils.path_to_string(filepath) if not filepath.endswith(".json"): raise ValueError( f"The filepath should ends with '.json', received filepath={filepath}" ) program_config = serialization_lib.serialize_synalinks_object(self) variables_config = self.get_state_tree() program_config.update({"variables": variables_config}) program_config_string = json.dumps(program_config, indent=2) if file_utils.exists(filepath) and not overwrite: io_utils.ask_to_proceed_with_overwrite(filepath) with open(filepath, "w") as f: f.write(program_config_string) ```` ## `save_variables(filepath, overwrite=True)` Saves all module variables to a `.variables.json` file. 
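The save and load behaviour for variables can be approximated without Synalinks. The following is only a sketch under stated assumptions: `write_state` and `read_state` are hypothetical stand-ins for `save_variables` and `load_variables`, and the sample state tree is invented. It shows the two things the real methods rely on: the `.variables.json` suffix check and a plain JSON round trip.

```python
import json
import os
import tempfile

SUFFIX = ".variables.json"


def write_state(state_tree, filepath):
    """Hypothetical helper: validate the suffix, then dump the tree as JSON."""
    if not filepath.endswith(SUFFIX):
        raise ValueError(f"The filepath should end with '{SUFFIX}', received {filepath}")
    with open(filepath, "w") as f:
        f.write(json.dumps(state_tree, indent=2))


def read_state(filepath):
    """Hypothetical helper: validate the suffix, then parse the JSON file."""
    if not filepath.endswith(SUFFIX):
        raise ValueError(f"The filepath should end with '{SUFFIX}', received {filepath}")
    with open(filepath, "r") as f:
        return json.loads(f.read())


# Invented state tree, shaped like the output of `get_state_tree()`.
state_tree = {"trainable_variables": {"generator": {"state": {"examples": []}}}}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "program.variables.json")
    write_state(state_tree, path)
    restored = read_state(path)

print(restored == state_tree)
```

Because the file is plain JSON, it can be diffed and versioned alongside the program's `.json` configuration.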
Parameters:

| Name        | Type          | Description | Default |
| ----------- | ------------- | ----------- | ------- |
| `filepath`  | `str \| Path` | `str` or `pathlib.Path` object. Path where to save the program's variables. Must end in `.variables.json`. | *required* |
| `overwrite` | `bool`        | Whether to overwrite any existing file at the target location, or instead ask the user via an interactive prompt. | `True` |

Source code in `synalinks/src/programs/program.py`

```
def save_variables(self, filepath, overwrite=True):
    """Saves all module variables to a `.variables.json` file.

    Args:
        filepath (str | pathlib.Path): `str` or `pathlib.Path` object.
            Path where to save the program. Must end in `.variables.json`.
        overwrite (bool): Whether we should overwrite any existing program
            at the target location, or instead ask the user
            via an interactive prompt.
    """
    filepath = file_utils.path_to_string(filepath)
    if not filepath.endswith(".variables.json"):
        raise ValueError(
            "The filepath should ends with '.variables.json', "
            f"received filepath={filepath}"
        )
    config = self.get_state_tree()
    config_string = json.dumps(config, indent=2)
    if file_utils.exists(filepath) and not overwrite:
        io_utils.ask_to_proceed_with_overwrite(filepath)
    with open(filepath, "w") as f:
        f.write(config_string)
```

## `set_state_tree(state_tree)`

Assigns values to variables of the program.

This method takes a dictionary of nested variable values, which represents the state tree of the program, and assigns them to the corresponding variables of the program. The top-level dictionary keys name the variable groups (`'trainable_variables'`, `'non_trainable_variables'`, `'optimizer_trainable_variables'`, `'optimizer_non_trainable_variables'`, `'metrics_variables'`); any other key raises a `ValueError`. The values are nested dictionaries containing the variable paths and their corresponding values.
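As a rough illustration of how such a nested tree maps back to individual variables, the sketch below flattens one group into `/`-separated paths and then splits each path into a variable path and a field name. It is a standalone approximation, not the library's code: `flatten_by_path` and the sample data are hypothetical.

```python
def flatten_by_path(nested, prefix=""):
    """Collapse {"a": {"b": {"c": 1}}} into {"a/b/c": 1}."""
    flat = {}
    for key, value in nested.items():
        if isinstance(value, dict):
            # Recurse into sub-dictionaries, extending the path prefix.
            flat.update(flatten_by_path(value, prefix + key + "/"))
        else:
            flat[prefix + key] = value
    return flat


# Invented content for one group of a state tree.
group = {"generator": {"instructions": {"text": "Answer step by step."}}}

flat = flatten_by_path(group)
for full_path, value in flat.items():
    # Everything before the last "/" identifies the variable,
    # the last component is the field to update inside it.
    variable_path, _, field_name = full_path.rpartition("/")
    print(variable_path, field_name, value)
```

Under this reading, a value is only assigned when `variable_path` matches the path of an existing variable, so entries for unknown paths are silently skipped.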
Parameters:

| Name         | Type   | Description | Default |
| ------------ | ------ | ----------- | ------- |
| `state_tree` | `dict` | A dictionary representing the state tree of the program. The keys are the variable group names, and the values are nested dictionaries representing the variable paths and their values. | *required* |

Source code in `synalinks/src/programs/program.py`

```
def set_state_tree(self, state_tree):
    """Assigns values to variables of the program.

    This method takes a dictionary of nested variable values, which
    represents the state tree of the program, and assigns them to the
    corresponding variables of the program. The dictionary keys represent the
    variable names (e.g., `'trainable_variables'`,
    `'optimizer_trainable_variables'`), and the values are nested
    dictionaries containing the variable paths and their corresponding
    values.

    Args:
        state_tree (dict): A dictionary representing the state tree of the program.
            The keys are the variable names, and the values are nested
            dictionaries representing the variable paths and their values.
""" for k, v in state_tree.items(): path_value_dict = self._flatten_nested_dict(v) if k == "trainable_variables": self._assign_variable_values(self.trainable_variables, path_value_dict) elif k == "non_trainable_variables": self._assign_variable_values( self.non_trainable_variables, path_value_dict ) elif k == "optimizer_trainable_variables": if self.optimizer: self._assign_variable_values( self.optimizer.trainable_variables, path_value_dict ) elif k == "optimizer_non_trainable_variables": if self.optimizer: self._assign_variable_values( self.optimizer.non_trainable_variables, path_value_dict ) elif k == "metrics_variables": self._assign_variable_values(self.metrics_variables, path_value_dict) else: raise ValueError(f"Unknown variable name: {k}") ``` ## `summary(line_length=None, positions=None, print_fn=None, expand_nested=False, show_trainable=False, module_range=None)` Prints a string summary of the program. Parameters: | Name | Type | Description | Default | | ---------------- | ---------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `line_length` | `int` | Total length of printed lines (e.g. set this to adapt the display to different terminal window sizes). | `None` | | `positions` | `list` | Relative or absolute positions of log elements in each line. If not provided, becomes [0.3, 0.6, 0.70, 1.]. Defaults to None. 
| `None` | | `print_fn` | `Callable` | Print function to use. By default, prints to stdout. If stdout doesn't work in your environment, change to print. It will be called on each line of the summary. You can set it to a custom function in order to capture the string summary. | `None` | | `expand_nested` | `bool` | Whether to expand the nested models. Defaults to False. | `False` | | `show_trainable` | `bool` | Whether to show if a module is trainable. Defaults to False. | `False` | | `module_range` | \`list | tuple\` | a list or tuple of 2 strings, which is the starting module name and ending module name (both inclusive) indicating the range of modules to be printed in summary. It also accepts regex patterns instead of exact names. In this case, the start predicate will be the first element that matches module_range[0] and the end predicate will be the last element that matches module_range[1]. By default None considers all modules of the model. | Raises: | Type | Description | | ------------ | ------------------------------------------------- | | `ValueError` | if summary() is called before the model is built. | Source code in `synalinks/src/programs/program.py` ``` def summary( self, line_length=None, positions=None, print_fn=None, expand_nested=False, show_trainable=False, module_range=None, ): """Prints a string summary of the program. Args: line_length (int): Total length of printed lines (e.g. set this to adapt the display to different terminal window sizes). positions (list): Relative or absolute positions of log elements in each line. If not provided, becomes `[0.3, 0.6, 0.70, 1.]`. Defaults to `None`. print_fn (Callable): Print function to use. By default, prints to `stdout`. If `stdout` doesn't work in your environment, change to `print`. It will be called on each line of the summary. You can set it to a custom function in order to capture the string summary. expand_nested (bool): Whether to expand the nested models. Defaults to `False`. 
        show_trainable (bool): Whether to show if a module is trainable.
            Defaults to `False`.
        module_range (list | tuple): a list or tuple of 2 strings,
            which is the starting module name and ending module name
            (both inclusive) indicating the range of modules to be printed
            in summary. It also accepts regex patterns instead of exact
            names. In this case, the start predicate will be
            the first element that matches `module_range[0]`
            and the end predicate will be the last element
            that matches `module_range[1]`.
            By default `None` considers all modules of the program.

    Raises:
        ValueError: if `summary()` is called before the program is built.
    """
    summary_utils.print_summary(
        self,
        line_length=line_length,
        positions=positions,
        print_fn=print_fn,
        expand_nested=expand_nested,
        show_trainable=show_trainable,
        module_range=module_range,
    )
```

## `to_json(**kwargs)`

Returns a JSON string containing the program configuration.

```
json_string = program.to_json()
```

To rebuild a program from this JSON string, use `synalinks.programs.program_from_json(json_string, custom_objects={...})`.

Parameters:

| Name       | Type                | Description | Default |
| ---------- | ------------------- | ----------- | ------- |
| `**kwargs` | `keyword arguments` | Additional keyword arguments to be passed to `json.dumps()`. | `{}` |

Returns:

| Type  | Description    |
| ----- | -------------- |
| `str` | A JSON string. |

Source code in `synalinks/src/programs/program.py`

````
def to_json(self, **kwargs):
    """Returns a JSON string containing the program configuration.

    ```python
    json_string = program.to_json()
    ```

    To rebuild a program from this JSON string, use
    `synalinks.programs.program_from_json(json_string, custom_objects={...})`.

    Args:
        **kwargs (keyword arguments): Additional keyword arguments to be passed to
            `json.dumps()`.

    Returns:
        (str): A JSON string.
    """
    from synalinks.src.saving import serialization_lib

    program_config = serialization_lib.serialize_synalinks_object(self)
    return json.dumps(program_config, **kwargs)
````

# The Sequential class

Bases: `Program`

`Sequential` groups a linear stack of modules into a `Program`.

Examples:

```
# Assumes `Query`, `AnswerWithRationale` and `language_model` are already defined.
program = synalinks.Sequential(
    name="chain_of_thought",
    description="Useful to answer in a step by step manner."
)
program.add(
    synalinks.Input(
        data_model=Query,
    )
)
program.add(
    synalinks.Generator(
        data_model=AnswerWithRationale,
        language_model=language_model,
    )
)

# Note that you can also omit the initial `Input`.
# In that case the program doesn't have any variables until the first call
# to a training/evaluation method (since it isn't yet built):
program = synalinks.Sequential(
    name="chain_of_thought",
    description="Useful to answer in a step by step manner."
)
program.add(
    synalinks.Generator(
        data_model=AnswerWithRationale,
        language_model=language_model,
    )
)
# program.variables not created yet

# Whereas if you specify an `Input`, the program gets built
# continuously as you are adding modules:
program = synalinks.Sequential(
    name="chain_of_thought",
    description="Useful to answer in a step by step manner."
)
program.add(
    synalinks.Input(
        data_model=Query,
    )
)
program.add(
    synalinks.Generator(
        data_model=AnswerWithRationale,
        language_model=language_model,
    )
)

# Note that when using the delayed-build pattern (no input specified),
# the program gets built the first time you call `fit`, `evaluate`, or `predict`,
# or the first time you call the program on some input data.
```

Source code in `synalinks/src/programs/sequential.py`

````
@synalinks_export(["synalinks.Sequential", "synalinks.programs.Sequential"])
class Sequential(Program):
    """`Sequential` groups a linear stack of modules into a `Program`.

    Examples:

    ```python
    program = synalinks.Sequential(
        name="chain_of_thought",
        description="Useful to answer in a step by step manner."
    )
    program.add(
        synalinks.Input(
            data_model=Query,
        )
    )
    program.add(
        synalinks.Generator(
            data_model=AnswerWithRationale,
            language_model=language_model,
        )
    )

    # Note that you can also omit the initial `Input`.
    # In that case the program doesn't have any variables until the first call
    # to a training/evaluation method (since it isn't yet built):
    program = synalinks.Sequential(
        name="chain_of_thought",
        description="Useful to answer in a step by step manner."
    )
    program.add(
        synalinks.Generator(
            data_model=AnswerWithRationale,
            language_model=language_model,
        )
    )
    # program.variables not created yet

    # Whereas if you specify an `Input`, the program gets built
    # continuously as you are adding modules:
    program = synalinks.Sequential(
        name="chain_of_thought",
        description="Useful to answer in a step by step manner."
    )
    program.add(
        synalinks.Input(
            data_model=Query,
        )
    )
    program.add(
        synalinks.Generator(
            data_model=AnswerWithRationale,
            language_model=language_model,
        )
    )

    # Note that when using the delayed-build pattern (no input specified),
    # the program gets built the first time you call `fit`, `evaluate`, or `predict`,
    # or the first time you call the program on some input data.
    ```
    """

    def __new__(cls, *args, **kwargs):
        return typing.cast(cls, super().__new__(cls))

    def __init__(self, modules=None, trainable=True, name=None, description=None):
        if description is None:
            raise ValueError(
                "All Sequential programs must have a `description`, "
                "please add it to the constructor arguments"
            )
        super().__init__(trainable=trainable, name=name, description=description)
        self._functional = None
        self._modules = []
        if modules:
            for module in modules:
                self.add(module, rebuild=False)
            run_maybe_nested(self._maybe_rebuild())

    def add(self, module, rebuild=True):
        """Adds a module instance on top of the module stack.

        Args:
            module (Module): Module instance.
            rebuild (bool): If `True` rebuild the program.
""" # If we are passed a SymbolicDataModel created by synalinks.Input(), we # extract the input module from its synalinks history and use that. if hasattr(module, "_synalinks_history"): origin_module = module._synalinks_history[0] if isinstance(origin_module, InputModule): module = origin_module if not isinstance(module, Module): raise ValueError( "Only instances of `synalinks.Module` can be " f"added to a Sequential program. Received: {module} " f"(of type {type(module)})" ) if not self._is_module_name_unique(module): raise ValueError( "All modules added to a Sequential program " f"should have unique names. Name '{module.name}' is already " "the name of a module in this program. Update the `name` argument " "to pass a unique name." ) if ( isinstance(module, InputModule) and self._modules and isinstance(self._modules[0], InputModule) ): raise ValueError( f"Sequential program '{self.name}' has already been configured " f"to use input schema {self._modules[0].input_schema}. You cannot " f"add a different Input module to it." ) self._modules.append(module) if rebuild: run_maybe_nested(self._maybe_rebuild()) else: self.built = False self._functional = None def pop(self, rebuild=True): """Removes the last module in the program. Args: rebuild (bool): If `True` rebuild the program. """ module = self._modules.pop() self.built = False self._functional = None if rebuild: run_maybe_nested(self._maybe_rebuild()) return module async def _maybe_rebuild(self): self.built = False self._functional = None if isinstance(self._modules[0], InputModule) and len(self._modules) > 1: input_schema = self._modules[0].get_schema() await self.build(Input(schema=input_schema)) elif hasattr(self._modules[0], "input_schema") and len(self._modules) > 1: # We can build the Sequential program if the first module has the # `input_schema` property. This is most commonly found in Functional # program. 
input_schema = self._modules[0].input_schema await self.build(Input(schema=input_schema)) def _lock_state(self): # Unlike other modules, Sequential is mutable after build. pass def _obj_type(self): return "Sequential" async def build(self, inputs): try: input_schema = standardize_schema(inputs.get_schema()) except Exception: # Do not attempt to build if the program does not have a single # input. return if not self._modules: raise ValueError( f"Sequential program {self.name} cannot be built because it has " "no modules. Call `program.add(module)`." ) if isinstance(self._modules[0], InputModule): if self._modules[0].get_schema() != input_schema: raise ValueError( f"Sequential program '{self.name}' has already been " "configured to use input schema " f"{self._modules[0].get_schema()}. You cannot build it " f"with input_schema {input_schema}" ) else: self._modules = [InputModule(schema=input_schema)] + self._modules # Build functional program inputs = self._modules[0].output x = inputs for module in self._modules[1:]: try: x = await module(x) except NotImplementedError: # Can happen if spec inference is not implemented. # TODO: consider reverting inbound nodes on modules processed. return except TypeError as e: signature = inspect.signature(module.call) positional_args = [ param for param in signature.parameters.values() if param.default == inspect.Parameter.empty ] if len(positional_args) != 1: raise ValueError( "Modules added to a Sequential program " "can only have a single positional argument, " f"the input data model. Module {module.__class__.__name__} " f"has multiple positional arguments: {positional_args}" ) raise e outputs = x self._functional = Functional(inputs=inputs, outputs=outputs) self.built = True async def call(self, inputs, training=None): if self._functional: return await self._functional.call(inputs, training=training) # Fallback: Just apply the module sequence. # This typically happens if `inputs` is a nested struct. 
for module in self.modules: # During each iteration, `inputs` are the inputs to `module`, and # `outputs` are the outputs of `module` applied to `inputs`. At the # end of each iteration `inputs` is set to `outputs` to prepare for # the next module. kwargs = {} if module._call_has_training_arg and training is not None: kwargs["training"] = training outputs = await module(inputs, **kwargs) inputs = outputs return outputs @property def modules(self): """Unlike Keras, also output the potentially auto-generated `InputModule`""" return self._modules @modules.setter def modules(self, _): raise AttributeError( "`Sequential.modules` attribute is reserved and should not be used. " "Use `add()` and `pop()` to change the modules in this program." ) async def compute_output_spec(self, inputs, training=None): if self._functional: return await self._functional.compute_output_spec( inputs, training=training, ) # Direct application for module in self.modules: outputs = await module.compute_output_spec(inputs, training=training) inputs = outputs return outputs @property def input_schema(self): if self._functional: return self._functional.input_schema raise AttributeError( f"Sequential program '{self.name}' has no defined input schema yet." ) @property def output_schema(self): if self._functional: return self._functional.output_schema raise AttributeError( f"Sequential program '{self.name}' has no defined output schema yet." ) @property def inputs(self): if self._functional: return self._functional.inputs raise AttributeError( f"Sequential program '{self.name}' has no defined inputs yet." ) @property def outputs(self): if self._functional: return self._functional.outputs raise AttributeError( f"Sequential program '{self.name}' has no defined outputs yet." 
) def _is_module_name_unique(self, module): for ref_module in self._modules: if module.name == ref_module.name and ref_module is not module: return False return True def get_config(self): serialize_fn = serialization_lib.serialize_synalinks_object module_configs = [] for module in self.modules: module_configs.append(serialize_fn(module)) config = Program.get_config(self) config["name"] = self.name config["description"] = self.description config["modules"] = copy.deepcopy(module_configs) if self._functional is not None: config["build_input_schema"] = self._modules[0].input_schema return config @classmethod def from_config(cls, config, custom_objects=None): if "name" in config: name = config["name"] build_input_schema = config.get("build_input_schema") module_configs = config["modules"] else: name = None module_configs = config if "description" in config: description = config["description"] else: description = None program = cls(name=name, description=description) for module_config in module_configs: module = serialization_lib.deserialize_synalinks_object( module_config, custom_objects=custom_objects, ) program.add(module) if ( not program._functional and "build_input_schema" in locals() and build_input_schema and isinstance(build_input_schema, (tuple, list)) ): program.build(build_input_schema) return program ```` ## `modules` Unlike Keras, also output the potentially auto-generated `InputModule` ## `add(module, rebuild=True)` Adds a module instance on top of the module stack. Parameters: | Name | Type | Description | Default | | --------- | -------- | ---------------------------- | ---------- | | `module` | `Module` | Module instance. | *required* | | `rebuild` | `bool` | If True rebuild the program. | `True` | Source code in `synalinks/src/programs/sequential.py` ``` def add(self, module, rebuild=True): """Adds a module instance on top of the module stack. Args: module (Module): Module instance. rebuild (bool): If `True` rebuild the program. 
""" # If we are passed a SymbolicDataModel created by synalinks.Input(), we # extract the input module from its synalinks history and use that. if hasattr(module, "_synalinks_history"): origin_module = module._synalinks_history[0] if isinstance(origin_module, InputModule): module = origin_module if not isinstance(module, Module): raise ValueError( "Only instances of `synalinks.Module` can be " f"added to a Sequential program. Received: {module} " f"(of type {type(module)})" ) if not self._is_module_name_unique(module): raise ValueError( "All modules added to a Sequential program " f"should have unique names. Name '{module.name}' is already " "the name of a module in this program. Update the `name` argument " "to pass a unique name." ) if ( isinstance(module, InputModule) and self._modules and isinstance(self._modules[0], InputModule) ): raise ValueError( f"Sequential program '{self.name}' has already been configured " f"to use input schema {self._modules[0].input_schema}. You cannot " f"add a different Input module to it." ) self._modules.append(module) if rebuild: run_maybe_nested(self._maybe_rebuild()) else: self.built = False self._functional = None ``` ## `pop(rebuild=True)` Removes the last module in the program. Parameters: | Name | Type | Description | Default | | --------- | ------ | ---------------------------- | ------- | | `rebuild` | `bool` | If True rebuild the program. | `True` | Source code in `synalinks/src/programs/sequential.py` ``` def pop(self, rebuild=True): """Removes the last module in the program. Args: rebuild (bool): If `True` rebuild the program. """ module = self._modules.pop() self.built = False self._functional = None if rebuild: run_maybe_nested(self._maybe_rebuild()) return module ``` ## `FunctionCallingAgent` Bases: `Module` A trainable parallel function calling agent. The agent has 2 different modes: - Autonomous: It will execute tools as soon as called. - Non-autonomous: It will return the tool arguments as a ChatMessage. 
In *autonomous* mode, the agent accepts **any kind of data model input**. If a `data_model` or `schema` is provided, it performs a final inference to format its final answer accordingly.

Example:

```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class NumericalFinalAnswer(synalinks.DataModel):
    final_answer: float = synalinks.Field(
        description="The correct final numerical answer",
    )

async def calculate(expression: str):
    """Calculate the result of a mathematical expression.

    Args:
        expression (str): The mathematical expression to calculate, such as
            '2 + 2'. The expression can contain numbers, operators (+, -, *, /),
            parentheses, and spaces.
    """
    if not all(char in "0123456789+-*/(). " for char in expression):
        return {
            "result": None,
            "log": (
                "Error: invalid characters in expression. "
                "The expression can only contain numbers, operators (+, -, *, /),"
                " parentheses, and spaces NOT letters."
            ),
        }
    try:
        # Evaluate the mathematical expression safely
        result = round(float(eval(expression, {"__builtins__": None}, {})), 2)
        return {
            "result": result,
            "log": "Successfully executed",
        }
    except Exception as e:
        return {
            "result": None,
            "log": f"Error: {e}",
        }

async def main():
    language_model = synalinks.LanguageModel(model="ollama/mistral")

    tools = [
        synalinks.Tool(calculate),
    ]

    inputs = synalinks.Input(data_model=Query)
    outputs = await synalinks.FunctionCallingAgent(
        data_model=NumericalFinalAnswer,
        tools=tools,
        language_model=language_model,
        max_iterations=5,
        autonomous=True,
    )(inputs)

    agent = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="math_agent",
        description="A math agent",
    )

    input_query = Query(query="How much is 152648 + 485?")
    response = await agent(input_query)

    print(response.prettify_json())

if __name__ == "__main__":
    asyncio.run(main())
```

Result:

```
{ "query": "How much is 152648 + 485?", "messages": [ { "role": "assistant", "content": "Performing simple addition", "tool_calls":
[ { "id": "92a3657c-1a45-46e6-8173-df4255b8423b", "name": "calculate", "arguments": { "expression": "152648 + 485" } } ] }, { "role": "tool", "content": { "result": 153133.0, "log": "Successfully executed" }, "tool_call_id": "92a3657c-1a45-46e6-8173-df4255b8423b" }, { "role": "assistant", "content": "The user has asked for a simple addition calculation. The assistant used the 'calculate' tool to perform this task, and the result was returned successfully." } ], "final_answer": 153133.0 }
```

In *non-autonomous* mode (also called human-in-the-loop or interactive mode), the user needs to validate or edit the tool arguments and send them back to the agent.

In this mode, the agent requires a `ChatMessages` data model as input and outputs a `ChatMessage` (or `ChatMessages` if `return_inputs_with_trajectory` is true) back to the user. The agent then ignores the `max_iterations` argument, as it performs only one **step at a time**.

Example:

```
import synalinks import asyncio MAX_ITERATIONS = 5 async def calculate(expression: str): """Calculate the result of a mathematical expression. Args: expression (str): The mathematical expression to calculate, such as '2 + 2'. The expression can contain numbers, operators (+, -, *, /), parentheses, and spaces. """ if not all(char in "0123456789+-*/(). " for char in expression): return { "result": None, "log": ( "Error: invalid characters in expression. " "The expression can only contain numbers, operators (+, -, *, /)," " parentheses, and spaces NOT letters."
), } try: # Evaluate the mathematical expression safely result = round(float(eval(expression, {"__builtins__": None}, {})), 2) return { "result": result, "log": "Successfully executed", } except Exception as e: return { "result": None, "log": f"Error: {e}", } async def main(): language_model = synalinks.LanguageModel( model="ollama/mistral", ) tools = [ synalinks.Tool(calculate), ] inputs = synalinks.Input(data_model=synalinks.ChatMessages) outputs = await synalinks.FunctionCallingAgent( tools=tools, language_model=language_model, return_inputs_with_trajectory=True, autonomous=False, )(inputs) agent = synalinks.Program( inputs=inputs, outputs=outputs, name="math_agent", description="A math agent", ) input_messages = synalinks.ChatMessages( messages=[ synalinks.ChatMessage( role="user", content="How much is 152648 + 485?", ) ] ) for i in range(MAX_ITERATIONS): response = await agent(input_messages) print("Assistant response (with trajectory):") print(response.prettify_json()) assistant_message = response.get("messages")[-1] if not assistant_message.get("tool_calls"): break # We stop the loop if the agent didn't call any tool # Validate the tool calls arguments (with an UI or CLI) # Then re-inject the validated assistant response in the input_messages # The corresponding tools will be called by the agent # Here we assume everything is okay for the purpose of the demo. 
input_messages.messages.append(assistant_message) if __name__ == "__main__": asyncio.run(main()) ``` The FunctionCallingAgent is compatible with MCP tools, here is an example on how to use it: ``` import synalinks import asyncio import litellm class Query(synalinks.DataModel): """Input query data model""" query: str = synalinks.Field( description="The user query", ) class FinalAnswer(synalinks.DataModel): """Final answer data model""" answer: str = synalinks.Field( description="The correct final answer", ) async def main(): language_model = synalinks.LanguageModel( model="ollama/mistral", ) mcp_client = synalinks.MultiServerMCPClient( { "math": { "url": "http://localhost:8183/mcp/", "transport": "streamable_http", }, } ) tools = await mcp_client.get_tools() inputs = synalinks.Input(data_model=Query) outputs = await synalinks.FunctionCallingAgent( data_model=FinalAnswer, tools=tools, language_model=language_model, max_iterations=5, autonomous=True, )(inputs) agent = synalinks.Program( inputs=inputs, outputs=outputs, name="mcp_math_agent", description="A math agent that can use an external calculator", ) input_query = Query(query="How much is 152648 + 485?") response = await agent(input_query) print(response.prettify_json()) if __name__ == "__main__": asyncio.run(main()) ``` Source code in `synalinks/src/modules/agents/function_calling_agent.py` ```` @synalinks_export( [ "synalinks.modules.FunctionCallingAgent", "synalinks.FunctionCallingAgent", ] ) class FunctionCallingAgent(Module): """A trainable parallel function calling agent. The agent has 2 different modes: - Autonomous: It will execute tools as soon as called. - Non-autonomous: It will return the tool arguments as a ChatMessage. In *autonomous* mode, the agent accept **any kind of data model input** and perform a final inference to eventually format its final answer if a `data_model` or `schema` is provided. 
Example: ```python import synalinks import asyncio class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class NumericalFinalAnswer(synalinks.DataModel): final_answer: float = synalinks.Field( description="The correct final numerical answer", ) async def calculate(expression: str): \"""Calculate the result of a mathematical expression. Args: expression (str): The mathematical expression to calculate, such as '2 + 2'. The expression can contain numbers, operators (+, -, *, /), parentheses, and spaces. \""" if not all(char in "0123456789+-*/(). " for char in expression): return { "result": None, "log": ( "Error: invalid characters in expression. " "The expression can only contain numbers, operators (+, -, *, /)," " parentheses, and spaces NOT letters." ), } try: # Evaluate the mathematical expression safely result = round(float(eval(expression, {"__builtins__": None}, {})), 2) return { "result": result, "log": "Successfully executed", } except Exception as e: return { "result": None, "log": f"Error: {e}", } async def main(): language_model = synalinks.LanguageModel(model="ollama/mistral") tools = [ synalinks.Tool(calculate), ] inputs = synalinks.Input(data_model=Query) outputs = await synalinks.FunctionCallingAgent( data_model=NumericalFinalAnswer, tools=tools, language_model=language_model, max_iterations=5, autonomous=True, )(inputs) agent = synalinks.Program( inputs=inputs, outputs=outputs, name="math_agent", description="A math agent", ) input_query = Query(query="How much is 152648 + 485?") response = await agent(input_query) print(response.prettify_json()) if __name__ == "__main__": asyncio.run(main()) ``` Result: ```json { "query": "How much is 152648 + 485?", "messages": [ { "role": "assistant", "content": "Performing simple addition", "tool_calls": [ { "id": "92a3657c-1a45-46e6-8173-df4255b8423b", "name": "calculate", "arguments": { "expression": "152648 + 485" } } ] }, { "role": "tool", "content": { "result": 
153133.0, "log": "Successfully executed" }, "tool_call_id": "92a3657c-1a45-46e6-8173-df4255b8423b", }, { "role": "assistant", "content": "The user has asked for a simple addition calculation. The assistant used the 'calculate' tool to perform this task, and the result was returned successfully.", } ], "final_answer": 153133.0 } ``` In *non-autonomous* mode (also called human in the loop or interactive mode), the user needs to validate/edit the tool arguments and send it back to the agent. In this mode, the agent requires an `ChatMessages` data model as input and output an `ChatMessage` (or `ChatMessages` if `return_inputs_with_trajectory` is true) back to the user. In that case, the agent ignore the `max_iterations` argument, as it will only perform one **step at a time**. Example: ```python import synalinks import asyncio MAX_ITERATIONS = 5 async def calculate(expression: str): \"""Calculate the result of a mathematical expression. Args: expression (str): The mathematical expression to calculate, such as '2 + 2'. The expression can contain numbers, operators (+, -, *, /), parentheses, and spaces. \""" if not all(char in "0123456789+-*/(). " for char in expression): return { "result": None, "log": ( "Error: invalid characters in expression. " "The expression can only contain numbers, operators (+, -, *, /)," " parentheses, and spaces NOT letters." 
), } try: # Evaluate the mathematical expression safely result = round(float(eval(expression, {"__builtins__": None}, {})), 2) return { "result": result, "log": "Successfully executed", } except Exception as e: return { "result": None, "log": f"Error: {e}", } async def main(): language_model = synalinks.LanguageModel( model="ollama/mistral", ) tools = [ synalinks.Tool(calculate), ] inputs = synalinks.Input(data_model=synalinks.ChatMessages) outputs = await synalinks.FunctionCallingAgent( tools=tools, language_model=language_model, return_inputs_with_trajectory=True, autonomous=False, )(inputs) agent = synalinks.Program( inputs=inputs, outputs=outputs, name="math_agent", description="A math agent", ) input_messages = synalinks.ChatMessages( messages=[ synalinks.ChatMessage( role="user", content="How much is 152648 + 485?", ) ] ) for i in range(MAX_ITERATIONS): response = await agent(input_messages) print("Assistant response (with trajectory):") print(response.prettify_json()) assistant_message = response.get("messages")[-1] if not assistant_message.get("tool_calls"): break # We stop the loop if the agent didn't call any tool # Validate the tool calls arguments (with an UI or CLI) # Then re-inject the validated assistant response in the input_messages # The corresponding tools will be called by the agent # Here we assume everything is okay for the purpose of the demo. 
input_messages.messages.append(assistant_message) if __name__ == "__main__": asyncio.run(main()) ``` The FunctionCallingAgent is compatible with MCP tools, here is an example on how to use it: ```python import synalinks import asyncio import litellm class Query(synalinks.DataModel): \"""Input query data model\""" query: str = synalinks.Field( description="The user query", ) class FinalAnswer(synalinks.DataModel): \"""Final answer data model\""" answer: str = synalinks.Field( description="The correct final answer", ) async def main(): language_model = synalinks.LanguageModel( model="ollama/mistral", ) mcp_client = synalinks.MultiServerMCPClient( { "math": { "url": "http://localhost:8183/mcp/", "transport": "streamable_http", }, } ) tools = await mcp_client.get_tools() inputs = synalinks.Input(data_model=Query) outputs = await synalinks.FunctionCallingAgent( data_model=FinalAnswer, tools=tools, language_model=language_model, max_iterations=5, autonomous=True, )(inputs) agent = synalinks.Program( inputs=inputs, outputs=outputs, name="mcp_math_agent", description="A math agent that can use an external calculator", ) input_query = Query(query="How much is 152648 + 485?") response = await agent(input_query) print(response.prettify_json()) if __name__ == "__main__": asyncio.run(main()) ``` """ def __init__( self, schema=None, data_model=None, language_model=None, prompt_template=None, examples=None, instructions=None, use_inputs_schema=False, use_outputs_schema=False, tools=None, autonomous=True, return_inputs_with_trajectory=True, max_iterations=5, name=None, description=None, ): super().__init__( name=name, description=description, ) if not schema and data_model: schema = data_model.get_schema() self.schema = schema self.prompt_template = prompt_template if not instructions: instructions = get_default_instructions() self.instructions = instructions self.examples = examples self.use_inputs_schema = use_inputs_schema self.use_outputs_schema = use_outputs_schema 
self.language_model = language_model self.tools = {} if not tools: raise ValueError("You must set the `tools` argument") for tool in tools: self.tools[tool.name] = tool tool_calls_schema = dynamic_tool_calls(tools=tools) self.autonomous = autonomous self.return_inputs_with_trajectory = return_inputs_with_trajectory self.max_iterations = max_iterations self.tool_calls_generator = ChainOfThought( schema=tool_calls_schema, prompt_template=self.prompt_template, examples=self.examples, instructions=self.instructions, use_inputs_schema=self.use_inputs_schema, use_outputs_schema=self.use_outputs_schema, language_model=self.language_model, name="tool_calls_generator_"+self.name, ) if self.schema and self.autonomous: self.final_generator = ChainOfThought( schema=self.schema, language_model=self.language_model, instructions=self.instructions, return_inputs=self.return_inputs_with_trajectory, name="final_generator_"+self.name, ) async def call(self, inputs, training=False): if not inputs: return None if self.autonomous: if not is_chat_messages(inputs): trajectory = await ops.concat( inputs, ChatMessages(), name="trajectory_" + self.name, ) else: trajectory = inputs else: if not is_chat_messages(inputs): raise ValueError( "In interactive mode, the FunctionCallingAgent " "needs an ChatMessages-like data model as inputs" ) trajectory = inputs agent_messages = trajectory.get("messages") if self.autonomous: for i in range(self.max_iterations): tool_calls = await self.tool_calls_generator(trajectory) if not tool_calls: assistant_message = ChatMessage( role=ChatRole.ASSISTANT, content="Something went wrong while trying to decide the next action.", ) agent_messages.append(assistant_message.get_json()) break assistant_message = ChatMessage( role=ChatRole.ASSISTANT, content=tool_calls.get("thinking", ""), ) if not tool_calls.get("tool_calls"): agent_messages.append(assistant_message.get_json()) break tasks = [] tool_calls_ids = [] for tool_call in tool_calls.get("tool_calls"): 
tool_name = tool_call.get("tool_name") tools_arguments = out_mask_json(tool_call, mask=["tool_name"]) tool_call_id = str(uuid.uuid4()) tool_calls_ids.append(tool_call_id) assistant_message.tool_calls.append( ToolCall( id=tool_call_id, name=tool_name, arguments=tools_arguments, ) ) tasks.append(self.tools[tool_name](**tools_arguments)) agent_messages.append(assistant_message.get_json()) tool_results = await asyncio.gather(*tasks, return_exceptions=True) for j, tool_result in enumerate(tool_results): tool_call_id = tool_calls_ids[j] if isinstance(tool_result, Exception): agent_messages.append( ChatMessage( role=ChatRole.TOOL, tool_call_id=tool_call_id, content="error: %s" % str(tool_result), ).get_json() ) else: agent_messages.append( ChatMessage( role=ChatRole.TOOL, tool_call_id=tool_call_id, content=tool_result, ).get_json() ) trajectory.update({"messages": agent_messages}) if self.schema: return await self.final_generator(trajectory) else: return trajectory else: if len(agent_messages) > 0: if agent_messages[-1].get("role") == ChatRole.ASSISTANT: tasks = [] tool_calls_ids = [] tool_calls = agent_messages[-1].get("tool_calls") for tool_call in tool_calls: tool_name = tool_call.get("name") tools_arguments = tool_call.get("arguments") tool_call_id = tool_call.get("id") tool_calls_ids.append(tool_call_id) tasks.append(self.tools[tool_name](**tools_arguments)) tool_results = await asyncio.gather(*tasks, return_exceptions=True) for j, tool_result in enumerate(tool_results): tool_call_id = tool_calls_ids[j] if isinstance(tool_result, Exception): agent_messages.append( ChatMessage( role=ChatRole.TOOL, tool_call_id=tool_call_id, content="error: %s" % str(tool_result), ).get_json() ) else: agent_messages.append( ChatMessage( role=ChatRole.TOOL, tool_call_id=tool_call_id, content=tool_result, ).get_json() ) trajectory.update({"messages": agent_messages}) tool_calls = await self.tool_calls_generator(trajectory) assistant_message = ChatMessage( role=ChatRole.ASSISTANT, 
content=tool_calls.get("thinking", ""), tool_calls=[], ) for tool_call in tool_calls.get("tool_calls", []): tool_name = tool_call.get("tool_name") tools_arguments = out_mask_json(tool_call, mask=["tool_name"]) tool_call_id = str(uuid.uuid4()) assistant_message.tool_calls.append( ToolCall( id=tool_call_id, name=tool_name, arguments=tools_arguments, ) ) agent_messages.append(assistant_message.get_json()) trajectory.update({"messages": agent_messages}) if self.return_inputs_with_trajectory: return JsonDataModel( json=trajectory.get_json(), schema=ChatMessages.get_schema(), name=self.name, ) else: return JsonDataModel( json=assistant_message.get_json(), schema=ChatMessage.get_schema(), name=self.name, ) async def compute_output_spec(self, inputs, training=False): if self.autonomous: _ = await self.tool_calls_generator(inputs) if self.schema: return await self.final_generator(inputs) else: return SymbolicDataModel( schema=ChatMessages.get_schema(), name=self.name, ) else: if not is_chat_messages(inputs): raise ValueError( "In interactive mode, the FunctionCallingAgent " "needs an ChatMessages-like data model as inputs" ) _ = await self.tool_calls_generator(inputs) if self.return_inputs_with_trajectory: return SymbolicDataModel( schema=ChatMessages.get_schema(), name=self.name, ) else: return SymbolicDataModel( schema=ChatMessage.get_schema(), name=self.name, ) def get_config(self): config = { "schema": self.schema, "prompt_template": self.prompt_template, "examples": self.examples, "instructions": self.instructions, "use_inputs_schema": self.use_inputs_schema, "use_outputs_schema": self.use_outputs_schema, "autonomous": self.autonomous, "max_iterations": self.max_iterations, "return_inputs_with_trajectory": self.return_inputs_with_trajectory, "name": self.name, "description": self.description, } language_model_config = { "language_model": serialization_lib.serialize_synalinks_object( self.language_model, ) } tools_config = { "tools": [ 
serialization_lib.serialize_synalinks_object(tool) for tool in self.tools.values() ] } return {**config, **language_model_config, **tools_config} @classmethod def from_config(cls, config): tools = [ serialization_lib.deserialize_synalinks_object(tool) for tool in config.pop("tools") ] language_model = serialization_lib.deserialize_synalinks_object( config.pop("language_model") ) return cls( language_model=language_model, tools=tools, **config, )
````

## `get_default_instructions()`

The default parallel function calling agent instructions.

Source code in `synalinks/src/modules/agents/function_calling_agent.py`

```
def get_default_instructions(): """The default parallel function calling agent instructions.""" return """ Think step by step: Use the thinking field to elaborate what you observe and what do you need to accomplish next. Reflect on prior steps: Review your previous actions and their outcomes to avoid unnecessary repetition. Avoid unnecessary actions: If you already have enough information to complete the user task, return an empty tool calls array. """.strip()
```

## `Action`

Bases: `Module`

Use a `LanguageModel` to perform a function call given the input data model.

This module uses structured output to call a given Python function. It can be used seamlessly in agents or traditional workflows, and it uses the input data model to infer the function parameters.

The output of this module contains the inputs inferred by the language model as well as the outputs of the function call.

Note: The function **MUST** return a JSON object (a `dict`) and be asynchronous.

Example:

```
import synalinks import asyncio async def main(): class Query(synalinks.DataModel): query: str async def calculate(expression: str): """Calculate the result of a mathematical expression. Args: expression (str): The mathematical expression to calculate, such as '2 + 2'. The expression can contain numbers, operators (+, -, *, /), parentheses, and spaces. """ if not all(char in "0123456789+-*/(). 
" for char in expression): return { "result": None, "log": "Error: invalid characters in expression", } try: # Evaluate the mathematical expression safely result = round(float(eval(expression, {"__builtins__": None}, {})), 2) return { "result": result, "log": "Successfully executed", } except Exception as e: return { "result": None, "log": f"Error: {e}", } language_model = synalinks.LanguageModel( model="ollama/mistral", ) x0 = synalinks.Input(data_model=Query) x1 = await synalinks.Action( fn=calculate, language_model=language_model, )(x0) program = synalinks.Program( inputs=x0, outputs=x1, name="calculator", description="This program perform the calculation of an expression", ) if __name__ == "__main__": asyncio.run(main()) ``` Parameters: | Name | Type | Description | Default | | -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | ---------- | | `fn` | `Callable` | The function to call. | *required* | | `language_model` | `LanguageModel` | The language model to use. | `None` | | `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` | | `examples` | `list` | The default examples to use in the prompt (see Generator). | `None` | | `instructions` | `list` | The default instructions to use (see Generator). | `None` | | `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` | | `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` | | `use_inputs_schema` | `bool` | Optional. Whether or not use the inputs schema in the prompt (Default to False) (see Generator). | `False` | | `use_outputs_schema` | `bool` | Optional. Whether or not use the outputs schema in the prompt (Default to False) (see Generator). | `False` | | `name` | `str` | Optional. The name of the module. 
| `None` | | `description` | `str` | Optional. The description of the module. | `None` | | `trainable` | `bool` | Whether the module's variables should be trainable. | `True` | Source code in `synalinks/src/modules/core/action.py` ```` @synalinks_export( [ "synalinks.modules.Action", "synalinks.Action", ] ) class Action(Module): """Use a `LanguageModel` to perform a function call given the input datamodel. This module use structured output to call a given Python function. This module can be used in agents or traditional workflows seamlessly, it use the input data model to infer the function parameters. The output of this module contains the inputs infered by the language model as well as the outputs of the function call. Note: The function **MUST** return a JSON object dict and be asynchronous. Example: ```python import synalinks import asyncio async def main(): class Query(synalinks.DataModel): query: str async def calculate(expression: str): \"""Calculate the result of a mathematical expression. Args: expression (str): The mathematical expression to calculate, such as '2 + 2'. The expression can contain numbers, operators (+, -, *, /), parentheses, and spaces. \""" if not all(char in "0123456789+-*/(). 
" for char in expression): return { "result": None, "log": "Error: invalid characters in expression", } try: # Evaluate the mathematical expression safely result = round(float(eval(expression, {"__builtins__": None}, {})), 2) return { "result": result, "log": "Successfully executed", } except Exception as e: return { "result": None, "log": f"Error: {e}", } language_model = synalinks.LanguageModel( model="ollama/mistral", ) x0 = synalinks.Input(data_model=Query) x1 = await synalinks.Action( fn=calculate, language_model=language_model, )(x0) program = synalinks.Program( inputs=x0, outputs=x1, name="calculator", description="This program perform the calculation of an expression", ) if __name__ == "__main__": asyncio.run(main()) ``` Args: fn (Callable): The function to call. language_model (LanguageModel): The language model to use. prompt_template (str): The default jinja2 prompt template to use (see `Generator`). examples (list): The default examples to use in the prompt (see `Generator`). instructions (list): The default instructions to use (see `Generator`). seed_instructions (list): Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. temperature (float): Optional. The temperature for the LM call. use_inputs_schema (bool): Optional. Whether or not use the inputs schema in the prompt (Default to False) (see `Generator`). use_outputs_schema (bool): Optional. Whether or not use the outputs schema in the prompt (Default to False) (see `Generator`). name (str): Optional. The name of the module. description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. 
""" def __init__( self, fn, language_model=None, prompt_template=None, examples=None, instructions=None, seed_instructions=None, temperature=0.0, use_inputs_schema=False, use_outputs_schema=False, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) self.fn = fn schema = tool_utils.Tool(fn).get_tool_schema() self.language_model = language_model self.prompt_template = prompt_template self.examples = examples self.instructions = instructions self.seed_instructions = seed_instructions self.temperature = temperature self.use_inputs_schema = use_inputs_schema self.use_outputs_schema = use_outputs_schema self.action = Generator( schema=schema, language_model=self.language_model, prompt_template=self.prompt_template, examples=self.examples, instructions=self.instructions, seed_instructions=self.seed_instructions, temperature=self.temperature, use_inputs_schema=self.use_inputs_schema, use_outputs_schema=self.use_outputs_schema, name="generator_"+self.name, ) async def call(self, inputs, training=False): if not inputs: return None fn_inputs = await self.action(inputs, training=training) try: fn_outputs = await self.fn(**fn_inputs.get_json()) except Exception as e: fn_outputs = {"error": str(e)} generic_io = GenericIO(inputs=fn_inputs.get_json(), outputs=fn_outputs) return JsonDataModel( json=GenericAction(action=generic_io.get_json()).get_json(), schema=GenericAction.get_schema(), name=self.name, ) async def compute_output_spec(self, inputs, training=False): _ = await self.action(inputs) return SymbolicDataModel(schema=GenericAction.get_schema(), name=self.name) def get_config(self): config = { "fn": self.fn, "prompt_template": self.prompt_template, "examples": self.examples, "instructions": self.instructions, "seed_instructions": self.seed_instructions, "temperature": self.temperature, "name": self.name, "description": self.description, "trainable": self.trainable, } language_model_config = { 
"language_model": serialization_lib.serialize_synalinks_object( self.language_model ) } return {**config, **language_model_config} @classmethod def from_config(cls, config): language_model = serialization_lib.deserialize_synalinks_object( config.pop("language_model") ) return cls(language_model=language_model, **config) ```` ## `GenericAction` Bases: `DataModel` A generic action with inputs/outputs Source code in `synalinks/src/modules/core/action.py` ``` class GenericAction(DataModel): """A generic action with inputs/outputs""" action: GenericIO = Field(description="An action already performed") ``` ## `Branch` Bases: `Module` Use a `LanguageModel` to select which module to call based on an arbitrary input, a question and a list of labels. The selected branch output the data model computed using the inputs and module's branch, while the others output `None`. Example: ``` import synalinks import asyncio async def main(): class Query(synalinks.DataModel): query: str class Answer(synalinks.DataModel): answer: str class AnswerWithCritique(synalinks.DataModel): thinking: str critique: str answer: str language_model = synalinks.LanguageModel( model="ollama/mistral", ) x0 = synalinks.Input(data_model=Query) (x1, x2) = await synalinks.Branch( question="What is the difficulty level of the above query?", labels=["easy", "difficult"], branches=[ synalinks.Generator( data_model=Answer, language_model=language_model, ), synalinks.Generator( data_model=AnswerWithCritique, language_model=language_model, ), ], language_model=language_model, )(x0) x3 = x1 | x2 program = synalinks.Program( inputs=x0, outputs=x3, name="adaptative_chain_of_thought", description="Useful to answer step by step only when needed", ) if __name__ == "__main__": asyncio.run(main()) ``` Parameters: | Name | Type | Description | Default | | -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | 
---------- |
| `question` | `str` | The question to ask. | `None` |
| `labels` | `list` | The list of labels to choose from (strings). | `None` |
| `branches` | `list` | The list of modules or programs to select from. | `None` |
| `inject_decision` | `bool` | If True, inject the decision into the branch inputs (defaults to True). | `True` |
| `return_decision` | `bool` | If True, return the decision with the branch outputs (defaults to True). | `True` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` |
| `examples` | `list` | The default examples to use in the prompt (see Decision). | `None` |
| `instructions` | `list` | The default instructions to use (see Decision). | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema` | `bool` | Optional. Whether or not to use the inputs schema in the decision prompt (defaults to False) (see Decision). | `False` |
| `use_outputs_schema` | `bool` | Optional. Whether or not to use the outputs schema in the decision prompt (defaults to False) (see Decision). | `False` |
| `decision_type` | `type` | Optional. The decision module class to use. | `Decision` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |

Source code in `synalinks/src/modules/core/branch.py`

````
@synalinks_export(["synalinks.modules.Branch", "synalinks.Branch"])
class Branch(Module):
    """Use a `LanguageModel` to select which module to call based on an arbitrary
    input, a question and a list of labels.
The selected branch output the data model computed using the inputs and module's branch, while the others output `None`. Example: ```python import synalinks import asyncio async def main(): class Query(synalinks.DataModel): query: str class Answer(synalinks.DataModel): answer: str class AnswerWithCritique(synalinks.DataModel): thinking: str critique: str answer: str language_model = synalinks.LanguageModel( model="ollama/mistral", ) x0 = synalinks.Input(data_model=Query) (x1, x2) = await synalinks.Branch( question="What is the difficulty level of the above query?", labels=["easy", "difficult"], branches=[ synalinks.Generator( data_model=Answer, language_model=language_model, ), synalinks.Generator( data_model=AnswerWithCritique, language_model=language_model, ), ], language_model=language_model, )(x0) x3 = x1 | x2 program = synalinks.Program( inputs=x0, outputs=x3, name="adaptative_chain_of_thought", description="Useful to answer step by step only when needed", ) if __name__ == "__main__": asyncio.run(main()) ``` Args: question (str): The question to ask. labels (list): The list of labels to choose from (strings). branches (list): The list of modules or programs to select from. inject_decision (bool): If True, inject the decision to the branch inputs. (default to True). return_decision (bool): If True, return the decision with the branch outputs. (default to True). language_model (LanguageModel): The language model to use. prompt_template (str): The default jinja2 prompt template to use (see `Generator`). examples (list): The default examples to use in the prompt (see `Decision`). instructions (list): The default instructions to use (see `Decision`). seed_instructions (list): Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. temperature (float): Optional. The temperature for the LM call. use_inputs_schema (bool): Optional. 
Whether or not use the inputs schema in the decision prompt (Default to False) (see `Decision`). use_outputs_schema (bool): Optional. Whether or not use the outputs schema in the decision prompt (Default to False) (see `Decision`). decision_type (bool): Optional. The type of decision module to use. name (str): Optional. The name of the module. description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. """ def __init__( self, question=None, labels=None, branches=None, inject_decision=True, return_decision=True, language_model=None, prompt_template=None, examples=None, instructions=None, seed_instructions=None, temperature=0.0, use_inputs_schema=False, use_outputs_schema=False, decision_type=Decision, name=None, description=None, trainable=True, **kwargs, ): super().__init__( name=name, description=description, trainable=trainable, ) if not branches: raise ValueError("The `branches` argument must be provided.") if not isinstance(branches, list): raise ValueError("The `branches` must be a list of `Module` or `Program`.") if len(labels) != len(branches): raise ValueError("The `labels` and `branches` must have the same length.") self.question = question self.labels = labels self.branches = {labels[i]: m for i, m in enumerate(branches)} self.inject_decision = inject_decision self.return_decision = return_decision self.language_model = language_model self.prompt_template = prompt_template self.examples = examples self.instructions = instructions self.seed_instructions = seed_instructions self.temperature = temperature self.use_inputs_schema = use_inputs_schema self.use_outputs_schema = use_outputs_schema self.decision = decision_type( question=self.question, labels=self.labels, language_model=self.language_model, prompt_template=self.prompt_template, examples=self.examples, instructions=self.instructions, seed_instructions=self.seed_instructions, temperature=self.temperature, 
use_inputs_schema=self.use_inputs_schema, use_outputs_schema=self.use_outputs_schema, name="decision_" + self.name, ) async def call(self, inputs, training=False): outputs = [None] * len(self.branches) if not inputs: return tuple(outputs) decision = await self.decision( inputs, training=training, ) choice = decision.get("choice", decision.get("choices")) if not choice: return tuple(outputs) if self.inject_decision: inputs = await ops.concat( inputs, decision, name="inputs_with_decision_" + self.name, ) tasks = [] async def execute_branch( inputs, module=None, decision=None, return_decision=False ): if not inputs: return None if return_decision: return await ops.logical_and( decision, await module(inputs), ) else: return await module(inputs) for label in self.labels: module = self.branches[label] selected = False if isinstance(choice, str): if label == choice: selected = True elif isinstance(choice, (list, set)): if label in choice: selected = True if selected and module: tasks.append( execute_branch( inputs, module, decision, return_decision=self.return_decision, ) ) else: tasks.append(execute_branch(None)) outputs = await asyncio.gather(*tasks) return tuple(outputs) async def compute_output_spec(self, inputs, training=False): outputs = [] decision = await self.decision( inputs, training=training, ) if self.inject_decision: inputs = await ops.concat( inputs, decision, name="inputs_with_decision_" + self.name, ) for label in self.labels: module = self.branches[label] if self.return_decision: outputs.append( await ops.logical_and( decision, await module( inputs, training=training, ), name="with_decision_" + self.name, ) ) else: outputs.append( await module( inputs, training=training, ) ) return tuple(outputs) def get_config(self): config = { "question": self.question, "labels": self.labels, "inject_decision": self.inject_decision, "return_decision": self.return_decision, "prompt_template": self.prompt_template, "examples": self.examples, "instructions": 
self.instructions,
            "seed_instructions": self.seed_instructions,
            "temperature": self.temperature,
            "use_inputs_schema": self.use_inputs_schema,
            "use_outputs_schema": self.use_outputs_schema,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": serialization_lib.serialize_synalinks_object(
                self.language_model
            )
        }
        branches_config = {
            "branches": [
                serialization_lib.serialize_synalinks_object(branch)
                for branch in self.branches.values()
            ]
        }
        return {**config, **language_model_config, **branches_config}

    @classmethod
    def from_config(cls, config, custom_objects=None):
        language_model = serialization_lib.deserialize_synalinks_object(
            config.pop("language_model")
        )
        branches = [
            serialization_lib.deserialize_synalinks_object(
                branch_config, custom_objects=custom_objects
            )
            for branch_config in config.pop("branches")
        ]
        return cls(language_model=language_model, branches=branches, **config)
````

## `Decision`

Bases: `Module`

Perform a decision on the given input based on a question and a list of labels.

This module dynamically creates an `Enum` schema based on the given labels and uses it to generate a possible answer using structured output.

This ensures that the LM answer is **always** one of the provided labels.

Example:

```
import synalinks
import asyncio

async def main():

    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=synalinks.ChatMessages)
    x1 = await synalinks.Decision(
        question="What is the danger level of the discussion?",
        labels=["low", "medium", "high"],
        language_model=language_model,
    )(x0)

    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="discussion_danger_assessment",
        description="This program assesses the level of danger in a discussion.",
    )

if __name__ == "__main__":
    asyncio.run(main())
```

You can view this module as performing single-label classification on the input.
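To illustrate how restricting the answer to the provided labels can work, here is a minimal sketch that does not depend on Synalinks. It assumes the constraint is expressed as a JSON Schema `enum` on a `choice` field. The helpers `build_choice_schema` and `is_valid_choice` are hypothetical names used only for this illustration; they are not part of the Synalinks API, which builds its schema internally.

```python
def build_choice_schema(labels):
    """Build a JSON-schema-like dict whose `choice` field only accepts `labels`.

    Hypothetical helper for illustration only (not a Synalinks function).
    """
    if not labels or not isinstance(labels, list):
        raise ValueError("`labels` must be a non-empty list of strings.")
    return {
        "type": "object",
        "properties": {
            # The `enum` keyword restricts the value to the listed labels.
            "choice": {"type": "string", "enum": list(labels)},
        },
        "required": ["choice"],
    }


def is_valid_choice(schema, answer):
    """Return True if `answer["choice"]` is one of the labels allowed by `schema`."""
    allowed = schema["properties"]["choice"]["enum"]
    return answer.get("choice") in allowed


schema = build_choice_schema(["low", "medium", "high"])
print(is_valid_choice(schema, {"choice": "high"}))     # label is in the enum
print(is_valid_choice(schema, {"choice": "extreme"}))  # label is not in the enum
```

With constrained structured output, a schema of this kind is enforced at generation time, so an out-of-label answer such as `"extreme"` should not be produced in the first place; the explicit check above only makes the constraint visible.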
Parameters: | Name | Type | Description | Default | | -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | ------- | | `question` | `str` | The question to ask. | `None` | | `labels` | `list` | The list of labels to choose from (strings). | `None` | | `language_model` | `LanguageModel` | The language model to use. | `None` | | `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` | | `examples` | `list` | The default examples to use in the prompt (see Generator). | `None` | | `instructions` | `list` | The default instructions to use (see Generator). | `None` | | `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` | | `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` | | `use_inputs_schema` | `bool` | Optional. Whether or not use the inputs schema in the prompt (Default to False) (see Generator). | `False` | | `use_outputs_schema` | `bool` | Optional. Whether or not use the outputs schema in the prompt (Default to False) (see Generator). | `False` | | `name` | `str` | Optional. The name of the module. | `None` | | `description` | `str` | Optional. The description of the module. | `None` | | `trainable` | `bool` | Whether the module's variables should be trainable. | `True` | Source code in `synalinks/src/modules/core/decision.py` ```` @synalinks_export(["synalinks.modules.Decision", "synalinks.Decision"]) class Decision(Module): """Perform a decision on the given input based on a question and a list of labels. This module dynamically create an `Enum` schema based on the given labels and use it to generate a possible answer using structured output. This ensure that the LM answer is **always** one of the provided labels. 
Example: ```python import synalinks import asyncio async def main(): language_model = synalinks.LanguageModel( model="ollama/mistral", ) x0 = synalinks.Input(data_model=synalinks.ChatMessages) x1 = await synalinks.Decision( question="What is the danger level of the discussion?", labels=["low", "medium", "high"], language_model=language_model, )(x0) program = synalinks.Program( inputs=x0, outputs=x1, name="discussion_danger_assessment", description="This program assesses the level of danger in a discussion.", ) if __name__ == "__main__": asyncio.run(main()) ``` You can view this module, as performing a single label classification on the input. Args: question (str): The question to ask. labels (list): The list of labels to choose from (strings). language_model (LanguageModel): The language model to use. prompt_template (str): The default jinja2 prompt template to use (see `Generator`). examples (list): The default examples to use in the prompt (see `Generator`). instructions (list): The default instructions to use (see `Generator`). seed_instructions (list): Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. temperature (float): Optional. The temperature for the LM call. use_inputs_schema (bool): Optional. Whether or not use the inputs schema in the prompt (Default to False) (see `Generator`). use_outputs_schema (bool): Optional. Whether or not use the outputs schema in the prompt (Default to False) (see `Generator`). name (str): Optional. The name of the module. description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. 
""" def __init__( self, question=None, labels=None, language_model=None, prompt_template=None, examples=None, instructions=None, seed_instructions=None, temperature=0.0, use_inputs_schema=False, use_outputs_schema=False, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) if not question: raise ValueError("The `question` argument must be provided.") if not labels: raise ValueError("The `labels` argument must be provided.") if not isinstance(labels, list): raise ValueError("The `labels` parameter must be a list of string.") schema = dynamic_enum(DecisionAnswer.get_schema(), "choice", labels) self.schema = schema self.question = question self.labels = labels self.language_model = language_model self.prompt_template = prompt_template self.examples = examples if not instructions: instructions = default_decision_instructions(self.labels) self.instructions = instructions self.temperature = temperature self.use_inputs_schema = use_inputs_schema self.use_outputs_schema = use_outputs_schema self.decision = Generator( schema=self.schema, language_model=self.language_model, prompt_template=self.prompt_template, examples=self.examples, instructions=self.instructions, temperature=self.temperature, use_inputs_schema=self.use_inputs_schema, use_outputs_schema=self.use_outputs_schema, name="generator_" + self.name, ) async def call(self, inputs, training=False): if not inputs: return None inputs = await ops.concat( inputs, Question(question=self.question), name="inputs_with_question_" + self.name, ) result = await self.decision(inputs, training=training) return result def get_config(self): config = { "question": self.question, "labels": self.labels, "prompt_template": self.prompt_template, "examples": self.examples, "instructions": self.instructions, "seed_instructions": self.seed_instructions, "temperature": self.temperature, "use_inputs_schema": self.use_inputs_schema, "use_outputs_schema": 
self.use_outputs_schema,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": serialization_lib.serialize_synalinks_object(
                self.language_model
            )
        }
        return {**config, **language_model_config}

    @classmethod
    def from_config(cls, config):
        language_model = serialization_lib.deserialize_synalinks_object(
            config.pop("language_model")
        )
        return cls(language_model=language_model, **config)
````

## `default_decision_instructions(labels)`

The default decision instructions.

Source code in `synalinks/src/modules/core/decision.py`

```
def default_decision_instructions(labels):
    """The decision default instructions"""
    return f"""
You will be given a question, your task is to answer step-by-step to choose one the following labels: {labels}
""".strip()
```

## `Generator`

Bases: `Module`

Use a `LanguageModel` to generate a data model from an arbitrary input data model.

Example:

```
import synalinks
import asyncio

async def main():

    class Query(synalinks.DataModel):
        query: str = synalinks.Field(
            description="The user query",
        )

    class AnswerWithCritique(synalinks.DataModel):
        thinking: str = synalinks.Field(
            description="Your step by step thinking",
        )
        critique: str = synalinks.Field(
            description="The critique of the above thinking",
        )
        answer: str = synalinks.Field(
            description="The correct answer",
        )

    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=Query)
    x1 = await synalinks.Generator(
        data_model=AnswerWithCritique,
        language_model=language_model,
    )(x0)

    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="chain_of_thought_with_critique",
        description="Useful to answer step by step and evaluate your answer",
    )

if __name__ == "__main__":
    asyncio.run(main())
```

Parameters:

| Name | Type | Description | Default |
| -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | --------------- |
| `schema` | `dict` | The target JSON schema. If not provided, the `data_model` is used to infer it. | `None` |
| `data_model` | `DataModel \| SymbolicDataModel \| JsonDataModel` | The target data model for structured output. | `None` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The jinja2 prompt template. | `None` |
| `examples` | `list` | The default list of examples, given as a list of tuples containing input/output JSON pairs. | `None` |
| `instructions` | `str` | The default instructions, given as a string containing instructions for the language model. | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `use_inputs_schema` | `bool` | Optional. Whether or not to use the inputs schema in the prompt (defaults to False). | `False` |
| `use_outputs_schema` | `bool` | Optional. Whether or not to use the outputs schema in the prompt (defaults to False). | `False` |
| `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (defaults to False). | `False` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `streaming` | `bool` | Optional. If True, stream the LM response. Enabled only if `schema` is `None` and only during inference (not during training). | `False` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |

Source code in `synalinks/src/modules/core/generator.py`

````
@synalinks_export(["synalinks.modules.Generator", "synalinks.Generator"])
class Generator(Module):
    """
    Use a `LanguageModel` to generate a data model from an arbitrary input data model.
Example: ```python import synalinks import asyncio async def main(): class Query(DataModel): query: str = synalinks.Field( description="The user query", ) class AnswerWithCritique(synalinks.DataModel): thinking: str = synalinks.Field( description="Your step by step thinking", ) critique: str = synalinks.Field( description="The critique of the above thinking", ) answer: str = synalinks.Field( description="The correct answer", ) language_model = synalinks.LanguageModel( model="ollama/mistral", ) x0 = synalinks.Input(data_model=Query) x1 = await synalinks.Generator( data_model=AnswerWithCritique, language_model=language_model, )(x0) program = synalinks.Program( inputs=x0, outputs=x1, name="chain_of_thought_with_critique", description="Useful to answer step by step and evaluate your answer", ) if __name__ == "__main__": asyncio.run(main()) ``` Args: schema (dict): The target JSON schema. If not provided use the `data_model` to infer it. data_model (DataModel | SymbolicDataModel | JsonDataModel): The target data model for structured output. language_model (LanguageModel): The language model to use. prompt_template (str): The jinja2 prompt template. examples (list): The default list of examples, the examples are a list of tuples containing input/output JSON pairs. instructions (str): The default instructions being a string containing instructions for the language model. seed_instructions (list): Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. use_inputs_schema (bool): Optional. Whether or not use the inputs schema in the prompt (Default to False). use_outputs_schema (bool): Optional. Whether or not use the outputs schema in the prompt (Default to False). return_inputs (bool): Optional. Whether or not to concatenate the inputs to the outputs (Default to False). temperature (float): Optional. The temperature for the LM call. streaming (str): Optional. 
If true stream the LM response, enabled only if `schema` is `None` and only during inference (not during training). name (str): Optional. The name of the module. description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. """ def __init__( self, schema=None, data_model=None, language_model=None, prompt_template=None, examples=None, instructions=None, seed_instructions=None, use_inputs_schema=False, use_outputs_schema=False, return_inputs=False, temperature=0.0, streaming=False, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) if not schema and data_model: schema = data_model.get_schema() self.schema = schema if not language_model: raise ValueError("You should provide `language_model` parameter.") self.language_model = language_model if not prompt_template: prompt_template = default_prompt_template() self.prompt_template = prompt_template if not examples: examples = [] self.examples = examples if not instructions and self.schema: data_model_keys = list(self.schema["properties"].keys()) instructions = default_instructions(data_model_keys) self.instructions = instructions self.return_inputs = return_inputs self.temperature = temperature self.use_inputs_schema = use_inputs_schema self.use_outputs_schema = use_outputs_schema if schema and streaming: streaming = False self.streaming = streaming predictions = [ Prediction( inputs=example[0], outputs=example[1], reward=None, ).get_json() for example in examples ] if not seed_instructions: seed_instructions = [] self.seed_instructions = seed_instructions seed_candidates = [ { "instructions": seed_instruction, } for seed_instruction in self.seed_instructions ] self.state = self.add_variable( initializer=Instructions( instructions=instructions, examples=predictions, seed_candidates=seed_candidates, ).get_json(), data_model=Instructions, name="state_" + self.name, ) async def 
call(self, inputs, training=False): if not inputs: return None msgs = self.format_messages(inputs) if self.streaming and not training: streaming = True else: streaming = False result = await ops.predict( msgs, schema=self.schema, language_model=self.language_model, streaming=streaming, name="prediction_" + self.name, temperature=self.temperature, ) if streaming: return result if result: if training: predictions = self.state.get("current_predictions") predictions.append( { "inputs": inputs.get_json(), "outputs": result.get_json(), "reward": None, } ) if self.return_inputs: return await ops.concat( inputs, result, name="with_inputs_" + self.name, ) else: return result return None async def compute_output_spec(self, inputs, training=False): if self.schema: if self.return_inputs: return await ops.concat( inputs, SymbolicDataModel( schema=self.schema, name=self.name, ), name="with_inputs_" + self.name, ) else: return SymbolicDataModel( schema=self.schema, name=self.name, ) else: if self.return_inputs: return await ops.concat( inputs, SymbolicDataModel( schema=ChatMessage.get_schema(), name=self.name, ), name="with_inputs_" + self.name, ) else: return SymbolicDataModel( schema=ChatMessage.get_schema(), name=self.name, ) def format_messages(self, inputs=None): template = jinja2.Template(self.prompt_template) rendered_prompt = template.render( inputs_schema=inputs.get_schema() if self.use_inputs_schema else None, outputs_schema=self.schema if self.use_outputs_schema else None, examples=[ (pred.get("inputs"), pred.get("outputs")) for pred in self.state.get("examples") ], instructions=self.state.get("instructions"), inputs=inputs.get_json() if inputs else None, ) matches = XML_TAGS_REGEX.findall(rendered_prompt) extracted_tags = [(match[0], match[1].strip()) for match in matches] msgs = ChatMessages() for message in extracted_tags: role, content = message if content: msgs.messages.append(ChatMessage(role=role, content=content)) return msgs def get_config(self): config = { 
"schema": self.schema, "prompt_template": self.prompt_template, "examples": self.examples, "instructions": self.instructions, "seed_instructions": self.seed_instructions, "use_inputs_schema": self.use_inputs_schema, "use_outputs_schema": self.use_outputs_schema, "return_inputs": self.return_inputs, "temperature": self.temperature, "name": self.name, "description": self.description, "trainable": self.trainable, } language_model_config = { "language_model": serialization_lib.serialize_synalinks_object( self.language_model, ) } return { **config, **language_model_config, } @classmethod def from_config(cls, config): language_model = serialization_lib.deserialize_synalinks_object( config.pop("language_model"), ) return cls( language_model=language_model, **config, ) ```` ## `default_prompt_template()` Returns the default prompt template. Returns: | Type | Description | | ----- | ---------------------------- | | `str` | The default prompt template. | Source code in `synalinks/src/modules/core/generator.py` ``` @synalinks_export("synalinks.default_prompt_template") def default_prompt_template(): """Returns the default prompt template. Returns: (str): The default prompt template. """ return """ {{ instructions }} {% if inputs_schema %} {{ inputs_schema }} {% endif %}{% if outputs_schema %} {{ outputs_schema }} {% endif %}{% if examples %} {% for example in examples %} Input: {{ example[0] }} Output: {{ example[1] }} {% endfor %} {% endif %} {% if inputs %} Input: {{ inputs }} Output: {% endif %} """.strip() ``` ## `Identity` Bases: `Module` Identity module. This module should be used as a placeholder when no operation is to be performed. The module just returns its `inputs` argument as output. This module can be really useful during development process in order to implement the whole program architecture before the individual modules. 
It avoids any data model naming issues that you could have by just forwarding the inputs; that way you can implement the general program architecture, validate it, and implement the individual modules later.

Example:

```
import synalinks


class MyAwesomeModule(synalinks.Program):
    def __init__(
        self,
        name=None,
        description=None,
        trainable=True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )

    async def build(self, inputs):
        # Placeholder: forward the inputs unchanged until the real modules exist
        outputs = await synalinks.Identity()(inputs)

        super().__init__(
            inputs=inputs,
            outputs=outputs,
            name=self.name,
            description=self.description,
            trainable=self.trainable,
        )
```

Parameters:

| Name       | Type                | Description                    | Default |
| ---------- | ------------------- | ------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | The default module's arguments | `{}`    |

Source code in `synalinks/src/modules/core/identity.py`

````
@synalinks_export(["synalinks.modules.Identity", "synalinks.Identity"])
class Identity(Module):
    """Identity module.

    This module should be used as a placeholder when no operation is to be
    performed. The module just returns its `inputs` argument as output.

    This module can be really useful during development process in order to
    implement the whole program architecture before the individual modules.

    It avoids any data model naming issues that you could have by just
    forwarding the inputs; that way you can implement the general
    program architecture, validate it, and implement the individual
    modules later.
Example: ```python import synalinks class MyAwesomeModule(synalinks.Program): def __init__( name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) async def build(self, inputs): outputs = await synalinks.Identity()(inputs) super().__init__( inputs=inputs, outputs=outputs, name=self.name, description=self.description, trainable=self.trainable, ) ``` Args: **kwargs (keyword arguments): The default module's arguments """ def __init__(self, **kwargs): super().__init__(**kwargs) self.built = True async def call(self, inputs): if isinstance(inputs, (JsonDataModel, SymbolicDataModel)): return inputs.clone() return tree.map_structure( lambda x: x.clone(), inputs, ) ```` ## `Input(schema=None, data_model=None, optional=False, name=None)` Used to instantiate a `SymbolicDataModel`. A `SymbolicDataModel` is a symbolic data model-like object, which we augment with certain attributes that allow us to build a Synalinks `Program` just by knowing the inputs and outputs of the program (similar to Keras symbolic tensor). Example: ``` import synalinks class Query(synalinks.DataModel): query: str inputs = synalinks.Input(data_model=Query) # You can also create it using a JSON schema like this: inputs = synalinks.Input(schema=Query.get_schema()) # Or using a symbolic datamodel: inputs = synalinks.Input(data_model=Query.to_symbolic_data_model()) ``` Parameters: | Name | Type | Description | Default | | ------------ | ----------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | | `schema` | `dict` | A Json schema of the data_model. If not provided uses the data_model argument. | `None` | | `data_model` | `DataModel` | Optional existing data model to wrap into the Input layer. If set, the module will use this data_model rather than creating a new placeholder data model. 
| `None` | | `optional` | `bool` | Whether the input is optional or not. An optional input can accept None values. | `False` | | `name` | `string` | Optional name string for the module. Should be unique in a program (do not reuse the same name twice). It will be autogenerated if it isn't provided. | `None` | Returns: | Type | Description | | ------------------- | --------------------------------------------------------------------- | | `SymbolicDataModel` | The symbolic data model corresponding to the given data model/schema. | Source code in `synalinks/src/modules/core/input_module.py` ```` @synalinks_export(["synalinks.modules.Input", "synalinks.Input"]) def Input( schema=None, data_model=None, optional=False, name=None, ): """Used to instantiate a `SymbolicDataModel`. A `SymbolicDataModel` is a symbolic data model-like object, which we augment with certain attributes that allow us to build a Synalinks `Program` just by knowing the inputs and outputs of the program (similar to Keras symbolic tensor). Example: ```python import synalinks class Query(synalinks.DataModel): query: str inputs = synalinks.Input(data_model=Query) # You can also create it using a JSON schema like this: inputs = synalinks.Input(schema=Query.get_schema()) # Or using a symbolic datamodel: inputs = synalinks.Input(data_model=Query.to_symbolic_data_model()) ``` Args: schema (dict): A Json schema of the data_model. If not provided uses the `data_model` argument. data_model (DataModel): Optional existing data model to wrap into the `Input` layer. If set, the module will use this data_model rather than creating a new placeholder data model. optional (bool): Whether the input is optional or not. An optional input can accept `None` values. name (string): Optional name string for the module. Should be unique in a program (do not reuse the same name twice). It will be autogenerated if it isn't provided. 
Returns: (SymbolicDataModel): The symbolic data model corresponding to the given data model/schema. """ module = InputModule( schema=schema, input_data_model=data_model.to_symbolic_data_model() if data_model else None, optional=optional, name=name, ) return module.output ```` ## `Not` Bases: `Module` Not module. This module should be used as a placeholder when no operation is to be performed and the output should be None. This module is useful to implement stop conditions when combined with a conditional branch or as placeholder (like the Identity) before implementing guards that leverage the xor operation. Parameters: | Name | Type | Description | Default | | ---------- | ------------------- | ------------------------------ | ------- | | `**kwargs` | `keyword arguments` | The default module's arguments | `{}` | Source code in `synalinks/src/modules/core/not_module.py` ``` @synalinks_export(["synalinks.modules.Not", "synalinks.Not"]) class Not(Module): """Not module. This module should be used as a placeholder when no operation is to be performed and the output should be None. This module is useful to implement stop conditions when combined with a conditional branch or as placeholder (like the Identity) before implementing guards that leverage the xor operation. Args: **kwargs (keyword arguments): The default module's arguments """ def __init__(self, **kwargs): super().__init__(**kwargs) self.built = True async def call(self, inputs): if isinstance(inputs, (JsonDataModel, SymbolicDataModel)): return None return tree.map_structure( lambda x: None, inputs, ) async def compute_output_spec(self, inputs): if isinstance(inputs, (JsonDataModel, SymbolicDataModel)): return inputs.clone() return tree.map_structure( lambda x: x.clone(), inputs, ) ``` ## `Embedding` Bases: `Module` Extracts and updates the embedding vector of entities. This module is designed to work with `Entity`, `Relation`, `Entities`, `Relations` or `KnowledgeGraph` data models. 
It supports masking the entity fields in order to keep **only one** field to embed per entity.

**Note**: Either every entity exposes the *same field* to compute the embedding from (such as a `name` or `description` field), selected with `in_mask`, **or** every entity has *only one field left* after masking with the `out_mask` argument.

```
import synalinks
import asyncio
from typing import Literal


class Document(synalinks.Entity):
    label: Literal["Document"]
    text: str = synalinks.Field(
        description="The document content",
    )


# The embedding model used by the module (any supported provider works)
embedding_model = synalinks.EmbeddingModel(
    model="ollama/mxbai-embed-large"
)


async def main():
    inputs = synalinks.Input(data_model=Document)
    outputs = await synalinks.Embedding(
        embedding_model=embedding_model,
        in_mask=["text"],  # keep only the `text` field for the embedding
    )(inputs)

    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="embed_document",
        description="Embed the given documents"
    )

    doc = Document(
        label="Document",
        text="my document",
    )

    result = await program(doc)


if __name__ == "__main__":
    asyncio.run(main())
```

If you want to process a batch asynchronously, use `program.predict()` instead. See the [FAQ](https://synalinks.github.io/synalinks/FAQ/#whats-the-difference-between-program-methods-predict-and-__call__) to understand the difference between `program()` and `program.predict()`.

Here is an example:

```
import synalinks
import asyncio
import numpy as np
from typing import Literal


class Document(synalinks.Entity):
    label: Literal["Document"]
    text: str = synalinks.Field(
        description="The document content",
    )


# The embedding model used by the module (any supported provider works)
embedding_model = synalinks.EmbeddingModel(
    model="ollama/mxbai-embed-large"
)


async def main():
    inputs = synalinks.Input(data_model=Document)
    outputs = await synalinks.Embedding(
        embedding_model=embedding_model,
        in_mask=["text"],  # keep only the `text` field for the embedding
    )(inputs)

    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="embed_document",
        description="Embed the given documents"
    )

    doc1 = Document(label="Document", text="my document 1")
    doc2 = Document(label="Document", text="my document 2")
    doc3 = Document(label="Document", text="my document 3")

    # Batches are passed as NumPy object arrays
    docs = np.array([doc1, doc2, doc3], dtype="object")

    embedded_docs = await program.predict(docs)
if __name__ == "__main__": asyncio.run(main()) ``` Parameters: | Name | Type | Description | Default | | ----------------- | ---------------- | --------------------------------------------------- | ------- | | `embedding_model` | `EmbeddingModel` | The embedding model to use. | `None` | | `in_mask` | `list` | A mask applied to keep specific entity fields. | `None` | | `out_mask` | `list` | A mask applied to remove specific entity fields. | `None` | | `name` | `str` | Optional. The name of the module. | `None` | | `description` | `str` | Optional. The description of the module. | `None` | | `trainable` | `bool` | Whether the module's variables should be trainable. | `False` | Source code in `synalinks/src/modules/knowledge/embedding.py` ```` @synalinks_export( [ "synalinks.modules.Embedding", "synalinks.Embedding", ] ) class Embedding(Module): """Extracts and updates the embedding vector of entities. This module is designed to work with `Entity`, `Relation`, `Entities`, `Relations` or `KnowledgeGraph` data models. It supports to mask the entity fields in order to keep **only one** field to embed per entity. **Note**: Each entity should have the *same field* to compute the embedding from like a `name` or `description` field using `in_mask`. **Or** every entity should have *only one field left* after masking using `out_mask` argument. 
```python import synalinks import asyncio from typing import Literal class Document(synalinks.Entity): label: Literal["Document"] text: str = synalinks.Field( description="The document content", ) async def main(): inputs = synalinks.Input(data_model=Document) outputs = await synalinks.Embedding( embedding_model=embedding_model, in_mask=["text"], )(inputs) program = synalinks.Program( inputs=inputs, outputs=outputs, name="embbed_document", description="Embbed the given documents" ) doc = Document( label="Document", text="my document", ) result = await program(doc) if __name__ == "__main__": asyncio.run(main()) ``` If you want to process batch asynchronously use `program.predict()` instead, see the [FAQ](https://synalinks.github.io/synalinks/FAQ/#whats-the-difference-between-program-methods-predict-and-__call__) to understand the difference between `program()` and `program.predict()` Here is an example: ```python import synalinks import asyncio import numpy as np from typing import Literal class Document(synalinks.Entity): label: Literal["Document"] text: str = synalinks.Field( description="The document content", ) async def main(): inputs = synalinks.Input(data_model=Document) outputs = await synalinks.Embedding( embedding_model=embedding_model, in_mask=["text"], )(inputs) program = synalinks.Program( inputs=inputs, outputs=outputs, name="embbed_document", description="Embbed the given documents" ) doc1 = Document(label="Document", text="my document 1") doc2 = Document(label="Document", text="my document 2") doc3 = Document(label="Document", text="my document 3") docs = np.array([doc1, doc2, doc3], dtype="object") embedded_docs = await program.predict(docs) if __name__ == "__main__": asyncio.run(main()) ``` Args: embedding_model (EmbeddingModel): The embedding model to use. in_mask (list): A mask applied to keep specific entity fields. out_mask (list): A mask applied to remove specific entity fields. name (str): Optional. The name of the module. 
description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. """ def __init__( self, embedding_model=None, in_mask=None, out_mask=None, name=None, description=None, trainable=False, ): super().__init__( name=name, description=description, trainable=trainable, ) self.embedding_model = embedding_model self.in_mask = in_mask self.out_mask = out_mask async def _embed_entity(self, entity): # # Check if entity is already embedded and has valid embeddings embeddings = entity.get("embeddings") if embeddings: warnings.warn( "Embeddings already generated for entity.Returning original entity." ) return JsonDataModel( json=entity.get_json(), schema=entity.get_schema(), name=entity.name + "_embedded", ) # Apply masking to the entity filtered_entity = entity # Default to original entity if self.out_mask: filtered_entity = await ops.out_mask( entity, mask=self.out_mask, recursive=False, name=entity.name + "_out_mask", ) elif self.in_mask: filtered_entity = await ops.in_mask( entity, mask=self.in_mask, recursive=False, name=entity.name + "_in_mask", ) # Generate embeddings embeddings = await ops.embedding( filtered_entity, embedding_model=self.embedding_model, name=entity.name + "_embedding", ) # Validate embeddings if not embeddings or not embeddings.get("embeddings"): warnings.warn( f"No embeddings generated for entity {entity.name}. " "Please check that your schema is correct." ) return None embedding_list = embeddings.get("embeddings") if len(embedding_list) != 1: warnings.warn( "Entities can only have one embedding vector per entity, " "adjust `Embedding` module's `in_mask` or `out_mask` " "to keep only one field. Skipping embedding." 
) return None # Add embedding to entity vector = embedding_list[0] return await ops.concat( entity, EmbeddingVector(embedding=vector), name=entity.name + "_embedded", ) async def _embed_relation(self, relation): subj = relation.get_nested_entity("subj") obj = relation.get_nested_entity("obj") if not subj or not obj: return None embedded_subj = await self._embed_entity(subj) embedded_obj = await self._embed_entity(obj) relation_json = copy.deepcopy(relation.get_json()) relation_json.update( { "subj": embedded_subj.get_json(), "obj": embedded_obj.get_json(), } ) outputs_schema = copy.deepcopy(relation.get_schema()) # Update schema definitions for embedded entities if outputs_schema.get("$defs"): subj_label = subj.get("label") obj_label = obj.get("label") if subj_label and subj_label in outputs_schema["$defs"]: embedded_subj_schema = embedded_subj.get_schema() if embedded_subj_schema.get("properties"): outputs_schema["$defs"][subj_label]["properties"].update( embedded_subj_schema["properties"] ) if obj_label and obj_label in outputs_schema["$defs"]: embedded_obj_schema = embedded_obj.get_schema() if embedded_obj_schema.get("properties"): outputs_schema["$defs"][obj_label]["properties"].update( embedded_obj_schema["properties"] ) return JsonDataModel( json=relation_json, schema=outputs_schema, name=relation.name + "_embedded", ) async def call(self, inputs): if not inputs: return None if is_knowledge_graph(inputs): entities_json = [] relations_json = [] outputs_schema = copy.deepcopy(inputs.get_schema()) # Process entities for entity in inputs.get_nested_entity_list("entities"): embedded_entity = await self._embed_entity(entity) if embedded_entity: entities_json.append(embedded_entity.get_json()) # Update schema definitions if outputs_schema.get("$defs"): entity_label = entity.get("label") if entity_label and entity_label in outputs_schema["$defs"]: embedded_schema = embedded_entity.get_schema() if embedded_schema.get("properties"): 
outputs_schema["$defs"][entity_label][ "properties" ].update(embedded_schema["properties"]) # Process relations for relation in inputs.get_nested_entity_list("relations"): embedded_relation = await self._embed_relation(relation) if embedded_relation: relations_json.append(embedded_relation.get_json()) embedded_schema = embedded_relation.get_schema() if embedded_schema.get("$defs") and outputs_schema.get("$defs"): for def_key, def_value in embedded_schema["$defs"].items(): if def_key in outputs_schema["$defs"]: # Merge properties if they exist if def_value.get("properties") and outputs_schema[ "$defs" ][def_key].get("properties"): outputs_schema["$defs"][def_key]["properties"].update( def_value["properties"] ) else: outputs_schema["$defs"][def_key] = def_value # Update output JSON outputs_json = inputs.get_json() outputs_json.update({"entities": entities_json, "relations": relations_json}) return JsonDataModel( json=outputs_json, schema=outputs_schema, name=inputs.name + "_embedded", ) elif is_entities(inputs): entities_json = [] outputs_schema = copy.deepcopy(inputs.get_schema()) # Process all entities and collect schema updates for entity in inputs.get_nested_entity_list("entities"): embedded_entity = await self._embed_entity(entity) if embedded_entity: entities_json.append(embedded_entity.get_json()) # Update schema definitions if outputs_schema.get("$defs"): entity_label = entity.get("label") if entity_label and entity_label in outputs_schema["$defs"]: embedded_schema = embedded_entity.get_schema() if embedded_schema.get("properties"): outputs_schema["$defs"][entity_label][ "properties" ].update(embedded_schema["properties"]) # Update output JSON with embedded entities outputs_json = inputs.get_json() outputs_json.update({"entities": entities_json}) return JsonDataModel( json=outputs_json, schema=outputs_schema, name=inputs.name + "_embedded", ) elif is_relations(inputs): relations_json = [] outputs_schema = copy.deepcopy(inputs.get_schema()) # Process all 
relations for relation in inputs.get_nested_entity_list("relations"): embedded_relation = await self._embed_relation(relation) if embedded_relation: relations_json.append(embedded_relation.get_json()) # Merge schema definitions from embedded relation embedded_schema = embedded_relation.get_schema() if embedded_schema.get("$defs") and outputs_schema.get("$defs"): for def_key, def_value in embedded_schema["$defs"].items(): if def_key in outputs_schema["$defs"]: # Merge properties if they exist if def_value.get("properties") and outputs_schema[ "$defs" ][def_key].get("properties"): outputs_schema["$defs"][def_key]["properties"].update( def_value["properties"] ) else: outputs_schema["$defs"][def_key] = def_value # Update output JSON outputs_json = inputs.get_json() outputs_json.update({"relations": relations_json}) return JsonDataModel( json=outputs_json, schema=outputs_schema, name=inputs.name + "_embedded", ) elif is_relation(inputs): return await self._embed_relation(inputs) elif is_entity(inputs): return await self._embed_entity(inputs) else: return None async def compute_output_spec(self, inputs): return inputs.clone(name=inputs.name + "_embedded") def get_config(self): config = { "in_mask": self.in_mask, "out_mask": self.out_mask, "name": self.name, "description": self.description, "trainable": self.trainable, } embedding_model_config = { "embedding_model": serialization_lib.serialize_synalinks_object( self.embedding_model ) } return {**embedding_model_config, **config} @classmethod def from_config(cls, config): embedding_model = serialization_lib.deserialize_synalinks_object( config.pop("embedding_model") ) return cls(embedding_model=embedding_model, **config) ```` ## `UpdateKnowledge` Bases: `Module` Update the given knowledge base. This module requires an `Entity`, `Relation`, `Entities`, `Relations` or `KnowledgeGraph` data model as input. 
This module performs alignment (also called deduplication) automatically, using the similarity search of the knowledge base. This approach is more performant than a linear alignment algorithm because it relies on the hierarchical navigable small world (HNSW) index of the knowledge base.

It does, however, require the entities to be embedded with the `Embedding` module before the knowledge base is updated.

Parameters:

| Name             | Type            | Description | Default |
| ---------------- | --------------- | ----------- | ------- |
| `knowledge_base` | `KnowledgeBase` | The knowledge base to update. | `None` |
| `threshold`      | `float`         | Similarity threshold for entity alignment. Entities with similarity above this threshold may be merged. Should be between 0.0 and 1.0 (Defaults to 0.8). | `0.8` |
| `name`           | `str`           | Optional. The name of the module. | `None` |
| `description`    | `str`           | Optional. The description of the module. | `None` |
| `trainable`      | `bool`          | Whether the module's variables should be trainable. | `False` |

Source code in `synalinks/src/modules/knowledge/update_knowledge.py`

```
@synalinks_export(
    [
        "synalinks.modules.UpdateKnowledge",
        "synalinks.UpdateKnowledge",
    ]
)
class UpdateKnowledge(Module):
    """Update the given knowledge base.

    This module requires an `Entity`, `Relation`, `Entities`,
    `Relations` or `KnowledgeGraph` data model as input.

    This module performs alignment automatically, also called deduplication,
    by using the similarity search of the knowledge base. This way of performing
    alignment is more performant than using a linear alignment algorithm as it
    uses the hierarchical navigable small world (HNSW) algorithm of the
    knowledge base.

    It however needs to have the entities embedded using the `Embedding` module
    before updating the knowledge base.
Args: knowledge_base (KnowledgeBase): The knowledge base to update. threshold (float): Similarity threshold for entity alignment. Entities with similarity above this threshold may be merged. Should be between 0.0 and 1.0 (Defaults to 0.8). name (str): Optional. The name of the module. description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. """ def __init__( self, knowledge_base=None, threshold=0.8, name=None, description=None, trainable=False, ): super().__init__( name=name, description=description, trainable=trainable, ) self.knowledge_base = knowledge_base self.threshold = threshold async def call(self, inputs): if not inputs: return None if is_knowledge_graph(inputs): for entity in inputs.get_nested_entity_list("entities"): _ = await ops.update_knowledge( entity, knowledge_base=self.knowledge_base, threshold=self.threshold, name=entity.name + "_updated", ) for relation in inputs.get_nested_entity_list("relations"): subj = relation.get_nested_entity("subj") if not subj: continue _ = await ops.update_knowledge( subj, knowledge_base=self.knowledge_base, threshold=self.threshold, name=subj.name + "_updated", ) obj = relation.get_nested_entity("obj") if not obj: continue _ = await ops.update_knowledge( obj, knowledge_base=self.knowledge_base, threshold=self.threshold, name=obj.name + "_updated", ) _ = await ops.update_knowledge( relation, knowledge_base=self.knowledge_base, threshold=self.threshold, name=relation.name + "_updated", ) return inputs.clone(name=inputs.name + "_updated") elif is_entities(inputs): for entity in inputs.get_nested_entity_list("entities"): _ = await ops.update_knowledge( entity, knowledge_base=self.knowledge_base, threshold=self.threshold, name=entity.name + "_updated", ) return inputs.clone(name=inputs.name + "_updated") elif is_relations(inputs): for relation in inputs.get_nested_entity_list("relations"): subj = relation.get_nested_entity("subj") if not subj: 
continue _ = await ops.update_knowledge( subj, knowledge_base=self.knowledge_base, threshold=self.threshold, name=subj.name + "_updated", ) obj = relation.get_nested_entity("obj") if not obj: continue _ = await ops.update_knowledge( obj, knowledge_base=self.knowledge_base, threshold=self.threshold, name=obj.name + "_updated", ) _ = await ops.update_knowledge( relation, knowledge_base=self.knowledge_base, threshold=self.threshold, name=relation.name + "_updated", ) return inputs.clone(name=inputs.name + "_updated") elif is_relation(inputs): subj = inputs.get_nested_entity("subj") _ = await ops.update_knowledge( subj, knowledge_base=self.knowledge_base, threshold=self.threshold, name=subj.name + "_updated", ) obj = inputs.get_nested_entity("obj") _ = await ops.update_knowledge( obj, knowledge_base=self.knowledge_base, threshold=self.threshold, name=obj.name + "_updated", ) return inputs.clone(name=inputs.name + "_updated") elif is_entity(inputs): _ = await ops.update_knowledge( inputs, knowledge_base=self.knowledge_base, threshold=self.threshold, name=inputs.name + "_updated", ) return inputs.clone(name=inputs.name + "_updated") else: return None async def compute_output_spec(self, inputs): return inputs.clone() def get_config(self): config = { "threshold": self.threshold, "name": self.name, "description": self.description, "trainable": self.trainable, } knowledge_base_config = { "knowledge_base": serialization_lib.serialize_synalinks_object( self.knowledge_base ) } return {**knowledge_base_config, **config} @classmethod def from_config(cls, config): knowledge_base = serialization_lib.deserialize_synalinks_object( config.pop("knowledge_base") ) return cls(knowledge_base=knowledge_base, **config) ``` ## `And` Bases: `Module` Perform a logical And operation. It takes as input a list of data models, and returns a concatenation of them. If any input is None, then it output None. 
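The behaviour described above can be illustrated with a minimal, library-free sketch. It is not the Synalinks implementation: plain dictionaries stand in for data models, and the `logical_and` and `and_module` helpers are hypothetical names introduced here only for illustration. Field-name collisions between inputs are also ignored for simplicity.

```python
from functools import reduce
from typing import Optional

# Hypothetical stand-in for a data model: a plain dict of fields (or None).
Data = Optional[dict]


def logical_and(x1: Data, x2: Data) -> Data:
    """Return the merged fields of x1 and x2, or None if either is None."""
    if x1 is None or x2 is None:
        return None
    # Simplification: a later field silently overwrites an earlier one.
    return {**x1, **x2}


def and_module(inputs: list) -> Data:
    """Fold logical_and over the inputs, left to right."""
    return reduce(logical_and, inputs)


query = {"query": "What is the capital of France?"}
answer = {"answer": "Paris"}

both = and_module([query, answer])   # both inputs present: fields are merged
missing = and_module([query, None])  # one input missing: the result is None
```

Because a single `None` propagates through the whole fold, this kind of merge is a natural fit for branches where a missing intermediate result should cancel everything downstream.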
Table:

| `x1`   | `x2`   | Logical And (`&`) |
| ------ | ------ | ----------------- |
| `x1`   | `x2`   | `x1 + x2`         |
| `x1`   | `None` | `None`            |
| `None` | `x2`   | `None`            |
| `None` | `None` | `None`            |

Parameters:

| Name       | Type                | Description                                | Default |
| ---------- | ------------------- | ------------------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | Standard keyword arguments for the module. | `{}`    |

Source code in `synalinks/src/modules/merging/logical_and.py`

```
@synalinks_export(
    [
        "synalinks.And",
        "synalinks.modules.And",
    ]
)
class And(Module):
    """Perform a logical And operation.

    It takes as input a list of data models,
    and returns a concatenation of them.

    If any input is None, then it outputs None.

    Table:

    | `x1`   | `x2`   | Logical And (`&`) |
    | ------ | ------ | ----------------- |
    | `x1`   | `x2`   | `x1 + x2`         |
    | `x1`   | `None` | `None`            |
    | `None` | `x2`   | `None`            |
    | `None` | `None` | `None`            |

    Args:
        **kwargs (keyword arguments): Standard keyword arguments for the module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def call(self, inputs, training=False):
        output = inputs[0]
        for i in range(1, len(inputs)):
            output = await ops.logical_and(
                output,
                inputs[i],
                name=f"module_and_{i}_" + self.name,
            )
        return output
```

## `Concat`

Bases: `Module`

Perform a concatenation operation.

It takes as input a list of data models, and returns a concatenation of them.

If any input is None, an exception is raised.

Table:

| `x1`   | `x2`   | Concat (`+`) |
| ------ | ------ | ------------ |
| `x1`   | `x2`   | `x1 + x2`    |
| `x1`   | `None` | `Exception`  |
| `None` | `x2`   | `Exception`  |
| `None` | `None` | `Exception`  |

Parameters:

| Name       | Type                | Description                                | Default |
| ---------- | ------------------- | ------------------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | Standard keyword arguments for the module. | `{}`    |

Source code in `synalinks/src/modules/merging/concat.py`

```
@synalinks_export(
    [
        "synalinks.Concat",
        "synalinks.Concatenate",
        "synalinks.modules.Concat",
        "synalinks.modules.Concatenate",
    ]
)
class Concat(Module):
    """Perform a concatenation operation.

    It takes as input a list of data models,
    and returns a concatenation of them.

    If any input is None, an exception is raised.

    Table:

    | `x1`   | `x2`   | Concat (`+`)      |
    | ------ | ------ | ----------------- |
    | `x1`   | `x2`   | `x1 + x2`         |
    | `x1`   | `None` | `Exception`       |
    | `None` | `x2`   | `Exception`       |
    | `None` | `None` | `Exception`       |

    Args:
        **kwargs (keyword arguments): Standard keyword arguments for the module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def call(self, inputs, training=False):
        output = inputs[0]
        for i in range(1, len(inputs)):
            output = await ops.concat(
                output,
                inputs[i],
                name=f"module_concat_{i}_" + self.name,
            )
        return output
```

## `Or`

Bases: `Module`

Perform a logical Or operation.

It takes as input a list of data models and returns a concatenation of them if all are provided; otherwise it outputs the one that is not None.

If any input is None, it is ignored.

Table:

| `x1`   | `x2`   | Logical Or (`\|`) |
| ------ | ------ | ----------------- |
| `x1`   | `x2`   | `x1 + x2`         |
| `x1`   | `None` | `x1`              |
| `None` | `x2`   | `x2`              |
| `None` | `None` | `None`            |

Parameters:

| Name       | Type                | Description                                | Default |
| ---------- | ------------------- | ------------------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | Standard keyword arguments for the module. | `{}`    |

Source code in `synalinks/src/modules/merging/logical_or.py`

```
@synalinks_export(
    [
        "synalinks.Or",
        "synalinks.modules.Or",
    ]
)
class Or(Module):
    """Perform a logical Or operation.

    It takes as input a list of data models,
    and returns a concatenation of them (if all are provided);
    otherwise it outputs the one that is not None.

    If any input is None, it is ignored.
    Table:

    | `x1`   | `x2`   | Logical Or (`|`) |
    | ------ | ------ | ---------------- |
    | `x1`   | `x2`   | `x1 + x2`        |
    | `x1`   | `None` | `x1`             |
    | `None` | `x2`   | `x2`             |
    | `None` | `None` | `None`           |

    Args:
        **kwargs (keyword arguments): Standard keyword arguments for the module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def call(self, inputs, training=False):
        output = inputs[0]
        for i in range(1, len(inputs)):
            output = await ops.logical_or(
                output,
                inputs[i],
                name=f"module_or_{i}_" + self.name,
            )
        return output
```

## `Xor`

Bases: `Module`

Perform a logical Xor operation.

It takes as input a list of data models. If more than one of them is not None, it outputs None; otherwise it outputs the one that is not None.

Table:

| `x1`   | `x2`   | Logical Xor (`^`) |
| ------ | ------ | ----------------- |
| `x1`   | `x2`   | `None`            |
| `x1`   | `None` | `x1`              |
| `None` | `x2`   | `x2`              |
| `None` | `None` | `None`            |

Parameters:

| Name       | Type                | Description                                | Default |
| ---------- | ------------------- | ------------------------------------------ | ------- |
| `**kwargs` | `keyword arguments` | Standard keyword arguments for the module. | `{}`    |

Source code in `synalinks/src/modules/merging/logical_xor.py`

```
@synalinks_export(
    [
        "synalinks.Xor",
        "synalinks.modules.Xor",
    ]
)
class Xor(Module):
    """Perform a logical Xor operation.

    It takes as input a list of data models.
    If more than one of them is not None, it outputs None;
    otherwise it outputs the one that is not None.

    Table:

    | `x1`   | `x2`   | Logical Xor (`^`)|
    | ------ | ------ | ---------------- |
    | `x1`   | `x2`   | `None`           |
    | `x1`   | `None` | `x1`             |
    | `None` | `x2`   | `x2`             |
    | `None` | `None` | `None`           |

    Args:
        **kwargs (keyword arguments): Standard keyword arguments for the module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def compute_output_spec(self, inputs, training=False):
        return inputs[0].clone()

    async def call(self, inputs, training=False):
        output = inputs[0]
        for i in range(1, len(inputs)):
            if inputs[i]:
                if not output:
                    output = inputs[i]
                else:
                    return None
        return output.clone(name=self.name)
```

## `EntityRetriever`

Bases: `Module`

Retrieve entities from a knowledge base, based on the embedding vector.

This module is useful to implement vector-only RAG (retrieval augmented generation) systems; for KAG (knowledge augmented generation) systems, see the `KnowledgeRetriever` module.

If you give multiple entity models to this module, the LM will select the most suitable one to perform the search. Having multiple entity models to search for is an easy way to enhance the performance of your RAG system by having multiple indexes (one per entity model type).

```
import synalinks
import asyncio
from typing import Literal


class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )


class Answer(synalinks.DataModel):
    answer: str = synalinks.Field(
        description="The answer to the user query",
    )


class Document(synalinks.Entity):
    label: Literal["Document"]
    filename: str = synalinks.Field(
        description="The document's filename",
    )
    text: str = synalinks.Field(
        description="The document's text",
    )


class Chunk(synalinks.Entity):
    label: Literal["Chunk"]
    text: str = synalinks.Field(
        description="The chunk's text",
    )


class IsPartOf(synalinks.Relation):
    subj: Chunk
    label: Literal["IsPartOf"]
    obj: Document


language_model = synalinks.LanguageModel(
    model="ollama/mistral",
)

embedding_model = synalinks.EmbeddingModel(
    model="ollama/mxbai-embed-large"
)

# Register the entity and relation models defined above
knowledge_base = synalinks.KnowledgeBase(
    uri="neo4j://localhost:7687",
    entity_models=[Document, Chunk],
    relation_models=[IsPartOf],
    embedding_model=embedding_model,
    metric="cosine",
)


async def main():
    inputs = synalinks.Input(data_model=Query)
    x =
await synalinks.EntityRetriever( entity_models=[Chunk], language_model=language_model, knowledge_base=knowledge_base, )(inputs) outputs = await synalinks.Generator( data_model=Answer, language_model=language_model, )(x) program = synalinks.Program( inputs=inputs, outputs=outputs, name="rag_program", description="A naive RAG program", ) if __name__ == "__main__": asyncio.run(main()) ``` Parameters: | Name | Type | Description | Default | | -------------------- | --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | | `knowledge_base` | `KnowledgeBase` | The knowledge base to use. | `None` | | `language_model` | `LanguageModel` | The language model to use. | `None` | | `entity_models` | `list` | The list of entities models to search for being a list of Entity data models. | `None` | | `k` | `int` | Maximum number of similar entities to return (Defaults to 10). | `10` | | `threshold` | `float` | Minimum similarity score for results. Entities with similarity below this threshold are excluded. Should be between 0.0 and 1.0 (Defaults to 0.5). | `0.5` | | `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` | | `examples` | `list` | The default list of examples, the examples are a list of tuples containing input/output JSON pairs. | `None` | | `instructions` | `str` | The default instructions being a string containing instructions for the language model. | `None` | | `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` | | `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` | | `use_inputs_schema` | `bool` | Optional. Whether or not use the inputs schema in the prompt (Default to False) (see Generator). | `False` | | `use_outputs_schema` | `bool` | Optional. 
Whether or not to use the outputs schema in the prompt (Defaults to False) (see Generator). | `False` | | `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (Default to True). | `True` | | `return_query` | `bool` | Optional. Whether or not to concatenate the search query to the outputs (Default to True). | `True` | | `name` | `str` | Optional. The name of the module. | `None` | | `description` | `str` | Optional. The description of the module. | `None` | | `trainable` | `bool` | Whether the module's variables should be trainable. | `True` | Source code in `synalinks/src/modules/retrievers/entity_retriever.py` ```` @synalinks_export( [ "synalinks.modules.EntityRetriever", "synalinks.EntityRetriever", ] ) class EntityRetriever(Module): """Retrieve entities from a knowledge base, based on the embedding vector. This module is useful to implement vector-only RAG (retrieval augmented generation) systems; for KAG (knowledge augmented generation) systems, see the `KnowledgeRetriever` module. If you give multiple entity models to this module, the LM will select the most suitable one to perform the search. Having multiple entity models to search for is an easy way to enhance the performance of your RAG system by having multiple indexes (one per entity model type).
```python import synalinks import asyncio from typing import Literal class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class Answer(synalinks.DataModel): answer: str = synalinks.Field( description="The answer to the user query", ) class Document(synalinks.Entity): label: Literal["Document"] filename: str = synalinks.Field( description="The document's filename", ) text: str = synalinks.Field( description="The document's text", ) class Chunk(synalinks.Entity): label: Literal["Chunk"] text: str = synalinks.Field( description="The chunk's text", ) class IsPartOf(synalinks.Relation): subj: Chunk label: Literal["IsPartOf"] obj: Document language_model = synalinks.LanguageModel( model="ollama/mistral", ) embedding_model = synalinks.EmbeddingModel( model="ollama/mxbai-embed-large" ) knowledge_base = synalinks.KnowledgeBase( uri="neo4j://localhost:7687", entity_models=[Document, Chunk], relation_models=[IsPartOf], embedding_model=embedding_model, metric="cosine", ) async def main(): inputs = synalinks.Input(data_model=Query) x = await synalinks.EntityRetriever( entity_models=[Chunk], language_model=language_model, knowledge_base=knowledge_base, )(inputs) outputs = await synalinks.Generator( data_model=Answer, language_model=language_model, )(x) program = synalinks.Program( inputs=inputs, outputs=outputs, name="rag_program", description="A naive RAG program", ) if __name__ == "__main__": asyncio.run(main()) ``` Args: knowledge_base (KnowledgeBase): The knowledge base to use. language_model (LanguageModel): The language model to use. entity_models (list): The list of entity models to search for, as a list of `Entity` data models. k (int): Maximum number of similar entities to return (Defaults to 10). threshold (float): Minimum similarity score for results. Entities with similarity below this threshold are excluded. Should be between 0.0 and 1.0 (Defaults to 0.5).
prompt_template (str): The default jinja2 prompt template to use (see `Generator`). examples (list): The default list of examples, the examples are a list of tuples containing input/output JSON pairs. instructions (str): The default instructions being a string containing instructions for the language model. seed_instructions (list): Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. temperature (float): Optional. The temperature for the LM call. use_inputs_schema (bool): Optional. Whether or not use the inputs schema in the prompt (Default to False) (see `Generator`). use_outputs_schema (bool): Optional. Whether or not use the outputs schema in the prompt (Default to False) (see `Generator`). return_inputs (bool): Optional. Whether or not to concatenate the inputs to the outputs (Default to True). return_query (bool): Optional. Whether or not to concatenate the search query to the outputs (Default to True). name (str): Optional. The name of the module. description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. 
""" def __init__( self, knowledge_base=None, language_model=None, entity_models=None, k=10, threshold=0.5, prompt_template=None, examples=None, instructions=None, seed_instructions=None, temperature=0.0, use_inputs_schema=False, use_outputs_schema=False, return_inputs=True, return_query=True, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) self.knowledge_base = knowledge_base self.language_model = language_model self.k = k self.threshold = threshold self.prompt_template = prompt_template self.examples = examples if entity_models: node_labels = [ entity_model.get_schema().get("title") for entity_model in entity_models ] else: node_labels = [] if not instructions: instructions = default_entity_retriever_instructions(node_labels) self.instructions = instructions self.seed_instructions = seed_instructions self.temperature = temperature self.use_inputs_schema = use_inputs_schema self.use_outputs_schema = use_outputs_schema self.return_inputs = return_inputs self.return_query = return_query self.schema = SimilaritySearch.get_schema() self.schema = dynamic_enum( schema=self.schema, prop_to_update="entity_label", labels=node_labels, description="The entity label to search for", ) self.query_generator = Generator( schema=self.schema, language_model=self.language_model, prompt_template=self.prompt_template, examples=self.examples, instructions=self.instructions, seed_instructions=self.seed_instructions, temperature=self.temperature, use_inputs_schema=self.use_inputs_schema, use_outputs_schema=self.use_outputs_schema, return_inputs=False, name="query_generator_" + self.name, ) async def call(self, inputs, training=False): if not inputs: return None similarity_search_query = await self.query_generator( inputs, training=training, ) if self.return_inputs: if self.return_query: return await ops.concat( inputs, await ops.concat( similarity_search_query, await ops.similarity_search( 
similarity_search_query, knowledge_base=self.knowledge_base, k=self.k, threshold=self.threshold, name="similarity_search_" + self.name, ), name="similarity_search_with_query_and_inputs_" + self.name, ), ) else: return await ops.concat( inputs, await ops.similarity_search( similarity_search_query, knowledge_base=self.knowledge_base, k=self.k, threshold=self.threshold, name="similarity_search_" + self.name, ), name="similarity_search_with_inputs_" + self.name, ) else: if self.return_query: return await ops.concat( similarity_search_query, await ops.similarity_search( similarity_search_query, knowledge_base=self.knowledge_base, k=self.k, threshold=self.threshold, name="similarity_search_" + self.name, ), name="similarity_search_with_query_" + self.name, ) else: return await ops.similarity_search( similarity_search_query, knowledge_base=self.knowledge_base, k=self.k, threshold=self.threshold, name="similarity_search_" + self.name, ) def get_config(self): config = { "k": self.k, "threshold": self.threshold, "prompt_template": self.prompt_template, "examples": self.examples, "instructions": self.instructions, "seed_instructions": self.seed_instructions, "temperature": self.temperature, "use_inputs_schema": self.use_inputs_schema, "use_outputs_schema": self.use_outputs_schema, "return_inputs": self.return_inputs, "return_query": self.return_query, "name": self.name, "description": self.description, "trainable": self.trainable, } knowledge_base_config = { "knowledge_base": serialization_lib.serialize_synalinks_object( self.knowledge_base, ) } language_model_config = { "language_model": serialization_lib.serialize_synalinks_object( self.language_model, ) } entity_models_config = { "entity_models": [ ( serialization_lib.serialize_synalinks_object( entity_model.to_symbolic_data_model( name="entity_model" + (f"_{i}_" if i > 0 else "_") + self.name ) ) if not is_symbolic_data_model(entity_model) else serialization_lib.serialize_synalinks_object(entity_model) ) for i,
entity_model in enumerate(self.entity_models) ] } return { **config, **knowledge_base_config, **language_model_config, **entity_models_config, } @classmethod def from_config(cls, config): knowledge_base = serialization_lib.deserialize_synalinks_object( config.pop("knowledge_base"), ) language_model = serialization_lib.deserialize_synalinks_object( config.pop("language_model"), ) entity_models_config = config.pop("entity_models") entity_models = [ serialization_lib.deserialize_synalinks_object(entity_model) for entity_model in entity_models_config ] return cls( knowledge_base=knowledge_base, entity_models=entity_models, language_model=language_model, **config, ) ```` ## `default_entity_retriever_instructions(entity_labels)` The default instructions for the entity retriever Source code in `synalinks/src/modules/retrievers/entity_retriever.py` ``` def default_entity_retriever_instructions(entity_labels): """The default instructions for the entity retriever""" return f""" Your task is to retrieve entities among the following entity labels: {entity_labels}. First, decide step-by-step which entity label you need, then describe the entities you are looking for. The `similarity search` field should be a short description of the entities to match. """.strip() ``` ## `TripletRetriever` Bases: `Module` Retrieve triplets using a hybrid neuro-symbolic approach. Unlike the Text2Cypher approach, this retriever is 100% guaranteed to generate a valid Cypher query **every time**. It doesn't need to have the graph schema in the prompt, thus helping the language models by avoiding prompt confusion, because the nodes and relation labels are enforced dynamically using **constrained structured output** (similar to the `Decision` module). It works by using the language model to infer the subject, object and relation labels to search for, along with a similarity search field for the object and subject triplets.
These parameters are then used to *programmatically create a valid Cypher query*. This approach not only ensures the syntactical correctness of the Cypher query but also protects the graph database from Cypher injections that could arise from an adversarial prompt injection. ``` import synalinks import asyncio from typing import Literal class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class Answer(synalinks.DataModel): answer: str = synalinks.Field( description="The answer to the user query", ) class Country(synalinks.Entity): label: Literal["Country"] name: str = synalinks.Field( description="The country's name", ) class City(synalinks.Entity): label: Literal["City"] name: str = synalinks.Field( description="The city's name", ) class IsCapitalOf(synalinks.Relation): subj: City label: Literal["IsCapitalOf"] obj: Country class IsCityOf(synalinks.Relation): subj: City label: Literal["IsCityOf"] obj: Country language_model = synalinks.LanguageModel( model="ollama/mistral", ) embedding_model = synalinks.EmbeddingModel( model="ollama/mxbai-embed-large" ) knowledge_base = synalinks.KnowledgeBase( uri="neo4j://localhost:7687", entity_models=[Country, City], relation_models=[IsCapitalOf, IsCityOf], embedding_model=embedding_model, metric="cosine", ) async def main(): inputs = synalinks.Input(data_model=Query) x = await synalinks.TripletRetriever( entity_models=[Country, City], relation_models=[IsCityOf, IsCapitalOf], language_model=language_model, knowledge_base=knowledge_base, )(inputs) outputs = await synalinks.Generator( data_model=Answer, language_model=language_model, )(x) program = synalinks.Program( inputs=inputs, outputs=outputs, name="kag_program", description="A simple KAG program", ) if __name__ == "__main__": asyncio.run(main()) ``` Parameters: | Name | Type | Description | Default | | -------------------- | --------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | | `knowledge_base` | `KnowledgeBase` | The knowledge base to use. | `None` | | `language_model` | `LanguageModel` | The language model to use. | `None` | | `entity_models` | `list` | The list of entity models to search for, as a list of Entity data models. | `None` | | `relation_models` | `list` | The list of relation models to search for, as a list of Relation data models. | `None` | | `k` | `int` | Maximum number of similar entities to return (Defaults to 10). | `10` | | `threshold` | `float` | Minimum similarity score for results. Entities with similarity below this threshold are excluded. Should be between 0.0 and 1.0 (Defaults to 0.5). | `0.5` | | `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` | | `examples` | `list` | The default list of examples, the examples are a list of tuples containing input/output JSON pairs. | `None` | | `instructions` | `str` | The default instructions being a string containing instructions for the language model. | `None` | | `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` | | `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` | | `use_inputs_schema` | `bool` | Optional. Whether or not to use the inputs schema in the prompt (Defaults to False) (see Generator). | `False` | | `use_outputs_schema` | `bool` | Optional. Whether or not to use the outputs schema in the prompt (Defaults to False) (see Generator). | `False` | | `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (Default to True). | `True` | | `return_query` | `bool` | Optional. Whether or not to concatenate the search query to the outputs (Default to True).
| `True` | | `name` | `str` | Optional. The name of the module. | `None` | | `description` | `str` | Optional. The description of the module. | `None` | | `trainable` | `bool` | Whether the module's variables should be trainable. | `True` | Source code in `synalinks/src/modules/retrievers/triplet_retriever.py` ```` @synalinks_export( [ "synalinks.modules.TripletRetriever", "synalinks.TripletRetriever", ] ) class TripletRetriever(Module): """Retrieve triplets using a hybrid neuro-symbolic approach. Unlike the Text2Cypher approach, this retriever is 100% guaranteed to generate a valid Cypher query **every time**. It doesn't need to have the graph schema in the prompt, thus helping the language models by avoiding prompt confusion, because the nodes and relation labels are enforced dynamically using **constrained structured output** (similar to the `Decision` module). It works by using the language model to infer the subject, object and relation labels to search for, along with a similarity search field for the object and subject triplets. These parameters are then used to *programmatically create a valid Cypher query*. This approach not only ensures the syntactical correctness of the Cypher query but also protects the graph database from Cypher injections that could arise from an adversarial prompt injection. 
```python import synalinks import asyncio from typing import Literal class Query(synalinks.DataModel): query: str = synalinks.Field( description="The user query", ) class Answer(synalinks.DataModel): answer: str = synalinks.Field( description="The answer to the user query", ) class Country(synalinks.Entity): label: Literal["Country"] name: str = synalinks.Field( description="The country's name", ) class City(synalinks.Entity): label: Literal["City"] name: str = synalinks.Field( description="The city's name", ) class IsCapitalOf(synalinks.Relation): subj: City label: Literal["IsCapitalOf"] obj: Country class IsCityOf(synalinks.Relation): subj: City label: Literal["IsCityOf"] obj: Country language_model = synalinks.LanguageModel( model="ollama/mistral", ) embedding_model = synalinks.EmbeddingModel( model="ollama/mxbai-embed-large" ) knowledge_base = synalinks.KnowledgeBase( uri="neo4j://localhost:7687", entity_models=[Country, City], relation_models=[IsCapitalOf, IsCityOf], embedding_model=embedding_model, metric="cosine", ) async def main(): inputs = synalinks.Input(data_model=Query) x = await synalinks.TripletRetriever( entity_models=[Country, City], relation_models=[IsCityOf, IsCapitalOf], language_model=language_model, knowledge_base=knowledge_base, )(inputs) outputs = await synalinks.Generator( data_model=Answer, language_model=language_model, )(x) program = synalinks.Program( inputs=inputs, outputs=outputs, name="kag_program", description="A simple KAG program", ) if __name__ == "__main__": asyncio.run(main()) ``` Args: knowledge_base (KnowledgeBase): The knowledge base to use. language_model (LanguageModel): The language model to use. entity_models (list): The list of entity models to search for, as a list of `Entity` data models. relation_models (list): The list of relation models to search for, as a list of `Relation` data models. k (int): Maximum number of similar entities to return (Defaults to 10).
threshold (float): Minimum similarity score for results. Entities with similarity below this threshold are excluded. Should be between 0.0 and 1.0 (Defaults to 0.5). prompt_template (str): The default jinja2 prompt template to use (see `Generator`). examples (list): The default list of examples, the examples are a list of tuples containing input/output JSON pairs. instructions (str): The default instructions being a string containing instructions for the language model. seed_instructions (list): Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. temperature (float): Optional. The temperature for the LM call. use_inputs_schema (bool): Optional. Whether or not use the inputs schema in the prompt (Default to False) (see `Generator`). use_outputs_schema (bool): Optional. Whether or not use the outputs schema in the prompt (Default to False) (see `Generator`). return_inputs (bool): Optional. Whether or not to concatenate the inputs to the outputs (Default to True). return_query (bool): Optional. Whether or not to concatenate the search query to the outputs (Default to True). name (str): Optional. The name of the module. description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. 
""" def __init__( self, knowledge_base=None, language_model=None, entity_models=None, relation_models=None, k=10, threshold=0.5, prompt_template=None, examples=None, instructions=None, seed_instructions=None, temperature=0.0, use_inputs_schema=False, use_outputs_schema=False, return_inputs=True, return_query=True, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) self.knowledge_base = knowledge_base self.language_model = language_model self.k = k self.threshold = threshold self.prompt_template = prompt_template self.examples = examples if entity_models: node_labels = [ entity_model.get_schema().get("title") for entity_model in entity_models ] else: node_labels = [] if relation_models: relation_labels = [ relation_model.get_schema().get("title") for relation_model in relation_models ] else: relation_labels = [] if not instructions: instructions = default_triplet_retriever_instructions( node_labels, relation_labels ) self.instructions = instructions self.seed_instructions = seed_instructions self.temperature = temperature self.use_inputs_schema = use_inputs_schema self.use_outputs_schema = use_outputs_schema self.return_inputs = return_inputs self.return_query = return_query self.schema = TripletSearch.get_schema() self.schema = dynamic_enum( schema=self.schema, prop_to_update="subject_label", labels=node_labels, description="The subject label to match", ) self.schema = dynamic_enum( schema=self.schema, prop_to_update="relation_label", labels=relation_labels, description="The relation label to match", ) self.schema = dynamic_enum( schema=self.schema, prop_to_update="object_label", labels=node_labels, description="The object label to match", ) self.query_generator = ChainOfThought( schema=self.schema, language_model=self.language_model, prompt_template=self.prompt_template, examples=self.examples, instructions=self.instructions, seed_instructions=self.seed_instructions, 
temperature=self.temperature, use_inputs_schema=self.use_inputs_schema, use_outputs_schema=self.use_outputs_schema, return_inputs=False, name="query_generator_" + self.name, ) async def call(self, inputs, training=False): if not inputs: return None triplet_search_query = await self.query_generator( inputs, training=training, ) if self.return_inputs: if self.return_query: return await ops.logical_and( inputs, await ops.logical_and( triplet_search_query, await ops.triplet_search( triplet_search_query, knowledge_base=self.knowledge_base, k=self.k, threshold=self.threshold, name="similarity_search_" + self.name, ), name="similarity_search_with_query_and_inputs_" + self.name, ), ) else: return await ops.logical_and( inputs, await ops.triplet_search( triplet_search_query, knowledge_base=self.knowledge_base, k=self.k, threshold=self.threshold, name="similarity_search_" + self.name, ), name="similarity_search_with_inputs_" + self.name, ) else: if self.return_query: return await ops.logical_and( triplet_search_query, await ops.triplet_search( triplet_search_query, knowledge_base=self.knowledge_base, k=self.k, threshold=self.threshold, name="similarity_search_" + self.name, ), name="similarity_search_with_query_" + self.name, ) else: return await ops.triplet_search( triplet_search_query, knowledge_base=self.knowledge_base, k=self.k, threshold=self.threshold, name="similarity_search_" + self.name, ) def get_config(self): config = { "k": self.k, "threshold": self.threshold, "prompt_template": self.prompt_template, "examples": self.examples, "instructions": self.instructions, "seed_instructions": self.seed_instructions, "temperature": self.temperature, "use_inputs_schema": self.use_inputs_schema, "use_outputs_schema": self.use_outputs_schema, "return_inputs": self.return_inputs, "return_query": self.return_query, "name": self.name, "description": self.description, "trainable": self.trainable, } knowledge_base_config = { "knowledge_base":
serialization_lib.serialize_synalinks_object( self.knowledge_base, ) } language_model_config = { "language_model": serialization_lib.serialize_synalinks_object( self.language_model, ) } entity_models_config = { "entity_models": [ ( serialization_lib.serialize_synalinks_object( entity_model.to_symbolic_data_model( name="entity_model" + (f"_{i}_" if i > 0 else "_") + self.name ) ) if not is_symbolic_data_model(entity_model) else serialization_lib.serialize_synalinks_object(entity_model) ) for i, entity_model in enumerate(self.entity_models) ] } relation_models_config = { "relation_models": [ ( serialization_lib.serialize_synalinks_object( relation_model.to_symbolic_data_model( name="relation_model" + (f"_{i}_" if i > 0 else "_") + self.name ) ) if not is_symbolic_data_model(relation_model) else serialization_lib.serialize_synalinks_object(relation_model) ) for i, relation_model in enumerate(self.relation_models) ] } return { **config, **knowledge_base_config, **language_model_config, **entity_models_config, **relation_models_config, } @classmethod def from_config(cls, config): knowledge_base = serialization_lib.deserialize_synalinks_object( config.pop("knowledge_base"), ) language_model = serialization_lib.deserialize_synalinks_object( config.pop("language_model"), ) entity_models_config = config.pop("entity_models") entity_models = [ serialization_lib.deserialize_synalinks_object(entity_model) for entity_model in entity_models_config ] relation_models_config = config.pop("relation_models") relation_models = [ serialization_lib.deserialize_synalinks_object(relation_model) for relation_model in relation_models_config ] return cls( knowledge_base=knowledge_base, entity_models=entity_models, relation_models=relation_models, language_model=language_model, **config, ) ```` ## `default_triplet_retriever_instructions(entity_labels, relation_labels)` The default instructions for the triplet retriever Source code in
`synalinks/src/modules/retrievers/triplet_retriever.py` ``` def default_triplet_retriever_instructions(entity_labels, relation_labels): """The default instructions for the triplet retriever""" return f""" Your task is to retrieve triplets among the following entity labels: {entity_labels} and relation labels: {relation_labels}. Think about the triplet you are looking for, which relation label do you need, then the subject and object label. The similarity search parameters should be a short natural language string describing the entities to match. Remember to replace the similarity search with `?` for the entity you are looking for. """.strip() ``` ## `PythonScript` Bases: `Trainable` The python code to transform a JSON object into another JSON object Source code in `synalinks/src/modules/synthesis/python_synthesis.py` ``` class PythonScript(Trainable): """The python code to transform a JSON object into another JSON object""" python_script: str = Field( description="The python script to transform a JSON object into another object" ) ``` ## `PythonSynthesis` Bases: `Module` A Python code transformation on JSON data. **Note**: This module is **NOT** completely safe (yet) for business applications. It is only provided for research purposes on program synthesis. However, the code doesn't evolve during inference, so it can't be prompt injected. This module features Python code as a trainable variable, allowing the optimizers to refine the code during the training loop based on iterative feedback and automatic selection of the best script. This module works **ONLY** with advanced optimizers (**NOT** the `RandomFewShot` optimizer). The module executes the entire Python script and expects the result to be stored in a variable named 'result' at the end of execution.
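The `result` convention described above can be illustrated with plain Python. The following is a minimal sketch only, not the Synalinks implementation: `run_script` is a hypothetical helper, it applies no timeout or sandboxing, and falling back to `default_return_value` when the script fails or leaves no dict in `result` is an assumption made for the illustration.

```python
def run_script(python_script, inputs, default_return_value):
    """Run a script with `inputs` in scope and read back its `result` variable.

    Hypothetical helper for illustration; no timeout or sandbox is applied,
    so it must never be used on untrusted code.
    """
    # The script sees `inputs` as a global and is expected to assign `result`.
    namespace = {"inputs": inputs}
    try:
        exec(python_script, namespace)
    except Exception:
        # Assumed behavior: any failure yields the default value.
        return default_return_value
    result = namespace.get("result")
    # Assumed behavior: a missing or non-dict result also yields the default.
    return result if isinstance(result, dict) else default_return_value


script = """
def transform(inputs):
    return {"output_grid": inputs.get("input_grid")}

result = transform(inputs)
"""

print(run_script(script, {"input_grid": [[1, 2], [3, 4]]}, {"output_grid": [[]]}))
```

Running the whole script, rather than calling a named function, is what lets an optimizer rewrite any part of the code, as long as the final `result` assignment is preserved.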
Example: ``` import synalinks import asyncio default_python_script = \ """ def transform(inputs): # TODO implement the code to transform the input grid into the output grid return {"output_grid": inputs.get("input_grid")} result = transform(inputs) """ async def main(): inputs = synalinks.Input( data_model=synalinks.datasets.arcagi.get_input_data_model(), ) outputs = await synalinks.PythonSynthesis( data_model=synalinks.datasets.arcagi.get_output_data_model(), python_script=default_python_script, default_return_value={"output_grid": [[]]}, )(inputs) program = synalinks.Program( inputs=inputs, outputs=outputs, name="python_script_synthesis", description="A program to solve ARCAGI with python code", ) ``` If you want to explore the future of neuro-symbolic self-evolving systems, contact us. While these systems are not "hard" to code thanks to Synalinks, they require technical knowledge and a deep understanding of multiple AI paradigms. Parameters: | Name | Type | Description | Default | | ---------------------- | ----------- | ----------------------------------------------------------------------------------------------------------------------------------- | --------------- | | `schema` | `dict` | The target JSON schema. If not provided use the data_model to infer it. | `None` | | `data_model` | `DataModel \| SymbolicDataModel \| JsonDataModel` | The target data model for structured output. | `None` | | `python_script` | `str` | The default Python script. | `None` | | `seed_scripts` | `list` | Optional. A list of Python scripts to use as seed for the evolution. If not provided, create a seed from the default configuration. | `None` | | `default_return_value` | `dict` | Default return value. | `None` | | `timeout` | `int` | Maximum execution time in seconds. (Default 5 seconds). | `5` | | `name` | `str` | Optional. The name of the module. | `None` | | `description` | `str` | Optional. The description of the module. | `None` | | `trainable` | `bool` | Whether the module's variables should be trainable.
| `True` | Source code in `synalinks/src/modules/synthesis/python_synthesis.py` ```` @synalinks_export( [ "synalinks.modules.PythonSynthesis", "synalinks.PythonSynthesis", ] ) class PythonSynthesis(Module): """A code Python code transformation on JSON data. **Note**: This module is **NOT** completly safe (yet) for business applications. Its is only provided for reseach purposes on program synthesis. Altought the code don't evolve during inference, so it can't be prompt injected. This module features a python code as trainable variable, allowing the optimizers to refine the code during the training loop based on iterative feedback and automatic selection of the best script. This module works **ONLY** with advanced optimizers (**NOT** the `RandomFewShot` optimizer). The module executes the entire Python script and expects the result to be stored in a variable named 'result' at the end of execution. Example: ```python import synalinks import asyncio default_python_script = \\ \"\"\" def transform(inputs): # TODO implement the code to transform the input grid into the output grid return {"output_grid": inputs.get("input_grid")} result = transform(inputs) \"\"\" async def main(): inputs = synalinks.Input( data_model=synalinks.datasets.arcagi.get_input_data_model(), ) outputs = await synalinks.PythonSynthesis( data_model=synalinks.datasets.arcagi.get_output_data_model() python_script=default_python_script, default_return_value={"output_grid": [[]]}, )(inputs) program = synalinks.Program( inputs=inputs, outputs=outputs, name="python_script_synthesis", description="A program to solve ARCAGI with python code", ) ``` If you want to explore the future of neuro-symbolic self-evolving systems, contact us. While these systems are not "hard" to code thanks to Synalinks, they requires technical knowledge and a deep understanding of multiple AI paradigm. Args: schema (dict): The target JSON schema. If not provided use the `data_model` to infer it. 
data_model (DataModel | SymbolicDataModel | JsonDataModel): The target data model for structured output. python_script (str): The default Python script. seed_scripts (list): Optional. A list of Python scripts to use as seed for the evolution. If not provided, create a seed from the default configuration. default_return_value (dict): Default return value. timeout (int): Maximum execution time in seconds. (Default 5 seconds). name (str): Optional. The name of the module. description (str): Optional. The description of the module. trainable (bool): Whether the module's variables should be trainable. """ def __init__( self, schema=None, data_model=None, python_script=None, seed_scripts=None, default_return_value=None, timeout=5, sandbox=False, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) if not schema and data_model: schema = data_model.get_schema() self.schema = schema if not python_script: raise ValueError("You should provide the `python_script` argument") self.python_script = python_script if not default_return_value: raise ValueError("You should provide the `default_return_value` argument") try: jsonschema.validate(default_return_value, self.schema) except ValidationError as e: raise ValueError( f"`default_return_value` parameter does not conform to schema: {e}" ) self.default_return_value = default_return_value self.timeout = timeout if not seed_scripts: seed_scripts = [] self.seed_scripts = seed_scripts seed_candidates = [ {"python_script": seed_script} for seed_script in self.seed_scripts ] self.state = self.add_variable( initializer=PythonScript( python_script=self.python_script, seed_candidates=seed_candidates, ).get_json(), data_model=PythonScript, name="state_" + self.name, ) async def execute(self, inputs, python_script): """Execute the Python script with timeout using multiprocessing. 
""" result_queue = Queue() process = Process( target=_execute_script_in_process, args=(python_script, inputs.get_json(), self.schema, result_queue) ) process.start() start_time = asyncio.get_event_loop().time() timeout_remaining = self.timeout while process.is_alive() and timeout_remaining > 0: await asyncio.sleep(0.1) elapsed = asyncio.get_event_loop().time() - start_time timeout_remaining = self.timeout - elapsed if process.is_alive(): process.terminate() process.join(timeout=1) if process.is_alive(): process.kill() process.join() return None, "", f"Timeout Error: Script execution exceeded {self.timeout} second(s)\n" process.join() if not result_queue.empty(): result, stdout, stderr = result_queue.get() return result, stdout, stderr else: return None, "", "Error: Process terminated unexpectedly\n" async def call(self, inputs, training=False): if not inputs: return None python_script = self.state.get("python_script") result, stdout, stderr = await self.execute(inputs, python_script) if training: predictions = self.state.get("current_predictions") if result: predictions.append( { "inputs": { **inputs.get_json(), }, "outputs": { **result, "stdout": stdout, "stderr": stderr, }, "reward": None, } ) else: predictions.append( { "inputs": { **inputs.get_json(), }, "outputs": { "stdout": stdout, "stderr": stderr, }, "reward": None, } ) if result: return JsonDataModel( json={ **result, "stdout": stdout, "stderr": stderr, }, schema=self.schema, name=self.name, ) else: return JsonDataModel( json={ **self.default_return_value, "stdout": stdout, "stderr": stderr, }, schema=self.schema, name=self.name, ) async def compute_output_spec(self, inputs, training=False): return await ops.concat( SymbolicDataModel(schema=self.schema), PythonConsoleLog, name=self.name, ) def get_config(self): config = { "schema": self.schema, "python_script": self.python_script, "seed_scripts": self.seed_scripts, "default_return_value": self.default_return_value, "name": self.name, "description": 
self.description, "trainable": self.trainable, } return config @classmethod def from_config(cls, config): return cls(**config) ```` ### `execute(inputs, python_script)` Execute the Python script with timeout using multiprocessing. Source code in `synalinks/src/modules/synthesis/python_synthesis.py` ``` async def execute(self, inputs, python_script): """Execute the Python script with timeout using multiprocessing. """ result_queue = Queue() process = Process( target=_execute_script_in_process, args=(python_script, inputs.get_json(), self.schema, result_queue) ) process.start() start_time = asyncio.get_event_loop().time() timeout_remaining = self.timeout while process.is_alive() and timeout_remaining > 0: await asyncio.sleep(0.1) elapsed = asyncio.get_event_loop().time() - start_time timeout_remaining = self.timeout - elapsed if process.is_alive(): process.terminate() process.join(timeout=1) if process.is_alive(): process.kill() process.join() return None, "", f"Timeout Error: Script execution exceeded {self.timeout} second(s)\n" process.join() if not result_queue.empty(): result, stdout, stderr = result_queue.get() return result, stdout, stderr else: return None, "", "Error: Process terminated unexpectedly\n" ``` ## `TimeoutException` Bases: `Exception` Exception raised when script execution times out Source code in `synalinks/src/modules/synthesis/python_synthesis.py` ``` class TimeoutException(Exception): """Exception raised when script execution times out""" pass ``` ## `SequentialPlan` Bases: `Trainable` The sequential step by step plan to achieve the task Source code in `synalinks/src/modules/synthesis/sequential_plan_synthesis.py` ``` class SequentialPlan(Trainable): """The sequential step by step plan to achieve the task""" steps: List[str] = Field( description="The list of steps", ) ``` ## `SequentialPlanSynthesis` Bases: `Module` A module that executes a sequential plan of steps. 
This module features a sequential plan as a trainable variable, allowing optimizers to refine the plan during the training loop based on iterative feedback. In effect, the module learns to plan through iterative feedback and automatic selection of the best plan.

The module executes each step in the plan sequentially, passing the output of each step as input to the next step.

The runner is responsible for executing each individual step. The most common runners are a `FunctionCallingAgent`, `ChainOfThought` or `Generator` module, but you can use any Module or Program.

By default, this module starts without any plan, so it is equivalent to a single runner call.

This module works **ONLY** with advanced optimizers (**NOT** the `RandomFewShot` optimizer).

**Note**: The inputs are forwarded to the runner each time by concatenating the inputs with the outputs of the previous steps. So **ensure that the runner doesn't return the inputs**: use `return_inputs=False` or `return_inputs_with_trajectory=False` when configuring your runner.

Example:

```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class FinalReport(synalinks.DataModel):
    report: str = synalinks.Field(
        description="The final report",
    )

class TaskSummary(synalinks.DataModel):
    summary: str = synalinks.Field(
        description="The summary of the executed task",
    )

async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    tools = ...  # tools definition (see `FunctionCallingAgent`)

    inputs = synalinks.Input(data_model=Query)
    outputs = await synalinks.SequentialPlanSynthesis(
        data_model=FinalReport,
        language_model=language_model,
        runner=synalinks.FunctionCallingAgent(
            data_model=TaskSummary,
            language_model=language_model,
            tools=tools,
            return_inputs_with_trajectory=False,
        ),
    )(inputs)

    program = synalinks.Program(
        inputs=inputs,
        outputs=outputs,
        name="planner_agent",
        description="An agent that learns a step by step plan to achieve a task",
    )

if __name__ == "__main__":
    asyncio.run(main())
```

Parameters:

| Name | Type | Description | Default |
| ---------------- | ---------------------------------------------------- | -------------------------------------------------------------------------------------------------------------- | ------ |
| `schema` | `dict` | The target JSON schema. If not provided use the data_model to infer it. | `None` |
| `data_model` | `DataModel \| SymbolicDataModel \| JsonDataModel` | The target data model for structured output. | `None` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `steps` | `list` | Optional. The default list of steps being a list of strings. | `None` |
| `seed_steps` | `list` | Optional. A list of steps to use as seed for the optimization. If not provided, use the default steps as seed. | `None` |
| `runner` | `Module \| Program` | Required. The runner that executes each step. | `None` |
| `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (Default to True). | `True` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |

Source code in `synalinks/src/modules/synthesis/sequential_plan_synthesis.py`

````
class SequentialPlanSynthesis(Module):
    """A module that executes a sequential plan of steps.

    This module features a sequential plan as a trainable variable, allowing optimizers
    to refine the plan during the training loop based on iterative feedback.
    In effect, the module learns to plan through iterative feedback and automatic
    selection of the best plan.

    The module executes each step in the plan sequentially, passing the output of
    each step as input to the next step.

    The runner is responsible for executing each individual step. The most common
    runners are a `FunctionCallingAgent`, `ChainOfThought` or `Generator` module,
    but you can use any Module or Program.

    By default, this module starts without any plan, so it is equivalent to a single
    runner call.

    This module works **ONLY** with advanced optimizers (**NOT** the
    `RandomFewShot` optimizer).

    **Note**: The inputs are forwarded to the runner each time by concatenating
        the inputs with the outputs of the previous steps. So
        **ensure that the runner doesn't return the inputs**: use
        `return_inputs=False` or `return_inputs_with_trajectory=False` when
        configuring your runner.

    Example:

    ```python
    import synalinks
    import asyncio

    class Query(synalinks.DataModel):
        query: str = synalinks.Field(
            description="The user query",
        )

    class FinalReport(synalinks.DataModel):
        report: str = synalinks.Field(
            description="The final report",
        )

    class TaskSummary(synalinks.DataModel):
        summary: str = synalinks.Field(
            description="The summary of the executed task",
        )

    async def main():
        language_model = synalinks.LanguageModel(
            model="ollama/mistral",
        )

        tools = ...  # tools definition (see `FunctionCallingAgent`)

        inputs = synalinks.Input(data_model=Query)
        outputs = await synalinks.SequentialPlanSynthesis(
            data_model=FinalReport,
            language_model=language_model,
            runner=synalinks.FunctionCallingAgent(
                data_model=TaskSummary,
                language_model=language_model,
                tools=tools,
                return_inputs_with_trajectory=False,
            ),
        )(inputs)

        program = synalinks.Program(
            inputs=inputs,
            outputs=outputs,
            name="planner_agent",
            description="An agent that learns a step by step plan to achieve a task",
        )

    if __name__ == "__main__":
        asyncio.run(main())
    ```

    Args:
        schema (dict): The target JSON schema.
            If not provided use the `data_model` to infer it.
        data_model (DataModel | SymbolicDataModel | JsonDataModel): The target data
            model for structured output.
        language_model (LanguageModel): The language model to use.
        steps (list): Optional. The default list of steps being a list of strings.
        seed_steps (list): Optional. A list of steps to use as seed for the
            optimization. If not provided, use the default steps as seed.
        runner (Module | Program): Required. The runner that executes each step.
        return_inputs (bool): Optional. Whether or not to concatenate the inputs to
            the outputs (Default to True).
        name (str): Optional. The name of the module.
        description (str): Optional. The description of the module.
        trainable (bool): Whether the module's variables should be trainable.
    """

    def __init__(
        self,
        schema=None,
        data_model=None,
        language_model=None,
        steps=None,
        seed_steps=None,
        runner=None,
        return_inputs=True,
        name=None,
        description=None,
        trainable=True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        if not schema and data_model:
            schema = data_model.get_schema()
        self.schema = schema
        if not steps:
            steps = []
        self.steps = steps
        if not seed_steps:
            seed_steps = [[]]
        self.seed_steps = seed_steps
        if not runner:
            raise ValueError("The `runner` parameter is required.")
        if not isinstance(runner, Module):
            raise ValueError("The `runner` parameter should be a `Module` or `Program`.")
        self.language_model = language_model
        self.runner = runner
        self.return_inputs = return_inputs

        self.state = self.add_variable(
            initializer=SequentialPlan(
                steps=self.steps,
                seed_candidates=self.seed_steps,
            ).get_json(),
            data_model=SequentialPlan,
            name="state"+self.name,
        )

        self.final_generator = ChainOfThought(
            schema=self.schema,
            language_model=self.language_model,
            return_inputs=self.return_inputs,
            name="final_generator_"+self.name,
        )

    async def call(self, inputs, training=False):
        if not inputs:
            return None
        steps = self.state.get("steps")
        previous_steps = None
        if steps:
            for i, step in enumerate(steps):
                step_result = await self.runner(inputs, training=training)
                if not previous_steps:
                    previous_steps = step_result
                else:
                    previous_steps = await ops.concat(
                        previous_steps,
                        step_result,
                        name=f"step_{i}_with_inputs"+self.name,
                    )
                inputs = await ops.concat(
                    inputs,
                    await ops.concat(
                        previous_steps,
                        Step(step=step),
                        name=f"step_{i}_"+self.name,
                    ),
                    name=f"step_{i}_with_inputs_"+self.name,
                )
        else:
            result = await self.runner(inputs, training=training)
            inputs = await ops.concat(
                inputs,
                result,
                name="with_inputs_"+self.name,
            )
        return await self.final_generator(inputs, training=training)

    async def compute_output_spec(self, inputs, training=False):
        _ = await self.runner(inputs)
        return await self.final_generator(inputs)

    def get_config(self):
        config = {
            "schema": self.schema,
            "steps": self.steps,
            "seed_steps": self.seed_steps,
            "return_inputs": self.return_inputs,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": serialization_lib.serialize_synalinks_object(
                self.language_model,
            )
        }
        runner_config = {
            "runner": serialization_lib.serialize_synalinks_object(
                self.runner,
            )
        }
        return {
            **config,
            **language_model_config,
            **runner_config,
        }

    @classmethod
    def from_config(cls, config):
        language_model = serialization_lib.deserialize_synalinks_object(
            config.pop("language_model"),
        )
        runner = serialization_lib.deserialize_synalinks_object(
            config.pop("runner"),
        )
        return cls(
            language_model=language_model,
            runner=runner,
            **config,
        )
````

## `Step`

Bases: `DataModel`

The individual step to execute

Source code in `synalinks/src/modules/synthesis/sequential_plan_synthesis.py`

```
class Step(DataModel):
    """The individual step to execute"""

    step: str = Field(
        description="The step to execute",
    )
```

## `ChainOfThought`

Bases: `Module`

Useful to answer in a step by step manner.

This component concatenates thinking fields to your data model/schema and generates a prediction, allowing the LM to think step by step before answering.

The parameter `k` specifies the number of thinking fields to add (Default to 1).

Example:

```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class Answer(synalinks.DataModel):
    answer: str = synalinks.Field(
        description="The correct answer",
    )

async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=Query)
    x1 = await synalinks.ChainOfThought(
        data_model=Answer,
        language_model=language_model,
        k=3,
    )(x0)

    program = synalinks.Program(
        inputs=x0,
        outputs=x1,
        name="answer_with_chain_of_thought",
        description="Useful to answer step by step",
    )

if __name__ == "__main__":
    asyncio.run(main())
```

References

- [Chain-of-Thought Prompting Elicits Reasoning in Large Language Models](https://arxiv.org/abs/2201.11903)

Parameters:

| Name | Type | Description | Default |
| -------------------- | ---------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- | ------- |
| `schema` | `dict` | The target JSON schema. If not provided use the data_model to infer it. | `None` |
| `data_model` | `DataModel \| SymbolicDataModel \| JsonDataModel` | The target data model. | `None` |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The jinja2 prompt template (see Generator). | `None` |
| `examples` | `list` | The default list of examples, the examples are a list of tuples containing input/output JSON pairs. | `None` |
| `instructions` | `str` | The default instructions being a string containing instructions for the language model. | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema` | `bool` | Optional. Whether or not use the inputs schema in the prompt (Default to False) (see Generator). | `False` |
| `use_outputs_schema` | `bool` | Optional. Whether or not use the outputs schema in the prompt (Default to False) (see Generator). | `False` |
| `k` | `int` | The number of thinking fields to add. | `1` |
| `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (Default to False) (see Generator). | `False` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |

Source code in `synalinks/src/modules/ttc/chain_of_thought.py`

````
@synalinks_export(
    [
        "synalinks.modules.ChainOfThought",
        "synalinks.ChainOfThought",
    ]
)
class ChainOfThought(Module):
    """Useful to answer in a step by step manner.

    This component concatenates thinking fields to your data model/schema and
    generates a prediction, allowing the LM to think step by step before answering.

    The parameter `k` specifies the number of thinking fields to add (Default to 1).

    Example:

    ```python
    import synalinks
    import asyncio

    class Query(synalinks.DataModel):
        query: str = synalinks.Field(
            description="The user query",
        )

    class Answer(synalinks.DataModel):
        answer: str = synalinks.Field(
            description="The correct answer",
        )

    async def main():
        language_model = synalinks.LanguageModel(
            model="ollama/mistral",
        )

        x0 = synalinks.Input(data_model=Query)
        x1 = await synalinks.ChainOfThought(
            data_model=Answer,
            language_model=language_model,
            k=3,
        )(x0)

        program = synalinks.Program(
            inputs=x0,
            outputs=x1,
            name="answer_with_chain_of_thought",
            description="Useful to answer step by step",
        )

    if __name__ == "__main__":
        asyncio.run(main())
    ```

    References:
        - [Chain-of-Thought Prompting Elicits Reasoning in Large Language Models](https://arxiv.org/abs/2201.11903)

    Args:
        schema (dict): The target JSON schema.
            If not provided use the `data_model` to infer it.
        data_model (DataModel | SymbolicDataModel | JsonDataModel): The target data
            model.
        language_model (LanguageModel): The language model to use.
        prompt_template (str): The jinja2 prompt template (see `Generator`).
        examples (list): The default list of examples, the examples
            are a list of tuples containing input/output JSON pairs.
        instructions (str): The default instructions being a string containing
            instructions for the language model.
        seed_instructions (list): Optional. A list of instructions to use as seed for
            the optimization. If not provided, use the default instructions as seed.
        temperature (float): Optional. The temperature for the LM call.
        use_inputs_schema (bool): Optional. Whether or not use the inputs schema in
            the prompt (Default to False) (see `Generator`).
        use_outputs_schema (bool): Optional. Whether or not use the outputs schema in
            the prompt (Default to False) (see `Generator`).
        k (int): The number of thinking fields to add.
        return_inputs (bool): Optional. Whether or not to concatenate the inputs to
            the outputs (Default to False) (see `Generator`).
        name (str): Optional. The name of the module.
        description (str): Optional. The description of the module.
        trainable (bool): Whether the module's variables should be trainable.
    """

    def __init__(
        self,
        schema=None,
        data_model=None,
        language_model=None,
        prompt_template=None,
        examples=None,
        instructions=None,
        seed_instructions=None,
        temperature=0.0,
        use_inputs_schema=False,
        use_outputs_schema=False,
        k=1,
        return_inputs=False,
        name=None,
        description=None,
        trainable=True,
    ):
        super().__init__(
            name=name,
            description=description,
            trainable=trainable,
        )
        if not schema and data_model:
            schema = data_model.get_schema()
        self.schema = schema
        self.language_model = language_model
        self.prompt_template = prompt_template
        self.examples = examples
        self.instructions = instructions
        self.seed_instructions = seed_instructions
        self.temperature = temperature
        self.use_inputs_schema = use_inputs_schema
        self.use_outputs_schema = use_outputs_schema
        self.return_inputs = return_inputs
        self.k = k

        thinking_data_model = Thinking
        if k > 1:
            for _ in range(k - 1):
                thinking_data_model += Thinking

        final_data_model = thinking_data_model + SymbolicDataModel(schema=self.schema)

        self.generator = Generator(
            data_model=final_data_model,
            language_model=self.language_model,
            prompt_template=self.prompt_template,
            examples=self.examples,
            instructions=self.instructions,
            seed_instructions=self.seed_instructions,
            temperature=self.temperature,
            use_inputs_schema=self.use_inputs_schema,
            use_outputs_schema=self.use_outputs_schema,
            return_inputs=self.return_inputs,
            name="generator_"+self.name,
        )

    async def call(self, inputs, training=False):
        return await self.generator(inputs, training=training)

    def get_config(self):
        config = {
            "schema": self.schema,
            "prompt_template": self.prompt_template,
            "examples": self.examples,
            "instructions": self.instructions,
            "seed_instructions": self.seed_instructions,
            "temperature": self.temperature,
            "use_inputs_schema": self.use_inputs_schema,
            "use_outputs_schema": self.use_outputs_schema,
            "return_inputs": self.return_inputs,
            "k": self.k,
            "name": self.name,
            "description": self.description,
            "trainable": self.trainable,
        }
        language_model_config = {
            "language_model": serialization_lib.serialize_synalinks_object(
                self.language_model,
            )
        }
        return {
            **config,
            **language_model_config,
        }

    @classmethod
    def from_config(cls, config):
        language_model = serialization_lib.deserialize_synalinks_object(
            config.pop("language_model"),
        )
        return cls(
            language_model=language_model,
            **config,
        )
````

## `SelfCritique`

Bases: `Module`

Useful to critique the given inputs.

This component critiques the given inputs and optionally generates an intermediate reward between 0.0 and 1.0. You can enable or disable the intermediate reward computation by using the `return_reward` flag (default to True).

To get more accurate results, ensure that the inputs are provided along with the output to evaluate, using `return_inputs` in your modules.

Example:

```
import synalinks
import asyncio

class Query(synalinks.DataModel):
    query: str = synalinks.Field(
        description="The user query",
    )

class Answer(synalinks.DataModel):
    answer: str = synalinks.Field(
        description="The correct answer",
    )

async def main():
    language_model = synalinks.LanguageModel(
        model="ollama/mistral",
    )

    x0 = synalinks.Input(data_model=Query)
    x1 = await synalinks.ChainOfThought(
        data_model=Answer,
        language_model=language_model,
        return_inputs=True,
    )(x0)
    x2 = await synalinks.SelfCritique(
        language_model=language_model,
    )(x1)

    program = synalinks.Program(
        inputs=x0,
        outputs=x2,
        name="answer_with_cot_and_self_critique",
        description="Useful to answer accurately",
    )

if __name__ == "__main__":
    asyncio.run(main())
```

Parameters:

| Name | Type | Description | Default |
| -------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------- | ------- |
| `language_model` | `LanguageModel` | The language model to use. | `None` |
| `prompt_template` | `str` | The jinja2 prompt template (see Generator). | `None` |
| `examples` | `list` | The default list of examples, the examples are a list of tuples containing input/output JSON pairs. | `None` |
| `instructions` | `str` | The default instructions being a string containing instructions for the language model. | `None` |
| `seed_instructions` | `list` | Optional. A list of instructions to use as seed for the optimization. If not provided, use the default instructions as seed. | `None` |
| `temperature` | `float` | Optional. The temperature for the LM call. | `0.0` |
| `use_inputs_schema` | `bool` | Optional. Whether or not use the inputs schema in the prompt (Default to False) (see Generator). | `False` |
| `use_outputs_schema` | `bool` | Optional. Whether or not use the outputs schema in the prompt (Default to False) (see Generator). | `False` |
| `return_reward` | `bool` | Optional. Whether or not to compute an intermediate reward. | `True` |
| `return_inputs` | `bool` | Optional. Whether or not to concatenate the inputs to the outputs (Default to True) (see Generator). | `True` |
| `name` | `str` | Optional. The name of the module. | `None` |
| `description` | `str` | Optional. The description of the module. | `None` |
| `trainable` | `bool` | Whether the module's variables should be trainable. | `True` |

Source code in `synalinks/src/modules/ttc/self_critique.py`

````
@synalinks_export(
    [
        "synalinks.modules.SelfCritique",
        "synalinks.SelfCritique",
    ]
)
class SelfCritique(Module):
    """Useful to critique the given inputs.

    This component critiques the given inputs and optionally generates
    an intermediate reward between 0.0 and 1.0.

    You can enable or disable the intermediate reward computation by
    using the `return_reward` flag (default to True).

    To get more accurate results, ensure that the inputs are provided along
    with the output to evaluate, using `return_inputs` in your modules.

    Example:

    ```python
    import synalinks
    import asyncio

    class Query(synalinks.DataModel):
        query: str = synalinks.Field(
            description="The user query",
        )

    class Answer(synalinks.DataModel):
        answer: str = synalinks.Field(
            description="The correct answer",
        )

    async def main():
        language_model = synalinks.LanguageModel(
            model="ollama/mistral",
        )

        x0 = synalinks.Input(data_model=Query)
        x1 = await synalinks.ChainOfThought(
            data_model=Answer,
            language_model=language_model,
            return_inputs=True,
        )(x0)
        x2 = await synalinks.SelfCritique(
            language_model=language_model,
        )(x1)

        program = synalinks.Program(
            inputs=x0,
            outputs=x2,
            name="answer_with_cot_and_self_critique",
            description="Useful to answer accurately",
        )

    if __name__ == "__main__":
        asyncio.run(main())
    ```

    Args:
        language_model (LanguageModel): The language model to use.
        prompt_template (str): The jinja2 prompt template (see `Generator`).
        examples (list): The default list of examples, the examples
            are a list of tuples containing input/output JSON pairs.
        instructions (str): The default instructions being a string containing
            instructions for the language model.
        seed_instructions (list): Optional. A list of instructions to use as seed for
            the optimization. If not provided, use the default instructions as seed.
        temperature (float): Optional. The temperature for the LM call.
        use_inputs_schema (bool): Optional. Whether or not use the inputs schema in
            the prompt (Default to False) (see `Generator`).
        use_outputs_schema (bool): Optional. Whether or not use the outputs schema in
            the prompt (Default to False) (see `Generator`).
        return_reward (bool): Optional. Whether or not to compute an intermediate
            reward.
        return_inputs (bool): Optional. Whether or not to concatenate the inputs to
            the outputs (Default to True) (see `Generator`).
        name (str): Optional. The name of the module.
        description (str): Optional. The description of the module.
        trainable (bool): Whether the module's variables should be trainable.
""" def __init__( self, language_model=None, prompt_template=None, examples=None, instructions=None, seed_instructions=None, temperature=0.0, use_inputs_schema=False, use_outputs_schema=False, return_reward=True, return_inputs=True, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) self.language_model = language_model self.prompt_template = prompt_template self.examples = examples self.instructions = instructions self.seed_instructions = seed_instructions self.temperature = temperature self.use_inputs_schema = use_inputs_schema self.use_outputs_schema = use_outputs_schema self.return_reward = return_reward self.return_inputs = return_inputs if self.return_reward: schema = CritiqueWithReward.get_schema() else: schema = Critique.get_schema() self.generator = Generator( schema=schema, language_model=self.language_model, prompt_template=self.prompt_template, examples=self.examples, instructions=self.instructions, seed_instructions=self.seed_instructions, temperature=self.temperature, use_inputs_schema=self.use_inputs_schema, use_outputs_schema=self.use_outputs_schema, return_inputs=self.return_inputs, name="generator_" + self.name, ) async def call(self, inputs, training=False): return await self.generator(inputs, training=training) def get_config(self): config = { "prompt_template": self.prompt_template, "examples": self.examples, "instructions": self.instructions, "seed_instructions": self.seed_instructions, "temperature": self.temperature, "use_inputs_schema": self.use_inputs_schema, "use_outputs_schema": self.use_outputs_schema, "return_reward": self.return_reward, "return_inputs": self.return_inputs, "name": self.name, "description": self.description, "trainable": self.trainable, } language_model_config = { "language_model": serialization_lib.serialize_synalinks_object( self.language_model, ) } return { **config, **language_model_config, } @classmethod def from_config(cls, config): 
language_model = serialization_lib.deserialize_synalinks_object( config.pop("language_model"), ) return cls( language_model=language_model, **config, ) ```` ## `CosineSimilarity` Bases: `RewardFunctionWrapper` Computes the cosine similarity between `y_true` and `y_pred`. Formula: ``` reward = (sum(l2_norm(y_true) * l2_norm(y_pred))+1) / 2 ``` The formula is the classic cosine similarity used in deep learning, rescaled to [0.0, 1.0] so that the reward tends towards 1.0 when the two objects are similar (and towards 0.0 otherwise). Example: ``` program.compile( reward=synalinks.rewards.CosineSimilarity( embedding_model=embedding_model ), optimizer=synalinks.optimizers.RandomFewShot(), ) ``` Parameters: | Name | Type | Description | Default | | ----------------- | ---------------- | --------------------------------------------------------------------------------------- | --------------------- | | `embedding_model` | `EmbeddingModel` | The embedding model to use to compute the cosine similarity. | `None` | | `axis` | `int` | (Optional) Defaults to -1. The dimension along which the cosine similarity is computed. | `-1` | | `name` | `str` | (Optional) string name of the reward instance. | `'cosine_similarity'` | | `in_mask` | `list` | (Optional) list of keys to keep to compute the reward. | `None` | | `out_mask` | `list` | (Optional) list of keys to remove to compute the reward. | `None` | Source code in `synalinks/src/rewards/cosine_similarity.py` ```` @synalinks_export( [ "synalinks.CosineSimilarity", "synalinks.rewards.CosineSimilarity", ] ) class CosineSimilarity(RewardFunctionWrapper): """ Computes the cosine similarity between `y_true` and `y_pred`. Formula: ``` reward = (sum(l2_norm(y_true) * l2_norm(y_pred))+1) / 2 ``` The formula is the classic cosine similarity used in deep learning, rescaled to [0.0, 1.0] so that the reward tends towards 1.0 when the two objects are similar (and towards 0.0 otherwise). 
Example: ```python program.compile( reward=synalinks.rewards.CosineSimilarity( embedding_model=embedding_model ), optimizer=synalinks.optimizers.RandomFewShot(), ) ``` Args: embedding_model (EmbeddingModel): The embedding model to use to compute the cosine similarity. axis (int): (Optional) Defaults to `-1`. The dimension along which the cosine similarity is computed. name (str): (Optional) string name of the reward instance. in_mask (list): (Optional) list of keys to keep to compute the reward. out_mask (list): (Optional) list of keys to remove to compute the reward. """ def __init__( self, embedding_model=None, axis=-1, name="cosine_similarity", in_mask=None, out_mask=None, ): super().__init__( fn=cosine_similarity, name=name, in_mask=in_mask, out_mask=out_mask, axis=axis, embedding_model=embedding_model, ) def get_config(self): config = Reward.get_config(self) from synalinks.src.saving.serialization_lib import serialize_synalinks_object embedding_model_config = { "embedding_model": serialize_synalinks_object(self.embedding_model) } return {**config, **embedding_model_config} @classmethod def from_config(cls, config): from synalinks.src.saving.serialization_lib import deserialize_synalinks_object embedding_model = deserialize_synalinks_object(config.pop("embedding_model")) return cls(embedding_model=embedding_model, **config) ```` ## `cosine_similarity(y_true, y_pred, embedding_model=None, axis=-1)` Computes the cosine similarity between `y_true` and `y_pred`. Formula: ``` reward = (sum(l2_norm(y_true) * l2_norm(y_pred))+1) / 2 ``` The formula is the classic cosine similarity used in deep learning, rescaled to [0.0, 1.0] so that the reward tends towards 1.0 when the two objects are similar (and towards 0.0 otherwise). 
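As an illustration of the formula above, here is a minimal pure-Python sketch of the rescaled cosine similarity. It is not the library's implementation: `l2_normalize` and `cosine_reward` are hypothetical helper names, and plain lists of floats stand in for the embedding vectors that Synalinks obtains from the embedding model.

```python
import math


def l2_normalize(vec):
    """Scale a vector to unit L2 norm (hypothetical helper, not a Synalinks API)."""
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0.0:
        # Assumption for this sketch: a zero vector stays a zero vector.
        return [0.0 for _ in vec]
    return [x / norm for x in vec]


def cosine_reward(y_true_emb, y_pred_emb):
    """Map the cosine similarity from [-1.0, 1.0] to a reward in [0.0, 1.0]."""
    a = l2_normalize(y_true_emb)
    b = l2_normalize(y_pred_emb)
    cosine = sum(x * y for x, y in zip(a, b))
    return (cosine + 1.0) / 2.0


# Toy embeddings chosen by hand for the illustration.
identical = cosine_reward([1.0, 2.0, 3.0], [1.0, 2.0, 3.0])
orthogonal = cosine_reward([1.0, 0.0], [0.0, 1.0])
opposite = cosine_reward([1.0, 0.0], [-1.0, 0.0])
```

Under this rescaling, identical directions give a reward of about 1.0, orthogonal vectors land at 0.5, and only vectors pointing in opposite directions reach 0.0. This is worth keeping in mind when choosing a threshold on the reward.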
Parameters: | Name | Type | Description | Default | | ----------------- | ---------------- | --------------------------------------------------------------------------------------- | ---------- | | `y_true` | `JsonDataModel` | The ground truth JSON data_model. | *required* | | `y_pred` | `JsonDataModel` | The predicted JSON data_model. | *required* | | `embedding_model` | `EmbeddingModel` | The embedding model to use to compute the cosine similarity. | `None` | | `axis` | `int` | (Optional) Defaults to -1. The dimension along which the cosine similarity is computed. | `-1` | Returns: | Type | Description | | ------- | ----------------------------------------------------------------------------------------- | | `float` | The reward value, which tends towards 1.0 if the values are similar, and towards 0.0 otherwise. | Source code in `synalinks/src/rewards/cosine_similarity.py` ```` @synalinks_export("synalinks.rewards.cosine_similarity") async def cosine_similarity(y_true, y_pred, embedding_model=None, axis=-1): """ Computes the cosine similarity between `y_true` and `y_pred`. Formula: ``` reward = (sum(l2_norm(y_true) * l2_norm(y_pred))+1) / 2 ``` The formula is the classic cosine similarity used in deep learning, rescaled to [0.0, 1.0] so that the reward tends towards 1.0 when the two objects are similar (and towards 0.0 otherwise). Args: y_true (JsonDataModel): The ground truth JSON data_model. y_pred (JsonDataModel): The predicted JSON data_model. embedding_model (EmbeddingModel): The embedding model to use to compute the cosine similarity. axis (int): (Optional) Defaults to `-1`. The dimension along which the cosine similarity is computed. Returns: (float): The reward value, which tends towards 1.0 if the values are similar, and towards 0.0 otherwise. 
""" reward = 0.0 if y_pred is not None: y_true = await ops.embedding(y_true, embedding_model=embedding_model) y_pred = await ops.embedding(y_pred, embedding_model=embedding_model) y_true = np.convert_to_tensor(y_true.get("embeddings")) y_pred = np.convert_to_tensor(y_pred.get("embeddings")) y_true, y_pred = squeeze_or_expand_to_same_rank(y_true, y_pred) y_pred = np.normalize(y_pred, axis=axis) y_true = np.normalize(y_true, axis=axis) reward = (np.sum(y_true * y_pred, axis=axis) + 1) / 2 return reward ```` ## `ExactMatch` Bases: `RewardFunctionWrapper` Computes the exact match between `y_true` and `y_pred`. Example: ``` program.compile( reward=synalinks.rewards.ExactMatch(), optimizer=synalinks.optimizers.RandomFewShot(), ) ``` Parameters: | Name | Type | Description | Default | | ---------- | ------ | ------------------------------------------------------- | --------------- | | `name` | `str` | Optional. string name of the reward instance. | `'exact_match'` | | `in_mask` | `list` | Optional. list of keys to keep to compute the reward. | `None` | | `out_mask` | `list` | Optional. list of keys to remove to compute the reward. | `None` | Source code in `synalinks/src/rewards/exact_match.py` ```` @synalinks_export( [ "synalinks.ExactMatch", "synalinks.rewards.ExactMatch", ] ) class ExactMatch(RewardFunctionWrapper): """Computes the exact match between `y_true` and `y_pred`. Example: ```python program.compile( reward=synalinks.rewards.ExactMatch(), optimizer=synalinks.optimizers.RandomFewShot(), ) ``` Args: name (str): Optional. string name of the reward instance. in_mask (list): Optional. list of keys to keep to compute the reward. out_mask (list): Optional. list of keys to remove to compute the reward. 
""" def __init__( self, name="exact_match", in_mask=None, out_mask=None, ): super().__init__( fn=exact_match, name=name, in_mask=in_mask, out_mask=out_mask, ) def get_config(self): return { "name": self.name, "in_mask": self.in_mask, "out_mask": self.out_mask, } @classmethod def from_config(cls, config): return cls(**config) ```` ## `exact_match(y_true, y_pred)` Computes the exact match between `y_true` and `y_pred`. If their values are equal, it returns a reward of 1.0; otherwise, it returns 0.0. Parameters: | Name | Type | Description | Default | | -------- | --------------- | --------------------------------- | ---------- | | `y_true` | `JsonDataModel` | The ground truth JSON data_model. | *required* | | `y_pred` | `JsonDataModel` | The predicted JSON data_model. | *required* | Returns: | Type | Description | | ------- | ------------------------------------------------------------------------------ | | `float` | The reward value, which is 1.0 if the values match exactly, and 0.0 otherwise. | Source code in `synalinks/src/rewards/exact_match.py` ``` @synalinks_export("synalinks.rewards.exact_match") async def exact_match(y_true, y_pred): """ Computes the exact match between `y_true` and `y_pred`. If their values are equal, it returns a reward of 1.0; otherwise, it returns 0.0. Args: y_true (JsonDataModel): The ground truth JSON data_model. y_pred (JsonDataModel): The predicted JSON data_model. Returns: (float): The reward value, which is 1.0 if the values match exactly, and 0.0 otherwise. """ reward = 0.0 if y_pred is not None: if y_pred.get_json() == y_true.get_json(): reward = 1.0 return reward ``` ## `LMAsJudge` Bases: `ProgramAsJudge` Evaluate the output of a program using a `LanguageModel`. Example: ``` async def main(): # ... program definition program.compile( reward=synalinks.rewards.LMAsJudge( language_model=language_model, ), optimizer=synalinks.optimizers.RandomFewShot(), ) history = await program.fit(...) 
``` Parameters: | Name | Type | Description | Default | | ----------------- | --------------- | ---------------------------------------------------------- | --------------- | | `language_model` | `LanguageModel` | The language model to use. | `None` | | `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` | | `instructions` | `list` | The default instructions to use (see Generator). | `None` | | `examples` | `list` | The default examples to use in the prompt (see Generator). | `None` | | `name` | `str` | Optional. string name of the reward instance. | `'lm_as_judge'` | | `in_mask` | `list` | Optional. list of keys to keep to compute the reward. | `None` | | `out_mask` | `list` | Optional. list of keys to remove to compute the reward. | `None` | Source code in `synalinks/src/rewards/lm_as_judge.py` ```` @synalinks_export( [ "synalinks.LMAsJudge", "synalinks.rewards.LMAsJudge", ] ) class LMAsJudge(ProgramAsJudge): """Evaluate the output of a program using a `LanguageModel`. Example: ```python async def main(): # ... program definition program.compile( reward=synalinks.rewards.LMAsJudge( language_model=language_model, ), optimizer=synalinks.optimizers.RandomFewShot(), ) history = await program.fit(...) ``` Args: language_model (LanguageModel): The language model to use. prompt_template (str): The default jinja2 prompt template to use (see `Generator`). instructions (list): The default instructions to use (see `Generator`). examples (list): The default examples to use in the prompt (see `Generator`). name (str): Optional. string name of the reward instance. in_mask (list): Optional. list of keys to keep to compute the reward. out_mask (list): Optional. list of keys to remove to compute the reward. 
""" def __init__( self, language_model=None, prompt_template=None, examples=None, instructions=None, name="lm_as_judge", in_mask=None, out_mask=None, ): program = LMAsJudgeProgram( language_model=language_model, prompt_template=prompt_template, examples=examples, instructions=instructions, ) super().__init__( program=program, name=name, in_mask=in_mask, out_mask=out_mask, ) ```` ## `LMAsJudgeProgram` Bases: `Program` Evaluate the output of a program using a `LanguageModel`. Parameters: | Name | Type | Description | Default | | ----------------- | --------------- | ---------------------------------------------------------- | ------- | | `language_model` | `LanguageModel` | The language model to use. | `None` | | `prompt_template` | `str` | The default jinja2 prompt template to use (see Generator). | `None` | | `examples` | `list` | The default examples to use in the prompt (see Generator). | `None` | | `instructions` | `list` | The default instructions to use (see Generator). | `None` | | `name` | `str` | Optional. The name of the program. | `None` | | `description` | `str` | Optional. The description of the program. | `None` | | `trainable` | `bool` | Whether the program's variables should be trainable. | `True` | Source code in `synalinks/src/rewards/lm_as_judge.py` ``` class LMAsJudgeProgram(Program): """Evaluate the output of a program using a `LanguageModel`. Args: language_model (LanguageModel): The language model to use. prompt_template (str): The default jinja2 prompt template to use (see `Generator`). examples (list): The default examples to use in the prompt (see `Generator`). instructions (list): The default instructions to use (see `Generator`). name (str): Optional. The name of the program. description (str): Optional. The description of the program. trainable (bool): Whether the program's variables should be trainable. 
""" def __init__( self, language_model=None, prompt_template=None, examples=None, instructions=None, name=None, description=None, trainable=True, ): super().__init__( name=name, description=description, trainable=trainable, ) self.critique = SelfCritique( language_model=language_model, prompt_template=prompt_template, examples=examples, instructions=instructions, name="self_critique_" + self.name, ) self.language_model = language_model self.prompt_template = prompt_template self.examples = examples self.instructions = instructions async def call(self, inputs): if not isinstance(inputs, (list, tuple)): raise ValueError("The inputs should be a list or tuple.") if len(inputs) != 2: raise ValueError("The inputs of the program should have a length of 2.") y_true = inputs[0] y_pred = inputs[1] if y_true: y_true = await ops.prefix( y_true, prefix="gold", name="gold_y_true", ) return await self.critique( await ops.concat( y_true, y_pred, name="y_true_with_y_pred", ) ) else: return await self.critique(y_pred) def get_config(self): config = { "prompt_template": self.prompt_template, "examples": self.examples, "instructions": self.instructions, "name": self.name, "description": self.description, "trainable": self.trainable, } language_model_config = { "language_model": serialization_lib.serialize_synalinks_object( self.language_model ) } return {**language_model_config, **config} @classmethod def from_config(cls, config): language_model = serialization_lib.deserialize_synalinks_object( config.pop("language_model") ) return cls(language_model=language_model, **config) ``` ## `ProgramAsJudge` Bases: `Reward` Wrap a `Program` into a `Reward`. You can use this to create advanced reward functions that use a Synalinks `Program`. The program should have two inputs and one output. **Note:** The output data model/schema should have a field named `reward`. Example: ``` # ... 
your program declaration program = synalinks.Program( inputs=x0, outputs=xn, ) program.compile( reward=synalinks.rewards.ProgramAsJudge(program=program), optimizer=synalinks.optimizers.RandomFewShot(), ) ``` Parameters: | Name | Type | Description | Default | | ---------- | --------- | ------------------------------------------------------- | ---------- | | `program` | `Program` | The reward program to wrap. | *required* | | `name` | `str` | Optional. string name of the reward instance. | `None` | | `in_mask` | `list` | Optional. list of keys to keep to compute the reward. | `None` | | `out_mask` | `list` | Optional. list of keys to remove to compute the reward. | `None` | Source code in `synalinks/src/rewards/reward_wrappers.py` ```` @synalinks_export( [ "synalinks.ProgramAsJudge", "synalinks.rewards.ProgramAsJudge", ] ) class ProgramAsJudge(Reward): """Wrap a `Program` into a `Reward`. You can use this to create advanced reward functions that use a Synalinks `Program`. The program should have two inputs and one output. **Note:** The output data model/schema should have a field named `reward`. Example: ```python # ... your program declaration program = synalinks.Program( inputs=x0, outputs=xn, ) program.compile( reward=synalinks.rewards.ProgramAsJudge(program=program), optimizer=synalinks.optimizers.RandomFewShot(), ) ``` Args: program (Program): The reward program to wrap. name (str): Optional. string name of the reward instance. in_mask (list): Optional. list of keys to keep to compute the reward. out_mask (list): Optional. list of keys to remove to compute the reward. 
""" def __init__( self, program, reduction="mean", name=None, in_mask=None, out_mask=None, ): super().__init__( name=name, reduction=reduction, in_mask=in_mask, out_mask=out_mask, ) self.program = program async def call(self, y_true, y_pred): result = await self.program([y_true, y_pred]) return float(result.get("reward", 0.0)) def get_config(self): config = super().get_config() config.update({"program": self.program}) return config def __repr__(self): return f"" ```` ## `RewardFunctionWrapper` Bases: `Reward` Wrap a stateless function into a `Reward`. You can use this to quickly build a reward from a function. The function needs to have the signature `fn(y_true, y_pred)` and must be a coroutine function (`async def`), because the wrapper awaits it. Example: ``` async def my_reward(y_true, y_pred): # ... return reward program.compile( reward=synalinks.rewards.RewardFunctionWrapper(fn=my_reward), optimizer=synalinks.optimizers.RandomFewShot(), ) ``` Parameters: | Name | Type | Description | Default | | ---------- | ------------------- | --------------------------------------------------------------------------- | ---------- | | `fn` | `callable` | The reward function to wrap, with signature fn(y_true, y_pred, \*\*kwargs). | *required* | | `name` | `str` | Optional. string name of the reward instance. | `None` | | `in_mask` | `list` | Optional. list of keys to keep to compute the reward. | `None` | | `out_mask` | `list` | Optional. list of keys to remove to compute the reward. | `None` | | `**kwargs` | `keyword arguments` | Keyword arguments to pass on to fn. | `{}` | Source code in `synalinks/src/rewards/reward_wrappers.py` ```` @synalinks_export("synalinks.rewards.RewardFunctionWrapper") class RewardFunctionWrapper(Reward): """Wrap a stateless function into a `Reward`. You can use this to quickly build a reward from a function. The function needs to have the signature `fn(y_true, y_pred)` and must be a coroutine function (`async def`), because the wrapper awaits it. 
Example: ```python async def my_reward(y_true, y_pred): # ... 
return reward program.compile( reward=synalinks.rewards.RewardFunctionWrapper(fn=my_reward), optimizer=synalinks.optimizers.RandomFewShot(), ) ``` Args: fn (callable): The reward function to wrap, with signature `fn(y_true, y_pred, **kwargs)`. name (str): Optional. string name of the reward instance. in_mask (list): Optional. list of keys to keep to compute the reward. out_mask (list): Optional. list of keys to remove to compute the reward. **kwargs (keyword arguments): Keyword arguments to pass on to `fn`. """ def __init__( self, fn, reduction="mean", name=None, in_mask=None, out_mask=None, **kwargs, ): super().__init__( name=name, reduction=reduction, in_mask=in_mask, out_mask=out_mask, ) self.fn = fn self._fn_kwargs = kwargs async def call(self, y_true, y_pred): return await self.fn(y_true, y_pred, **self._fn_kwargs) def get_config(self): config = super().get_config() config.update({"fn": serialization_lib.serialize_synalinks_object(self.fn)}) config.update(serialization_lib.serialize_synalinks_object(self._fn_kwargs)) return config @classmethod def from_config(cls, config): if "fn" in config: config = serialization_lib.deserialize_synalinks_object(config) return cls(**config) def __repr__(self): return f"" ````
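The calling convention used by `RewardFunctionWrapper.call` (await `fn(y_true, y_pred, **kwargs)` with the keyword arguments stored at construction time) can be sketched without installing Synalinks. Everything below is a simplified stand-in written for illustration: `FakeDataModel`, `MiniRewardWrapper` and `key_match` are hypothetical names, and the sketch ignores the `name`, `reduction`, `in_mask` and `out_mask` handling that the real `Reward` base class provides.

```python
import asyncio


class FakeDataModel:
    """Hypothetical stand-in exposing get_json(), as the reward functions above expect."""

    def __init__(self, data):
        self._data = data

    def get_json(self):
        return self._data


class MiniRewardWrapper:
    """Simplified stand-in for RewardFunctionWrapper (no masks, no reduction)."""

    def __init__(self, fn, **kwargs):
        self.fn = fn
        self._fn_kwargs = kwargs

    async def call(self, y_true, y_pred):
        # Same pattern as the wrapper: await the function with the stored kwargs.
        return await self.fn(y_true, y_pred, **self._fn_kwargs)


async def key_match(y_true, y_pred, key="answer"):
    """Return 1.0 when the selected key matches, 0.0 otherwise or without a prediction."""
    if y_pred is None:
        return 0.0
    same = y_pred.get_json().get(key) == y_true.get_json().get(key)
    return 1.0 if same else 0.0


async def demo():
    reward = MiniRewardWrapper(key_match, key="answer")
    gold = FakeDataModel({"answer": "Paris", "thinking": "capital of France"})
    good = FakeDataModel({"answer": "Paris", "thinking": "worded differently"})
    bad = FakeDataModel({"answer": "Lyon", "thinking": "capital of France"})
    return (
        await reward.call(gold, good),
        await reward.call(gold, bad),
        await reward.call(gold, None),
    )


scores = asyncio.run(demo())
```

The reward function is declared with `async def` because the wrapper awaits it; a plain synchronous function would fail at the `await`. Returning 0.0 when `y_pred` is `None` follows the same convention as `exact_match` and `cosine_similarity` above.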