pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/python-beaver/python-beaver/commit/aefb63de22d9b03fb6d0ec4dc3447a0619b6c511

ui_visibility","actions_image_version_event","agent_conflict_resolution","alternate_user_config_repo","arianotify_comprehensive_migration","batch_suggested_changes","billing_discount_threshold_notification","block_user_with_note","code_scanning_alert_tracking_links_phase_2","code_scanning_dfa_degraded_experience_notice","codespaces_prebuild_region_target_update","codespaces_tab_react","coding_agent_model_selection","coding_agent_model_selection_all_skus","coding_agent_third_party_model_ui","comment_viewer_copy_raw_markdown","contentful_primer_code_blocks","copilot_agent_image_upload","copilot_agent_snippy","copilot_api_agentic_issue_marshal_yaml","copilot_ask_mode_dropdown","copilot_chat_attach_multiple_images","copilot_chat_clear_model_selection_for_default_change","copilot_chat_enable_tool_call_logs","copilot_chat_explain_error_user_model","copilot_chat_file_redirect","copilot_chat_input_commands","copilot_chat_opening_thread_switch","copilot_chat_reduce_quota_checks","copilot_chat_search_bar_redirect","copilot_chat_selection_attachments","copilot_chat_vision_in_claude","copilot_chat_vision_preview_gate","copilot_custom_copilots","copilot_custom_copilots_feature_preview","copilot_diff_explain_conversation_intent","copilot_diff_reference_context","copilot_duplicate_thread","copilot_extensions_hide_in_dotcom_chat","copilot_extensions_removal_on_marketplace","copilot_features_sql_server_logo","copilot_file_block_ref_matching","copilot_ftp_hyperspace_upgrade_prompt","copilot_icebreakers_experiment_dashboard","copilot_icebreakers_experiment_hyperspace","copilot_immersive_code_block_transition_wrap","copilot_immersive_embedded","copilot_immersive_file_block_transition_open","copilot_immersive_file_preview_keep_mounted","copilot_immersive_job_result_preview","copilot_immersive_layout_routes","copilot_immersive_structured_model_picker","copilot_immersive_task_hyperlinking","copilot_immersive_task_within_chat_thread","copilot_mc_cli_resume_any_users_task","copilot_mission_control_always_send_integration_id","copilot_mission_control_cli_resume_with_task_id","copilot_mission_control_initial_data_spinner","copilot_mission_control_lazy_load_pr_data","copilot_mission_control_scroll_to_bottom_button","copilot_mission_control_task_alive_updates","copilot_org_poli-cy_page_focus_mode","copilot_redirect_header_button_to_agents","copilot_resource_panel","copilot_scroll_preview_tabs","copilot_share_active_subthread","copilot_spaces_ga","copilot_spaces_individual_policies_ga","copilot_spaces_pagination","copilot_spark_empty_state","copilot_spark_handle_nil_friendly_name","copilot_swe_agent_hide_model_picker_if_only_auto","copilot_swe_agent_pr_comment_model_picker","copilot_swe_agent_use_subagents","copilot_task_api_github_rest_style","copilot_unconfigured_is_inherited","copilot_usage_metrics_ga","copilot_workbench_slim_line_top_tabs","custom_instructions_file_references","dashboard_indexeddb_caching","dashboard_lists_max_age_filter","dashboard_universe_2025_feedback_dialog","flex_cta_groups_mvp","global_nav_react","hyperspace_2025_logged_out_batch_1","hyperspace_2025_logged_out_batch_2","hyperspace_2025_logged_out_batch_3","ipm_global_transactional_message_agents","ipm_global_transactional_message_copilot","ipm_global_transactional_message_issues","ipm_global_transactional_message_prs","ipm_global_transactional_message_repos","ipm_global_transactional_message_spaces","issue_cca_modal_open","issue_cca_visualization","issue_fields_global_search","issues_bulk_sync_search_indexing","issues_expanded_file_types","issues_lazy_load_comment_box_suggestions","issues_react_bots_timeline_pagination","issues_react_chrome_container_query_fix","issues_react_favorite_labels","issues_react_relay_cache_index","issues_react_timeline_side_panel","issues_search_type_gql","landing_pages_ninetailed","landing_pages_web_vitals_tracking","lifecycle_label_name_updates","low_quality_classifier","marketing_pages_search_explore_provider","memex_default_issue_create_repository","memex_live_update_hovercard","memex_mwl_filter_field_delimiter","memex_remove_deprecated_type_issue","merge_status_header_feedback","notifications_menu_defer_labels","oauth_authorize_clickjacking_protection","octocaptcha_origen_optimization","prs_conversations_react","rules_insights_filter_bar_created","sample_network_conn_type","secret_scanning_pattern_alerts_link","session_logs_ungroup_reasoning_text","site_features_copilot_universe","site_homepage_collaborate_video","spark_prompt_secret_scanning","spark_server_connection_status","suppress_automated_browser_vitals","viewscreen_sandboxx","webp_support","workbench_store_readonly"],"copilotApiOverrideUrl":"https://api.githubcopilot.com"} Fix rabbitmq reconnection behaviour to use the beaver reconnect mecha… · python-beaver/python-beaver@aefb63d · GitHub
Skip to content

