Skip to content

Commit d47563c

Browse files
ccciudatuaditanase
authored andcommitted
[HSTACK] Register log store on next::DeltaScan::scan_with_args
1 parent d669b75 commit d47563c

1 file changed

Lines changed: 15 additions & 2 deletions

File tree

  • crates/core/src/delta_datafusion/table_provider/next

crates/core/src/delta_datafusion/table_provider/next/mod.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ use datafusion::{
4040
physical_plan::ExecutionPlan,
4141
};
4242
use datafusion::catalog::{ScanArgs, ScanResult};
43+
use datafusion::error::DataFusionError;
4344
use delta_kernel::table_configuration::TableConfiguration;
4445
use serde::{Deserialize, Serialize};
45-
46+
use url::Url;
47+
use crate::delta_datafusion::engine::AsObjectStoreUrl;
4648
pub use self::scan::{DeltaScanExec, DeltaNextPhysicalCodec};
4749
pub(crate) use self::scan::KernelScanPlan;
48-
use crate::delta_datafusion::DeltaScanConfig;
50+
use crate::delta_datafusion::{DeltaScanConfig, DeltaSessionExt};
4951
use crate::delta_datafusion::engine::DataFusionEngine;
5052
use crate::delta_datafusion::table_provider::TableProviderBuilder;
5153
use crate::kernel::{EagerSnapshot, Snapshot};
@@ -54,6 +56,7 @@ mod scan;
5456

5557
/// Default column name for the file id column we add to files read from disk.
5658
pub(crate) use crate::delta_datafusion::file_id::FILE_ID_COLUMN_DEFAULT;
59+
use crate::logstore::{object_store_factories, StorageConfig};
5760

5861
#[derive(Clone, Debug, Serialize, Deserialize)]
5962
pub enum SnapshotWrapper {
@@ -219,6 +222,16 @@ impl TableProvider for DeltaScan {
219222

220223
async fn scan_with_args<'a>(&self, state: &dyn Session, args: ScanArgs<'a>) -> Result<ScanResult> {
221224
let engine = DataFusionEngine::new_from_session(state);
225+
let table_uri = self.snapshot.table_configuration().table_root();
226+
if state.runtime_env().object_store(table_uri.as_object_store_url()).is_err() {
227+
let url_key = Url::parse(&format!("{}://", table_uri.scheme()))
228+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
229+
if let Some(entry) = object_store_factories().get(&url_key) {
230+
let storage_config = StorageConfig::parse_options(&self.snapshot.snapshot().load_config().options)?;
231+
let (store, _) = entry.value().parse_url_opts(table_uri, &storage_config)?;
232+
state.runtime_env().register_object_store(table_uri, store);
233+
}
234+
}
222235

223236
// Filter out file_id column from projection if present
224237
let file_id_idx = self

0 commit comments

Comments
 (0)