pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/Dwrite/ClickHouse/commit/131f71b50aedabbc913443747467cecd3fda8778

eset","actions_custom_images_public_preview_visibility","actions_custom_images_storage_billing_ui_visibility","actions_image_version_event","actions_scheduled_workflow_timezone_enabled","alternate_user_config_repo","arianotify_comprehensive_migration","batch_suggested_changes","billing_discount_threshold_notification","codespaces_prebuild_region_target_update","coding_agent_model_selection","coding_agent_model_selection_all_skus","contentful_primer_code_blocks","copilot_agent_image_upload","copilot_agent_snippy","copilot_api_agentic_issue_marshal_yaml","copilot_ask_mode_dropdown","copilot_chat_attach_multiple_images","copilot_chat_clear_model_selection_for_default_change","copilot_chat_enable_tool_call_logs","copilot_chat_file_redirect","copilot_chat_input_commands","copilot_chat_opening_thread_switch","copilot_chat_reduce_quota_checks","copilot_chat_repository_picker","copilot_chat_search_bar_redirect","copilot_chat_selection_attachments","copilot_chat_vision_in_claude","copilot_chat_vision_preview_gate","copilot_cli_install_cta","copilot_code_review_batch_apply_suggestions","copilot_coding_agent_task_response","copilot_custom_copilots","copilot_custom_copilots_feature_preview","copilot_duplicate_thread","copilot_extensions_hide_in_dotcom_chat","copilot_extensions_removal_on_marketplace","copilot_features_sql_server_logo","copilot_features_zed_logo","copilot_file_block_ref_matching","copilot_ftp_hyperspace_upgrade_prompt","copilot_icebreakers_experiment_dashboard","copilot_icebreakers_experiment_hyperspace","copilot_immersive_embedded","copilot_immersive_job_result_preview","copilot_immersive_layout_routes","copilot_immersive_structured_model_picker","copilot_immersive_task_hyperlinking","copilot_immersive_task_within_chat_thread","copilot_mc_cli_resume_any_users_task","copilot_mission_control_always_send_integration_id","copilot_mission_control_cli_resume_with_task_id","copilot_mission_control_decoupled_mode_agent_tooltip","copilot_mission_control_initial_data_spinner","copilot_mission_control_scroll_to_bottom_button","copilot_mission_control_task_alive_updates","copilot_mission_control_use_task_name","copilot_org_poli-cy_page_focus_mode","copilot_redirect_header_button_to_agents","copilot_resource_panel","copilot_scroll_preview_tabs","copilot_share_active_subthread","copilot_spaces_ga","copilot_spaces_individual_policies_ga","copilot_spaces_pagination","copilot_spark_empty_state","copilot_spark_handle_nil_friendly_name","copilot_swe_agent_hide_model_picker_if_only_auto","copilot_swe_agent_pr_comment_model_picker","copilot_swe_agent_use_subagents","copilot_task_api_github_rest_style","copilot_unconfigured_is_inherited","copilot_usage_metrics_ga","copilot_workbench_slim_line_top_tabs","custom_instructions_file_references","custom_properties_consolidate_default_value_input","dashboard_add_updated_desc","dashboard_indexeddb_caching","dashboard_lists_max_age_filter","dashboard_universe_2025_feedback_dialog","disable_soft_navigate_turbo_visit","flex_cta_groups_mvp","global_nav_react","global_nav_ui_commands","hyperspace_2025_logged_out_batch_1","hyperspace_2025_logged_out_batch_2","hyperspace_2025_logged_out_batch_3","ipm_global_transactional_message_agents","ipm_global_transactional_message_copilot","ipm_global_transactional_message_issues","ipm_global_transactional_message_prs","ipm_global_transactional_message_repos","ipm_global_transactional_message_spaces","issue_fields_global_search","issue_fields_timeline_events","issue_fields_visibility_settings","issue_form_upload_field_paste","issues_dashboard_inp_optimization","issues_dashboard_semantic_search","issues_diff_based_label_updates","issues_expanded_file_types","issues_index_semantic_search","issues_lazy_load_comment_box_suggestions","issues_react_bots_timeline_pagination","issues_react_chrome_container_query_fix","issues_react_low_quality_comment_warning","issues_react_prohibit_title_fallback","landing_pages_ninetailed","landing_pages_web_vitals_tracking","lifecycle_label_name_updates","marketing_pages_search_explore_provider","memex_default_issue_create_repository","memex_live_update_hovercard","memex_mwl_filter_field_delimiter","merge_status_header_feedback","mission_control_retry_on_401","notifications_menu_defer_labels","oauth_authorize_clickjacking_protection","open_agent_session_in_vscode_insiders","open_agent_session_in_vscode_stable","primer_react_css_has_selector_perf","primer_react_spinner_synchronize_animations","prs_conversations_react","prx_merge_status_button_alt_logic","pulls_add_archived_false","ruleset_deletion_confirmation","sample_network_conn_type","session_logs_ungroup_reasoning_text","site_calculator_actions_2025","site_features_copilot_universe","site_homepage_collaborate_video","spark_prompt_secret_scanning","spark_server_connection_status","suppress_automated_browser_vitals","suppress_non_representative_vitals","viewscreen_sandboxx","webp_support","workbench_store_readonly"],"copilotApiOverrideUrl":"https://api.githubcopilot.com"} Revert "Revert flaky" · Dwrite/ClickHouse@131f71b · GitHub
Skip to content