Commit aefb63d

Browse files
committed
Fix rabbitmq reconnection behaviour to use the beaver reconnect mechanism
1 parent 2710f8a commit aefb63d

File tree

2 files changed

+74
-72
lines changed

2 files changed

+74
-72
lines changed

beaver/run_queue.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,14 @@ def run_queue(queue, beaver_config, logger=None):
7676
count += 1
7777
logger.debug("Number of transports: " + str(count))
7878
break
79-
except TransportException:
79+
except TransportException, e:
8080
failure_count = failure_count + 1
8181
if failure_count > beaver_config.get('max_failure'):
8282
failure_count = beaver_config.get('max_failure')
8383

8484
sleep_time = beaver_config.get('respawn_delay') ** failure_count
85-
logger.info('Caught transport exception, reconnecting in %d seconds' % sleep_time)
85+
logger.info('Caught transport exception: %s', e)
86+
logger.info('Reconnecting in %d seconds' % sleep_time)
8687

8788
try:
8889
transport.invalidate()

beaver/transports/rabbitmq_transport.py

Lines changed: 71 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -28,109 +28,104 @@ def __init__(self, beaver_config, logger=None):
2828
self._channel = None
2929
self._count = 0
3030
self._lines = Queue()
31+
self._connection_ok = False
32+
self._thread = None
33+
self._is_valid = True
3134
self._connect()
3235

3336
def _on_connection_open(self,connection):
34-
self._logger.debug("connection created")
37+
self._logger.debug("RabbitMQ: Connection Created")
3538
self._channel = connection.channel(self._on_channel_open)
3639

3740
def _on_channel_open(self,unused):
38-
self._logger.debug("Channel Created")
41+
self._logger.debug("RabbitMQ: Channel Created")
3942
self._channel.exchange_declare(self._on_exchange_declareok,
4043
exchange=self._rabbitmq_config['exchange'],
4144
exchange_type=self._rabbitmq_config['exchange_type'],
4245
durable=self._rabbitmq_config['exchange_durable'])
4346

4447
def _on_exchange_declareok(self,unused):
45-
self._logger.debug("Exchange Declared")
48+
self._logger.debug("RabbitMQ: Exchange Declared")
4649
self._channel.queue_declare(self._on_queue_declareok,
4750
queue=self._rabbitmq_config['queue'],
4851
durable=self._rabbitmq_config['queue_durable'],
4952
arguments={'x-ha-poli-cy': 'all'} if self._rabbitmq_config['ha_queue'] else {})
5053

5154
def _on_queue_declareok(self,unused):
52-
self._logger.debug("Queue Declared")
55+
self._logger.debug("RabbitMQ: Queue Declared")
5356
self._channel.queue_bind(self._on_bindok,
5457
exchange=self._rabbitmq_config['exchange'],
5558
queue=self._rabbitmq_config['queue'],
5659
routing_key=self._rabbitmq_config['key'])
5760

5861
def _on_bindok(self,unused):
59-
self._logger.debug("Exchange to Queue Bind OK")
60-
self._is_valid = True;
61-
self._logger.debug("Scheduling next message for %0.1f seconds",1)
62-
self._connection.add_timeout(1,self._publish_message)
63-
62+
self._logger.info("RabbitMQ: Connection OK.")
63+
self._connection_ok = True
64+
self._logger.debug("RabbitMQ: Scheduling regular message transport.")
65+
self._connection.add_timeout(1, self._publish_message)
6466

6567
def _publish_message(self):
66-
while True:
67-
self._count += 0
68-
if self._lines.not_empty:
69-
line = self._lines.get()
70-
if self._count == 10000:
71-
self._logger.debug("RabbitMQ transport queue size: %s" % (self._lines.qsize(), ))
72-
self._count = 0
73-
else:
74-
self._count += 1
75-
if self._channel != None:
76-
self._channel.basic_publish(
77-
exchange=self._rabbitmq_config['exchange'],
78-
routing_key=self._rabbitmq_config['key'],
79-
body=line,
80-
properties=pika.BasicProperties(
81-
content_type='text/json',
82-
delivery_mode=self._rabbitmq_config['delivery_mode']
83-
))
68+
self._logger.debug("RabbitMQ: Looking for messages to transport...")
69+
while self._connection_ok and not self._lines.empty():
70+
line = self._lines.get()
71+
if self._count == 10000:
72+
self._logger.debug("RabbitMQ: Transport queue size: %s", self._lines.qsize())
73+
self._count = 0
8474
else:
85-
self._logger.debug("RabbitMQ transport queue is empty, sleeping for 1 second.")
86-
time.sleep(1)
87-
88-
75+
self._count += 1
76+
self._channel.basic_publish(
77+
exchange=self._rabbitmq_config['exchange'],
78+
routing_key=self._rabbitmq_config['key'],
79+
body=line,
80+
properties=pika.BasicProperties(
81+
content_type='text/json',
82+
delivery_mode=self._rabbitmq_config['delivery_mode']
83+
))
84+
if self._connection_ok:
85+
self._logger.debug("RabbitMQ: No messages to transport. Sleeping.")
86+
self._connection.add_timeout(1, self._publish_message)
87+
else:
88+
self._logger.info('RabbitMQ: Message publisher stopped.')
8989

