@synalinks_export("synalinks.hooks.Monitor")
class Monitor(Hook):
    """Monitor hook for sending module call traces to a remote endpoint in realtime.

    This hook sends trace data immediately to the configured endpoint for
    realtime monitoring. Each trace is posted from an asyncio task so the
    module call does not wait for the HTTP request to finish.

    You can enable monitoring for every module by using
    `synalinks.enable_observability()` at the beginning of your scripts:

    Example:

    ```python
    import synalinks

    synalinks.enable_observability()
    ```

    Args:
        timeout: Request timeout in seconds (default: 5).
        headers: Optional additional headers to include in requests. When
            `api_key()` returns a value, it is sent as the `Authorization`
            header unless `headers` already provides one.
    """

    def __init__(
        self,
        timeout=5,
        headers=None,
    ):
        super().__init__()
        self.endpoint = api_base()
        self.timeout = timeout

        # Copy so the caller's mapping is never mutated.
        merged_headers = dict(headers or {})
        key = api_key()
        # Header names are case-insensitive, so look for any spelling of
        # "Authorization" before adding ours; a caller-supplied value wins.
        caller_sets_auth = any(
            str(name).lower() == "authorization" for name in merged_headers
        )
        if key is not None and not caller_sets_auth:
            merged_headers["Authorization"] = key
        self.headers = merged_headers

        self._session = None
        self.call_start_times = {}
        self._pending_tasks = []
        self.logger = logging.getLogger(__name__)

    async def _get_session(self):
        """Return the shared aiohttp session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(headers=self.headers)
        return self._session

    async def _post_trace(self, data: dict):
        """POST trace data to the `/trace` route of the endpoint.

        Failures are logged and never raised, so a monitoring outage cannot
        break the module call that produced the trace.

        Args:
            data: The JSON-serializable trace payload.
        """
        url = f"{self.endpoint}/trace"
        try:
            session = await self._get_session()
            async with session.post(
                url,
                json=data,
                timeout=aiohttp.ClientTimeout(total=self.timeout),
            ) as response:
                response.raise_for_status()
                self.logger.debug(
                    "Trace sent successfully: %s for call %s",
                    data.get("event"),
                    data.get("call_id"),
                )
        except aiohttp.ClientError as e:
            self.logger.error("Failed to send trace to %s: %s", url, e)
        except asyncio.TimeoutError:
            self.logger.error("Timeout sending trace to %s", url)
        except Exception as e:
            # Best-effort boundary: tracing must never propagate an error.
            self.logger.error("Unexpected error sending trace: %s", e)

    def _send_trace_async(self, trace_data: dict):
        """Schedule the trace to be posted without blocking the caller.

        Args:
            trace_data: The trace payload handed to `_post_trace`.
        """
        try:
            # Preferred path: we are called from inside a running loop.
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread. Fall back to the thread's
            # current loop, creating one if there is none.
            # NOTE: a task created on a loop that is not running only
            # executes once something runs that loop.
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

        # Drop references to tasks that already finished.
        self._pending_tasks = [t for t in self._pending_tasks if not t.done()]
        # Keep a strong reference so the task is not garbage collected
        # before it completes.
        task = loop.create_task(self._post_trace(trace_data))
        self._pending_tasks.append(task)

    def _extract_data_models_info(self, data):
        """Extract data model information from inputs/outputs.

        Args:
            data: A (possibly nested) structure of data models, or a falsy
                value.

        Returns:
            `None` when `data` is falsy. Otherwise a dict with `"type"` set
            to `"symbolic"` and the flattened `"schemas"`, or `"type"` set to
            `"data"` and the flattened JSON values under `"data"`. `None`
            leaves are skipped.
        """
        if not data:
            return None
        flatten_data = tree.flatten(data)
        if any_symbolic_data_models(data):
            schemas = [dm.get_schema() for dm in flatten_data if dm is not None]
            return {
                "type": "symbolic",
                "schemas": schemas,
            }
        jsons = [dm.get_json() for dm in flatten_data if dm is not None]
        return {
            "type": "data",
            "data": jsons,
        }

    def on_call_begin(
        self,
        call_id,
        inputs=None,
    ):
        """Called when a module call begins.

        Records the start time for `call_id` and sends a `call_begin` trace.

        Args:
            call_id: Identifier of the call, used to pair begin/end events.
            inputs: The call inputs, summarized in the trace.
        """
        self.call_start_times[call_id] = time.time()
        trace_data = {
            "event": "call_begin",
            "call_id": call_id,
            "module_name": self.module.name,
            "module_description": self.module.description,
            "timestamp": self.call_start_times[call_id],
            "inputs": self._extract_data_models_info(inputs),
        }
        self._send_trace_async(trace_data)

    def on_call_end(
        self,
        call_id,
        outputs=None,
        exception=None,
    ):
        """Called when a module call ends.

        Sends a `call_end` trace with the call duration and outcome.

        Args:
            call_id: Identifier of the call, as given to `on_call_begin`.
            outputs: The call outputs, summarized in the trace.
            exception: The exception raised by the call, if any.
        """
        end_time = time.time()
        # If no start time was recorded, the duration falls back to 0.
        start_time = self.call_start_times.pop(call_id, end_time)
        duration = end_time - start_time
        failed = exception is not None
        trace_data = {
            "event": "call_end",
            "call_id": call_id,
            "module_name": self.module.name,
            "module_description": self.module.description,
            "timestamp": end_time,
            "duration": duration,
            "outputs": self._extract_data_models_info(outputs),
            # Same `None` check as "success" so the two fields always agree.
            "exception": str(exception) if failed else None,
            "success": not failed,
        }
        self._send_trace_async(trace_data)

    async def _cleanup(self):
        """Wait for pending tasks and close the session."""
        # Swap the list out first so awaited tasks are not kept around.
        pending, self._pending_tasks = self._pending_tasks, []
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        if self._session is not None and not self._session.closed:
            await self._session.close()

    def __del__(self):
        """Flush pending traces and close the session (best effort)."""
        # Attributes may be missing if `__init__` failed part-way through.
        pending = getattr(self, "_pending_tasks", None)
        session = getattr(self, "_session", None)
        session_open = session is not None and not session.closed
        if not pending and not session_open:
            return
        try:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            if loop is not None:
                # A loop is running: schedule cleanup on it.
                loop.create_task(self._cleanup())
            else:
                loop = asyncio.get_event_loop()
                # Check before building the coroutine so a closed loop
                # does not leave an un-awaited coroutine behind.
                if not loop.is_closed():
                    loop.run_until_complete(self._cleanup())
        except Exception:
            # Finalizers must not raise; cleanup here is best effort only.
            pass