-
Notifications
You must be signed in to change notification settings - Fork 134
fix: architectural resolution for #819 using AsyncOutputCoordinator #943
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
5408534
97c2bf4
984569f
d5d280e
68ab6b7
2df856c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,10 +3,12 @@ package main | |
| import ( | ||
| "os" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/projectdiscovery/goflags" | ||
| "github.com/projectdiscovery/gologger" | ||
| "github.com/projectdiscovery/tlsx/internal/runner" | ||
| "github.com/projectdiscovery/tlsx/pkg/output" | ||
| "github.com/projectdiscovery/tlsx/pkg/tlsx/clients" | ||
| "github.com/projectdiscovery/tlsx/pkg/tlsx/openssl" | ||
| "github.com/projectdiscovery/utils/errkit" | ||
|
|
@@ -29,6 +31,24 @@ func process() error { | |
| if err := readFlags(); err != nil { | ||
| return errkit.Wrapf(err, "could not read flags") | ||
| } | ||
|
|
||
| // Initialize output coordinator if output file is specified | ||
| var coord *output.AsyncOutputCoordinator | ||
| var err error | ||
| if options.OutputFile != "" { | ||
| coord, err = output.NewAsyncOutputCoordinator(options.OutputFile, 10000, 1*time.Second) | ||
| if err != nil { | ||
| return errkit.Wrapf(err, "could not initialize output coordinator") | ||
| } | ||
| options.AsyncOutputCoordinator = coord | ||
| coord.HandleSignals() | ||
| defer func() { | ||
| if err := coord.GracefulShutdown(); err != nil { | ||
| gologger.Warning().Msgf("Error during graceful shutdown: %v", err) | ||
| } | ||
| }() | ||
|
Comment on lines
+44
to
+50
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: find . -name "coordinator.go" -type fRepository: projectdiscovery/tlsx Length of output: 92 🏁 Script executed: wc -l ./pkg/output/coordinator.goRepository: projectdiscovery/tlsx Length of output: 96 🏁 Script executed: cat -n ./pkg/output/coordinator.goRepository: projectdiscovery/tlsx Length of output: 4786 Ensure
Protect 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| runner, err := runner.New(options) | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if err != nil { | ||
| return errkit.Wrapf(err, "could not create runner") | ||
|
|
@@ -112,7 +132,7 @@ func readFlags(args ...string) error { | |
| flagSet.StringSliceVarP(&options.Resolvers, "resolvers", "r", nil, "list of resolvers to use", goflags.FileCommaSeparatedStringSliceOptions), | ||
| flagSet.StringVarP(&options.CACertificate, "cacert", "cc", "", "client certificate authority file"), | ||
| flagSet.StringSliceVarP(&options.Ciphers, "cipher-input", "ci", nil, "ciphers to use with tls connection", goflags.FileCommaSeparatedStringSliceOptions), | ||
| flagSet.StringSliceVar(&options.ServerName, "sni", nil, "tls sni hostname to use", goflags.FileCommaSeparatedStringSliceOptions), | ||
| flagSet.StringSliceVar(&options.ServerName, "sni", nil, "tls sni hostname to use", goflags.NormalizedStringSliceOptions), | ||
| flagSet.BoolVarP(&options.RandomForEmptyServerName, "random-sni", "rs", false, "use random sni when empty"), | ||
| flagSet.BoolVarP(&options.ReversePtrSNI, "rev-ptr-sni", "rps", false, "perform reverse PTR to retrieve SNI from IP"), | ||
| flagSet.StringVar(&options.MinVersion, "min-version", "", "minimum tls version to accept (ssl30,tls10,tls11,tls12,tls13)"), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| package output | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "context" | ||
| "encoding/json" | ||
| "os" | ||
| "os/signal" | ||
| "syscall" | ||
| "time" | ||
|
|
||
| "github.com/projectdiscovery/gologger" | ||
| ) | ||
|
|
||
| // AsyncOutputCoordinator manages async writing of scan results to disk. | ||
| // Uses buffered channel for concurrent submission and periodic flushing. | ||
| type AsyncOutputCoordinator struct { | ||
| outputChan chan []byte | ||
| file *os.File | ||
| writer *bufio.Writer | ||
| shutdownCtx context.Context | ||
| cancel context.CancelFunc | ||
| flushTicker *time.Ticker | ||
| done chan struct{} | ||
| } | ||
|
|
||
| // NewAsyncOutputCoordinator creates a new coordinator. | ||
| // bufferSize: Size of the buffered channel (e.g., 10000 for 10k pending results). | ||
| // flushInterval: How often to flush the buffer to disk (e.g., 1*time.Second). | ||
| func NewAsyncOutputCoordinator(filename string, bufferSize int, flushInterval time.Duration) (*AsyncOutputCoordinator, error) { | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| file, err := os.Create(filename) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| coord := &AsyncOutputCoordinator{ | ||
| outputChan: make(chan []byte, bufferSize), | ||
| file: file, | ||
| writer: bufio.NewWriter(file), | ||
| shutdownCtx: ctx, | ||
| cancel: cancel, | ||
| flushTicker: time.NewTicker(flushInterval), | ||
| done: make(chan struct{}), | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| go coord.writeLoop() | ||
| return coord, nil | ||
| } | ||
|
|
||
| // writeLoop is the dedicated goroutine for writing to disk. | ||
| // Uses periodic flushing instead of flushing after every write. | ||
| func (c *AsyncOutputCoordinator) writeLoop() { | ||
| defer func() { | ||
| c.flushTicker.Stop() | ||
| close(c.done) | ||
| }() | ||
|
|
||
| for { | ||
| select { | ||
| case data, ok := <-c.outputChan: | ||
| if !ok { | ||
| // Channel closed, drain remaining data | ||
| if err := c.writer.Flush(); err != nil { | ||
| gologger.Warning().Msgf("Failed to flush writer during shutdown: %v", err) | ||
| } | ||
| return | ||
| } | ||
| if _, err := c.writer.Write(data); err != nil { | ||
| gologger.Warning().Msgf("Failed to write data: %v", err) | ||
| continue | ||
| } | ||
|
|
||
| case <-c.flushTicker.C: | ||
| if err := c.writer.Flush(); err != nil { | ||
| gologger.Warning().Msgf("Failed to flush writer: %v", err) | ||
| } | ||
|
|
||
| case <-c.shutdownCtx.Done(): | ||
| // Drain the channel before exiting | ||
| for { | ||
| select { | ||
| case data, ok := <-c.outputChan: | ||
| if !ok { | ||
| if err := c.writer.Flush(); err != nil { | ||
| gologger.Warning().Msgf("Failed to flush writer during shutdown: %v", err) | ||
| } | ||
| return | ||
| } | ||
| if _, err := c.writer.Write(data); err != nil { | ||
| gologger.Warning().Msgf("Failed to write data during shutdown: %v", err) | ||
| continue | ||
| } | ||
| default: | ||
| if err := c.writer.Flush(); err != nil { | ||
| gologger.Warning().Msgf("Failed to flush writer during shutdown: %v", err) | ||
| } | ||
| return | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Submit sends a result to the coordinator. | ||
| // Returns an error if the coordinator is shutting down. | ||
| func (c *AsyncOutputCoordinator) Submit(result interface{}) error { | ||
| data, err := json.Marshal(result) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| data = append(data, '\n') | ||
|
|
||
| select { | ||
| case c.outputChan <- data: | ||
| return nil | ||
| case <-c.shutdownCtx.Done(): | ||
| return context.Canceled | ||
| } | ||
| } | ||
|
|
||
| // GracefulShutdown waits for all data to be written and closes the file. | ||
| // Call this when the scan is complete or on program exit. | ||
| func (c *AsyncOutputCoordinator) GracefulShutdown() error { | ||
| c.cancel() | ||
| <-c.done | ||
| return c.file.Close() | ||
| } | ||
|
|
||
| // HandleSignals sets up signal handling for graceful shutdown on CTRL+C. | ||
| // Does not call os.Exit(), allowing defer statements to execute. | ||
| func (c *AsyncOutputCoordinator) HandleSignals() { | ||
| sigChan := make(chan os.Signal, 1) | ||
| signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) | ||
| go func() { | ||
| <-sigChan | ||
| gologger.Info().Msg("Received interrupt signal. Shutting down gracefully...") | ||
| c.GracefulShutdown() | ||
| }() | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: projectdiscovery/tlsx
Length of output: 350
🏁 Script executed:
Repository: projectdiscovery/tlsx
Length of output: 3248
🏁 Script executed:
Repository: projectdiscovery/tlsx
Length of output: 3120
🏁 Script executed:
Repository: projectdiscovery/tlsx
Length of output: 47
🏁 Script executed:
Repository: projectdiscovery/tlsx
Length of output: 1116
🏁 Script executed:
Repository: projectdiscovery/tlsx
Length of output: 805
🏁 Script executed:
Repository: projectdiscovery/tlsx
Length of output: 801
🏁 Script executed:
Repository: projectdiscovery/tlsx
Length of output: 615
Compile error:
AsyncOutputCoordinatorfield does not exist inOptionsstruct.Line 43 assigns
options.AsyncOutputCoordinator = coord, but theOptionsstruct inpkg/tlsx/clients/clients.go(lines 46–188) does not define this field. This will cause a compile-time error: "unknown field AsyncOutputCoordinator in struct literal".🤖 Prompt for AI Agents