Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.26-RC.1 #6462

Merged
merged 23 commits into from
Feb 6, 2025
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
708ea97
De-flake `TestJetStreamClusterGhostEphemeralsAfterRestart`
neilalexander Jan 24, 2025
4a80536
Remove the comment that durable_name is deprecated
ripienaar Jan 27, 2025
0a5b889
Improve error message when attempting to change consumer type
piotrpio Jan 26, 2025
8ea2222
Descriptive stream & consumer health errors
neilalexander Jan 27, 2025
ffb04c2
Simplify Docker nightlies, fix on-demand custom builds
neilalexander Jan 28, 2025
0df8678
Fix data rase on mset.cfg
evankanderson Jan 29, 2025
d49d546
NRG: Lockless `Leaderless` and `HadPreviousLeader`
neilalexander Jan 30, 2025
68bbb66
Expose `raftz` and `ipqueuesz` via system account
neilalexander Jan 31, 2025
d401cb6
Optimise `IntersectStree` for matching subject literals
neilalexander Feb 3, 2025
2159c13
Add filestore permutations to `BenchmarkJetStreamConsumeWithFilters`
neilalexander Feb 4, 2025
440ea35
Optimise `firstMatchingMulti` using subject tree intersection
neilalexander Jan 31, 2025
ba1b74b
Check `dmap` before calling `cacheLookup` to avoid unnecessary time c…
neilalexander Feb 4, 2025
8420b75
Increase disk I/O semaphore limit
neilalexander Feb 4, 2025
ee700b6
Avoid last load timestamp update when ranging interior deletes
neilalexander Feb 4, 2025
14ff11a
WebSocket compression fixes
neilalexander Feb 4, 2025
0619132
Update to Go 1.23.6/1.22.12
neilalexander Feb 4, 2025
ba0b93c
Add `sliceHeader` for zero-copy parsing of message headers, use for c…
neilalexander Feb 4, 2025
39d31b2
[FIXED] Desync after stream/consumer peer remove
MauriceVanVeen Feb 5, 2025
da0681c
stree: Split `Iter` into `IterFast` and `IterOrdered`
neilalexander Feb 5, 2025
17aaba2
[FIXED] Reserved resources accounting after cluster reset
MauriceVanVeen Feb 5, 2025
946b1db
[FIXED] Desync after quit during catchup
MauriceVanVeen Feb 5, 2025
299a411
[FIXED] Consumer/JS deadlock
MauriceVanVeen Feb 5, 2025
84474c6
Added LoadPrevMsg to optimize when we walk backwards looking for sour…
derekcollison Feb 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 8 additions & 31 deletions .github/actions/nightly-release/action.yaml
Original file line number Diff line number Diff line change
@@ -2,14 +2,6 @@ name: Nightly Docker Releaser
description: Builds nightly docker images

inputs:
go:
description: The version of go to build with
required: true

label:
description: The label to use for built images
required: true

hub_username:
description: Docker hub username
required: true
@@ -25,35 +17,20 @@ inputs:
runs:
using: composite
steps:
- name: Log in to Docker Hub
shell: bash
run: docker login -u "${{ inputs.hub_username }}" -p "${{ inputs.hub_password }}"

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "${{ inputs.go }}"
go-version: "stable"

- name: goreleaser
- name: Build and push Docker images
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit 5742e2a039330cbb23ebf35f046f814d4c6ff811 = tag v5
uses: goreleaser/goreleaser-action@5742e2a039330cbb23ebf35f046f814d4c6ff811
with:
workdir: "${{ inputs.workdir }}"
version: latest
args: release --snapshot --config .goreleaser-nightly.yml

- name: images
shell: bash
run: docker images

- name: docker_login
shell: bash
run: docker login -u "${{ inputs.hub_username }}" -p "${{ inputs.hub_password }}"

- name: docker_push
shell: bash
run: |
NDATE=$(date +%Y%m%d)

docker tag synadia/nats-server:nightly-${NDATE} synadia/nats-server:${{ inputs.label }}-${NDATE}
docker tag synadia/nats-server:nightly-${NDATE} synadia/nats-server:${{ inputs.label }}

docker push synadia/nats-server:${{ inputs.label }}-${NDATE}
docker push synadia/nats-server:${{ inputs.label }}
version: ~> v2
args: release --skip=announce,validate --config .goreleaser-nightly.yml
6 changes: 3 additions & 3 deletions .github/workflows/nightly.yaml
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ on:
workflow_dispatch:
inputs:
target:
description: "Override image branch (optional)"
description: "Override source branch (optional)"
type: string
required: false

@@ -19,11 +19,11 @@ jobs:
with:
path: src/github.com/nats-io/nats-server
ref: ${{ inputs.target || 'main' }}
fetch-depth: 0
fetch-tags: true

