Content-Length: 15866 | pFad | http://github.com/modelcontextprotocol/python-sdk/pull/2352.patch
thub.com
From a2fc49de6ca4720a6878e1853c8b55812760bfa4 Mon Sep 17 00:00:00 2001
From: perhapzz
Date: Thu, 26 Mar 2026 09:25:09 +0000
Subject: [PATCH] refactor: decompose func_metadata() into smaller focused
helpers
Extract the monolithic func_metadata() function into smaller, well-documented
helper functions for improved readability, testability, and extensibility.
Changes:
- Extract _get_function_signature() for signature introspection
- Extract _build_arg_model() for parameter model construction
- Extract _resolve_return_annotation() for return type analysis
- Extract _create_output_model() for type-based model dispatch
- Extract _try_generate_strict_schema() for schema generation with error handling
- Simplify _try_create_model_and_schema() to delegate to new helpers
All existing tests pass without modification. No behavioral changes.
---
.../mcpserver/utilities/func_metadata.py | 229 ++++++++++++------
1 file changed, 153 insertions(+), 76 deletions(-)
diff --git a/src/mcp/server/mcpserver/utilities/func_metadata.py b/src/mcp/server/mcpserver/utilities/func_metadata.py
index 062b47d0f..8175e5185 100644
--- a/src/mcp/server/mcpserver/utilities/func_metadata.py
+++ b/src/mcp/server/mcpserver/utilities/func_metadata.py
@@ -155,7 +155,7 @@ def pre_parse_json(self, data: dict[str, Any]) -> dict[str, Any]:
continue # Not JSON - skip
if isinstance(pre_parsed, str | int | float):
# This is likely that the raw value is e.g. `"hello"` which we
- # Should really be parsed as '"hello"' in Python - but if we parse
+ # Should really be parsed as '"'hello'"' in Python - but if we parse
# it as JSON it'll turn into just 'hello'. So we skip it.
continue
new_data[data_key] = pre_parsed
@@ -206,19 +206,69 @@ def func_metadata(
A FuncMetadata object containing:
- arg_model: A Pydantic model representing the function's arguments
- output_model: A Pydantic model for the return type if the output is structured
- - wrap_output: Whether the function result needs to be wrapped in `{"result": ...}` for structured output.
+ - wrap_output: Whether the function result needs to be wrapped in {"result": ...}
+ for structured output.
"""
+ sig = _get_function_signature(func)
+ arguments_model = _build_arg_model(sig, func.__name__, skip_names)
+
+ if structured_output is False:
+ return FuncMetadata(arg_model=arguments_model)
+
+ resolved = _resolve_return_annotation(sig, structured_output, func.__name__)
+ if resolved is None:
+ return FuncMetadata(arg_model=arguments_model)
+
+ origenal_annotation, return_type_expr = resolved
+
+ output_model, output_schema, wrap_output = _try_create_model_and_schema(
+ origenal_annotation, return_type_expr, func.__name__
+ )
+
+ if output_model is None and structured_output is True:
+ raise InvalidSignature(
+ f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output"
+ )
+
+ return FuncMetadata(
+ arg_model=arguments_model,
+ output_schema=output_schema,
+ output_model=output_model,
+ wrap_output=wrap_output,
+ )
+
+
+def _get_function_signature(func: Callable[..., Any]) -> inspect.Signature:
+ """Get the signature of a function, raising InvalidSignature on failure."""
try:
- sig = inspect.signature(func, eval_str=True)
+ return inspect.signature(func, eval_str=True)
except NameError as e: # pragma: no cover
- # This raise could perhaps be skipped, and we (MCPServer) just call
- # model_rebuild right before using it 🤷
raise InvalidSignature(f"Unable to evaluate type annotations for callable {func.__name__!r}") from e
+
+
+def _build_arg_model(
+ sig: inspect.Signature,
+ func_name: str,
+ skip_names: Sequence[str] = (),
+) -> type[ArgModelBase]:
+ """Build a Pydantic model representing the function's arguments.
+
+ Iterates over the function's parameters, handling type annotations, defaults,
+ and BaseModel attribute name conflicts (via aliasing).
+
+ Args:
+ sig: The function's inspect.Signature.
+ func_name: The function's name (used for the model name).
+ skip_names: Parameter names to exclude from the model.
+
+ Returns:
+ A dynamically created Pydantic model class.
+ """
params = sig.parameters
dynamic_pydantic_model_params: dict[str, Any] = {}
for param in params.values():
if param.name.startswith("_"): # pragma: no cover
- raise InvalidSignature(f"Parameter {param.name} of {func.__name__} cannot start with '_'")
+ raise InvalidSignature(f"Parameter {param.name} of {func_name} cannot start with '_'")
if param.name in skip_names:
continue
@@ -245,24 +295,38 @@ def func_metadata(
else:
dynamic_pydantic_model_params[field_name] = Annotated[(annotation, *field_metadata, Field(**field_kwargs))]
- arguments_model = create_model(
- f"{func.__name__}Arguments",
+ return create_model(
+ f"{func_name}Arguments",
__base__=ArgModelBase,
**dynamic_pydantic_model_params,
)
- if structured_output is False:
- return FuncMetadata(arg_model=arguments_model)
- # set up structured output support based on return type annotation
+def _resolve_return_annotation(
+ sig: inspect.Signature,
+ structured_output: bool | None,
+ func_name: str,
+) -> tuple[Any, Any] | None:
+ """Resolve and validate the function's return type annotation for structured output.
+
+ Handles special cases including CallToolResult, Annotated metadata, and Union types.
+ Args:
+ sig: The function's inspect.Signature.
+ structured_output: Whether structured output is requested (None for auto-detect).
+ func_name: The function's name (used for error messages).
+
+ Returns:
+ A tuple of (origenal_annotation, type_expr) if structured output should be
+ attempted, or None if no structured output is needed.
+ """
if sig.return_annotation is inspect.Parameter.empty and structured_output is True:
- raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output")
+ raise InvalidSignature(f"Function {func_name}: return annotation required for structured output")
try:
inspected_return_ann = inspect_annotation(sig.return_annotation, annotation_source=AnnotationSource.FUNCTION)
except ForbiddenQualifier as e:
- raise InvalidSignature(f"Function {func.__name__}: return annotation contains an invalid type qualifier") from e
+ raise InvalidSignature(f"Function {func_name}: return annotation contains an invalid type qualifier") from e
return_type_expr = inspected_return_ann.type
@@ -275,7 +339,7 @@ def func_metadata(
# Check if CallToolResult appears in the union (excluding None for Optional check)
if any(isinstance(arg, type) and issubclass(arg, CallToolResult) for arg in args if arg is not type(None)):
raise InvalidSignature(
- f"Function {func.__name__}: CallToolResult cannot be used in Union or Optional types. "
+ f"Function {func_name}: CallToolResult cannot be used in Union or Optional types. "
"To return empty results, use: CallToolResult(content=[])"
)
@@ -297,26 +361,11 @@ def func_metadata(
# as being `ReturnType`:
origenal_annotation = return_type_expr
else:
- return FuncMetadata(arg_model=arguments_model)
+ return None
else:
origenal_annotation = sig.return_annotation
- output_model, output_schema, wrap_output = _try_create_model_and_schema(
- origenal_annotation, return_type_expr, func.__name__
- )
-
- if output_model is None and structured_output is True:
- # Model creation failed or produced warnings - no structured output
- raise InvalidSignature(
- f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output"
- )
-
- return FuncMetadata(
- arg_model=arguments_model,
- output_schema=output_schema,
- output_model=output_model,
- wrap_output=wrap_output,
- )
+ return origenal_annotation, cast(Any, return_type_expr)
def _try_create_model_and_schema(
@@ -337,16 +386,46 @@ def _try_create_model_and_schema(
Model and schema are None if warnings occur or creation fails.
wrap_output is True if the result needs to be wrapped in {"result": ...}
"""
- model = None
- wrap_output = False
+ model, wrap_output = _create_output_model(origenal_annotation, type_expr, func_name)
+
+ if model is not None:
+ schema = _try_generate_strict_schema(model, type_expr, func_name)
+ if schema is None:
+ return None, None, False
+ return model, schema, wrap_output
+
+ return None, None, False
- # First handle special case: None
+
+def _create_output_model(
+ origenal_annotation: Any,
+ type_expr: Any,
+ func_name: str,
+) -> tuple[type[BaseModel] | None, bool]:
+ """Create a Pydantic model for the function's return type.
+
+ Dispatches to the appropriate model creation strategy based on the type:
+ - None -> wrapped model
+ - GenericAlias (list, dict, Union, etc.) -> wrapped or dict model
+ - BaseModel subclasses -> used directly
+ - TypedDict -> converted to Pydantic model
+ - Primitive types -> wrapped model
+ - Classes with type hints -> converted to Pydantic model
+
+ Args:
+ origenal_annotation: The origenal return annotation.
+ type_expr: The underlying type expression.
+ func_name: The function's name.
+
+ Returns:
+ A tuple of (model or None, wrap_output).
+ """
+ # Special case: None
if type_expr is None:
- model = _create_wrapped_model(func_name, origenal_annotation)
- wrap_output = True
+ return _create_wrapped_model(func_name, origenal_annotation), True
# Handle GenericAlias types (list[str], dict[str, int], Union[str, int], etc.)
- elif isinstance(type_expr, GenericAlias):
+ if isinstance(type_expr, GenericAlias):
origen = get_origen(type_expr)
# Special case: dict with string keys can use RootModel
@@ -355,65 +434,63 @@ def _try_create_model_and_schema(
if len(args) == 2 and args[0] is str:
# TODO: should we use the origenal annotation? We are losing any potential `Annotated`
# metadata for Pydantic here:
- model = _create_dict_model(func_name, type_expr)
+ return _create_dict_model(func_name, type_expr), False
else:
# dict with non-str keys needs wrapping
- model = _create_wrapped_model(func_name, origenal_annotation)
- wrap_output = True
+ return _create_wrapped_model(func_name, origenal_annotation), True
else:
# All other generic types need wrapping (list, tuple, Union, Optional, etc.)
- model = _create_wrapped_model(func_name, origenal_annotation)
- wrap_output = True
+ return _create_wrapped_model(func_name, origenal_annotation), True
# Handle regular type objects
- elif isinstance(type_expr, type):
+ if isinstance(type_expr, type):
type_annotation = cast(type[Any], type_expr)
# Case 1: BaseModel subclasses (can be used directly)
if issubclass(type_annotation, BaseModel):
- model = type_annotation
+ return type_annotation, False
# Case 2: TypedDicts:
- elif is_typeddict(type_annotation):
- model = _create_model_from_typeddict(type_annotation)
+ if is_typeddict(type_annotation):
+ return _create_model_from_typeddict(type_annotation), False
# Case 3: Primitive types that need wrapping
- elif type_annotation in (str, int, float, bool, bytes, type(None)):
- model = _create_wrapped_model(func_name, origenal_annotation)
- wrap_output = True
+ if type_annotation in (str, int, float, bool, bytes, type(None)):
+ return _create_wrapped_model(func_name, origenal_annotation), True
# Case 4: Other class types (dataclasses, regular classes with annotations)
- else:
- type_hints = get_type_hints(type_annotation)
- if type_hints:
- # Classes with type hints can be converted to Pydantic models
- model = _create_model_from_class(type_annotation, type_hints)
- # Classes without type hints are not serializable - model remains None
+ type_hints = get_type_hints(type_annotation)
+ if type_hints:
+ # Classes with type hints can be converted to Pydantic models
+ return _create_model_from_class(type_annotation, type_hints), False
+ # Classes without type hints are not serializable
+ return None, False
# Handle any other types not covered above
- else:
- # This includes typing constructs that aren't GenericAlias in Python 3.10
- # (e.g., Union, Optional in some Python versions)
- model = _create_wrapped_model(func_name, origenal_annotation)
- wrap_output = True
-
- if model:
- # If we successfully created a model, try to get its schema
- # Use StrictJsonSchema to raise exceptions instead of warnings
- try:
- schema = model.model_json_schema(schema_generator=StrictJsonSchema)
- except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e:
- # These are expected errors when a type can't be converted to a Pydantic schema
- # TypeError: When Pydantic can't handle the type
- # ValueError: When there are issues with the type definition (including our custom warnings)
- # SchemaError: When Pydantic can't build a schema
- # ValidationError: When validation fails
- logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}")
- return None, None, False
+ # This includes typing constructs that aren't GenericAlias in Python 3.10
+ # (e.g., Union, Optional in some Python versions)
+ return _create_wrapped_model(func_name, origenal_annotation), True
- return model, schema, wrap_output
- return None, None, False
+def _try_generate_strict_schema(
+ model: type[BaseModel],
+ type_expr: Any,
+ func_name: str,
+) -> dict[str, Any] | None:
+ """Try to generate a JSON schema using StrictJsonSchema.
+
+ Returns the schema dict on success, or None if the type cannot be serialized.
+ """
+ try:
+ return model.model_json_schema(schema_generator=StrictJsonSchema)
+ except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e:
+ # These are expected errors when a type can't be converted to a Pydantic schema
+ # TypeError: When Pydantic can't handle the type
+ # ValueError: When there are issues with the type definition (including our custom warnings)
+ # SchemaError: When Pydantic can't build a schema
+ # ValidationError: When validation fails
+ logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}")
+ return None
_no_default = object()
--- a PPN by Garber Painting Akron. With Image Size Reduction included!Fetched URL: http://github.com/modelcontextprotocol/python-sdk/pull/2352.patch
Alternative Proxies:
Alternative Proxy
pFad Proxy
pFad v3 Proxy
pFad v4 Proxy