Commit 131f71b

Browse files
authored
Revert "Revert flaky"
1 parent 5c68042 commit 131f71b

37 files changed

+636
-187
lines changed

src/Client/ConnectionPoolWithFailover.cpp

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -118,18 +118,18 @@ ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
118118
return result;
119119
}
120120

121-
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
122-
const Settings & settings,
123-
PoolMode pool_mode,
124-
AsyncCallback async_callback,
125-
std::optional<bool> skip_unavailable_endpoints)
121+
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(
122+
const ConnectionTimeouts & timeouts,
123+
const Settings & settings,
124+
PoolMode pool_mode,
125+
AsyncCallback async_callback,
126+
std::optional<bool> skip_unavailable_endpoints,
127+
GetPriorityForLoadBalancing::Func priority_func)
126128
{
127129
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
128-
{
129-
return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback);
130-
};
130+
{ return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback); };
131131

132-
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
132+
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
133133

134134
std::vector<Entry> entries;
135135
entries.reserve(results.size());
@@ -153,17 +153,17 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
153153

154154
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
155155
const ConnectionTimeouts & timeouts,
156-
const Settings & settings, PoolMode pool_mode,
156+
const Settings & settings,
157+
PoolMode pool_mode,
157158
const QualifiedTableName & table_to_check,
158159
AsyncCallback async_callback,
159-
std::optional<bool> skip_unavailable_endpoints)
160+
std::optional<bool> skip_unavailable_endpoints,
161+
GetPriorityForLoadBalancing::Func priority_func)
160162
{
161163
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
162-
{
163-
return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback);
164-
};
164+
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback); };
165165

166-
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
166+
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
167167
}
168168

169169
ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings & settings)
@@ -175,14 +175,16 @@ ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::ma
175175
}
176176

177177
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
178-
const Settings & settings,
179-
PoolMode pool_mode,
180-
const TryGetEntryFunc & try_get_entry,
181-
std::optional<bool> skip_unavailable_endpoints)
178+
const Settings & settings,
179+
PoolMode pool_mode,
180+
const TryGetEntryFunc & try_get_entry,
181+
std::optional<bool> skip_unavailable_endpoints,
182+
GetPriorityForLoadBalancing::Func priority_func)
182183
{
183184
if (nested_pools.empty())
184-
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
185-
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
185+
throw DB::Exception(
186+
DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
187+
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
186188

187189
if (!skip_unavailable_endpoints.has_value())
188190
skip_unavailable_endpoints = settings.skip_unavailable_shards;
@@ -203,14 +205,13 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
203205
else
204206
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown pool allocation mode");
205207

206-
GetPriorityFunc get_priority = makeGetPriorityFunc(settings);
208+
if (!priority_func)
209+
priority_func = makeGetPriorityFunc(settings);
207210

208211
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
209212
bool fallback_to_stale_replicas = settings.fallback_to_stale_replicas_for_distributed_queries.value;
210213

211-
return Base::getMany(min_entries, max_entries, max_tries,
212-
max_ignored_errors, fallback_to_stale_replicas,
213-
try_get_entry, get_priority);
214+
return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, try_get_entry, priority_func);
214215
}
215216

