Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions api/v1/kubegres_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ import (

// ----------------------- SPEC -------------------------------------------

type KubegresNodeSet struct {
// Name of this set of nodes. Becomes a part of the StatefulSet name.
Name string `json:"name"`
Affinity *v1.Affinity `json:"affinity,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
}

type KubegresDatabase struct {
Size string `json:"size,omitempty"`
VolumeMount string `json:"volumeMount,omitempty"`
Expand Down Expand Up @@ -67,6 +74,7 @@ type Probe struct {

type KubegresSpec struct {
Replicas *int32 `json:"replicas,omitempty"`
NodeSets []KubegresNodeSet `json:"nodeSets,omitempty"`
Image string `json:"image,omitempty"`
Port int32 `json:"port,omitempty"`
ImagePullSecrets []v1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
Expand All @@ -85,8 +93,8 @@ type KubegresSpec struct {
// ----------------------- STATUS -----------------------------------------

type KubegresStatefulSetOperation struct {
InstanceIndex int32 `json:"instanceIndex,omitempty"`
Name string `json:"name,omitempty"`
Instance string `json:"instance,omitempty"`
Name string `json:"name,omitempty"`
}

type KubegresStatefulSetSpecUpdateOperation struct {
Expand All @@ -105,7 +113,7 @@ type KubegresBlockingOperation struct {
}

type KubegresStatus struct {
LastCreatedInstanceIndex int32 `json:"lastCreatedInstanceIndex,omitempty"`
LastCreatedInstance string `json:"lastCreatedInstance,omitempty"`
BlockingOperation KubegresBlockingOperation `json:"blockingOperation,omitempty"`
PreviousBlockingOperation KubegresBlockingOperation `json:"previousBlockingOperation,omitempty"`
EnforcedReplicas int32 `json:"enforcedReplicas,omitempty"`
Expand Down
35 changes: 35 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

693 changes: 684 additions & 9 deletions config/crd/bases/kubegres.reactive-tech.io_kubegres.yaml

Large diffs are not rendered by default.

43 changes: 41 additions & 2 deletions controllers/ctx/KubegresContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package ctx

import (
"context"
apps "k8s.io/api/apps/v1"
"reactive-tech.io/kubegres/api/v1"
"reactive-tech.io/kubegres/controllers/ctx/log"
"reactive-tech.io/kubegres/controllers/ctx/status"
Expand Down Expand Up @@ -53,6 +54,9 @@ const (
EnvVarNamePgData = "PGDATA"
EnvVarNameOfPostgresSuperUserPsw = "POSTGRES_PASSWORD"
EnvVarNameOfPostgresReplicationUserPsw = "POSTGRES_REPLICATION_PASSWORD"
NameLabelKey = "app.kubernetes.io/name"
InstanceLabelKey = "app.kubernetes.io/instance"
ReplicationRoleLabelKey = "app.kubegres.io/replication-role"
)

func (r *KubegresContext) GetServiceResourceName(isPrimary bool) string {
Expand All @@ -62,8 +66,8 @@ func (r *KubegresContext) GetServiceResourceName(isPrimary bool) string {
return r.Kubegres.Name + "-replica"
}

func (r *KubegresContext) GetStatefulSetResourceName(instanceIndex int32) string {
return r.Kubegres.Name + "-" + strconv.Itoa(int(instanceIndex))
func (r *KubegresContext) GetStatefulSetResourceName(instance string) string {
return r.Kubegres.Name + "-" + instance
}

func (r *KubegresContext) IsReservedVolumeName(volumeName string) bool {
Expand All @@ -72,3 +76,38 @@ func (r *KubegresContext) IsReservedVolumeName(volumeName string) bool {
volumeName == CustomConfigMapVolumeName ||
strings.Contains(volumeName, "kube-api")
}

func (r *KubegresContext) ReplicasCount() int32 {
if r.Kubegres.Spec.NodeSets == nil {
return *r.Kubegres.Spec.Replicas
}
return int32(len(r.Kubegres.Spec.NodeSets))
}

func (r *KubegresContext) GetNodeSetsFromSpec() []v1.KubegresNodeSet {
if r.Kubegres.Spec.NodeSets == nil {
nodeSets := make([]v1.KubegresNodeSet, *r.Kubegres.Spec.Replicas)
for i := int32(0); i < *r.Kubegres.Spec.Replicas; i += 1 {
nodeSets[i] = v1.KubegresNodeSet{
Name: strconv.Itoa(int(i)),
}
}
return nodeSets
}
return r.Kubegres.Spec.NodeSets
}

func (r *KubegresContext) GetInstanceFromStatefulSet(statefulSet apps.StatefulSet) string {
return statefulSet.Labels[InstanceLabelKey]
}

func (r *KubegresContext) GetNodeSetSpecFromInstance(instance string) *v1.KubegresNodeSet {
for _, nodeSet := range r.Kubegres.Spec.NodeSets {
if nodeSet.Name == instance {
return &nodeSet
}
}
return &v1.KubegresNodeSet{
Name: instance,
}
}
6 changes: 2 additions & 4 deletions controllers/ctx/resources/ResourcesContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package resources

import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/tools/record"
postgresV1 "reactive-tech.io/kubegres/api/v1"
Expand Down Expand Up @@ -75,13 +74,12 @@ func CreateResourcesContext(kubegres *postgresV1.Kubegres,
client client.Client,
recorder record.EventRecorder) (rc *ResourcesContext, err error) {

setReplicaFieldToZeroIfNil(kubegres)
setReplicasFieldToZeroIfNil(kubegres)

rc = &ResourcesContext{}

rc.LogWrapper = log.LogWrapper{Kubegres: kubegres, Logger: logger, Recorder: recorder}
rc.LogWrapper.Info("KUBEGRES", "name", kubegres.Name, "Status", kubegres.Status)
//rc.LogWrapper.WithName(kubegres.Name)

rc.KubegresStatusWrapper = &status.KubegresStatusWrapper{
Kubegres: kubegres,
Expand Down Expand Up @@ -127,7 +125,7 @@ func CreateResourcesContext(kubegres *postgresV1.Kubegres,
return rc, nil
}

func setReplicaFieldToZeroIfNil(kubegres *postgresV1.Kubegres) {
func setReplicasFieldToZeroIfNil(kubegres *postgresV1.Kubegres) {
if kubegres.Spec.Replicas != nil {
return
}
Expand Down
10 changes: 5 additions & 5 deletions controllers/ctx/status/KubegresStatusWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type KubegresStatusWrapper struct {
statusFieldsToUpdate map[string]interface{}
}

func (r *KubegresStatusWrapper) GetLastCreatedInstanceIndex() int32 {
return r.Kubegres.Status.LastCreatedInstanceIndex
func (r *KubegresStatusWrapper) GetLastCreatedInstance() string {
return r.Kubegres.Status.LastCreatedInstance
}

func (r *KubegresStatusWrapper) SetLastCreatedInstanceIndex(value int32) {
r.addStatusFieldToUpdate("LastCreatedInstanceIndex", value)
r.Kubegres.Status.LastCreatedInstanceIndex = value
func (r *KubegresStatusWrapper) SetLastCreatedInstance(instance string) {
r.addStatusFieldToUpdate("LastCreatedInstance", instance)
r.Kubegres.Status.LastCreatedInstance = instance
}

func (r *KubegresStatusWrapper) GetBlockingOperation() v1.KubegresBlockingOperation {
Expand Down
14 changes: 7 additions & 7 deletions controllers/operation/BlockingOperation.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ func (r *BlockingOperation) ActivateOperation(operationId string, stepId string)
return r.activateOperation(r.createOperationObj(operationId, stepId))
}

func (r *BlockingOperation) ActivateOperationOnStatefulSet(operationId string, stepId string, statefulSetInstanceIndex int32) error {
func (r *BlockingOperation) ActivateOperationOnStatefulSet(operationId string, stepId string, instance string) error {
blockingOperation := r.createOperationObj(operationId, stepId)
blockingOperation.StatefulSetOperation = r.createStatefulSetOperationObj(statefulSetInstanceIndex)
blockingOperation.StatefulSetOperation = r.createStatefulSetOperationObj(instance)
return r.activateOperation(blockingOperation)
}

func (r *BlockingOperation) ActivateOperationOnStatefulSetSpecUpdate(operationId string, stepId string,
statefulSetInstanceIndex int32, specDifferences string) error {
instance string, specDifferences string) error {

blockingOperation := r.createOperationObj(operationId, stepId)
blockingOperation.StatefulSetOperation = r.createStatefulSetOperationObj(statefulSetInstanceIndex)
blockingOperation.StatefulSetOperation = r.createStatefulSetOperationObj(instance)
blockingOperation.StatefulSetSpecUpdateOperation = r.createStatefulSetSpecUpdateOperationObj(specDifferences)

return r.activateOperation(blockingOperation)
Expand Down Expand Up @@ -154,10 +154,10 @@ func (r *BlockingOperation) createOperationObj(operationId string, stepId string
return v1.KubegresBlockingOperation{OperationId: operationId, StepId: stepId}
}

func (r *BlockingOperation) createStatefulSetOperationObj(statefulSetInstanceIndex int32) v1.KubegresStatefulSetOperation {
func (r *BlockingOperation) createStatefulSetOperationObj(instance string) v1.KubegresStatefulSetOperation {
return v1.KubegresStatefulSetOperation{
InstanceIndex: statefulSetInstanceIndex,
Name: r.kubegresContext.GetStatefulSetResourceName(statefulSetInstanceIndex),
Instance: instance,
Name: r.kubegresContext.GetStatefulSetResourceName(instance),
}
}

Expand Down
9 changes: 3 additions & 6 deletions controllers/operation/log/BlockingOperationLogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func (r *BlockingOperationLogger) Log() {
}

func (r *BlockingOperationLogger) logActiveOperation() {

activeOperation := r.blockingOperation.GetActiveOperation()
activeOperationId := activeOperation.OperationId
nbreSecondsLeftBeforeTimeOut := r.blockingOperation.GetNbreSecondsLeftBeforeTimeOut()
Expand All @@ -37,7 +36,6 @@ func (r *BlockingOperationLogger) logActiveOperation() {
}

func (r *BlockingOperationLogger) logPreviouslyActiveOperation() {

previousActiveOperation := r.blockingOperation.GetPreviouslyActiveOperation()
operationId := previousActiveOperation.OperationId

Expand All @@ -51,20 +49,19 @@ func (r *BlockingOperationLogger) logPreviouslyActiveOperation() {
}

func (r *BlockingOperationLogger) logOperation(operation v1.KubegresBlockingOperation) []interface{} {

operationId := operation.OperationId
stepId := operation.StepId
hasTimedOut := operation.HasTimedOut
statefulSetInstanceIndex := operation.StatefulSetOperation.InstanceIndex
instance := operation.StatefulSetOperation.Instance
statefulSetSpecDifferences := operation.StatefulSetSpecUpdateOperation.SpecDifferences

var keysAndValues []interface{}
keysAndValues = append(keysAndValues, "OperationId", operationId)
keysAndValues = append(keysAndValues, "StepId", stepId)
keysAndValues = append(keysAndValues, "HasTimedOut", hasTimedOut)

if statefulSetInstanceIndex != 0 {
keysAndValues = append(keysAndValues, "StatefulSetInstanceIndex", statefulSetInstanceIndex)
if instance != "" {
keysAndValues = append(keysAndValues, "Instance", instance)
}

if statefulSetSpecDifferences != "" {
Expand Down
16 changes: 15 additions & 1 deletion controllers/spec/checker/SpecChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,25 @@ func (r *SpecChecker) CheckSpec() (SpecCheckResult, error) {
specCheckResult.FatalErrorMessage = r.createErrMsgSpecUndefined("spec.env.POSTGRES_REPLICATION_PASSWORD")
}

if *spec.Replicas <= 0 {
if *spec.Replicas <= 0 && len(spec.NodeSets) == 0 {
specCheckResult.HasSpecFatalError = true
specCheckResult.FatalErrorMessage = r.createErrMsgSpecUndefined("spec.replicas")
}

if *spec.Replicas > 0 && len(spec.NodeSets) > 0 {
specCheckResult.HasSpecFatalError = true
specCheckResult.FatalErrorMessage = r.logSpecErrMsg("In the Resources Spec the value of " +
"'spec.replicas' and 'spec.nodeSets' are mutually exclusive. " +
"Please set only one of the value otherwise this operator cannot work correctly.")
}

for _, nodeSet := range spec.NodeSets {
if nodeSet.Name == "" {
specCheckResult.HasSpecFatalError = true
specCheckResult.FatalErrorMessage = r.createErrMsgSpecUndefined("spec.nodeSets[].Name")
}
}

if spec.Image == "" {
specCheckResult.HasSpecFatalError = true
specCheckResult.FatalErrorMessage = r.createErrMsgSpecUndefined("spec.image")
Expand Down
3 changes: 1 addition & 2 deletions controllers/spec/defaultspec/UndefinedSpecValuesChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (r *UndefinedSpecValuesChecker) updateSpec() error {
}

func (r *UndefinedSpecValuesChecker) createDefaultAffinity() *core.Affinity {

resourceName := r.kubegresContext.Kubegres.Name

weightedPodAffinityTerm := core.WeightedPodAffinityTerm{
Expand All @@ -113,7 +112,7 @@ func (r *UndefinedSpecValuesChecker) createDefaultAffinity() *core.Affinity {
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Key: ctx.NameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: []string{resourceName},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (r *ServicesCountSpecEnforcer) isPrimaryDbReady() bool {
}

func (r *ServicesCountSpecEnforcer) isThereReadyReplica() bool {
return r.resourcesStates.StatefulSets.Replicas.NbreReady > 0
return r.resourcesStates.StatefulSets.Replicas.NumberReady > 0
}

func (r *ServicesCountSpecEnforcer) deployPrimaryService() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func CreateStatefulSetCountSpecEnforcer(primaryDbCountSpecEnforcer statefulset.P
}

func (r *StatefulSetCountSpecEnforcer) EnforceSpec() error {

if err := r.enforcePrimaryDbInstance(); err != nil {
return err
}
Expand Down
Loading