forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTableFunctionURL.cpp
More file actions
171 lines (147 loc) · 6.1 KB
/
TableFunctionURL.cpp
File metadata and controls
171 lines (147 loc) · 6.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#include <TableFunctions/TableFunctionURL.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Access/Common/AccessFlags.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/StorageURLCluster.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromVector.h>
namespace DB
{
/// Settings referenced by name in this file. They are only declared here (`extern`);
/// their definitions are provided by another translation unit.
namespace Setting
{
/// Not read anywhere in the visible part of this file.
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
/// Read in TableFunctionURL::getStorage: one of the conditions for building a StorageURLCluster.
extern const SettingsBool parallel_replicas_for_cluster_engines;
/// Read in TableFunctionURL::getStorage: cluster name handed to StorageURLCluster;
/// an empty value rules the cluster path out.
extern const SettingsString cluster_for_parallel_replicas;
/// Not read anywhere in the visible part of this file.
extern const SettingsParallelReplicasMode parallel_replicas_mode;
}
std::vector<size_t> TableFunctionURL::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const
{
auto & table_function_node = query_node_table_function->as<TableFunctionNode &>();
auto & table_function_arguments_nodes = table_function_node.getArguments().getNodes();
size_t table_function_arguments_size = table_function_arguments_nodes.size();
std::vector<size_t> result;
for (size_t i = 0; i < table_function_arguments_size; ++i)
{
auto * function_node = table_function_arguments_nodes[i]->as<FunctionNode>();
if (function_node && function_node->getFunctionName() == "headers")
result.push_back(i);
}
return result;
}
void TableFunctionURL::parseArguments(const ASTPtr & ast, ContextPtr context)
{
/// Clone ast function, because we can modify it's arguments like removing headers.
ITableFunctionFileLike::parseArguments(ast->clone(), context);
}
/// Fills `configuration`, `filename`, `format`, `structure` and `compression_method`
/// from the table function arguments.
///
/// Two cases are handled:
///  - no named collection is found in the arguments: headers are collected, a trailing
///    headers argument (if any) is hidden from ITableFunctionFileLike::parseArgumentsImpl
///    and put back afterwards;
///  - a named collection is found: values are taken from the collection, the format is
///    guessed from the URL path when it is "auto", and headers are collected from the arguments.
void TableFunctionURL::parseArgumentsImpl(ASTs & args, const ContextPtr & context)
{
    auto named_collection = tryGetNamedCollectionWithOverrides(args, context);

    if (!named_collection)
    {
        const size_t evaluated_count = StorageURL::evalArgsAndCollectHeaders(args, configuration.headers, context);

        /// ITableFunctionFileLike cannot parse headers argument, so take it out for the duration of the call.
        ASTPtr trailing_headers;
        if (evaluated_count != args.size())
        {
            /// At most one extra argument is expected, and it is taken from the end of the list.
            chassert(evaluated_count + 1 == args.size());
            trailing_headers = args.back();
            args.pop_back();
        }

        ITableFunctionFileLike::parseArgumentsImpl(args, context);

        /// Restore the argument list to its original length.
        if (trailing_headers)
            args.push_back(trailing_headers);

        return;
    }

    StorageURL::processNamedCollectionResult(configuration, *named_collection);

    filename = configuration.url;
    structure = configuration.structure;
    compression_method = configuration.compression_method;
    format = configuration.format;

    /// No explicit format: try to derive it from the path component of the URL,
    /// keeping "auto" when nothing can be derived.
    if (format == "auto")
        format = FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(filename).getPath()).value_or("auto");

    StorageURL::evalArgsAndCollectHeaders(args, configuration.headers, context);
}
/// Creates the storage object behind the table function.
///
/// A StorageURLCluster is returned when all of these hold:
///  - setting `cluster_for_parallel_replicas` is not empty;
///  - setting `parallel_replicas_for_cluster_engines` is enabled;
///  - the context reports canUseTaskBasedParallelReplicas();
///  - the context is not distributed;
///  - the query is neither a secondary query nor an insert.
/// In every other case a StorageURL is returned; for a secondary query it is created
/// with distributed_processing switched on.
///
/// Note that the StorageURLCluster is built from the fields `filename`, `format`,
/// `compression_method` and from getActualTableStructure(), whereas the StorageURL is
/// built from the `source`, `format_`, `compression_method_` and `columns` parameters.
StoragePtr TableFunctionURL::getStorage(
    const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
    const std::string & table_name, const String & compression_method_, bool is_insert_query) const
{
    const auto & query_settings = global_context->getSettingsRef();
    const auto secondary_query = global_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
    const auto replicas_cluster = query_settings[Setting::cluster_for_parallel_replicas].toString();

    /// The conditions are checked left to right and evaluation stops at the first one that fails.
    const auto read_via_cluster = !replicas_cluster.empty()
        && query_settings[Setting::parallel_replicas_for_cluster_engines]
        && global_context->canUseTaskBasedParallelReplicas()
        && !global_context->isDistributed()
        && !secondary_query
        && !is_insert_query;

    if (!read_via_cluster)
    {
        return std::make_shared<StorageURL>(
            source,
            StorageID(getDatabaseName(), table_name),
            format_,
            std::nullopt /*format settings*/,
            columns,
            ConstraintsDescription{},
            String{},
            global_context,
            compression_method_,
            configuration.headers,
            configuration.http_method,
            nullptr,
            /*distributed_processing=*/ secondary_query);
    }

    return std::make_shared<StorageURLCluster>(
        global_context,
        replicas_cluster,
        filename,
        format,
        compression_method,
        StorageID(getDatabaseName(), table_name),
        getActualTableStructure(global_context, /* is_insert_query */ true),
        ConstraintsDescription{},
        configuration);
}
/// Returns the column description of the table function.
///
/// When `structure` is anything but "auto", it is parsed with parseColumnsListFromString.
/// When it is "auto", READ access to the source is checked (if the function exposes a
/// source access object) and the structure is obtained from the data via StorageURL;
/// if `format` is "auto" as well, the call that determines both structure and format is
/// used and only the structure is kept. The `is_insert_query` flag is ignored.
ColumnsDescription TableFunctionURL::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
    if (structure != "auto")
        return parseColumnsListFromString(structure, context);

    if (const auto access_object = getSourceAccessObject())
        context->checkAccess(AccessType::READ, toStringSource(*access_object));

    /// Both inference calls below take the same compression method, resolved from
    /// the path component of the URL and the `compression_method` field.
    const auto resolved_compression = chooseCompressionMethod(Poco::URI(filename).getPath(), compression_method);

    if (format != "auto")
    {
        return StorageURL::getTableStructureFromData(format,
            filename,
            resolved_compression,
            configuration.headers,
            std::nullopt,
            context);
    }

    return StorageURL::getTableStructureAndFormatFromData(
        filename,
        resolved_compression,
        configuration.headers,
        std::nullopt,
        context).first;
}
std::optional<String> TableFunctionURL::tryGetFormatFromFirstArgument()
{
return FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(filename).getPath());
}
/// Registers TableFunctionURL in the given table function factory.
void registerTableFunctionURL(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionURL>();
}
}