Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 126 additions & 19 deletions datafusion-pg-catalog/src/pg_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::sync::atomic::AtomicU32;

use async_trait::async_trait;
use datafusion::arrow::array::{
ArrayRef, AsArray, BooleanBuilder, Int32Builder, RecordBatch, StringArray, StringBuilder,
as_boolean_array,
ArrayRef, AsArray, BooleanArray, BooleanBuilder, Int32Builder, RecordBatch, StringArray,
StringBuilder, as_boolean_array,
};
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
use datafusion::arrow::ipc::reader::FileReader;
Expand Down Expand Up @@ -35,12 +35,16 @@ pub mod pg_attribute;
pub mod pg_class;
pub mod pg_database;
pub mod pg_get_expr_udf;
pub mod pg_locks;
pub mod pg_namespace;
pub mod pg_replication_slot;
pub mod pg_roles;
pub mod pg_settings;
pub mod pg_stat_gssapi;
pub mod pg_stat_ssl;
pub mod pg_tables;
pub mod pg_tablespace;
pub mod pg_timezone;
pub mod pg_views;
pub mod quote_ident_udf;

Expand Down Expand Up @@ -103,6 +107,9 @@ const PG_CATALOG_TABLE_PG_STATISTIC_EXT_DATA: &str = "pg_statistic_ext_data";
const PG_CATALOG_TABLE_PG_SUBSCRIPTION: &str = "pg_subscription";
const PG_CATALOG_TABLE_PG_SUBSCRIPTION_REL: &str = "pg_subscription_rel";
const PG_CATALOG_TABLE_PG_TABLESPACE: &str = "pg_tablespace";
const PG_CATALOG_TABLE_PG_LOCKS: &str = "pg_locks";
const PG_CATALOG_VIEW_PG_TIMEZONE_NAMES: &str = "pg_timezone_names";
const PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS: &str = "pg_timezone_abbrevs";
const PG_CATALOG_TABLE_PG_TRIGGER: &str = "pg_trigger";
const PG_CATALOG_TABLE_PG_USER_MAPPING: &str = "pg_user_mapping";
const PG_CATALOG_VIEW_PG_SETTINGS: &str = "pg_settings";
Expand All @@ -111,6 +118,7 @@ const PG_CATALOG_VIEW_PG_MATVIEWS: &str = "pg_matviews";
const PG_CATALOG_VIEW_PG_ROLES: &str = "pg_roles";
const PG_CATALOG_VIEW_PG_TABLES: &str = "pg_tables";
const PG_CATALOG_VIEW_PG_STAT_GSSAPI: &str = "pg_stat_gssapi";
const PG_CATALOG_VIEW_PG_STAT_SSL: &str = "pg_stat_ssl";
const PG_CATALOG_VIEW_PG_STAT_USER_TABLES: &str = "pg_stat_user_tables";
const PG_CATALOG_VIEW_PG_REPLICATION_SLOTS: &str = "pg_replication_slots";

Expand Down Expand Up @@ -174,16 +182,20 @@ pub const PG_CATALOG_TABLES: &[&str] = &[
PG_CATALOG_TABLE_PG_SUBSCRIPTION,
PG_CATALOG_TABLE_PG_SUBSCRIPTION_REL,
PG_CATALOG_TABLE_PG_TABLESPACE,
PG_CATALOG_TABLE_PG_LOCKS,
PG_CATALOG_TABLE_PG_TRIGGER,
PG_CATALOG_TABLE_PG_USER_MAPPING,
PG_CATALOG_VIEW_PG_ROLES,
PG_CATALOG_VIEW_PG_SETTINGS,
PG_CATALOG_VIEW_PG_STAT_GSSAPI,
PG_CATALOG_VIEW_PG_STAT_SSL,
PG_CATALOG_VIEW_PG_TABLES,
PG_CATALOG_VIEW_PG_VIEWS,
PG_CATALOG_VIEW_PG_MATVIEWS,
PG_CATALOG_VIEW_PG_STAT_USER_TABLES,
PG_CATALOG_VIEW_PG_REPLICATION_SLOTS,
PG_CATALOG_VIEW_PG_TIMEZONE_NAMES,
PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS,
];