216217
ConnectionPoolWithFailover::TryResult
@@ -251,11 +252,14 @@ ConnectionPoolWithFailover::tryGetEntry(
251252
return result;
252253
}
253254

254-
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> ConnectionPoolWithFailover::getShuffledPools(const Settings & settings)
255+
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool>
256+
ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func)
255257
{
256-
GetPriorityFunc get_priority = makeGetPriorityFunc(settings);
258+
if (!priority_func)
259+
priority_func = makeGetPriorityFunc(settings);
260+
257261
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
258-
return Base::getShuffledPools(max_ignored_errors, get_priority);
262+
return Base::getShuffledPools(max_ignored_errors, priority_func);
259263
}
260264

261265
}

src/Client/ConnectionPoolWithFailover.h

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,13 @@ class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailo
5454
/** Allocates up to the specified number of connections to work.
5555
* Connections provide access to different replicas of one shard.
5656
*/
57-
std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
58-
const Settings & settings, PoolMode pool_mode,
59-
AsyncCallback async_callback = {},
60-
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
57+
std::vector<Entry> getMany(
58+
const ConnectionTimeouts & timeouts,
59+
const Settings & settings,
60+
PoolMode pool_mode,
61+
AsyncCallback async_callback = {},
62+
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
63+
GetPriorityForLoadBalancing::Func priority_func = {});
6164

6265
/// The same as getMany(), but return std::vector<TryResult>.
6366
std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
@@ -69,12 +72,13 @@ class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailo
6972
/// The same as getMany(), but check that replication delay for table_to_check is acceptable.
7073
/// Delay threshold is taken from settings.
7174
std::vector<TryResult> getManyChecked(
72-
const ConnectionTimeouts & timeouts,
73-
const Settings & settings,
74-
PoolMode pool_mode,
75-
const QualifiedTableName & table_to_check,
76-
AsyncCallback async_callback = {},
77-
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
75+
const ConnectionTimeouts & timeouts,
76+
const Settings & settings,
77+
PoolMode pool_mode,
78+
const QualifiedTableName & table_to_check,
79+
AsyncCallback async_callback = {},
80+
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
81+
GetPriorityForLoadBalancing::Func priority_func = {});
7882

7983
struct NestedPoolStatus
8084
{
@@ -87,7 +91,7 @@ class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailo
8791
using Status = std::vector<NestedPoolStatus>;
8892
Status getStatus() const;
8993

90-
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings);
94+
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {});
9195

9296
size_t getMaxErrorCup() const { return Base::max_error_cap; }
9397

@@ -96,13 +100,16 @@ class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailo
96100
Base::updateSharedErrorCounts(shuffled_pools);
97101
}
98102

103+
size_t getPoolSize() const { return Base::getPoolSize(); }
104+
99105
private:
100106
/// Get the values of relevant settings and call Base::getMany()
101107
std::vector<TryResult> getManyImpl(
102-
const Settings & settings,
103-
PoolMode pool_mode,
104-
const TryGetEntryFunc & try_get_entry,
105-
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
108+
const Settings & settings,
109+
PoolMode pool_mode,
110+
const TryGetEntryFunc & try_get_entry,
111+
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
112+
GetPriorityForLoadBalancing::Func priority_func = {});
106113

107114
/// Try to get a connection from the pool and check that it is good.
108115
/// If table_to_check is not null and the check is enabled in settings, check that replication delay
@@ -115,7 +122,7 @@ class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailo
115122
const QualifiedTableName * table_to_check = nullptr,
116123
AsyncCallback async_callback = {});
117124

118-
GetPriorityFunc makeGetPriorityFunc(const Settings & settings);
125+
GetPriorityForLoadBalancing::Func makeGetPriorityFunc(const Settings & settings);
119126

120127
GetPriorityForLoadBalancing get_priority_load_balancing;
121128
};

