Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pp/entrypoint/head/lss.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
#pragma once

#include <memory>
#include <variant>

#include "bare_bones/exception.h"
#include "primitives/primitives.h"
#include "primitives/snug_composites.h"
#include "series_index/queryable_encoding_bimap.h"
#include "series_index/trie/cedarpp_tree.h"

namespace entrypoint::head {

using LsIdsSlice = BareBones::Vector<PromPP::Primitives::LabelSetID>;
using LsIdsSlicePtr = std::unique_ptr<LsIdsSlice>;

enum class LssType : uint32_t {
kEncodingBimap = 0,
kOrderedEncodingBimap,
Expand Down
19 changes: 16 additions & 3 deletions pp/entrypoint/primitives_lss.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

using GoLabelMatchers = PromPP::Primitives::Go::SliceView<PromPP::Prometheus::LabelMatcherTrait<PromPP::Primitives::Go::String>>;
using GoSliceOfString = PromPP::Primitives::Go::Slice<PromPP::Primitives::Go::String>;
using entrypoint::head::LsIdsSlice;
using entrypoint::head::LsIdsSlicePtr;
using entrypoint::head::LssType;
using entrypoint::head::LssVariantPtr;
using entrypoint::head::QueryableEncodingBimap;
Expand Down Expand Up @@ -255,7 +257,8 @@ extern "C" void prompp_primitives_lss_copy_added_series(uint64_t source_lss, uin
auto& dst = std::get<QueryableEncodingBimap>(*std::bit_cast<entrypoint::head::LssVariant*>(destination_lss));
src.build_deferred_indexes();

series_index::QueryableEncodingBimapCopier copier(src, src.sorting_index(), src.added_series(), dst);
BareBones::Vector<PromPP::Primitives::LabelSetID> dst_src_ids_mapping;
series_index::QueryableEncodingBimapCopier copier(src, src.sorting_index(), src.added_series(), dst, dst_src_ids_mapping);
copier.copy_added_series_and_build_indexes();
}

Expand All @@ -281,11 +284,21 @@ extern "C" void prompp_primitives_lss_bitset_dtor(void* args) {
static_cast<Arguments*>(args)->~Arguments();
}

extern "C" void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss) {
extern "C" void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss, uint64_t ids_mapping) {
const auto& src = std::get<entrypoint::head::ReadonlyLss>(*std::bit_cast<entrypoint::head::LssVariant*>(source_lss));
const auto& src_bitset = *std::bit_cast<BareBones::Bitset*>(source_bitset);
auto& dst = std::get<QueryableEncodingBimap>(*std::bit_cast<entrypoint::head::LssVariant*>(destination_lss));
const auto dst_src_ids_mapping = std::bit_cast<LsIdsSlicePtr*>(ids_mapping);
*dst_src_ids_mapping = std::make_unique<LsIdsSlice>();

series_index::QueryableEncodingBimapCopier copier(src, src.sorting_index(), src_bitset, dst);
series_index::QueryableEncodingBimapCopier copier(src, src.sorting_index(), src_bitset, dst, **dst_src_ids_mapping);
copier.copy_added_series_and_build_indexes();
}

void prompp_primitives_free_ls_ids_mapping(void* args) {
struct Arguments {
LsIdsSlicePtr ls_ids_mapping;
};

static_cast<Arguments*>(args)->~Arguments();
}
13 changes: 12 additions & 1 deletion pp/entrypoint/primitives_lss.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,22 @@ void prompp_primitives_lss_bitset_dtor(void* args);
* @param source_lss pointer to source label sets;
* @param source_bitset pointer to source bitset;
* @param destination_lss pointer to destination label sets;
* @param ids_mapping pointer to uintptr
*
* @attention This binding used as a CGO call!!!
*
*/
void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss);
void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss, uint64_t ids_mapping);

/**
* @brief destroy ls ids mapping
*
* @param args {
* ls_ids_mapping uintptr
* }
*
*/
void prompp_primitives_free_ls_ids_mapping(void* args);

#ifdef __cplusplus
} // extern "C"
Expand Down
12 changes: 11 additions & 1 deletion pp/entrypoint/prometheus_relabeler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ extern "C" void prompp_prometheus_per_goroutine_relabeler_append_relabeler_serie
}
}

