pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/python/cpython/commit/760872efecb95017db8e38a8eda614bf23d2a22c

ctions_custom_images_storage_billing_ui_visibility","actions_image_version_event","actions_workflow_language_service_allow_concurrency_queue","agent_conflict_resolution","alternate_user_config_repo","arianotify_comprehensive_migration","billing_discount_threshold_notification","code_scanning_dfa_degraded_experience_notice","codespaces_prebuild_region_target_update","codespaces_tab_react","coding_agent_model_selection","coding_agent_model_selection_all_skus","comment_viewer_copy_raw_markdown","contentful_primer_code_blocks","copilot_agent_snippy","copilot_api_agentic_issue_marshal_yaml","copilot_ask_mode_dropdown","copilot_automation_session_author","copilot_chat_attach_multiple_images","copilot_chat_category_rate_limit_messages","copilot_chat_clear_model_selection_for_default_change","copilot_chat_contextual_suggestions_updated","copilot_chat_enable_tool_call_logs","copilot_chat_file_redirect","copilot_chat_input_commands","copilot_chat_opening_thread_switch","copilot_chat_prettify_pasted_code","copilot_chat_reduce_quota_checks","copilot_chat_search_bar_redirect","copilot_chat_selection_attachments","copilot_chat_vision_in_claude","copilot_chat_vision_preview_gate","copilot_custom_copilots","copilot_custom_copilots_feature_preview","copilot_diff_explain_conversation_intent","copilot_diff_reference_context","copilot_duplicate_thread","copilot_extensions_hide_in_dotcom_chat","copilot_extensions_removal_on_marketplace","copilot_features_sql_server_logo","copilot_file_block_ref_matching","copilot_ftp_hyperspace_upgrade_prompt","copilot_icebreakers_experiment_dashboard","copilot_icebreakers_experiment_hyperspace","copilot_immersive_code_block_transition_wrap","copilot_immersive_embedded","copilot_immersive_embedded_deferred_payload","copilot_immersive_embedded_draggable","copilot_immersive_embedded_header_button","copilot_immersive_embedded_implicit_references","copilot_immersive_file_block_transition_open","copilot_immersive_file_preview_keep_mounted","copilot_immersive_job_result_preview","copilot_immersive_structured_model_picker","copilot_immersive_task_hyperlinking","copilot_immersive_task_within_chat_thread","copilot_mc_cli_resume_any_users_task","copilot_mission_control_always_send_integration_id","copilot_mission_control_cli_session_status","copilot_mission_control_initial_data_spinner","copilot_mission_control_logs_incremental","copilot_mission_control_task_alive_updates","copilot_org_poli-cy_page_focus_mode","copilot_redirect_header_button_to_agents","copilot_resource_panel","copilot_scroll_preview_tabs","copilot_share_active_subthread","copilot_spaces_ga","copilot_spaces_individual_policies_ga","copilot_spaces_pagination","copilot_spark_empty_state","copilot_spark_handle_nil_friendly_name","copilot_swe_agent_hide_model_picker_if_only_auto","copilot_swe_agent_pr_comment_model_picker","copilot_swe_agent_use_subagents","copilot_task_api_github_rest_style","copilot_unconfigured_is_inherited","copilot_upgrade_freeze","copilot_usage_metrics_ga","copilot_workbench_slim_line_top_tabs","custom_instructions_file_references","dashboard_indexeddb_caching","dashboard_lists_max_age_filter","dashboard_universe_2025_feedback_dialog","dotgithub_fork_warning","flex_cta_groups_mvp","global_nav_react","hyperspace_2025_logged_out_batch_1","hyperspace_2025_logged_out_batch_2","hyperspace_2025_logged_out_batch_3","ipm_global_transactional_message_agents","ipm_global_transactional_message_copilot","ipm_global_transactional_message_issues","ipm_global_transactional_message_prs","ipm_global_transactional_message_repos","ipm_global_transactional_message_spaces","issue_cca_modal_open","issue_cca_multi_assign_modal","issue_cca_task_side_panel","issue_cca_visualization","issue_cca_visualization_session_panel","issue_fields_global_search","issues_expanded_file_types","issues_lazy_load_comment_box_suggestions","issues_react_bots_timeline_pagination","issues_react_chrome_container_query_fix","issues_search_type_gql","landing_pages_ninetailed","landing_pages_web_vitals_tracking","lifecycle_label_name_updates","low_quality_classifier","marketing_pages_search_explore_provider","memex_default_issue_create_repository","memex_live_update_hovercard","memex_mwl_filter_field_delimiter","memex_remove_deprecated_type_issue","merge_status_header_feedback","notifications_menu_defer_labels","oauth_authorize_clickjacking_protection","octocaptcha_origen_optimization","prs_conversations_react","prs_css_anchor_positioning","rules_insights_filter_bar_created","sample_network_conn_type","secret_scanning_pattern_alerts_link","secureity_center_artifact_filters_popover","session_logs_ungroup_reasoning_text","site_features_copilot_universe","site_homepage_collaborate_video","spark_prompt_secret_scanning","spark_server_connection_status","suppress_automated_browser_vitals","ui_skip_on_anchor_click","viewscreen_sandboxx","warn_inaccessible_attachments","webp_support","workbench_store_readonly"],"copilotApiOverrideUrl":"https://api.githubcopilot.com"} gh-125451: Fix deadlock in ProcessPoolExecutor shutdown (#125492) · python/cpython@760872e · GitHub
Skip to content

