Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion doc/server_plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ The response can look like any of the following:

### Operation

Currently `Login`, `NewProxy`, `CloseProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported.
Currently `Login`, `NewProxy`, `ProxyStarted`, `CloseProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported.

#### Login

Expand Down Expand Up @@ -138,6 +138,28 @@ Create new proxy
}
```

#### ProxyStarted

Proxy has been successfully started and port allocation is complete. This event is sent **after** the proxy is running,
so it includes the actual allocated port (which may differ from the requested port when `remotePort=0`).

This is a notification-only event - plugins cannot reject or modify the content since the proxy is already running.

```
{
"content": {
"user": {
"user": <string>,
"metas": map<string>string
"run_id": <string>
},
"proxy_name": <string>,
"proxy_type": <string>,
"remote_addr": <string> // The actual allocated address, e.g., ":6000"
}
}
```

#### CloseProxy

A previously created proxy is closed.
Expand Down
1 change: 1 addition & 0 deletions pkg/config/v1/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
SupportedHTTPPluginOps = []string{
splugin.OpLogin,
splugin.OpNewProxy,
splugin.OpProxyStarted,
splugin.OpCloseProxy,
splugin.OpPing,
splugin.OpNewWorkConn,
Expand Down
57 changes: 45 additions & 12 deletions pkg/plugin/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ import (
)

type Manager struct {
loginPlugins []Plugin
newProxyPlugins []Plugin
closeProxyPlugins []Plugin
pingPlugins []Plugin
newWorkConnPlugins []Plugin
newUserConnPlugins []Plugin
loginPlugins []Plugin
newProxyPlugins []Plugin
proxyStartedPlugins []Plugin
closeProxyPlugins []Plugin
pingPlugins []Plugin
newWorkConnPlugins []Plugin
newUserConnPlugins []Plugin
}

func NewManager() *Manager {
return &Manager{
loginPlugins: make([]Plugin, 0),
newProxyPlugins: make([]Plugin, 0),
closeProxyPlugins: make([]Plugin, 0),
pingPlugins: make([]Plugin, 0),
newWorkConnPlugins: make([]Plugin, 0),
newUserConnPlugins: make([]Plugin, 0),
loginPlugins: make([]Plugin, 0),
newProxyPlugins: make([]Plugin, 0),
proxyStartedPlugins: make([]Plugin, 0),
closeProxyPlugins: make([]Plugin, 0),
pingPlugins: make([]Plugin, 0),
newWorkConnPlugins: make([]Plugin, 0),
newUserConnPlugins: make([]Plugin, 0),
}
}

Expand All @@ -51,6 +53,9 @@ func (m *Manager) Register(p Plugin) {
if p.IsSupport(OpNewProxy) {
m.newProxyPlugins = append(m.newProxyPlugins, p)
}
if p.IsSupport(OpProxyStarted) {
m.proxyStartedPlugins = append(m.proxyStartedPlugins, p)
}
if p.IsSupport(OpCloseProxy) {
m.closeProxyPlugins = append(m.closeProxyPlugins, p)
}
Expand Down Expand Up @@ -133,6 +138,34 @@ func (m *Manager) NewProxy(content *NewProxyContent) (*NewProxyContent, error) {
return content, nil
}

// ProxyStarted is called after a proxy has been successfully started and port
// allocation is complete. This is a notification-only event (fire and forget),
// plugins cannot reject or modify the content since the proxy is already running.
func (m *Manager) ProxyStarted(content *ProxyStartedContent) error {
if len(m.proxyStartedPlugins) == 0 {
return nil
}

errs := make([]string, 0)
reqid, _ := util.RandID()
xl := xlog.New().AppendPrefix("reqid: " + reqid)
ctx := xlog.NewContext(context.Background(), xl)
ctx = NewReqidContext(ctx, reqid)

for _, p := range m.proxyStartedPlugins {
_, _, err := p.Handle(ctx, OpProxyStarted, *content)
if err != nil {
xl.Warnf("send ProxyStarted request to plugin [%s] error: %v", p.Name(), err)
errs = append(errs, fmt.Sprintf("[%s]: %v", p.Name(), err))
}
}

if len(errs) > 0 {
return fmt.Errorf("send ProxyStarted request to plugin errors: %s", strings.Join(errs, "; "))
}
return nil
}

func (m *Manager) CloseProxy(content *CloseProxyContent) error {
if len(m.closeProxyPlugins) == 0 {
return nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/plugin/server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
const (
APIVersion = "0.1.0"

OpLogin = "Login"
OpNewProxy = "NewProxy"
OpCloseProxy = "CloseProxy"
OpPing = "Ping"
OpNewWorkConn = "NewWorkConn"
OpNewUserConn = "NewUserConn"
OpLogin = "Login"
OpNewProxy = "NewProxy"
OpProxyStarted = "ProxyStarted"
OpCloseProxy = "CloseProxy"
OpPing = "Ping"
OpNewWorkConn = "NewWorkConn"
OpNewUserConn = "NewUserConn"
)

type Plugin interface {
Expand Down
10 changes: 10 additions & 0 deletions pkg/plugin/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,13 @@ type NewUserConnContent struct {
ProxyType string `json:"proxy_type"`
RemoteAddr string `json:"remote_addr"`
}

// ProxyStartedContent is sent after a proxy has been successfully started
// and port allocation is complete. This includes the actual allocated port
// which may differ from the requested port (e.g., when remotePort=0).
type ProxyStartedContent struct {
User UserInfo `json:"user"`
ProxyName string `json:"proxy_name"`
ProxyType string `json:"proxy_type"`
RemoteAddr string `json:"remote_addr"` // The actual allocated address (e.g., ":6000")
}
15 changes: 15 additions & 0 deletions server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,21 @@ func (ctl *Control) handleNewProxy(m msg.Message) {
resp.RemoteAddr = remoteAddr
xl.Infof("new proxy [%s] type [%s] success", inMsg.ProxyName, inMsg.ProxyType)
metrics.Server.NewProxy(inMsg.ProxyName, inMsg.ProxyType)

// Notify plugins that proxy has started with the allocated port
startedContent := &plugin.ProxyStartedContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
ProxyName: inMsg.ProxyName,
ProxyType: inMsg.ProxyType,
RemoteAddr: remoteAddr,
}
go func() {
_ = ctl.pluginManager.ProxyStarted(startedContent)
}()
}
_ = ctl.msgDispatcher.Send(resp)
}
Expand Down