Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" }
display-more = { git = "https://github.com/databendlabs/display-more", tag = "v0.2.0" }
jsonb = { git = "https://github.com/b41sh/jsonb", rev = "ef5482fe9c07d87daa84115b86fd9af55dea277c" }
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.4.2" }
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.9" }
orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "d82aa6d" }
Expand Down
15 changes: 15 additions & 0 deletions src/meta/proto-conv/src/schema_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,13 @@ impl FromToProtoEnum for ex::VariantDataType {
variant_data_type::Dt::ArrayT(dt) => {
ex::VariantDataType::Array(Box::new(ex::VariantDataType::from_pb_enum(*dt)?))
}
variant_data_type::Dt::DecimalT(dt) => {
ex::VariantDataType::Decimal(ex::types::decimal::DecimalDataType::from_pb(dt)?)
}
variant_data_type::Dt::BinaryT(_) => ex::VariantDataType::Binary,
variant_data_type::Dt::DateT(_) => ex::VariantDataType::Date,
variant_data_type::Dt::TimestampT(_) => ex::VariantDataType::Timestamp,
variant_data_type::Dt::IntervalT(_) => ex::VariantDataType::Interval,
})
}

Expand All @@ -498,6 +505,14 @@ impl FromToProtoEnum for ex::VariantDataType {
VariantDataType::Array(dt) => {
pb::variant_data_type::Dt::ArrayT(Box::new(dt.to_pb_enum()?))
}
VariantDataType::Decimal(n) => {
let x = n.to_pb()?;
pb::variant_data_type::Dt::DecimalT(x)
}
VariantDataType::Binary => pb::variant_data_type::Dt::BinaryT(pb::Empty {}),
VariantDataType::Date => pb::variant_data_type::Dt::DateT(pb::Empty {}),
VariantDataType::Timestamp => pb::variant_data_type::Dt::TimestampT(pb::Empty {}),
VariantDataType::Interval => pb::variant_data_type::Dt::IntervalT(pb::Empty {}),
};

Ok(pb::VariantDataType { dt: Some(dt) })
Expand Down
5 changes: 5 additions & 0 deletions src/meta/protos/proto/virtual_schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ message VariantDataType {
Empty float64_t = 5;
Empty string_t = 6;
VariantDataType array_t = 7;
Decimal decimal_t = 8;
Empty binary_t = 9;
Empty date_t = 10;
Empty timestamp_t = 11;
Empty interval_t = 12;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
.default_session()
.get_settings()
.set_data_retention_time_in_days(0)?;
fixture
.default_session()
.get_settings()
.set_enable_experimental_virtual_column(1)?;
fixture.create_default_database().await?;
fixture.create_variant_table().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::str::FromStr;

use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_common_expression::types::DecimalDataType;
use databend_common_expression::types::DecimalSize;
use databend_common_expression::types::Int32Type;
use databend_common_expression::types::VariantType;
use databend_common_expression::ColumnId;
Expand All @@ -32,11 +34,6 @@ use jsonb::OwnedJsonb;
#[tokio::test(flavor = "multi_thread")]
async fn test_virtual_column_builder() -> Result<()> {
let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;

fixture
.default_session()
.get_settings()
.set_enable_experimental_virtual_column(1)?;
fixture.create_default_database().await?;
fixture.create_variant_table().await?;

Expand Down Expand Up @@ -247,7 +244,10 @@ async fn test_virtual_column_builder() -> Result<()> {
"['geo']['lat']",
)
.unwrap();
assert_eq!(meta_geo_lat.data_type, VariantDataType::Jsonb);
assert_eq!(
meta_geo_lat.data_type,
VariantDataType::Decimal(DecimalDataType::from(DecimalSize::new_unchecked(18, 1)))
);

let entries = vec![
Int32Type::from_data(vec![1, 2, 3, 4, 5, 6, 7, 8]).into(),
Expand Down Expand Up @@ -316,11 +316,6 @@ async fn test_virtual_column_builder() -> Result<()> {
#[tokio::test(flavor = "multi_thread")]
async fn test_virtual_column_builder_stream_write() -> Result<()> {
let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;

fixture
.default_session()
.get_settings()
.set_enable_experimental_virtual_column(1)?;
fixture.create_default_database().await?;
fixture.create_variant_table().await?;

Expand Down
7 changes: 6 additions & 1 deletion src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub struct DataSchema {
pub(crate) metadata: BTreeMap<String, String>,
}

#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum VariantDataType {
Jsonb,
Boolean,
Expand All @@ -158,6 +158,11 @@ pub enum VariantDataType {
Float64,
String,
Array(Box<VariantDataType>),
Decimal(DecimalDataType),
Binary,
Date,
Timestamp,
Interval,
}

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/src/types/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,8 @@ impl Decimal for i256 {
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Serialize,
Deserialize,
Expand Down
5 changes: 5 additions & 0 deletions src/query/expression/src/utils/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,11 @@ impl Display for VariantDataType {
VariantDataType::Float64 => write!(f, "Float64"),
VariantDataType::String => write!(f, "String"),
VariantDataType::Array(inner) => write!(f, "Array({inner})"),
VariantDataType::Decimal(inner) => write!(f, "Decimal({inner})"),
VariantDataType::Binary => write!(f, "Binary"),
VariantDataType::Date => write!(f, "Date"),
VariantDataType::Timestamp => write!(f, "Timestamp"),
VariantDataType::Interval => write!(f, "Interval"),
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,13 +1079,6 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::String(vec!["None".into(), "LZ4".into(), "ZSTD".into()])),
}),
("enable_refresh_virtual_column_after_write", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Refresh virtual column after new data written",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("enable_refresh_aggregating_index_after_write", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Refresh aggregating index after new data written",
Expand Down Expand Up @@ -1446,7 +1439,7 @@ impl DefaultSettings {
range: Some(SettingRange::Numeric(1..=1024)),
}),
("enable_experimental_virtual_column", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
value: UserSettingValue::UInt64(1),
desc: "Enables experimental virtual column",
mode: SettingMode::Both,
scope: SettingScope::Both,
Expand Down
4 changes: 0 additions & 4 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,10 +789,6 @@ impl Settings {
}
}

pub fn get_enable_refresh_virtual_column_after_write(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_refresh_virtual_column_after_write")? != 0)
}

pub fn get_enable_refresh_aggregating_index_after_write(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_refresh_aggregating_index_after_write")? != 0)
}
Expand Down
56 changes: 53 additions & 3 deletions src/query/storages/common/table_meta/src/meta/v2/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use databend_common_expression::types::i256;
use databend_common_expression::types::Decimal;
use databend_common_expression::types::DecimalDataType;
use databend_common_expression::types::DecimalSize;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::BlockMetaInfoDowncast;
Expand Down Expand Up @@ -85,7 +89,16 @@ pub struct VirtualColumnMeta {
// 3 => int64
// 4 => float64
// 5 => string
// 6 => decimal64
// 7 => decimal128
// 8 => decimal256
// 9 => binary
// 10 => date
// 11 => timestamp
// 12 => interval
pub data_type: u8,
/// the scale size, only used for decimal type
pub scale: Option<u8>,
/// virtual column statistics.
pub column_stat: Option<ColumnStatistics>,
}
Expand All @@ -100,26 +113,63 @@ impl VirtualColumnMeta {
}

pub fn data_type(&self) -> TableDataType {
let scale = self.scale.unwrap_or_default();
match self.data_type {
1 => TableDataType::Nullable(Box::new(TableDataType::Boolean)),
2 => TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
3 => TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int64))),
4 => TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Float64))),
5 => TableDataType::Nullable(Box::new(TableDataType::String)),
6 => {
let size = DecimalSize::new_unchecked(i64::MAX_PRECISION, scale);
TableDataType::Nullable(Box::new(TableDataType::Decimal(DecimalDataType::from(
size,
))))
}
7 => {
let size = DecimalSize::new_unchecked(i128::MAX_PRECISION, scale);
TableDataType::Nullable(Box::new(TableDataType::Decimal(DecimalDataType::from(
size,
))))
}
8 => {
let size = DecimalSize::new_unchecked(i256::MAX_PRECISION, scale);
TableDataType::Nullable(Box::new(TableDataType::Decimal(DecimalDataType::from(
size,
))))
}
9 => TableDataType::Nullable(Box::new(TableDataType::Binary)),
10 => TableDataType::Nullable(Box::new(TableDataType::Date)),
11 => TableDataType::Nullable(Box::new(TableDataType::Timestamp)),
12 => TableDataType::Nullable(Box::new(TableDataType::Interval)),
_ => TableDataType::Nullable(Box::new(TableDataType::Variant)),
}
}

pub fn data_type_code(variant_type: &VariantDataType) -> u8 {
match variant_type {
pub fn data_type_code(variant_type: &VariantDataType) -> (u8, Option<u8>) {
let ty = match variant_type {
VariantDataType::Jsonb => 0,
VariantDataType::Boolean => 1,
VariantDataType::UInt64 => 2,
VariantDataType::Int64 => 3,
VariantDataType::Float64 => 4,
VariantDataType::String => 5,
VariantDataType::Decimal(ty) => match ty {
DecimalDataType::Decimal64(_) => 6,
DecimalDataType::Decimal128(_) => 7,
DecimalDataType::Decimal256(_) => 8,
},
VariantDataType::Binary => 9,
VariantDataType::Date => 10,
VariantDataType::Timestamp => 11,
VariantDataType::Interval => 12,
_ => unreachable!(),
}
};
let scale = match variant_type {
VariantDataType::Decimal(ty) => Some(ty.scale()),
_ => None,
};
(ty, scale)
}
}

Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition = { workspace = true }

databend-common-base = { workspace = true }
databend-common-catalog = { workspace = true }
databend-common-column = { workspace = true }
databend-common-exception = { workspace = true }
databend-common-expression = { workspace = true }
databend-common-functions = { workspace = true }
Expand Down
7 changes: 1 addition & 6 deletions src/query/storages/fuse/src/io/write/stream/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,12 +475,7 @@ impl StreamBlockProperties {
.collect::<HashSet<_>>();

let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta);
let virtual_column_builder = if ctx
.get_settings()
.get_enable_refresh_virtual_column_after_write()
.unwrap_or_default()
&& table.support_virtual_columns()
{
let virtual_column_builder = if table.support_virtual_columns() {
VirtualColumnBuilder::try_create(ctx.clone(), source_schema.clone()).ok()
} else {
None
Expand Down
Loading
Loading