Python Reference
Related Topics: Python Endpoints Tutorial (step-by-step guide) | Plugins (custom UDFs) | Configuration (secrets access) | Type System (parameter types)
Python endpoints have access to the mxcp.runtime module, which provides database access, configuration, secrets, and lifecycle management.
Quick Example
Section titled “Quick Example”from mxcp.runtime import db, config, plugins, on_init, on_shutdown, reload_duckdb
def my_endpoint(param: str) -> dict: # Query database results = db.execute("SELECT * FROM users WHERE name = $name", {"name": param})
# Access secrets api_key_params = config.get_secret("api_key")
# Get configuration project = config.get_setting("project")
return {"users": results, "project": project}Database Access
Section titled “Database Access”db.execute()
Section titled “db.execute()”Execute SQL query and return results as list of dictionaries.
# Simple queryusers = db.execute("SELECT * FROM users")
# Parameterized queryresult = db.execute( "SELECT * FROM orders WHERE customer_id = $id AND status = $status", {"id": 123, "status": "pending"})
# Insert/Update (returns affected rows)db.execute( "INSERT INTO logs (message, timestamp) VALUES ($msg, NOW())", {"msg": "Event occurred"})Parameters:
query(str): SQL query to executeparams(dict, optional): Named parameters
Returns: list[dict] - Query results
Configuration & Secrets
Section titled “Configuration & Secrets”config.get_secret()
Section titled “config.get_secret()”Get secret parameters as dictionary.
# Value-type secretssecret_params = config.get_secret("api_key")# Returns: {"value": "secret-value"}
api_key = secret_params["value"] if secret_params else None
# HTTP secretshttp_secret = config.get_secret("http_auth")# Returns: {"BEARER_TOKEN": "...", "EXTRA_HTTP_HEADERS": {...}}
# Check if secret existsif config.get_secret("optional_secret"): # Secret is configured passParameters:
name(str): Secret name
Returns: dict | None - Secret parameters or None
config.get_setting()
Section titled “config.get_setting()”Get configuration value from site config.
# Get required settingproject = config.get_setting("project")
# Get with defaultdebug = config.get_setting("debug", default=False)extensions = config.get_setting("extensions", default=[])timeout = config.get_setting("timeout", default=30)
# Nested key access (use dot notation)dbt_enabled = config.get_setting("dbt.enabled", default=False)db_path = config.get_setting("profiles.default.duckdb.path")Parameters:
key(str): Setting key (supports dot notation for nested access)default(any, optional): Default value if not found
Returns: Setting value or default
config.user_config
Section titled “config.user_config”Access full user configuration as dictionary.
user_cfg = config.user_configif user_cfg: project_name = user_cfg.get("site") projects = user_cfg.get("projects", {}) active_project = projects.get(project_name, {})Returns: dict | None - User configuration or None
config.site_config
Section titled “config.site_config”Access full site configuration as dictionary.
site_cfg = config.site_configif site_cfg: project = site_cfg.get("project") profile = site_cfg.get("profile") secrets = site_cfg.get("secrets", [])Returns: dict | None - Site configuration or None
Plugin Access
Section titled “Plugin Access”plugins.get()
Section titled “plugins.get()”Get plugin instance by name.
my_plugin = plugins.get("custom_plugin")if my_plugin: result = my_plugin.process_data(data)Parameters:
name(str): Plugin instance name
Returns: Plugin instance or None
plugins.list()
Section titled “plugins.list()”Get list of available plugin names.
available = plugins.list()# Returns: ["plugin1", "plugin2", ...]
for name in plugins.list(): plugin = plugins.get(name) print(f"Plugin: {name}")Returns: list[str] - Plugin names
MCP Logging & Progress
Section titled “MCP Logging & Progress”The mxcp.runtime.mcp proxy provides MCP-aware logging. When running in FastMCP, calls are forwarded to the client; during CLI/tests they fall back to server logging.
from mxcp.runtime import mcp
async def run_job() -> None: await mcp.info("Starting ingestion") await mcp.progress(1, 4, "Fetched source metadata") await mcp.warning("External API is slow today") await mcp.progress(4, 4, "Done")Logging Functions
Section titled “Logging Functions”# Log levelsawait mcp.debug("Debug message")await mcp.info("Info message")await mcp.warning("Warning message")await mcp.error("Error message")
# Progress reportingawait mcp.progress(current=1, total=10, message="Processing item 1")Synchronous Usage
Section titled “Synchronous Usage”For synchronous endpoints:
import asyncio
def sync_endpoint(data: str) -> dict: asyncio.run(mcp.info("Processing data")) return {"result": process(data)}Lifecycle Hooks
Section titled “Lifecycle Hooks”@on_init
Section titled “@on_init”Register function to run when server starts.
from mxcp.runtime import on_init
@on_initdef setup(): print("Server starting up") # Initialize resources # Load caches # Connect to servicesUse Cases:
- Warm up caches
- Initialize connections
- Validate configuration
- Load static data
@on_shutdown
Section titled “@on_shutdown”Register function to run when server stops.
from mxcp.runtime import on_shutdown
@on_shutdowndef cleanup(): print("Server shutting down") # Close connections # Flush buffers # Clean up resourcesUse Cases:
- Close database connections
- Flush log buffers
- Release external resources
- Save state
Multiple Hooks
Section titled “Multiple Hooks”You can register multiple functions for the same lifecycle event. Hooks are executed sequentially in the order they were registered (typically the order they appear in your code). If a hook raises an exception, it is logged but subsequent hooks still execute.
@on_initdef setup_database(): print("Setting up database connections") # Runs first
@on_initdef setup_cache(): print("Warming up cache") # Runs second
@on_shutdowndef close_database(): print("Closing database connections") # Runs first on shutdown
@on_shutdowndef flush_logs(): print("Flushing log buffers") # Runs second on shutdownReload Management
Section titled “Reload Management”reload_duckdb()
Section titled “reload_duckdb()”Request an asynchronous system reload.
from mxcp.runtime import reload_duckdbimport subprocessimport shutil
def replace_database(): """Payload function - runs with all connections closed.""" # Run dbt to rebuild models subprocess.run(["dbt", "run"], check=True)
# Or copy a new database file shutil.copy("/data/updated.duckdb", "/app/data.duckdb")
# Schedule reload with database replacementreload_duckdb( payload_func=replace_database, description="Replacing database with updated version")
# Or just reload configurationreload_duckdb()
# Return immediately - reload happens asynchronouslyreturn {"status": "Reload scheduled"}Parameters:
payload_func(callable, optional): Function to run during reloaddescription(str, optional): Description for logging
Reload Process:
- Queues reload request
- Active requests are drained
- Runtime shuts down (Python hooks + DuckDB)
- Payload function runs (if provided)
- Runtime restarts with fresh configuration
Use Cases:
- Update DuckDB data without restart
- Run ETL pipelines on demand
- Refresh materialized views
- Swap in pre-built database files
- Reload configuration after secret rotation
Important Notes:
- Returns immediately (non-blocking)
- Reload happens asynchronously after the current request completes
- Only one reload processes at a time
- Payload runs with connections closed
- Cannot wait for completion from MCP tools
- Only available when called from within MXCP endpoints
When NOT to Use:
For most database operations, you don’t need reload_duckdb(). Use db.execute() directly since DuckDB supports concurrent operations through its MVCC transactional model:
# Simple updates - no reload neededdb.execute("INSERT INTO users VALUES ($name)", {"name": "Alice"})db.execute("UPDATE stats SET value = $value", {"value": 100})Only use reload_duckdb() when external tools need exclusive database access.
Type Compatibility
Section titled “Type Compatibility”Python return values must match declared endpoint return types:
| YAML Type | Python Type | Example |
|---|---|---|
string | str | "hello" |
integer | int | 42 |
number | float | 3.14 |
boolean | bool | True |
array | list | [1, 2, 3] |
object | dict | {"key": "value"} |
Type Examples
Section titled “Type Examples”# String returndef get_name(id: int) -> str: return "Alice"
# Integer returndef get_count() -> int: return 42
# Number returndef get_average() -> float: return 3.14
# Boolean returndef is_valid(id: int) -> bool: return True
# Array returndef get_users() -> list: return [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
# Object returndef get_user(id: int) -> dict: return {"id": id, "name": "Alice", "active": True}Complex Types
Section titled “Complex Types”Nested structures are fully supported - objects can contain arrays, and arrays can contain objects:
# Nested objectsdef get_report() -> dict: return { "summary": { "total": 100, "average": 42.5 }, "items": [ {"id": 1, "value": 10}, {"id": 2, "value": 20} ] }
# Arrays of objectsdef get_orders() -> list: return [ {"id": 1, "items": ["a", "b"]}, {"id": 2, "items": ["c", "d"]} ]Context Availability
Section titled “Context Availability”The runtime context is automatically set when MXCP calls your function:
def my_endpoint(param: str) -> dict: # db, config, plugins are all available # Context is thread-safe and isolated between requests users = db.execute("SELECT * FROM users") return {"users": users}Error Handling
Section titled “Error Handling”Handle errors gracefully by returning error information in your response rather than raising exceptions. Unhandled exceptions will result in MCP error responses to the client.
def safe_endpoint(id: int) -> dict: try: result = db.execute("SELECT * FROM users WHERE id = $id", {"id": id}) if not result: return {"error": "User not found", "id": id} return result[0] except Exception as e: return {"error": str(e)}Complete Example
Section titled “Complete Example”A full endpoint demonstrating lifecycle hooks, configuration, database access, plugins, and MCP logging:
from mxcp.runtime import db, config, plugins, on_init, on_shutdown, mcpimport asyncio
# Initialization@on_initdef setup(): print("Starting up...")
@on_shutdowndef cleanup(): print("Shutting down...")
# Main endpointasync def analyze_data(department: str, limit: int = 10) -> dict: await mcp.info(f"Analyzing {department}")
# Get configuration threshold = config.get_setting("threshold", default=100)
# Query database await mcp.progress(1, 3, "Fetching data") employees = db.execute( "SELECT * FROM employees WHERE department = $dept LIMIT $limit", {"dept": department, "limit": limit} )
# Process with plugin await mcp.progress(2, 3, "Processing") analytics = plugins.get("analytics") if analytics: stats = analytics.calculate_stats(employees) else: stats = {"count": len(employees)}
# Return results await mcp.progress(3, 3, "Complete") return { "department": department, "employees": employees, "stats": stats, "threshold": threshold }Next Steps
Section titled “Next Steps”- Plugin Reference - Plugin development
- Tutorials - Python examples
- Type System - Type mapping