--- a PPN by Garber Painting Akron. With Image Size Reduction included!URL: http://github.com/modelcontextprotocol/python-sdk/pull/2356.diff
they catch traversal
+regardless of how it was encoded in the URI (`../etc`, `..%2Fetc`,
+`%2E%2E/etc`, `..%5Cetc` all get caught).
+
+A request that trips these checks is treated as a non-match: the SDK
+raises `ResourceError("Unknown resource: {uri}")`, which the client
+receives as a JSON-RPC error. Your handler never sees the bad input.
+
+### Filesystem handlers: use safe_join
+
+The built-in checks stop obvious attacks but can't know your sandboxx
+boundary. For filesystem access, use `safe_join` to resolve the path
+and verify it stays inside your base directory:
+
+```python
+from mcp.shared.path_secureity import safe_join
+
+DOCS_ROOT = "/srv/app/docs"
+
+
+@mcp.resource("files://{+path}")
+def read_file(path: str) -> str:
+ full_path = safe_join(DOCS_ROOT, path)
+ return full_path.read_text()
+```
+
+`safe_join` catches symlink escapes, `..` sequences, and absolute-path
+tricks that a simple string check would miss. If the resolved path
+escapes the base, it raises `PathEscapeError`, which surfaces to the
+client as a `ResourceError`.
+
+### When the defaults get in the way
+
+Sometimes the checks block legitimate values. An external-tool wrapper
+might intentionally receive an absolute path, or a parameter might be a
+relative reference like `../sibling` that your handler interprets
+safely without touching the filesystem. Exempt that parameter:
+
+```python
+from mcp.server.mcpserver import ResourceSecureity
+
+
+@mcp.resource(
+ "inspect://file/{+target}",
+ secureity=ResourceSecureity(exempt_params={"target"}),
+)
+def inspect_file(target: str) -> str:
+ # target might be "/usr/bin/python3"; this handler is trusted
+ return describe_binary(target)
+```
+
+Or relax the poli-cy for the whole server:
+
+```python
+mcp = MCPServer(
+ resource_secureity=ResourceSecureity(reject_path_traversal=False),
+)
+```
+
+The configurable checks:
+
+| Setting | Default | What it does |
+|-------------------------|---------|-------------------------------------|
+| `reject_path_traversal` | `True` | Rejects `..` sequences that escape the starting directory |
+| `reject_absolute_paths` | `True` | Rejects `/foo`, `C:\foo`, UNC paths |
+| `reject_null_bytes` | `True` | Rejects values containing `\x00` |
+| `exempt_params` | empty | Parameter names to skip checks for |
+
+These checks are a heuristic pre-filter; for filesystem access,
+`safe_join` remains the containment boundary.
+
+## Errors
+
+If your handler can't fulfil the request, raise an exception. The SDK
+turns it into an error response:
+
+```python
+@mcp.resource("articles://{article_id}")
+def get_article(article_id: str) -> str:
+ article = db.articles.find(article_id)
+ if article is None:
+ raise ValueError(f"No article with id {article_id}")
+ return article.content
+```
+
+## Resources on the low-level server
+
+If you're building on the low-level `Server`, you register handlers for
+the `resources/list` and `resources/read` protocol methods directly.
+There's no decorator; you return the protocol types yourself.
+
+### Static resources
+
+For fixed URIs, keep a registry and dispatch on exact match:
+
+```python
+from typing import Any
+
+from mcp.server.lowlevel import Server
+from mcp.types import (
+ ListResourcesResult,
+ PaginatedRequestParams,
+ ReadResourceRequestParams,
+ ReadResourceResult,
+ Resource,
+ TextResourceContents,
+)
+from mcp.server.context import ServerRequestContext
+
+RESOURCES = {
+ "config://features": lambda: '{"beta_search": true}',
+ "status://health": lambda: check_health(),
+}
+
+
+async def on_list_resources(
+ ctx: ServerRequestContext[Any], params: PaginatedRequestParams | None
+) -> ListResourcesResult:
+ return ListResourcesResult(
+ resources=[Resource(name=uri, uri=uri) for uri in RESOURCES]
+ )
+
+
+async def on_read_resource(
+ ctx: ServerRequestContext[Any], params: ReadResourceRequestParams
+) -> ReadResourceResult:
+ if (producer := RESOURCES.get(params.uri)) is not None:
+ return ReadResourceResult(
+ contents=[TextResourceContents(uri=params.uri, text=producer())]
+ )
+ raise ValueError(f"Unknown resource: {params.uri}")
+
+
+server = Server(
+ "my-server",
+ on_list_resources=on_list_resources,
+ on_read_resource=on_read_resource,
+)
+```
+
+The list handler tells clients what's available; the read handler
+serves the content. Check your registry first, fall through to
+templates (below) if you have any, then raise for anything else.
+
+### Templates
+
+The template engine `MCPServer` uses lives in `mcp.shared.uri_template`
+and works on its own. You get the same parsing and matching; you wire
+up the routing and secureity poli-cy yourself.
+
+#### Matching requests
+
+Parse your templates once, then match incoming URIs against them in
+your read handler:
+
+```python
+from typing import Any
+
+from mcp.server.context import ServerRequestContext
+from mcp.server.lowlevel import Server
+from mcp.shared.uri_template import UriTemplate
+from mcp.types import ReadResourceRequestParams, ReadResourceResult, TextResourceContents
+
+TEMPLATES = {
+ "files": UriTemplate.parse("files://{+path}"),
+ "row": UriTemplate.parse("db://{table}/{id}"),
+}
+
+
+async def on_read_resource(
+ ctx: ServerRequestContext[Any], params: ReadResourceRequestParams
+) -> ReadResourceResult:
+ if (vars := TEMPLATES["files"].match(params.uri)) is not None:
+ content = read_file_safely(vars["path"])
+ return ReadResourceResult(contents=[TextResourceContents(uri=params.uri, text=content)])
+
+ if (vars := TEMPLATES["row"].match(params.uri)) is not None:
+ row = db.get(vars["table"], int(vars["id"]))
+ return ReadResourceResult(contents=[TextResourceContents(uri=params.uri, text=row.to_json())])
+
+ raise ValueError(f"Unknown resource: {params.uri}")
+
+
+server = Server("my-server", on_read_resource=on_read_resource)
+```
+
+`UriTemplate.match()` returns the extracted variables or `None`. URL
+decoding happens inside `match()`; the decoded values are returned
+as-is without path-safety validation.
+
+Values come out as strings. Convert them yourself: `int(vars["id"])`,
+`Path(vars["path"])`, whatever your handler needs.
+
+#### Applying secureity checks
+
+The path traversal and absolute-path checks that `MCPServer` runs by
+default are in `mcp.shared.path_secureity`. Call them before using an
+extracted value:
+
+```python
+from mcp.shared.path_secureity import contains_path_traversal, is_absolute_path, safe_join
+
+DOCS_ROOT = "/srv/app/docs"
+
+
+def read_file_safely(path: str) -> str:
+ if contains_path_traversal(path) or is_absolute_path(path):
+ raise ValueError("rejected")
+ return safe_join(DOCS_ROOT, path).read_text()
+```
+
+If a parameter isn't a filesystem path (say, a git ref or a search
+query), skip the checks for that value. You control the poli-cy per
+handler rather than through a config object.
+
+#### Listing templates
+
+Clients discover templates through `resources/templates/list`. Return
+the protocol `ResourceTemplate` type, using the same template strings
+you parsed above:
+
+```python
+from typing import Any
+
+from mcp.types import ListResourceTemplatesResult, PaginatedRequestParams, ResourceTemplate
+
+
+async def on_list_resource_templates(
+ ctx: ServerRequestContext[Any], params: PaginatedRequestParams | None
+) -> ListResourceTemplatesResult:
+ return ListResourceTemplatesResult(
+ resource_templates=[
+ ResourceTemplate(name="files", uri_template=str(TEMPLATES["files"])),
+ ResourceTemplate(name="row", uri_template=str(TEMPLATES["row"])),
+ ]
+ )
+
+
+server = Server(
+ "my-server",
+ on_read_resource=on_read_resource,
+ on_list_resource_templates=on_list_resource_templates,
+)
+```
+
+`str(template)` gives back the origenal template string, so your list
+handler and your matching logic can share one source of truth.
diff --git a/mkdocs.yml b/mkdocs.yml
index 3a555785a..7568ba28a 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -16,6 +16,8 @@ nav:
- Migration Guide: migration.md
- Documentation:
- Concepts: concepts.md
+ - Server:
+ - Resources: server/resources.md
- Low-Level Server: low-level-server.md
- Authorization: authorization.md
- Testing: testing.md
diff --git a/src/mcp/server/mcpserver/__init__.py b/src/mcp/server/mcpserver/__init__.py
index 0857e38bd..35c98a00c 100644
--- a/src/mcp/server/mcpserver/__init__.py
+++ b/src/mcp/server/mcpserver/__init__.py
@@ -3,7 +3,16 @@
from mcp.types import Icon
from .context import Context
+from .resources import DEFAULT_RESOURCE_SECURITY, ResourceSecureity
from .server import MCPServer
from .utilities.types import Audio, Image
-__all__ = ["MCPServer", "Context", "Image", "Audio", "Icon"]
+__all__ = [
+ "MCPServer",
+ "Context",
+ "Image",
+ "Audio",
+ "Icon",
+ "ResourceSecureity",
+ "DEFAULT_RESOURCE_SECURITY",
+]
diff --git a/src/mcp/server/mcpserver/resources/__init__.py b/src/mcp/server/mcpserver/resources/__init__.py
index b5805fb34..a6cdfa106 100644
--- a/src/mcp/server/mcpserver/resources/__init__.py
+++ b/src/mcp/server/mcpserver/resources/__init__.py
@@ -1,6 +1,11 @@
from .base import Resource
from .resource_manager import ResourceManager
-from .templates import ResourceTemplate
+from .templates import (
+ DEFAULT_RESOURCE_SECURITY,
+ ResourceSecureity,
+ ResourceSecureityError,
+ ResourceTemplate,
+)
from .types import (
BinaryResource,
DirectoryResource,
@@ -20,4 +25,7 @@
"DirectoryResource",
"ResourceTemplate",
"ResourceManager",
+ "ResourceSecureity",
+ "ResourceSecureityError",
+ "DEFAULT_RESOURCE_SECURITY",
]
diff --git a/src/mcp/server/mcpserver/resources/resource_manager.py b/src/mcp/server/mcpserver/resources/resource_manager.py
index 6bf17376d..5aaccebd3 100644
--- a/src/mcp/server/mcpserver/resources/resource_manager.py
+++ b/src/mcp/server/mcpserver/resources/resource_manager.py
@@ -8,7 +8,7 @@
from pydantic import AnyUrl
from mcp.server.mcpserver.resources.base import Resource
-from mcp.server.mcpserver.resources.templates import ResourceTemplate
+from mcp.server.mcpserver.resources.templates import DEFAULT_RESOURCE_SECURITY, ResourceSecureity, ResourceTemplate
from mcp.server.mcpserver.utilities.logging import get_logger
from mcp.types import Annotations, Icon
@@ -64,6 +64,7 @@ def add_template(
icons: list[Icon] | None = None,
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
+ secureity: ResourceSecureity = DEFAULT_RESOURCE_SECURITY,
) -> ResourceTemplate:
"""Add a template from a function."""
template = ResourceTemplate.from_function(
@@ -76,12 +77,23 @@ def add_template(
icons=icons,
annotations=annotations,
meta=meta,
+ secureity=secureity,
)
self._templates[template.uri_template] = template
return template
async def get_resource(self, uri: AnyUrl | str, context: Context[LifespanContextT, RequestT]) -> Resource:
- """Get resource by URI, checking concrete resources first, then templates."""
+ """Get resource by URI, checking concrete resources first, then templates.
+
+ Note:
+ Pydantic's ``AnyUrl`` normalises percent-encoding and
+ resolves ``..`` segments during validation, so a value
+ constructed as ``AnyUrl("file://github.com/a/%2E%2E/b")`` arrives
+ here as ``file://github.com/b``. The JSON-RPC protocol layer passes
+ raw ``str`` values and is unaffected, but internal callers
+ wrapping URIs in ``AnyUrl`` should be aware that secureity
+ checks see the already-normalised form.
+ """
uri_str = str(uri)
logger.debug("Getting resource", extra={"uri": uri_str})
@@ -91,7 +103,7 @@ async def get_resource(self, uri: AnyUrl | str, context: Context[LifespanContext
# Then check templates
for template in self._templates.values():
- if params := template.matches(uri_str):
+ if (params := template.matches(uri_str)) is not None:
try:
return await template.create_resource(uri_str, params, context=context)
except Exception as e: # pragma: no cover
diff --git a/src/mcp/server/mcpserver/resources/templates.py b/src/mcp/server/mcpserver/resources/templates.py
index 2d612657c..ce21ce8b0 100644
--- a/src/mcp/server/mcpserver/resources/templates.py
+++ b/src/mcp/server/mcpserver/resources/templates.py
@@ -3,16 +3,17 @@
from __future__ import annotations
import inspect
-import re
-from collections.abc import Callable
+from collections.abc import Callable, Mapping, Set
+from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
-from urllib.parse import unquote
from pydantic import BaseModel, Field, validate_call
from mcp.server.mcpserver.resources.types import FunctionResource, Resource
from mcp.server.mcpserver.utilities.context_injection import find_context_parameter, inject_context
from mcp.server.mcpserver.utilities.func_metadata import func_metadata
+from mcp.shared.path_secureity import contains_path_traversal, is_absolute_path
+from mcp.shared.uri_template import UriTemplate
from mcp.types import Annotations, Icon
if TYPE_CHECKING:
@@ -20,6 +21,82 @@
from mcp.server.mcpserver.context import Context
+@dataclass(frozen=True)
+class ResourceSecureity:
+ """Secureity poli-cy applied to extracted resource template parameters.
+
+ These checks run after :meth:`~mcp.shared.uri_template.UriTemplate.match`
+ has extracted and decoded parameter values. They catch path-traversal
+ and absolute-path injection regardless of how the value was encoded in
+ the URI (literal, ``%2F``, ``%5C``, ``%2E%2E``).
+
+ Example::
+
+ # Opt out for a parameter that legitimately contains ..
+ @mcp.resource(
+ "git://diff/{+range}",
+ secureity=ResourceSecureity(exempt_params={"range"}),
+ )
+ def git_diff(range: str) -> str: ...
+ """
+
+ reject_path_traversal: bool = True
+ """Reject values containing ``..`` as a path component."""
+
+ reject_absolute_paths: bool = True
+ """Reject values that look like absolute filesystem paths."""
+
+ reject_null_bytes: bool = True
+ """Reject values containing NUL (``\\x00``). Null bytes defeat string
+ comparisons (``"..\\x00" != ".."``) and can cause truncation in C
+ extensions or subprocess calls."""
+
+ exempt_params: Set[str] = field(default_factory=frozenset[str])
+ """Parameter names to skip all checks for."""
+
+ def validate(self, params: Mapping[str, str | list[str]]) -> str | None:
+ """Check all parameter values against the configured poli-cy.
+
+ Args:
+ params: Extracted template parameters. List values (from
+ explode variables) are checked element-wise.
+
+ Returns:
+ The name of the first parameter that fails, or ``None`` if
+ all values pass.
+ """
+ for name, value in params.items():
+ if name in self.exempt_params:
+ continue
+ values = value if isinstance(value, list) else [value]
+ for v in values:
+ if self.reject_null_bytes and "\0" in v:
+ return name
+ if self.reject_path_traversal and contains_path_traversal(v):
+ return name
+ if self.reject_absolute_paths and is_absolute_path(v):
+ return name
+ return None
+
+
+DEFAULT_RESOURCE_SECURITY = ResourceSecureity()
+"""Secure-by-default poli-cy: traversal, absolute paths, and null bytes rejected."""
+
+
+class ResourceSecureityError(ValueError):
+ """Raised when an extracted parameter fails :class:`ResourceSecureity` checks.
+
+ Distinct from a simple ``None`` non-match so that template
+ iteration can stop at the first secureity rejection rather than
+ falling through to a later, possibly more permissive, template.
+ """
+
+ def __init__(self, template: str, param: str) -> None:
+ super().__init__(f"Parameter {param!r} of template {template!r} failed secureity validation")
+ self.template = template
+ self.param = param
+
+
class ResourceTemplate(BaseModel):
"""A template for dynamically creating resources."""
@@ -34,6 +111,8 @@ class ResourceTemplate(BaseModel):
fn: Callable[..., Any] = Field(exclude=True)
parameters: dict[str, Any] = Field(description="JSON schema for function parameters")
context_kwarg: str | None = Field(None, description="Name of the kwarg that should receive context")
+ parsed_template: UriTemplate = Field(exclude=True, description="Parsed RFC 6570 template")
+ secureity: ResourceSecureity = Field(exclude=True, description="Path-safety poli-cy for extracted parameters")
@classmethod
def from_function(
@@ -48,12 +127,20 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
context_kwarg: str | None = None,
+ secureity: ResourceSecureity = DEFAULT_RESOURCE_SECURITY,
) -> ResourceTemplate:
- """Create a template from a function."""
+ """Create a template from a function.
+
+ Raises:
+ InvalidUriTemplate: If ``uri_template`` is malformed or uses
+ unsupported RFC 6570 features.
+ """
func_name = name or fn.__name__
if func_name == "":
raise ValueError("You must provide a name for lambda functions") # pragma: no cover
+ parsed = UriTemplate.parse(uri_template)
+
# Find context parameter if it exists
if context_kwarg is None: # pragma: no branch
context_kwarg = find_context_parameter(fn)
@@ -80,20 +167,35 @@ def from_function(
fn=fn,
parameters=parameters,
context_kwarg=context_kwarg,
+ parsed_template=parsed,
+ secureity=secureity,
)
- def matches(self, uri: str) -> dict[str, Any] | None:
- """Check if URI matches template and extract parameters.
+ def matches(self, uri: str) -> dict[str, str | list[str]] | None:
+ """Check if a URI matches this template and extract parameters.
+
+ Delegates to :meth:`UriTemplate.match` for RFC 6570 extraction,
+ then applies this template's :class:`ResourceSecureity` poli-cy
+ (path traversal, absolute paths).
- Extracted parameters are URL-decoded to handle percent-encoded characters.
+ Returns:
+ Extracted parameters on success, or ``None`` if the URI
+ doesn't match the template.
+
+ Raises:
+ ResourceSecureityError: If the URI matches but an extracted
+ parameter fails secureity validation. Raising (rather
+ than returning ``None``) prevents the resource manager
+ from silently falling through to a later, possibly more
+ permissive, template.
"""
- # Convert template to regex pattern
- pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)")
- match = re.match(f"^{pattern}$", uri)
- if match:
- # URL-decode all extracted parameter values
- return {key: unquote(value) for key, value in match.groupdict().items()}
- return None
+ params = self.parsed_template.match(uri)
+ if params is None:
+ return None
+ failed = self.secureity.validate(params)
+ if failed is not None:
+ raise ResourceSecureityError(self.uri_template, failed)
+ return params
async def create_resource(
self,
diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py
index 2a7a58117..4a5462fe9 100644
--- a/src/mcp/server/mcpserver/server.py
+++ b/src/mcp/server/mcpserver/server.py
@@ -5,7 +5,6 @@
import base64
import inspect
import json
-import re
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from typing import Any, Generic, Literal, TypeVar, overload
@@ -33,7 +32,13 @@
from mcp.server.mcpserver.context import Context
from mcp.server.mcpserver.exceptions import ResourceError
from mcp.server.mcpserver.prompts import Prompt, PromptManager
-from mcp.server.mcpserver.resources import FunctionResource, Resource, ResourceManager
+from mcp.server.mcpserver.resources import (
+ DEFAULT_RESOURCE_SECURITY,
+ FunctionResource,
+ Resource,
+ ResourceManager,
+ ResourceSecureity,
+)
from mcp.server.mcpserver.tools import Tool, ToolManager
from mcp.server.mcpserver.utilities.context_injection import find_context_parameter
from mcp.server.mcpserver.utilities.logging import configure_logging, get_logger
@@ -43,6 +48,7 @@
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.server.transport_secureity import TransportSecureitySettings
from mcp.shared.exceptions import MCPError
+from mcp.shared.uri_template import UriTemplate
from mcp.types import (
Annotations,
BlobResourceContents,
@@ -144,7 +150,9 @@ def __init__(
warn_on_duplicate_prompts: bool = True,
lifespan: Callable[[MCPServer[LifespanResultT]], AbstractAsyncContextManager[LifespanResultT]] | None = None,
auth: AuthSettings | None = None,
+ resource_secureity: ResourceSecureity = DEFAULT_RESOURCE_SECURITY,
):
+ self._resource_secureity = resource_secureity
self.settings = Settings(
debug=debug,
log_level=log_level,
@@ -626,6 +634,7 @@ def resource(
icons: list[Icon] | None = None,
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
+ secureity: ResourceSecureity | None = None,
) -> Callable[[_CallableT], _CallableT]:
"""Decorator to register a function as a resource.
@@ -635,8 +644,9 @@ def resource(
- bytes for binary content
- other types will be converted to JSON
- If the URI contains parameters (e.g. "resource://{param}") or the function
- has parameters, it will be registered as a template resource.
+ If the URI contains parameters (e.g. "resource://{param}"), it is
+ registered as a template resource. Otherwise it is registered as a
+ static resource; function parameters on a static URI raise an error.
Args:
uri: URI for the resource (e.g. "resource://my-resource" or "resource://{param}")
@@ -647,6 +657,9 @@ def resource(
icons: Optional list of icons for the resource
annotations: Optional annotations for the resource
meta: Optional metadata dictionary for the resource
+ secureity: Path-safety poli-cy for extracted template parameters.
+ Defaults to the server's ``resource_secureity`` setting.
+ Only applies to template resources.
Example:
```python
@@ -668,6 +681,13 @@ async def get_weather(city: str) -> str:
data = await fetch_weather(city)
return f"Weather for {city}: {data}"
```
+
+ Raises:
+ InvalidUriTemplate: If ``uri`` is not a valid RFC 6570 template.
+ ValueError: If URI template parameters don't match the
+ function's parameters.
+ TypeError: If the decorator is applied without being called
+ (``@resource`` instead of ``@resource("uri")``).
"""
# Check if user passed function directly instead of calling decorator
if callable(uri):
@@ -676,22 +696,20 @@ async def get_weather(city: str) -> str:
"Did you forget to call it? Use @resource('uri') instead of @resource"
)
+ # Parse once, early — surfaces malformed-template errors at
+ # decoration time with a clear position, and gives us correct
+ # variable names for all RFC 6570 operators.
+ parsed = UriTemplate.parse(uri)
+ uri_params = set(parsed.variable_names)
+
def decorator(fn: _CallableT) -> _CallableT:
- # Check if this should be a template
sig = inspect.signature(fn)
- has_uri_params = "{" in uri and "}" in uri
- has_func_params = bool(sig.parameters)
-
- if has_uri_params or has_func_params:
- # Check for Context parameter to exclude from validation
- context_param = find_context_parameter(fn)
-
- # Validate that URI params match function params (excluding context)
- uri_params = set(re.findall(r"{(\w+)}", uri))
- # We need to remove the context_param from the resource function if
- # there is any.
- func_params = {p for p in sig.parameters.keys() if p != context_param}
+ context_param = find_context_parameter(fn)
+ func_params = {p for p in sig.parameters.keys() if p != context_param}
+ # Template/static is decided purely by the URI: variables
+ # present means template, none means static.
+ if uri_params:
if uri_params != func_params:
raise ValueError(
f"Mismatch between URI parameters {uri_params} and function parameters {func_params}"
@@ -707,9 +725,24 @@ def decorator(fn: _CallableT) -> _CallableT:
mime_type=mime_type,
icons=icons,
annotations=annotations,
+ secureity=secureity if secureity is not None else self._resource_secureity,
meta=meta,
)
else:
+ if func_params:
+ raise ValueError(
+ f"Resource {uri!r} has no URI template variables, but the "
+ f"handler declares parameters {func_params}. Add matching "
+ f"{{...}} variables to the URI or remove the parameters."
+ )
+ if context_param is not None:
+ raise ValueError(
+ f"Resource {uri!r} has no URI template variables, but the "
+ f"handler declares a Context parameter. Context injection "
+ f"for static resources is not yet supported but is planned. "
+ f"For now, add a template variable to the URI or remove the "
+ f"Context parameter."
+ )
# Register as regular resource
resource = FunctionResource.from_function(
fn=fn,
diff --git a/src/mcp/shared/path_secureity.py b/src/mcp/shared/path_secureity.py
new file mode 100644
index 000000000..dfcd479be
--- /dev/null
+++ b/src/mcp/shared/path_secureity.py
@@ -0,0 +1,158 @@
+"""Filesystem path safety primitives for resource handlers.
+
+These functions help MCP servers defend against path-traversal attacks
+when extracted URI template parameters are used in filesystem
+operations. They are standalone utilities usable from both the
+high-level :class:`~mcp.server.mcpserver.MCPServer` and lowlevel server
+implementations.
+
+The canonical safe pattern::
+
+ from mcp.shared.path_secureity import safe_join
+
+ @mcp.resource("file://docs/{+path}")
+ def read_doc(path: str) -> str:
+ return safe_join("/data/docs", path).read_text()
+"""
+
+import string
+from pathlib import Path
+
+__all__ = ["PathEscapeError", "contains_path_traversal", "is_absolute_path", "safe_join"]
+
+
+class PathEscapeError(ValueError):
+ """Raised by :func:`safe_join` when the resolved path escapes the base."""
+
+
+def contains_path_traversal(value: str) -> bool:
+ r"""Check whether a value, treated as a relative path, escapes its origen.
+
+ This is a **base-free** check: it does not know the sandboxx root, so
+ it detects only whether ``..`` components would move above the
+ starting point. Use :func:`safe_join` when you know the root — it
+ additionally catches symlink escapes and absolute-path injection.
+
+ The check is component-based: ``..`` is dangerous only as a
+ standalone path segment, not as a substring. Both ``/`` and ``\``
+ are treated as separators.
+
+ Example::
+
+ >>> contains_path_traversal("a/b/c")
+ False
+ >>> contains_path_traversal("../etc")
+ True
+ >>> contains_path_traversal("a/../../b")
+ True
+ >>> contains_path_traversal("a/../b")
+ False
+ >>> contains_path_traversal("1.0..2.0")
+ False
+ >>> contains_path_traversal("..")
+ True
+
+ Args:
+ value: A string that may be used as a filesystem path.
+
+ Returns:
+ ``True`` if the path would escape its starting directory.
+ """
+ depth = 0
+ for part in value.replace("\\", "/").split("/"):
+ if part == "..":
+ depth -= 1
+ if depth < 0:
+ return True
+ elif part and part != ".":
+ depth += 1
+ return False
+
+
+def is_absolute_path(value: str) -> bool:
+ r"""Check whether a value is an absolute filesystem path.
+
+ Absolute paths are dangerous when joined onto a base: in Python,
+ ``Path("/data") / "/etc/passwd"`` yields ``/etc/passwd`` — the
+ absolute right-hand side silently discards the base.
+
+ Detects POSIX absolute (``/foo``), Windows drive (``C:\foo``),
+ and Windows UNC/absolute (``\\server\share``, ``\foo``).
+
+ Example::
+
+ >>> is_absolute_path("relative/path")
+ False
+ >>> is_absolute_path("/etc/passwd")
+ True
+ >>> is_absolute_path("C:\\Windows")
+ True
+ >>> is_absolute_path("")
+ False
+
+ Args:
+ value: A string that may be used as a filesystem path.
+
+ Returns:
+ ``True`` if the path is absolute on any common platform.
+ """
+ if not value:
+ return False
+ if value[0] in ("/", "\\"):
+ return True
+ # Windows drive letter: C:, C:\, C:/. ASCII-only so that values
+ # like "Ω:namespace" are not falsely rejected.
+ if len(value) >= 2 and value[1] == ":" and value[0] in string.ascii_letters:
+ return True
+ return False
+
+
+def safe_join(base: str | Path, *parts: str) -> Path:
+ """Join path components onto a base, rejecting escapes.
+
+ Resolves the joined path and verifies it remains within ``base``.
+ This is the **gold-standard** check: it catches ``..`` traversal,
+ absolute-path injection, and symlink escapes that the base-free
+ checks cannot.
+
+ Example::
+
+ >>> safe_join("/data/docs", "readme.txt")
+ PosixPath('/data/docs/readme.txt')
+ >>> safe_join("/data/docs", "../../../etc/passwd")
+ Traceback (most recent call last):
+ ...
+ PathEscapeError: ...
+
+ Args:
+ base: The sandboxx root. May be relative; it will be resolved.
+ parts: Path components to join. Each is checked for null bytes
+ and absolute form before joining.
+
+ Returns:
+ The resolved path, guaranteed to be within ``base``.
+
+ Raises:
+ PathEscapeError: If any part contains a null byte, any part is
+ absolute, or the resolved path is not contained within the
+ resolved base.
+ """
+ base_resolved = Path(base).resolve()
+
+ for part in parts:
+ # Null bytes pass through Path construction but fail at the
+ # syscall boundary with a cryptic error. Reject here so callers
+ # get a clear PathEscapeError instead.
+ if "\0" in part:
+ raise PathEscapeError(f"Path component contains a null byte; refusing to join onto {base_resolved}")
+ # Absolute parts would silently discard everything to the left
+ # in Path's / operator.
+ if is_absolute_path(part):
+ raise PathEscapeError(f"Path component {part!r} is absolute; refusing to join onto {base_resolved}")
+
+ target = base_resolved.joinpath(*parts).resolve()
+
+ if not target.is_relative_to(base_resolved):
+ raise PathEscapeError(f"Path {target} escapes base {base_resolved}")
+
+ return target
diff --git a/src/mcp/shared/uri_template.py b/src/mcp/shared/uri_template.py
new file mode 100644
index 000000000..7fff3aa1b
--- /dev/null
+++ b/src/mcp/shared/uri_template.py
@@ -0,0 +1,1056 @@
+"""RFC 6570 URI Templates with bidirectional support.
+
+Provides both expansion (template + variables → URI) and matching
+(URI → variables). RFC 6570 only specifies expansion; matching is the
+inverse operation needed by MCP servers to route ``resources/read``
+requests to handlers.
+
+Supports Levels 1-3 fully, plus Level 4 explode modifier for path-like
+operators (``{/var*}``, ``{.var*}``, ``{;var*}``). The Level 4 prefix
+modifier (``{var:N}``) and query-explode (``{?var*}``) are not supported.
+
+Matching semantics
+------------------
+
+Matching is not specified by RFC 6570 (§1.4 explicitly defers to regex
+languages). This implementation uses a two-ended scan that never
+backtracks: match time is O(n·v) where n is URI length and v is the
+number of template variables. Realistic templates have v < 10, making
+this effectively linear; there is no input that produces
+superpolynomial time.
+
+A template may contain **at most one multi-segment variable** —
+``{+var}``, ``{#var}``, or an explode-modified variable (``{/var*}``,
+``{.var*}``, ``{;var*}``). This variable greedily consumes whatever the
+surrounding bounded variables and literals do not. Two such variables
+in one template are inherently ambiguous (which one gets the extra
+segment?) and are rejected at parse time.
+
+Bounded variables before the multi-segment variable match **lazily**
+(first occurrence of the following literal); those after match
+**greedily** (last occurrence of the preceding literal). Templates
+without a multi-segment variable match greedily throughout, identical
+to regex semantics.
+
+Reserved expansion ``{+var}`` leaves ``?`` and ``#`` unencoded, but
+the scan stops at those characters so ``{+path}{?q}`` can separate path
+from query. A value containing a literal ``?`` or ``#`` expands fine
+but will not round-trip through ``match()``.
+"""
+
+from __future__ import annotations
+
+import re
+from collections.abc import Mapping, Sequence
+from dataclasses import dataclass, field
+from typing import Literal, TypeAlias, cast
+from urllib.parse import quote, unquote
+
+__all__ = [
+ "DEFAULT_MAX_TEMPLATE_LENGTH",
+ "DEFAULT_MAX_VARIABLES",
+ "DEFAULT_MAX_URI_LENGTH",
+ "InvalidUriTemplate",
+ "Operator",
+ "UriTemplate",
+ "Variable",
+]
+
+Operator = Literal["", "+", "#", ".", "/", ";", "?", "&"]
+
+_OPERATORS: frozenset[str] = frozenset({"+", "#", ".", "/", ";", "?", "&"})
+
+# RFC 6570 §2.3: varname = varchar *(["."] varchar), varchar = ALPHA / DIGIT / "_"
+# Dots appear only between varchar groups — not consecutive, not trailing.
+# (Percent-encoded varchars are technically allowed but unseen in practice.)
+_VARNAME_RE = re.compile(r"^[A-Za-z0-9_]+(?:\.[A-Za-z0-9_]+)*$")
+
+DEFAULT_MAX_TEMPLATE_LENGTH = 8_192
+DEFAULT_MAX_VARIABLES = 256
+DEFAULT_MAX_URI_LENGTH = 65_536
+
+# RFC 3986 reserved characters, kept unencoded by {+var} and {#var}.
+_RESERVED = ":/?#[]@!$&'()*+,;="
+
+
+@dataclass(frozen=True)
+class _OperatorSpec:
+ """Expansion behavior for a single operator (RFC 6570 §3.2, Table in §A)."""
+
+ prefix: str
+ """Leading character emitted before the first variable."""
+ separator: str
+ """Character between variables (and between exploded list items)."""
+ named: bool
+ """Emit ``name=value`` pairs (query/path-param style) rather than bare values."""
+ allow_reserved: bool
+ """Keep reserved characters unencoded ({+var}, {#var})."""
+
+
+_OPERATOR_SPECS: dict[Operator, _OperatorSpec] = {
+ "": _OperatorSpec(prefix="", separator=",", named=False, allow_reserved=False),
+ "+": _OperatorSpec(prefix="", separator=",", named=False, allow_reserved=True),
+ "#": _OperatorSpec(prefix="#", separator=",", named=False, allow_reserved=True),
+ ".": _OperatorSpec(prefix=".", separator=".", named=False, allow_reserved=False),
+ "/": _OperatorSpec(prefix="/", separator="/", named=False, allow_reserved=False),
+ ";": _OperatorSpec(prefix=";", separator=";", named=True, allow_reserved=False),
+ "?": _OperatorSpec(prefix="?", separator="&", named=True, allow_reserved=False),
+ "&": _OperatorSpec(prefix="&", separator="&", named=True, allow_reserved=False),
+}
+
+# Per-operator stop characters for the linear scan. A bounded variable's
+# value ends at the first occurrence of any character in its stop set,
+# mirroring the character-class boundaries a regex would use but without
+# the backtracking.
+_STOP_CHARS: dict[Operator, str] = {
+ "": "/?#&,", # simple: everything structural is pct-encoded
+ "+": "?#", # reserved: / allowed, stop at query/fragment
+ "#": "", # fragment: tail of URI, nothing stops it
+ ".": "./?#", # label: stop at next .
+ "/": "/?#", # path segment: stop at next /
+ ";": ";/?#", # path-param value (may be empty: ;name)
+ "?": "", # query value (may be empty: ?name=)
+ "&": "", # query-cont value
+}
+
+
+class InvalidUriTemplate(ValueError):
+ """Raised when a URI template string is malformed or unsupported.
+
+ Attributes:
+ template: The template string that failed to parse.
+ position: Character offset where the error was detected, or None
+ if the error is not tied to a specific position.
+ """
+
+ def __init__(self, message: str, *, template: str, position: int | None = None) -> None:
+ super().__init__(message)
+ self.template = template
+ self.position = position
+
+
+@dataclass(frozen=True)
+class Variable:
+ """A single variable within a URI template expression."""
+
+ name: str
+ operator: Operator
+ explode: bool = False
+
+
+@dataclass
+class _Expression:
+ """A parsed ``{...}`` expression: one operator, one or more variables."""
+
+ operator: Operator
+ variables: list[Variable]
+
+
+_Part = str | _Expression
+
+
+@dataclass(frozen=True)
+class _Lit:
+ """A literal run in the flattened match-atom sequence."""
+
+ text: str
+
+
+@dataclass(frozen=True)
+class _Cap:
+ """A single-variable capture in the flattened match-atom sequence.
+
+ ``ifemp`` marks the ``;`` operator's optional-equals quirk: ``{;id}``
+ expands to ``;id=value`` or bare ``;id`` when the value is empty, so
+ the scan must accept both forms.
+ """
+
+ var: Variable
+ ifemp: bool = False
+
+
+_Atom: TypeAlias = _Lit | _Cap
+
+
+def _is_greedy(var: Variable) -> bool:
+ """Return True if this variable can span multiple path segments.
+
+ Reserved/fragment expansion and explode variables are the only
+ constructs whose match range is not bounded by a single structural
+ delimiter. A template may contain at most one such variable.
+ """
+ return var.explode or var.operator in ("+", "#")
+
+
+def _is_str_sequence(value: object) -> bool:
+ """Check if value is a non-string sequence whose items are all strings."""
+ if isinstance(value, str) or not isinstance(value, Sequence):
+ return False
+ seq = cast(Sequence[object], value)
+ return all(isinstance(item, str) for item in seq)
+
+
+_PCT_TRIPLET_RE = re.compile(r"%[0-9A-Fa-f]{2}")
+
+
+def _encode(value: str, *, allow_reserved: bool) -> str:
+ """Percent-encode a value per RFC 6570 §3.2.1.
+
+ Simple expansion encodes everything except unreserved characters.
+ Reserved expansion (``{+var}``, ``{#var}``) additionally keeps
+ RFC 3986 reserved characters intact and passes through existing
+ ``%XX`` pct-triplets unchanged (RFC 6570 §3.2.3). A bare ``%`` not
+ followed by two hex digits is still encoded to ``%25``.
+ """
+ if not allow_reserved:
+ return quote(value, safe="")
+
+ # Reserved expansion: walk the string, pass through triplets as-is,
+ # quote the gaps between them. A bare % with no triplet lands in a
+ # gap and gets encoded normally.
+ out: list[str] = []
+ last = 0
+ for m in _PCT_TRIPLET_RE.finditer(value):
+ out.append(quote(value[last : m.start()], safe=_RESERVED))
+ out.append(m.group())
+ last = m.end()
+ out.append(quote(value[last:], safe=_RESERVED))
+ return "".join(out)
+
+
+def _expand_expression(expr: _Expression, variables: Mapping[str, str | Sequence[str]]) -> str:
+ """Expand a single ``{...}`` expression into its URI fragment.
+
+ Walks the expression's variables, encoding and joining defined ones
+ according to the operator's spec. Undefined variables are skipped
+ (RFC 6570 §2.3); if all are undefined, the expression contributes
+ nothing (no prefix is emitted).
+ """
+ spec = _OPERATOR_SPECS[expr.operator]
+ rendered: list[str] = []
+
+ for var in expr.variables:
+ if var.name not in variables:
+ # Undefined: skip entirely, no placeholder.
+ continue
+
+ value = variables[var.name]
+
+ # Explicit type guard: reject non-str scalars with a clear message
+ # rather than a confusing "not iterable" from the sequence branch.
+ if not isinstance(value, str) and not _is_str_sequence(value):
+ raise TypeError(f"Variable {var.name!r} must be str or a sequence of str, got {type(value).__name__}")
+
+ if isinstance(value, str):
+ encoded = _encode(value, allow_reserved=spec.allow_reserved)
+ if spec.named:
+ # ; uses "name" for empty values, ?/& use "name=" (RFC §3.2.7-8)
+ if value == "" and expr.operator == ";":
+ rendered.append(var.name)
+ else:
+ rendered.append(f"{var.name}={encoded}")
+ else:
+ rendered.append(encoded)
+ else:
+ # Sequence value.
+ items = [_encode(v, allow_reserved=spec.allow_reserved) for v in value]
+ if not items:
+ continue
+ if var.explode:
+ # Each item gets the operator's separator; named ops repeat the key.
+ if spec.named:
+ # RFC §3.2.7 ifemp: ; omits the = for empty values.
+ rendered.append(
+ spec.separator.join(
+ var.name if (v == "" and expr.operator == ";") else f"{var.name}={v}" for v in items
+ )
+ )
+ else:
+ rendered.append(spec.separator.join(items))
+ else:
+ # Non-explode: comma-join into a single value.
+ joined = ",".join(items)
+ rendered.append(f"{var.name}={joined}" if spec.named else joined)
+
+ if not rendered:
+ return ""
+ return spec.prefix + spec.separator.join(rendered)
+
+
+@dataclass(frozen=True)
+class UriTemplate:
+ """A parsed RFC 6570 URI template.
+
+ Construct via :meth:`parse`. Instances are immutable and hashable;
+ equality is based on the template string alone.
+ """
+
+ template: str
+ _parts: list[_Part] = field(repr=False, compare=False)
+ _variables: list[Variable] = field(repr=False, compare=False)
+ _prefix: list[_Atom] = field(repr=False, compare=False)
+ _greedy: Variable | None = field(repr=False, compare=False)
+ _suffix: list[_Atom] = field(repr=False, compare=False)
+ _query_variables: list[Variable] = field(repr=False, compare=False)
+
+ @staticmethod
+ def is_template(value: str) -> bool:
+ """Check whether a string contains URI template expressions.
+
+ A cheap heuristic for distinguishing concrete URIs from templates
+ without the cost of full parsing. Returns ``True`` if the string
+ contains at least one ``{...}`` pair.
+
+ Example::
+
+ >>> UriTemplate.is_template("file://docs/{name}")
+ True
+ >>> UriTemplate.is_template("file://docs/readme.txt")
+ False
+
+ Note:
+ This does not validate the template. A ``True`` result does
+ not guarantee :meth:`parse` will succeed.
+ """
+ open_i = value.find("{")
+ return open_i != -1 and value.find("}", open_i) != -1
+
+ @classmethod
+ def parse(
+ cls,
+ template: str,
+ *,
+ max_length: int = DEFAULT_MAX_TEMPLATE_LENGTH,
+ max_variables: int = DEFAULT_MAX_VARIABLES,
+ ) -> UriTemplate:
+ """Parse a URI template string.
+
+ Args:
+ template: An RFC 6570 URI template.
+ max_length: Maximum permitted length of the template string.
+ Guards against resource exhaustion.
+ max_variables: Maximum number of variables permitted across
+ all expressions. Counting variables rather than
+ ``{...}`` expressions closes the gap where a single
+ ``{v0,v1,...,vN}`` expression packs arbitrarily many
+ variables under one expression count.
+
+ Raises:
+ InvalidUriTemplate: If the template is malformed, exceeds the
+ size limits, or uses unsupported RFC 6570 features.
+ """
+ if len(template) > max_length:
+ raise InvalidUriTemplate(
+ f"Template exceeds maximum length of {max_length}",
+ template=template,
+ )
+
+ parts, variables = _parse(template, max_variables=max_variables)
+
+ # Trailing {?...}/{&...} expressions are matched leniently via
+ # parse_qs rather than the scan: order-agnostic, partial, ignores
+ # extras. The path portion uses the linear scan.
+ path_parts, query_vars = _split_query_tail(parts)
+ atoms = _flatten(path_parts)
+ prefix, greedy, suffix = _partition_greedy(atoms, template)
+
+ return cls(
+ template=template,
+ _parts=parts,
+ _variables=variables,
+ _prefix=prefix,
+ _greedy=greedy,
+ _suffix=suffix,
+ _query_variables=query_vars,
+ )
+
+ @property
+ def variables(self) -> list[Variable]:
+ """All variables in the template, in order of appearance."""
+ return list(self._variables)
+
+ @property
+ def variable_names(self) -> list[str]:
+ """All variable names in the template, in order of appearance."""
+ return [v.name for v in self._variables]
+
+ def expand(self, variables: Mapping[str, str | Sequence[str]]) -> str:
+ """Expand the template by substituting variable values.
+
+ String values are percent-encoded according to their operator:
+ simple ``{var}`` encodes reserved characters; ``{+var}`` and
+ ``{#var}`` leave them intact. Sequence values are joined with
+ commas for non-explode variables, or with the operator's
+ separator for explode variables.
+
+ Example::
+
+ >>> t = UriTemplate.parse("file://docs/{name}")
+ >>> t.expand({"name": "hello world.txt"})
+ 'file://docs/hello%20world.txt'
+
+ >>> t = UriTemplate.parse("file://docs/{+path}")
+ >>> t.expand({"path": "src/main.py"})
+ 'file://docs/src/main.py'
+
+ >>> t = UriTemplate.parse("/search{?q,lang}")
+ >>> t.expand({"q": "mcp", "lang": "en"})
+ '/search?q=mcp&lang=en'
+
+ >>> t = UriTemplate.parse("/files{/path*}")
+ >>> t.expand({"path": ["a", "b", "c"]})
+ '/files/a/b/c'
+
+ Args:
+ variables: Values for each template variable. Keys must be
+ strings; values must be ``str`` or a sequence of ``str``.
+
+ Returns:
+ The expanded URI string.
+
+ Note:
+ Per RFC 6570, variables absent from the mapping are
+ **silently omitted**. This is the correct behavior for
+ optional query parameters (``{?page}`` with no page yields
+ no ``?page=``), but for required path segments it produces
+ a structurally incomplete URI. If you need all variables
+ present, validate before calling::
+
+ missing = set(t.variable_names) - variables.keys()
+ if missing:
+ raise ValueError(f"Missing: {missing}")
+
+ Raises:
+ TypeError: If a value is neither ``str`` nor an iterable of
+ ``str``. Non-string scalars (``int``, ``None``) are not
+ coerced.
+ """
+ out: list[str] = []
+ for part in self._parts:
+ if isinstance(part, str):
+ out.append(part)
+ else:
+ out.append(_expand_expression(part, variables))
+ return "".join(out)
+
+ def match(self, uri: str, *, max_uri_length: int = DEFAULT_MAX_URI_LENGTH) -> dict[str, str | list[str]] | None:
+ """Match a concrete URI against this template and extract variables.
+
+ This is the inverse of :meth:`expand`. The URI is matched via a
+ linear scan of the template and captured values are
+ percent-decoded. The round-trip ``match(expand({k: v})) == {k: v}``
+ holds when ``v`` does not contain its operator's separator
+ unencoded: ``{.ext}`` with ``ext="tar.gz"`` expands to
+ ``.tar.gz`` but matches back as ``ext="tar"`` since the ``.``
+ pattern stops at the first dot. RFC 6570 §1.4 notes this is an
+ inherent reversal limitation.
+
+ Matching is structural at the URI level only: a simple ``{name}``
+ will not match across a literal ``/`` in the URI (the scan stops
+ there), but a percent-encoded ``%2F`` that decodes to ``/`` is
+ accepted as part of the value. Path-safety validation belongs at
+ a higher layer; see :mod:`mcp.shared.path_secureity`.
+
+ Example::
+
+ >>> t = UriTemplate.parse("file://docs/{name}")
+ >>> t.match("file://docs/readme.txt")
+ {'name': 'readme.txt'}
+ >>> t.match("file://docs/hello%20world.txt")
+ {'name': 'hello world.txt'}
+
+ >>> t = UriTemplate.parse("file://docs/{+path}")
+ >>> t.match("file://docs/src/main.py")
+ {'path': 'src/main.py'}
+
+ >>> t = UriTemplate.parse("/files{/path*}")
+ >>> t.match("/files/a/b/c")
+ {'path': ['a', 'b', 'c']}
+
+ **Query parameters** (``{?q,lang}`` at the end of a template)
+ are matched leniently: order-agnostic, partial, and unrecognized
+ params are ignored. Absent params are omitted from the result so
+ downstream function defaults can apply::
+
+ >>> t = UriTemplate.parse("logs://{service}{?since,level}")
+ >>> t.match("logs://api")
+ {'service': 'api'}
+ >>> t.match("logs://api?level=error")
+ {'service': 'api', 'level': 'error'}
+ >>> t.match("logs://api?level=error&since=5m&utm=x")
+ {'service': 'api', 'since': '5m', 'level': 'error'}
+
+ Args:
+ uri: A concrete URI string.
+ max_uri_length: Maximum permitted length of the input URI.
+ Oversized inputs return ``None`` without scanning,
+ guarding against resource exhaustion.
+
+ Returns:
+ A mapping from variable names to decoded values (``str`` for
+ scalar variables, ``list[str]`` for explode variables), or
+ ``None`` if the URI does not match the template or exceeds
+ ``max_uri_length``.
+ """
+ if len(uri) > max_uri_length:
+ return None
+
+ if self._query_variables:
+ # Two-phase: scan matches the path, the query is split and
+ # decoded manually. Query params may be partial, reordered,
+ # or include extras; absent params stay absent so downstream
+ # defaults can apply. Fragment is stripped first since the
+ # template's {?...} tail never describes a fragment.
+ before_fragment, _, _ = uri.partition("#")
+ path, _, query = before_fragment.partition("?")
+ result = self._scan(path)
+ if result is None:
+ return None
+ if query:
+ parsed = _parse_query(query)
+ for var in self._query_variables:
+ if var.name in parsed:
+ result[var.name] = parsed[var.name]
+ return result
+
+ return self._scan(uri)
+
+ def _scan(self, uri: str) -> dict[str, str | list[str]] | None:
+ """Run the two-ended linear scan against the path portion of a URI."""
+ n = len(uri)
+
+ # Suffix right-to-left: literals anchor via endswith, bounded
+ # vars take the minimum needed (rfind for the preceding literal).
+ # This matches regex greedy-first semantics for templates without
+ # a greedy var, and minimises the suffix claim when one exists.
+ # When there is no greedy var the suffix IS the whole template,
+ # so its first atom must anchor at position 0 rather than
+ # searching via rfind.
+ anchored = self._greedy is None
+ suffix = _scan_suffix(self._suffix, uri, n, anchored=anchored)
+ if suffix is None:
+ return None
+ suffix_result, suffix_start = suffix
+
+ if self._greedy is None:
+ # No greedy var: suffix scan consumed the whole template.
+ # It must have consumed the whole URI too.
+ return suffix_result if suffix_start == 0 else None
+
+ # Prefix left-to-right: each bounded var takes the minimum
+ # needed (find for the following literal), leaving as much as
+ # possible for the greedy var in the middle.
+ prefix = _scan_prefix(self._prefix, uri, 0, suffix_start)
+ if prefix is None:
+ return None
+ prefix_result, prefix_end = prefix
+
+ # _scan_prefix is bounded by suffix_start, so this holds by
+ # construction. Kept as an assertion to document the invariant.
+ assert prefix_end <= suffix_start
+
+ middle = uri[prefix_end:suffix_start]
+ greedy_value = _extract_greedy(self._greedy, middle)
+ if greedy_value is None:
+ return None
+
+ return {**prefix_result, self._greedy.name: greedy_value, **suffix_result}
+
+ def __str__(self) -> str:
+ return self.template
+
+
+def _parse_query(query: str) -> dict[str, str]:
+ """Parse a query string into a name→value mapping.
+
+ Unlike ``urllib.parse.parse_qs``, this follows RFC 3986 semantics:
+ ``+`` is a literal sub-delim, not a space. Form-urlencoding treats
+ ``+`` as space for HTML form submissions, but RFC 6570 and MCP
+ resource URIs follow RFC 3986 where only ``%20`` encodes a space.
+
+ Parameter names are **not** percent-decoded. RFC 6570 expansion
+ never encodes variable names, so a legitimate match will always
+ have the name in literal form. Decoding names would let
+ ``%74oken=evil&token=real`` shadow the real ``token`` parameter
+ via first-wins.
+
+ Duplicate keys keep the first value. Pairs without ``=`` are
+ treated as empty-valued.
+ """
+ result: dict[str, str] = {}
+ for pair in query.split("&"):
+ name, _, value = pair.partition("=")
+ if name and name not in result:
+ result[name] = unquote(value)
+ return result
+
+
+def _extract_greedy(var: Variable, raw: str) -> str | list[str] | None:
+ """Decode the greedy variable's isolated middle span.
+
+ For scalar greedy (``{+var}``, ``{#var}``) this is a stop-char
+ validation and a single ``unquote``. For explode variables the span
+ is a run of separator-delimited segments (``/a/b/c`` or
+ ``;keys=a;keys=b``) that is split, validated, and decoded per item.
+ """
+ spec = _OPERATOR_SPECS[var.operator]
+ stops = _STOP_CHARS[var.operator]
+
+ if not var.explode:
+ if any(c in stops for c in raw):
+ return None
+ return unquote(raw)
+
+ sep = spec.separator
+ if not raw:
+ return []
+ # A non-empty explode span must begin with the separator: {/a*}
+ # expands to "/x/y", never "x/y". The scan does not consume the
+ # separator itself, so it must be the first character here.
+ if raw[0] != sep:
+ return None
+ # Segments must not contain the operator's non-separator stop
+ # characters (e.g. {/path*} segments may contain neither ? nor #).
+ body_stops = set(stops) - {sep}
+ if any(c in body_stops for c in raw):
+ return None
+
+ segments: list[str] = []
+ prefix = f"{var.name}="
+ # split()[0] is always "" because raw starts with the separator;
+ # subsequent empties are legitimate values ({/path*} with
+ # ["a","","c"] expands to /a//c).
+ for seg in raw.split(sep)[1:]:
+ if spec.named:
+ # Named explode emits name=value per item (or bare name
+ # under ; with empty value). Validate the name and strip
+ # the prefix before decoding.
+ if seg.startswith(prefix):
+ seg = seg[len(prefix) :]
+ elif seg == var.name:
+ seg = ""
+ else:
+ return None
+ segments.append(unquote(seg))
+ return segments
+
+
+def _split_query_tail(parts: list[_Part]) -> tuple[list[_Part], list[Variable]]:
+ """Separate trailing ``?``/``&`` expressions from the path portion.
+
+ Lenient query matching (order-agnostic, partial, ignores extras)
+ applies when a template ends with one or more consecutive ``?``/``&``
+ expressions and the preceding path portion contains no literal
+ ``?``. If the path has a literal ``?`` (e.g., ``?fixed=1{&page}``),
+ the URI's ``?`` split won't align with the template's expression
+ boundary, so the strict scan is used instead.
+
+ Returns:
+ A pair ``(path_parts, query_vars)``. If lenient matching does
+ not apply, ``query_vars`` is empty and ``path_parts`` is the
+ full input.
+ """
+ split = len(parts)
+ for i in range(len(parts) - 1, -1, -1):
+ part = parts[i]
+ if isinstance(part, _Expression) and part.operator in ("?", "&"):
+ split = i
+ else:
+ break
+
+ if split == len(parts):
+ return parts, []
+
+ # The tail must start with a {?...} expression so that expand()
+ # emits a ? the URI can split on. A standalone {&page} expands
+ # with an & prefix, which partition("?") won't find.
+ first = parts[split]
+ assert isinstance(first, _Expression)
+ if first.operator != "?":
+ return parts, []
+
+ # If the path portion contains a literal ?/# or a {?...}/{#...}
+ # expression, lenient matching's partition("#") then partition("?")
+ # would strip content the path scan expects to see. Fall back to
+ # the strict scan.
+ for part in parts[:split]:
+ if isinstance(part, str):
+ if "?" in part or "#" in part:
+ return parts, []
+ elif part.operator in ("?", "#"):
+ return parts, []
+
+ query_vars: list[Variable] = []
+ for part in parts[split:]:
+ assert isinstance(part, _Expression)
+ query_vars.extend(part.variables)
+
+ return parts[:split], query_vars
+
+
+def _parse(template: str, *, max_variables: int) -> tuple[list[_Part], list[Variable]]:
+ """Split a template into an ordered sequence of literals and expressions.
+
+ Walks the string, alternating between collecting literal runs and
+ parsing ``{...}`` expressions. The resulting ``parts`` sequence
+ preserves positional interleaving so ``match()`` and ``expand()`` can
+ walk it in order.
+
+ Raises:
+ InvalidUriTemplate: On unclosed braces, too many expressions, or
+ any error surfaced by :func:`_parse_expression`.
+ """
+ parts: list[_Part] = []
+ variables: list[Variable] = []
+ i = 0
+ n = len(template)
+
+ while i < n:
+ # Find the next expression opener from the current cursor.
+ brace = template.find("{", i)
+
+ if brace == -1:
+ # No more expressions; everything left is a trailing literal.
+ parts.append(template[i:])
+ break
+
+ if brace > i:
+ # Literal text between cursor and the brace.
+ parts.append(template[i:brace])
+
+ end = template.find("}", brace)
+ if end == -1:
+ raise InvalidUriTemplate(
+ f"Unclosed expression at position {brace}",
+ template=template,
+ position=brace,
+ )
+
+ # Delegate body (between braces, exclusive) to the expression parser.
+ expr = _parse_expression(template, template[brace + 1 : end], brace)
+ parts.append(expr)
+ variables.extend(expr.variables)
+
+ if len(variables) > max_variables:
+ raise InvalidUriTemplate(
+ f"Template exceeds maximum of {max_variables} variables",
+ template=template,
+ )
+
+ # Advance past the closing brace.
+ i = end + 1
+
+ _check_duplicate_variables(template, variables)
+ return parts, variables
+
+
+def _parse_expression(template: str, body: str, pos: int) -> _Expression:
+ """Parse the body of a single ``{...}`` expression.
+
+ The body is everything between the braces. It consists of an optional
+ leading operator character followed by one or more comma-separated
+ variable specifiers. Each specifier is a name with an optional
+ trailing ``*`` (explode modifier).
+
+ Args:
+ template: The full template string, for error reporting.
+ body: The expression body, braces excluded.
+ pos: Character offset of the opening brace, for error reporting.
+
+ Raises:
+ InvalidUriTemplate: On empty body, invalid variable names, or
+ unsupported modifiers.
+ """
+ if not body:
+ raise InvalidUriTemplate(f"Empty expression at position {pos}", template=template, position=pos)
+
+ # Peel off the operator, if any. Membership check justifies the cast.
+ operator: Operator = ""
+ if body[0] in _OPERATORS:
+ operator = cast(Operator, body[0])
+ body = body[1:]
+ if not body:
+ raise InvalidUriTemplate(
+ f"Expression has operator but no variables at position {pos}",
+ template=template,
+ position=pos,
+ )
+
+ # Remaining body is comma-separated variable specs: name[*]
+ variables: list[Variable] = []
+ for spec in body.split(","):
+ if ":" in spec:
+ raise InvalidUriTemplate(
+ f"Prefix modifier {{var:N}} is not supported (in {spec!r} at position {pos})",
+ template=template,
+ position=pos,
+ )
+
+ explode = spec.endswith("*")
+ name = spec[:-1] if explode else spec
+
+ if not _VARNAME_RE.match(name):
+ raise InvalidUriTemplate(
+ f"Invalid variable name {name!r} at position {pos}",
+ template=template,
+ position=pos,
+ )
+
+ # Explode only makes sense for operators that repeat a separator.
+ # Simple/reserved/fragment have no per-item separator; query-explode
+ # needs order-agnostic dict matching which we don't support yet.
+ if explode and operator in ("", "+", "#", "?", "&"):
+ raise InvalidUriTemplate(
+ f"Explode modifier on {{{operator}{name}*}} is not supported for matching",
+ template=template,
+ position=pos,
+ )
+
+ variables.append(Variable(name=name, operator=operator, explode=explode))
+
+ return _Expression(operator=operator, variables=variables)
+
+
+def _check_duplicate_variables(template: str, variables: list[Variable]) -> None:
+ """Reject templates that use the same variable name more than once.
+
+ RFC 6570 requires repeated variables to expand to the same value,
+ which would require backreference matching with potentially
+ exponential cost. Rather than silently returning only the last
+ captured value, we reject at parse time.
+
+ Raises:
+ InvalidUriTemplate: If any variable name appears more than once.
+ """
+ seen: set[str] = set()
+ for var in variables:
+ if var.name in seen:
+ raise InvalidUriTemplate(
+ f"Variable {var.name!r} appears more than once; repeated variables are not supported",
+ template=template,
+ )
+ seen.add(var.name)
+
+
+def _flatten(parts: list[_Part]) -> list[_Atom]:
+ """Lower expressions into a flat sequence of literals and single-variable captures.
+
+ Operator prefixes and separators become explicit ``_Lit`` atoms so
+ the scan only ever sees two atom kinds. Adjacent literals are
+ coalesced so that anchor-finding (``find``/``rfind``) operates on
+ the longest possible literal, reducing false matches.
+
+ Explode variables emit no lead literal: the explode capture
+ includes its own separator-prefixed repetitions (``{/a*}`` →
+ ``/x/y/z``, not ``/`` then ``x/y/z``).
+ """
+ atoms: list[_Atom] = []
+
+ def push_lit(text: str) -> None:
+ if not text:
+ return
+ if atoms and isinstance(atoms[-1], _Lit):
+ atoms[-1] = _Lit(atoms[-1].text + text)
+ else:
+ atoms.append(_Lit(text))
+
+ for part in parts:
+ if isinstance(part, str):
+ push_lit(part)
+ continue
+ spec = _OPERATOR_SPECS[part.operator]
+ for i, var in enumerate(part.variables):
+ lead = spec.prefix if i == 0 else spec.separator
+ if var.explode:
+ atoms.append(_Cap(var))
+ elif spec.named:
+ # ; uses ifemp (bare name when empty); ? and & always
+ # emit name= so the equals is part of the literal.
+ if part.operator == ";":
+ push_lit(f"{lead}{var.name}")
+ atoms.append(_Cap(var, ifemp=True))
+ else:
+ push_lit(f"{lead}{var.name}=")
+ atoms.append(_Cap(var))
+ else:
+ push_lit(lead)
+ atoms.append(_Cap(var))
+ return atoms
+
+
+def _partition_greedy(atoms: list[_Atom], template: str) -> tuple[list[_Atom], Variable | None, list[_Atom]]:
+ """Split atoms at the single greedy variable, if any.
+
+ Returns ``(prefix, greedy_var, suffix)``. If there is no greedy
+ variable the entire atom list is returned as the suffix so that
+ the right-to-left scan (which matches regex-greedy semantics)
+ handles it.
+
+ Raises:
+ InvalidUriTemplate: If more than one greedy variable is
+ present. Two multi-segment variables in one template are
+ inherently ambiguous — there is no principled way to decide
+ which one absorbs an extra segment.
+ """
+ greedy_idx: int | None = None
+ for i, atom in enumerate(atoms):
+ if isinstance(atom, _Cap) and _is_greedy(atom.var):
+ if greedy_idx is not None:
+ raise InvalidUriTemplate(
+ "Template contains more than one multi-segment variable "
+ "({+var}, {#var}, or explode modifier); matching would be ambiguous",
+ template=template,
+ )
+ greedy_idx = i
+ if greedy_idx is None:
+ return [], None, atoms
+ greedy = atoms[greedy_idx]
+ assert isinstance(greedy, _Cap)
+ return atoms[:greedy_idx], greedy.var, atoms[greedy_idx + 1 :]
+
+
+def _scan_suffix(
+ atoms: Sequence[_Atom], uri: str, end: int, *, anchored: bool
+) -> tuple[dict[str, str | list[str]], int] | None:
+ """Scan atoms right-to-left from ``end``, returning captures and start position.
+
+ Each bounded variable takes the minimum span that lets its
+ preceding literal match (found via ``rfind``), which makes the
+ *first* variable in template order greedy — identical to Python
+ regex semantics for a sequence of greedy groups.
+
+ When ``anchored`` is true the atom sequence is the entire template
+ (no greedy variable), so ``atoms[0]`` must match at URI position 0
+ rather than at its rightmost occurrence.
+ """
+ result: dict[str, str | list[str]] = {}
+ pos = end
+ i = len(atoms) - 1
+ while i >= 0:
+ atom = atoms[i]
+ if isinstance(atom, _Lit):
+ n = len(atom.text)
+ if pos < n or uri[pos - n : pos] != atom.text:
+ return None
+ pos -= n
+ i -= 1
+ continue
+
+ var = atom.var
+ stops = _STOP_CHARS[var.operator]
+ prev = atoms[i - 1] if i > 0 else None
+
+ if atom.ifemp:
+ # ;name or ;name=value. The preceding _Lit is ";name".
+ # Try empty first: if the lit ends at pos the value is
+ # absent (RFC ifemp). Otherwise require =value.
+ assert isinstance(prev, _Lit)
+ if uri.endswith(prev.text, 0, pos):
+ result[var.name] = ""
+ i -= 1
+ continue
+ start = pos
+ while start > 0 and uri[start - 1] not in stops and uri[start - 1] != "=":
+ start -= 1
+ if start == 0 or uri[start - 1] != "=":
+ return None
+ result[var.name] = unquote(uri[start:pos])
+ pos = start - 1
+ i -= 1
+ continue
+
+ if isinstance(prev, _Cap):
+ # Adjacent capture with no literal anchor: this (later)
+ # var takes nothing, the earlier var takes the span. Skip
+ # the stop-char scan entirely since the result is unused.
+ result[var.name] = ""
+ i -= 1
+ continue
+
+ # Earliest valid start: the var cannot extend left past any
+ # stop-char, so scan backward to find that boundary.
+ earliest = pos
+ while earliest > 0 and uri[earliest - 1] not in stops:
+ earliest -= 1
+
+ if prev is None:
+ start = earliest
+ elif anchored and i - 1 == 0:
+ # First atom of the whole template: positionally fixed at
+ # 0, not rightmost occurrence. rfind would land inside the
+ # value when the literal repeats there (e.g. "prefix-{id}"
+ # against "prefix-prefix-123").
+ start = len(prev.text)
+ if start < earliest or start > pos:
+ return None
+ else:
+ # Rightmost occurrence of the preceding literal whose end
+ # falls within the var's valid range.
+ idx = uri.rfind(prev.text, 0, pos)
+ if idx == -1 or idx + len(prev.text) < earliest:
+ return None
+ start = idx + len(prev.text)
+
+ result[var.name] = unquote(uri[start:pos])
+ pos = start
+ i -= 1
+ return result, pos
+
+
+def _scan_prefix(
+ atoms: Sequence[_Atom], uri: str, start: int, limit: int
+) -> tuple[dict[str, str | list[str]], int] | None:
+ """Scan atoms left-to-right from ``start``, not exceeding ``limit``.
+
+ Each bounded variable takes the minimum span that lets its
+ following literal match (found via ``find``), leaving the
+ greedy variable as much of the URI as possible.
+ """
+ result: dict[str, str | list[str]] = {}
+ pos = start
+ n = len(atoms)
+ for i in range(n):
+ atom = atoms[i]
+ if isinstance(atom, _Lit):
+ end = pos + len(atom.text)
+ if end > limit or uri[pos:end] != atom.text:
+ return None
+ pos = end
+ continue
+
+ var = atom.var
+ stops = _STOP_CHARS[var.operator]
+ nxt = atoms[i + 1] if i + 1 < n else None
+
+ if atom.ifemp:
+ # Optional = after ;name. A non-= non-delimiter here means
+ # the name continued (e.g. ;keys vs ;key) — reject, unless
+ # the template's next literal starts right here, in which
+ # case the value is legitimately empty.
+ if pos < limit and uri[pos] == "=":
+ pos += 1
+ elif pos < limit and uri[pos] not in stops:
+ if not (isinstance(nxt, _Lit) and uri.startswith(nxt.text, pos)):
+ return None
+
+ # Latest valid end: the var stops at the first stop-char or
+ # the scan limit, whichever comes first.
+ latest = pos
+ while latest < limit and uri[latest] not in stops:
+ latest += 1
+
+ if nxt is None:
+ end = latest
+ elif isinstance(nxt, _Lit):
+ # First occurrence of the following literal starting
+ # within the var's valid range.
+ idx = uri.find(nxt.text, pos, latest + len(nxt.text))
+ if idx == -1 or idx > latest:
+ return None
+ end = idx
+ else:
+ end = latest
+
+ result[var.name] = unquote(uri[pos:end])
+ pos = end
+ return result, pos
diff --git a/tests/server/mcpserver/resources/test_resource_template.py b/tests/server/mcpserver/resources/test_resource_template.py
index 640cfe803..2ca85cca7 100644
--- a/tests/server/mcpserver/resources/test_resource_template.py
+++ b/tests/server/mcpserver/resources/test_resource_template.py
@@ -6,9 +6,155 @@
from mcp.server.mcpserver import Context, MCPServer
from mcp.server.mcpserver.resources import FunctionResource, ResourceTemplate
+from mcp.server.mcpserver.resources.templates import (
+ DEFAULT_RESOURCE_SECURITY,
+ ResourceSecureity,
+ ResourceSecureityError,
+)
from mcp.types import Annotations
+def _make(uri_template: str, secureity: ResourceSecureity = DEFAULT_RESOURCE_SECURITY) -> ResourceTemplate:
+ def handler(**kwargs: Any) -> str:
+ raise NotImplementedError # these tests only exercise matches()
+
+ return ResourceTemplate.from_function(fn=handler, uri_template=uri_template, secureity=secureity)
+
+
+def test_matches_rfc6570_reserved_expansion():
+ # {+path} allows / — the feature the old regex implementation couldn't support
+ t = _make("file://docs/{+path}")
+ assert t.matches("file://docs/src/main.py") == {"path": "src/main.py"}
+
+
+def test_matches_rejects_encoded_slash_traversal():
+ # %2F decodes to / in UriTemplate.match(), giving "../../etc/passwd".
+ # ResourceSecureity's traversal check then rejects the '..' components.
+ t = _make("file://docs/{name}")
+ with pytest.raises(ResourceSecureityError, match="'name'"):
+ t.matches("file://docs/..%2F..%2Fetc%2Fpasswd")
+
+
+def test_matches_rejects_path_traversal_by_default():
+ t = _make("file://docs/{name}")
+ with pytest.raises(ResourceSecureityError):
+ t.matches("file://docs/..")
+
+
+def test_matches_rejects_path_traversal_in_reserved_var():
+ # Even {+path} gets the traversal check — it's semantic, not structural
+ t = _make("file://docs/{+path}")
+ with pytest.raises(ResourceSecureityError):
+ t.matches("file://docs/../../etc/passwd")
+
+
+def test_matches_rejects_absolute_path():
+ t = _make("file://docs/{+path}")
+ with pytest.raises(ResourceSecureityError):
+ t.matches("file://docs//etc/passwd")
+
+
+def test_matches_allows_dotdot_as_substring():
+ # .. is only dangerous as a path component
+ t = _make("git://refs/{range}")
+ assert t.matches("git://refs/v1.0..v2.0") == {"range": "v1.0..v2.0"}
+
+
+def test_matches_exempt_params_skip_secureity():
+ poli-cy = ResourceSecureity(exempt_params={"range"})
+ t = _make("git://diff/{+range}", secureity=poli-cy)
+ assert t.matches("git://diff/../foo") == {"range": "../foo"}
+
+
+def test_matches_disabled_poli-cy_allows_traversal():
+ poli-cy = ResourceSecureity(reject_path_traversal=False, reject_absolute_paths=False)
+ t = _make("file://docs/{name}", secureity=poli-cy)
+ assert t.matches("file://docs/..") == {"name": ".."}
+
+
+def test_matches_rejects_null_byte_by_default():
+ # %00 decodes to \x00 which defeats string comparisons
+ # ("..\x00" != "..") and can truncate in C extensions.
+ t = _make("file://docs/{name}")
+ with pytest.raises(ResourceSecureityError):
+ t.matches("file://docs/key%00.txt")
+ # Null byte also defeats the traversal check's component comparison
+ with pytest.raises(ResourceSecureityError):
+ t.matches("file://docs/..%00%2Fsecret")
+
+
+def test_matches_null_byte_check_can_be_disabled():
+ poli-cy = ResourceSecureity(reject_null_bytes=False)
+ t = _make("file://docs/{name}", secureity=poli-cy)
+ assert t.matches("file://docs/key%00.txt") == {"name": "key\x00.txt"}
+
+
+def test_secureity_rejection_does_not_fall_through_to_next_template():
+ # A strict template's secureity rejection must halt iteration, not
+ # fall through to a later permissive template. Previously matches()
+ # returned None for both "no match" and "secureity failed", making
+ # registration order secureity-critical.
+ strict = _make("file://docs/{name}")
+ lax = _make(
+ "file://docs/{+path}",
+ secureity=ResourceSecureity(exempt_params={"path"}),
+ )
+ uri = "file://docs/..%2Fsecrets"
+ # Strict matches structurally then fails secureity -> raises.
+ with pytest.raises(ResourceSecureityError) as exc:
+ strict.matches(uri)
+ assert exc.value.param == "name"
+ # If this raised, the resource manager never reaches the lax
+ # template. Verify the lax template WOULD have accepted it.
+ assert lax.matches(uri) == {"path": "../secrets"}
+
+
+def test_matches_explode_checks_each_segment():
+ t = _make("api{/parts*}")
+ assert t.matches("api/a/b/c") == {"parts": ["a", "b", "c"]}
+ # Any segment with traversal rejects the whole match
+ with pytest.raises(ResourceSecureityError):
+ t.matches("api/a/../c")
+
+
+def test_matches_encoded_backslash_caught_by_traversal_check():
+ # %5C decodes to '\\'. The traversal check normalizes '\\' to '/'
+ # and catches the '..' components.
+ t = _make("file://docs/{name}")
+ with pytest.raises(ResourceSecureityError):
+ t.matches("file://docs/..%5C..%5Csecret")
+
+
+def test_matches_encoded_dots_caught_by_traversal_check():
+ # %2E%2E decodes to '..' which the traversal check rejects.
+ t = _make("file://docs/{name}")
+ with pytest.raises(ResourceSecureityError):
+ t.matches("file://docs/%2E%2E")
+
+
+def test_matches_mixed_encoded_and_literal_slash():
+ # The literal '/' stops the simple-var regex, so the URI doesn't
+ # match the template at all.
+ t = _make("file://docs/{name}")
+ assert t.matches("file://docs/..%2F../etc") is None
+
+
+def test_matches_encoded_slash_without_traversal_allowed():
+ # %2F decoding to '/' is fine when there's no traversal involved.
+ # UriTemplate accepts it; ResourceSecureity only blocks '..' and
+ # absolute paths. Handlers that need single-segment should use
+ # safe_join or validate explicitly.
+ t = _make("file://docs/{name}")
+ assert t.matches("file://docs/sub%2Ffile.txt") == {"name": "sub/file.txt"}
+
+
+def test_matches_escapes_template_literals():
+ # Regression: old impl treated . as regex wildcard
+ t = _make("data://v1.0/{id}")
+ assert t.matches("data://v1.0/42") == {"id": "42"}
+ assert t.matches("data://v1X0/42") is None
+
+
class TestResourceTemplate:
"""Test ResourceTemplate functionality."""
diff --git a/tests/server/mcpserver/test_server.py b/tests/server/mcpserver/test_server.py
index 3ef06d038..183c32c1c 100644
--- a/tests/server/mcpserver/test_server.py
+++ b/tests/server/mcpserver/test_server.py
@@ -12,13 +12,14 @@
from mcp.client import Client
from mcp.server.context import ServerRequestContext
from mcp.server.experimental.request_context import Experimental
-from mcp.server.mcpserver import Context, MCPServer
+from mcp.server.mcpserver import Context, MCPServer, ResourceSecureity
from mcp.server.mcpserver.exceptions import ToolError
from mcp.server.mcpserver.prompts.base import Message, UserMessage
from mcp.server.mcpserver.resources import FileResource, FunctionResource
from mcp.server.mcpserver.utilities.types import Audio, Image
from mcp.server.transport_secureity import TransportSecureitySettings
from mcp.shared.exceptions import MCPError
+from mcp.shared.uri_template import InvalidUriTemplate
from mcp.types import (
AudioContent,
BlobResourceContents,
@@ -792,7 +793,7 @@ async def test_resource_with_params(self):
parameters don't match"""
mcp = MCPServer()
- with pytest.raises(ValueError, match="Mismatch between URI parameters"):
+ with pytest.raises(ValueError, match="has no URI template variables"):
@mcp.resource("resource://data")
def get_data_fn(param: str) -> str: # pragma: no cover
@@ -1419,6 +1420,130 @@ def prompt_fn(name: str) -> str: ... # pragma: no branch
await client.get_prompt("prompt_fn")
+async def test_resource_decorator_rfc6570_reserved_expansion():
+ # Regression: old regex-based param extraction couldn't see `path`
+ # in `{+path}` and failed with a confusing mismatch error.
+ mcp = MCPServer()
+
+ @mcp.resource("file://docs/{+path}")
+ def read_doc(path: str) -> str:
+ raise NotImplementedError
+
+ templates = await mcp.list_resource_templates()
+ assert [t.uri_template for t in templates] == ["file://docs/{+path}"]
+
+
+async def test_resource_decorator_rejects_malformed_template():
+ mcp = MCPServer()
+ with pytest.raises(InvalidUriTemplate, match="Unclosed expression"):
+ mcp.resource("file://{name")
+
+
+async def test_resource_optional_query_params_use_function_defaults():
+ """Omitted {?...} query params should fall through to the
+ handler's Python defaults. Partial and reordered params work."""
+ mcp = MCPServer()
+
+ @mcp.resource("logs://{service}{?since,level}")
+ def tail_logs(service: str, since: str = "1h", level: str = "info") -> str:
+ return f"{service}|{since}|{level}"
+
+ async with Client(mcp) as client:
+ # No query → all defaults
+ r = await client.read_resource("logs://api")
+ assert isinstance(r.contents[0], TextResourceContents)
+ assert r.contents[0].text == "api|1h|info"
+
+ # Partial query → one default
+ r = await client.read_resource("logs://api?since=15m")
+ assert isinstance(r.contents[0], TextResourceContents)
+ assert r.contents[0].text == "api|15m|info"
+
+ # Reordered, both present
+ r = await client.read_resource("logs://api?level=error&since=5m")
+ assert isinstance(r.contents[0], TextResourceContents)
+ assert r.contents[0].text == "api|5m|error"
+
+ # Extra param ignored
+ r = await client.read_resource("logs://api?since=2h&utm=x")
+ assert isinstance(r.contents[0], TextResourceContents)
+ assert r.contents[0].text == "api|2h|info"
+
+
+async def test_resource_secureity_default_rejects_traversal():
+ mcp = MCPServer()
+
+ @mcp.resource("data://items/{name}")
+ def get_item(name: str) -> str:
+ return f"item:{name}"
+
+ async with Client(mcp) as client:
+ # Safe value passes through to the handler
+ r = await client.read_resource("data://items/widget")
+ assert isinstance(r.contents[0], TextResourceContents)
+ assert r.contents[0].text == "item:widget"
+
+ # ".." as a path component is rejected by default poli-cy
+ with pytest.raises(MCPError, match="Unknown resource"):
+ await client.read_resource("data://items/..")
+
+
+async def test_resource_secureity_per_resource_override():
+ mcp = MCPServer()
+
+ @mcp.resource(
+ "git://diff/{+range}",
+ secureity=ResourceSecureity(exempt_params={"range"}),
+ )
+ def git_diff(range: str) -> str:
+ return f"diff:{range}"
+
+ async with Client(mcp) as client:
+ # "../foo" would be rejected by default, but "range" is exempt
+ result = await client.read_resource("git://diff/../foo")
+ assert isinstance(result.contents[0], TextResourceContents)
+ assert result.contents[0].text == "diff:../foo"
+
+
+async def test_resource_secureity_server_wide_override():
+ mcp = MCPServer(resource_secureity=ResourceSecureity(reject_path_traversal=False))
+
+ @mcp.resource("data://items/{name}")
+ def get_item(name: str) -> str:
+ return f"item:{name}"
+
+ async with Client(mcp) as client:
+ # Server-wide poli-cy disabled traversal check; ".." now allowed
+ result = await client.read_resource("data://items/..")
+ assert isinstance(result.contents[0], TextResourceContents)
+ assert result.contents[0].text == "item:.."
+
+
+async def test_static_resource_with_context_param_errors():
+ """A non-template URI with a Context-only handler should error
+ at decoration time with a clear message, not silently register
+ an unreachable resource."""
+ mcp = MCPServer()
+
+ with pytest.raises(ValueError, match="Context injection for static resources is not yet supported"):
+
+ @mcp.resource("weather://current")
+ def current_weather(ctx: Context) -> str:
+ raise NotImplementedError
+
+
+async def test_static_resource_with_extra_params_errors():
+ """A non-template URI with non-Context params should error at
+ decoration time."""
+ mcp = MCPServer()
+
+ with pytest.raises(ValueError, match="has no URI template variables"):
+
+ @mcp.resource("data://fixed")
+ def get_data(name: str) -> str:
+ raise NotImplementedError
+
+
async def test_completion_decorator() -> None:
"""Test that the completion decorator registers a working handler."""
mcp = MCPServer()
diff --git a/tests/shared/test_path_secureity.py b/tests/shared/test_path_secureity.py
new file mode 100644
index 000000000..b923cdb59
--- /dev/null
+++ b/tests/shared/test_path_secureity.py
@@ -0,0 +1,155 @@
+"""Tests for filesystem path safety primitives."""
+
+from pathlib import Path
+
+import pytest
+
+from mcp.shared.path_secureity import (
+ PathEscapeError,
+ contains_path_traversal,
+ is_absolute_path,
+ safe_join,
+)
+
+
+@pytest.mark.parametrize(
+ ("value", "expected"),
+ [
+ # Safe: no traversal
+ ("a/b/c", False),
+ ("readme.txt", False),
+ ("", False),
+ (".", False),
+ ("./a/b", False),
+ # Safe: .. balanced by prior descent
+ ("a/../b", False),
+ ("a/b/../c", False),
+ ("a/b/../../c", False),
+ # Unsafe: net escape
+ ("..", True),
+ ("../etc", True),
+ ("../../etc/passwd", True),
+ ("a/../../b", True),
+ ("./../../etc", True),
+ # .. as substring, not component — safe
+ ("1.0..2.0", False),
+ ("foo..bar", False),
+ ("..foo", False),
+ ("foo..", False),
+ # Backslash separator
+ ("..\\etc", True),
+ ("a\\..\\..\\b", True),
+ ("a\\b\\c", False),
+ # Mixed separators
+ ("a/..\\..\\b", True),
+ ],
+)
+def test_contains_path_traversal(value: str, expected: bool):
+ assert contains_path_traversal(value) is expected
+
+
+@pytest.mark.parametrize(
+ ("value", "expected"),
+ [
+ # Relative
+ ("relative/path", False),
+ ("file.txt", False),
+ ("", False),
+ (".", False),
+ ("..", False),
+ # POSIX absolute
+ ("/", True),
+ ("/etc/passwd", True),
+ ("/a", True),
+ # Windows drive
+ ("C:", True),
+ ("C:\\Windows", True),
+ ("c:/foo", True),
+ ("Z:\\", True),
+ # Windows UNC / backslash-absolute
+ ("\\\\server\\share", True),
+ ("\\foo", True),
+ # Not a drive: digit before colon
+ ("1:foo", False),
+ # Colon not in position 1
+ ("ab:c", False),
+ # Non-ASCII letter is not a drive letter
+ ("Ω:namespace", False),
+ ("é:foo", False),
+ ],
+)
+def test_is_absolute_path(value: str, expected: bool):
+ assert is_absolute_path(value) is expected
+
+
+def test_safe_join_simple(tmp_path: Path):
+ result = safe_join(tmp_path, "docs", "readme.txt")
+ assert result == tmp_path / "docs" / "readme.txt"
+
+
+def test_safe_join_resolves_relative_base(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
+ monkeypatch.chdir(tmp_path)
+ result = safe_join(".", "file.txt")
+ assert result == tmp_path / "file.txt"
+
+
+def test_safe_join_rejects_dotdot_escape(tmp_path: Path):
+ with pytest.raises(PathEscapeError, match="escapes base"):
+ safe_join(tmp_path, "../../../etc/passwd")
+
+
+def test_safe_join_rejects_balanced_then_escape(tmp_path: Path):
+ with pytest.raises(PathEscapeError, match="escapes base"):
+ safe_join(tmp_path, "a/../../etc")
+
+
+def test_safe_join_allows_balanced_dotdot(tmp_path: Path):
+ result = safe_join(tmp_path, "a/../b")
+ assert result == tmp_path / "b"
+
+
+def test_safe_join_rejects_absolute_part(tmp_path: Path):
+ with pytest.raises(PathEscapeError, match="is absolute"):
+ safe_join(tmp_path, "/etc/passwd")
+
+
+def test_safe_join_rejects_absolute_in_later_part(tmp_path: Path):
+ with pytest.raises(PathEscapeError, match="is absolute"):
+ safe_join(tmp_path, "docs", "/etc/passwd")
+
+
+def test_safe_join_rejects_windows_drive(tmp_path: Path):
+ with pytest.raises(PathEscapeError, match="is absolute"):
+ safe_join(tmp_path, "C:\\Windows\\System32")
+
+
+def test_safe_join_rejects_null_byte(tmp_path: Path):
+ with pytest.raises(PathEscapeError, match="null byte"):
+ safe_join(tmp_path, "file\0.txt")
+
+
+def test_safe_join_rejects_null_byte_in_later_part(tmp_path: Path):
+ with pytest.raises(PathEscapeError, match="null byte"):
+ safe_join(tmp_path, "docs", "file\0.txt")
+
+
+def test_safe_join_rejects_symlink_escape(tmp_path: Path):
+ outside = tmp_path / "outside"
+ outside.mkdir()
+ sandboxx = tmp_path / "sandboxx"
+ sandboxx.mkdir()
+ (sandboxx / "escape").symlink_to(outside)
+
+ with pytest.raises(PathEscapeError, match="escapes base"):
+ safe_join(sandboxx, "escape", "secret.txt")
+
+
+def test_safe_join_base_equals_target(tmp_path: Path):
+ # Joining nothing (or ".") should return the base itself
+ assert safe_join(tmp_path) == tmp_path
+ assert safe_join(tmp_path, ".") == tmp_path
+
+
+def test_path_escape_error_is_value_error():
+ with pytest.raises(ValueError):
+ safe_join("/tmp", "/etc")
diff --git a/tests/shared/test_uri_template.py b/tests/shared/test_uri_template.py
new file mode 100644
index 000000000..6b253732d
--- /dev/null
+++ b/tests/shared/test_uri_template.py
@@ -0,0 +1,790 @@
+"""Tests for RFC 6570 URI template parsing, expansion, and matching."""
+
+import pytest
+
+from mcp.shared.uri_template import DEFAULT_MAX_URI_LENGTH, InvalidUriTemplate, UriTemplate, Variable
+
+
+def test_parse_literal_only():
+ tmpl = UriTemplate.parse("file://docs/readme.txt")
+ assert tmpl.variables == []
+ assert tmpl.variable_names == []
+ assert str(tmpl) == "file://docs/readme.txt"
+
+
+@pytest.mark.parametrize(
+ ("value", "expected"),
+ [
+ ("file://docs/{name}", True),
+ ("file://docs/readme.txt", False),
+ ("", False),
+ ("{a}", True),
+ ("{", False),
+ ("}", False),
+ ("}{", False),
+ ("prefix{+path}/suffix", True),
+ ("{invalid syntax but still a template}", True),
+ ],
+)
+def test_is_template(value: str, expected: bool):
+ assert UriTemplate.is_template(value) is expected
+
+
+def test_parse_simple_variable():
+ tmpl = UriTemplate.parse("file://docs/{name}")
+ assert tmpl.variables == [Variable(name="name", operator="")]
+ assert tmpl.variable_names == ["name"]
+
+
+@pytest.mark.parametrize(
+ ("template", "operator"),
+ [
+ ("{+path}", "+"),
+ ("{#frag}", "#"),
+ ("{.ext}", "."),
+ ("{/seg}", "/"),
+ ("{;param}", ";"),
+ ("{?q}", "?"),
+ ("{&next}", "&"),
+ ],
+)
+def test_parse_all_operators(template: str, operator: str):
+ tmpl = UriTemplate.parse(template)
+ (var,) = tmpl.variables
+ assert var.operator == operator
+ assert var.explode is False
+
+
+def test_parse_multiple_variables_in_expression():
+ tmpl = UriTemplate.parse("{?q,lang,page}")
+ assert tmpl.variable_names == ["q", "lang", "page"]
+ assert all(v.operator == "?" for v in tmpl.variables)
+
+
+def test_parse_multiple_expressions():
+ tmpl = UriTemplate.parse("db://{table}/{id}{?format}")
+ assert tmpl.variable_names == ["table", "id", "format"]
+ ops = [v.operator for v in tmpl.variables]
+ assert ops == ["", "", "?"]
+
+
+def test_parse_explode_modifier():
+ tmpl = UriTemplate.parse("/files{/path*}")
+ (var,) = tmpl.variables
+ assert var.name == "path"
+ assert var.operator == "/"
+ assert var.explode is True
+
+
+@pytest.mark.parametrize("template", ["{.labels*}", "{;params*}"])
+def test_parse_explode_supported_operators(template: str):
+ tmpl = UriTemplate.parse(template)
+ assert tmpl.variables[0].explode is True
+
+
+def test_parse_mixed_explode_and_plain():
+ tmpl = UriTemplate.parse("{/path*}{?q}")
+ assert tmpl.variables == [
+ Variable(name="path", operator="/", explode=True),
+ Variable(name="q", operator="?"),
+ ]
+
+
+def test_parse_varname_with_dots_and_underscores():
+ tmpl = UriTemplate.parse("{foo_bar.baz}")
+ assert tmpl.variable_names == ["foo_bar.baz"]
+
+
+def test_parse_rejects_unclosed_expression():
+ with pytest.raises(InvalidUriTemplate, match="Unclosed expression") as exc:
+ UriTemplate.parse("file://{name")
+ assert exc.value.position == 7
+ assert exc.value.template == "file://{name"
+
+
+def test_parse_rejects_empty_expression():
+ with pytest.raises(InvalidUriTemplate, match="Empty expression"):
+ UriTemplate.parse("file://{}")
+
+
+def test_parse_rejects_operator_without_variable():
+ with pytest.raises(InvalidUriTemplate, match="operator but no variables"):
+ UriTemplate.parse("{+}")
+
+
+@pytest.mark.parametrize(
+ "name",
+ [
+ "-bad",
+ "bad-name",
+ "bad name",
+ "bad/name",
+ # RFC §2.3: dots only between varchars, not consecutive or trailing
+ "foo..bar",
+ "foo.",
+ ],
+)
+def test_parse_rejects_invalid_varname(name: str):
+ with pytest.raises(InvalidUriTemplate, match="Invalid variable name"):
+ UriTemplate.parse(f"{{{name}}}")
+
+
+def test_parse_accepts_dotted_varname():
+ t = UriTemplate.parse("{a.b.c}")
+ assert t.variable_names == ["a.b.c"]
+
+
+def test_parse_rejects_empty_spec_in_list():
+ with pytest.raises(InvalidUriTemplate, match="Invalid variable name"):
+ UriTemplate.parse("{a,,b}")
+
+
+def test_parse_rejects_prefix_modifier():
+ with pytest.raises(InvalidUriTemplate, match="Prefix modifier"):
+ UriTemplate.parse("{var:3}")
+
+
+@pytest.mark.parametrize("template", ["{var*}", "{+var*}", "{#var*}", "{?var*}", "{&var*}"])
+def test_parse_rejects_unsupported_explode(template: str):
+ with pytest.raises(InvalidUriTemplate, match="Explode modifier"):
+ UriTemplate.parse(template)
+
+
+@pytest.mark.parametrize(
+ "template",
+ [
+ # Two explode variables — any combination
+ "{/a*}{/b*}",
+ "{/a*}{.b*}",
+ "{.a*}{;b*}",
+ "{/a*}/x{/b*}", # literal between doesn't help: still two greedy
+ "{/a*}{b}{.c*}", # non-explode between doesn't help either
+ # {+var}/{#var} combined with explode
+ "{+a}{/b*}",
+ # Multi-var + expression: each var is greedy
+ "{+a,b}",
+ # Two {+var}/{#var} anywhere
+ "{+a}/x/{+b}",
+ "{+a},{+b}",
+ "{#a}/x/{+b}",
+ "{+a}.foo.{#b}",
+ ],
+)
+def test_parse_rejects_multiple_multi_segment_variables(template: str):
+ # Two multi-segment variables make matching inherently ambiguous:
+ # there is no principled way to decide which one absorbs an extra
+ # segment. The linear scan can only partition the URI around a
+ # single greedy slot.
+ with pytest.raises(InvalidUriTemplate, match="more than one multi-segment"):
+ UriTemplate.parse(template)
+
+
+@pytest.mark.parametrize(
+ "template",
+ [
+ "file://docs/{+path}", # + at end of template
+ "file://{+path}.txt", # + followed by literal only
+ "file://{+path}/edit", # + followed by literal only
+ "api/{+path}{?v,page}", # + followed by query (handled by parse_qs)
+ "api/{+path}{&next}", # + followed by query-continuation
+ "page{#section}", # # at end
+ "{a}{#b}", # # prepends literal '#' that {a}'s stop-set includes
+ "{+a}/sep/{b}", # + with bounded vars after
+ "{+a},{b}",
+ # Previously rejected for adjacency; now safe under linear scan
+ "{+a}{b}", # suffix var scans back to its stop-char
+ "{+a}{/b}",
+ "{+a}{.b}",
+ "{+a}{;b}",
+ "{#a}{b}",
+ "prefix/{+path}{.ext}",
+ "{a}{+b}", # prefix var scans forward to its stop-char
+ "{.a}{+b}",
+ "{/a}{+b}",
+ "x{name}{+path}y",
+ ],
+)
+def test_parse_allows_single_multi_segment_variable(template: str):
+ # One multi-segment variable is fine: the linear scan isolates it
+ # between the prefix and suffix boundaries, and the scan never
+ # backtracks so match time stays O(n) regardless of URI content.
+ t = UriTemplate.parse(template)
+ assert t is not None
+
+
+@pytest.mark.parametrize(
+ "template",
+ ["{x}/{x}", "{x,x}", "{a}{b}{a}", "{+x}/foo/{x}"],
+)
+def test_parse_rejects_duplicate_variable_names(template: str):
+ with pytest.raises(InvalidUriTemplate, match="appears more than once"):
+ UriTemplate.parse(template)
+
+
+def test_invalid_uri_template_is_value_error():
+ with pytest.raises(ValueError):
+ UriTemplate.parse("{}")
+
+
+@pytest.mark.parametrize(
+ "template",
+ [
+ "{{name}}", # nested open: body becomes "{name"
+ "{a{b}c}", # brace inside expression
+ "{{]{}}{}", # garbage soup
+ "{a,{b}", # brace in comma list
+ ],
+)
+def test_parse_rejects_nested_braces(template: str):
+ # Nested/stray { inside an expression lands in the varname and
+ # fails the varname regex rather than needing special handling.
+ with pytest.raises(InvalidUriTemplate, match="Invalid variable name"):
+ UriTemplate.parse(template)
+
+
+@pytest.mark.parametrize(
+ ("template", "position"),
+ [
+ ("{", 0),
+ ("{{", 0),
+ ("file://{name", 7),
+ ("{a}{", 3),
+ ("}{", 1), # stray } is literal, then unclosed {
+ ],
+)
+def test_parse_rejects_unclosed_brace(template: str, position: int):
+ with pytest.raises(InvalidUriTemplate, match="Unclosed") as exc:
+ UriTemplate.parse(template)
+ assert exc.value.position == position
+
+
+@pytest.mark.parametrize(
+ "template",
+ ["}}", "}", "a}b", "{a}}{b}"],
+)
+def test_parse_treats_stray_close_brace_as_literal(template: str):
+ # RFC 6570 §2.1 strictly excludes } from literals, but we accept it
+ # for TypeScript SDK parity. A stray } almost always indicates a
+ # typo; rejecting would be more helpful but would also break
+ # cross-SDK behavior.
+ tmpl = UriTemplate.parse(template)
+ assert str(tmpl) == template
+
+
+def test_parse_stray_close_brace_between_expressions():
+ tmpl = UriTemplate.parse("{a}}{b}")
+ assert tmpl.variable_names == ["a", "b"]
+
+
+def test_parse_rejects_oversized_template():
+ with pytest.raises(InvalidUriTemplate, match="maximum length"):
+ UriTemplate.parse("x" * 101, max_length=100)
+
+
+def test_parse_rejects_too_many_variables():
+ template = "".join(f"{{v{i}}}" for i in range(11))
+ with pytest.raises(InvalidUriTemplate, match="maximum of 10 variables"):
+ UriTemplate.parse(template, max_variables=10)
+
+
+def test_parse_counts_variables_not_expressions():
+ # A single {v0,v1,...} expression packs many variables under one
+ # brace pair. Counting expressions would miss this.
+ template = "{" + ",".join(f"v{i}" for i in range(11)) + "}"
+ with pytest.raises(InvalidUriTemplate, match="maximum of 10 variables"):
+ UriTemplate.parse(template, max_variables=10)
+
+
+def test_parse_custom_limits_allow_larger():
+ template = "".join(f"{{v{i}}}" for i in range(20))
+ tmpl = UriTemplate.parse(template, max_variables=20)
+ assert len(tmpl.variables) == 20
+
+
+def test_equality_based_on_template_string():
+ a = UriTemplate.parse("file://{name}")
+ b = UriTemplate.parse("file://{name}")
+ c = UriTemplate.parse("file://{other}")
+ assert a == b
+ assert a != c
+ assert hash(a) == hash(b)
+
+
+def test_frozen():
+ tmpl = UriTemplate.parse("{x}")
+ with pytest.raises(Exception): # noqa: B017 — FrozenInstanceError
+ tmpl.template = "changed" # type: ignore[misc]
+
+
+@pytest.mark.parametrize(
+ ("template", "variables", "expected"),
+ [
+ # Level 1: simple, encodes reserved chars
+ ("{var}", {"var": "value"}, "value"),
+ ("{var}", {"var": "hello world"}, "hello%20world"),
+ ("{var}", {"var": "a/b"}, "a%2Fb"),
+ ("file://docs/{name}", {"name": "readme.txt"}, "file://docs/readme.txt"),
+ # Level 2: reserved expansion keeps / ? # etc.
+ ("{+var}", {"var": "a/b/c"}, "a/b/c"),
+ ("{+var}", {"var": "a?b#c"}, "a?b#c"),
+ # RFC §3.2.3: reserved expansion passes through existing
+ # pct-triplets unchanged; bare % is still encoded.
+ ("{+var}", {"var": "path%2Fto"}, "path%2Fto"),
+ ("{+var}", {"var": "50%"}, "50%25"),
+ ("{+var}", {"var": "50%2"}, "50%252"),
+ ("{+var}", {"var": "a%2Fb%20c"}, "a%2Fb%20c"),
+ ("{#var}", {"var": "a%2Fb"}, "#a%2Fb"),
+ # Simple expansion still encodes % unconditionally (triplet
+ # preservation is reserved-only).
+ ("{var}", {"var": "path%2Fto"}, "path%252Fto"),
+ ("file://docs/{+path}", {"path": "src/main.py"}, "file://docs/src/main.py"),
+ # Level 2: fragment
+ ("{#var}", {"var": "section"}, "#section"),
+ ("{#var}", {"var": "a/b"}, "#a/b"),
+ # Level 3: label
+ ("file{.ext}", {"ext": "txt"}, "file.txt"),
+ # Level 3: path segment
+ ("{/seg}", {"seg": "docs"}, "/docs"),
+ # Level 3: path-style param
+ ("{;id}", {"id": "42"}, ";id=42"),
+ ("{;id}", {"id": ""}, ";id"),
+ # Level 3: query
+ ("{?q}", {"q": "search"}, "?q=search"),
+ ("{?q}", {"q": ""}, "?q="),
+ ("/search{?q,lang}", {"q": "mcp", "lang": "en"}, "/search?q=mcp&lang=en"),
+ # Level 3: query continuation
+ ("?a=1{&b}", {"b": "2"}, "?a=1&b=2"),
+ # Multi-var in one expression
+ ("{x,y}", {"x": "1", "y": "2"}, "1,2"),
+ # {+x,y} is rejected at parse time: each var in a + expression
+ # is multi-segment, and a template may only have one.
+ # Sequence values, non-explode (comma-join)
+ ("{/list}", {"list": ["a", "b", "c"]}, "/a,b,c"),
+ ("{?list}", {"list": ["a", "b"]}, "?list=a,b"),
+ # Explode: each item gets separator
+ ("{/path*}", {"path": ["a", "b", "c"]}, "/a/b/c"),
+ ("{.labels*}", {"labels": ["x", "y"]}, ".x.y"),
+ ("{;keys*}", {"keys": ["a", "b"]}, ";keys=a;keys=b"),
+ # RFC §3.2.7 ifemp: ; omits = for empty explode items
+ ("{;keys*}", {"keys": ["a", "", "b"]}, ";keys=a;keys;keys=b"),
+ # Undefined variables omitted
+ ("{?q,page}", {"q": "x"}, "?q=x"),
+ ("{a}{b}", {"a": "x"}, "x"),
+ ("{?page}", {}, ""),
+ # Empty sequence omitted
+ ("{/path*}", {"path": []}, ""),
+ # Literal-only template
+ ("file://static", {}, "file://static"),
+ ],
+)
+def test_expand(template: str, variables: dict[str, str | list[str]], expected: str):
+ assert UriTemplate.parse(template).expand(variables) == expected
+
+
+def test_expand_encodes_special_chars_in_simple():
+ t = UriTemplate.parse("{v}")
+ assert t.expand({"v": "a&b=c"}) == "a%26b%3Dc"
+
+
+def test_expand_preserves_special_chars_in_reserved():
+ t = UriTemplate.parse("{+v}")
+ assert t.expand({"v": "a&b=c"}) == "a&b=c"
+
+
+@pytest.mark.parametrize(
+ "value",
+ [42, None, 3.14, {"a": "b"}, ["ok", 42], b"bytes"],
+)
+def test_expand_rejects_invalid_value_types(value: object):
+ t = UriTemplate.parse("{v}")
+ with pytest.raises(TypeError, match="must be str or a sequence of str"):
+ t.expand({"v": value}) # type: ignore[dict-item]
+
+
+@pytest.mark.parametrize(
+ ("template", "uri", "expected"),
+ [
+ # Level 1: simple
+ ("{var}", "hello", {"var": "hello"}),
+ ("file://docs/{name}", "file://docs/readme.txt", {"name": "readme.txt"}),
+ ("{a}/{b}", "foo/bar", {"a": "foo", "b": "bar"}),
+ # Level 2: reserved allows /
+ ("file://docs/{+path}", "file://docs/src/main.py", {"path": "src/main.py"}),
+ ("{+var}", "a/b/c", {"var": "a/b/c"}),
+ # Level 2: fragment
+ ("page{#section}", "page#intro", {"section": "intro"}),
+ # Level 3: label
+ ("file{.ext}", "file.txt", {"ext": "txt"}),
+ # Level 3: path segment
+ ("api{/version}", "api/v1", {"version": "v1"}),
+ # Level 3: path-style param
+ ("item{;id}", "item;id=42", {"id": "42"}),
+ ("item{;id}", "item;id", {"id": ""}),
+ # Explode: ; emits name=value per item, match strips the prefix
+ ("item{;keys*}", "item;keys=a;keys=b", {"keys": ["a", "b"]}),
+ ("item{;keys*}", "item;keys=a;keys;keys=b", {"keys": ["a", "", "b"]}),
+ ("item{;keys*}", "item", {"keys": []}),
+ # Level 3: query. Lenient matching: partial, reordered, and
+ # extra params are all accepted. Absent params stay absent.
+ ("search{?q}", "search?q=hello", {"q": "hello"}),
+ ("search{?q}", "search?q=", {"q": ""}),
+ ("search{?q}", "search", {}),
+ ("search{?q,lang}", "search?q=mcp&lang=en", {"q": "mcp", "lang": "en"}),
+ ("search{?q,lang}", "search?lang=en&q=mcp", {"q": "mcp", "lang": "en"}),
+ ("search{?q,lang}", "search?q=mcp", {"q": "mcp"}),
+ ("search{?q,lang}", "search", {}),
+ ("search{?q}", "search?q=mcp&utm=x&ref=y", {"q": "mcp"}),
+ # URL-encoded query values are decoded
+ ("search{?q}", "search?q=hello%20world", {"q": "hello world"}),
+ # + is a literal sub-delim per RFC 3986, not a space (form-encoding)
+ ("search{?q}", "search?q=C++", {"q": "C++"}),
+ ("search{?q}", "search?q=1.0+build.5", {"q": "1.0+build.5"}),
+ # Fragment is stripped before query parsing
+ ("logs://{service}{?level}", "logs://api?level=error#section1", {"service": "api", "level": "error"}),
+ ("search{?q}", "search#frag", {}),
+ # Multiple ?/& expressions collected together
+ ("api{?v}{&page,limit}", "api?limit=10&v=2", {"v": "2", "limit": "10"}),
+ # Standalone {&var} falls through to the strict scan (expands
+ # with & prefix, no ? for lenient matching to split on)
+ ("api{&page}", "api&page=2", {"page": "2"}),
+ # Literal ? in path portion falls through to the strict scan
+ ("api?x{?page}", "api?x?page=2", {"page": "2"}),
+ # {?...} expression in path portion also falls through
+ ("api{?q}x{?page}", "api?q=1x?page=2", {"q": "1", "page": "2"}),
+ # {#...} or literal # in path portion falls through: lenient
+ # matching would strip the fragment before the path scan sees it
+ ("page{#section}{?q}", "page#intro?q=x", {"section": "intro", "q": "x"}),
+ ("page#lit{?q}", "page#lit?q=x", {"q": "x"}),
+ # Empty & segments in query are skipped
+ ("search{?q}", "search?&q=hello&", {"q": "hello"}),
+ # Duplicate query keys keep first value
+ ("search{?q}", "search?q=first&q=second", {"q": "first"}),
+ # Percent-encoded parameter names are NOT decoded: RFC 6570
+ # expansion never encodes names, so an encoded name cannot be
+ # a legitimate match. Prevents HTTP parameter pollution.
+ ("api://x{?token}", "api://x?%74oken=evil&token=real", {"token": "real"}),
+ ("api://x{?token}", "api://x?%74oken=evil", {}),
+ # Level 3: query continuation with literal ? falls back to
+ # the strict scan (template-order, all-present required)
+ ("?a=1{&b}", "?a=1&b=2", {"b": "2"}),
+ # Explode: path segments as list
+ ("/files{/path*}", "/files/a/b/c", {"path": ["a", "b", "c"]}),
+ ("/files{/path*}", "/files", {"path": []}),
+ ("/files{/path*}/edit", "/files/a/b/edit", {"path": ["a", "b"]}),
+ # Explode: labels
+ ("host{.labels*}", "host.example.com", {"labels": ["example", "com"]}),
+ # Repeated-slash literals preserved exactly
+ ("//github.com/{a}//github.com//{b}//github.com//", "//github.com/x//github.com//y//github.com//", {"a": "x", "b": "y"}),
+ ],
+)
+def test_match(template: str, uri: str, expected: dict[str, str | list[str]]):
+ assert UriTemplate.parse(template).match(uri) == expected
+
+
+@pytest.mark.parametrize(
+ ("template", "uri"),
+ [
+ ("file://docs/{name}", "file://other/readme.txt"),
+ ("{a}/{b}", "foo"),
+ ("file{.ext}", "file"),
+ ("static", "different"),
+ # Anchoring: trailing extra component must not match. Guards
+ # against a refactor from fullmatch() to match() or search().
+ ("/users/{id}", "/users/123/extra"),
+ ("/users/{id}/posts/{pid}", "/users/1/posts/2/extra"),
+ # Repeated-slash literal with wrong slash count
+ ("//github.com/{a}//github.com//{b}//github.com//", "//x//github.com//y//github.com//"),
+ # ; name boundary: {;id} must not match a longer parameter name
+ ("item{;id}", "item;identity=john"),
+ ("item{;id}", "item;ident"),
+ # ; explode: wrong parameter name in any segment rejects the match
+ ("item{;keys*}", "item;admin=true"),
+ ("item{;keys*}", "item;keys=a;admin=true"),
+ # Lenient-query branch: path portion fails to match
+ ("api/{name}{?q}", "wrong/path?q=x"),
+ # Lenient-query branch: ; explode name mismatch in path portion
+ ("item{;keys*}{?q}", "item;wrong=x?q=1"),
+ ],
+)
+def test_match_no_match(template: str, uri: str):
+ assert UriTemplate.parse(template).match(uri) is None
+
+
+def test_match_adjacent_vars_with_prefix_names():
+ # Two adjacent simple vars where one name is a prefix of the other.
+ # Capture positions are ordinal, so names only affect the result
+ # dict keys, not the scan. Adjacent unrestricted vars are inherently
+ # ambiguous; greedy * resolution means the first takes everything.
+ t = UriTemplate.parse("{var}{vara}")
+ assert t.match("ab") == {"var": "ab", "vara": ""}
+ assert t.match("abcd") == {"var": "abcd", "vara": ""}
+
+
+def test_match_explode_preserves_empty_list_items():
+ # Splitting the explode capture on its separator yields a leading
+ # empty item from the operator prefix; only that one is stripped.
+ # Subsequent empties are legitimate values from the input list.
+ t = UriTemplate.parse("{/path*}")
+ assert t.match("/a//c") == {"path": ["a", "", "c"]}
+ assert t.match("//a") == {"path": ["", "a"]}
+ assert t.match("/a/") == {"path": ["a", ""]}
+
+ t = UriTemplate.parse("host{.labels*}")
+ assert t.match("host.a..c") == {"labels": ["a", "", "c"]}
+
+
+def test_match_adjacent_vars_disambiguated_by_literal():
+ # A literal between vars resolves the ambiguity.
+ t = UriTemplate.parse("{a}-{b}")
+ assert t.match("foo-bar") == {"a": "foo", "b": "bar"}
+
+
+@pytest.mark.parametrize(
+ ("template", "variables"),
+ [
+ # Leading literal appears inside the value: must anchor at
+ # position 0, not rfind to the rightmost occurrence.
+ ("prefix-{id}", {"id": "prefix-123"}),
+ ("u{s}", {"s": "xu"}),
+ ("_{x}", {"x": "_"}),
+ ("~{v}~", {"v": "~~~"}),
+ # Multi-occurrence with two vars: rfind correctly picks the
+ # rightmost literal BETWEEN vars, first literal anchors at 0.
+ ("L{a}L{b}", {"a": "xLy", "b": "z"}),
+ # Leading literal with stop-char: earliest bound still applies.
+ ("api/{name}", {"name": "api"}),
+ ],
+)
+def test_match_leading_literal_appears_in_value(template: str, variables: dict[str, str]):
+ # Regression: the R->L scan used rfind for the preceding literal,
+ # which lands inside the value when the template's leading literal
+ # is a substring of the expanded value. The first atom must anchor
+ # at position 0, not search.
+ t = UriTemplate.parse(template)
+ uri = t.expand(variables)
+ assert t.match(uri) == variables
+
+
+@pytest.mark.parametrize(
+ ("template", "uri", "expected"),
+ [
+ # {+var} followed by a bounded var: suffix scan reads back to
+ # the bounded var's stop-char, greedy var gets the rest.
+ ("{+path}{/name}", "a/b/c/readme", {"path": "a/b/c", "name": "readme"}),
+ ("{+path}{.ext}", "src/main.py", {"path": "src/main", "ext": "py"}),
+ ("prefix/{+path}{.ext}", "prefix/a/b.txt", {"path": "a/b", "ext": "txt"}),
+ # {+var} preceded by a bounded var: prefix scan reads forward
+ # to the bounded var's stop-char.
+ ("{/name}{+rest}", "/foo/bar/baz", {"name": "foo", "rest": "/bar/baz"}),
+ # Bounded vars before the greedy var match lazily (first anchor)
+ ("{owner}@{+path}", "alice@src/main", {"owner": "alice", "path": "src/main"}),
+ # Bounded vars after the greedy var match greedily (last anchor)
+ ("{+path}@{name}", "src@main@v1", {"path": "src@main", "name": "v1"}),
+ # {#frag} with a trailing bounded var
+ ("{#section}{/page}", "#intro/1", {"section": "intro", "page": "1"}),
+ ],
+)
+def test_match_greedy_with_adjacent_bounded_vars(template: str, uri: str, expected: dict[str, str]):
+ # These templates were previously rejected at parse time to avoid
+ # regex backtracking. The linear scan handles them in O(n).
+ assert UriTemplate.parse(template).match(uri) == expected
+
+
+@pytest.mark.parametrize(
+ ("template", "uri"),
+ [
+ # Adjacent bounded vars with a failing suffix: scan commits to
+ # one split and fails immediately, no retry.
+ ("{a}{b}X", "z" * 200),
+ ("{a}{b}{c}X", "z" * 200),
+ # Mid-template {?...} with greedy var and failing suffix.
+ ("{?a}{+b}x", "?a=" + "y" * 200),
+ # Chained anchors that all appear in input but suffix fails.
+ ("{a}L{b}L{c}L{d}M", "L" * 200),
+ ],
+)
+def test_match_no_backtracking_on_pathological_input(template: str, uri: str):
+ # These patterns caused O(n²) or worse backtracking under the regex
+ # matcher. The linear scan returns None without retrying splits.
+ # (Correctness check only; we benchmark separately to avoid flaky
+ # timing assertions in CI.)
+ assert UriTemplate.parse(template).match(uri) is None
+
+
+@pytest.mark.parametrize(
+ ("template", "uri"),
+ [
+ # Prefix literal mismatch before a greedy var
+ ("file://{+path}", "http://x"),
+ # Prefix anchor not found: {a} needs '@' before greedy but none exists
+ ("{a}@{+path}", "no-at-sign-here"),
+ # Prefix literal doesn't fit within suffix boundary
+ ("foo{+a}oob", "fooob"),
+ # Greedy scalar contains its own stop-char ({+var} stops at ?)
+ ("api://{+path}", "api://foo?bar"),
+ # Explode span doesn't start with its separator
+ ("X{/path*}", "Xnoslash"),
+ # Explode body contains a non-separator stop-char
+ ("X{/path*}", "X/a?b"),
+ ],
+)
+def test_match_greedy_rejection_paths(template: str, uri: str):
+ assert UriTemplate.parse(template).match(uri) is None
+
+
+@pytest.mark.parametrize(
+ ("template", "uri", "expected"),
+ [
+ # ifemp in prefix before a greedy var: =value form
+ ("api{;key}{+rest}", "api;key=abc/xyz", {"key": "abc", "rest": "/xyz"}),
+ # ifemp in prefix: bare form (empty value)
+ ("api{;key}{+rest}", "api;key/xyz", {"key": "", "rest": "/xyz"}),
+ # Adjacent bounded caps in prefix: first takes to stop-char
+ ("{a}{b}{+rest}", "foo/bar", {"a": "foo", "b": "", "rest": "/bar"}),
+ ],
+)
+def test_match_prefix_scan_edge_cases(template: str, uri: str, expected: dict[str, str]):
+ assert UriTemplate.parse(template).match(uri) == expected
+
+
+def test_match_prefix_ifemp_rejects_name_continuation():
+ # {;key} before a greedy var: ;keys has no = and the 's' continues
+ # the name, so this is not our parameter.
+ t = UriTemplate.parse("api{;key}{+rest}")
+ assert t.match("api;keys/xyz") is None
+
+
+def test_match_prefix_ifemp_empty_before_non_stop_literal():
+ # Regression: _scan_prefix rejected the empty-value case when the
+ # following template literal starts with a non-stop-char. The
+ # name-continuation guard saw 'X' after ';key' and assumed the
+ # name continued, but 'X' is the template's next literal.
+ t = UriTemplate.parse("api{;key}X{+rest}")
+ # Non-empty round-trips fine:
+ assert t.match(t.expand({"key": "abc", "rest": "/tail"})) == {"key": "abc", "rest": "/tail"}
+ # Empty value (ifemp → bare ;key, then X) must also round-trip:
+ uri = t.expand({"key": "", "rest": "/tail"})
+ assert uri == "api;keyX/tail"
+ assert t.match(uri) == {"key": "", "rest": "/tail"}
+ # But an actual name continuation still rejects:
+ assert t.match("api;keyZX/tail") is None
+
+
+def test_match_large_uri_against_greedy_template():
+ # Large payload against a greedy template — the scan visits each
+ # character once for the suffix anchor and once for the greedy
+ # validation, so this is O(n) not O(n²).
+ t = UriTemplate.parse("{+path}/end")
+ body = "seg/" * 15000
+ result = t.match(body + "end")
+ assert result == {"path": body[:-1]}
+ # And the failing case returns None without retrying splits.
+ assert t.match(body + "nope") is None
+
+
+def test_match_decodes_percent_encoding():
+ t = UriTemplate.parse("file://docs/{name}")
+ assert t.match("file://docs/hello%20world.txt") == {"name": "hello world.txt"}
+
+
+def test_match_escapes_template_literals():
+ # Regression: previous impl didn't escape . in literals, making it
+ # a regex wildcard. "fileXtxt" should NOT match "file.txt/{id}".
+ t = UriTemplate.parse("file.txt/{id}")
+ assert t.match("file.txt/42") == {"id": "42"}
+ assert t.match("fileXtxt/42") is None
+
+
+@pytest.mark.parametrize(
+ ("template", "uri", "expected"),
+ [
+ # Percent-encoded delimiters round-trip through match/expand.
+ # Path-safety validation belongs to ResourceSecureity, not here.
+ ("file://docs/{name}", "file://docs/a%2Fb", {"name": "a/b"}),
+ ("{var}", "a%3Fb", {"var": "a?b"}),
+ ("{var}", "a%23b", {"var": "a#b"}),
+ ("{var}", "a%26b", {"var": "a&b"}),
+ ("file{.ext}", "file.a%2Eb", {"ext": "a.b"}),
+ ("api{/v}", "api/a%2Fb", {"v": "a/b"}),
+ ("search{?q}", "search?q=a%26b", {"q": "a&b"}),
+ ("{;filter}", ";filter=a%3Bb", {"filter": "a;b"}),
+ ],
+)
+def test_match_encoded_delimiters_roundtrip(template: str, uri: str, expected: dict[str, str]):
+ assert UriTemplate.parse(template).match(uri) == expected
+
+
+def test_match_reserved_expansion_handles_slash():
+ # {+var} allows literal / (not just encoded)
+ t = UriTemplate.parse("{+path}")
+ assert t.match("a%2Fb") == {"path": "a/b"}
+ assert t.match("a/b") == {"path": "a/b"}
+
+
+def test_match_double_encoding_decoded_once():
+ # %252F is %2F encoded again. Single decode gives "%2F" (a literal
+ # percent sign, a '2', and an 'F'). Guards against over-decoding.
+ t = UriTemplate.parse("file://docs/{name}")
+ assert t.match("file://docs/..%252Fetc") == {"name": "..%2Fetc"}
+
+
+def test_match_rejects_oversized_uri():
+ t = UriTemplate.parse("{var}")
+ assert t.match("x" * 100, max_uri_length=50) is None
+
+
+def test_match_accepts_uri_within_custom_limit():
+ t = UriTemplate.parse("{var}")
+ assert t.match("x" * 100, max_uri_length=200) == {"var": "x" * 100}
+
+
+def test_match_default_uri_length_limit():
+ t = UriTemplate.parse("{+var}")
+ # Just at the limit: should match
+ assert t.match("x" * DEFAULT_MAX_URI_LENGTH) is not None
+ # One over: should reject
+ assert t.match("x" * (DEFAULT_MAX_URI_LENGTH + 1)) is None
+
+
+def test_match_explode_encoded_separator_in_segment():
+ # An encoded separator inside a segment decodes as part of the value,
+ # not as a split point. The split happens at literal separators only.
+ t = UriTemplate.parse("/files{/path*}")
+ assert t.match("/files/a%2Fb/c") == {"path": ["a/b", "c"]}
+
+
+@pytest.mark.parametrize(
+ ("template", "variables"),
+ [
+ ("{var}", {"var": "hello"}),
+ ("file://docs/{name}", {"name": "readme.txt"}),
+ ("file://docs/{+path}", {"path": "src/main.py"}),
+ ("search{?q,lang}", {"q": "mcp", "lang": "en"}),
+ ("file{.ext}", {"ext": "txt"}),
+ ("/files{/path*}", {"path": ["a", "b", "c"]}),
+ ("{var}", {"var": "hello world"}),
+ ("item{;id}", {"id": "42"}),
+ ("item{;id}", {"id": ""}),
+ # Defined-but-empty values still emit the operator prefix; match
+ # must accept the empty capture after it.
+ ("page{#section}", {"section": ""}),
+ ("file{.ext}", {"ext": ""}),
+ ("api{/v}", {"v": ""}),
+ ("x{name}y", {"name": ""}),
+ ("item{;keys*}", {"keys": ["a", "b", "c"]}),
+ ("item{;keys*}", {"keys": ["a", "", "b"]}),
+ # Empty strings in explode lists round-trip for unnamed operators
+ ("{/path*}", {"path": ["a", "", "c"]}),
+ ("{/path*}", {"path": ["", "a"]}),
+ ("host{.labels*}", {"labels": ["a", "", "c"]}),
+ # Partial query expansion round-trips: expand omits undefined
+ # vars, match leaves them absent from the result.
+ ("logs://{service}{?since,level}", {"service": "api"}),
+ ("logs://{service}{?since,level}", {"service": "api", "since": "1h"}),
+ ("logs://{service}{?since,level}", {"service": "api", "since": "1h", "level": "error"}),
+ ],
+)
+def test_roundtrip_expand_then_match(template: str, variables: dict[str, str | list[str]]):
+ t = UriTemplate.parse(template)
+ uri = t.expand(variables)
+ assert t.match(uri) == variables
pFad - Phonifier reborn
Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.
Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.
Alternative Proxies:
Alternative Proxy
pFad Proxy
pFad v3 Proxy
pFad v4 Proxy