#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
Expand Down Expand Up @@ -364,7 +376,8 @@ impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P>
Ok(Some(self.static_tables.pg_subscription_rel.clone().into()))
}
PG_CATALOG_TABLE_PG_TABLESPACE => {
Ok(Some(self.static_tables.pg_tablespace.clone().into()))
let table = Arc::new(pg_tablespace::PgTablespaceTable::new());
Ok(Some(PgCatalogTable::Dynamic(table)))
}
PG_CATALOG_TABLE_PG_TRIGGER => Ok(Some(self.static_tables.pg_trigger.clone().into())),
PG_CATALOG_TABLE_PG_USER_MAPPING => {
Expand Down Expand Up @@ -416,6 +429,10 @@ impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P>
let table = Arc::new(pg_stat_gssapi::PgStatGssApiTable::new());
Ok(Some(PgCatalogTable::Dynamic(table)))
}
PG_CATALOG_VIEW_PG_STAT_SSL => {
let table = Arc::new(pg_stat_ssl::PgStatSslTable::new());
Ok(Some(PgCatalogTable::Dynamic(table)))
}
PG_CATALOG_VIEW_PG_ROLES => {
let table = Arc::new(pg_roles::PgRolesTable::new(self.context_provider.clone()));
Ok(Some(PgCatalogTable::Dynamic(table)))
Expand All @@ -427,6 +444,11 @@ impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P>
PG_CATALOG_VIEW_PG_REPLICATION_SLOTS => {
Ok(Some(pg_replication_slot::pg_replication_slots().into()))
}
PG_CATALOG_TABLE_PG_LOCKS => Ok(Some(pg_locks::pg_locks().into())),
PG_CATALOG_VIEW_PG_TIMEZONE_NAMES => Ok(Some(pg_timezone::pg_timezone_names().into())),
PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS => {
Ok(Some(pg_timezone::pg_timezone_abbrevs().into()))
}

_ => Ok(None),
}
Expand Down Expand Up @@ -588,7 +610,6 @@ pub struct PgCatalogStaticTables {
pub pg_statistic_ext_data: Arc<ArrowTable>,
pub pg_subscription: Arc<ArrowTable>,
pub pg_subscription_rel: Arc<ArrowTable>,
pub pg_tablespace: Arc<ArrowTable>,
pub pg_trigger: Arc<ArrowTable>,
pub pg_user_mapping: Arc<ArrowTable>,

Expand Down Expand Up @@ -977,13 +998,6 @@ impl PgCatalogStaticTables {
))
.to_vec(),
)?,
pg_tablespace: Self::create_arrow_table(
include_bytes!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/pg_catalog_arrow_exports/pg_tablespace.feather"
))
.to_vec(),
)?,
pg_trigger: Self::create_arrow_table(
include_bytes!(concat!(
env!("CARGO_MANIFEST_DIR"),
Expand Down Expand Up @@ -1426,6 +1440,102 @@ pub fn create_pg_get_partition_ancestors_udf() -> ScalarUDF {
)
}

pub fn create_pg_postmaster_start_time_udf() -> ScalarUDF {
let func = move |_args: &[ColumnarValue]| {
let mut builder = datafusion::arrow::array::TimestampNanosecondBuilder::new()
.with_data_type(DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Nanosecond,
Some(Arc::from("UTC")),
));
builder.append_value(1_700_000_000_000_000_000);
let array: ArrayRef = Arc::new(builder.finish());
Ok(ColumnarValue::Array(array))
};

create_udf(
"pg_postmaster_start_time",
vec![],
DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Nanosecond,
Some(Arc::from("UTC")),
),
Volatility::Stable,
Arc::new(func),
)
}

