forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathClientBase.h
More file actions
241 lines (186 loc) · 9.08 KB
/
ClientBase.h
File metadata and controls
241 lines (186 loc) · 9.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#pragma once
#include <charconv>
#include <istream>
#include <optional>
#include <string>
#include <system_error>
#include <utility>
#include <Common/ProgressIndication.h>
#include <Common/InterruptListener.h>
#include <Common/ShellCommand.h>
#include <Core/ExternalTable.h>
#include <Poco/Util/Application.h>
#include <Interpreters/Context.h>
#include <Client/Suggest.h>
#include <Client/QueryFuzzer.h>
#include <boost/program_options.hpp>
namespace po = boost::program_options;
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// Return type of ClientBase::analyzeMultiQueryText.
/// The numeric values are spelled out explicitly; they are the same values the
/// compiler assigns implicitly (0..4 in declaration order).
/// NOTE(review): the meaning of each enumerator is only implied by its name here;
/// confirm against the implementation of analyzeMultiQueryText before relying on it.
enum MultiQueryProcessingStage
{
    QUERIES_END = 0,
    PARSING_EXCEPTION = 1,
    CONTINUE_PARSING = 2,
    EXECUTE_QUERY = 3,
    PARSING_FAILED = 4,
};
/// Free function with the classic `void(int)` signal-handler signature; defined out of line.
/// NOTE(review): presumably the handler installed by ClientBase::setupSignalHandler — confirm in the .cpp.
void interruptSignalHandler(int signum);
/// Forward declaration: only used below through std::unique_ptr<InternalTextLogs>.
class InternalTextLogs;
/// Abstract base class for client applications, derived from Poco::Util::Application.
/// It declares the shared query-processing machinery and leaves connection handling,
/// multi-query execution, error reporting and option/config processing to subclasses
/// through pure virtual functions.
class ClientBase : public Poco::Util::Application
{
public:
/// List of command-line arguments.
using Arguments = std::vector<String>;
ClientBase();
~ClientBase() override;
/// Takes the `main`-style argument vector. Defined out of line.
/// NOTE(review): presumably parses the command line and calls the virtual option hooks — confirm in the .cpp.
void init(int argc, char ** argv);
protected:
/// Entry points for the interactive and the non-interactive run mode (see `is_interactive`).
void runInteractive();
void runNonInteractive();
/// Default implementation for clients that do not support fuzzing:
/// unconditionally throws Exception with code ErrorCodes::NOT_IMPLEMENTED.
/// The argument is ignored (the parameter is unnamed).
virtual bool processWithFuzzing(const String &)
{
throw Exception("Query processing with fuzzing is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
/// Pure virtual hooks; every concrete client has to implement them.
virtual bool executeMultiQuery(const String & all_queries_text) = 0;
virtual void connect() = 0;
virtual void processError(const String & query) const = 0;
virtual String getName() const = 0;
/// Query-processing helpers available to subclasses; all are defined out of line.
void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query);
void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query);
void processTextAsSingleQuery(const String & full_query);
/// `echo_query_` defaults to an empty optional and `report_error` to false.
/// NOTE(review): an empty `echo_query_` presumably means "fall back to `echo_queries`" — confirm in the .cpp.
void processParsedSingleQuery(const String & full_query, const String & query_to_execute,
ASTPtr parsed_query, std::optional<bool> echo_query_ = {}, bool report_error = false);
/// `this_query_end` is passed by reference, so the callee can move it.
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, int max_parser_depth);
/// `pos` is passed by reference, so the callee can advance it within [pos, end).
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
static void setupSignalHandler();
/// Returns a MultiQueryProcessingStage. The query range pointers, `query_to_execute`,
/// `parsed_query` and `current_exception` are non-const references, i.e. outputs of the call.
MultiQueryProcessingStage analyzeMultiQueryText(
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
std::optional<Exception> & current_exception);
/// For non-interactive multi-query mode get queries text prefix.
/// The default implementation returns an empty string; subclasses may override.
virtual String getQueryTextPrefix() { return ""; }
static void clearTerminal();
void showClientVersion();
/// Short aliases for the boost::program_options types used by the option hooks below.
using ProgramOptionsDescription = boost::program_options::options_description;
using CommandLineOptions = boost::program_options::variables_map;
/// Bundle of two optional option groups. Both start out empty (std::optional default state).
/// NOTE(review): `external_description` presumably describes the external-table options — confirm with the subclasses.
struct OptionsDescription
{
std::optional<ProgramOptionsDescription> main_description;
std::optional<ProgramOptionsDescription> external_description;
};
/// Pure virtual option/config hooks; every concrete client has to implement them.
virtual void printHelpMessage(const OptionsDescription & options_description) = 0;
/// All three parameters are non-const references, so the implementation may modify them.
virtual void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) = 0;
virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) = 0;
virtual void processConfig() = 0;
private:
/// Internal helpers, not accessible to subclasses; all are defined out of line.
bool processQueryText(const String & text);
/// Receiving side.
/// NOTE(review): the names suggest a packet loop that dispatches to the on*() callbacks below — confirm in the .cpp.
void receiveResult(ASTPtr parsed_query);
bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled);
void receiveLogs(ASTPtr parsed_query);
/// `out` and `columns_description` are non-const references, i.e. outputs of the call.
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);
bool receiveEndOfQuery();
/// Callbacks, one per kind of received item (progress, data, logs, totals, extremes, ...).
void onProgress(const Progress & value);
void onData(Block & block, ASTPtr parsed_query);
void onLogData(Block & block);
void onTotals(Block & block, ASTPtr parsed_query);
void onExtremes(Block & block, ASTPtr parsed_query);
/// Takes ownership of the exception object (rvalue reference to unique_ptr).
void onReceiveExceptionFromServer(std::unique_ptr<Exception> && e);
void onProfileInfo(const ProfileInfo & profile_info);
void onEndOfStream();
void onProfileEvents(Block & block);
/// Sending side.
void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendDataFrom(ReadBuffer & buf, Block & sample,
const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendExternalTables(ASTPtr parsed_query);
/// Output initialisation.
/// NOTE(review): presumably these set up `output_format` and `logs_out_stream` respectively — confirm in the .cpp.
void initBlockOutputStream(const Block & block, ASTPtr parsed_query);
void initLogsOutputStream();
/// Builds the prompt text: a copy of `prompt_by_server_display_name` in which every
/// occurrence of the literal "{database}" is replaced by the value of the "database"
/// config key ("default" when the key is not set).
inline String prompt() const
{
    const String current_database = config().getString("database", "default");
    return boost::replace_all_copy(prompt_by_server_display_name, "{database}", current_database);
}
/// Defined out of line.
/// NOTE(review): presumably releases the per-query output objects (`output_format`, `out_file_buf`, ...) — confirm in the .cpp.
void resetOutput();
void outputQueryInfo(bool echo_query_);
/// `common_arguments` and `external_tables_arguments` are non-const references, i.e. outputs filled from argc/argv.
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> & external_tables_arguments);
protected:
/// State shared with subclasses. Every member in this group has an in-class default.
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
/// NOTE(review): presumably enables processing of several queries from one input text — confirm where it is set.
bool is_multiquery = false;
bool echo_queries = false; /// Print queries before execution in batch mode.
bool ignore_error = false; /// In case of errors, don't print error message, continue to next query. Only applicable for non-interactive mode.
bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode.
/// NOTE(review): presumably controls loading of autocompletion suggestions (see Client/Suggest.h) — confirm.
bool load_suggestions = false;
std::vector<String> queries_files; /// If not empty, queries will be read from these files
std::vector<String> interleave_queries_files; /// If not empty, run queries from these files before processing every file from 'queries_files'.
bool stdin_is_a_tty = false; /// stdin is a terminal.
bool stdout_is_a_tty = false; /// stdout is a terminal.
/// Starts at 0. NOTE(review): presumably 0 means "not determined yet" — confirm.
uint64_t terminal_width = 0;
ServerConnectionPtr connection;
ConnectionParameters connection_parameters;
String format; /// Query results output format.
bool is_default_format = true; /// false, if format is set in the config or command line.
size_t format_max_block_size = 0; /// Max block size for console output.
String insert_format; /// Format of INSERT data that is read from stdin in batch mode.
size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data.
size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second.
bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string?
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
std::vector<std::pair<String, String>> query_id_formats;
/// Settings specified via command line args
Settings cmd_settings;
SharedContextHolder shared_context;
ContextMutablePtr global_context;
/// Buffer that reads from stdin in batch mode.
ReadBufferFromFileDescriptor std_in{STDIN_FILENO};
/// Console output.
WriteBufferFromFileDescriptor std_out{STDOUT_FILENO};
/// NOTE(review): presumably the external pager process the output is piped through — confirm.
std::unique_ptr<ShellCommand> pager_cmd;
/// The user can specify to redirect query output to a file.
std::unique_ptr<WriteBuffer> out_file_buf;
std::shared_ptr<IOutputFormat> output_format;
/// The user could specify special file for server logs (stderr by default)
std::unique_ptr<WriteBuffer> out_logs_buf;
String server_logs_file;
std::unique_ptr<InternalTextLogs> logs_out_stream;
String home_path;
String history_file; /// Path to a file containing command history.
String current_profile;
/// Information about the connected server; `server_revision` starts at 0.
UInt64 server_revision = 0;
String server_version;
/// Template for the prompt; prompt() substitutes "{database}" in it.
String prompt_by_server_display_name;
String server_display_name;
ProgressIndication progress_indication;
bool need_render_progress = true;
bool written_first_block = false;
size_t processed_rows = 0; /// How many rows have been read or written.
/// The last exception that was received from the server. Is used for the
/// return code in batch mode.
std::unique_ptr<Exception> server_exception;
/// Likewise, the last exception that occurred on the client.
std::unique_ptr<Exception> client_exception;
/// If the last query resulted in exception. `server_exception` or
/// `client_exception` must be set.
bool have_error = false;
std::list<ExternalTable> external_tables; /// External tables info.
bool send_external_tables = false;
NameToNameMap query_parameters; /// Dictionary with query parameters for prepared statements.
QueryFuzzer fuzzer;
/// Starts at 0. NOTE(review): presumably 0 disables fuzzing — confirm where it is read.
int query_fuzzer_runs = 0;
/// NOTE(review): no in-class initializer, unlike the members around it; it must be assigned before it is read.
QueryProcessingStage::Enum query_processing_stage;
/// A `host[:port]` endpoint.
///
/// `port` is empty when the text contained no ':' delimiter.
struct HostPort
{
    String host;
    std::optional<int> port{};

