diff --git a/cmd/slackdump/internal/diag/redownload.go b/cmd/slackdump/internal/diag/redownload.go new file mode 100644 index 00000000..56ebd4df --- /dev/null +++ b/cmd/slackdump/internal/diag/redownload.go @@ -0,0 +1,189 @@ +package diag + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "path/filepath" + + "github.com/rusq/fsadapter" + "github.com/rusq/slack" + + "github.com/rusq/slackdump/v3/cmd/slackdump/internal/bootstrap" + "github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg" + "github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base" + "github.com/rusq/slackdump/v3/internal/chunk" + "github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc" + "github.com/rusq/slackdump/v3/internal/structures" + "github.com/rusq/slackdump/v3/processor" +) + +var cmdRedownload = &base.Command{ + UsageLine: "tools redownload [flags] ", + Short: "attempts to redownload missing files from the archive", + Long: `# File redownload tool +Redownload tool scans the directory with Slackdump Archive, validating the files. +If a file is missing or has zero length, it will be redownloaded from the Slack API. +The tool will not overwrite existing files, so it is safe to run multiple times. + +Please note: + +1. It requires you to have a valid authentication in the selected workspace. +2. Ensure that you have selected the correct workspace using "slackdump workspace select". +3. 
It only works with Slackdump Archive directories, Slack exports and dumps +are not supported.`, + FlagMask: cfg.OmitAll &^ cfg.OmitAuthFlags, + Run: runRedownload, + PrintFlags: true, + RequireAuth: true, +} + +func runRedownload(ctx context.Context, _ *base.Command, args []string) error { + if len(args) != 1 { + base.SetExitStatus(base.SInvalidParameters) + return errors.New("expected exactly one argument") + } + dir := args[0] + if fi, err := os.Stat(dir); err != nil { + base.SetExitStatus(base.SUserError) + return fmt.Errorf("error accessing the directory: %w", err) + } else if !fi.IsDir() { + base.SetExitStatus(base.SUserError) + return errors.New("expected a directory") + } + if fi, err := os.Stat(filepath.Join(dir, "workspace.json.gz")); err != nil { + base.SetExitStatus(base.SUserError) + return fmt.Errorf("error accessing the workspace file: %w", err) + } else if fi.IsDir() { + base.SetExitStatus(base.SUserError) + return errors.New("this does not look like an archive directory") + } + + if n, err := redownload(ctx, dir); err != nil { + base.SetExitStatus(base.SApplicationError) + return err + } else { + if n == 0 { + slog.Info("no missing files found") + } else { + slog.Info("redownloaded missing files", "num_files", n) + } + } + + return nil +} + +func redownload(ctx context.Context, dir string) (int, error) { + cd, err := chunk.OpenDir(dir) + if err != nil { + return 0, fmt.Errorf("error opening directory: %w", err) + } + defer cd.Close() + + channels, err := cd.Channels() + if err != nil { + return 0, fmt.Errorf("error reading channels: %w", err) + } + if len(channels) == 0 { + return 0, errors.New("no channels found") + } + slog.Info("directory opened", "num_channels", len(channels)) + + sess, err := bootstrap.SlackdumpSession(ctx) + if err != nil { + return 0, fmt.Errorf("error creating slackdump session: %w", err) + } + dl, stop := fileproc.NewDownloader( + ctx, + true, + sess.Client(), + fsadapter.NewDirectory(cd.Name()), + cfg.Log, + ) + defer 
stop() + // we are using the same file subprocessor as the mattermost export. + fproc := fileproc.NewExport(fileproc.STmattermost, dl) + + total := 0 + for _, ch := range channels { + if n, err := redlChannel(ctx, fproc, cd, &ch); err != nil { + return total, err + } else { + total += n + } + } + + return total, nil +} + +func redlChannel(ctx context.Context, fp processor.Filer, cd *chunk.Directory, ch *slack.Channel) (int, error) { + slog.Info("processing channel", "channel", ch.ID) + f, err := cd.Open(chunk.FileID(ch.ID)) + if err != nil { + return 0, fmt.Errorf("error reading messages: %w", err) + } + defer f.Close() + msgs, err := f.AllMessages(ch.ID) + if err != nil { + return 0, fmt.Errorf("error reading messages: %w", err) + } + if len(msgs) == 0 { + return 0, nil + } + slog.Info("scanning messages", "num_messages", len(msgs)) + return scanMsgs(ctx, fp, cd, f, ch, msgs) +} + +func scanMsgs(ctx context.Context, fp processor.Filer, cd *chunk.Directory, f *chunk.File, ch *slack.Channel, msgs []slack.Message) (int, error) { + lg := slog.With("channel", ch.ID) + total := 0 + for _, m := range msgs { + if structures.IsThreadStart(&m) { + tm, err := f.AllThreadMessages(ch.ID, m.ThreadTimestamp) + if err != nil { + return 0, fmt.Errorf("error reading thread messages: %w", err) + } + lg.Info("scanning thread messages", "num_messages", len(tm), "thread", m.ThreadTimestamp) + if n, err := scanMsgs(ctx, fp, cd, f, ch, tm); err != nil { + return total, err + } else { + total += n + } + } + + // collect all missing files from the message. 
+ var missing []slack.File + for _, ff := range m.Files { + name := filepath.Join(cd.Name(), fileproc.MattermostFilepath(ch, &ff)) + lg := lg.With("file", name) + lg.Debug("checking file") + if fi, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + // file does not exist + lg.Debug("missing file") + missing = append(missing, ff) + } else { + lg.Error("error accessing file", "error", err) + // some other error + return total, fmt.Errorf("error accessing file: %w", err) + } + } else if fi.Size() == 0 { + // zero length files are considered missing + lg.Debug("zero length file") + missing = append(missing, ff) + } else { + lg.Debug("file OK") + } + } + if len(missing) > 0 { + total += len(missing) + lg.Info("found missing files", "num_files", len(missing)) + if err := fp.Files(ctx, ch, m, missing); err != nil { + return total, fmt.Errorf("error processing files: %w", err) + } + } + } + return total, nil +} diff --git a/cmd/slackdump/internal/diag/tools.go b/cmd/slackdump/internal/diag/tools.go index b84e1fab..47bb615f 100644 --- a/cmd/slackdump/internal/diag/tools.go +++ b/cmd/slackdump/internal/diag/tools.go @@ -31,6 +31,7 @@ Tools command contains different tools, running which may be requested if you op // cmdSearch, cmdThread, cmdHydrate, + cmdRedownload, // cmdWizDebug, }, } diff --git a/internal/network/network.go b/internal/network/network.go index 6011f74f..834450f5 100644 --- a/internal/network/network.go +++ b/internal/network/network.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "log/slog" "net" "net/http" @@ -117,6 +118,16 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func( ne *net.OpError // read tcp error: see #234 ) switch { + case errors.Is(cbErr, io.EOF): + // EOF is a transient error + delay := wait(attempt) + slog.WarnContext(ctx, "got EOF, sleeping", "error", cbErr, "delay", delay.String()) + tracelogf(ctx, "info", "got EOF, sleeping %s (%s)", delay, cbErr) + if err := sleepCtx(ctx, delay); 
err != nil { + return err + } + slog.Debug("resuming after EOF") + continue case errors.As(cbErr, &rle): slog.InfoContext(ctx, "got rate limited, sleeping", "retry_after_sec", rle.RetryAfter, "error", cbErr) tracelogf(ctx, "info", "got rate limited, sleeping %s (%s)", rle.RetryAfter, cbErr) diff --git a/internal/network/network_test.go b/internal/network/network_test.go index f783a068..d6be672f 100644 --- a/internal/network/network_test.go +++ b/internal/network/network_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "net" "net/http" "net/http/httptest" @@ -186,12 +187,10 @@ func TestWithRetry(t *testing.T) { } }) } + // setting fast wait function t.Run("500 error handling", func(t *testing.T) { - t.Parallel() setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond }) - defer func() { - setWaitFunc(cubicWait) - }() + t.Cleanup(func() { setWaitFunc(cubicWait) }) codes := []int{500, 502, 503, 504, 598} for _, code := range codes { @@ -232,8 +231,6 @@ func TestWithRetry(t *testing.T) { }) } t.Run("404 error", func(t *testing.T) { - t.Parallel() - const ( testRetryCount = 1 ) @@ -265,8 +262,27 @@ func TestWithRetry(t *testing.T) { } }) }) + t.Run("EOF error", func(t *testing.T) { + setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond }) + t.Cleanup(func() { setWaitFunc(cubicWait) }) + + reterr := []error{io.EOF, io.EOF, nil} + var retries int + + ctx := context.Background() + err := WithRetry(ctx, rate.NewLimiter(1, 1), 3, func() error { + err := reterr[retries] + if err != nil { + retries++ + } + return err + }) + assert.NoError(t, err) + assert.Equal(t, 2, retries) + }) t.Run("meaningful error message", func(t *testing.T) { - t.Parallel() + setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond }) + t.Cleanup(func() { setWaitFunc(cubicWait) }) errFunc := func() error { return slack.StatusCodeError{Code: 500, Status: "Internal Server Error"} } diff --git a/stream/conversation.go 
b/stream/conversation.go index 94a8bf7d..b0f9d655 100644 --- a/stream/conversation.go +++ b/stream/conversation.go @@ -9,10 +9,11 @@ import ( "time" "github.com/rusq/slack" + "golang.org/x/sync/errgroup" + "github.com/rusq/slackdump/v3/internal/network" "github.com/rusq/slackdump/v3/internal/structures" "github.com/rusq/slackdump/v3/processor" - "golang.org/x/sync/errgroup" ) // SyncConversations fetches the conversations from the link which can be a @@ -277,7 +278,7 @@ func (cs *Stream) thread(ctx context.Context, req request, callback func(mm []sl func procChanMsg(ctx context.Context, proc processor.Conversations, threadC chan<- request, channel *slack.Channel, isLast bool, mm []slack.Message) (int, error) { lg := slog.With("channel_id", channel.ID, "is_last", isLast, "msg_count", len(mm)) - var trs = make([]request, 0, len(mm)) + trs := make([]request, 0, len(mm)) for i := range mm { // collecting threads to get their count. But we don't start // processing them yet, before we send the messages with the number of @@ -326,11 +327,12 @@ func procThreadMsg(ctx context.Context, proc processor.Conversations, channel *s return err } if err := proc.ThreadMessages(ctx, channel.ID, head, threadOnly, isLast, rest); err != nil { - return fmt.Errorf("failed to process thread message id=%s, thread_ts=%s: %w", head.Msg.Timestamp, threadTS, err) + return fmt.Errorf("failed to process thread message id=%s, thread_ts=%s: %w", head.Timestamp, threadTS, err) } return nil } +// procFiles processes the files in slice of Messages msgs. 
func procFiles(ctx context.Context, proc processor.Filer, channel *slack.Channel, msgs ...slack.Message) error { if len(msgs) == 0 { return nil @@ -416,7 +418,7 @@ func (cs *Stream) procChannelUsers(ctx context.Context, proc processor.ChannelIn func (cs *Stream) procChannelInfoWithUsers(ctx context.Context, proc processor.ChannelInformer, channelID, threadTS string) (*slack.Channel, error) { var eg errgroup.Group - var chC = make(chan slack.Channel, 1) + chC := make(chan slack.Channel, 1) eg.Go(func() error { defer close(chC) ch, err := cs.procChannelInfo(ctx, proc, channelID, threadTS) @@ -427,7 +429,7 @@ func (cs *Stream) procChannelInfoWithUsers(ctx context.Context, proc processor.C return nil }) - var uC = make(chan []string, 1) + uC := make(chan []string, 1) eg.Go(func() error { defer close(uC) m, err := cs.procChannelUsers(ctx, proc, channelID, threadTS) diff --git a/utils/count_chunks.sh b/utils/count_chunks.sh new file mode 100644 index 00000000..715fe9c4 --- /dev/null +++ b/utils/count_chunks.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +gzcat $1.json.gz| jq '.t' | awk '{count[$1]++}END{for(t in count)print t,count[t]}' \ No newline at end of file