pub fn create_pg_is_in_recovery_udf() -> ScalarUDF {
let func = move |_args: &[ColumnarValue]| {
let array: ArrayRef = Arc::new(BooleanArray::from(vec![false]));
Ok(ColumnarValue::Array(array))
};

create_udf(
"pg_is_in_recovery",
vec![],
DataType::Boolean,
Volatility::Stable,
Arc::new(func),
)
}

pub fn create_txid_current_udf() -> ScalarUDF {
let func = move |_args: &[ColumnarValue]| {
let array: ArrayRef = Arc::new(datafusion::arrow::array::Int64Array::from(vec![1i64]));
Ok(ColumnarValue::Array(array))
};

create_udf(
"txid_current",
vec![],
DataType::Int64,
Volatility::Stable,
Arc::new(func),
)
}

pub fn create_pg_tablespace_location_udf() -> ScalarUDF {
let func = move |args: &[ColumnarValue]| {
let args = ColumnarValue::values_to_arrays(args)?;
let input = &args[0];
let mut builder = StringBuilder::new();
for _ in 0..input.len() {
builder.append_null();
}
let array: ArrayRef = Arc::new(builder.finish());
Ok(ColumnarValue::Array(array))
};

create_udf(
"pg_tablespace_location",
vec![DataType::Int32],
DataType::Utf8,
Volatility::Stable,
Arc::new(func),
)
}

pub fn create_age_udf() -> ScalarUDF {
let func = move |args: &[ColumnarValue]| {
let args = ColumnarValue::values_to_arrays(args)?;
let input = &args[0];
let mut builder = datafusion::arrow::array::Int64Builder::new();
for _ in 0..input.len() {
builder.append_value(0);
}
let array: ArrayRef = Arc::new(builder.finish());
Ok(ColumnarValue::Array(array))
};

create_udf(
"age",
vec![DataType::Utf8],
DataType::Int64,
Volatility::Stable,
Arc::new(func),
)
}

/// Install pg_catalog and postgres UDFs to current `SessionContext`
pub fn setup_pg_catalog<P>(
session_context: &SessionContext,
Expand Down Expand Up @@ -1484,6 +1594,11 @@ where
session_context.register_udf(create_pg_get_partition_ancestors_udf());
session_context.register_udf(quote_ident_udf::create_quote_ident_udf());
session_context.register_udf(quote_ident_udf::create_parse_ident_udf());
session_context.register_udf(create_pg_postmaster_start_time_udf());
session_context.register_udf(create_pg_is_in_recovery_udf());
session_context.register_udf(create_txid_current_udf());
session_context.register_udf(create_pg_tablespace_location_udf());
session_context.register_udf(create_age_udf());

Ok(())
}
Expand Down Expand Up @@ -1942,14 +2057,6 @@ mod test {
.to_vec(),
)
.expect("Failed to load ipc data");
let _ = ArrowTable::from_ipc_data(
include_bytes!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/pg_catalog_arrow_exports/pg_tablespace.feather"
))
.to_vec(),
)
.expect("Failed to load ipc data");
let _ = ArrowTable::from_ipc_data(
include_bytes!(concat!(
env!("CARGO_MANIFEST_DIR"),
Expand Down
28 changes: 20 additions & 8 deletions datafusion-pg-catalog/src/pg_catalog/pg_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use datafusion::arrow::array::{
ArrayRef, BooleanArray, Int32Array, ListArray, RecordBatch, StringArray,
ArrayRef, BooleanArray, Int32Array, ListBuilder, RecordBatch, StringArray, StringBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -52,9 +52,9 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
Field::new("daticurules", DataType::Utf8, true),
Field::new(
"datacl",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
true,
), // Access privileges
),
]));