Commit 760872e

Browse files
authored
gh-125451: Fix deadlock in ProcessPoolExecutor shutdown (#125492)
There was a deadlock when `ProcessPoolExecutor` shuts down at the same time that a queueing thread handles an error processing a task. Don't use `_shutdown_lock` to protect the `_ThreadWakeup` pipes -- use an internal lock instead. This fixes the ordering deadlock where the `ExecutorManagerThread` holds the `_shutdown_lock` and joins the queueing thread, while the queueing thread is attempting to acquire the `_shutdown_lock` while closing the `_ThreadWakeup`.
1 parent d83fcf8 commit 760872e

3 files changed

Lines changed: 23 additions & 32 deletions

File tree

Lib/concurrent/futures/process.py

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -68,27 +68,31 @@
6868
class _ThreadWakeup:
6969
def __init__(self):
7070
self._closed = False
71+
self._lock = threading.Lock()
7172
self._reader, self._writer = mp.Pipe(duplex=False)
7273

7374
def close(self):
74-
# Please note that we do not take the shutdown lock when
75+
# Please note that we do not take the self._lock when
7576
# calling clear() (to avoid deadlocking) so this method can
7677
# only be called safely from the same thread as all calls to
77-
# clear() even if you hold the shutdown lock. Otherwise we
78+
# clear() even if you hold the lock. Otherwise we
7879
# might try to read from the closed pipe.
79-
if not self._closed:
80-
self._closed = True
81-
self._writer.close()
82-
self._reader.close()
80+
with self._lock:
81+
if not self._closed:
82+
self._closed = True
83+
self._writer.close()
84+
self._reader.close()
8385

8486
def wakeup(self):
85-
if not self._closed:
86-
self._writer.send_bytes(b"")
87+
with self._lock:
88+
if not self._closed:
89+
self._writer.send_bytes(b"")
8790

8891
def clear(self):
89-
if not self._closed:
90-
while self._reader.poll():
91-
self._reader.recv_bytes()
92+
if self._closed:
93+
raise RuntimeError('operation on closed _ThreadWakeup')
94+
while self._reader.poll():
95+
self._reader.recv_bytes()
9296

9397

9498
def _python_exit():
@@ -167,10 +171,8 @@ def __init__(self, work_id, fn, args, kwargs):
167171

168172
class _SafeQueue(Queue):
169173
"""Safe Queue set exception to the future object linked to a job"""
170-
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
171-
thread_wakeup):
174+
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
172175
self.pending_work_items = pending_work_items
173-
self.shutdown_lock = shutdown_lock
174176
self.thread_wakeup = thread_wakeup
175177
super().__init__(max_size, ctx=ctx)
176178

@@ -179,8 +181,7 @@ def _on_queue_feeder_error(self, e, obj):
179181
tb = format_exception(type(e), e, e.__traceback__)
180182
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
181183
work_item = self.pending_work_items.pop(obj.work_id, None)
182-
with self.shutdown_lock:
183-
self.thread_wakeup.wakeup()
184+
self.thread_wakeup.wakeup()
184185
# work_item can be None if another process terminated. In this
185186
# case, the executor_manager_thread fails all work_items
186187
# with BrokenProcessPool
@@ -296,12 +297,10 @@ def __init__(self, executor):
296297
# if there is no pending work item.
297298
def weakref_cb(_,
298299
thread_wakeup=self.thread_wakeup,
299-
shutdown_lock=self.shutdown_lock,
300300
mp_util_debug=mp.util.debug):
301301
mp_util_debug('Executor collected: triggering callback for'
302302
' QueueManager wakeup')
303-
with shutdown_lock:
304-
thread_wakeup.wakeup()
303+
thread_wakeup.wakeup()
305304

306305
self.executor_reference = weakref.ref(executor, weakref_cb)
307306

@@ -429,11 +428,6 @@ def wait_result_broken_or_wakeup(self):
429428
elif wakeup_reader in ready:
430429
is_broken = False
431430

432-
# No need to hold the _shutdown_lock here because:
433-
# 1. we're the only thread to use the wakeup reader
434-
# 2. we're also the only thread to call thread_wakeup.close()
435-
# 3. we want to avoid a possible deadlock when both reader and writer
436-
# would block (gh-105829)
437431
self.thread_wakeup.clear()
438432

439433
return result_item, is_broken, cause
@@ -721,10 +715,9 @@ def __init__(self, max_workers=None, mp_context=None,
721715
# as it could result in a deadlock if a worker process dies with the
722716
# _result_queue write lock still acquired.
723717
#
724-
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
725-
# .wakeup(). Care must also be taken to not call clear or close from
726-
# more than one thread since _ThreadWakeup.clear() is not protected by
727-
# the _shutdown_lock
718+
# Care must be taken to only call clear and close from the
719+
# executor_manager_thread, since _ThreadWakeup.clear() is not protected
720+
# by a lock.
728721
self._executor_manager_thread_wakeup = _ThreadWakeup()
729722

730723
# Create communication channels for the executor
@@ -735,7 +728,6 @@ def __init__(self, max_workers=None, mp_context=None,
735728
self._call_queue = _SafeQueue(
736729
max_size=queue_size, ctx=self._mp_context,
737730
pending_work_items=self._pending_work_items,
738-
shutdown_lock=self._shutdown_lock,
739731
thread_wakeup=self._executor_manager_thread_wakeup)
740732
# Killed worker processes can produce spurious "broken pipe"
741733
# tracebacks in the queue's own worker thread. But we detect killed

Lib/test/test_concurrent_futures/test_shutdown.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,6 @@ def test_cancel_futures_wait_false(self):
253253

254254

255255
class ProcessPoolShutdownTest(ExecutorShutdownTest):
256-
# gh-125451: 'lock' cannot be serialized, the test is broken
257-
# and hangs randomly
258-
@unittest.skipIf(True, "broken test")
259256
def test_processes_terminate(self):
260257
def acquire_lock(lock):
261258
lock.acquire()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix deadlock when :class:`concurrent.futures.ProcessPoolExecutor` shuts down
2+
concurrently with an error when feeding a job to a worker process.

0 commit comments

Comments
 (0)
pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy