Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add redownload tool and treat EOF as recoverable error #400

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions cmd/slackdump/internal/diag/redownload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package diag

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"

"github.com/rusq/fsadapter"
"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/cmd/slackdump/internal/bootstrap"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/processor"
)

var cmdRedownload = &base.Command{
UsageLine: "tools redownload [flags] <archive_dir>",
Short: "attempts to redownload missing files from the archive",
Long: `# File redownload tool
Redownload tool scans the directory with Slackdump Archive, validating the files.
If a file is missing or has zero length, it will be redownloaded from the Slack API.
The tool will not overwrite existing files, so it is safe to run multiple times.

Please note:

1. It requires you to have a valid authentication in the selected workspace.
2. Ensure that you have selected the correct workspace using "slackdump workspace select".
3. It only works with Slackdump Archive directories, Slack exports and dumps
are not supported.`,
FlagMask: cfg.OmitAll &^ cfg.OmitAuthFlags,
Run: runRedownload,
PrintFlags: true,
RequireAuth: true,
}

func runRedownload(ctx context.Context, _ *base.Command, args []string) error {
if len(args) != 1 {
base.SetExitStatus(base.SInvalidParameters)
return errors.New("expected exactly one argument")
}
dir := args[0]
if fi, err := os.Stat(dir); err != nil {
base.SetExitStatus(base.SUserError)
return fmt.Errorf("error accessing the directory: %w", err)
} else if !fi.IsDir() {
base.SetExitStatus(base.SUserError)
return errors.New("expected a directory")
}
if fi, err := os.Stat(filepath.Join(dir, "workspace.json.gz")); err != nil {
base.SetExitStatus(base.SUserError)
return fmt.Errorf("error accessing the workspace file: %w", err)
} else if fi.IsDir() {
base.SetExitStatus(base.SUserError)
return errors.New("this does not look like an archive directory")
}

if n, err := redownload(ctx, dir); err != nil {
base.SetExitStatus(base.SApplicationError)
return err
} else {
if n == 0 {
slog.Info("no missing files found")
} else {
slog.Info("redownloaded missing files", "num_files", n)
}
}

return nil
}

func redownload(ctx context.Context, dir string) (int, error) {
cd, err := chunk.OpenDir(dir)
if err != nil {
return 0, fmt.Errorf("error opening directory: %w", err)
}
defer cd.Close()

channels, err := cd.Channels()
if err != nil {
return 0, fmt.Errorf("error reading channels: %w", err)
}
if len(channels) == 0 {
return 0, errors.New("no channels found")
}
slog.Info("directory opened", "num_channels", len(channels))

sess, err := bootstrap.SlackdumpSession(ctx)
if err != nil {
return 0, fmt.Errorf("error creating slackdump session: %w", err)
}
dl, stop := fileproc.NewDownloader(
ctx,
true,
sess.Client(),
fsadapter.NewDirectory(cd.Name()),
cfg.Log,
)
defer stop()
// we are using the same file subprocessor as the mattermost export.
fproc := fileproc.NewExport(fileproc.STmattermost, dl)

total := 0
for _, ch := range channels {
if n, err := redlChannel(ctx, fproc, cd, &ch); err != nil {
return total, err
} else {
total += n
}
}

return total, nil
}

func redlChannel(ctx context.Context, fp processor.Filer, cd *chunk.Directory, ch *slack.Channel) (int, error) {
slog.Info("processing channel", "channel", ch.ID)
f, err := cd.Open(chunk.FileID(ch.ID))
if err != nil {
return 0, fmt.Errorf("error reading messages: %w", err)
}
defer f.Close()
msgs, err := f.AllMessages(ch.ID)
if err != nil {
return 0, fmt.Errorf("error reading messages: %w", err)
}
if len(msgs) == 0 {
return 0, nil
}
slog.Info("scanning messages", "num_messages", len(msgs))
return scanMsgs(ctx, fp, cd, f, ch, msgs)
}

func scanMsgs(ctx context.Context, fp processor.Filer, cd *chunk.Directory, f *chunk.File, ch *slack.Channel, msgs []slack.Message) (int, error) {
lg := slog.With("channel", ch.ID)
total := 0
for _, m := range msgs {
if structures.IsThreadStart(&m) {
tm, err := f.AllThreadMessages(ch.ID, m.ThreadTimestamp)
if err != nil {
return 0, fmt.Errorf("error reading thread messages: %w", err)
}
lg.Info("scanning thread messages", "num_messages", len(tm), "thread", m.ThreadTimestamp)
if n, err := scanMsgs(ctx, fp, cd, f, ch, tm); err != nil {
return total, err
} else {
total += n
}
}

// collect all missing files from the message.
var missing []slack.File
for _, ff := range m.Files {
name := filepath.Join(cd.Name(), fileproc.MattermostFilepath(ch, &ff))
lg := lg.With("file", name)
lg.Debug("checking file")
if fi, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
// file does not exist
lg.Debug("missing file")
missing = append(missing, ff)
} else {
lg.Error("error accessing file", "error", err)
// some other error
return total, fmt.Errorf("error accessing file: %w", err)
}
} else if fi.Size() == 0 {
// zero length files are considered missing
lg.Debug("zero length file")
missing = append(missing, ff)
} else {
lg.Debug("file OK")
}
}
if len(missing) > 0 {
total += len(missing)
lg.Info("found missing files", "num_files", len(missing))
if err := fp.Files(ctx, ch, m, missing); err != nil {
return total, fmt.Errorf("error processing files: %w", err)
}
}
}
return total, nil
}
1 change: 1 addition & 0 deletions cmd/slackdump/internal/diag/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Tools command contains different tools, running which may be requested if you op
// cmdSearch,
cmdThread,
cmdHydrate,
cmdRedownload,
// cmdWizDebug,
},
}
11 changes: 11 additions & 0 deletions internal/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
Expand Down Expand Up @@ -117,6 +118,16 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func(
ne *net.OpError // read tcp error: see #234
)
switch {
case errors.Is(cbErr, io.EOF):
// EOF is a transient error
delay := wait(attempt)
slog.WarnContext(ctx, "got EOF, sleeping", "error", cbErr, "delay", delay.String())
tracelogf(ctx, "info", "got EOF, sleeping %s (%s)", delay, cbErr)
if err := sleepCtx(ctx, delay); err != nil {
return err
}
slog.Debug("resuming after EOF")
continue
case errors.As(cbErr, &rle):
slog.InfoContext(ctx, "got rate limited, sleeping", "retry_after_sec", rle.RetryAfter, "error", cbErr)
tracelogf(ctx, "info", "got rate limited, sleeping %s (%s)", rle.RetryAfter, cbErr)
Expand Down
30 changes: 23 additions & 7 deletions internal/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -186,12 +187,10 @@ func TestWithRetry(t *testing.T) {
}
})
}
// setting fast wait function
t.Run("500 error handling", func(t *testing.T) {
t.Parallel()
setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond })
defer func() {
setWaitFunc(cubicWait)
}()
t.Cleanup(func() { setWaitFunc(cubicWait) })

