From ad2a41c4a4f20ce4f37e8b098e493cde69eeb68b Mon Sep 17 00:00:00 2001 From: Alonexc <91315508+Alonexc@users.noreply.github.com> Date: Thu, 18 Jan 2024 20:32:40 +0800 Subject: [PATCH] [ISSUE #4694] Component initialization order adjustment, add resource constraints. (#4693) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * resources describes the compute resource requirements and limits 、component initialization order rules and DNS. * add license and update runtime resources * update runtime_controller.go * update * deployment operator, commands simplified. * update * update * update * update --- eventmesh-operator/api/v1/connectors_types.go | 4 +++ ...entmesh-operator.eventmesh_connectors.yaml | 6 ++++ .../eventmesh_v1_connectors_rocketmq.yaml | 10 +++++++ .../config/samples/eventmesh_v1_runtime.yaml | 23 +++++++++++++- .../connectors_controller.go | 15 +++++++++- .../eventmesh_runtime/runtime_controller.go | 22 ++++++++++++-- eventmesh-operator/share/share.go | 30 +++++++++++++++++++ 7 files changed, 106 insertions(+), 4 deletions(-) create mode 100644 eventmesh-operator/share/share.go diff --git a/eventmesh-operator/api/v1/connectors_types.go b/eventmesh-operator/api/v1/connectors_types.go index a15af4c851..0e6f9026fa 100644 --- a/eventmesh-operator/api/v1/connectors_types.go +++ b/eventmesh-operator/api/v1/connectors_types.go @@ -35,6 +35,10 @@ type ConnectorsSpec struct { Size int `json:"size"` // ConnectorContainers define some configuration ConnectorContainers []corev1.Container `json:"connectorContainers"` + // HostNetwork can be true or false + HostNetwork bool `json:"hostNetwork,omitempty"` + // DNSPolicy Set DNS policy for the pod. + DNSPolicy corev1.DNSPolicy `json:"dnsPolicy,omitempty"` // Volumes define the connector config Volumes []corev1.Volume `json:"volumes"` // ImagePullPolicy defines how the image is pulled diff --git a/eventmesh-operator/config/crd/bases/eventmesh-operator.eventmesh_connectors.yaml b/eventmesh-operator/config/crd/bases/eventmesh-operator.eventmesh_connectors.yaml index 863733b301..c62dbd8c56 100644 --- a/eventmesh-operator/config/crd/bases/eventmesh-operator.eventmesh_connectors.yaml +++ b/eventmesh-operator/config/crd/bases/eventmesh-operator.eventmesh_connectors.yaml @@ -2266,6 +2266,12 @@ spec: type: string type: object type: object + dnsPolicy: + description: DNSPolicy Set DNS policy for the pod. + type: string + hostNetwork: + description: HostNetwork can be true or false + type: boolean imagePullPolicy: description: ImagePullPolicy defines how the image is pulled type: string diff --git a/eventmesh-operator/config/samples/eventmesh_v1_connectors_rocketmq.yaml b/eventmesh-operator/config/samples/eventmesh_v1_connectors_rocketmq.yaml index 6ef8724eb8..96cff08456 100644 --- a/eventmesh-operator/config/samples/eventmesh_v1_connectors_rocketmq.yaml +++ b/eventmesh-operator/config/samples/eventmesh_v1_connectors_rocketmq.yaml @@ -72,6 +72,8 @@ spec: size: 1 # imagePullPolicy is the image pull policy imagePullPolicy: Always + # set DNS policy for the pod + # dnsPolicy: ClusterFirstWithHostNet # define the connector-rocketmq container. connectorContainers: - name: connector-rocketmq @@ -81,6 +83,14 @@ spec: volumeMounts: - mountPath: "/data/app/eventmesh-connector-rocketmq/conf" name: connector-rocketmq-config + # resources describes the compute resource requirements and limits + resources: + requests: + memory: 512Mi + cpu: 250m + limits: + memory: 1024Mi + cpu: 1000m # configuration file settings to be mounted. volumes: - name: connector-rocketmq-config diff --git a/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml b/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml index 9d675a634e..ee94d606fb 100644 --- a/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml +++ b/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml @@ -23,6 +23,10 @@ data: rocketmq.properties: | #######################rocketmq-client################## eventMesh.server.rocketmq.namesrvAddr=127.0.0.1:9876;127.0.0.1:9876 + eventMesh.server.rocketmq.cluster=DefaultCluster + eventMesh.server.rocketmq.accessKey=******** + eventMesh.server.rocketmq.secretKey=******** + eventmesh.properties: | ###############################EVNETMESH-runtime ENV################################# eventMesh.server.idc=DEFAULT @@ -254,7 +258,16 @@ spec: runtimePodTemplate: template: spec: - # hostNetwork: true + # Host networking requested for this pod. Use the host's network namespace. + # If this option is set, the ports that will be used must be specified. + # Default to false. + hostNetwork: false + # Defaults to "ClusterFirst" + # Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. + # DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. + # To have DNS options set along with hostNetwork, you have to specify DNS policy + # explicitly to 'ClusterFirstWithHostNet'. + dnsPolicy: ClusterFirstWithHostNet # define the runtime container. containers: - name: eventmesh-runtime @@ -271,6 +284,14 @@ spec: name: runtime-config # imagePullPolicy is the image pull policy imagePullPolicy: Always + # resources describes the compute resource requirements and limits + resources: + requests: + memory: 2048Mi + cpu: 2000m + limits: + memory: 8192Mi + cpu: 8000m # configuration file settings to be mounted. volumes: - name: runtime-config diff --git a/eventmesh-operator/controllers/eventmesh_connectors/connectors_controller.go b/eventmesh-operator/controllers/eventmesh_connectors/connectors_controller.go index d554cbefe7..dd5850cef3 100644 --- a/eventmesh-operator/controllers/eventmesh_connectors/connectors_controller.go +++ b/eventmesh-operator/controllers/eventmesh_connectors/connectors_controller.go @@ -21,6 +21,7 @@ import ( "context" "fmt" eventmeshoperatorv1 "github.com/apache/eventmesh/eventmesh-operator/api/v1" + "github.com/apache/eventmesh/eventmesh-operator/share" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -123,6 +124,15 @@ func (r ConnectorsReconciler) Reconcile(ctx context.Context, req reconcile.Reque return reconcile.Result{}, err } + for { + if share.IsEventMeshRuntimeInitialized { + break + } else { + r.Logger.Info("connector Waiting for runtime ready...") + time.Sleep(time.Duration(share.WaitForRuntimePodNameReadyInSecond) * time.Second) + } + } + connectorStatefulSet := r.getConnectorStatefulSet(connector) // Check if the statefulSet already exists, if not create a new one found := &appsv1.StatefulSet{} @@ -192,7 +202,7 @@ func (r ConnectorsReconciler) Reconcile(ctx context.Context, req reconcile.Reque } r.Logger.Info("Successful reconciliation!") - return reconcile.Result{}, nil + return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(share.RequeueAfterSecond) * time.Second}, nil } func (r ConnectorsReconciler) getConnectorStatefulSet(connector *eventmeshoperatorv1.Connectors) *appsv1.StatefulSet { @@ -217,6 +227,8 @@ func (r ConnectorsReconciler) getConnectorStatefulSet(connector *eventmeshoperat Labels: getLabels(), }, Spec: corev1.PodSpec{ + HostNetwork: connector.Spec.HostNetwork, + DNSPolicy: connector.Spec.DNSPolicy, ServiceAccountName: connector.Spec.ServiceAccountName, Affinity: connector.Spec.Affinity, Tolerations: connector.Spec.Tolerations, @@ -224,6 +236,7 @@ func (r ConnectorsReconciler) getConnectorStatefulSet(connector *eventmeshoperat PriorityClassName: connector.Spec.PriorityClassName, ImagePullSecrets: connector.Spec.ImagePullSecrets, Containers: []corev1.Container{{ + Resources: connector.Spec.ConnectorContainers[0].Resources, Image: connector.Spec.ConnectorContainers[0].Image, Name: connector.Spec.ConnectorContainers[0].Name, SecurityContext: getConnectorContainerSecurityContext(connector), diff --git a/eventmesh-operator/controllers/eventmesh_runtime/runtime_controller.go b/eventmesh-operator/controllers/eventmesh_runtime/runtime_controller.go index 731ef8c55b..7bff28c364 100644 --- a/eventmesh-operator/controllers/eventmesh_runtime/runtime_controller.go +++ b/eventmesh-operator/controllers/eventmesh_runtime/runtime_controller.go @@ -21,6 +21,7 @@ import ( "context" "fmt" eventmeshoperatorv1 "github.com/apache/eventmesh/eventmesh-operator/api/v1" + "github.com/apache/eventmesh/eventmesh-operator/share" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -232,8 +233,24 @@ func (r *RuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Request r.Logger.Error(err, "Not found eventmesh runtime pods") } + runningEventMeshRuntimeNum := getRunningRuntimeNum(podList.Items) + if runningEventMeshRuntimeNum == eventMeshRuntime.Spec.Size { + share.IsEventMeshRuntimeInitialized = true + } + r.Logger.Info("Successful reconciliation!") - return reconcile.Result{}, nil + + return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(share.RequeueAfterSecond) * time.Second}, nil +} + +func getRunningRuntimeNum(pods []corev1.Pod) int { + var num = 0 + for _, pod := range pods { + if reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) { + num++ + } + } + return num } func getRuntimePodNames(pods []corev1.Pod) []string { @@ -274,13 +291,14 @@ func (r *RuntimeReconciler) getEventMeshRuntimeStatefulSet(runtime *eventmeshope Labels: label, }, Spec: corev1.PodSpec{ - DNSPolicy: corev1.DNSClusterFirstWithHostNet, + DNSPolicy: runtime.Spec.RuntimePodTemplate.Template.Spec.DNSPolicy, Affinity: runtime.Spec.RuntimePodTemplate.Template.Spec.Affinity, Tolerations: runtime.Spec.RuntimePodTemplate.Template.Spec.Tolerations, NodeSelector: runtime.Spec.RuntimePodTemplate.Template.Spec.NodeSelector, PriorityClassName: runtime.Spec.RuntimePodTemplate.Template.Spec.PriorityClassName, HostNetwork: runtime.Spec.RuntimePodTemplate.Template.Spec.HostNetwork, Containers: []corev1.Container{{ + Resources: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Resources, Image: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Image, Name: runtime.Spec.RuntimePodTemplate.Template.Spec.Containers[0].Name, SecurityContext: getContainerSecurityContext(runtime), diff --git a/eventmesh-operator/share/share.go b/eventmesh-operator/share/share.go new file mode 100644 index 0000000000..1027bba580 --- /dev/null +++ b/eventmesh-operator/share/share.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package share + +var ( + // IsEventMeshRuntimeInitialized is whether the runtime list is initialized + IsEventMeshRuntimeInitialized = false +) + +const ( + // WaitForRuntimePodNameReadyInSecond is the time connector sleep for waiting runtime ready in second + WaitForRuntimePodNameReadyInSecond = 1 + // RequeueAfterSecond is a universal interval of the reconcile function + RequeueAfterSecond = 6 +)