1616
1717#include " QueryEngine/ColumnFetcher.h"
1818
19- #include < memory>
20-
2119#include " DataMgr/ArrayNoneEncoder.h"
2220#include " QueryEngine/ErrorHandling.h"
2321#include " QueryEngine/Execute.h"
2422#include " Shared/Intervals.h"
2523#include " Shared/likely.h"
2624#include " Shared/sqltypes.h"
2725
26+ #include < tbb/parallel_for.h>
27+ #include < memory>
28+
2829namespace {
2930
3031std::string getMemoryLevelString (Data_Namespace::MemoryLevel memoryLevel) {
@@ -239,16 +240,17 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
239240 int db_id = col_info->db_id ;
240241 int table_id = col_info->table_id ;
241242 int col_id = col_info->column_id ;
243+
242244 const auto fragments_it = all_tables_fragments.find ({db_id, table_id});
243245 CHECK (fragments_it != all_tables_fragments.end ());
246+
244247 const auto fragments = fragments_it->second ;
245248 const auto frag_count = fragments->size ();
246249 std::vector<std::unique_ptr<ColumnarResults>> column_frags;
247250 const ColumnarResults* table_column = nullptr ;
248251 const InputDescriptor table_desc (db_id, table_id, int (0 ));
249252 {
250253 std::lock_guard<std::mutex> columnar_conversion_guard (columnar_fetch_mutex_);
251-
252254 auto col_token = data_provider_->getZeroCopyColumnData (*col_info);
253255 if (col_token != nullptr ) {
254256 size_t num_rows = col_token->getSize () / col_token->getType ()->size ();
@@ -262,44 +264,111 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
262264 }
263265
264266 auto column_it = columnarized_scan_table_cache_.find ({table_id, col_id});
265- if (column_it == columnarized_scan_table_cache_.end ()) {
266- for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
267- if (executor_->getConfig ()
268- .exec .interrupt .enable_non_kernel_time_query_interrupt &&
269- executor_->checkNonKernelTimeInterrupted ()) {
270- throw QueryExecutionError (Executor::ERR_INTERRUPTED);
271- }
272- std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
273- std::list<ChunkIter> chunk_iter_holder;
274- const auto & fragment = (*fragments)[frag_id];
275- if (fragment.isEmptyPhysicalFragment ()) {
276- continue ;
277- }
278- auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
279- CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
280- auto col_buffer = getOneTableColumnFragment (col_info,
281- static_cast <int >(frag_id),
282- all_tables_fragments,
283- chunk_holder,
284- chunk_iter_holder,
285- Data_Namespace::CPU_LEVEL,
286- int (0 ),
287- device_allocator);
288- column_frags.push_back (
289- std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_ ,
290- col_buffer,
291- fragment.getNumTuples (),
292- chunk_meta_it->second ->type (),
293- thread_idx));
267+ if (column_it != columnarized_scan_table_cache_.end ()) {
268+ table_column = column_it->second .get ();
269+ return ColumnFetcher::transferColumnIfNeeded (
270+ table_column, 0 , memory_level, device_id, device_allocator);
271+ }
272+
273+ size_t total_row_count = 0 ;
274+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
275+ if (executor_->getConfig ().exec .interrupt .enable_non_kernel_time_query_interrupt &&
276+ executor_->checkNonKernelTimeInterrupted ()) {
277+ throw QueryExecutionError (Executor::ERR_INTERRUPTED);
294278 }
295- auto merged_results =
296- ColumnarResults::mergeResults (executor_->row_set_mem_owner_ , column_frags);
279+ const auto & fragment = (*fragments)[frag_id];
280+ const auto & rows_in_frag = fragment.getNumTuples ();
281+ total_row_count += rows_in_frag;
282+ }
283+
284+ const auto & type_width = col_info->type ->size ();
285+ auto write_ptr =
286+ executor_->row_set_mem_owner_ ->allocate (type_width * total_row_count);
287+ std::vector<std::pair<int8_t *, size_t >> write_ptrs;
288+ std::vector<size_t > valid_fragments;
289+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
290+ const auto & fragment = (*fragments)[frag_id];
291+ if (fragment.isEmptyPhysicalFragment ()) {
292+ continue ;
293+ }
294+ CHECK_EQ (type_width, fragment.getChunkMetadataMap ().at (col_id)->type ()->size ());
295+ write_ptrs.push_back ({write_ptr, fragment.getNumTuples () * type_width});
296+ write_ptr += fragment.getNumTuples () * type_width;
297+ valid_fragments.push_back (frag_id);
298+ }
299+
300+ if (write_ptrs.empty ()) {
301+ std::unique_ptr<ColumnarResults> merged_results (nullptr );
302+
297303 table_column = merged_results.get ();
298304 columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
299305 std::move (merged_results));
300- } else {
301- table_column = column_it->second .get ();
306+
307+ return ColumnFetcher::transferColumnIfNeeded (
308+ table_column, 0 , memory_level, device_id, device_allocator);
302309 }
310+
311+ size_t valid_frag_count = valid_fragments.size ();
312+ tbb::parallel_for (
313+ tbb::blocked_range<size_t >(0 , valid_frag_count),
314+ [&](const tbb::blocked_range<size_t >& frag_ids) {
315+ for (size_t v_frag_id = frag_ids.begin (); v_frag_id < frag_ids.end ();
316+ ++v_frag_id) {
317+ std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
318+ std::list<ChunkIter> chunk_iter_holder;
319+ size_t frag_id = valid_fragments[v_frag_id];
320+ const auto & fragment = (*fragments)[frag_id];
321+ auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
322+ CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
323+ std::shared_ptr<Chunk_NS::Chunk> chunk;
324+ // Fixed length arrays are also included here.
325+ const bool is_varlen =
326+ col_info->type ->isString () || col_info->type ->isArray ();
327+ int8_t * col_buffer;
328+ {
329+ ChunkKey chunk_key{
330+ db_id, fragment.physicalTableId , col_id, fragment.fragmentId };
331+ std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
332+ if (is_varlen) {
333+ varlen_chunk_lock.reset (
334+ new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
335+ }
336+ chunk = data_provider_->getChunk (col_info,
337+ chunk_key,
338+ Data_Namespace::CPU_LEVEL,
339+ 0 ,
340+ chunk_meta_it->second ->numBytes (),
341+ chunk_meta_it->second ->numElements ());
342+ std::lock_guard<std::mutex> chunk_list_lock (chunk_list_mutex_);
343+ chunk_holder.push_back (chunk);
344+ }
345+ if (is_varlen) {
346+ CHECK_GT (table_id, 0 );
347+ CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
348+ chunk_iter_holder.push_back (chunk->begin_iterator (chunk_meta_it->second ));
349+ auto & chunk_iter = chunk_iter_holder.back ();
350+ col_buffer = reinterpret_cast <int8_t *>(&chunk_iter);
351+ } else {
352+ auto ab = chunk->getBuffer ();
353+ CHECK (ab->getMemoryPtr ());
354+ col_buffer = ab->getMemoryPtr (); // @TODO(alex) change to use ChunkIter
355+ }
356+ memcpy (write_ptrs[frag_id].first , col_buffer, write_ptrs[frag_id].second );
357+ }
358+ });
359+
360+ std::vector<int8_t *> raw_write_ptrs;
361+ raw_write_ptrs.reserve (frag_count);
362+ for (size_t i = 0 ; i < frag_count; i++) {
363+ raw_write_ptrs.emplace_back (write_ptrs[i].first );
364+ }
365+
366+ std::unique_ptr<ColumnarResults> merged_results (new ColumnarResults (
367+ std::move (raw_write_ptrs), total_row_count, col_info->type , thread_idx));
368+
369+ table_column = merged_results.get ();
370+ columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
371+ std::move (merged_results));
303372 }
304373 return ColumnFetcher::transferColumnIfNeeded (
305374 table_column, 0 , memory_level, device_id, device_allocator);
0 commit comments