codes := []int{500, 502, 503, 504, 598}
for _, code := range codes {
Expand Down Expand Up @@ -232,8 +231,6 @@ func TestWithRetry(t *testing.T) {
})
}
t.Run("404 error", func(t *testing.T) {
t.Parallel()

const (
testRetryCount = 1
)
Expand Down Expand Up @@ -265,8 +262,27 @@ func TestWithRetry(t *testing.T) {
}
})
})
t.Run("EOF error", func(t *testing.T) {
setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond })
t.Cleanup(func() { setWaitFunc(cubicWait) })

reterr := []error{io.EOF, io.EOF, nil}
var retries int

ctx := context.Background()
err := WithRetry(ctx, rate.NewLimiter(1, 1), 3, func() error {
err := reterr[retries]
if err != nil {
retries++
}
return err
})
assert.NoError(t, err)
assert.Equal(t, 2, retries)
})
t.Run("meaningful error message", func(t *testing.T) {
t.Parallel()
setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond })
t.Cleanup(func() { setWaitFunc(cubicWait) })
errFunc := func() error {
return slack.StatusCodeError{Code: 500, Status: "Internal Server Error"}
}
Expand Down
12 changes: 7 additions & 5 deletions stream/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"time"

"github.com/rusq/slack"
"golang.org/x/sync/errgroup"

"github.com/rusq/slackdump/v3/internal/network"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/processor"
"golang.org/x/sync/errgroup"
)

// SyncConversations fetches the conversations from the link which can be a
Expand Down Expand Up @@ -277,7 +278,7 @@ func (cs *Stream) thread(ctx context.Context, req request, callback func(mm []sl
func procChanMsg(ctx context.Context, proc processor.Conversations, threadC chan<- request, channel *slack.Channel, isLast bool, mm []slack.Message) (int, error) {
lg := slog.With("channel_id", channel.ID, "is_last", isLast, "msg_count", len(mm))

var trs = make([]request, 0, len(mm))
trs := make([]request, 0, len(mm))
for i := range mm {
// collecting threads to get their count. But we don't start
// processing them yet, before we send the messages with the number of
Expand Down Expand Up @@ -326,11 +327,12 @@ func procThreadMsg(ctx context.Context, proc processor.Conversations, channel *s
return err
}
if err := proc.ThreadMessages(ctx, channel.ID, head, threadOnly, isLast, rest); err != nil {
return fmt.Errorf("failed to process thread message id=%s, thread_ts=%s: %w", head.Msg.Timestamp, threadTS, err)
return fmt.Errorf("failed to process thread message id=%s, thread_ts=%s: %w", head.Timestamp, threadTS, err)
}
return nil
}

// procFiles proceses the files in slice of Messages msgs.
func procFiles(ctx context.Context, proc processor.Filer, channel *slack.Channel, msgs ...slack.Message) error {
if len(msgs) == 0 {
return nil
Expand Down Expand Up @@ -416,7 +418,7 @@ func (cs *Stream) procChannelUsers(ctx context.Context, proc processor.ChannelIn
func (cs *Stream) procChannelInfoWithUsers(ctx context.Context, proc processor.ChannelInformer, channelID, threadTS string) (*slack.Channel, error) {
var eg errgroup.Group

var chC = make(chan slack.Channel, 1)
chC := make(chan slack.Channel, 1)
eg.Go(func() error {
defer close(chC)
ch, err := cs.procChannelInfo(ctx, proc, channelID, threadTS)
Expand All @@ -427,7 +429,7 @@ func (cs *Stream) procChannelInfoWithUsers(ctx context.Context, proc processor.C
return nil
})

var uC = make(chan []string, 1)
uC := make(chan []string, 1)
eg.Go(func() error {
defer close(uC)
m, err := cs.procChannelUsers(ctx, proc, channelID, threadTS)
Expand Down
3 changes: 3 additions & 0 deletions utils/count_chunks.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh

gzcat $1.json.gz| jq '.t' | awk '{count[$1]++}END{for(t in count)print t,count[t]}'
Loading