Commit c8a0745

Use bulk_insert_iterator for aggregation clauses

1 parent d7adb85 · commit c8a0745

File tree: 3 files changed, +82 -65 lines changed

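What the change does: in the mean, count, first, and last unsorted-aggregation paths, the per-row sparse_map_.set(group) calls are replaced by assignments to a util::BitSet::bulk_insert_iterator, followed by a single inserter.flush() once the loop finishes. The two Python test files below only pick up formatting tweaks. Here is a minimal standalone sketch of the pattern, assuming util::BitSet aliases BitMagic's bm::bvector<> (ArcticDB's bitsets are BitMagic-backed); the container size and group values are illustrative, not taken from the commit:

// Sketch only, not ArcticDB code: the set() -> bulk_insert_iterator swap.
#include <bm.h>  // BitMagic

#include <vector>

int main() {
    bm::bvector<> sparse_map;
    sparse_map.resize(1000);
    const std::vector<unsigned> groups{3, 17, 3, 256, 42};

    // Old pattern: one random-access set() per input row.
    // for (unsigned g : groups) sparse_map.set(g);

    // New pattern: buffer the positions, then commit them in bulk.
    bm::bvector<>::bulk_insert_iterator inserter(sparse_map);
    for (unsigned g : groups)
        inserter = g;  // buffered; not necessarily visible in sparse_map yet
    inserter.flush();  // commits any buffered bits

    return sparse_map.count() == 4 ? 0 : 1;  // distinct groups: {3, 17, 42, 256}
}

The bulk iterator lets BitMagic batch incoming positions and apply them block by block instead of paying a full block lookup on every set(), which matters here because these loops set one bit per input row.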

cpp/arcticdb/processing/unsorted_aggregation.cpp

Lines changed: 62 additions & 48 deletions
@@ -430,31 +430,36 @@ void MeanAggregatorData::aggregate(
 ) {
     fractions_.resize(unique_values);
     sparse_map_.resize(unique_values);
-    details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, this](auto col_tag) {
-        using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
-        if constexpr (is_sequence_type(col_type_info::data_type)) {
-            util::raise_rte("String aggregations not currently supported");
-        } else if constexpr (is_empty_type(col_type_info::data_type)) {
-            return;
-        }
-        Column::for_each_enumerated<typename col_type_info::TDT>(
-            *input_column.column_,
-            [&groups, this](auto enumerating_it) {
-                auto& fraction = fractions_[groups[enumerating_it.idx()]];
-                if constexpr (is_floating_point_type(col_type_info::data_type)) {
-                    if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
-                        fraction.numerator_ += static_cast<double>(enumerating_it.value());
-                        ++fraction.denominator_;
-                        sparse_map_.set(groups[enumerating_it.idx()]);
-                    }
-                } else {
-                    fraction.numerator_ += static_cast<double>(enumerating_it.value());
-                    ++fraction.denominator_;
-                    sparse_map_.set(groups[enumerating_it.idx()]);
-                }
-            }
-        );
-    });
+    util::BitSet::bulk_insert_iterator inserter(sparse_map_);
+    details::visit_type(
+        input_column.column_->type().data_type(),
+        [&input_column, &groups, &inserter, this](auto col_tag) {
+            using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
+            if constexpr (is_sequence_type(col_type_info::data_type)) {
+                util::raise_rte("String aggregations not currently supported");
+            } else if constexpr (is_empty_type(col_type_info::data_type)) {
+                return;
+            }
+            Column::for_each_enumerated<typename col_type_info::TDT>(
+                *input_column.column_,
+                [&groups, &inserter, this](auto enumerating_it) {
+                    auto& fraction = fractions_[groups[enumerating_it.idx()]];
+                    if constexpr (is_floating_point_type(col_type_info::data_type)) {
+                        if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
+                            fraction.numerator_ += static_cast<double>(enumerating_it.value());
+                            ++fraction.denominator_;
+                            inserter = groups[enumerating_it.idx()];
+                        }
+                    } else {
+                        fraction.numerator_ += static_cast<double>(enumerating_it.value());
+                        ++fraction.denominator_;
+                        inserter = groups[enumerating_it.idx()];
+                    }
+                }
+            );
+        }
+    );
+    inserter.flush();
 }

 SegmentInMemory MeanAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
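The same set()-to-inserter rewrite repeats in the count, first, and last aggregators below; only the surrounding loop bodies differ.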
@@ -500,25 +505,30 @@ void CountAggregatorData::aggregate(
 ) {
     aggregated_.resize(unique_values);
     sparse_map_.resize(unique_values);
-    details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, this](auto col_tag) {
-        using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
-        Column::for_each_enumerated<typename col_type_info::TDT>(
-            *input_column.column_,
-            [&groups, this](auto enumerating_it) {
-                if constexpr (is_floating_point_type(col_type_info::data_type)) {
-                    if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
-                        auto& val = aggregated_[groups[enumerating_it.idx()]];
-                        ++val;
-                        sparse_map_.set(groups[enumerating_it.idx()]);
-                    }
-                } else {
-                    auto& val = aggregated_[groups[enumerating_it.idx()]];
-                    ++val;
-                    sparse_map_.set(groups[enumerating_it.idx()]);
-                }
-            }
-        );
-    });
+    util::BitSet::bulk_insert_iterator inserter(sparse_map_);
+    details::visit_type(
+        input_column.column_->type().data_type(),
+        [&input_column, &groups, &inserter, this](auto col_tag) {
+            using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
+            Column::for_each_enumerated<typename col_type_info::TDT>(
+                *input_column.column_,
+                [&groups, &inserter, this](auto enumerating_it) {
+                    if constexpr (is_floating_point_type(col_type_info::data_type)) {
+                        if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
+                            auto& val = aggregated_[groups[enumerating_it.idx()]];
+                            ++val;
+                            inserter = groups[enumerating_it.idx()];
+                        }
+                    } else {
+                        auto& val = aggregated_[groups[enumerating_it.idx()]];
+                        ++val;
+                        inserter = groups[enumerating_it.idx()];
+                    }
+                }
+            );
+        }
+    );
+    inserter.flush();
 }

 SegmentInMemory CountAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
@@ -561,11 +571,12 @@ void FirstAggregatorData::aggregate(
             using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
             aggregated_.resize(sizeof(GlobalRawType) * unique_values);
             sparse_map_.resize(unique_values);
+            util::BitSet::bulk_insert_iterator inserter(sparse_map_);
             auto col_data = input_column.column_->data();
             auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
             details::visit_type(
                 input_column.column_->type().data_type(),
-                [this, &groups, &out_ptr, &col_data](auto col_tag) {
+                [this, &groups, &out_ptr, &col_data, &inserter](auto col_tag) {
                     using ColumnTagType = std::decay_t<decltype(col_tag)>;
                     using ColumnType = typename ColumnTagType::raw_type;
                     auto groups_pos = 0;
@@ -580,19 +591,20 @@ void FirstAggregatorData::aggregate(
                                 if (is_first_group_el || std::isnan(static_cast<ColumnType>(val))) {
                                     groups_cache_.insert(groups[groups_pos]);
                                     val = GlobalRawType(*ptr);
-                                    sparse_map_.set(groups[groups_pos]);
+                                    inserter = groups[groups_pos];
                                 }
                             } else {
                                 if (is_first_group_el) {
                                     groups_cache_.insert(groups[groups_pos]);
                                     val = GlobalRawType(*ptr);
-                                    sparse_map_.set(groups[groups_pos]);
+                                    inserter = groups[groups_pos];
                                 }
                             }
                         }
                     }
                 }
             );
+            inserter.flush();
         });
     }
 }
@@ -641,11 +653,12 @@ void LastAggregatorData::aggregate(
             using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
             aggregated_.resize(sizeof(GlobalRawType) * unique_values);
             sparse_map_.resize(unique_values);
+            util::BitSet::bulk_insert_iterator inserter(sparse_map_);
             auto col_data = input_column.column_->data();
             auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
             details::visit_type(
                 input_column.column_->type().data_type(),
-                [&groups, &out_ptr, &col_data, this](auto col_tag) {
+                [&groups, &out_ptr, &col_data, &inserter, this](auto col_tag) {
                     using ColumnTagType = std::decay_t<decltype(col_tag)>;
                     using ColumnType = typename ColumnTagType::raw_type;
                     auto groups_pos = 0;
@@ -662,16 +675,17 @@ void LastAggregatorData::aggregate(
                                 if (is_first_group_el || !std::isnan(static_cast<ColumnType>(curr))) {
                                     groups_cache_.insert(groups[groups_pos]);
                                     val = curr;
-                                    sparse_map_.set(groups[groups_pos]);
+                                    inserter = groups[groups_pos];
                                 }
                             } else {
                                 val = GlobalRawType(*ptr);
-                                sparse_map_.set(groups[groups_pos]);
+                                inserter = groups[groups_pos];
                             }
                         }
                     }
                 }
             );
+            inserter.flush();
         });
     }
 }
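One caveat with this pattern, a general BitMagic property rather than anything the diff spells out: bits assigned through a bulk_insert_iterator may sit in the iterator's internal buffer until flush(), so reading the bitset between the last assignment and the flush can miss them. That is why each aggregate() above flushes before returning. A small sketch of the hazard, with illustrative values:

// Sketch of the ordering caveat; values are illustrative.
#include <bm.h>

int main() {
    bm::bvector<> bv;
    bm::bvector<>::bulk_insert_iterator it(bv);
    it = 7;                       // buffered inside the iterator
    bool premature = bv.test(7);  // may be false: bit 7 not committed yet
    (void)premature;
    it.flush();                   // commit the buffer
    return bv.test(7) ? 0 : 1;    // now reliably true
}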

python/tests/unit/arcticdb/version_store/test_append.py

Lines changed: 1 addition & 1 deletion
@@ -755,7 +755,7 @@ def test_append_series_with_different_row_range_index_name(lmdb_version_store_dy
 @pytest.mark.xfail(reason="Wrong normalization metadata update. Monday ref: 10029194063")
 def test_append_no_columns(lmdb_version_store_dynamic_schema_v1):
     lib = lmdb_version_store_dynamic_schema_v1
-    to_write = pd.DataFrame({"col" : [1, 2, 3]}, index=pd.date_range(pd.Timestamp(2025, 1, 1), periods=3))
+    to_write = pd.DataFrame({"col": [1, 2, 3]}, index=pd.date_range(pd.Timestamp(2025, 1, 1), periods=3))
     to_append = pd.DataFrame({}, index=pd.date_range(pd.Timestamp(2025, 1, 4), periods=3))
     lib.write("sym", to_write)
     lib.append("sym", to_append)

python/tests/unit/arcticdb/version_store/test_arrow.py

Lines changed: 19 additions & 16 deletions
@@ -769,40 +769,43 @@ def test_resample_empty_slices(lmdb_version_store_dynamic_schema_v1):
     lib = lmdb_version_store_dynamic_schema_v1
     lib.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
     sym = "sym"
+
     def gen_df(start, num_rows, with_columns=True):
         data = {}
         if with_columns:
             data = {
-                "mean_col": np.arange(start, start+num_rows, dtype=np.float64),
-                "sum_col": np.arange(start, start+num_rows, dtype=np.float64),
-                "min_col": np.arange(start, start+num_rows, dtype=np.float64),
-                "max_col": np.arange(start, start+num_rows, dtype=np.float64),
-                "count_col": np.arange(start, start+num_rows, dtype=np.float64),
+                "mean_col": np.arange(start, start + num_rows, dtype=np.float64),
+                "sum_col": np.arange(start, start + num_rows, dtype=np.float64),
+                "min_col": np.arange(start, start + num_rows, dtype=np.float64),
+                "max_col": np.arange(start, start + num_rows, dtype=np.float64),
+                "count_col": np.arange(start, start + num_rows, dtype=np.float64),
             }
         index = pd.date_range(pd.Timestamp(2025, 1, start), periods=num_rows)
         return pd.DataFrame(data, index=index)

     slices = [
         gen_df(1, 3),
-        gen_df(4, 2, False), # We expect an entirely missing slice 4th-5th
+        gen_df(4, 2, False),  # We expect an entirely missing slice 4th-5th
         gen_df(6, 3),
-        gen_df(9, 5, False), # We expect two missing slices 10th-11th and 12th-13th
+        gen_df(9, 5, False),  # We expect two missing slices 10th-11th and 12th-13th
         gen_df(14, 2),
-        gen_df(16, 2, False), # We expect one missing slice 16th-17th
+        gen_df(16, 2, False),  # We expect one missing slice 16th-17th
         # TODO: If we don't finish with an append with columns our normalization metadata will be broken
-        gen_df(18, 1)
+        gen_df(18, 1),
     ]
     for df_slice in slices:
         lib.append(sym, df_slice, write_if_missing=True)

     q = QueryBuilder()
-    q.resample("2d").agg({
-        "mean_col": "mean",
-        "sum_col": "sum",
-        "min_col": "min",
-        "max_col": "max",
-        "count_col": "count",
-    })
+    q.resample("2d").agg(
+        {
+            "mean_col": "mean",
+            "sum_col": "sum",
+            "min_col": "min",
+            "max_col": "max",
+            "count_col": "count",
+        }
+    )

     table = lib.read(sym, query_builder=q).data
     # sum_col is correctly filled with 0s instead of nulls