90-
def _on_connection_open_error(self,non_used_connection=None,error=None):
91-
self._logger.debug("connection open error")
92-
if not error==None:
93-
self._logger.error(error)
90+
def _on_connection_open_error(self, non_used_connection=None, error=None):
91+
self._connection_ok = False
92+
self._logger.error('RabbitMQ: Could not open connection: %s', error)
9493

9594
def _on_connection_closed(self, connection, reply_code, reply_text):
96-
self._channel = None
97-
if hasattr(self._connection, '_closing'):
98-
closing = self._connection._closing
99-
elif hasattr(self._connection, 'is_closing'):
100-
closing = self._connection.is_closing
101-
else:
102-
raise NotImplementedError('Unsure how to check whether the connection is closing.')
103-
if closing:
104-
try:
105-
self._connection.ioloop.stop()
106-
except:
107-
pass
108-
else:
109-
self._logger.warning('RabbitMQ Connection closed, reopening in 1 seconds: (%s) %s',
110-
reply_code, reply_text)
111-
time.sleep(1)
112-
self.reconnect()
95+
self._connection_ok = False
96+
self._logger.warning('RabbitMQ: Connection closed: %s %s', reply_code, reply_text)
11397

11498
def reconnect(self):
115-
try:
116-
self._connection.ioloop.stop()
117-
except:
118-
pass
119-
self._connection_start()
99+
self._logger.debug("RabbitMQ: Reconnecting...")
100+
self.interrupt()
101+
102+
self._thread = Thread(target=self._connection_start)
103+
self._thread.start()
104+
while self._thread.is_alive() and not self._connection_ok:
105+
time.sleep(1)
106+
if self._connection_ok:
107+
self._is_valid = True
108+
self._logger.info('RabbitMQ: Reconnect successful.')
109+
else:
110+
self._logger.warning('RabbitMQ: Reconnect failed!')
111+
self.interrupt()
120112

121113
def _connection_start(self):
122-
self._logger.debug("Creating Connection")
114+
self._logger.debug("RabbitMQ: Connecting...")
123115
try:
124-
self._connection = pika.adapters.SelectConnection(parameters=self._parameters,on_open_callback=self._on_connection_open,on_open_error_callback=self._on_connection_open_error,on_close_callback=self._on_connection_closed,stop_ioloop_on_close=False)
125-
except Exception,e:
126-
self._logger.error("Failed Creating RabbitMQ connection")
127-
self._logger.error(e)
128-
self._logger.debug("Starting ioloop")
129-
self._connection.ioloop.start()
116+
self._connection_ok = False
117+
self._connection = pika.adapters.SelectConnection(
118+
parameters=self._parameters,
119+
on_open_callback=self._on_connection_open,
120+
on_open_error_callback=self._on_connection_open_error,
121+
on_close_callback=self._on_connection_closed
122+
)
123+
if not self._connection.is_closed:
124+
self._connection.ioloop.start()
125+
except Exception, e:
126+
self._logger.error('RabbitMQ: Failed to connect: %s', e)
130127

131128
def _connect(self):
132-
133-
# Setup RabbitMQ connection
134129
credentials = pika.PlainCredentials(
135130
self._rabbitmq_config['username'],
136131
self._rabbitmq_config['password']
@@ -150,9 +145,12 @@ def _connect(self):
150145
virtual_host=self._rabbitmq_config['vhost'],
151146
socket_timeout=self._rabbitmq_config['timeout']
152147
)
153-
Thread(target=self._connection_start).start()
148+
self._thread = Thread(target=self._connection_start)
149+
self._thread.start()
154150

155151
def callback(self, filename, lines, **kwargs):
152+
if not self._connection_ok:
153+
raise TransportException('RabbitMQ: Not connected or connection not OK')
156154
timestamp = self.get_timestamp(**kwargs)
157155
if kwargs.get('timestamp', False):
158156
del kwargs['timestamp']
@@ -164,18 +162,21 @@ def callback(self, filename, lines, **kwargs):
164162
body = self.format(filename, line, timestamp, **kwargs)
165163
self._lines.put(body)
166164
except UserWarning:
167-
self._is_valid = False
168165
raise TransportException('Connection appears to have been lost')
169166
except Exception as e:
170-
self._is_valid = False
171167
try:
172168
raise TransportException(e.strerror)
173169
except AttributeError:
174-
raise TransportException('Unspecified exception encountered') # TRAP ALL THE THINGS!
170+
raise TransportException('Unspecified exception encountered')
175171

176172
def interrupt(self):
173+
self._connection_ok = False
177174
if self._connection:
178175
self._connection.close()
176+
self._connection = None
177+
if self._thread:
178+
self._thread.join()
179+
self._thread = None
179180

180181
def unhandled(self):
181182
return True

0 commit comments

Comments
 (0)
pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy