Skip to content

Commit

Permalink
adding helm chart (#2)
Browse files Browse the repository at this point in the history
* adding helm chart

* adding enable flags for keda features

* adding stream retention setting and stream republish subject, removing creation of archive stream

* chart version to 0.1.2
  • Loading branch information
rayjanoka authored Oct 19, 2022
1 parent 86f3107 commit b08bd8e
Show file tree
Hide file tree
Showing 14 changed files with 691 additions and 48 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
ca.crt
scripts/
dist/
*.tgz
77 changes: 35 additions & 42 deletions client/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package client

import (
"fmt"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
"time"
Expand All @@ -11,8 +10,8 @@ func JetStream(
url, subject, stream, durableConsumer, streamMaxAgeDur, rootCA, username, password string,
streamReplicas, maxAckPending int,
streamMaxSize int64,
msgTimeout string,
jetStreamStreamRePublishEnabled, jetStreamProvisioningDisabled bool,
msgTimeout, streamRetention, streamRepublishSubject string,
provisioningDisabled bool,
) (*nats.Subscription, *nats.Conn) {
var connectOptions []nats.Option
if rootCA != "" {
Expand All @@ -34,15 +33,18 @@ func JetStream(
log.Fatal().Err(err).Msg("Failed to initialize JetStream context")
}

// TODO: output some information about the JetStream server
accountInfo, err := jetStream.AccountInfo()
log.Info().Uint64("memory", accountInfo.Tier.Memory).
Uint64("storage", accountInfo.Tier.Store).
Int("streams", accountInfo.Tier.Streams).
Int("consumers", accountInfo.Tier.Consumers).
Msg("JetStream account info")
if err != nil {
log.Fatal().Err(err).Msg("Failed to get JetStream account info")
}

if !jetStreamProvisioningDisabled {
if !provisioningDisabled {
// build the stream
streamMaxAge, err := time.ParseDuration(streamMaxAgeDur)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse jetstream-max-age duration argument")
}

streamMaxBytes := int64(-1)
if streamMaxSize != -1 {
streamMaxBytes = 1_000_000 * streamMaxSize // Megabytes
Expand All @@ -51,34 +53,27 @@ func JetStream(
streamConfig := &nats.StreamConfig{
Name: stream,
Subjects: []string{subject},
MaxAge: streamMaxAge,
MaxBytes: -1, // TODO: fix this
Replicas: streamReplicas, // TODO: test this
MaxBytes: streamMaxBytes,
Replicas: streamReplicas,
Retention: nats.LimitsPolicy,
}

if jetStreamStreamRePublishEnabled {
// set the main stream to delete msgs on ACK
// then use the archive stream as a backup
streamConfig.Retention = nats.InterestPolicy

// setup the msg forwarding to the archive stream
streamConfig.RePublish = &nats.RePublish{Source: subject, Destination: fmt.Sprintf("%s-archive", subject)}

// build an archive stream that will not be consumed
archiveStreamConfig := &nats.StreamConfig{
Name: fmt.Sprintf("%s-archive", stream),
Subjects: []string{fmt.Sprintf("%s-archive", subject)},
MaxAge: streamMaxAge,
MaxBytes: streamMaxBytes, // TODO: test this
Replicas: streamReplicas, // TODO: test this
Retention: nats.LimitsPolicy,
if streamMaxAgeDur != "" {
streamMaxAge, err := time.ParseDuration(streamMaxAgeDur)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse jetstream-max-age duration argument")
}
streamConfig.MaxAge = streamMaxAge
}

archiveStreamInfo := createOrUpdateStream(jetStream, fmt.Sprintf("%s-archive", stream), archiveStreamConfig)
if streamRetention == "interest" {
streamConfig.Retention = nats.InterestPolicy
} else if streamRetention == "work_queue" {
streamConfig.Retention = nats.WorkQueuePolicy
}

log.Info().Msgf("JetStream archive stream %s configured with %d replicas and limited by %s max age, and %d max bytes",
archiveStreamInfo.Config.Name, archiveStreamInfo.Config.Replicas, archiveStreamInfo.Config.MaxAge, archiveStreamInfo.Config.MaxBytes)
if streamRepublishSubject != "" {
streamConfig.RePublish = &nats.RePublish{Source: subject, Destination: streamRepublishSubject}
}

streamInfo := createOrUpdateStream(jetStream, stream, streamConfig)
Expand All @@ -93,16 +88,14 @@ func JetStream(
}

desiredConsumerConfig := &nats.ConsumerConfig{
//MaxWaiting: jetStreamMaxWaiting, // must match fetch() batch parameter
//MaxRequestExpires: 60 * time.Second, // limit max fetch() expires (pull)
//MaxRequestBatch: 10, // limit max fetch() batch size (pull)
AckPolicy: nats.AckExplicitPolicy, // always ack (default)
AckWait: ackWait, // wait before retry
DeliverPolicy: nats.DeliverNewPolicy, // deliver since consumer creation
Durable: durableConsumer, // consumer name
FilterSubject: subject, // nats subject for stream
MaxAckPending: maxAckPending, // stop offering msgs once we're waiting on too many acks
MaxDeliver: -1, // try to redeliver forever
AckPolicy: nats.AckExplicitPolicy, // always ack (default)
AckWait: ackWait, // wait before retry
DeliverPolicy: nats.DeliverNewPolicy, // deliver since consumer creation
Durable: durableConsumer, // consumer name
FilterSubject: subject, // nats subject for stream
MaxAckPending: maxAckPending, // stop offering msgs once we're waiting on too many acks
MaxDeliver: -1, // try to redeliver forever
SampleFrequency: "100", // deliver all messages as a percentage, required by tf module
}

consumerInfo, err := jetStream.ConsumerInfo(stream, durableConsumer)
Expand Down
23 changes: 23 additions & 0 deletions helm/archie/.helmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
16 changes: 16 additions & 0 deletions helm/archie/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: v2
name: archie
description: A Helm chart for archie in Kubernetes

type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.2

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.1.1"
70 changes: 70 additions & 0 deletions helm/archie/templates/_helpers.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "archie.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}

{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "archie.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}

{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "archie.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}

{{/*
Common labels
*/}}
{{- define "archie.labels" -}}
helm.sh/chart: {{ include "archie.chart" . }}
{{ include "archie.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}

{{/*
Selector labels
*/}}
{{- define "archie.selectorLabels" -}}
app.kubernetes.io/name: {{ include "archie.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}

{{/*
Create the name of the service account to use
*/}}
{{- define "archie.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "archie.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

{{/*
Add 5 seconds to the shutdown wait time to give
some additional time before forcing termination
*/}}
{{- define "archie.terminationGracePeriodSeconds" -}}
{{- add .Values.archie.shutdownWait 5 }}
{{- end }}
92 changes: 92 additions & 0 deletions helm/archie/templates/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{{- if .Values.archie.deployment.create -}}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "archie.fullname" . }}
labels:
{{- include "archie.labels" . | nindent 4 }}
spec:
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
{{- include "archie.selectorLabels" . | nindent 6 }}
strategy:
type: RollingUpdate
rollingUpdate:
{{- with .Values.archie.rollingUpdate }}
{{- toYaml . | nindent 6 }}
{{- end }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "archie.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "archie.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
args:
{{- with .Values.image.args }}
{{- toYaml . | nindent 12 }}
{{- end }}
envFrom:
- secretRef:
name: {{ include "archie.fullname" . }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: metrics
containerPort: 9999
protocol: TCP
{{- if .Values.archie.healthCheck.enabled }}
- name: http
containerPort: {{ .Values.archie.healthCheck.port }}
protocol: TCP
livenessProbe:
httpGet:
path: /live
port: http
readinessProbe:
httpGet:
path: /ready
port: http
{{- end }}
volumeMounts:
{{- with .Values.jetstream.rootCA }}
- name: {{ .secretName }}-clients-volume
mountPath: /etc/nats-cert/{{ .secretName }}
subPath: {{ .fileName }}
{{- end }}
resources:
{{- toYaml .Values.archie.resources | nindent 12 }}
terminationGracePeriodSeconds: {{ include "archie.terminationGracePeriodSeconds" . }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
volumes:
{{- with .Values.jetstream.rootCA }}
- name: {{ .secretName }}-clients-volume
secret:
secretName: {{ .secretName }}
{{- end }}
{{- end }}
Loading

0 comments on commit b08bd8e

Please sign in to comment.