forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConnectionPoolWithFailover.h
More file actions
145 lines (119 loc) · 5.51 KB
/
ConnectionPoolWithFailover.h
File metadata and controls
145 lines (119 loc) · 5.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#pragma once
#include <Common/PoolWithFailoverBase.h>
#include <Common/GetPriorityForLoadBalancing.h>
#include <Client/ConnectionPool.h>
#include <chrono>
#include <vector>
namespace DB
{
/** ConnectionPoolWithFailover: a connection pool with fault tolerance.
  * It is initialized with several other IConnectionPools (the nested pools).
  * When a connection is requested, it tries to create or select a live connection from one of the nested pools,
  * trying them in some order and making no more than the specified number of attempts.
  * Pools with fewer errors are preferred;
  * pools with the same number of errors are tried in random order.
  *
  * Note: if one of the nested pools is blocked due to overflow, then this pool will also be blocked.
  */
/// Selects how many connections the ConnectionPoolWithFailover::getMany() method hands back.
enum class PoolMode : uint8_t
{
    GET_ONE = 0,  /// Exactly one connection.
    GET_MANY = 1, /// A number of connections determined by the max_parallel_replicas setting.
    GET_ALL = 2,  /// One connection from each nested pool.
};
/// Connection pool built on top of several nested IConnectionPools.
/// The IConnectionPool interface is implemented publicly; PoolWithFailoverBase<IConnectionPool>
/// is inherited privately and only selected parts of it are re-exported below.
class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailoverBase<IConnectionPool>
{
public:
    /// Aliases are declared first so that every later member declaration sees them
    /// (previously TryResult was used before its alias was introduced).
    using Base = PoolWithFailoverBase<IConnectionPool>;
    using Entry = IConnectionPool::Entry;
    using TryResult = Base::TryResult;

    /// Re-export the privately inherited helper as part of the public interface.
    using PoolWithFailoverBase<IConnectionPool>::getValidTryResult;

    /// @param nested_pools_          Pools this object chooses connections from.
    /// @param load_balancing         Load balancing mode.
    /// @param decrease_error_period_ Defaults to DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD.
    /// @param max_error_cap          Defaults to DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT.
    ConnectionPoolWithFailover(
        ConnectionPoolPtrs nested_pools_,
        LoadBalancing load_balancing,
        time_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD,
        size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);

    /** Allocates connection to work. */
    Entry get(const ConnectionTimeouts & timeouts) override;

    /// Overload taking settings as well. From IConnectionPool.
    Entry get(const ConnectionTimeouts & timeouts,
              const Settings & settings,
              bool force_connected) override;

    /** Allocates up to the specified number of connections to work.
      * Connections provide access to different replicas of one shard.
      * How many connections are returned is selected by pool_mode.
      */
    std::vector<Entry> getMany(
        const ConnectionTimeouts & timeouts,
        const Settings & settings,
        PoolMode pool_mode,
        AsyncCallback async_callback = {},
        std::optional<bool> skip_unavailable_endpoints = std::nullopt,
        GetPriorityForLoadBalancing::Func priority_func = {});

    /// The same as getMany(), but return std::vector<TryResult>.
    std::vector<TryResult> getManyForTableFunction(
        const ConnectionTimeouts & timeouts,
        const Settings & settings,
        PoolMode pool_mode);

    /// The same as getMany(), but check that replication delay for table_to_check is acceptable.
    /// Delay threshold is taken from settings.
    std::vector<TryResult> getManyChecked(
        const ConnectionTimeouts & timeouts,
        const Settings & settings,
        PoolMode pool_mode,
        const QualifiedTableName & table_to_check,
        AsyncCallback async_callback = {},
        std::optional<bool> skip_unavailable_endpoints = std::nullopt,
        GetPriorityForLoadBalancing::Func priority_func = {});

    /// The same as getManyChecked(), but respects distributed_insert_skip_read_only_replicas setting.
    std::vector<TryResult> getManyCheckedForInsert(
        const ConnectionTimeouts & timeouts,
        const Settings & settings,
        PoolMode pool_mode,
        const QualifiedTableName & table_to_check);

    /// Status record for a single nested pool, as returned by getStatus().
    struct NestedPoolStatus
    {
        const Base::NestedPoolPtr pool;
        size_t error_count = 0;
        size_t slowdown_count = 0;
        /// Explicitly zeroed: std::chrono::seconds is left indeterminate by default-initialization,
        /// unlike the counters above which already had initializers.
        std::chrono::seconds estimated_recovery_time{0};
    };

    using Status = std::vector<NestedPoolStatus>;

    /// Returns a vector of NestedPoolStatus records.
    Status getStatus() const;

    std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}, bool use_slowdown_count = false);

    /// Returns Base::max_error_cap.
    size_t getMaxErrorCap() const { return Base::max_error_cap; }

    /// Forwards to Base::updateSharedErrorCounts().
    void updateSharedError(std::vector<ShuffledPool> & shuffled_pools)
    {
        Base::updateSharedErrorCounts(shuffled_pools);
    }

    /// Forwards to Base::incrementErrorCount().
    void incrementErrorCount(ConnectionPoolPtr pool)
    {
        Base::incrementErrorCount(pool);
    }

    /// Forwards to Base::getPoolSize().
    size_t getPoolSize() const { return Base::getPoolSize(); }

private:
    /// Get the values of relevant settings and call Base::getMany()
    std::vector<TryResult> getManyImpl(
        const Settings & settings,
        PoolMode pool_mode,
        const TryGetEntryFunc & try_get_entry,
        std::optional<bool> skip_unavailable_endpoints = std::nullopt,
        GetPriorityForLoadBalancing::Func priority_func = {},
        bool skip_read_only_replicas = false);

    /// Try to get a connection from the pool and check that it is good.
    /// If table_to_check is not null and the check is enabled in settings, check that replication delay
    /// for this table is not too large.
    TryResult tryGetEntry(
        const ConnectionPoolPtr & pool,
        const ConnectionTimeouts & timeouts,
        std::string & fail_message,
        const Settings & settings,
        const QualifiedTableName * table_to_check = nullptr,
        AsyncCallback async_callback = {},
        bool force_connected = false);

    /// Builds a priority function from the given settings.
    GetPriorityForLoadBalancing::Func makeGetPriorityFunc(const Settings & settings);

    GetPriorityForLoadBalancing get_priority_load_balancing;
};
/// Shared-ownership pointer to a ConnectionPoolWithFailover.
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
/// A vector of ConnectionPoolWithFailoverPtr.
using ConnectionPoolWithFailoverPtrs = std::vector<ConnectionPoolWithFailoverPtr>;
}