Skip to content

Commit

Permalink
[ISSUE #4694] Component initialization order adjustment, add resource…
Browse files Browse the repository at this point in the history
… constraints. (#4693)

* resources describes the compute resource requirements and limits 、component initialization order rules and DNS.

* add license and update runtime resources

* update runtime_controller.go

* update

* deployment operator, commands simplified.

* update

* update

* update

* update
  • Loading branch information
Alonexc authored Jan 18, 2024
1 parent e837f34 commit ad2a41c
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 4 deletions.
4 changes: 4 additions & 0 deletions eventmesh-operator/api/v1/connectors_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type ConnectorsSpec struct {
Size int `json:"size"`
// ConnectorContainers define some configuration
ConnectorContainers []corev1.Container `json:"connectorContainers"`
// HostNetwork can be true or false
HostNetwork bool `json:"hostNetwork,omitempty"`
// DNSPolicy Set DNS policy for the pod.
DNSPolicy corev1.DNSPolicy `json:"dnsPolicy,omitempty"`
// Volumes define the connector config
Volumes []corev1.Volume `json:"volumes"`
// ImagePullPolicy defines how the image is pulled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,12 @@ spec:
type: string
type: object
type: object
dnsPolicy:
description: DNSPolicy Set DNS policy for the pod.
type: string
hostNetwork:
description: HostNetwork can be true or false
type: boolean
imagePullPolicy:
description: ImagePullPolicy defines how the image is pulled
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ spec:
size: 1
# imagePullPolicy is the image pull policy
imagePullPolicy: Always
# set DNS policy for the pod
# dnsPolicy: ClusterFirstWithHostNet
# define the connector-rocketmq container.
connectorContainers:
- name: connector-rocketmq
Expand All @@ -81,6 +83,14 @@ spec:
volumeMounts:
- mountPath: "/data/app/eventmesh-connector-rocketmq/conf"
name: connector-rocketmq-config
# resources describes the compute resource requirements and limits
resources:
requests:
memory: 512Mi
cpu: 250m
limits:
memory: 1024Mi
cpu: 1000m
# configuration file settings to be mounted.
volumes:
- name: connector-rocketmq-config
Expand Down
23 changes: 22 additions & 1 deletion eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ data:
rocketmq.properties: |
#######################rocketmq-client##################
eventMesh.server.rocketmq.namesrvAddr=127.0.0.1:9876;127.0.0.1:9876
eventMesh.server.rocketmq.cluster=DefaultCluster
eventMesh.server.rocketmq.accessKey=********
eventMesh.server.rocketmq.secretKey=********
eventmesh.properties: |
###############################EVNETMESH-runtime ENV#################################
eventMesh.server.idc=DEFAULT
Expand Down Expand Up @@ -254,7 +258,16 @@ spec:
runtimePodTemplate:
template:
spec:
# hostNetwork: true
# Host networking requested for this pod. Use the host's network namespace.
# If this option is set, the ports that will be used must be specified.
# Default to false.
hostNetwork: false
# Defaults to "ClusterFirst"
# Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'.
# DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy.
# To have DNS options set along with hostNetwork, you have to specify DNS policy
# explicitly to 'ClusterFirstWithHostNet'.
dnsPolicy: ClusterFirstWithHostNet
# define the runtime container.
containers:
- name: eventmesh-runtime
Expand All @@ -271,6 +284,14 @@ spec:
name: runtime-config
# imagePullPolicy is the image pull policy
imagePullPolicy: Always
# resources describes the compute resource requirements and limits
resources:
requests:
memory: 2048Mi
cpu: 2000m
limits:
memory: 8192Mi
cpu: 8000m
# configuration file settings to be mounted.
volumes:
- name: runtime-config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
eventmeshoperatorv1 "github.com/apache/eventmesh/eventmesh-operator/api/v1"
"github.com/apache/eventmesh/eventmesh-operator/share"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -123,6 +124,15 @@ func (r ConnectorsReconciler) Reconcile(ctx context.Context, req reconcile.Reque
return reconcile.Result{}, err
}

for {
if share.IsEventMeshRuntimeInitialized {
break
} else {
r.Logger.Info("connector Waiting for runtime ready...")
time.Sleep(time.Duration(share.WaitForRuntimePodNameReadyInSecond) * time.Second)
}
}

connectorStatefulSet := r.getConnectorStatefulSet(connector)
// Check if the statefulSet already exists, if not create a new one
found := &appsv1.StatefulSet{}
Expand Down Expand Up @@ -192,7 +202,7 @@ func (r ConnectorsReconciler) Reconcile(ctx context.Context, req reconcile.Reque
}

r.Logger.Info("Successful reconciliation!")
return reconcile.Result{}, nil
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(share.RequeueAfterSecond) * time.Second}, nil
}

func (r ConnectorsReconciler) getConnectorStatefulSet(connector *eventmeshoperatorv1.Connectors) *appsv1.StatefulSet {
Expand All @@ -217,13 +227,16 @@ func (r ConnectorsReconciler) getConnectorStatefulSet(connector *eventmeshoperat
Labels: getLabels(),
},
Spec: corev1.PodSpec{
HostNetwork: connector.Spec.HostNetwork,
DNSPolicy: connector.Spec.DNSPolicy,
ServiceAccountName: connector.Spec.ServiceAccountName,
Affinity: connector.Spec.Affinity,
Tolerations: connector.Spec.Tolerations,
NodeSelector: connector.Spec.NodeSelector,
PriorityClassName: connector.Spec.PriorityClassName,
ImagePullSecrets: connector.Spec.ImagePullSecrets,
Containers: []corev1.Container{{
Resources: connector.Spec.ConnectorContainers[0].Resources,
Image: connector.Spec.ConnectorContainers[0].Image,
Name: connector.Spec.ConnectorContainers[0].Name,
SecurityContext: getConnectorContainerSecurityContext(connector),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
eventmeshoperatorv1 "github.com/apache/eventmesh/eventmesh-operator/api/v1"
"github.com/apache/eventmesh/eventmesh-operator/share"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -232,8 +233,24 @@ func (r *RuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Request
r.Logger.Error(err, "Not found eventmesh runtime pods")
}

runningEventMeshRuntimeNum := getRunningRuntimeNum(podList.Items)
if runningEventMeshRuntimeNum == eventMeshRuntime.Spec.Size {
share.IsEventMeshRuntimeInitialized = true
}

r.Logger.Info("Successful reconciliation!")
return reconcile.Result{}, nil

return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(share.RequeueAfterSecond) * time.Second}, nil
}

func getRunningRuntimeNum(pods []corev1.Pod) int {
var num = 0
for _, pod := range pods {
if reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
num++
}
}
return num
}

func getRuntimePodNames(pods []corev1.Pod) []string {
Expand Down Expand Up @@ -274,13 +291,14 @@ func (r *RuntimeReconciler) getEventMeshRuntimeStatefulSet(runtime *eventmeshope
Labels: label,
},
Spec: corev1.PodSpec{
DNSPolicy: corev1.DNSClusterFirstWithHostNet,
DNSPolicy: runtime.Spec.RuntimePodTemplate.Template.Spec.DNSPolicy,
Affinity: runtime.Spec.RuntimePodTemplate.Template.Spec.Affinity,
Tolerations: runtime.Spec.RuntimePodTemplate.Template.Spec.Tolerations,
NodeSelector: runtime.Spec.RuntimePodTemplate.Template.Spec.NodeSelector,
PriorityClassName: runtime.Spec.RuntimePodTemplate.Template.Spec.PriorityClassName,
HostNetwork: runtime.Spec.RuntimePodTemplate.Template.Spec.HostNetwork,
Containers: []corev1.Container{{
Resources: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Resources,
Image: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Image,
Name: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Name,
SecurityContext: getContainerSecurityContext(runtime),
Expand Down
30 changes: 30 additions & 0 deletions eventmesh-operator/share/share.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package share

var (
// IsEventMeshRuntimeInitialized is whether the runtime list is initialized
IsEventMeshRuntimeInitialized = false
)

const (
// WaitForRuntimePodNameReadyInSecond is the time connector sleep for waiting runtime ready in second
WaitForRuntimePodNameReadyInSecond = 1
// RequeueAfterSecond is a universal interval of the reconcile function
RequeueAfterSecond = 6
)

0 comments on commit ad2a41c

Please sign in to comment.