Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Untitled Diagram.drawio
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<mxfile host="app.diagrams.net" modified="2021-05-21T02:44:41.807Z" agent="5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36" version="14.4.7" etag="ZaHaef2dIdKwSCI_kHiP" type="github"><diagram id="pv2JXVRlaylYAFMuSIbh">UzV2zq1wL0osyPDNT0nNUTV2VTV2LsrPL4GwciucU3NyVI0MMlNUjV1UjYwMgFjVyA2HrCFY1qAgsSg1rwSLBiADYTaQg2Y1AA==</diagram></mxfile>
38 changes: 34 additions & 4 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package coordinator

import (
"context"
"fmt"
"github.com/pkg/errors"
"strings"
"time"
"tkestack.io/kvass/pkg/discovery"
"tkestack.io/kvass/pkg/shard"
Expand Down Expand Up @@ -100,6 +102,8 @@ func (c *Coordinator) runOnce() error {
active = c.getActive()
shardsInfo = c.getShardInfos(shards)
changeAbleShards = changeAbleShardsInfo(shardsInfo)
// global targets are those jobs that with "kvass_global_" as its name prefix
globalTargets = getGlobalTarget(active)
)

if int32(len(changeAbleShards)) < c.minShard { // insure that scaling up to min shard
Expand All @@ -110,15 +114,23 @@ func (c *Coordinator) runOnce() error {
}

lastGlobalScrapeStatus := c.globalScrapeStatus(active, shardsInfo)
c.gcTargets(changeAbleShards, active)
needSpace := c.alleviateShards(changeAbleShards)
needSpace += c.assignNoScrapingTargets(shardsInfo, active, lastGlobalScrapeStatus)
c.gcTargets(changeAbleShards, active, globalTargets)
needSpace := c.alleviateShards(changeAbleShards, globalTargets)
// TODO: assign global target
needSpace += c.assignNoScrapingTargets(shardsInfo, active, globalTargets, lastGlobalScrapeStatus)

scale := int32(len(shardsInfo))
if needSpace != 0 {
c.log.Infof("need space %d", needSpace)
scale = c.tryScaleUp(shardsInfo, needSpace)
globalSeries := getSeriesTotal(globalTargets, lastGlobalScrapeStatus)
c.log.Info(fmt.Sprintf("Global series: %d", globalSeries))
if c.maxSeries - globalSeries <= 0 {
c.log.Error("There is no enough to assign other targets after global targets has been assigned.")
return fmt.Errorf("There is no enough to assign other targets after global targets has been assigned")
}
scale = c.tryScaleUp(shardsInfo, globalSeries, needSpace)
} else if c.maxIdleTime != 0 {
// TODO: if there is only global target in the shard, need to scale down
scale = c.tryScaleDown(shardsInfo)
}

Expand All @@ -143,3 +155,21 @@ func (c *Coordinator) runOnce() error {
c.lastGlobalScrapeStatus = newLastGlobalScrapeStatus
return nil
}

func getSeriesTotal(targets map[uint64]*discovery.SDTargets, globalScrapeStatus map[uint64]*target.ScrapeStatus,) int64 {
var total int64 = 0
for h, _ := range targets {
total += globalScrapeStatus[h].Series
}
return total
}

func getGlobalTarget(active map[uint64]*discovery.SDTargets) map[uint64]*discovery.SDTargets {
globalTargets := make(map[uint64]*discovery.SDTargets)
for k, v := range active {
if strings.HasPrefix(v.Job, "kvass_global_") {
globalTargets[k] = v
}
}
return globalTargets
}
39 changes: 32 additions & 7 deletions pkg/coordinator/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package coordinator

import (
"fmt"
"github.com/prometheus/prometheus/scrape"
"golang.org/x/sync/errgroup"
"math/rand"
Expand Down Expand Up @@ -147,13 +148,17 @@ func (c *Coordinator) applyShardsInfo(shards []*shardInfo) {
// 1. not exist in active targets
// 2. is in_transfer state and had been scraped by other shard
// 3. is normal state and had been scraped by other shard with lower head series
func (c *Coordinator) gcTargets(changeAbleShards []*shardInfo, active map[uint64]*discovery.SDTargets) {
func (c *Coordinator) gcTargets(changeAbleShards []*shardInfo, active , globalTargets map[uint64]*discovery.SDTargets) {
for _, s := range changeAbleShards {
for h, tar := range s.scraping {
// target not exist in active targets
if _, exist := active[h]; !exist {
delete(s.scraping, h)
continue
} else {
if _, exist := globalTargets[h];exist {
continue
}
}

if tar.ScrapeTimes < minWaitScrapeTimes {
Expand Down Expand Up @@ -185,7 +190,7 @@ func (c *Coordinator) gcTargets(changeAbleShards []*shardInfo, active map[uint64
// make expect series of targets less than maxSeries * 0.5 if current head series > maxSeries 1.4
// make expect series of targets less than maxSeries * 0.2 if current head series > maxSeries 1.6
// remove all targets if current head series > maxSeries 1.8
func (c *Coordinator) alleviateShards(changeAbleShards []*shardInfo) (needSpace int64) {
func (c *Coordinator) alleviateShards(changeAbleShards []*shardInfo, globalTargets map[uint64]*discovery.SDTargets) (needSpace int64) {
var threshold = []struct {
maxSeriesRate float64
expectSeriesRate float64
Expand All @@ -212,7 +217,7 @@ func (c *Coordinator) alleviateShards(changeAbleShards []*shardInfo) (needSpace
for _, t := range threshold {
if s.runtime.HeadSeries >= seriesWithRate(c.maxSeries, t.maxSeriesRate) {
c.log.Infof("%s need alleviate", s.shard.ID)
needSpace += c.alleviateShard(s, changeAbleShards, seriesWithRate(c.maxSeries, t.expectSeriesRate))
needSpace += c.alleviateShard(s, changeAbleShards, seriesWithRate(c.maxSeries, t.expectSeriesRate), globalTargets)
break
}
}
Expand All @@ -221,13 +226,17 @@ func (c *Coordinator) alleviateShards(changeAbleShards []*shardInfo) (needSpace
return needSpace
}

func (c *Coordinator) alleviateShard(s *shardInfo, changeAbleShards []*shardInfo, expSeries int64) (needSpace int64) {
func (c *Coordinator) alleviateShard(s *shardInfo, changeAbleShards []*shardInfo, expSeries int64, globalTargets map[uint64]*discovery.SDTargets) (needSpace int64) {
total := s.totalTargetsSeries()
for hash, tar := range s.scraping {
if total < expSeries {
break
}

if _, isGlobal := globalTargets[hash]; isGlobal {
continue
}

if tar.TargetState != target.StateNormal || tar.Health != scrape.HealthGood || tar.ScrapeTimes < minWaitScrapeTimes {
continue
}
Expand Down Expand Up @@ -268,18 +277,32 @@ func seriesWithRate(series int64, rate float64) int64 {
func (c *Coordinator) assignNoScrapingTargets(
shards []*shardInfo,
active map[uint64]*discovery.SDTargets,
globalTargets map[uint64]*discovery.SDTargets,
globalScrapeStatus map[uint64]*target.ScrapeStatus,
) (needSpace int64) {
healthShards := changeAbleShardsInfo(shards)
scraping := map[uint64]bool{}
for _, s := range shards {
// Check and assign global targets for every shard.
for h, _ := range globalTargets {
if _, exist := s.scraping[h]; s.scraping != nil && !exist {
s.scraping[h] = globalScrapeStatus[h]
s.runtime.HeadSeries += globalScrapeStatus[h].Series
}
}

for hash := range s.scraping {
scraping[hash] = true
}
}

for hash := range active {
if scraping[hash] {
//isGlobal := false
//if _, isGlobal = globalTargets[hash]; scraping[hash] && !isGlobal{
// continue
//}

if scraping[hash]{
continue
}

Expand Down Expand Up @@ -432,10 +455,12 @@ func (c *Coordinator) shardBecomeIdle(src *shardInfo, shards []*shardInfo) bool
}

// tryScaleUp calculate the expect scale according to 'needSpace'
func (c *Coordinator) tryScaleUp(shard []*shardInfo, needSpace int64) int32 {
func (c *Coordinator) tryScaleUp(shard []*shardInfo, globalSeries, needSpace int64) int32 {
health := changeAbleShardsInfo(shard)
free := c.maxSeries - globalSeries
c.log.Info(fmt.Sprintf("After global target assigned, free space: %d", free))
exp := int32(len(health))
exp += int32((needSpace / c.maxSeries) + 1)
exp += int32((needSpace / free) + 1)

if exp < int32(len(shard)) {
exp = int32(len(shard))
Expand Down
23 changes: 20 additions & 3 deletions pkg/shard/kubernetes/shardmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,29 @@ func (s *shardManager) Shards() ([]*shard.Shard, error) {
return nil, errors.Wrap(err, "list pod")
}

ret := make([]*shard.Shard, 0)
sts, err := s.cli.AppsV1().StatefulSets(s.sts.Namespace).Get(context.TODO(), s.sts.Name, v12.GetOptions{})
if err != nil {
return nil, err
}

podMap := map[string]v1.Pod{}
for _, p := range pods.Items {
url := fmt.Sprintf("http://%s:%d", p.Status.PodIP, s.port)
ret = append(ret, shard.NewShard(p.Name, url, k8sutil.IsPodReady(&p), s.lg.WithField("shard", p.Name)))
podMap[p.Name] = p
}

ret := make([]*shard.Shard, 0)
for i := int32(0); i < *sts.Spec.Replicas; i ++ {
if p, ok := podMap[fmt.Sprintf("%s-%d", sts.Name, i)]; ok {
url := fmt.Sprintf("http://%s:%d", p.Status.PodIP, s.port)
ret = append(ret, shard.NewShard(p.Name, url, k8sutil.IsPodReady(&p), s.lg.WithField("shard", p.Name)))
}
}

//for _, p := range pods.Items {
// url := fmt.Sprintf("http://%s:%d", p.Status.PodIP, s.port)
// ret = append(ret, shard.NewShard(p.Name, url, k8sutil.IsPodReady(&p), s.lg.WithField("shard", p.Name)))
//}

return ret, nil
}

Expand Down
16 changes: 14 additions & 2 deletions pkg/sidecar/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package sidecar

import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io/ioutil"
"os"
"path"
"strings"
"time"
"tkestack.io/kvass/pkg/shard"
"tkestack.io/kvass/pkg/target"
Expand All @@ -44,6 +46,7 @@ type TargetsInfo struct {
IdleAt *time.Time
// Status is the runtime status of all targets
Status map[uint64]*target.ScrapeStatus `json:"-"`
globalTargetMap map[uint64]bool
}

func newTargetsInfo() TargetsInfo {
Expand Down Expand Up @@ -118,19 +121,27 @@ func (t *TargetsManager) UpdateTargets(req *shard.UpdateTargetsRequest) error {
}

func (t *TargetsManager) updateIdleState() {
if len(t.targets.Status) == 0 && t.targets.IdleAt == nil {
t.log.Info(fmt.Sprintf("Status len: %d", len(t.targets.Status)))
t.log.Info(fmt.Sprintf("Global target len: %d", len(t.targets.globalTargetMap)))
normalTarNum := len(t.targets.Status) - len(t.targets.globalTargetMap)
if normalTarNum == 0 && t.targets.IdleAt == nil {
t.targets.IdleAt = types.TimePtr(time.Now())
t.log.Info(fmt.Sprintf("Shard is idle. Time: %s", t.targets.IdleAt.String()))
}

if len(t.targets.Status) != 0 {
if normalTarNum != 0 {
t.targets.IdleAt = nil
}
}

func (t *TargetsManager) updateStatus() {
status := map[uint64]*target.ScrapeStatus{}
globalTargetMap := map[uint64]bool{}
for job, ts := range t.targets.Targets {
for _, tar := range ts {
if strings.HasPrefix(job, "kvass_global_") {
globalTargetMap[tar.Hash] = true
}
if t.targets.Status[tar.Hash] == nil {
status[tar.Hash] = target.NewScrapeStatus(tar.Series)
} else {
Expand All @@ -145,6 +156,7 @@ func (t *TargetsManager) updateStatus() {
}
}
t.targets.Status = status
t.targets.globalTargetMap = globalTargetMap
}

func (t *TargetsManager) doCallbacks() error {
Expand Down