1616
1717#include " QueryEngine/ColumnFetcher.h"
1818
19- #include < memory>
20-
2119#include " DataMgr/ArrayNoneEncoder.h"
2220#include " QueryEngine/ErrorHandling.h"
2321#include " QueryEngine/Execute.h"
2422#include " Shared/Intervals.h"
2523#include " Shared/likely.h"
2624#include " Shared/sqltypes.h"
2725
26+ #include < tbb/parallel_for.h>
27+ #include < memory>
28+
2829namespace {
2930
3031std::string getMemoryLevelString (Data_Namespace::MemoryLevel memoryLevel) {
@@ -239,6 +240,11 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
239240 int db_id = col_info->db_id ;
240241 int table_id = col_info->table_id ;
241242 int col_id = col_info->column_id ;
243+
244+ // Array type passed to getAllTableColumnFragments. Should be handled in
245+ // linearization.
246+ CHECK (!col_info->type ->isString () && !col_info->type ->isArray ());
247+
242248 const auto fragments_it = all_tables_fragments.find ({db_id, table_id});
243249 CHECK (fragments_it != all_tables_fragments.end ());
244250 const auto fragments = fragments_it->second ;
@@ -248,7 +254,6 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
248254 const InputDescriptor table_desc (db_id, table_id, int (0 ));
249255 {
250256 std::lock_guard<std::mutex> columnar_conversion_guard (columnar_fetch_mutex_);
251-
252257 auto col_token = data_provider_->getZeroCopyColumnData (*col_info);
253258 if (col_token != nullptr ) {
254259 size_t num_rows = col_token->getSize () / col_token->getType ()->size ();
@@ -262,44 +267,91 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
262267 }
263268
264269 auto column_it = columnarized_scan_table_cache_.find ({table_id, col_id});
265- if (column_it == columnarized_scan_table_cache_.end ()) {
266- for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
267- if (executor_->getConfig ()
268- .exec .interrupt .enable_non_kernel_time_query_interrupt &&
269- executor_->checkNonKernelTimeInterrupted ()) {
270- throw QueryExecutionError (Executor::ERR_INTERRUPTED);
271- }
272- std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
273- std::list<ChunkIter> chunk_iter_holder;
274- const auto & fragment = (*fragments)[frag_id];
275- if (fragment.isEmptyPhysicalFragment ()) {
276- continue ;
277- }
278- auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
279- CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
280- auto col_buffer = getOneTableColumnFragment (col_info,
281- static_cast <int >(frag_id),
282- all_tables_fragments,
283- chunk_holder,
284- chunk_iter_holder,
285- Data_Namespace::CPU_LEVEL,
286- int (0 ),
287- device_allocator);
288- column_frags.push_back (
289- std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_ ,
290- col_buffer,
291- fragment.getNumTuples (),
292- chunk_meta_it->second ->type (),
293- thread_idx));
294- }
295- auto merged_results =
296- ColumnarResults::mergeResults (executor_->row_set_mem_owner_ , column_frags);
270+ if (column_it != columnarized_scan_table_cache_.end ()) {
271+ table_column = column_it->second .get ();
272+ return ColumnFetcher::transferColumnIfNeeded (
273+ table_column, 0 , memory_level, device_id, device_allocator);
274+ }
275+
276+ if (executor_->getConfig ().exec .interrupt .enable_non_kernel_time_query_interrupt &&
277+ executor_->checkNonKernelTimeInterrupted ()) {
278+ throw QueryExecutionError (Executor::ERR_INTERRUPTED);
279+ }
280+
281+ size_t total_row_count = 0 ;
282+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
283+ const auto & fragment = (*fragments)[frag_id];
284+ const auto rows_in_frag = fragment.getNumTuples ();
285+ total_row_count += rows_in_frag;
286+ }
287+
288+ if (total_row_count == 0 ) {
289+ std::unique_ptr<ColumnarResults> merged_results (nullptr );
290+
297291 table_column = merged_results.get ();
298292 columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
299293 std::move (merged_results));
300- } else {
301- table_column = column_it->second .get ();
294+
295+ return ColumnFetcher::transferColumnIfNeeded (
296+ table_column, 0 , memory_level, device_id, device_allocator);
297+ }
298+
299+ const auto type_width = col_info->type ->size ();
300+ auto write_ptr =
301+ executor_->row_set_mem_owner_ ->allocate (type_width * total_row_count);
302+ std::vector<std::pair<int8_t *, size_t >> write_ptrs;
303+ std::vector<size_t > valid_fragments;
304+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
305+ const auto & fragment = (*fragments)[frag_id];
306+ if (fragment.isEmptyPhysicalFragment ()) {
307+ continue ;
308+ }
309+ CHECK_EQ (type_width, fragment.getChunkMetadataMap ().at (col_id)->type ()->size ());
310+ write_ptrs.push_back ({write_ptr, fragment.getNumTuples () * type_width});
311+ write_ptr += fragment.getNumTuples () * type_width;
312+ valid_fragments.push_back (frag_id);
302313 }
314+
315+ CHECK (!write_ptrs.empty ());
316+ size_t valid_frag_count = valid_fragments.size ();
317+ tbb::parallel_for (
318+ tbb::blocked_range<size_t >(0 , valid_frag_count),
319+ [&](const tbb::blocked_range<size_t >& frag_ids) {
320+ for (size_t v_frag_id = frag_ids.begin (); v_frag_id < frag_ids.end ();
321+ ++v_frag_id) {
322+ std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
323+ std::list<ChunkIter> chunk_iter_holder;
324+ size_t frag_id = valid_fragments[v_frag_id];
325+ const auto & fragment = (*fragments)[frag_id];
326+ auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
327+ CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
328+ std::shared_ptr<Chunk_NS::Chunk> chunk;
329+ {
330+ ChunkKey chunk_key{
331+ db_id, fragment.physicalTableId , col_id, fragment.fragmentId };
332+ chunk = data_provider_->getChunk (col_info,
333+ chunk_key,
334+ Data_Namespace::CPU_LEVEL,
335+ 0 ,
336+ chunk_meta_it->second ->numBytes (),
337+ chunk_meta_it->second ->numElements ());
338+ std::lock_guard<std::mutex> chunk_list_lock (chunk_list_mutex_);
339+ chunk_holder.push_back (chunk);
340+ }
341+ auto ab = chunk->getBuffer ();
342+ CHECK (ab->getMemoryPtr ());
343+ int8_t * col_buffer =
344+ ab->getMemoryPtr (); // @TODO(alex) change to use ChunkIter
345+ memcpy (write_ptrs[frag_id].first , col_buffer, write_ptrs[frag_id].second );
346+ }
347+ });
348+
349+ std::unique_ptr<ColumnarResults> merged_results (new ColumnarResults (
350+ {write_ptrs[0 ].first }, total_row_count, col_info->type , thread_idx));
351+
352+ table_column = merged_results.get ();
353+ columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
354+ std::move (merged_results));
303355 }
304356 return ColumnFetcher::transferColumnIfNeeded (
305357 table_column, 0 , memory_level, device_id, device_allocator);
0 commit comments