    /// Reads one whitespace-delimited token of the form `host` or `host:port`.
    ///
    /// The token is split at the first ':'; the port must consist of decimal digits
    /// only and lie in [0, 65535].
    ///
    /// On success both `host` and `port` are overwritten (`port` is reset to empty
    /// when the token has no port, so no value from a previous read survives).
    /// On failure (nothing to read, empty / non-numeric / out-of-range port, or
    /// trailing characters after the port) `failbit` is set on the stream and
    /// `hostPort` is left untouched. No exception is thrown by the parsing itself.
    ///
    /// NOTE(review): because the first ':' is the delimiter, IPv6 literals such as
    /// `::1` or `[::1]:9000` are rejected here; decide separately whether they
    /// should be supported and in which form the host should be stored.
    friend std::istream & operator>>(std::istream & in, HostPort & hostPort)
    {
        String host_with_port;
        if (!(in >> host_with_port))
            return in;

        const size_t delimiter_pos = host_with_port.find(':');
        String parsed_host = host_with_port.substr(0, delimiter_pos);
        std::optional<int> parsed_port;

        if (delimiter_pos != String::npos)
        {
            const char * port_begin = host_with_port.data() + delimiter_pos + 1;
            const char * port_end = host_with_port.data() + host_with_port.size();

            /// Unsigned target type: from_chars then rejects a leading '-' as well as '+' and whitespace.
            unsigned int port_value = 0;
            const auto [parsed_until, error] = std::from_chars(port_begin, port_end, port_value);

            /// Require the whole remainder to be consumed, so "9000x" is not silently read as 9000.
            if (error != std::errc() || parsed_until != port_end || port_value > 65535)
            {
                in.setstate(std::ios_base::failbit);
                return in;
            }

            parsed_port = static_cast<int>(port_value);
        }

        /// Commit only after everything has been validated.
        hostPort.host = std::move(parsed_host);
        hostPort.port = parsed_port;
        return in;
    }
};
/// List of host/port endpoints; starts out empty.
/// NOTE(review): presumably filled from the command line through HostPort's operator>> — confirm where it is populated.
std::vector<HostPort> hosts_ports{};
};
}