Skip to content

Monitor

Monitor

Bases: Hook

Monitor hook for sending module call traces to a remote endpoint in realtime.

This hook sends trace data immediately to a specified endpoint for realtime monitoring. Traces are sent asynchronously using asyncio to avoid blocking module execution.

You can enable monitoring for every module by calling synalinks.enable_observability() at the beginning of your script:

Example:

import synalinks

synalinks.enable_observability()

Parameters:

Name Type Description Default
timeout

Request timeout in seconds (default: 5).

5
headers

Optional additional headers to include in requests

None
Source code in synalinks/src/hooks/monitor.py
@synalinks_export("synalinks.hooks.Monitor")
class Monitor(Hook):
    """Monitor hook for sending module call traces to a remote endpoint in realtime.

    This hook sends trace data immediately to a specified endpoint for realtime monitoring.
    Traces are sent asynchronously using asyncio to avoid blocking module execution.

    The endpoint is read from `api_base()` when the hook is created. When no
    `headers` are given and `api_key()` is not `None`, an `Authorization`
    header carrying the API key is used.

    You can enable monitoring for every module by using `synalinks.enable_observability()`
    at the beginning of your scripts:

    Example:

    ```python
    import synalinks

    synalinks.enable_observability()
    ```

    Args:
        timeout: Request timeout in seconds (default: 5).
        headers: Optional additional headers to include in requests
    """

    def __init__(
        self,
        timeout=5,
        headers=None,
    ):
        super().__init__()
        self.endpoint = api_base()
        self.timeout = timeout
        # The API key is only used as a fallback: caller-supplied headers are
        # sent as-is and are not merged with an Authorization header.
        if api_key() is not None and not headers:
            headers = {"Authorization": api_key()}
        self.headers = headers or {}
        # Created lazily by `_get_session()`.
        self._session = None
        # Maps call_id -> start timestamp, filled in `on_call_begin` and
        # consumed in `on_call_end`.
        self.call_start_times = {}
        # Strong references to in-flight send tasks.
        self._pending_tasks = []
        self.logger = logging.getLogger(__name__)

    async def _get_session(self):
        """Get or create aiohttp session.

        Returns:
            The cached `aiohttp.ClientSession`, recreated with `self.headers`
            when there is none yet or the previous one has been closed.
        """
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(headers=self.headers)
        return self._session

    async def _post_trace(self, data: dict):
        """POST trace data to the endpoint asynchronously.

        The payload is sent as JSON to `<endpoint>/trace`. Every failure is
        logged and swallowed, so this coroutine does not raise `Exception`
        subclasses to its caller.

        Args:
            data: The trace payload to send.
        """
        url = f"{self.endpoint}/trace"

        try:
            session = await self._get_session()
            async with session.post(
                url,
                json=data,
                timeout=aiohttp.ClientTimeout(total=self.timeout),
            ) as response:
                response.raise_for_status()
                self.logger.debug(
                    f"Trace sent successfully: {data.get('event')} for call {data.get('call_id')}"
                )
        except aiohttp.ClientError as e:
            self.logger.error(f"Failed to send trace to {url}: {e}")
        except asyncio.TimeoutError:
            self.logger.error(f"Timeout sending trace to {url}")
        except Exception as e:
            self.logger.error(f"Unexpected error sending trace: {e}")

    def _send_trace_async(self, trace_data: dict):
        """Send trace asynchronously without blocking.

        Schedules `_post_trace(trace_data)` as a task on the current event
        loop and returns immediately.

        NOTE(review): when no loop is running, the task is created on a loop
        that nothing is driving yet, so it only executes once that loop is run
        (for instance by `__del__`) -- confirm this is the intended behavior
        for synchronous callers.

        Args:
            trace_data: The trace payload to send.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No event loop in current thread, create a new one
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Create task and store reference to prevent garbage collection
        task = loop.create_task(self._post_trace(trace_data))
        self._pending_tasks.append(task)

        # Clean up completed tasks
        self._pending_tasks = [t for t in self._pending_tasks if not t.done()]

    def _extract_data_models_info(self, data):
        """Extract data model information from inputs/outputs.

        Args:
            data: A (possibly nested) structure of data models, or a falsy
                value when there is nothing to report.

        Returns:
            `None` when `data` is falsy. Otherwise a dict with
            `"type": "symbolic"` and the `"schemas"` of the flattened entries
            when `any_symbolic_data_models(data)` is true, or
            `"type": "data"` and their `"data"` (JSON values) when it is not.
            `None` entries of the flattened structure are skipped.
        """
        if not data:
            return None

        flatten_data = tree.flatten(data)

        if any_symbolic_data_models(data):
            schemas = [dm.get_schema() for dm in flatten_data if dm is not None]
            return {
                "type": "symbolic",
                "schemas": schemas,
            }
        else:
            jsons = [dm.get_json() for dm in flatten_data if dm is not None]
            return {
                "type": "data",
                "data": jsons,
            }

    def on_call_begin(
        self,
        call_id,
        inputs=None,
    ):
        """Called when a module call begins.

        Records the start time of the call and sends a `call_begin` trace.

        Args:
            call_id: Identifier of the call; used as the key for the start
                time and echoed in the trace.
            inputs: The inputs of the call, if any.
        """
        self.call_start_times[call_id] = time.time()

        trace_data = {
            "event": "call_begin",
            "call_id": call_id,
            "module_name": self.module.name,
            "module_description": self.module.description,
            "timestamp": self.call_start_times[call_id],
            "inputs": self._extract_data_models_info(inputs),
        }

        self._send_trace_async(trace_data)

    def on_call_end(
        self,
        call_id,
        outputs=None,
        exception=None,
    ):
        """Called when a module call ends.

        Computes the call duration from the start time recorded by
        `on_call_begin` and sends a `call_end` trace.

        Args:
            call_id: Identifier of the call that ended.
            outputs: The outputs of the call, if any.
            exception: The exception raised by the call, or `None` when the
                call succeeded.
        """
        end_time = time.time()
        # An unknown call_id falls back to `end_time`, giving a duration of 0.
        start_time = self.call_start_times.pop(call_id, end_time)
        duration = end_time - start_time

        # Use the same `is None` test for both fields so the trace can never
        # report a failure (`success: False`) without an exception text.
        failed = exception is not None

        trace_data = {
            "event": "call_end",
            "call_id": call_id,
            "module_name": self.module.name,
            "module_description": self.module.description,
            "timestamp": end_time,
            "duration": duration,
            "outputs": self._extract_data_models_info(outputs),
            "exception": str(exception) if failed else None,
            "success": not failed,
        }

        self._send_trace_async(trace_data)

    async def _cleanup(self):
        """Wait for pending tasks and close session."""
        if self._pending_tasks:
            # `return_exceptions=True` keeps one failed send from aborting
            # the wait for the others.
            await asyncio.gather(*self._pending_tasks, return_exceptions=True)
        if self._session and not self._session.closed:
            await self._session.close()

    def __del__(self):
        """Cleanup pending traces and close session.

        Best-effort: does nothing when there are no pending tasks or when the
        current event loop is already closed, and never raises `Exception`
        subclasses.
        """
        # `getattr` guards against a partially initialized instance.
        if not getattr(self, "_pending_tasks", None):
            return
        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                # Nothing can run on a closed loop. Return before creating the
                # cleanup coroutine so it is not left un-awaited.
                return
            if loop.is_running():
                # If loop is running, schedule cleanup
                loop.create_task(self._cleanup())
            else:
                # If loop is not running, run cleanup
                loop.run_until_complete(self._cleanup())
        except Exception:
            # Deliberately silent: a finalizer may run during interpreter
            # shutdown, where neither raising nor logging is reliable.
            pass

__del__()

Cleanup pending traces and close session.

Source code in synalinks/src/hooks/monitor.py
def __del__(self):
    """Cleanup pending traces and close session.

    Best-effort: does nothing when there are no pending tasks or when the
    current event loop is already closed, and never raises `Exception`
    subclasses.
    """
    # `getattr` guards against a partially initialized instance.
    if not getattr(self, "_pending_tasks", None):
        return
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            # Nothing can run on a closed loop. Return before creating the
            # cleanup coroutine so it is not left un-awaited.
            return
        if loop.is_running():
            # If loop is running, schedule cleanup
            loop.create_task(self._cleanup())
        else:
            # If loop is not running, run cleanup
            loop.run_until_complete(self._cleanup())
    except Exception:
        # Deliberately silent: a finalizer may run during interpreter
        # shutdown, where neither raising nor logging is reliable.
        pass

on_call_begin(call_id, inputs=None)

Called when a module call begins.

Source code in synalinks/src/hooks/monitor.py
def on_call_begin(self, call_id, inputs=None):
    """Record the start of a module call and emit a `call_begin` trace.

    Args:
        call_id: Identifier of the call; used as the key under which the
            start time is stored and echoed in the trace.
        inputs: The inputs of the call, if any.
    """
    started_at = time.time()
    self.call_start_times[call_id] = started_at

    module = self.module
    payload = dict(
        event="call_begin",
        call_id=call_id,
        module_name=module.name,
        module_description=module.description,
        timestamp=started_at,
        inputs=self._extract_data_models_info(inputs),
    )

    self._send_trace_async(payload)

on_call_end(call_id, outputs=None, exception=None)

Called when a module call ends.

Source code in synalinks/src/hooks/monitor.py
def on_call_end(
    self,
    call_id,
    outputs=None,
    exception=None,
):
    """Called when a module call ends.

    Computes the call duration from the start time stored in
    `self.call_start_times` and sends a `call_end` trace.

    Args:
        call_id: Identifier of the call that ended.
        outputs: The outputs of the call, if any.
        exception: The exception raised by the call, or `None` when the
            call succeeded.
    """
    end_time = time.time()
    # An unknown call_id falls back to `end_time`, giving a duration of 0.
    start_time = self.call_start_times.pop(call_id, end_time)
    duration = end_time - start_time

    # Use the same `is None` test for both fields so the trace can never
    # report a failure (`success: False`) without an exception text.
    failed = exception is not None

    trace_data = {
        "event": "call_end",
        "call_id": call_id,
        "module_name": self.module.name,
        "module_description": self.module.description,
        "timestamp": end_time,
        "duration": duration,
        "outputs": self._extract_data_models_info(outputs),
        "exception": str(exception) if failed else None,
        "success": not failed,
    }

    self._send_trace_async(trace_data)