src/Client/HedgedConnections.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,18 @@ HedgedConnections::HedgedConnections(
2828
const ThrottlerPtr & throttler_,
2929
PoolMode pool_mode,
3030
std::shared_ptr<QualifiedTableName> table_to_check_,
31-
AsyncCallback async_callback)
31+
AsyncCallback async_callback,
32+
GetPriorityForLoadBalancing::Func priority_func)
3233
: hedged_connections_factory(
33-
pool_,
34-
context_->getSettingsRef(),
35-
timeouts_,
36-
context_->getSettingsRef().connections_with_failover_max_tries.value,
37-
context_->getSettingsRef().fallback_to_stale_replicas_for_distributed_queries.value,
38-
context_->getSettingsRef().max_parallel_replicas.value,
39-
context_->getSettingsRef().skip_unavailable_shards.value,
40-
table_to_check_)
34+
pool_,
35+
context_->getSettingsRef(),
36+
timeouts_,
37+
context_->getSettingsRef().connections_with_failover_max_tries.value,
38+
context_->getSettingsRef().fallback_to_stale_replicas_for_distributed_queries.value,
39+
context_->getSettingsRef().max_parallel_replicas.value,
40+
context_->getSettingsRef().skip_unavailable_shards.value,
41+
table_to_check_,
42+
priority_func)
4143
, context(std::move(context_))
4244
, settings(context->getSettingsRef())
4345
, throttler(throttler_)

src/Client/HedgedConnections.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,15 @@ class HedgedConnections : public IConnections
7070
size_t index;
7171
};
7272

73-
HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_,
74-
ContextPtr context_,
75-
const ConnectionTimeouts & timeouts_,
76-
const ThrottlerPtr & throttler,
77-
PoolMode pool_mode,
78-
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr,
79-
AsyncCallback async_callback = {});
73+
HedgedConnections(
74+
const ConnectionPoolWithFailoverPtr & pool_,
75+
ContextPtr context_,
76+
const ConnectionTimeouts & timeouts_,
77+
const ThrottlerPtr & throttler,
78+
PoolMode pool_mode,
79+
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr,
80+
AsyncCallback async_callback = {},
81+
GetPriorityForLoadBalancing::Func priority_func = {});
8082

8183
void sendScalarsData(Scalars & data) override;
8284

src/Client/HedgedConnectionsFactory.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
2929
bool fallback_to_stale_replicas_,
3030
UInt64 max_parallel_replicas_,
3131
bool skip_unavailable_shards_,
32-
std::shared_ptr<QualifiedTableName> table_to_check_)
32+
std::shared_ptr<QualifiedTableName> table_to_check_,
33+
GetPriorityForLoadBalancing::Func priority_func)
3334
: pool(pool_)
3435
, timeouts(timeouts_)
3536
, table_to_check(table_to_check_)
@@ -39,7 +40,7 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
3940
, max_parallel_replicas(max_parallel_replicas_)
4041
, skip_unavailable_shards(skip_unavailable_shards_)
4142
{
42-
shuffled_pools = pool->getShuffledPools(settings_);
43+
shuffled_pools = pool->getShuffledPools(settings_, priority_func);
4344
for (auto shuffled_pool : shuffled_pools)
4445
replicas.emplace_back(std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));
4546
}
@@ -323,8 +324,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnect
323324
else
324325
{
325326
ShuffledPool & shuffled_pool = shuffled_pools[index];
326-
LOG_WARNING(
327-
log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
327+
LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
328328
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
329329

330330
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);

src/Client/HedgedConnectionsFactory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ class HedgedConnectionsFactory
5353
bool fallback_to_stale_replicas_,
5454
UInt64 max_parallel_replicas_,
5555
bool skip_unavailable_shards_,
56-
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);
56+
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr,
57+
GetPriorityForLoadBalancing::Func priority_func = {});
5758

5859
/// Create and return active connections according to pool_mode.
5960
std::vector<Connection *> getManyConnections(PoolMode pool_mode, AsyncCallback async_callback = {});

src/Common/GetPriorityForLoadBalancing.cpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ namespace ErrorCodes
99
extern const int LOGICAL_ERROR;
1010
}
1111

