-
Notifications
You must be signed in to change notification settings - Fork 8.2k
Expand file tree
/
Copy pathCgroupsMemoryUsageObserver.cpp
More file actions
93 lines (74 loc) · 2.62 KB
/
CgroupsMemoryUsageObserver.cpp
File metadata and controls
93 lines (74 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <Common/CgroupsMemoryUsageObserver.h>
#if defined(OS_LINUX)
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <base/cgroupsv2.h>
#include <base/getMemoryAmount.h>
#include <cstdint>
using namespace DB;
namespace DB
{
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
: log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_)
{}
CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
{
stopThread();
}
void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_)
{
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
on_memory_amount_available_changed = on_memory_amount_available_changed_;
}
void CgroupsMemoryUsageObserver::startThread()
{
if (!thread.joinable())
{
thread = ThreadFromGlobalPool(&CgroupsMemoryUsageObserver::runThread, this);
LOG_INFO(log, "Started cgroup current memory usage observer thread");
}
}
void CgroupsMemoryUsageObserver::stopThread()
{
{
std::lock_guard lock(thread_mutex);
if (!thread.joinable())
return;
quit = true;
}
cond.notify_one();
thread.join();
LOG_INFO(log, "Stopped cgroup current memory usage observer thread");
}
void CgroupsMemoryUsageObserver::runThread()
{
DB::setThreadName(DB::ThreadName::CGROUP_MEMORY_OBSERVER);
last_available_memory_amount = getMemoryAmount();
LOG_INFO(log, "Memory amount initially available to the process is {}", ReadableSize(last_available_memory_amount));
std::unique_lock lock(thread_mutex);
while (true)
{
if (cond.wait_for(lock, wait_time, [this] { return quit; }))
break;
try
{
uint64_t available_memory_amount = getMemoryAmount();
if (available_memory_amount != last_available_memory_amount)
{
LOG_INFO(log, "Memory amount available to the process changed from {} to {}", ReadableSize(last_available_memory_amount), ReadableSize(available_memory_amount));
last_available_memory_amount = available_memory_amount;
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
on_memory_amount_available_changed();
}
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
}
}
#endif