Self {
Expand Down Expand Up @@ -84,7 +84,7 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
let mut dattablespaces = Vec::new();
let mut daticulocales: Vec<Option<String>> = Vec::new();
let mut daticurules: Vec<Option<String>> = Vec::new();
let mut datacls: Vec<Option<Vec<Option<i32>>>> = Vec::new();
let mut datacls: Vec<Option<Vec<Option<String>>>> = Vec::new();

// to store all schema-oid mapping temporarily before adding to global oid cache
let mut catalog_oid_cache = HashMap::new();
Expand Down Expand Up @@ -169,9 +169,21 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
Arc::new(Int32Array::from(dattablespaces)),
Arc::new(StringArray::from(daticulocales)),
Arc::new(StringArray::from(daticurules)),
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(
datacls.into_iter(),
)),
Arc::new({
let mut builder = ListBuilder::new(StringBuilder::new());
for acl in &datacls {
match acl {
Some(items) => {
for item in items {
builder.values().append_option(item.as_deref());
}
builder.append(true);
}
None => builder.append(false),
}
}
builder.finish()
}),
];

// Create a full record batch
Expand Down
33 changes: 33 additions & 0 deletions datafusion-pg-catalog/src/pg_catalog/pg_locks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use crate::pg_catalog::empty_table::EmptyTable;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;

pub(crate) fn pg_locks() -> EmptyTable {
let schema = Arc::new(Schema::new(vec![
Field::new("locktype", DataType::Utf8, true),
Field::new("database", DataType::Int32, true),
Field::new("relation", DataType::Int32, true),
Field::new("page", DataType::Int32, true),
Field::new("tuple", DataType::Int16, true),
Field::new("virtualxid", DataType::Utf8, true),
Field::new("transactionid", DataType::Utf8, true),
Field::new("classid", DataType::Int32, true),
Field::new("objid", DataType::Int32, true),
Field::new("objsubid", DataType::Int16, true),
Field::new("virtualtransaction", DataType::Utf8, true),
Field::new("pid", DataType::Int32, true),
Field::new("mode", DataType::Utf8, true),
Field::new("granted", DataType::Boolean, true),
Field::new("fastpath", DataType::Boolean, true),
Field::new(
"waitstart",
DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
Some(Arc::from("UTC")),
),
true,
),
]));

EmptyTable::new(schema)
}
19 changes: 9 additions & 10 deletions datafusion-pg-catalog/src/pg_catalog/pg_namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
oid_counter: Arc<AtomicU32>,
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
) -> Self {
// Define the schema for pg_namespace
// This matches the columns from PostgreSQL's pg_namespace
let schema = Arc::new(Schema::new(vec![
Field::new("oid", DataType::Int32, false), // Object identifier
Field::new("nspname", DataType::Utf8, false), // Name of the namespace (schema)
Field::new("nspowner", DataType::Int32, false), // Owner of the namespace
Field::new("nspacl", DataType::Utf8, true), // Access privileges
Field::new("options", DataType::Utf8, true), // Schema-level options
Field::new("oid", DataType::Int32, false),
Field::new("xmin", DataType::Int32, true),
Field::new("nspname", DataType::Utf8, false),
Field::new("nspowner", DataType::Int32, false),
Field::new("nspacl", DataType::Utf8, true),
Field::new("options", DataType::Utf8, true),
]));

Self {
Expand All @@ -47,10 +46,9 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
}
}

/// Generate record batches based on the current state of the catalog
async fn get_data(this: Self) -> Result<RecordBatch> {
// Vectors to store column data
let mut oids = Vec::new();
let mut xmins = Vec::new();
let mut nspnames = Vec::new();
let mut nspowners = Vec::new();
let mut nspacls: Vec<Option<String>> = Vec::new();
Expand All @@ -74,6 +72,7 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
schema_oid_cache.insert(cache_key, schema_oid);

oids.push(schema_oid as i32);
xmins.push(Some(1i32));
nspnames.push(schema_name.clone());
nspowners.push(10); // Default owner
nspacls.push(None);
Expand All @@ -92,9 +91,9 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
// add new schema cache
oid_cache.extend(schema_oid_cache);

// Create Arrow arrays from the collected data
let arrays: Vec<ArrayRef> = vec![
Arc::new(Int32Array::from(oids)),
Arc::new(Int32Array::from(xmins)),
Arc::new(StringArray::from(nspnames)),
Arc::new(Int32Array::from(nspowners)),
Arc::new(StringArray::from_iter(nspacls.into_iter())),
Expand Down
Loading
Loading