@@ -227,55 +227,69 @@ void HttpCzarQueryModule::_dumpQueryInfo(string const& func, SubmitResult const&
227227
228228json HttpCzarQueryModule::_waitAndExtractResult (SubmitResult const & submitResult,
229229 http::BinaryEncodingMode binaryEncoding, bool async) const {
230- if (!(submitResult.status != " COMPLETED" || submitResult.status != " EXECUTING" )) {
231- throw http::Error (context () + __func__, submitResult.errorMessage );
232- }
233- // Block the current thread before the query will finish or fail.
234- string const messageSelectQuery =
235- " SELECT chunkId, code, message, severity+0, timeStamp FROM " + submitResult.messageTable ;
236230 auto const conn =
237231 sql::SqlConnectionFactory::make (cconfig::CzarConfig::instance ()->getMySqlResultConfig ());
238- sql::SqlResults messageQueryResults;
239- sql::SqlErrorObject messageQueryErr;
240- if (!conn-> runQuery (messageSelectQuery, messageQueryResults, messageQueryErr)) {
241- _dropTable (submitResult. messageTable );
242- _dropTable (submitResult.resultTable );
243- string const msg = " failed query= " + messageSelectQuery + " err= " + messageQueryErr. printErrMsg ();
244- error (__func__, msg);
245- throw http::Error ( context () + __func__, msg );
246- }
232+ if (async) {
233+ // For the async queries the result table is not going to be ready
234+ // until the query is completed. The caller should wait for the query
235+ // to finish before calling this method.
236+ if (submitResult.status != " COMPLETED " ) {
237+ throw http::Error ( context () + __func__,
238+ " queryId= " + to_string (submitResult. queryId ) +
239+ " is still executing or it has failed, status= " + submitResult. status );
240+ }
247241
248- // Read thе message table to see if the user query suceeded or failed
249- vector<string> chunkId;
250- vector<string> code;
251- vector<string> message;
252- vector<string> severity;
253- sql::SqlErrorObject messageProcessErr;
254- if (!messageQueryResults.extractFirst4Columns (chunkId, code, message, severity, messageProcessErr)) {
255- messageQueryResults.freeResults ();
242+ // For the async queries the message table (if it still exists) is not used.
256243 _dropTable (submitResult.messageTable );
257- _dropTable (submitResult.resultTable );
258- string const msg = " failed to extract results of query=" + messageSelectQuery +
259- " err=" + messageProcessErr.printErrMsg ();
260- error (__func__, msg);
261- throw http::Error (context () + __func__, msg);
262- }
263- string errorMsg;
264- for (size_t i = 0 ; i < chunkId.size (); ++i) {
265- if (stoi (code[i]) > 0 ) {
266- errorMsg += " [chunkId=" + chunkId[i] + " code=" + code[i] + " message=" + message[i] +
267- " severity=" + severity[i] + " ], " ;
244+ } else {
245+ // The sync queries are required to wait on the message table to be unlocked
246+ // before proceeding to the result table. The message table is expected to be ready
247+ // at any point of time after the query has been submitted. If the query is still
248+ // executing the thread will block until the query is completed or failed.
249+ string const messageSelectQuery =
250+ " SELECT chunkId, code, message, severity+0, timeStamp FROM " + submitResult.messageTable ;
251+ sql::SqlResults messageQueryResults;
252+ sql::SqlErrorObject messageQueryErr;
253+ if (!conn->runQuery (messageSelectQuery, messageQueryResults, messageQueryErr)) {
254+ _dropTable (submitResult.messageTable );
255+ _dropTable (submitResult.resultTable );
256+ string const msg = " failed query=" + messageSelectQuery + " err=" + messageQueryErr.printErrMsg ();
257+ error (__func__, msg);
258+ throw http::Error (context () + __func__, msg);
259+ }
260+
261+ // Read thе message table to see if the user query suceeded or failed
262+ vector<string> chunkId;
263+ vector<string> code;
264+ vector<string> message;
265+ vector<string> severity;
266+ sql::SqlErrorObject messageProcessErr;
267+ if (!messageQueryResults.extractFirst4Columns (chunkId, code, message, severity, messageProcessErr)) {
268+ messageQueryResults.freeResults ();
269+ _dropTable (submitResult.messageTable );
270+ _dropTable (submitResult.resultTable );
271+ string const msg = " failed to extract results of query=" + messageSelectQuery +
272+ " err=" + messageProcessErr.printErrMsg ();
273+ error (__func__, msg);
274+ throw http::Error (context () + __func__, msg);
275+ }
276+ string errorMsg;
277+ for (size_t i = 0 ; i < chunkId.size (); ++i) {
278+ if (stoi (code[i]) > 0 ) {
279+ errorMsg += " [chunkId=" + chunkId[i] + " code=" + code[i] + " message=" + message[i] +
280+ " severity=" + severity[i] + " ], " ;
281+ }
282+ }
283+ if (!errorMsg.empty ()) {
284+ messageQueryResults.freeResults ();
285+ _dropTable (submitResult.messageTable );
286+ _dropTable (submitResult.resultTable );
287+ error (__func__, errorMsg);
288+ throw http::Error (context () + __func__, errorMsg);
268289 }
269- }
270- if (!errorMsg.empty ()) {
271290 messageQueryResults.freeResults ();
272291 _dropTable (submitResult.messageTable );
273- _dropTable (submitResult.resultTable );
274- error (__func__, errorMsg);
275- throw http::Error (context () + __func__, errorMsg);
276292 }
277- messageQueryResults.freeResults ();
278- _dropTable (submitResult.messageTable );
279293
280294 // Read a result set from the result table, package it into the JSON object
281295 // and sent it back to a user.
@@ -302,9 +316,9 @@ json HttpCzarQueryModule::_waitAndExtractResult(SubmitResult const& submitResult
302316 json rowsJson = _rowsToJson (resultQueryResults, schemaJson, binaryEncoding);
303317 resultQueryResults.freeResults ();
304318
305- // Note that the result table of the asynchronously submitted queries
306- // is now explicitly deleted by the caller after the result has been extracted.
307- // Otherwise the table will be automatically garbage collected by Czar after
319+ // Note that the result table of the asynchronously submitted queries is required to be
320+ // explicitly deleted by the caller after the result has been extracted. If this is not
321+ // done by the user the table will be automatically garbage collected by Czar after
308322 // a certain period of time.
309323 if (!async) {
310324 _dropTable (submitResult.resultTable );
0 commit comments