- uses: ./src/github.com/nats-io/nats-server/.github/actions/nightly-release
with:
go: "1.21"
workdir: src/github.com/nats-io/nats-server
label: nightly
hub_username: "${{ secrets.DOCKER_USERNAME }}"
hub_password: "${{ secrets.DOCKER_PASSWORD }}"
13 changes: 8 additions & 5 deletions .goreleaser-nightly.yml
Original file line number Diff line number Diff line change
@@ -14,16 +14,19 @@ builds:
goarch:
- amd64

release:
disable: true

dockers:
- goos: linux
goarch: amd64
skip_push: true
dockerfile: docker/Dockerfile.nightly
skip_push: false
build_flag_templates:
- '--build-arg=VERSION={{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
- '--build-arg=VERSION={{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
image_templates:
- synadia/nats-server:{{.Version}}
- synadia/nats-server:{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}
- synadia/nats-server:{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}
- synadia/nats-server:{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}
extra_files:
- docker/nats-server.conf

@@ -32,4 +35,4 @@ checksum:
algorithm: sha256

snapshot:
name_template: '{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
version_template: '{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -8,8 +8,8 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.23.5"
- "1.22.11"
- "1.23.6"
- "1.22.12"

go_import_path: github.com/nats-io/nats-server

2 changes: 1 addition & 1 deletion docker/Dockerfile.nightly
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21-alpine AS builder
FROM golang:alpine AS builder

ARG VERSION="nightly"

26 changes: 18 additions & 8 deletions server/client.go
Original file line number Diff line number Diff line change
@@ -4129,9 +4129,20 @@ func (c *client) setHeader(key, value string, msg []byte) []byte {
return bb.Bytes()
}

// Will return the value for the header denoted by key or nil if it does not exists.
// This function ignores errors and tries to achieve speed and no additional allocations.
// Will return a copy of the value for the header denoted by key or nil if it does not exist.
// If you know that it is safe to refer to the underlying hdr slice for the period that the
// return value is used, then sliceHeader() will be faster.
func getHeader(key string, hdr []byte) []byte {
v := sliceHeader(key, hdr)
if v == nil {
return nil
}
return append(make([]byte, 0, len(v)), v...)
}

// Will return the sliced value for the header denoted by key or nil if it does not exists.
// This function ignores errors and tries to achieve speed and no additional allocations.
func sliceHeader(key string, hdr []byte) []byte {
if len(hdr) == 0 {
return nil
}
@@ -4156,15 +4167,14 @@ func getHeader(key string, hdr []byte) []byte {
index++
}
// Collect together the rest of the value until we hit a CRLF.
var value []byte
start := index
for index < hdrLen {
if hdr[index] == '\r' && index < hdrLen-1 && hdr[index+1] == '\n' {
break
}
value = append(value, hdr[index])
index++
}
return value
return hdr[start:index:index]
}

// For bytes.HasPrefix below.
@@ -4278,7 +4288,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
var ci *ClientInfo
if hadPrevSi && c.pa.hdr >= 0 {
var cis ClientInfo
if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
if err := json.Unmarshal(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
ci = &cis
ci.Service = acc.Name
// Check if we are moving into a share details account from a non-shared
@@ -4287,7 +4297,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
c.addServerAndClusterInfo(ci)
}
}
} else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
} else if c.kind != LEAF || c.pa.hdr < 0 || len(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
ci = c.getClientInfo(share)
// If we did not share but the imports destination is the system account add in the server and cluster info.
if !share && isSysImport {
@@ -4846,7 +4856,7 @@ func (c *client) checkLeafClientInfoHeader(msg []byte) (dmsg []byte, setHdr bool
if c.pa.hdr < 0 || len(msg) < c.pa.hdr {
return msg, false
}
cir := getHeader(ClientInfoHdr, msg[:c.pa.hdr])
cir := sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])
if len(cir) == 0 {
return msg, false
}
23 changes: 23 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
@@ -2964,6 +2964,29 @@ func TestRemoveHeaderIfPrefixPresent(t *testing.T) {
}
}

func TestSliceHeader(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

hdr = genHeader(hdr, "a", "1")
hdr = genHeader(hdr, JSExpectedStream, "my-stream")
hdr = genHeader(hdr, JSExpectedLastSeq, "22")
hdr = genHeader(hdr, "b", "2")
hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24")
hdr = genHeader(hdr, JSExpectedLastMsgId, "1")
hdr = genHeader(hdr, "c", "3")

sliced := sliceHeader(JSExpectedLastSubjSeq, hdr)
copied := getHeader(JSExpectedLastSubjSeq, hdr)

require_NotNil(t, sliced)
require_Equal(t, cap(sliced), 2)

require_NotNil(t, copied)
require_Equal(t, cap(copied), len(copied))

require_True(t, bytes.Equal(sliced, copied))
}