void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args) {
extern "C" void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args) {
struct Arguments {
PromPP::Primitives::Go::SliceView<PromPP::Prometheus::Relabel::InnerSeries*> inner_series;
StaleNaNsStatePtr stale_nans_state;
Expand All @@ -879,3 +879,13 @@ void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args) {
const auto in = static_cast<Arguments*>(args);
PromPP::Prometheus::Relabel::PerGoroutineRelabeler::track_stale_nans(in->inner_series, *in->stale_nans_state, in->default_timestamp);
}

extern "C" void prompp_remap_stale_nans_state(void* args) {
struct Arguments {
StaleNaNsStatePtr stale_nans_state;
entrypoint::head::LsIdsSlicePtr dst_src_ls_ids_mapping;
};

const auto in = static_cast<Arguments*>(args);
in->stale_nans_state->remap(*in->dst_src_ls_ids_mapping);
}
10 changes: 10 additions & 0 deletions pp/entrypoint/prometheus_relabeler.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,16 @@ void prompp_prometheus_per_goroutine_relabeler_append_relabeler_series(void* arg
*/
void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args);

/**
* @brief add stale nans to inner series if needed
*
* @param args {
* stale_nan_state uintptr // pointer to source state
* ls_ids_mapping uintptr // pointer to dst_src_ls_ids_mapping
* }
*/
void prompp_remap_stale_nans_state(void* args);

#ifdef __cplusplus
} // extern "C"
#endif
34 changes: 32 additions & 2 deletions pp/go/cppbridge/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ var (
)
)

func freeBytes(b []byte) {
func freeBytes[T any](b []T) {
testGC()
fastcgo.UnsafeCall1(
C.prompp_free_bytes,
Expand Down Expand Up @@ -1561,18 +1561,35 @@ func primitivesLSSBitsetDtor(bitset uintptr) {

// primitivesReadonlyLSSCopyAddedSeries copy the label sets from the source lss to the destination lss
// that were added source lss.
func primitivesReadonlyLSSCopyAddedSeries(source, sourceBitset, destination uintptr) {
func primitivesReadonlyLSSCopyAddedSeries(source, sourceBitset, destination uintptr) uintptr {
var dstSrcLsIdsMapping uintptr

C.prompp_primitives_readonly_lss_copy_added_series(
C.uint64_t(source),
C.uint64_t(sourceBitset),
C.uint64_t(destination),
C.uint64_t(uintptr(unsafe.Pointer(&dstSrcLsIdsMapping))),
)

return dstSrcLsIdsMapping
}

func primitivesLSSCopyAddedSeries(source, destination uintptr) {
C.prompp_primitives_lss_copy_added_series(C.uint64_t(source), C.uint64_t(destination))
}

func primitivesFreeLsIdsMapping(lsIdsMapping uintptr) {
args := struct {
lsIdsMapping uintptr
}{lsIdsMapping}

testGC()
fastcgo.UnsafeCall1(
C.prompp_primitives_free_ls_ids_mapping,
uintptr(unsafe.Pointer(&args)),
)
}

//
// StatelessRelabeler
//
Expand Down Expand Up @@ -3762,3 +3779,16 @@ func prometheusPerGoroutineRelabelerTrackStaleNans(
uintptr(unsafe.Pointer(&args)),
)
}

func prometheusRemapStaleNansState(staleNansState, lsIdsMapping uintptr) {
args := struct {
staleNansState uintptr
lsIdsMapping uintptr
}{staleNansState, lsIdsMapping}

testGC()
fastcgo.UnsafeCall1(
C.prompp_remap_stale_nans_state,
uintptr(unsafe.Pointer(&args)),
)
}
23 changes: 22 additions & 1 deletion pp/go/cppbridge/entrypoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,22 @@ void prompp_primitives_lss_bitset_dtor(void* args);
* @param source_lss pointer to source label sets;
* @param source_bitset pointer to source bitset;
* @param destination_lss pointer to destination label sets;
* @param ids_mapping pointer to []uint32;
*
* @attention This binding used as a CGO call!!!
*
*/
void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss);
void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss, uint64_t ids_mapping);

/**
* @brief destroy ls ids mapping
*
* @param args {
* ls_ids_mapping uintptr
* }
*
*/
void prompp_primitives_free_ls_ids_mapping(void* args);

#ifdef __cplusplus
} // extern "C"
Expand Down Expand Up @@ -1321,6 +1332,16 @@ void prompp_prometheus_per_goroutine_relabeler_append_relabeler_series(void* arg
*/
void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args);

/**
* @brief add stale nans to inner series if needed
*
* @param args {
* stale_nan_state uintptr // pointer to source state
* ls_ids_mapping uintptr // pointer to dst_src_ls_ids_mapping
* }
*/
void prompp_remap_stale_nans_state(void* args);

#ifdef __cplusplus
} // extern "C"
#endif
Expand Down
20 changes: 18 additions & 2 deletions pp/go/cppbridge/lss_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,27 @@ func (lss *LabelSetSnapshot) Query(selector uintptr) *LSSQueryResult {
return result
}