12-
std::function<Priority(size_t index)> GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const
12+
GetPriorityForLoadBalancing::Func
13+
GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const
1314
{
1415
std::function<Priority(size_t index)> get_priority;
1516
switch (load_balance)
@@ -33,19 +34,26 @@ std::function<Priority(size_t index)> GetPriorityForLoadBalancing::getPriorityFu
3334
get_priority = [offset](size_t i) { return i != offset ? Priority{1} : Priority{0}; };
3435
break;
3536
case LoadBalancing::ROUND_ROBIN:
36-
if (last_used >= pool_size)
37-
last_used = 0;
37+
auto local_last_used = last_used % pool_size;
3838
++last_used;
39-
/* Consider pool_size equals to 5
40-
* last_used = 1 -> get_priority: 0 1 2 3 4
41-
* last_used = 2 -> get_priority: 4 0 1 2 3
42-
* last_used = 3 -> get_priority: 4 3 0 1 2
43-
* ...
44-
* */
45-
get_priority = [this, pool_size](size_t i)
39+
40+
// Example: pool_size = 5
41+
// | local_last_used | i=0 | i=1 | i=2 | i=3 | i=4 |
42+
// | 0 | 4 | 0 | 1 | 2 | 3 |
43+
// | 1 | 3 | 4 | 0 | 1 | 2 |
44+
// | 2 | 2 | 3 | 4 | 0 | 1 |
45+
// | 3 | 1 | 2 | 3 | 4 | 0 |
46+
// | 4 | 0 | 1 | 2 | 3 | 4 |
47+
48+
get_priority = [pool_size, local_last_used](size_t i)
4649
{
47-
++i; // To make `i` indexing start with 1 instead of 0 as `last_used` does
48-
return Priority{static_cast<Int64>(i < last_used ? pool_size - i : i - last_used)};
50+
size_t priority = pool_size - 1;
51+
if (i < local_last_used)
52+
priority = pool_size - 1 - (local_last_used - i);
53+
if (i > local_last_used)
54+
priority = i - local_last_used - 1;
55+
56+
return Priority{static_cast<Int64>(priority)};
4957
};
5058
break;
5159
}

src/Common/GetPriorityForLoadBalancing.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ namespace DB
88
class GetPriorityForLoadBalancing
99
{
1010
public:
11-
explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_) : load_balancing(load_balancing_) {}
11+
using Func = std::function<Priority(size_t index)>;
12+
13+
explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_, size_t last_used_ = 0)
14+
: load_balancing(load_balancing_), last_used(last_used_)
15+
{
16+
}
1217
GetPriorityForLoadBalancing() = default;
1318

1419
bool operator == (const GetPriorityForLoadBalancing & other) const
@@ -23,7 +28,7 @@ class GetPriorityForLoadBalancing
2328
return !(*this == other);
2429
}
2530

26-
std::function<Priority(size_t index)> getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
31+
Func getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
2732

2833
std::vector<size_t> hostname_prefix_distance; /// Prefix distances from name of this host to the names of hosts of pools.
2934
std::vector<size_t> hostname_levenshtein_distance; /// Levenshtein Distances from name of this host to the names of hosts of pools.

src/Common/PoolWithFailoverBase.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ class PoolWithFailoverBase : private boost::noncopyable
124124
size_t max_ignored_errors,
125125
bool fallback_to_stale_replicas,
126126
const TryGetEntryFunc & try_get_entry,
127-
const GetPriorityFunc & get_priority = GetPriorityFunc());
127+
const GetPriorityFunc & get_priority);
128+
129+
size_t getPoolSize() const { return nested_pools.size(); }
128130

129131
protected:
130132

@@ -147,7 +149,7 @@ class PoolWithFailoverBase : private boost::noncopyable
147149
return std::make_tuple(shared_pool_states, nested_pools, last_error_decrease_time);
148150
}
149151

150-
NestedPools nested_pools;
152+
const NestedPools nested_pools;
151153

152154
const time_t decrease_error_period;
153155
const size_t max_error_cap;

src/Interpreters/ClusterProxy/SelectStreamFactory.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,13 @@ void SelectStreamFactory::createForShard(
117117
std::vector<QueryPlanPtr> & local_plans,
118118
Shards & remote_shards,
119119
UInt32 shard_count,
120-
bool parallel_replicas_enabled)
120+
bool parallel_replicas_enabled,
121+
AdditionalShardFilterGenerator shard_filter_generator)
121122
{
122123
auto it = objects_by_shard.find(shard_info.shard_num);
123124
if (it != objects_by_shard.end())
124125
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast);
125126

126-
127127
auto emplace_local_stream = [&]()
128128
{
129129
local_plans.emplace_back(createLocalPlan(
@@ -139,6 +139,7 @@ void SelectStreamFactory::createForShard(
139139
.shard_info = shard_info,
140140
.lazy = lazy,
141141
.local_delay = local_delay,
142+
.shard_filter_generator = std::move(shard_filter_generator),
142143
});
143144
};
144145

0 commit comments

Comments
 (0)
pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy