forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathCompressionCodecZSTD.cpp
More file actions
139 lines (118 loc) · 5.44 KB
/
CompressionCodecZSTD.cpp
File metadata and controls
139 lines (118 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <Compression/CompressionCodecZSTD.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <zstd.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Error codes used by the ZSTD codec in this file. They are only declared here (extern);
/// the definitions are provided elsewhere.
namespace ErrorCodes
{
/// Thrown by doCompressData when libzstd reports a compression error.
extern const int CANNOT_COMPRESS;
/// Thrown by doDecompressData when libzstd reports a decompression error.
extern const int CANNOT_DECOMPRESS;
/// Thrown by the codec factory callback when more than two codec arguments are given.
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
/// Thrown by the codec factory callback when an argument is not a literal or is out of range.
extern const int ILLEGAL_CODEC_PARAMETER;
}
/// Returns CompressionMethodByte::ZSTD converted to a raw byte.
uint8_t CompressionCodecZSTD::getMethodByte() const
{
    constexpr auto method = CompressionMethodByte::ZSTD;
    return static_cast<uint8_t>(method);
}
/// Feeds the codec description AST (as returned by getCodecDesc) into `hash`,
/// with aliases ignored.
void CompressionCodecZSTD::updateHash(SipHash & hash) const
{
    const auto description = getCodecDesc();
    description->updateTreeHash(hash, /*ignore_aliases=*/ true);
}
/// Returns libzstd's ZSTD_compressBound for an input of `uncompressed_size` bytes,
/// narrowed to UInt32.
UInt32 CompressionCodecZSTD::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
    const size_t bound = ZSTD_compressBound(uncompressed_size);
    return static_cast<UInt32>(bound);
}
/// Compresses `source_size` bytes starting at `source` into `dest` using libzstd.
///
/// The destination capacity reported to libzstd is ZSTD_compressBound(source_size),
/// so `dest` must be at least that large.
///
/// Returns the number of compressed bytes written to `dest`.
/// Throws CANNOT_COMPRESS if the compression context cannot be created or if
/// libzstd reports an error while compressing.
UInt32 CompressionCodecZSTD::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
    ZSTD_CCtx * cctx = ZSTD_createCCtx();

    /// ZSTD_createCCtx returns NULL when it cannot allocate the context;
    /// passing a null context to the calls below would dereference it.
    if (!cctx)
        throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress with ZSTD codec: failed to allocate compression context");

    ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, level);
    if (enable_long_range)
    {
        ZSTD_CCtx_setParameter(cctx, ZSTD_c_enableLongDistanceMatching, 1);
        ZSTD_CCtx_setParameter(cctx, ZSTD_c_windowLog, window_log); // NB zero window_log means "use default" for libzstd
    }

    size_t compressed_size = ZSTD_compress2(cctx, dest, ZSTD_compressBound(source_size), source, source_size);

    /// Release the context before inspecting the result so that the throw below cannot leak it.
    ZSTD_freeCCtx(cctx);

    if (ZSTD_isError(compressed_size))
        throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress with ZSTD codec: {}", ZSTD_getErrorName(compressed_size));

    return static_cast<UInt32>(compressed_size);
}
/// Decompresses `source_size` bytes at `source` into `dest`, whose capacity is passed
/// to libzstd as `uncompressed_size`.
/// Throws CANNOT_DECOMPRESS with libzstd's error name if ZSTD_decompress reports an error.
void CompressionCodecZSTD::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
    const size_t status = ZSTD_decompress(dest, uncompressed_size, source, source_size);
    if (!ZSTD_isError(status))
        return;

    throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress ZSTD-encoded data: {}", std::string(ZSTD_getErrorName(status)));
}
/// Builds a codec with an explicit compression level and window log.
/// This overload always turns on long-range mode; the codec description becomes
/// ZSTD(level, window_log).
CompressionCodecZSTD::CompressionCodecZSTD(int level_, int window_log_)
    : level(level_)
    , enable_long_range(true)
    , window_log(window_log_)
{
    const ASTPtr level_argument = std::make_shared<ASTLiteral>(static_cast<UInt64>(level));
    const ASTPtr window_log_argument = std::make_shared<ASTLiteral>(static_cast<UInt64>(window_log));
    setCodecDescription("ZSTD", {level_argument, window_log_argument});
}
/// Builds a codec with only a compression level.
/// Long-range mode stays off and window_log is set to 0; the codec description becomes ZSTD(level).
CompressionCodecZSTD::CompressionCodecZSTD(int level_)
    : level(level_)
    , enable_long_range(false)
    , window_log(0)
{
    const ASTPtr level_argument = std::make_shared<ASTLiteral>(static_cast<UInt64>(level));
    setCodecDescription("ZSTD", {level_argument});
}
/// Registers the "ZSTD" codec in `factory`.
///
/// The registered creator accepts:
///   - no arguments:          level is CompressionCodecZSTD::ZSTD_DEFAULT_LEVEL;
///   - (level):               level must not exceed ZSTD_maxCLevel();
///   - (level, window_log):   additionally selects the two-argument constructor, which enables
///                            long-range mode; window_log must be 0 (libzstd default) or lie
///                            within the bounds reported by ZSTD_cParam_getBounds(ZSTD_c_windowLog).
///
/// The creator throws ILLEGAL_SYNTAX_FOR_CODEC_TYPE when more than two arguments are given and
/// ILLEGAL_CODEC_PARAMETER when an argument is not a literal or is out of range.
void registerCodecZSTD(CompressionCodecFactory & factory)
{
    UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::ZSTD);

    /// The creator is stored by the factory and uses no local state, so it captures nothing.
    factory.registerCompressionCodec("ZSTD", method_code, [](const ASTPtr & arguments) -> CompressionCodecPtr
    {
        int level = CompressionCodecZSTD::ZSTD_DEFAULT_LEVEL;
        if (arguments && !arguments->children.empty())
        {
            /// Bind by reference: copying the vector would copy every child pointer for no benefit.
            const auto & children = arguments->children;

            if (children.size() > 2)
                throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "ZSTD codec must have 1 or 2 parameters, given {}",
                    children.size());

            const auto * literal = children[0]->as<ASTLiteral>();
            if (!literal)
                throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "ZSTD codec argument must be integer");

            /// Validate the full 64-bit value BEFORE narrowing to int: otherwise a huge value
            /// (e.g. 2^32 + 1) would wrap around to a small level and slip past the check.
            const UInt64 requested_level = literal->value.safeGet<UInt64>();
            const int max_level = ZSTD_maxCLevel();
            if (requested_level > static_cast<UInt64>(max_level))
            {
                throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER,
                    "ZSTD codec can't have level more than {}, given {}",
                    max_level, requested_level);
            }
            level = static_cast<int>(requested_level);

            if (children.size() > 1)
            {
                const auto * window_literal = children[1]->as<ASTLiteral>();
                if (!window_literal)
                    throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "ZSTD codec second argument must be integer");

                const UInt64 requested_window_log = window_literal->value.safeGet<UInt64>();

                ZSTD_bounds window_log_bounds = ZSTD_cParam_getBounds(ZSTD_c_windowLog);
                if (ZSTD_isError(window_log_bounds.error))
                    throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "ZSTD windowLog parameter is not supported {}",
                        std::string(ZSTD_getErrorName(window_log_bounds.error)));

                /// 0 means "use default" for libzstd.
                /// As with the level, compare the un-narrowed value so that wrap-around cannot
                /// bypass the bounds. NOTE(review): assumes libzstd reports non-negative windowLog bounds.
                const bool window_log_out_of_bounds = requested_window_log != 0
                    && (requested_window_log > static_cast<UInt64>(window_log_bounds.upperBound)
                        || requested_window_log < static_cast<UInt64>(window_log_bounds.lowerBound));
                if (window_log_out_of_bounds)
                    throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER,
                        "ZSTD codec can't have window log more than {} and lower than {}, given {}",
                        toString(window_log_bounds.upperBound),
                        toString(window_log_bounds.lowerBound), toString(requested_window_log));

                /// Safe to narrow now: the value is 0 or within libzstd's int-typed bounds.
                return std::make_shared<CompressionCodecZSTD>(level, static_cast<int>(requested_window_log));
            }
        }
        return std::make_shared<CompressionCodecZSTD>(level);
    });
}
/// Creates a ZSTD codec with the given level via the single-argument constructor.
CompressionCodecPtr getCompressionCodecZSTD(int level)
{
    CompressionCodecPtr codec = std::make_shared<CompressionCodecZSTD>(level);
    return codec;
}
}