@@ -118,18 +118,18 @@ ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
118118 return result;
119119}
120120
121- std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany (const ConnectionTimeouts & timeouts,
122- const Settings & settings,
123- PoolMode pool_mode,
124- AsyncCallback async_callback,
125- std::optional<bool > skip_unavailable_endpoints)
121+ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany (
122+ const ConnectionTimeouts & timeouts,
123+ const Settings & settings,
124+ PoolMode pool_mode,
125+ AsyncCallback async_callback,
126+ std::optional<bool > skip_unavailable_endpoints,
127+ GetPriorityForLoadBalancing::Func priority_func)
126128{
127129 TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
128- {
129- return tryGetEntry (pool, timeouts, fail_message, settings, nullptr , async_callback);
130- };
130+ { return tryGetEntry (pool, timeouts, fail_message, settings, nullptr , async_callback); };
131131
132- std::vector<TryResult> results = getManyImpl (settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
132+ std::vector<TryResult> results = getManyImpl (settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func );
133133
134134 std::vector<Entry> entries;
135135 entries.reserve (results.size ());
@@ -153,17 +153,17 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
153153
154154std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked (
155155 const ConnectionTimeouts & timeouts,
156- const Settings & settings, PoolMode pool_mode,
156+ const Settings & settings,
157+ PoolMode pool_mode,
157158 const QualifiedTableName & table_to_check,
158159 AsyncCallback async_callback,
159- std::optional<bool > skip_unavailable_endpoints)
160+ std::optional<bool > skip_unavailable_endpoints,
161+ GetPriorityForLoadBalancing::Func priority_func)
160162{
161163 TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
162- {
163- return tryGetEntry (pool, timeouts, fail_message, settings, &table_to_check, async_callback);
164- };
164+ { return tryGetEntry (pool, timeouts, fail_message, settings, &table_to_check, async_callback); };
165165
166- return getManyImpl (settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
166+ return getManyImpl (settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func );
167167}
168168
169169ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc (const Settings & settings)
@@ -175,14 +175,16 @@ ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::ma
175175}
176176
177177std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl (
178- const Settings & settings,
179- PoolMode pool_mode,
180- const TryGetEntryFunc & try_get_entry,
181- std::optional<bool > skip_unavailable_endpoints)
178+ const Settings & settings,
179+ PoolMode pool_mode,
180+ const TryGetEntryFunc & try_get_entry,
181+ std::optional<bool > skip_unavailable_endpoints,
182+ GetPriorityForLoadBalancing::Func priority_func)
182183{
183184 if (nested_pools.empty ())
184- throw DB::Exception (DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
185- " Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty" );
185+ throw DB::Exception (
186+ DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
187+ " Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty" );
186188
187189 if (!skip_unavailable_endpoints.has_value ())
188190 skip_unavailable_endpoints = settings.skip_unavailable_shards ;
@@ -203,14 +205,13 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
203205 else
204206 throw DB::Exception (DB::ErrorCodes::LOGICAL_ERROR, " Unknown pool allocation mode" );
205207
206- GetPriorityFunc get_priority = makeGetPriorityFunc (settings);
208+ if (!priority_func)
209+ priority_func = makeGetPriorityFunc (settings);
207210
208211 UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors .value ;
209212 bool fallback_to_stale_replicas = settings.fallback_to_stale_replicas_for_distributed_queries .value ;
210213
211- return Base::getMany (min_entries, max_entries, max_tries,
212- max_ignored_errors, fallback_to_stale_replicas,
213- try_get_entry, get_priority);
214+ return Base::getMany (min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, try_get_entry, priority_func);
214215}
215216
216217ConnectionPoolWithFailover::TryResult
@@ -251,11 +252,14 @@ ConnectionPoolWithFailover::tryGetEntry(
251252 return result;
252253}
253254
254- std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> ConnectionPoolWithFailover::getShuffledPools (const Settings & settings)
255+ std::vector<ConnectionPoolWithFailover::Base::ShuffledPool>
256+ ConnectionPoolWithFailover::getShuffledPools (const Settings & settings, GetPriorityForLoadBalancing::Func priority_func)
255257{
256- GetPriorityFunc get_priority = makeGetPriorityFunc (settings);
258+ if (!priority_func)
259+ priority_func = makeGetPriorityFunc (settings);
260+
257261 UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors .value ;
258- return Base::getShuffledPools (max_ignored_errors, get_priority );
262+ return Base::getShuffledPools (max_ignored_errors, priority_func );
259263}
260264
261265}
0 commit comments