Skip to content

Commit

Permalink
Add --raw mode to ./etcd-dump-log
Browse files Browse the repository at this point in the history
This mode allows to look at RAW protos for all entries in WAL logs in the given directory.

Signed-off-by: Piotr Tabor <[email protected]>
  • Loading branch information
ptabor committed Dec 30, 2022
1 parent 58681d3 commit e571fb7
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 52 deletions.
11 changes: 8 additions & 3 deletions server/storage/wal/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,22 @@ type decoder struct {
continueOnCrcError bool
}

func NewDecoder(r ...fileutil.FileReader) Decoder {
func NewDecoderAdvanced(continueOnCrcError bool, r ...fileutil.FileReader) Decoder {
readers := make([]*fileutil.FileBufReader, len(r))
for i := range r {
readers[i] = fileutil.NewFileBufReader(r[i])
}
return &decoder{
brs: readers,
crc: crc.New(0, crcTable),
brs: readers,
crc: crc.New(0, crcTable),
continueOnCrcError: continueOnCrcError,
}
}

func NewDecoder(r ...fileutil.FileReader) Decoder {
return NewDecoderAdvanced(false, r...)
}

// Decode reads the next record out of the file.
// In the success path, fills 'rec' and returns nil.
// When it fails, it returns err and usually resets 'rec' to the defaults.
Expand Down
4 changes: 2 additions & 2 deletions server/storage/wal/repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package wal

import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/server/v3/storage/wal/walpb"
Expand Down
68 changes: 36 additions & 32 deletions tools/etcd-dump-logs/etcd-dump-log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,38 +52,7 @@ func TestEtcdDumpLogEntryType(t *testing.T) {

p := t.TempDir()

memberdir := filepath.Join(p, "member")
err = os.Mkdir(memberdir, 0744)
if err != nil {
t.Fatal(err)
}
waldir := walDir(p)
snapdir := snapDir(p)

w, err := wal.Create(zaptest.NewLogger(t), waldir, nil)
if err != nil {
t.Fatal(err)
}

err = os.Mkdir(snapdir, 0744)
if err != nil {
t.Fatal(err)
}

ents := make([]raftpb.Entry, 0)

// append entries into wal log
appendConfigChangeEnts(&ents)
appendNormalRequestEnts(&ents)
appendNormalIRREnts(&ents)
appendUnknownNormalEnts(&ents)

// force commit newly appended entries
err = w.Save(raftpb.HardState{}, ents)
if err != nil {
t.Fatal(err)
}
w.Close()
mustCreateWalLog(t, p)

argtests := []struct {
name string
Expand Down Expand Up @@ -128,6 +97,41 @@ func TestEtcdDumpLogEntryType(t *testing.T) {

}

func mustCreateWalLog(t *testing.T, path string) {
memberdir := filepath.Join(path, "member")
err := os.Mkdir(memberdir, 0744)
if err != nil {
t.Fatal(err)
}
waldir := walDir(path)
snapdir := snapDir(path)

w, err := wal.Create(zaptest.NewLogger(t), waldir, nil)
if err != nil {
t.Fatal(err)
}

err = os.Mkdir(snapdir, 0744)
if err != nil {
t.Fatal(err)
}

ents := make([]raftpb.Entry, 0)

// append entries into wal log
appendConfigChangeEnts(&ents)
appendNormalRequestEnts(&ents)
appendNormalIRREnts(&ents)
appendUnknownNormalEnts(&ents)

// force commit newly appended entries
err = w.Save(raftpb.HardState{}, ents)
if err != nil {
t.Fatal(err)
}
w.Close()
}

func appendConfigChangeEnts(ents *[]raftpb.Entry) {
configChangeData := []raftpb.ConfChange{
{ID: 1, Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: []byte("")},
Expand Down
51 changes: 36 additions & 15 deletions tools/etcd-dump-logs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`)
streamdecoder := flag.String("stream-decoder", "", `The name of an executable decoding tool, the executable must process
hex encoded lines of binary input (from etcd-dump-logs)
and output a hex encoded line of binary for each input line`)
raw := flag.Bool("raw", false, "Read the logs in the low-level form")

flag.Parse()
lg := zap.NewExample()

if len(flag.Args()) != 1 {
log.Fatalf("Must provide data-dir argument (got %+v)", flag.Args())
Expand All @@ -68,6 +70,37 @@ and output a hex encoded line of binary for each input line`)
log.Fatal("start-snap and start-index flags cannot be used together.")
}

if !*raw {
ents := readUsingReadAll(lg, index, snapfile, dataDir, waldir)

fmt.Printf("WAL entries: %d\n", len(ents))
if len(ents) > 0 {
fmt.Printf("lastIndex=%d\n", ents[len(ents)-1].Index)
}

fmt.Printf("%4s\t%10s\ttype\tdata", "term", "index")
if *streamdecoder != "" {
fmt.Print("\tdecoder_status\tdecoded_data")
}
fmt.Println()

listEntriesType(*entrytype, *streamdecoder, ents)
} else {
if *snapfile != "" ||
*entrytype != defaultEntryTypes ||
*streamdecoder != "" {
log.Fatalf("Flags --entry-type, --stream-decoder, --entrytype not supported in the RAW mode.")
}

wd := *waldir
if wd == "" {
wd = walDir(dataDir)
}
readRaw(lg, index, wd, os.Stdout)
}
}

func readUsingReadAll(lg *zap.Logger, index *uint64, snapfile *string, dataDir string, waldir *string) []raftpb.Entry {
var (
walsnap walpb.Snapshot
snapshot *raftpb.Snapshot
Expand All @@ -84,7 +117,7 @@ and output a hex encoded line of binary for each input line`)
ss := snap.New(zap.NewExample(), snapDir(dataDir))
snapshot, err = ss.Load()
} else {
snapshot, err = snap.Read(zap.NewExample(), filepath.Join(snapDir(dataDir), *snapfile))
snapshot, err = snap.Read(lg, filepath.Join(snapDir(dataDir), *snapfile))
}

switch err {
Expand Down Expand Up @@ -123,19 +156,7 @@ and output a hex encoded line of binary for each input line`)
vid := types.ID(state.Vote)
fmt.Printf("WAL metadata:\nnodeID=%s clusterID=%s term=%d commitIndex=%d vote=%s\n",
id, cid, state.Term, state.Commit, vid)

fmt.Printf("WAL entries: %d\n", len(ents))
if len(ents) > 0 {
fmt.Printf("lastIndex=%d\n", ents[len(ents)-1].Index)
}

fmt.Printf("%4s\t%10s\ttype\tdata", "term", "index")
if *streamdecoder != "" {
fmt.Print("\tdecoder_status\tdecoded_data")
}
fmt.Println()

listEntriesType(*entrytype, *streamdecoder, ents)
return ents
}

func walDir(dataDir string) string { return filepath.Join(dataDir, "member", "wal") }
Expand Down Expand Up @@ -360,7 +381,7 @@ func listEntriesType(entrytype string, streamdecoder string, ents []raftpb.Entry
printer(e)
if streamdecoder == "" {
fmt.Println()
continue
//continue
}

// if decoder is set, pass the e.Data to stdin and read the stdout from decoder
Expand Down
107 changes: 107 additions & 0 deletions tools/etcd-dump-logs/raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
)

func readRaw(lg *zap.Logger, fromIndex *uint64, waldir string, out io.Writer) {
var walReaders []fileutil.FileReader
files, err := ioutil.ReadDir(waldir)
if err != nil {
lg.Fatal("Failed to read directory.", zap.String("directory", waldir), zap.Error(err))
}
for _, finfo := range files {
if filepath.Ext(finfo.Name()) != ".wal" {
lg.Warn("Ignoring not .wal file", zap.String("filename", finfo.Name()))
}
f, err := os.Open(filepath.Join(waldir, finfo.Name()))
if err != nil {
lg.Fatal("Failed to read file", zap.String("filename", finfo.Name()), zap.Error(err))
}
walReaders = append(walReaders, fileutil.NewFileReader(f))
}
decoder := wal.NewDecoderAdvanced(true, walReaders...)
// The variable is used to not pollute log with multiple continuous crc errors.
crcDesync := false
for {
rec := walpb.Record{}
err := decoder.Decode(&rec)
if err == nil || errors.Is(err, walpb.ErrCRCMismatch) {
if err != nil && !crcDesync {
lg.Warn("Reading entry failed with CRC error", zap.Error(err))
crcDesync = true
}
printRec(lg, &rec, fromIndex, out)
if rec.Type == wal.CrcType {
decoder.UpdateCRC(rec.Crc)
crcDesync = false
}
continue
}
if errors.Is(err, io.EOF) {
lg.Info("EOF: All entries were processed")
break
} else {
lg.Error("Reading failed", zap.Error(err))
break
}
}
}

func printRec(lg *zap.Logger, rec *walpb.Record, fromIndex *uint64, out io.Writer) {
switch rec.Type {
case wal.MetadataType:
var metadata etcdserverpb.Metadata
pbutil.MustUnmarshal(&metadata, rec.Data)
fmt.Fprintf(out, "Metadata: %s\n", metadata.String())
case wal.CrcType:
fmt.Fprintf(out, "CRC: %d\n", rec.Crc)
case wal.EntryType:
e := wal.MustUnmarshalEntry(rec.Data)
if fromIndex == nil || e.Index >= *fromIndex {
fmt.Fprintf(out, "Entry: %s\n", e.String())
}
case wal.SnapshotType:
var snap walpb.Snapshot
pbutil.MustUnmarshal(&snap, rec.Data)
if fromIndex == nil || snap.Index >= *fromIndex {
fmt.Fprintf(out, "Snapshot: %s\n", snap.String())
}
case wal.StateType:
var state raftpb.HardState
pbutil.MustUnmarshal(&state, rec.Data)
if fromIndex == nil || state.Commit >= *fromIndex {
fmt.Fprintf(out, "HardState: %s\n", state.String())
}
default:
lg.Error("Unexpected WAL log type", zap.Int64("type", rec.Type))
}
}
55 changes: 55 additions & 0 deletions tools/etcd-dump-logs/raw_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
)

func Test_readRaw(t *testing.T) {
path := t.TempDir()
mustCreateWalLog(t, path)
var out bytes.Buffer
readRaw(zaptest.NewLogger(t), nil, walDir(path), &out)
assert.Equal(t,
`CRC: 0
Metadata:
Snapshot:
Entry: Term:1 Index:1 Type:EntryConfChange Data:"\010\001\020\000\030\002\"\000"
Entry: Term:2 Index:2 Type:EntryConfChange Data:"\010\002\020\001\030\002\"\000"
Entry: Term:2 Index:3 Type:EntryConfChange Data:"\010\003\020\002\030\002\"\000"
Entry: Term:2 Index:4 Type:EntryConfChange Data:"\010\004\020\003\030\003\"\000"
Entry: Term:3 Index:5 Data:"\010\000\022\000\032\006/path0\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0012\0008\000@\000H\tP\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
Entry: Term:3 Index:6 Data:"\010\001\022\004QGET\032\006/path1\"\023{\"0\":\"1\",\"2\":[\"3\"]}(\0002\0008\000@\000H\tP\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
Entry: Term:3 Index:7 Data:"\010\002\022\004SYNC\032\006/path2\"\023{\"0\":\"1\",\"2\":[\"3\"]}(\0002\0008\000@\000H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
Entry: Term:3 Index:8 Data:"\010\003\022\006DELETE\032\006/path3\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0002\0008\000@\001H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
Entry: Term:3 Index:9 Data:"\010\004\022\006RANDOM\032\246\001/path4/superlong/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0002\0008\000@\000H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
Entry: Term:4 Index:10 Data:"\010\005\032\025\n\0011\022\002hi\030\006 \001(\001X\240\234\001h\240\234\001"
Entry: Term:5 Index:11 Data:"\010\006\"\020\n\004foo1\022\004bar1\030\0010\001"
Entry: Term:6 Index:12 Data:"\010\007*\010\n\0010\022\0019\030\001"
Entry: Term:7 Index:13 Data:"\010\0102\024\022\010\032\006\n\001a\022\001b\032\010\032\006\n\001a\022\001b"
Entry: Term:8 Index:14 Data:"\010\t:\002\020\001"
Entry: Term:9 Index:15 Data:"\010\nB\004\010\001\020\001"
Entry: Term:10 Index:16 Data:"\010\013J\002\010\002"
Entry: Term:11 Index:17 Data:"\010\014R\006\010\003\020\004\030\005"
Entry: Term:12 Index:18 Data:"\010\r\302>\000"
Entry: Term:13 Index:19 Data:"\010\016\232?\000"
Entry: Term:14 Index:20 Data:"\010\017\242?\031\n\006myname\022\010password\032\005token"
Entry: Term:15 Index:21 Data:"\010\020\342D\020\n\005name1\022\005pass1\032\000"
Entry: Term:16 Index:22 Data:"\010\021\352D\007\n\005name1"
Entry: Term:17 Index:23 Data:"\010\022\362D\007\n\005name1"
Entry: Term:18 Index:24 Data:"\010\023\372D\016\n\005name1\022\005pass2"
Entry: Term:19 Index:25 Data:"\010\024\202E\016\n\005user1\022\005role1"
Entry: Term:20 Index:26 Data:"\010\025\212E\016\n\005user2\022\005role2"
Entry: Term:21 Index:27 Data:"\010\026\222E\000"
Entry: Term:22 Index:28 Data:"\010\027\232E\000"
Entry: Term:23 Index:29 Data:"\010\030\202K\007\n\005role2"
Entry: Term:24 Index:30 Data:"\010\031\212K\007\n\005role1"
Entry: Term:25 Index:31 Data:"\010\032\222K\007\n\005role3"
Entry: Term:26 Index:32 Data:"\010\033\232K\033\n\005role3\022\022\010\001\022\004Keys\032\010RangeEnd"
Entry: Term:27 Index:33 Data:"\010\034\242K\026\n\005role3\022\003key\032\010rangeend"
Entry: Term:27 Index:34 Data:"?"
`, out.String())
}

0 comments on commit e571fb7

Please sign in to comment.