func TestClientFlushOutboundNoSlowConsumer(t *testing.T) {
opts := DefaultOptions()
opts.MaxPending = 1024 * 1024 * 140 // 140MB
26 changes: 16 additions & 10 deletions server/consumer.go
Original file line number Diff line number Diff line change
@@ -60,7 +60,6 @@ type ConsumerInfo struct {
}

type ConsumerConfig struct {
// Durable is deprecated. All consumers should have names, picked by clients.
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
@@ -1583,6 +1582,12 @@ var (
// deleteNotActive must only be called from time.AfterFunc or in its own
// goroutine, as it can block on clean-up.
func (o *consumer) deleteNotActive() {
// Take a copy of these when the goroutine starts, mostly it avoids a
// race condition with tests that modify these consts, such as
// TestJetStreamClusterGhostEphemeralsAfterRestart.
cnaMax := consumerNotActiveMaxInterval
cnaStart := consumerNotActiveStartInterval

o.mu.Lock()
if o.mset == nil {
o.mu.Unlock()
@@ -1626,10 +1631,10 @@ func (o *consumer) deleteNotActive() {
if o.srv != nil {
qch = o.srv.quitCh
}
if o.js != nil {
cqch = o.js.clusterQuitC()
}
o.mu.Unlock()
if js != nil {
cqch = js.clusterQuitC()
}

// Useful for pprof.
setGoRoutineLabels(pprofLabels{
@@ -1663,8 +1668,8 @@ func (o *consumer) deleteNotActive() {
if ca != nil && cc != nil {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
jitter := time.Duration(rand.Int63n(int64(cnaStart)))
interval := cnaStart + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
@@ -1686,7 +1691,7 @@ func (o *consumer) deleteNotActive() {
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < consumerNotActiveMaxInterval {
if interval < cnaMax {
interval *= 2
ticker.Reset(interval)
}
@@ -1859,9 +1864,6 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
if cfg.FlowControl != ncfg.FlowControl {
return errors.New("flow control can not be updated")
}
if cfg.MaxWaiting != ncfg.MaxWaiting {
return errors.New("max waiting can not be updated")
}

// Deliver Subject is conditional on if its bound.
if cfg.DeliverSubject != ncfg.DeliverSubject {
@@ -1876,6 +1878,10 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
}
}

if cfg.MaxWaiting != ncfg.MaxWaiting {
return errors.New("max waiting can not be updated")
}

// Check if BackOff is defined, MaxDeliver is within range.
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver {
return NewJSConsumerMaxDeliverBackoffError()
34 changes: 34 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
@@ -1215,6 +1215,14 @@ func (s *Server) initEventTracking() {
optz := &ExpvarzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.expvarz(optz), nil })
},
"IPQUEUESZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &IpqueueszEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Ipqueuesz(&optz.IpqueueszOptions), nil })
},
"RAFTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &RaftzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Raftz(&optz.RaftzOptions), nil })
},
}
profilez := func(_ *subscription, c *client, _ *Account, _, rply string, rmsg []byte) {
hdr, msg := c.msgParts(rmsg)
@@ -1921,6 +1929,18 @@ type ExpvarzEventOptions struct {
EventFilterOptions
}

// In the context of system events, IpqueueszEventOptions are options passed to Ipqueuesz
type IpqueueszEventOptions struct {
EventFilterOptions
IpqueueszOptions
}

// In the context of system events, RaftzEventOptions are options passed to Raftz
type RaftzEventOptions struct {
EventFilterOptions
RaftzOptions
}

// returns true if the request does NOT apply to this server and can be ignored.
// DO NOT hold the server lock when
func (s *Server) filterRequest(fOpts *EventFilterOptions) bool {
@@ -2043,6 +2063,20 @@ type ServerAPIExpvarzResponse struct {
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIpqueueszResponse is the response type for ipqueuesz
type ServerAPIpqueueszResponse struct {
Server *ServerInfo `json:"server"`
Data *IpqueueszStatus `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIRaftzResponse is the response type for raftz
type ServerAPIRaftzResponse struct {
Server *ServerInfo `json:"server"`
Data *RaftzStatus `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// statszReq is a request for us to respond with current statsz.
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.EventsEnabled() {
2 changes: 1 addition & 1 deletion server/events_test.go
Original file line number Diff line number Diff line change
@@ -1675,7 +1675,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new initial subscription for the eventing system.
checkExpectedSubs(t, 58, sa)
checkExpectedSubs(t, 62, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Loading