type IdsMapping struct {
pointer uintptr
}

func (m *IdsMapping) IsEmpty() bool {
return m.pointer == uintptr(0)
}

// CopyAddedSeries copy the label sets from the source lss to the destination lss
// that were added source lss.
func (lss *LabelSetSnapshot) CopyAddedSeries(bitsetSeries *BitsetSeries, destination *LabelSetStorage) {
primitivesReadonlyLSSCopyAddedSeries(lss.pointer, bitsetSeries.pointer, destination.pointer)
func (lss *LabelSetSnapshot) CopyAddedSeries(bitsetSeries *BitsetSeries, destination *LabelSetStorage) *IdsMapping {
idsMapping := &IdsMapping{
pointer: primitivesReadonlyLSSCopyAddedSeries(lss.pointer, bitsetSeries.pointer, destination.pointer),
}
runtime.SetFinalizer(idsMapping, func(idsMapping *IdsMapping) {
primitivesFreeLsIdsMapping(idsMapping.pointer)
})

runtime.KeepAlive(lss)
runtime.KeepAlive(bitsetSeries)
runtime.KeepAlive(destination)

return idsMapping
}
22 changes: 18 additions & 4 deletions pp/go/cppbridge/prometheus_relabeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"regexp"
"runtime"
"slices"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -556,6 +557,12 @@ func NewStaleNansState() *StaleNansState {
return s
}

func (s *StaleNansState) Remap(mapping *IdsMapping) {
prometheusRemapStaleNansState(s.state, mapping.pointer)
runtime.KeepAlive(s)
runtime.KeepAlive(mapping)
}

// RelabelerStats statistics return from relabeler.
type RelabelerStats struct {
SamplesAdded uint32
Expand Down Expand Up @@ -1614,11 +1621,14 @@ func (s *StateV2) EnableTrackStaleness() {
func (s *StateV2) Reconfigure(
generationHead uint64,
numberOfShards uint16,
staleNansIdsMappings []*IdsMapping,
) {
if s.status&inited == inited && generationHead == s.generationHead {
return
}

remapStaleNansState := (generationHead-s.generationHead == 1) && (int(numberOfShards) == len(s.staleNansStates))

// long way
s.locker.Lock()

Expand All @@ -1637,7 +1647,7 @@ func (s *StateV2) Reconfigure(
}

s.resetCaches(numberOfShards)
s.resetStaleNansStates(numberOfShards)
s.resetStaleNansStates(numberOfShards, remapStaleNansState, staleNansIdsMappings)
s.status |= inited
s.generationHead = generationHead

Expand Down Expand Up @@ -1721,7 +1731,7 @@ func (s *StateV2) resetCaches(numberOfShards uint16) {
}

// resetStaleNansStates recreate StaleNansStates.
func (s *StateV2) resetStaleNansStates(numberOfShards uint16) {
func (s *StateV2) resetStaleNansStates(numberOfShards uint16, remapStaleNansState bool, staleNansIdsMappings []*IdsMapping) {
if !s.trackStaleness {
return
}
Expand All @@ -1736,10 +1746,14 @@ func (s *StateV2) resetStaleNansStates(numberOfShards uint16) {
s.staleNansStates = s.staleNansStates[:numberOfShards]
case len(s.staleNansStates) < int(numberOfShards):
// grow
s.staleNansStates = make([]*StaleNansState, numberOfShards)
s.staleNansStates = slices.Grow(s.staleNansStates, int(numberOfShards)-len(s.staleNansStates))[:numberOfShards]
}

for shardID := range s.staleNansStates {
s.staleNansStates[shardID] = NewStaleNansState()
if remapStaleNansState && staleNansIdsMappings[shardID] != nil && !staleNansIdsMappings[shardID].IsEmpty() {
s.staleNansStates[shardID].Remap(staleNansIdsMappings[shardID])
} else {
s.staleNansStates[shardID] = NewStaleNansState()
}
}
}
Loading
Loading