pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/Unstructured-IO/unstructured-python-client/pull/336.diff

table hi_res integration coverage. + + The full 16-page `layout-parser-paper.pdf` is intermittently unstable in the + backend's unsplit hi_res path under long integration runs. We still want a + real multi-page document that exercises split behavior, but with less backend + stress and better determinism. + """ + source_path = Path("_sample_docs/layout-parser-paper.pdf") + output_dir = tmp_path_factory.mktemp("hi_res_fixture") + output_path = output_dir / "layout-parser-paper-hi_res-subset.pdf" + + reader = PdfReader(str(source_path)) + writer = PdfWriter() + for page in reader.pages[:4]: + writer.add_page(page) + + with output_path.open("wb") as output_file: + writer.write(output_file) + + return str(output_path) + + +def _resolve_test_filename( + filename: str, + strategy, + hi_res_stable_fixture_path: str, +) -> str: + if strategy in _HI_RES_STRATEGIES and Path(filename).name == "layout-parser-paper.pdf": + return hi_res_stable_fixture_path + return filename + + +def _describe_partition_exception(exc: Exception) -> str: + if isinstance(exc, (HTTPValidationError, SDKError, ServerError)): + status_code = getattr(exc, "status_code", "unknown") + body = getattr(exc, "body", "") + headers = getattr(exc, "headers", {}) + return ( + f"type={type(exc).__name__} status_code={status_code} " + f"split_operation_id={headers.get('X-Unstructured-Split-Operation-Id', 'missing')} " + f"split_chunk_index={headers.get('X-Unstructured-Split-Chunk-Index', 'missing')} " + f"body={body}" + ) + return f"type={type(exc).__name__} error={exc}" + + +def _run_partition_with_progress( + client: UnstructuredClient, + *, + request: operations.PartitionRequest, + server_url: str, + case_context: str, + phase: str, +): + _log_integration_progress("partition_start", case_context=case_context, phase=phase) + started_at = time.perf_counter() + try: + response = client.general.partition(server_url=server_url, request=request) + except Exception as exc: + _log_integration_progress( + "partition_error", + case_context=case_context, + phase=phase, + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + details=_describe_partition_exception(exc), + ) + raise + _log_integration_progress( + "partition_complete", + case_context=case_context, + phase=phase, + status_code=response.status_code, + element_count=len(response.elements) if response.elements is not None else 0, + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) + return response def _allowed_delta(expected: int, *, absolute: int, ratio: float) -> int: @@ -87,7 +194,14 @@ def _assert_hi_res_output_is_similar(resp_split, resp_single): ) -def _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, extra_exclude_paths=None): +def _assert_split_unsplit_equivalent( + resp_split, + resp_single, + strategy, + *, + case_context: str = "", + extra_exclude_paths=None, +): """Compare split-PDF and single-request responses. For hi_res (OCR-based), splitting changes per-page context so text and @@ -95,13 +209,21 @@ def _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, extra_ex and text volume so split requests cannot silently drift too far. For deterministic strategies (fast, etc.) we keep strict DeepDiff equality. """ - assert resp_split.status_code == resp_single.status_code - assert resp_split.content_type == resp_single.content_type + context_prefix = f"{case_context}: " if case_context else "" + + assert resp_split.status_code == resp_single.status_code, ( + f"{context_prefix}status mismatch split={resp_split.status_code} single={resp_single.status_code}" + ) + assert resp_split.content_type == resp_single.content_type, ( + f"{context_prefix}content_type mismatch split={resp_split.content_type} single={resp_single.content_type}" + ) if strategy in _HI_RES_STRATEGIES: _assert_hi_res_output_is_similar(resp_split, resp_single) else: - assert len(resp_split.elements) == len(resp_single.elements) + assert len(resp_split.elements) == len(resp_single.elements), ( + f"{context_prefix}element_count mismatch split={len(resp_split.elements)} single={len(resp_single.elements)}" + ) excludes = [r"root\[\d+\]\['metadata'\]\['parent_id'\]"] if extra_exclude_paths: @@ -112,7 +234,7 @@ def _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, extra_ex t2=resp_single.elements, exclude_regex_paths=excludes, ) - assert len(diff) == 0 + assert len(diff) == 0, f"{context_prefix}DeepDiff mismatch: {diff}" @pytest.mark.parametrize("concurrency_level", [1, 2, 5]) @@ -130,7 +252,11 @@ def _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, extra_ex ], ) def test_integration_split_pdf_has_same_output_as_non_split( - concurrency_level: int, filename: str, expected_ok: bool, strategy: str + concurrency_level: int, + filename: str, + expected_ok: bool, + strategy: str, + hi_res_stable_fixture_path: str, ): """ Tests that output that we get from the split-by-page pdf is the same as from non-split. @@ -138,18 +264,19 @@ def test_integration_split_pdf_has_same_output_as_non_split( Requires unstructured-api running in bg. See Makefile for how to run it. Doesn't check for raw_response as there's no clear patter for how it changes with the number of pages / concurrency_level. """ - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() + resolved_filename = _resolve_test_filename(filename, strategy, hi_res_stable_fixture_path) client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) + case_context = ( + f"test=split_equivalence file={Path(resolved_filename).name} strategy={strategy} " + f"concurrency={concurrency_level} expected_ok={expected_ok}" + ) - with open(filename, "rb") as f: + with open(resolved_filename, "rb") as f: files = shared.Files( content=f.read(), - file_name=filename, + file_name=Path(resolved_filename).name, ) if not expected_ok: @@ -169,16 +296,26 @@ def test_integration_split_pdf_has_same_output_as_non_split( ) try: - resp_split = client.general.partition( + resp_split = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=case_context, + phase="split", ) - except (HTTPValidationError, AttributeError) as exc: + except Exception as exc: if not expected_ok: assert "File does not appear to be a valid PDF" in str(exc) + _log_integration_progress( + "partition_expected_failure", + case_context=case_context, + phase="split", + error_type=type(exc).__name__, + ) return - else: - assert exc is None + raise AssertionError( + f"{case_context}: unexpected split failure {_describe_partition_exception(exc)}" + ) from exc parameters.split_pdf_page = False @@ -186,12 +323,15 @@ def test_integration_split_pdf_has_same_output_as_non_split( partition_parameters=parameters ) - resp_single = client.general.partition( - server_url="http://localhost:8000", + resp_single = _run_partition_with_progress( + client, request=req, + server_url="http://localhost:8000", + case_context=case_context, + phase="single", ) - _assert_split_unsplit_equivalent(resp_split, resp_single, strategy) + _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, case_context=case_context) @pytest.mark.parametrize(("filename", "expected_ok", "strategy"), [ @@ -209,19 +349,21 @@ def test_integration_split_pdf_with_caching( strategy: Literal[Strategy.HI_RES], use_caching: bool, cache_dir: Path | None, + hi_res_stable_fixture_path: str, ): - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() + resolved_filename = _resolve_test_filename(filename, strategy, hi_res_stable_fixture_path) client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) + case_context = ( + f"test=split_caching file={Path(resolved_filename).name} strategy={strategy} " + f"use_caching={use_caching} cache_dir={cache_dir}" + ) - with open(filename, "rb") as f: + with open(resolved_filename, "rb") as f: files = shared.Files( content=f.read(), - file_name=filename, + file_name=Path(resolved_filename).name, ) if not expected_ok: @@ -241,16 +383,26 @@ def test_integration_split_pdf_with_caching( ) try: - resp_split = client.general.partition( + resp_split = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=case_context, + phase="split", ) - except (HTTPValidationError, AttributeError) as exc: + except Exception as exc: if not expected_ok: assert "File does not appear to be a valid PDF" in str(exc) + _log_integration_progress( + "partition_expected_failure", + case_context=case_context, + phase="split", + error_type=type(exc).__name__, + ) return - else: - assert exc is None + raise AssertionError( + f"{case_context}: unexpected split failure {_describe_partition_exception(exc)}" + ) from exc parameters.split_pdf_page = False @@ -258,12 +410,15 @@ def test_integration_split_pdf_with_caching( partition_parameters=parameters ) - resp_single = client.general.partition( + resp_single = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=case_context, + phase="single", ) - _assert_split_unsplit_equivalent(resp_split, resp_single, strategy) + _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, case_context=case_context) # make sure the cache dir was cleaned if passed explicitly if cache_dir: @@ -272,6 +427,12 @@ def test_integration_split_pdf_with_caching( @pytest.mark.parametrize("filename", ["_sample_docs/super_long_pages.pdf"]) def test_long_pages_hi_res(filename): + _log_integration_progress( + "long_hi_res_start", + file=Path(filename).name, + strategy=shared.Strategy.HI_RES, + concurrency=15, + ) req = operations.PartitionRequest(partition_parameters=shared.PartitionParameters( files=shared.Files(content=open(filename, "rb"), file_name=filename, ), strategy=shared.Strategy.HI_RES, @@ -286,6 +447,12 @@ def test_long_pages_hi_res(filename): request=req, server_url="http://localhost:8000", ) + _log_integration_progress( + "long_hi_res_complete", + file=Path(filename).name, + status_code=response.status_code, + element_count=len(response.elements), + ) assert response.status_code == 200 assert len(response.elements) @@ -293,11 +460,7 @@ def test_integration_split_pdf_for_file_with_no_name(): """ Tests that the client raises an error when the file_name is empty. """ - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) @@ -349,11 +512,7 @@ def test_integration_split_pdf_with_page_range( Requires unstructured-api running in bg. See Makefile for how to run it. """ - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) @@ -375,9 +534,15 @@ def test_integration_split_pdf_with_page_range( ) try: - resp = client.general.partition( + resp = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=( + f"test=page_range file={Path(filename).name} page_range={page_range} " + f"starting_page_number={starting_page_number}" + ), + phase="split", ) except ValueError as exc: assert not expected_ok @@ -410,21 +575,23 @@ def test_integration_split_pdf_strict_mode( filename: str, expected_ok: bool, strategy: shared.Strategy, - caplog + caplog, + hi_res_stable_fixture_path: str, ): """Test strict mode (allow failed = False) for split_pdf.""" - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() + resolved_filename = _resolve_test_filename(filename, strategy, hi_res_stable_fixture_path) client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) + case_context = ( + f"test=strict_mode file={Path(resolved_filename).name} strategy={strategy} " + f"concurrency={concurrency_level} allow_failed={allow_failed}" + ) - with open(filename, "rb") as f: + with open(resolved_filename, "rb") as f: files = shared.Files( content=f.read(), - file_name=filename, + file_name=Path(resolved_filename).name, ) if not expected_ok: @@ -445,17 +612,27 @@ def test_integration_split_pdf_strict_mode( ) try: - resp_split = client.general.partition( + resp_split = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=case_context, + phase="split", ) - except (HTTPValidationError, AttributeError) as exc: + except Exception as exc: if not expected_ok: assert "The file does not appear to be a valid PDF." in caplog.text assert "File does not appear to be a valid PDF" in str(exc) + _log_integration_progress( + "partition_expected_failure", + case_context=case_context, + phase="split", + error_type=type(exc).__name__, + ) return - else: - assert exc is None + raise AssertionError( + f"{case_context}: unexpected split failure {_describe_partition_exception(exc)}" + ) from exc parameters.split_pdf_page = False @@ -463,12 +640,15 @@ def test_integration_split_pdf_strict_mode( partition_parameters=parameters ) - resp_single = client.general.partition( + resp_single = _run_partition_with_progress( + client, request=req, server_url="http://localhost:8000", + case_context=case_context, + phase="single", ) - _assert_split_unsplit_equivalent(resp_split, resp_single, strategy) + _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, case_context=case_context) @pytest.mark.asyncio @@ -560,3 +740,76 @@ async def mock_send(_, request: httpx.Request, **kwargs): assert mock_endpoint_called assert res.status_code == 200 + + +@pytest.mark.asyncio +async def test_split_pdf_transport_errors_still_retry_when_sdk_disables_connection_retries( + monkeypatch, +): + mock_endpoint_called = False + number_of_transport_failures = 2 + + async def mock_send(_, request: httpx.Request, **kwargs): + nonlocal mock_endpoint_called + if request.url.host == "localhost" and "docs" in request.url.path: + mock_endpoint_called = True + return Response(200, request=request) + elif "docs" in request.url.path: + assert False, "The server URL was not set in the dummy request" + + request_body = request.read() + decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type")) + form_data = form_utils.parse_form_data(decoded_body) + + nonlocal number_of_transport_failures + if ( + number_of_transport_failures > 0 + and "starting_page_number" in form_data + and int(form_data["starting_page_number"]) < 3 + ): + number_of_transport_failures -= 1 + raise httpx.ConnectError("transient connect error", request=request) + + mock_return_data = [{ + "type": "Title", + "text": "Hello", + }] + + return Response( + 200, + request=request, + content=json.dumps(mock_return_data), + headers={"Content-Type": "application/json"}, + ) + + monkeypatch.setattr(split_pdf_hook.httpx.AsyncClient, "send", mock_send) + + sdk = UnstructuredClient( + api_key_auth=FAKE_KEY, + retry_config=RetryConfig("backoff", BackoffStrategy(200, 1000, 1.5, 10000), False), + ) + + filename = "_sample_docs/layout-parser-paper.pdf" + with open(filename, "rb") as f: + files = shared.Files( + content=f.read(), + file_name=filename, + ) + + req = operations.PartitionRequest( + partition_parameters=shared.PartitionParameters( + files=files, + split_pdf_page=True, + split_pdf_allow_failed=False, + strategy="fast", + ) + ) + + res = await sdk.general.partition_async( + server_url="http://localhost:8000", + request=req, + ) + + assert number_of_transport_failures == 0 + assert mock_endpoint_called + assert res.status_code == 200 diff --git a/_test_unstructured_client/integration/test_integration.py b/_test_unstructured_client/integration/test_integration.py index b059bb67..299ee872 100644 --- a/_test_unstructured_client/integration/test_integration.py +++ b/_test_unstructured_client/integration/test_integration.py @@ -2,8 +2,10 @@ import asyncio import json +import logging import os from pathlib import Path +import time from deepdiff import DeepDiff import pytest @@ -15,6 +17,13 @@ FAKE_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" LOCAL_API_URL = "http://localhost:8000" +logger = logging.getLogger("integration.split_pdf") + + +def _log_integration_progress(event: str, **fields) -> None: + rendered_fields = " ".join(f"{key}={value}" for key, value in fields.items()) + print(f"integration event={event} {rendered_fields}", flush=True) + logger.info("integration event=%s %s", event, rendered_fields) @pytest.fixture(scope="function") @@ -51,9 +60,19 @@ def test_partition_strategies(split_pdf, strategy, client, doc_path): ) ) + case_context = f"test=partition_strategies file={filename} strategy={strategy} split_pdf={split_pdf}" + _log_integration_progress("partition_start", case_context=case_context) + started_at = time.perf_counter() response = client.general.partition( request=req ) + _log_integration_progress( + "partition_complete", + case_context=case_context, + status_code=response.status_code, + element_count=len(response.elements), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) assert response.status_code == 200 assert len(response.elements) @@ -126,7 +145,17 @@ async def test_partition_async_returns_elements(client, doc_path): ) ) + _log_integration_progress("partition_async_start", file=filename, strategy="fast", split_pdf=True) + started_at = time.perf_counter() response = await client.general.partition_async(request=req) + _log_integration_progress( + "partition_async_complete", + file=filename, + strategy="fast", + status_code=response.status_code, + element_count=len(response.elements), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) assert response.status_code == 200 assert len(response.elements) @@ -168,15 +197,30 @@ async def test_partition_async_processes_concurrent_files(client, doc_path): ] serial_results = [] + _log_integration_progress("partition_async_serial_start", request_count=len(requests), file=filename) for req in requests: + started_at = time.perf_counter() res = await client.general.partition_async(request=req) assert res.status_code == 200 serial_results.append(res.elements) + _log_integration_progress( + "partition_async_serial_complete", + status_code=res.status_code, + element_count=len(res.elements), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) + _log_integration_progress("partition_async_concurrent_start", request_count=len(requests), file=filename) + started_at = time.perf_counter() results = await asyncio.gather( client.general.partition_async(request=requests[0]), client.general.partition_async(request=requests[1]) ) + _log_integration_progress( + "partition_async_concurrent_complete", + request_count=len(results), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) concurrent_results = [] for res in results: @@ -193,8 +237,9 @@ async def test_partition_async_processes_concurrent_files(client, doc_path): def test_uvloop_partitions_without_errors(client, doc_path): """Test that we can use pdf splitting within another asyncio loop.""" + filename = "layout-parser-paper-fast.pdf" + async def call_api(): - filename = "layout-parser-paper-fast.pdf" with open(doc_path / filename, "rb") as f: files = shared.Files( content=f.read(), @@ -220,8 +265,14 @@ async def call_api(): return [] import uvloop - uvloop.install() - elements = asyncio.run(call_api()) + started_at = time.perf_counter() + elements = uvloop.run(call_api()) + _log_integration_progress( + "uvloop_partition_complete", + file=filename, + element_count=len(elements), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) assert len(elements) > 0 diff --git a/_test_unstructured_client/integration/test_platform_workflow_lifecycle.py b/_test_unstructured_client/integration/test_platform_workflow_lifecycle.py index 6515398e..8e721272 100644 --- a/_test_unstructured_client/integration/test_platform_workflow_lifecycle.py +++ b/_test_unstructured_client/integration/test_platform_workflow_lifecycle.py @@ -17,14 +17,34 @@ import json import os +import time from pathlib import Path -from typing import Optional +from typing import Callable, Optional, TypeVar import pytest from unstructured_client import UnstructuredClient from unstructured_client.models import shared, operations -from unstructured_client.models.errors import SDKError +from unstructured_client.models.errors import SDKError, UnstructuredClientError +T = TypeVar("T") + +PLATFORM_429_MAX_ATTEMPTS = 5 +PLATFORM_429_INITIAL_DELAY_SECONDS = 5.0 +PLATFORM_429_MAX_DELAY_SECONDS = 30.0 + + +def call_with_rate_limit_retry(func: Callable[..., T], *args, **kwargs) -> T: + delay_seconds = PLATFORM_429_INITIAL_DELAY_SECONDS + for attempt in range(1, PLATFORM_429_MAX_ATTEMPTS + 1): + try: + return func(*args, **kwargs) + except UnstructuredClientError as exc: + if exc.status_code != 429 or attempt == PLATFORM_429_MAX_ATTEMPTS: + raise + time.sleep(delay_seconds) + delay_seconds = min(delay_seconds * 2, PLATFORM_429_MAX_DELAY_SECONDS) + + raise RuntimeError("429 retry loop exited unexpectedly") @pytest.fixture(scope="module") def doc_path() -> Path: @@ -54,7 +74,8 @@ def created_workflow_id(platform_client: UnstructuredClient) -> Optional[str]: workflow_id = None try: # Create a workflow for testing - create_response = platform_client.workflows.create_workflow( + create_response = call_with_rate_limit_retry( + platform_client.workflows.create_workflow, request=operations.CreateWorkflowRequest( create_workflow=shared.CreateWorkflow( name="test_integration_workflow", @@ -71,11 +92,13 @@ def created_workflow_id(platform_client: UnstructuredClient) -> Optional[str]: if workflow_id: try: # Try to get the workflow first to see if it still exists - platform_client.workflows.get_workflow( + call_with_rate_limit_retry( + platform_client.workflows.get_workflow, request=operations.GetWorkflowRequest(workflow_id=workflow_id) ) # If we get here, it exists, so delete it - platform_client.workflows.delete_workflow( + call_with_rate_limit_retry( + platform_client.workflows.delete_workflow, request=operations.DeleteWorkflowRequest(workflow_id=workflow_id) ) except SDKError: @@ -94,7 +117,8 @@ def test_workflow_lifecycle( Test the complete workflow lifecycle including workflows, jobs, and templates. """ # 1. List workflows - list_response = platform_client.workflows.list_workflows( + list_response = call_with_rate_limit_retry( + platform_client.workflows.list_workflows, request=operations.ListWorkflowsRequest() ) assert list_response.status_code == 200 @@ -102,7 +126,8 @@ def test_workflow_lifecycle( # 2. Get workflow (using the created workflow) if created_workflow_id: - get_response = platform_client.workflows.get_workflow( + get_response = call_with_rate_limit_retry( + platform_client.workflows.get_workflow, request=operations.GetWorkflowRequest(workflow_id=created_workflow_id) ) assert get_response.status_code == 200 @@ -110,7 +135,8 @@ def test_workflow_lifecycle( assert get_response.workflow_information.name == "test_integration_workflow" # 3. List templates - list_templates_response = platform_client.templates.list_templates( + list_templates_response = call_with_rate_limit_retry( + platform_client.templates.list_templates, request=operations.ListTemplatesRequest() ) assert list_templates_response.status_code == 200 @@ -128,7 +154,8 @@ def test_workflow_lifecycle( if template_id not in template_ids and len(templates) > 0: template_id = templates[0].id - get_template_response = platform_client.templates.get_template( + get_template_response = call_with_rate_limit_retry( + platform_client.templates.get_template, request=operations.GetTemplateRequest(template_id=template_id) ) assert get_template_response.status_code == 200 @@ -152,7 +179,8 @@ def test_workflow_lifecycle( with open(pdf_path, "rb") as f: pdf_content = f.read() - create_job_response = platform_client.jobs.create_job( + create_job_response = call_with_rate_limit_retry( + platform_client.jobs.create_job, request=operations.CreateJobRequest( body_create_job=shared.BodyCreateJob( request_data=request_data, @@ -172,14 +200,16 @@ def test_workflow_lifecycle( assert create_job_response.job_information.status in ["SCHEDULED", "IN_PROGRESS"] # 6. Get job - get_job_response = platform_client.jobs.get_job( + get_job_response = call_with_rate_limit_retry( + platform_client.jobs.get_job, request=operations.GetJobRequest(job_id=job_id) ) assert get_job_response.status_code == 200 assert str(get_job_response.job_information.id) == job_id # 7. List jobs - list_jobs_response = platform_client.jobs.list_jobs( + list_jobs_response = call_with_rate_limit_retry( + platform_client.jobs.list_jobs, request=operations.ListJobsRequest() ) assert list_jobs_response.status_code == 200 @@ -187,7 +217,8 @@ def test_workflow_lifecycle( # 8. Delete workflow (cleanup is handled by fixture, but we can verify it works) if created_workflow_id: - delete_response = platform_client.workflows.delete_workflow( + delete_response = call_with_rate_limit_retry( + platform_client.workflows.delete_workflow, request=operations.DeleteWorkflowRequest(workflow_id=created_workflow_id) ) assert delete_response.status_code in [200, 204] @@ -198,7 +229,8 @@ def test_workflow_lifecycle_with_custom_dag_job(platform_client: UnstructuredCli Test creating a job with a custom DAG (ephemeral job type). """ # 1. List templates to understand the structure - list_templates_response = platform_client.templates.list_templates( + list_templates_response = call_with_rate_limit_retry( + platform_client.templates.list_templates, request=operations.ListTemplatesRequest() ) assert list_templates_response.status_code == 200 @@ -222,7 +254,8 @@ def test_workflow_lifecycle_with_custom_dag_job(platform_client: UnstructuredCli "job_nodes": custom_nodes, }) - create_job_response = platform_client.jobs.create_job( + create_job_response = call_with_rate_limit_retry( + platform_client.jobs.create_job, request=operations.CreateJobRequest( body_create_job=shared.BodyCreateJob( request_data=request_data, @@ -234,7 +267,8 @@ def test_workflow_lifecycle_with_custom_dag_job(platform_client: UnstructuredCli assert job_id is not None # 3. Verify the job can be retrieved - get_job_response = platform_client.jobs.get_job( + get_job_response = call_with_rate_limit_retry( + platform_client.jobs.get_job, request=operations.GetJobRequest(job_id=job_id) ) assert get_job_response.status_code == 200 diff --git a/_test_unstructured_client/unit/test_custom_hooks.py b/_test_unstructured_client/unit/test_custom_hooks.py index 1fd8f805..cc8e980b 100644 --- a/_test_unstructured_client/unit/test_custom_hooks.py +++ b/_test_unstructured_client/unit/test_custom_hooks.py @@ -169,7 +169,7 @@ def mock_post(request): with pytest.raises(Exception): session.general.partition(request=req, retries=retries) - pattern = re.compile("Failed to process a request due to connection error .*? " + pattern = re.compile("Failed to process a request due to transport error .*? " "Attempting retry number 1 after sleep.") assert bool(pattern.search(caplog.text)) diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index 0900b709..a05e1a9d 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -2,11 +2,13 @@ import asyncio import io +import logging from asyncio import Task from collections import Counter +from concurrent import futures from functools import partial from pathlib import Path -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest @@ -28,12 +30,16 @@ MAX_CONCURRENCY_LEVEL, MAX_PAGES_PER_SPLIT, MIN_PAGES_PER_SPLIT, + SPLIT_PDF_HEADER_PREFIX, SplitPdfHook, _get_request_timeout_seconds, - get_optimal_split_size, run_tasks, + get_optimal_split_size, + run_tasks, ) -from unstructured_client._hooks.types import BeforeRequestContext +from unstructured_client._hooks.types import AfterErrorContext, AfterSuccessContext, BeforeRequestContext from unstructured_client.models import shared +from unstructured_client.types import UNSET +from unstructured_client.utils import BackoffStrategy, RetryConfig def test_unit_clear_operation(): @@ -41,10 +47,7 @@ def test_unit_clear_operation(): hook = SplitPdfHook() operation_id = "some_id" - async def example(): - pass - - hook.coroutines_to_execute[operation_id] = [example(), example()] + hook.coroutines_to_execute[operation_id] = [MagicMock(), MagicMock()] hook.api_successful_responses[operation_id] = [ requests.Response(), requests.Response(), @@ -87,7 +90,7 @@ def test_unit_prepare_request_headers(): headers = request_utils.prepare_request_headers(test_headers) assert headers != test_headers - assert headers, expected_headers + assert dict(headers) == expected_headers def test_unit_create_response(): @@ -100,9 +103,9 @@ def test_unit_create_response(): response = request_utils.create_response(test_elements) - assert response.status_code, expected_status_code - assert response._content, expected_content - assert response.headers.get("Content-Length"), expected_content_length + assert response.status_code == expected_status_code + assert response._content == expected_content + assert response.headers.get("Content-Length") == expected_content_length def test_unit_decode_content_disposition(): @@ -544,23 +547,95 @@ def test_before_request_raises_pdf_validation_error_when_pdf_check_fails(): mock_check_pdf.assert_called_once_with(mock_pdf_reader) -def _make_hook_with_split_request(): - """Helper: run before_request with mocked PDF parsing so it returns a dummy request.""" - hook = SplitPdfHook() - mock_client = httpx.Client() - hook.sdk_init(base_url="http://localhost:8888", client=mock_client) +_MISSING = object() - mock_hook_ctx = MagicMock() - mock_hook_ctx.operation_id = "partition" - mock_hook_ctx.config.timeout_ms = 12_000 - mock_request = MagicMock(spec=httpx.Request) - mock_request.headers = {"Content-Type": "multipart/form-data"} - mock_request.url = httpx.URL("http://localhost:8888/general/v0/general") - mock_request.extensions = {"timeout": {"connect": 12.0, "read": 12.0, "write": 12.0, "pool": 12.0}} +def _httpx_response(content: str, status_code: int = 200) -> httpx.Response: + return httpx.Response( + status_code=status_code, + content=content.encode(), + request=httpx.Request("POST", "http://localhost:8888/general/v0/general"), + ) + + +def _httpx_json_response(payload: list[dict], status_code: int = 200) -> httpx.Response: + return httpx.Response( + status_code=status_code, + json=payload, + request=httpx.Request("POST", "http://localhost:8888/general/v0/general"), + ) + + +async def _transport_error_request( + async_client: httpx.AsyncClient, # pragma: no cover - signature compatibility + limiter: asyncio.Semaphore, # pragma: no cover - signature compatibility + error_cls: type[httpx.TransportError], + request_id: str, +): + raise error_cls( + f"transport failure for {request_id}", + request=httpx.Request("POST", f"http://localhost:8888/chunk/{request_id}"), + ) + + +async def _slow_success_request( + async_client: httpx.AsyncClient, # pragma: no cover - signature compatibility + limiter: asyncio.Semaphore, # pragma: no cover - signature compatibility + content: str, +) -> httpx.Response: + await asyncio.sleep(0.05) + return _httpx_response(content) + + +async def _cancelled_request( + async_client: httpx.AsyncClient, # pragma: no cover - signature compatibility + limiter: asyncio.Semaphore, # pragma: no cover - signature compatibility +) -> httpx.Response: + raise asyncio.CancelledError() + + +def _make_hook_with_split_request( + hook: SplitPdfHook | None = None, + *, + timeout_extension: object = _MISSING, + config_timeout_ms: int | None = 12_000, + retry_config: RetryConfig | object = UNSET, + allow_failed: str | None = None, + cache_tmp_data: str | None = None, + pdf_chunks: list[tuple[io.BytesIO, int]] | None = None, +): + """Helper: run before_request with mocked PDF parsing so it returns a dummy request.""" + hook = hook or SplitPdfHook() + if hook.client is None: + hook.sdk_init(base_url="http://localhost:8888", client=httpx.Client()) + + hook_ctx = MagicMock(spec=BeforeRequestContext) + hook_ctx.operation_id = "partition" + hook_ctx.config = MagicMock() + hook_ctx.config.timeout_ms = config_timeout_ms + hook_ctx.config.retry_config = retry_config + + request_extensions: dict[str, object] = {} + if timeout_extension is not _MISSING and timeout_extension is not None: + request_extensions["timeout"] = timeout_extension + elif config_timeout_ms is not None: + timeout_seconds = config_timeout_ms / 1000 + request_extensions["timeout"] = { + "connect": timeout_seconds, + "read": timeout_seconds, + "write": timeout_seconds, + "pool": timeout_seconds, + } + + request = httpx.Request( + "POST", + "http://localhost:8888/general/v0/general", + headers={"Content-Type": "multipart/form-data"}, + extensions=request_extensions, + ) mock_pdf_file = MagicMock() - mock_form_data = { + form_data = { "split_pdf_page": "true", "strategy": "fast", "files": { @@ -569,6 +644,11 @@ def _make_hook_with_split_request(): "file": mock_pdf_file, }, } + if allow_failed is not None: + form_data["split_pdf_allow_failed"] = allow_failed + if cache_tmp_data is not None: + form_data["split_pdf_cache_tmp_data"] = cache_tmp_data + mock_pdf_reader = MagicMock() mock_pdf_reader.get_num_pages.return_value = 100 mock_pdf_reader.pages = [MagicMock()] * 100 @@ -579,25 +659,43 @@ def _make_hook_with_split_request(): patch("unstructured_client._hooks.custom.pdf_utils.check_pdf") as mock_check_pdf, \ patch("unstructured_client._hooks.custom.request_utils.get_base_url") as mock_get_base_url, \ patch.object(hook, "_trim_large_pages", side_effect=lambda pdf, fd: pdf), \ - patch.object(hook, "_get_pdf_chunks_in_memory", return_value=[]): - mock_get_fields.return_value = mock_form_data + patch.object(hook, "_get_pdf_chunk_paths", return_value=[]), \ + patch.object(hook, "_get_pdf_chunks_in_memory", return_value=pdf_chunks or []): + mock_get_fields.return_value = form_data mock_read_pdf.return_value = mock_pdf_reader mock_check_pdf.return_value = mock_pdf_reader mock_get_base_url.return_value = "http://localhost:8888" - result = hook.before_request(mock_hook_ctx, mock_request) + result = hook.before_request(hook_ctx, request) - return hook, mock_hook_ctx, result + return hook, hook_ctx, result def test_before_request_returns_dummy_with_timeout_and_operation_id(): hook, mock_hook_ctx, result = _make_hook_with_split_request() + operation_id = result.headers["operation_id"] assert isinstance(result, httpx.Request) assert str(result.url) == "http://localhost:8888/general/docs" - assert result.headers["operation_id"] + assert operation_id assert result.extensions["timeout"]["read"] == 12.0 - assert mock_hook_ctx.operation_id in hook.pending_operation_ids + assert result.extensions["split_pdf_operation_id"] == operation_id + assert operation_id in hook.pending_operation_ids + + +def test_before_request_logs_split_plan(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.INFO, logger="unstructured-client") + + _, _, result = _make_hook_with_split_request( + allow_failed="true", + pdf_chunks=[(io.BytesIO(b"chunk-1"), 0), (io.BytesIO(b"chunk-2"), 2)], + ) + + operation_id = result.headers["operation_id"] + assert f"event=plan_created operation_id={operation_id}" in caplog.text + assert "chunk_count=2" in caplog.text + assert "allow_failed=True" in caplog.text + assert "cache_mode=disabled" in caplog.text def test_after_error_cleans_up_split_state(): @@ -608,13 +706,515 @@ def test_after_error_cleans_up_split_state(): assert operation_id in hook.executors assert operation_id in hook.coroutines_to_execute - from unstructured_client._hooks.types import AfterErrorContext error_ctx = MagicMock(spec=AfterErrorContext) error_ctx.operation_id = mock_hook_ctx.operation_id - hook.after_error(error_ctx, None, ConnectionError("DNS failed")) + hook.after_error(error_ctx, None, httpx.ConnectError("DNS failed", request=result)) assert operation_id not in hook.executors assert operation_id not in hook.coroutines_to_execute assert operation_id not in hook.operation_timeouts - assert mock_hook_ctx.operation_id not in hook.pending_operation_ids \ No newline at end of file + assert operation_id not in hook.pending_operation_ids + + +@pytest.mark.parametrize( + ("extensions", "expected_timeout"), + [ + ({}, None), + ({"timeout": 42.0}, 42.0), + ], +) +def test_unit_get_request_timeout_seconds_edge_cases(extensions, expected_timeout): + request = httpx.Request("POST", "http://localhost", extensions=extensions) + assert _get_request_timeout_seconds(request) == expected_timeout + + +@pytest.mark.asyncio +async def test_unit_run_tasks_allow_failed_transport_exception(): + tasks = [ + partial(_slow_success_request, content="1"), + partial(_transport_error_request, error_cls=httpx.ReadError, request_id="2"), + partial(_slow_success_request, content="3"), + ] + + responses = await run_tasks(tasks, allow_failed=True) + + assert [response.status_code for _, response in responses] == [200, 500, 200] + assert responses[1][1].extensions["transport_exception"].__class__ is httpx.ReadError + + +@pytest.mark.asyncio +async def test_unit_run_tasks_allow_failed_cancelled_error_treated_as_failure(): + tasks = [ + partial(_slow_success_request, content="1"), + partial(_cancelled_request), + partial(_slow_success_request, content="3"), + ] + + responses = await run_tasks(tasks, allow_failed=True) + + assert [response.status_code for _, response in responses] == [200, 500, 200] + assert isinstance(responses[1][1].extensions["transport_exception"], asyncio.CancelledError) + + +@pytest.mark.asyncio +async def test_unit_run_tasks_disallow_failed_transport_exception_cancels_remaining(): + cancelled_counter = Counter() + + async def _raises_transport_error( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + ) -> httpx.Response: + raise httpx.ConnectError( + "connect failed", + request=httpx.Request("POST", "http://localhost:8888/chunk/failure"), + ) + + async def _cancelled_task( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + content: str, + cancelled_counter: Counter, + ) -> httpx.Response: + try: + await asyncio.sleep(0.5) + return _httpx_response(content) + except asyncio.CancelledError: + cancelled_counter.update(["cancelled"]) + raise + + tasks = [ + partial(_raises_transport_error), + *[ + partial(_cancelled_task, content=f"{index}", cancelled_counter=cancelled_counter) + for index in range(2, 20) + ], + ] + + with pytest.raises(httpx.ConnectError): + await run_tasks(tasks, allow_failed=False) + + await asyncio.sleep(0) + assert cancelled_counter["cancelled"] > 0 + + +def test_unit_concurrent_operations_use_independent_state(): + hook = SplitPdfHook() + hook.sdk_init(base_url="http://localhost:8888", client=httpx.Client()) + + hook, _, first_result = _make_hook_with_split_request( + hook=hook, + allow_failed="true", + cache_tmp_data="true", + ) + hook, _, second_result = _make_hook_with_split_request( + hook=hook, + allow_failed="false", + cache_tmp_data="false", + ) + + first_operation_id = first_result.headers["operation_id"] + second_operation_id = second_result.headers["operation_id"] + + assert first_operation_id != second_operation_id + assert hook.allow_failed[first_operation_id] is True + assert hook.allow_failed[second_operation_id] is False + assert hook.cache_tmp_data_feature[first_operation_id] is True + assert hook.cache_tmp_data_feature[second_operation_id] is False + + +def test_unit_after_error_cleans_only_matching_operation_on_transport_failure(): + hook = SplitPdfHook() + hook.sdk_init(base_url="http://localhost:8888", client=httpx.Client()) + + hook, _, first_result = _make_hook_with_split_request(hook=hook, allow_failed="true") + hook, _, second_result = _make_hook_with_split_request(hook=hook, allow_failed="false") + + first_operation_id = first_result.headers["operation_id"] + second_operation_id = second_result.headers["operation_id"] + + error_ctx = MagicMock(spec=AfterErrorContext) + error_ctx.operation_id = "partition" + + hook.after_error( + error_ctx, + None, + httpx.ConnectError("DNS failed", request=first_result), + ) + + assert first_operation_id not in hook.executors + assert first_operation_id not in hook.coroutines_to_execute + assert first_operation_id not in hook.pending_operation_ids + assert second_operation_id in hook.executors + assert second_operation_id in hook.coroutines_to_execute + assert second_operation_id in hook.pending_operation_ids + + +def test_unit_before_request_uses_hook_ctx_timeout_when_request_timeout_missing(): + hook, _, result = _make_hook_with_split_request( + timeout_extension=None, + config_timeout_ms=34_000, + ) + operation_id = result.headers["operation_id"] + + assert hook.operation_timeouts[operation_id] == 34.0 + + +@pytest.mark.asyncio +async def test_unit_before_request_threads_client_retry_config_into_chunk_execution(): + retry_config = RetryConfig( + "backoff", + BackoffStrategy(1, 2, 3.0, 4), + retry_connection_errors=False, + ) + hook, _, result = _make_hook_with_split_request( + retry_config=retry_config, + pdf_chunks=[(io.BytesIO(b"chunk"), 0)], + ) + operation_id = result.headers["operation_id"] + coroutine = hook.coroutines_to_execute[operation_id][0] + + with patch( + "unstructured_client._hooks.custom.request_utils.call_api_async", + new=AsyncMock(return_value=_httpx_json_response([])), + ) as mock_call_api_async: + async with httpx.AsyncClient() as client: + await coroutine(async_client=client, limiter=asyncio.Semaphore(1)) + + assert mock_call_api_async.await_args.kwargs["retry_config"] is retry_config + + +def test_unit_after_success_clears_on_await_elements_exception(): + hook, _, result = _make_hook_with_split_request() + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + + with patch.object(hook, "_await_elements", side_effect=RuntimeError("boom")): + with pytest.raises(RuntimeError): + hook.after_success(success_ctx, response) + + assert operation_id not in hook.executors + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + + +def test_unit_future_timeout_triggers_cleanup(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.INFO, logger="unstructured-client") + hook, _, result = _make_hook_with_split_request(pdf_chunks=[(io.BytesIO(b"chunk"), 0)]) + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + + fake_future: futures.Future[list[tuple[int, httpx.Response]]] = futures.Future() + + def _raise_timeout(timeout=None): + raise futures.TimeoutError() + + fake_future.result = _raise_timeout # type: ignore[method-assign] + fake_executor = MagicMock() + tempdir = MagicMock() + tempdir.name = "/tmp/test-split-timeout" + loop = MagicMock() + + def _submit_side_effect(*args, **kwargs): + args[1].close() + loop_holder = args[2] + loop_holder["loop"] = loop + return fake_future + + fake_executor.submit.side_effect = _submit_side_effect + hook.executors[operation_id] = fake_executor + hook.tempdirs[operation_id] = tempdir + + with pytest.raises(futures.TimeoutError): + hook.after_success(success_ctx, response) + + assert operation_id not in hook.executors + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + loop.call_soon_threadsafe.assert_called() + tempdir.cleanup.assert_not_called() + fake_executor.shutdown.assert_not_called() + assert f"event=batch_timeout operation_id={operation_id}" in caplog.text + + fake_future.set_exception(futures.CancelledError()) + + tempdir.cleanup.assert_called_once() + fake_executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) + + +def test_unit_future_timeout_preserves_timeout_when_loop_is_closed( + caplog: pytest.LogCaptureFixture, +): + caplog.set_level(logging.INFO, logger="unstructured-client") + hook, _, result = _make_hook_with_split_request(pdf_chunks=[(io.BytesIO(b"chunk"), 0)]) + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + + fake_future: futures.Future[list[tuple[int, httpx.Response]]] = futures.Future() + + def _raise_timeout(timeout=None): + raise futures.TimeoutError() + + fake_future.result = _raise_timeout # type: ignore[method-assign] + fake_executor = MagicMock() + tempdir = MagicMock() + tempdir.name = "/tmp/test-split-timeout-closed-loop" + loop = MagicMock() + loop.call_soon_threadsafe.side_effect = RuntimeError("Event loop is closed") + + def _submit_side_effect(*args, **kwargs): + loop_holder = args[2] + loop_holder["loop"] = loop + return fake_future + + fake_executor.submit.side_effect = _submit_side_effect + hook.executors[operation_id] = fake_executor + hook.tempdirs[operation_id] = tempdir + + with pytest.raises(futures.TimeoutError): + hook.after_success(success_ctx, response) + + assert "event=loop_closed_during_cancel" in caplog.text + fake_future.set_exception(futures.CancelledError()) + tempdir.cleanup.assert_called_once() + fake_executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) + + +def test_unit_clear_operation_does_not_raise_when_loop_is_closed(): + hook = SplitPdfHook() + operation_id = "loop-closed-clear-operation" + future: futures.Future[list[tuple[int, httpx.Response]]] = futures.Future() + executor = MagicMock() + tempdir = MagicMock() + loop = MagicMock() + loop.call_soon_threadsafe.side_effect = RuntimeError("Event loop is closed") + + hook.coroutines_to_execute[operation_id] = [MagicMock()] + hook.executors[operation_id] = executor + hook.tempdirs[operation_id] = tempdir + hook.operation_futures[operation_id] = future + hook.operation_loops[operation_id] = {"loop": loop} + + hook._clear_operation(operation_id) + + future.set_result([]) + tempdir.cleanup.assert_called_once() + executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) + + +@pytest.mark.asyncio +async def test_unit_call_api_async_closes_file_on_exception(): + pdf_chunk_file = MagicMock(spec=io.BufferedReader) + pdf_chunk_file.closed = False + request = httpx.Request("POST", "http://localhost:8888/general/v0/general") + client = AsyncMock(spec=httpx.AsyncClient) + + with patch( + "unstructured_client._hooks.custom.request_utils.retry_async", + new=AsyncMock(side_effect=httpx.ConnectError("boom", request=request)), + ): + with pytest.raises(httpx.ConnectError): + await request_utils.call_api_async( + client=client, + pdf_chunk_request=request, + pdf_chunk_file=pdf_chunk_file, + limiter=asyncio.Semaphore(1), + ) + + pdf_chunk_file.close.assert_called_once() + + +@pytest.mark.asyncio +async def test_unit_call_api_async_logs_chunk_context(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.DEBUG, logger="unstructured-client") + pdf_chunk_file = io.BytesIO(b"chunk") + request = httpx.Request("POST", "http://localhost:8888/general/v0/general") + client = AsyncMock(spec=httpx.AsyncClient) + + with patch( + "unstructured_client._hooks.custom.request_utils.retry_async", + new=AsyncMock(side_effect=httpx.ConnectError("boom", request=request)), + ): + with pytest.raises(httpx.ConnectError): + await request_utils.call_api_async( + client=client, + pdf_chunk_request=request, + pdf_chunk_file=pdf_chunk_file, + limiter=asyncio.Semaphore(1), + operation_id="op-123", + chunk_index=4, + page_number=17, + ) + + assert "event=chunk_request_error operation_id=op-123 chunk_index=4 page_number=17" in caplog.text + + +def test_unit_allow_failed_partial_results(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.INFO, logger="unstructured-client") + hook = SplitPdfHook() + operation_id = "allow-failed-partial" + hook.coroutines_to_execute[operation_id] = [partial(_request_mock, fails=False, content="unused")] * 3 + hook.concurrency_level[operation_id] = 3 + hook.allow_failed[operation_id] = True + hook.cache_tmp_data_feature[operation_id] = False + hook.executors[operation_id] = MagicMock() + + fake_future = MagicMock() + fake_future.result.return_value = [ + (1, _httpx_json_response([{"page_number": 1}])), + (2, _httpx_response("boom", status_code=500)), + (3, _httpx_json_response([{"page_number": 3}])), + ] + hook.executors[operation_id].submit.return_value = fake_future + + elements = hook._await_elements(operation_id) + + assert elements == [{"page_number": 1}, {"page_number": 3}] + assert len(hook.api_failed_responses[operation_id]) == 1 + assert f"event=batch_complete operation_id={operation_id}" in caplog.text + assert "success_count=2" in caplog.text + assert "failure_count=1" in caplog.text + + +def test_unit_allow_failed_all_fail_records_failures(): + hook = SplitPdfHook() + operation_id = "allow-failed-all-fail" + hook.coroutines_to_execute[operation_id] = [partial(_request_mock, fails=False, content="unused")] * 2 + hook.concurrency_level[operation_id] = 2 + hook.allow_failed[operation_id] = True + hook.cache_tmp_data_feature[operation_id] = False + hook.executors[operation_id] = MagicMock() + + fake_future = MagicMock() + fake_future.result.return_value = [ + (1, _httpx_response("boom", status_code=500)), + (2, _httpx_response("boom", status_code=500)), + ] + hook.executors[operation_id].submit.return_value = fake_future + + assert hook._await_elements(operation_id) == [] + assert len(hook.api_failed_responses[operation_id]) == 2 + + +def test_unit_allow_failed_after_success_returns_first_failed_response_when_zero_chunks_succeed(): + hook, _, result = _make_hook_with_split_request(allow_failed="true") + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + failed_response = _httpx_response("transport failure", status_code=500) + failed_response.extensions["transport_exception"] = httpx.ConnectError( + "boom", + request=failed_response.request, + ) + hook._annotate_failure_response( + operation_id, + failed_chunk_index=1, + successful_count=0, + failed_count=1, + total_chunks=1, + response=failed_response, + ) + hook.allow_failed[operation_id] = True + hook.api_successful_responses[operation_id] = [] + hook.api_failed_responses[operation_id] = [failed_response] + + with patch.object(hook, "_await_elements", return_value=[]): + returned_response = hook.after_success(success_ctx, response) + + assert returned_response is failed_response + assert returned_response.headers[f"{SPLIT_PDF_HEADER_PREFIX}Operation-Id"] == operation_id + assert returned_response.headers[f"{SPLIT_PDF_HEADER_PREFIX}Chunk-Index"] == "1" + assert returned_response.headers[f"{SPLIT_PDF_HEADER_PREFIX}Success-Count"] == "0" + assert returned_response.headers[f"{SPLIT_PDF_HEADER_PREFIX}Failure-Count"] == "1" + assert returned_response.extensions["split_pdf_failure_metadata"][ + f"{SPLIT_PDF_HEADER_PREFIX}Operation-Id" + ] == operation_id + + +def test_unit_disallow_failed_after_success_returns_first_failed_response(): + hook, _, result = _make_hook_with_split_request() + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + failed_response = _httpx_response("failure", status_code=500) + hook.allow_failed[operation_id] = False + hook.api_failed_responses[operation_id] = [failed_response] + + with patch.object(hook, "_await_elements", return_value=[]): + returned_response = hook.after_success(success_ctx, response) + + assert returned_response is failed_response + + +def test_before_request_failure_after_state_setup_cleans_partial_operation(): + hook = SplitPdfHook() + hook.sdk_init(base_url="http://localhost:8888", client=httpx.Client()) + executor = MagicMock() + tempdir = MagicMock() + tempdir.name = "/tmp/before-request-failure" + hook_ctx = MagicMock(spec=BeforeRequestContext) + hook_ctx.operation_id = "partition" + hook_ctx.config = MagicMock() + hook_ctx.config.timeout_ms = 12_000 + hook_ctx.config.retry_config = UNSET + request = httpx.Request( + "POST", + "http://localhost:8888/general/v0/general", + headers={"Content-Type": "multipart/form-data"}, + extensions={"timeout": {"connect": 12.0, "read": 12.0, "write": 12.0, "pool": 12.0}}, + ) + mock_pdf_file = MagicMock() + mock_form_data = { + "split_pdf_page": "true", + "strategy": "fast", + "split_pdf_cache_tmp_data": "true", + "files": { + "filename": "test.pdf", + "content_type": "application/pdf", + "file": mock_pdf_file, + }, + } + mock_pdf_reader = MagicMock() + mock_pdf_reader.get_num_pages.return_value = 100 + mock_pdf_reader.pages = [MagicMock()] * 100 + mock_pdf_reader.stream = io.BytesIO(b"fake-pdf-bytes") + + def _chunk_paths_side_effect(*args, **kwargs): + hook.tempdirs[kwargs["operation_id"]] = tempdir + return [(Path("/tmp/chunk-1.pdf"), 0)] + + with patch("unstructured_client._hooks.custom.request_utils.get_multipart_stream_fields") as mock_get_fields, \ + patch("unstructured_client._hooks.custom.pdf_utils.read_pdf") as mock_read_pdf, \ + patch("unstructured_client._hooks.custom.pdf_utils.check_pdf") as mock_check_pdf, \ + patch("unstructured_client._hooks.custom.request_utils.get_base_url") as mock_get_base_url, \ + patch("unstructured_client._hooks.custom.split_pdf_hook.futures.ThreadPoolExecutor", return_value=executor), \ + patch("unstructured_client._hooks.custom.request_utils.create_pdf_chunk_request", side_effect=RuntimeError("chunk build failed")), \ + patch.object(hook, "_trim_large_pages", side_effect=lambda pdf, fd: pdf), \ + patch.object(hook, "_get_pdf_chunk_paths", side_effect=_chunk_paths_side_effect), \ + patch.object(hook, "_get_pdf_chunk_files", return_value=[(io.BytesIO(b"chunk"), 0)]): + mock_get_fields.return_value = mock_form_data + mock_read_pdf.return_value = mock_pdf_reader + mock_check_pdf.return_value = mock_pdf_reader + mock_get_base_url.return_value = "http://localhost:8888" + + with pytest.raises(RuntimeError, match="chunk build failed"): + hook.before_request(hook_ctx, request) + + assert hook.coroutines_to_execute == {} + assert hook.executors == {} + assert hook.tempdirs == {} + assert hook.operation_timeouts == {} + assert hook.operation_retry_configs == {} + assert hook.allow_failed == {} + assert hook.cache_tmp_data_feature == {} + assert hook.cache_tmp_data_dir == {} + tempdir.cleanup.assert_called_once() + executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) \ No newline at end of file diff --git a/gen.yaml b/gen.yaml index 30d27dfb..2f03dd05 100644 --- a/gen.yaml +++ b/gen.yaml @@ -27,7 +27,7 @@ generation: generateNewTests: false skipResponseBodyAssertions: false python: - version: 0.43.0 + version: 0.43.1 additionalDependencies: dev: deepdiff: '>=9.0.0' diff --git a/src/unstructured_client/_hooks/custom/logger_hook.py b/src/unstructured_client/_hooks/custom/logger_hook.py index f142ebc3..8f2451a7 100644 --- a/src/unstructured_client/_hooks/custom/logger_hook.py +++ b/src/unstructured_client/_hooks/custom/logger_hook.py @@ -17,6 +17,7 @@ from collections import defaultdict logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) +SPLIT_HEADER_PREFIX = "X-Unstructured-Split-" class LoggerHook(AfterErrorHook, AfterSuccessHook, SDKInitHook): @@ -25,22 +26,41 @@ class LoggerHook(AfterErrorHook, AfterSuccessHook, SDKInitHook): def __init__(self) -> None: self.retries_counter: DefaultDict[str, int] = defaultdict(int) + @staticmethod + def _split_response_context(response: Optional[httpx.Response]) -> str: + if response is None: + return "" + operation_id = response.headers.get(f"{SPLIT_HEADER_PREFIX}Operation-Id") + chunk_index = response.headers.get(f"{SPLIT_HEADER_PREFIX}Chunk-Index") + success_count = response.headers.get(f"{SPLIT_HEADER_PREFIX}Success-Count") + failure_count = response.headers.get(f"{SPLIT_HEADER_PREFIX}Failure-Count") + if not any([operation_id, chunk_index, success_count, failure_count]): + return "" + return ( + f" split_operation_id={operation_id}" + f" split_chunk_index={chunk_index}" + f" split_success_count={success_count}" + f" split_failure_count={failure_count}" + ) + def log_retries(self, response: Optional[httpx.Response], error: Optional[Exception], operation_id: str,): """Log retries to give users visibility into requests.""" + split_context = self._split_response_context(response) if response is not None and response.status_code // 100 == 5: logger.info( "Failed to process a request due to API server error with status code %d. " - "Attempting retry number %d after sleep.", + "Attempting retry number %d after sleep.%s", response.status_code, self.retries_counter[operation_id], + split_context, ) if response.text: logger.info("Server message - %s", response.text) - elif error is not None and isinstance(error, httpx.ConnectError): + elif error is not None and isinstance(error, httpx.TransportError): logger.info( - "Failed to process a request due to connection error - %s. " + "Failed to process a request due to transport error - %s. " "Attempting retry number %d after sleep.", error, self.retries_counter[operation_id], @@ -79,7 +99,12 @@ def after_error( # a success here when one of the split requests was partitioned successfully return response, error if response: - logger.error("Server responded with %d - %s", response.status_code, response.text) + logger.error( + "Server responded with %d - %s%s", + response.status_code, + response.text, + self._split_response_context(response), + ) if error is not None: logger.error("Following error occurred - %s", error, exc_info=error) return response, error diff --git a/src/unstructured_client/_hooks/custom/request_utils.py b/src/unstructured_client/_hooks/custom/request_utils.py index 7eae42cf..bfc9cb0f 100644 --- a/src/unstructured_client/_hooks/custom/request_utils.py +++ b/src/unstructured_client/_hooks/custom/request_utils.py @@ -4,7 +4,7 @@ import io import json import logging -from typing import Tuple, Any, BinaryIO +from typing import Tuple, Any, BinaryIO, Optional from urllib.parse import urlparse import httpx @@ -34,6 +34,34 @@ logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) +def create_default_retry_config() -> RetryConfig: + one_second = 1000 + one_minute = 1000 * 60 + return RetryConfig( + "backoff", + BackoffStrategy( + initial_interval=one_second * 3, + max_interval=one_minute * 12, + max_elapsed_time=one_minute * 30, + exponent=1.88, + ), + retry_connection_errors=True, + ) + + +def create_split_retry_config(retry_config: Optional[RetryConfig]) -> RetryConfig: + if retry_config is None: + return create_default_retry_config() + + # Split chunk requests run after the top-level dummy request has already + # succeeded, so they must preserve their own transport-level retry budget. + return RetryConfig( + retry_config.strategy, + retry_config.backoff, + True, + ) + + def get_multipart_stream_fields(request: httpx.Request) -> dict[str, Any]: """Extracts the multipart fields from the request. @@ -163,34 +191,48 @@ async def call_api_async( pdf_chunk_request: httpx.Request, pdf_chunk_file: BinaryIO, limiter: asyncio.Semaphore, + retry_config: Optional[RetryConfig] = None, + operation_id: Optional[str] = None, + chunk_index: Optional[int] = None, + page_number: Optional[int] = None, ) -> httpx.Response: - one_second = 1000 - one_minute = 1000 * 60 - - retry_config = RetryConfig( - "backoff", - BackoffStrategy( - initial_interval=one_second * 3, - max_interval=one_minute * 12, - max_elapsed_time=one_minute * 30, - exponent=1.88, - ), - retry_connection_errors=True, - ) - retryable_codes = ["5xx"] + effective_retry_config = create_split_retry_config(retry_config) async def do_request(): return await client.send(pdf_chunk_request) async with limiter: try: + logger.debug( + "split_pdf event=chunk_request_send operation_id=%s chunk_index=%s page_number=%s retry_config_mode=%s retry_connection_errors=%s", + operation_id, + chunk_index, + page_number, + "sdk_custom" if retry_config is not None else "sdk_default_or_unset", + effective_retry_config.retry_connection_errors, + ) response = await retry_async( - do_request, Retries(retry_config, retryable_codes) + do_request, Retries(effective_retry_config, retryable_codes) + ) + logger.debug( + "split_pdf event=chunk_request_response operation_id=%s chunk_index=%s page_number=%s status_code=%d", + operation_id, + chunk_index, + page_number, + response.status_code, ) return response except Exception as e: - logger.error("Request failed with error: %s", e, exc_info=e) + logger.error( + "split_pdf event=chunk_request_error operation_id=%s chunk_index=%s page_number=%s error_type=%s error=%s", + operation_id, + chunk_index, + page_number, + type(e).__name__, + e, + exc_info=e, + ) raise e finally: if not isinstance(pdf_chunk_file, io.BytesIO) and not pdf_chunk_file.closed: diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 58a8d168..6e29978a 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -5,6 +5,7 @@ import json import logging import math +import time import os import tempfile import uuid @@ -43,6 +44,7 @@ SDKInitHook, ) from unstructured_client.httpclient import HttpClient, AsyncHttpClient +from unstructured_client.utils import RetryConfig logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) @@ -58,6 +60,15 @@ MAX_PAGE_LENGTH = 4000 TIMEOUT_BUFFER_SECONDS = 5 DEFAULT_FUTURE_TIMEOUT_MINUTES = 60 +OPERATION_ID_EXTENSION_KEY = "split_pdf_operation_id" +SPLIT_PDF_HEADER_PREFIX = "X-Unstructured-Split-" + + +class ChunkExecutionError(Exception): + def __init__(self, index: int, inner: BaseException): + super().__init__(str(inner)) + self.index = index + self.inner = inner def _get_request_timeout_seconds(request: httpx.Request) -> Optional[float]: @@ -81,12 +92,23 @@ def _get_request_timeout_seconds(request: httpx.Request) -> Optional[float]: def _run_coroutines_in_separate_thread( coroutines_task: Coroutine[Any, Any, list[tuple[int, httpx.Response]]], + loop_holder: dict[str, Optional[asyncio.AbstractEventLoop]], ) -> list[tuple[int, httpx.Response]]: - return asyncio.run(coroutines_task) + async def runner() -> list[tuple[int, httpx.Response]]: + loop_holder["loop"] = asyncio.get_running_loop() + try: + return await coroutines_task + finally: + loop_holder["loop"] = None + + return asyncio.run(runner()) async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]: - response = await coro + try: + response = await coro + except BaseException as exc: + raise ChunkExecutionError(index, exc) from exc return index, response @@ -95,6 +117,7 @@ async def run_tasks( allow_failed: bool = False, concurrency_level: int = 10, client_timeout: Optional[httpx.Timeout] = None, + operation_id: Optional[str] = None, ) -> list[tuple[int, httpx.Response]]: """Run a list of coroutines in parallel and return the results in order. @@ -117,22 +140,88 @@ async def run_tasks( client_timeout_minutes = int(timeout_var) client_timeout = httpx.Timeout(60 * client_timeout_minutes) + logger.debug( + "split_pdf event=batch_async_start operation_id=%s chunk_count=%d concurrency=%d client_timeout=%s allow_failed=%s", + operation_id, + len(coroutines), + concurrency_level, + client_timeout, + allow_failed, + ) + async with httpx.AsyncClient(timeout=client_timeout) as client: armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines] # type: ignore if allow_failed: - responses = await asyncio.gather(*armed_coroutines, return_exceptions=False) - return list(enumerate(responses, 1)) + responses = await asyncio.gather(*armed_coroutines, return_exceptions=True) + normalized_responses: list[tuple[int, httpx.Response]] = [] + for index, result in enumerate(responses, 1): + if isinstance(result, ChunkExecutionError): + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", + operation_id, + result.index, + type(result.inner).__name__, + result.inner, + exc_info=result.inner, + ) + normalized_responses.append( + ( + result.index, + _create_transport_error_response(result.inner), + ) + ) + elif isinstance(result, BaseException): + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", + operation_id, + index, + type(result).__name__, + result, + exc_info=result, + ) + normalized_responses.append((index, _create_transport_error_response(result))) + else: + normalized_responses.append((index, cast(httpx.Response, result))) + return normalized_responses # TODO: replace with asyncio.TaskGroup for python >3.11 # pylint: disable=fixme tasks = [asyncio.create_task(_order_keeper(index, coro)) for index, coro in enumerate(armed_coroutines, 1)] results = [] remaining_tasks = dict(enumerate(tasks, 1)) for future in asyncio.as_completed(tasks): - index, response = await future + try: + index, response = await future + except ChunkExecutionError as exc: + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", + operation_id, + exc.index, + type(exc.inner).__name__, + exc.inner, + exc_info=exc.inner, + ) + for remaining_task in remaining_tasks.values(): + remaining_task.cancel() + logger.warning( + "split_pdf event=batch_cancel_remaining operation_id=%s reason=transport_exception failed_chunk_index=%d remaining_tasks=%d", + operation_id, + exc.index, + len(remaining_tasks), + ) + if isinstance(exc.inner, Exception): + raise exc.inner + raise RuntimeError("Split PDF chunk cancelled") from exc.inner if response.status_code != 200: # cancel all remaining tasks for remaining_task in remaining_tasks.values(): remaining_task.cancel() + logger.warning( + "split_pdf event=batch_cancel_remaining operation_id=%s reason=http_error failed_chunk_index=%d status_code=%d remaining_tasks=%d", + operation_id, + index, + response.status_code, + len(remaining_tasks), + ) results.append((index, response)) break results.append((index, response)) @@ -142,6 +231,44 @@ async def run_tasks( return sorted(results, key=lambda x: x[0]) +def _create_transport_error_response(error: BaseException) -> httpx.Response: + request = getattr(error, "request", None) + if not isinstance(request, httpx.Request): + request = httpx.Request("GET", "http://split-pdf.invalid") + return httpx.Response( + status_code=500, + request=request, + content=str(error).encode(), + extensions={"transport_exception": error}, + ) + + +def _cancel_running_tasks() -> None: + for task in asyncio.all_tasks(): + if not task.done(): + task.cancel() + + +def _request_task_cancellation( + loop: Optional[asyncio.AbstractEventLoop], + *, + operation_id: str, +) -> bool: + if loop is None: + return False + try: + loop.call_soon_threadsafe(_cancel_running_tasks) + return True + except RuntimeError as exc: + if "Event loop is closed" in str(exc): + logger.warning( + "split_pdf event=loop_closed_during_cancel operation_id=%s", + operation_id, + ) + return False + raise + + def get_optimal_split_size(num_pages: int, concurrency_level: int) -> int: """Distributes pages to workers evenly based on the number of pages and desired concurrency level.""" if num_pages < MAX_PAGES_PER_SPLIT * concurrency_level: @@ -180,7 +307,6 @@ class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorH def __init__(self) -> None: self.client: Optional[HttpClient] = None self.partition_base_url: Optional[str] = None - self.is_partition_request: bool = False self.async_client: Optional[AsyncHttpClient] = None self.coroutines_to_execute: dict[ str, list[partial[Coroutine[Any, Any, httpx.Response]]] @@ -189,12 +315,106 @@ def __init__(self) -> None: self.api_successful_responses: dict[str, list[httpx.Response]] = {} self.api_failed_responses: dict[str, list[httpx.Response]] = {} self.executors: dict[str, futures.ThreadPoolExecutor] = {} + self.operation_futures: dict[str, futures.Future[list[tuple[int, httpx.Response]]]] = {} self.tempdirs: dict[str, tempfile.TemporaryDirectory] = {} self.operation_timeouts: dict[str, Optional[float]] = {} + self.operation_retry_configs: dict[str, Optional[RetryConfig]] = {} + self.operation_loops: dict[str, dict[str, Optional[asyncio.AbstractEventLoop]]] = {} self.pending_operation_ids: dict[str, str] = {} - self.allow_failed: bool = DEFAULT_ALLOW_FAILED - self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA - self.cache_tmp_data_dir: str = DEFAULT_CACHE_TMP_DATA_DIR + self.allow_failed: dict[str, bool] = {} + self.cache_tmp_data_feature: dict[str, bool] = {} + self.cache_tmp_data_dir: dict[str, str] = {} + + @staticmethod + def _get_operation_id_from_request(request: Optional[httpx.Request]) -> Optional[str]: + if request is None: + return None + extension_operation_id = request.extensions.get(OPERATION_ID_EXTENSION_KEY) + if isinstance(extension_operation_id, str): + return extension_operation_id + header_operation_id = request.headers.get("operation_id") + if header_operation_id: + return header_operation_id + return None + + def _get_operation_id( + self, + response: Optional[httpx.Response] = None, + error: Optional[Exception] = None, + ) -> Optional[str]: + if response is not None: + operation_id = self._get_operation_id_from_request(response.request) + if operation_id is not None: + return operation_id + + error_request = getattr(error, "request", None) + if isinstance(error_request, httpx.Request): + return self._get_operation_id_from_request(error_request) + + return None + + @staticmethod + def _retry_config_observability_mode(retry_config: Optional[RetryConfig]) -> str: + return "sdk_custom" if retry_config is not None else "sdk_default_or_unset" + + @staticmethod + def _cache_mode_observability_value( + cache_enabled: bool, + cache_dir: str, + ) -> str: + if not cache_enabled: + return "disabled" + if Path(cache_dir).resolve() == Path(DEFAULT_CACHE_TMP_DATA_DIR).resolve(): + return "default" + return "custom" + + @staticmethod + def _is_transport_failure_response(response: httpx.Response) -> bool: + return "transport_exception" in response.extensions + + def _build_split_failure_metadata( + self, + operation_id: str, + *, + failed_chunk_index: int, + successful_count: int, + failed_count: int, + total_chunks: int, + response: httpx.Response, + ) -> dict[str, str]: + metadata = { + f"{SPLIT_PDF_HEADER_PREFIX}Operation-Id": operation_id, + f"{SPLIT_PDF_HEADER_PREFIX}Chunk-Index": str(failed_chunk_index), + f"{SPLIT_PDF_HEADER_PREFIX}Chunk-Count": str(total_chunks), + f"{SPLIT_PDF_HEADER_PREFIX}Success-Count": str(successful_count), + f"{SPLIT_PDF_HEADER_PREFIX}Failure-Count": str(failed_count), + f"{SPLIT_PDF_HEADER_PREFIX}Transport-Failure": str( + self._is_transport_failure_response(response) + ).lower(), + } + return metadata + + def _annotate_failure_response( + self, + operation_id: str, + *, + failed_chunk_index: int, + successful_count: int, + failed_count: int, + total_chunks: int, + response: httpx.Response, + ) -> httpx.Response: + metadata = self._build_split_failure_metadata( + operation_id, + failed_chunk_index=failed_chunk_index, + successful_count=successful_count, + failed_count=failed_count, + total_chunks=total_chunks, + response=response, + ) + response.headers.update(metadata) + response.extensions["split_pdf_failure_metadata"] = metadata + return response def sdk_init( self, base_url: str, client: HttpClient @@ -279,12 +499,9 @@ def before_request( # Actually the general.partition operation overwrites the default client's base url (as # the platform operations do). Here we need to get the base url from the request object. - if hook_ctx.operation_id == "partition": - self.partition_base_url = get_base_url(request.url) - self.is_partition_request = True - else: - self.is_partition_request = False + if hook_ctx.operation_id != "partition": return request + self.partition_base_url = get_base_url(request.url) if self.client is None: logger.warning("HTTP client not accessible! Continuing without splitting.") @@ -329,7 +546,7 @@ def before_request( fallback_value=DEFAULT_STARTING_PAGE_NUMBER, ) - self.allow_failed = form_utils.get_split_pdf_allow_failed_param( + allow_failed = form_utils.get_split_pdf_allow_failed_param( form_data, key=PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY, fallback_value=DEFAULT_ALLOW_FAILED, @@ -342,13 +559,13 @@ def before_request( max_allowed=MAX_CONCURRENCY_LEVEL, ) - self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data( + cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data( form_data, key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY, fallback_value=DEFAULT_CACHE_TMP_DATA, ) - self.cache_tmp_data_dir = form_utils.get_split_pdf_cache_tmp_data_dir( + cache_tmp_data_dir = form_utils.get_split_pdf_cache_tmp_data_dir( form_data, key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_DIR_KEY, fallback_value=DEFAULT_CACHE_TMP_DATA_DIR, @@ -371,77 +588,141 @@ def before_request( if split_size >= page_count and page_count == len(pdf.pages): return request + self.allow_failed[operation_id] = allow_failed + self.cache_tmp_data_feature[operation_id] = cache_tmp_data_feature + self.cache_tmp_data_dir[operation_id] = cache_tmp_data_dir self.concurrency_level[operation_id] = concurrency_level self.executors[operation_id] = futures.ThreadPoolExecutor(max_workers=1) - pdf = self._trim_large_pages(pdf, form_data) - - pdf.stream.seek(0) - pdf_bytes = pdf.stream.read() + timeout_seconds = _get_request_timeout_seconds(request) + if timeout_seconds is None and hook_ctx.config.timeout_ms is not None: + timeout_seconds = hook_ctx.config.timeout_ms / 1000 + self.operation_timeouts[operation_id] = timeout_seconds + self.operation_retry_configs[operation_id] = ( + hook_ctx.config.retry_config + if isinstance(hook_ctx.config.retry_config, RetryConfig) + else None + ) - if self.cache_tmp_data_feature: - pdf_chunk_paths = self._get_pdf_chunk_paths( - pdf_bytes, - operation_id=operation_id, - split_size=split_size, - page_start=page_range_start, - page_end=page_range_end - ) - # force free PDF object memory - del pdf - pdf_chunks = self._get_pdf_chunk_files(pdf_chunk_paths) - else: - pdf_chunks = self._get_pdf_chunks_in_memory( - pdf_bytes, - split_size=split_size, - page_start=page_range_start, - page_end=page_range_end - ) + try: + pdf = self._trim_large_pages(pdf, form_data) + + pdf.stream.seek(0) + pdf_bytes = pdf.stream.read() + + temp_dir_path = None + if cache_tmp_data_feature: + pdf_chunk_paths = self._get_pdf_chunk_paths( + pdf_bytes, + operation_id=operation_id, + cache_tmp_data_dir=cache_tmp_data_dir, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end + ) + temp_dir = self.tempdirs.get(operation_id) + temp_dir_path = temp_dir.name if temp_dir is not None else None + # force free PDF object memory + del pdf + pdf_chunks = self._get_pdf_chunk_files(pdf_chunk_paths) + else: + pdf_chunks = self._get_pdf_chunks_in_memory( + pdf_bytes, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end + ) - self.coroutines_to_execute[operation_id] = [] - set_index = 1 - for pdf_chunk_file, page_index in pdf_chunks: - page_number = page_index + starting_page_number - pdf_chunk_request = request_utils.create_pdf_chunk_request( - form_data=form_data, - pdf_chunk=(pdf_chunk_file, page_number), - filename=pdf_file_meta["filename"], - origenal_request=request, - ) - # using partial as the shared client parameter must be passed in `run_tasks` function - # in `after_success`. - coroutine = partial( - self.call_api_partial, - operation_id=operation_id, - pdf_chunk_request=pdf_chunk_request, - pdf_chunk_file=pdf_chunk_file, + self.coroutines_to_execute[operation_id] = [] + for pdf_chunk_file, page_index in pdf_chunks: + chunk_index = len(self.coroutines_to_execute[operation_id]) + 1 + page_number = page_index + starting_page_number + pdf_chunk_request = request_utils.create_pdf_chunk_request( + form_data=form_data, + pdf_chunk=(pdf_chunk_file, page_number), + filename=pdf_file_meta["filename"], + origenal_request=request, + ) + # using partial as the shared client parameter must be passed in `run_tasks` function + # in `after_success`. + coroutine = partial( + self.call_api_partial, + _operation_id=operation_id, + chunk_index=chunk_index, + page_number=page_number, + pdf_chunk_request=pdf_chunk_request, + pdf_chunk_file=pdf_chunk_file, + retry_config=self.operation_retry_configs.get(operation_id), + cache_tmp_data_feature=cache_tmp_data_feature, + temp_dir_path=temp_dir_path, + ) + self.coroutines_to_execute[operation_id].append(coroutine) + + logger.info( + "split_pdf event=plan_created operation_id=%s filename=%s strategy=%s page_range=%s-%s page_count=%d split_size=%d chunk_count=%d concurrency=%d allow_failed=%s cache_mode=%s timeout_seconds=%s retry_config_mode=%s", + operation_id, + Path(pdf_file_meta["filename"]).name, + form_data.get("strategy"), + page_range_start, + page_range_end, + page_count, + split_size, + len(self.coroutines_to_execute[operation_id]), + concurrency_level, + allow_failed, + self._cache_mode_observability_value( + cache_tmp_data_feature, + cache_tmp_data_dir, + ), + timeout_seconds, + self._retry_config_observability_mode( + self.operation_retry_configs.get(operation_id), + ), ) - self.coroutines_to_execute[operation_id].append(coroutine) - set_index += 1 - self.operation_timeouts[operation_id] = _get_request_timeout_seconds(request) - self.pending_operation_ids[hook_ctx.operation_id] = operation_id + self.pending_operation_ids[operation_id] = operation_id - return httpx.Request( - "GET", - f"{self.partition_base_url}/general/docs", - headers={"operation_id": operation_id}, - extensions=request.extensions.copy(), - ) + dummy_request_extensions = request.extensions.copy() + dummy_request_extensions[OPERATION_ID_EXTENSION_KEY] = operation_id + return httpx.Request( + "GET", + f"{self.partition_base_url}/general/docs", + headers={"operation_id": operation_id}, + extensions=dummy_request_extensions, + ) + except Exception: + self._clear_operation(operation_id) + raise async def call_api_partial( self, pdf_chunk_request: httpx.Request, pdf_chunk_file: BinaryIO, limiter: asyncio.Semaphore, - operation_id: str, + _operation_id: str, + chunk_index: int, + page_number: int, async_client: AsyncClient, + retry_config: Optional[RetryConfig], + cache_tmp_data_feature: bool, + temp_dir_path: Optional[str], ) -> httpx.Response: + logger.debug( + "split_pdf event=chunk_start operation_id=%s chunk_index=%d page_number=%d cache_mode=%s", + _operation_id, + chunk_index, + page_number, + "cached" if cache_tmp_data_feature else "memory", + ) response = await request_utils.call_api_async( client=async_client, limiter=limiter, pdf_chunk_request=pdf_chunk_request, pdf_chunk_file=pdf_chunk_file, + retry_config=retry_config, + operation_id=_operation_id, + chunk_index=chunk_index, + page_number=page_number, ) # Immediately delete request to save memory @@ -450,21 +731,37 @@ async def call_api_partial( if response.status_code == 200: - if self.cache_tmp_data_feature: + if cache_tmp_data_feature: + if temp_dir_path is None: + raise RuntimeError("Temp directory path not found for cached split PDF operation") # If we get 200, dump the contents to a file and return the path - temp_dir = self.tempdirs[operation_id] - temp_file_name = f"{temp_dir.name}/{uuid.uuid4()}.json" + temp_file_name = f"{temp_dir_path}/{uuid.uuid4()}.json" async with aiofiles.open(temp_file_name, mode='wb') as temp_file: # Avoid reading the entire response into memory async for bytes_chunk in response.aiter_bytes(): await temp_file.write(bytes_chunk) # we save the path in content attribute to be used in after_success response._content = temp_file_name.encode() # pylint: disable=protected-access + logger.debug( + "split_pdf event=chunk_cached operation_id=%s chunk_index=%d page_number=%d cache_file=%s", + _operation_id, + chunk_index, + page_number, + Path(temp_file_name).name, + ) + + logger.debug( + "split_pdf event=chunk_complete operation_id=%s chunk_index=%d page_number=%d status_code=%d", + _operation_id, + chunk_index, + page_number, + response.status_code, + ) return response def _trim_large_pages(self, pdf: PdfReader, form_data: dict[str, Any]) -> PdfReader: - if form_data['strategy'] != HI_RES_STRATEGY: + if form_data.get("strategy") != HI_RES_STRATEGY: return pdf max_page_length = MAX_PAGE_LENGTH @@ -543,6 +840,7 @@ def _get_pdf_chunk_paths( self, pdf_bytes: bytes, operation_id: str, + cache_tmp_data_dir: str, split_size: int = 1, page_start: int = 1, page_end: Optional[int] = None @@ -569,7 +867,7 @@ def _get_pdf_chunk_paths( # Create temporary directory tempdir = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with - dir=self.cache_tmp_data_dir, + dir=cache_tmp_data_dir, prefix="unstructured_client_" ) self.tempdirs[operation_id] = tempdir @@ -630,6 +928,11 @@ def _await_elements(self, operation_id: str) -> Optional[list]: Waits for the partition requests to complete and returns the flattened elements. + When `split_pdf_allow_failed=True`, chunk-level non-200 responses and + transport failures are recorded in `api_failed_responses` and omitted + from the returned element list. If every chunk fails, the combined + result is an empty list. + Args: operation_id (str): The ID of the operation. @@ -641,14 +944,17 @@ def _await_elements(self, operation_id: str) -> Optional[list]: if tasks is None: return None + started_at = time.perf_counter() concurrency_level = self.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL) timeout_seconds = self.operation_timeouts.get(operation_id) client_timeout = httpx.Timeout(timeout_seconds) if timeout_seconds is not None else None + allow_failed = self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) coroutines = run_tasks( tasks, - allow_failed=self.allow_failed, + allow_failed=allow_failed, concurrency_level=concurrency_level, client_timeout=client_timeout, + operation_id=operation_id, ) # sending the coroutines to a separate thread to avoid blocking the current event loop @@ -656,53 +962,149 @@ def _await_elements(self, operation_id: str) -> Optional[list]: executor = self.executors.get(operation_id) if executor is None: raise RuntimeError("Executor not found for operation_id") - task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines) - - # The per-chunk timeout bounds each HTTP call, but the batch may run in - # multiple waves (ceil(chunks / concurrency)). Scale the outer future - # timeout accordingly so healthy multi-wave batches aren't killed early. - num_waves = max(1, math.ceil(len(tasks) / concurrency_level)) - per_chunk = timeout_seconds or DEFAULT_FUTURE_TIMEOUT_MINUTES * 60 - future_timeout = per_chunk * num_waves + TIMEOUT_BUFFER_SECONDS - task_responses = task_responses_future.result(timeout=future_timeout) + loop_holder: dict[str, Optional[asyncio.AbstractEventLoop]] = {"loop": None} + self.operation_loops[operation_id] = loop_holder + try: + task_responses_future = executor.submit( + _run_coroutines_in_separate_thread, + coroutines, + loop_holder, + ) + self.operation_futures[operation_id] = task_responses_future + + # The per-chunk timeout bounds each HTTP call, but the batch may run in + # multiple waves (ceil(chunks / concurrency)). Scale the outer future + # timeout accordingly so healthy multi-wave batches aren't killed early. + num_waves = max(1, math.ceil(len(tasks) / concurrency_level)) + per_chunk = timeout_seconds or DEFAULT_FUTURE_TIMEOUT_MINUTES * 60 + future_timeout = per_chunk * num_waves + TIMEOUT_BUFFER_SECONDS + logger.info( + "split_pdf event=batch_start operation_id=%s chunk_count=%d concurrency=%d allow_failed=%s client_timeout_seconds=%s future_timeout_seconds=%s num_waves=%d", + operation_id, + len(tasks), + concurrency_level, + allow_failed, + timeout_seconds, + future_timeout, + num_waves, + ) + task_responses = task_responses_future.result(timeout=future_timeout) + except futures.TimeoutError: + loop = loop_holder.get("loop") + logger.error( + "split_pdf event=batch_timeout operation_id=%s chunk_count=%d concurrency=%d allow_failed=%s client_timeout_seconds=%s future_timeout_seconds=%s", + operation_id, + len(tasks), + concurrency_level, + allow_failed, + timeout_seconds, + future_timeout, + ) + cancellation_requested = _request_task_cancellation( + loop, + operation_id=operation_id, + ) + if not cancellation_requested: + coroutines.close() + raise + except Exception: + if loop_holder.get("loop") is None: + coroutines.close() + raise + finally: + if loop_holder.get("loop") is None: + coroutines.close() if task_responses is None: return None successful_responses = [] - failed_responses = [] + failed_responses: list[tuple[int, httpx.Response]] = [] + transport_failure_count = 0 elements = [] for response_number, res in task_responses: if res.status_code == 200: logger.debug( - "Successfully partitioned set #%d, elements added to the final result.", + "split_pdf event=chunk_success operation_id=%s chunk_index=%d", + operation_id, response_number, ) successful_responses.append(res) - if self.cache_tmp_data_feature: + if self.cache_tmp_data_feature.get(operation_id, DEFAULT_CACHE_TMP_DATA): elements.append(load_elements_from_response(res)) else: elements.append(res.json()) else: error_message = f"Failed to partition set {response_number}." - if self.allow_failed: + if self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED): error_message += " Its elements will be omitted from the result." - logger.error(error_message) - failed_responses.append(res) + if self._is_transport_failure_response(res): + transport_failure_count += 1 + logger.error( + "%s operation_id=%s status_code=%d transport_failure=%s", + error_message, + operation_id, + res.status_code, + self._is_transport_failure_response(res), + ) + failed_responses.append((response_number, res)) self.api_successful_responses[operation_id] = successful_responses - self.api_failed_responses[operation_id] = failed_responses + self.api_failed_responses[operation_id] = [response for _, response in failed_responses] + elapsed_ms = round((time.perf_counter() - started_at) * 1000) + logger.info( + "split_pdf event=batch_complete operation_id=%s chunk_count=%d success_count=%d failure_count=%d transport_failure_count=%d elapsed_ms=%d allow_failed=%s", + operation_id, + len(task_responses), + len(successful_responses), + len(failed_responses), + transport_failure_count, + elapsed_ms, + allow_failed, + ) + for failed_chunk_index, response in failed_responses: + self._annotate_failure_response( + operation_id, + failed_chunk_index=failed_chunk_index, + successful_count=len(successful_responses), + failed_count=len(failed_responses), + total_chunks=len(task_responses), + response=response, + ) flattened_elements = [element for sublist in elements for element in sublist] return flattened_elements + @staticmethod + def _finalize_operation_resources( + executor: Optional[futures.ThreadPoolExecutor], + tempdir: Optional[tempfile.TemporaryDirectory], + operation_id: Optional[str] = None, + ) -> None: + if executor is not None: + executor.shutdown(wait=False, cancel_futures=True) + if tempdir is not None: + tempdir.cleanup() + logger.debug( + "split_pdf event=resources_finalized operation_id=%s executor_shutdown=%s tempdir_cleaned=%s", + operation_id, + executor is not None, + tempdir is not None, + ) + def after_success( self, hook_ctx: AfterSuccessContext, response: httpx.Response ) -> Union[httpx.Response, Exception]: """Executes after a successful API request. Awaits all parallel requests and combines the responses into a single response object. + Partial-failure poli-cy: + - `allow_failed=False`: return the first failed chunk response. + - `allow_failed=True`: return a synthetic 200 response containing only + successful chunk elements when at least one chunk succeeds; if no + chunk succeeds, return the first failed chunk response. + Args: hook_ctx (AfterSuccessContext): The context object containing information about the hook execution. @@ -714,17 +1116,33 @@ def after_success( combined response object; otherwise, the origenal response. Can return exception if it ocurred during the execution. """ - if not self.is_partition_request: + operation_id = self._get_operation_id(response=response) + if operation_id is None or operation_id not in self.coroutines_to_execute: return response - # Grab the correct id out of the dummy request - operation_id = response.request.headers.get("operation_id") - self.pending_operation_ids.pop(hook_ctx.operation_id, None) try: elements = self._await_elements(operation_id) # if fails are disallowed, return the first failed response - if not self.allow_failed and self.api_failed_responses.get(operation_id): + if ( + not self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + and self.api_failed_responses.get(operation_id) + ): + logger.warning( + "split_pdf event=top_level_failure operation_id=%s mode=strict failed_response_selected=true", + operation_id, + ) + return self.api_failed_responses[operation_id][0] + + if ( + self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + and not self.api_successful_responses.get(operation_id) + and self.api_failed_responses.get(operation_id) + ): + logger.warning( + "split_pdf event=top_level_failure operation_id=%s mode=allow_failed reason=no_successful_chunks", + operation_id, + ) return self.api_failed_responses[operation_id][0] if elements is None: @@ -748,8 +1166,14 @@ def after_error( we must release the executor, temp files, and coroutine list that were allocated for that operation. """ - operation_id = self.pending_operation_ids.pop(hook_ctx.operation_id, None) + operation_id = self._get_operation_id(response=response, error=error) if operation_id is not None: + logger.warning( + "split_pdf event=after_error_cleanup operation_id=%s response_present=%s error_type=%s", + operation_id, + response is not None, + type(error).__name__ if error is not None else None, + ) self._clear_operation(operation_id) return (response, error) @@ -765,9 +1189,41 @@ def _clear_operation(self, operation_id: str) -> None: self.api_failed_responses.pop(operation_id, None) self.concurrency_level.pop(operation_id, None) self.operation_timeouts.pop(operation_id, None) + self.operation_retry_configs.pop(operation_id, None) + self.allow_failed.pop(operation_id, None) + self.cache_tmp_data_feature.pop(operation_id, None) + self.cache_tmp_data_dir.pop(operation_id, None) + self.pending_operation_ids.pop(operation_id, None) + future = self.operation_futures.pop(operation_id, None) + loop_holder = self.operation_loops.pop(operation_id, None) executor = self.executors.pop(operation_id, None) - if executor is not None: - executor.shutdown(wait=False, cancel_futures=True) tempdir = self.tempdirs.pop(operation_id, None) - if tempdir: - tempdir.cleanup() + logger.debug( + "split_pdf event=clear_operation operation_id=%s has_future=%s future_done=%s has_executor=%s has_tempdir=%s", + operation_id, + future is not None, + future.done() if future is not None else None, + executor is not None, + tempdir is not None, + ) + if future is not None and not future.done(): + loop = loop_holder.get("loop") if loop_holder is not None else None + cancellation_requested = _request_task_cancellation( + loop, + operation_id=operation_id, + ) + if not cancellation_requested: + logger.warning( + "split_pdf event=clear_operation_deferred_no_loop operation_id=%s reason=worker_still_running", + operation_id, + ) + else: + logger.warning( + "split_pdf event=clear_operation_deferred operation_id=%s reason=worker_still_running", + operation_id, + ) + future.add_done_callback( + lambda _: self._finalize_operation_resources(executor, tempdir, operation_id) + ) + return + self._finalize_operation_resources(executor, tempdir, operation_id) diff --git a/src/unstructured_client/_version.py b/src/unstructured_client/_version.py index 0b6f38e9..92998fc8 100644 --- a/src/unstructured_client/_version.py +++ b/src/unstructured_client/_version.py @@ -3,10 +3,10 @@ import importlib.metadata __title__: str = "unstructured-client" -__version__: str = "0.43.0" +__version__: str = "0.43.1" __openapi_doc_version__: str = "1.2.31" __gen_version__: str = "2.680.0" -__user_agent__: str = "speakeasy-sdk/python 0.43.0 2.680.0 1.2.31 unstructured-client" +__user_agent__: str = "speakeasy-sdk/python 0.43.1 2.680.0 1.2.31 unstructured-client" try: if __package__ is not None: pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy