Skip to content

Loopback outbound: add timeouts (fixes half-closed tcp issue #5917)#6000

Open
xsm1997 wants to merge 1 commit intoXTLS:mainfrom
xsm1997:main
Open

Loopback outbound: add timeouts (fixes half-closed tcp issue #5917)#6000
xsm1997 wants to merge 1 commit intoXTLS:mainfrom
xsm1997:main

Conversation

@xsm1997
Copy link
Copy Markdown

@xsm1997 xsm1997 commented Apr 22, 2026

fixes #5917.

我遇到的这个问题查明白了,不是tunnel入站的问题,而是loopback出站缺少timeout导致的。这里补上了loopback出站的三个timeout。

luci-app-passwall的默认分流配置目前会使用到loopback,协议链是tunnel inbound -> loopback -> vless outbound。之前tunnel已经改成dispatchlink了,而loopback还是task.Run。应该是这么一接导致outbound没办法close到inbound吧。

PS:loopback原先没有user_level,我为了拿policy的timeout,写了个5的user_level,和tun一样。可能需要修改。

@Fangliding
Copy link
Copy Markdown
Member

loopback只是让它重新进路由而已 理论上还是会继续往后传递超时链条的 这个以后也很可能改成dispatchlink
我不是说找一下goroutine卡在哪了么

@xsm1997
Copy link
Copy Markdown
Author

xsm1997 commented Apr 23, 2026

开了200个socat进行压力测试,下面是抓到的pprof goroutine。此时应用服务器(服务器上的socat做的server)已经关闭,xray服务器上的连接已经消失。看上去是卡在了loopback的requestDone里,在等buf.Copy

top

Details

Fetching profile over HTTP from http://localhost:6060/debug/pprof/goroutine
Saved profile in /home/kuma/pprof/pprof.main.goroutine.005.pb.gz
File: main
Build ID: 59d2d93a4702c341e6f0728f6d232bd1328e7dc2
Type: goroutine
Time: 2026-04-23 08:50:31 CST
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 618, 99.52% of 621 total
Dropped 67 nodes (cum <= 3)
Showing top 10 nodes out of 41
      flat  flat%   sum%        cum   cum%
       618 99.52% 99.52%        618 99.52%  runtime.gopark
         0     0% 99.52%        200 32.21%  github.com/xtls/xray-core/app/dispatcher.(*DefaultDispatcher).DispatchLink
         0     0% 99.52%        201 32.37%  github.com/xtls/xray-core/app/dispatcher.(*DefaultDispatcher).routedDispatch
         0     0% 99.52%        200 32.21%  github.com/xtls/xray-core/app/dispatcher.(*cachedReader).ReadMultiBuffer
         0     0% 99.52%        200 32.21%  github.com/xtls/xray-core/app/proxyman/inbound.(*tcpWorker).callback
         0     0% 99.52%          3  0.48%  github.com/xtls/xray-core/app/proxyman/inbound.(*udpWorker).handlePackets
         0     0% 99.52%        201 32.37%  github.com/xtls/xray-core/app/proxyman/outbound.(*Handler).Dispatch
         0     0% 99.52%        200 32.21%  github.com/xtls/xray-core/common/buf.(*Buffer).ReadFrom (inline)
         0     0% 99.52%        200 32.21%  github.com/xtls/xray-core/common/buf.(*ReadVReader).ReadMultiBuffer
         0     0% 99.52%        200 32.21%  github.com/xtls/xray-core/common/buf.(*TimeoutWrapperReader).ReadMultiBuffer

list DispatchLink

Details

(pprof) list DispatchLink
Total: 621
ROUTINE ======================== github.com/xtls/xray-core/app/dispatcher.(*DefaultDispatcher).DispatchLink in /home/kuma/work/xray-core/app/dispatcher/default.go
         0        200 (flat, cum) 32.21% of Total
         .          .    324:func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.Destination, outbound *transport.Link) error {
         .          .    325:   if !destination.IsValid() {
         .          .    326:           return errors.New("Dispatcher: Invalid destination.")
         .          .    327:   }
         .          .    328:   outbounds := session.OutboundsFromContext(ctx)
         .          .    329:   if len(outbounds) == 0 {
         .          .    330:           outbounds = []*session.Outbound{{}}
         .          .    331:           ctx = session.ContextWithOutbounds(ctx, outbounds)
         .          .    332:   }
         .          .    333:   ob := outbounds[len(outbounds)-1]
         .          .    334:   ob.OriginalTarget = destination
         .          .    335:   ob.Target = destination
         .          .    336:   content := session.ContentFromContext(ctx)
         .          .    337:   if content == nil {
         .          .    338:           content = new(session.Content)
         .          .    339:           ctx = session.ContextWithContent(ctx, content)
         .          .    340:   }
         .          .    341:   outbound = WrapLink(ctx, d.policy, d.stats, outbound)
         .          .    342:   sniffingRequest := content.SniffingRequest
         .          .    343:   if !sniffingRequest.Enabled {
         .          .    344:           d.routedDispatch(ctx, outbound, destination)
         .          .    345:   } else {
         .          .    346:           cReader := &cachedReader{
         .          .    347:                   reader: outbound.Reader.(buf.TimeoutReader),
         .          .    348:           }
         .          .    349:           outbound.Reader = cReader
         .          .    350:           result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network)
         .          .    351:           if err == nil {
         .          .    352:                   content.Protocol = result.Protocol()
         .          .    353:           }
         .          .    354:           if err == nil && d.shouldOverride(ctx, result, sniffingRequest, destination) {
         .          .    355:                   domain := result.Domain()
         .          .    356:                   errors.LogInfo(ctx, "sniffed domain: ", domain)
         .          .    357:                   destination.Address = net.ParseAddress(domain)
         .          .    358:                   protocol := result.Protocol()
         .          .    359:                   if resComp, ok := result.(SnifferResultComposite); ok {
         .          .    360:                           protocol = resComp.ProtocolForDomainResult()
         .          .    361:                   }
         .          .    362:                   isFakeIP := false
         .          .    363:                   if fkr0, ok := d.fdns.(dns.FakeDNSEngineRev0); ok && fkr0.IsIPInIPPool(ob.Target.Address) {
         .          .    364:                           isFakeIP = true
         .          .    365:                   }
         .          .    366:                   if sniffingRequest.RouteOnly && protocol != "fakedns" && protocol != "fakedns+others" && !isFakeIP {
         .          .    367:                           ob.RouteTarget = destination
         .          .    368:                   } else {
         .          .    369:                           ob.Target = destination
         .          .    370:                   }
         .          .    371:           }
         .        200    372:           d.routedDispatch(ctx, outbound, destination)
         .          .    373:   }
         .          .    374:
         .          .    375:   return nil
         .          .    376:}
         .          .    377:
ROUTINE ======================== github.com/xtls/xray-core/common/mux.(*Server).DispatchLink in /home/kuma/work/xray-core/common/mux/server.go
         0        200 (flat, cum) 32.21% of Total
         .          .     62:func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *transport.Link) error {
         .          .     63:   if dest.Address != muxCoolAddress {
         .        200     64:           return s.dispatcher.DispatchLink(ctx, dest, link)
         .          .     65:   }
         .          .     66:   worker, err := NewServerWorker(ctx, s.dispatcher, link)
         .          .     67:   if err != nil {
         .          .     68:           return err
         .          .     69:   }

list routedDispatch

Details

(pprof) list routedDispatch
Total: 621
ROUTINE ======================== github.com/xtls/xray-core/app/dispatcher.(*DefaultDispatcher).routedDispatch in /home/kuma/work/xray-core/app/dispatcher/default.go
         0        201 (flat, cum) 32.37% of Total
         .          .    434:func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination) {
         .          .    435:   outbounds := session.OutboundsFromContext(ctx)
         .          .    436:   ob := outbounds[len(outbounds)-1]
         .          .    437:
         .          .    438:   var handler outbound.Handler
         .          .    439:
         .          .    440:   routingLink := routing_session.AsRoutingContext(ctx)
         .          .    441:   inTag := routingLink.GetInboundTag()
         .          .    442:   isPickRoute := 0
         .          .    443:   if forcedOutboundTag := session.GetForcedOutboundTagFromContext(ctx); forcedOutboundTag != "" {
         .          .    444:           ctx = session.SetForcedOutboundTagToContext(ctx, "")
         .          .    445:           if h := d.ohm.GetHandler(forcedOutboundTag); h != nil {
         .          .    446:                   isPickRoute = 1
         .          .    447:                   errors.LogInfo(ctx, "taking platform initialized detour [", forcedOutboundTag, "] for [", destination, "]")
         .          .    448:                   handler = h
         .          .    449:           } else {
         .          .    450:                   errors.LogError(ctx, "non existing tag for platform initialized detour: ", forcedOutboundTag)
         .          .    451:                   common.Close(link.Writer)
         .          .    452:                   common.Interrupt(link.Reader)
         .          .    453:                   return
         .          .    454:           }
         .          .    455:   } else if d.router != nil {
         .          .    456:           if route, err := d.router.PickRoute(routingLink); err == nil {
         .          .    457:                   outTag := route.GetOutboundTag()
         .          .    458:                   if h := d.ohm.GetHandler(outTag); h != nil {
         .          .    459:                           isPickRoute = 2
         .          .    460:                           if route.GetRuleTag() == "" {
         .          .    461:                                   errors.LogInfo(ctx, "taking detour [", outTag, "] for [", destination, "]")
         .          .    462:                           } else {
         .          .    463:                                   errors.LogInfo(ctx, "Hit route rule: [", route.GetRuleTag(), "] so taking detour [", outTag, "] for [", destination, "]")
         .          .    464:                           }
         .          .    465:                           handler = h
         .          .    466:                   } else {
         .          .    467:                           errors.LogWarning(ctx, "non existing outTag: ", outTag)
         .          .    468:                           common.Close(link.Writer)
         .          .    469:                           common.Interrupt(link.Reader)
         .          .    470:                           return // DO NOT CHANGE: the traffic shouldn't be processed by default outbound if the specified outbound tag doesn't exist (yet), e.g., VLESS Reverse Proxy
         .          .    471:                   }
         .          .    472:           } else {
         .          .    473:                   errors.LogInfo(ctx, "default route for ", destination)
         .          .    474:           }
         .          .    475:   }
         .          .    476:
         .          .    477:   if handler == nil {
         .          .    478:           handler = d.ohm.GetDefaultHandler()
         .          .    479:   }
         .          .    480:
         .          .    481:   if handler == nil {
         .          .    482:           errors.LogInfo(ctx, "default outbound handler not exist")
         .          .    483:           common.Close(link.Writer)
         .          .    484:           common.Interrupt(link.Reader)
         .          .    485:           return
         .          .    486:   }
         .          .    487:
         .          .    488:   ob.Tag = handler.Tag()
         .          .    489:   if accessMessage := log.AccessMessageFromContext(ctx); accessMessage != nil {
         .          .    490:           if tag := handler.Tag(); tag != "" {
         .          .    491:                   if inTag == "" {
         .          .    492:                           accessMessage.Detour = tag
         .          .    493:                   } else if isPickRoute == 1 {
         .          .    494:                           accessMessage.Detour = inTag + " ==> " + tag
         .          .    495:                   } else if isPickRoute == 2 {
         .          .    496:                           accessMessage.Detour = inTag + " -> " + tag
         .          .    497:                   } else {
         .          .    498:                           accessMessage.Detour = inTag + " >> " + tag
         .          .    499:                   }
         .          .    500:           }
         .          .    501:           log.Record(accessMessage)
         .          .    502:   }
         .          .    503:
         .        201    504:   handler.Dispatch(ctx, link)
         .          .    505:}

list Dispatch

Details

(pprof) list Dispatch
Total: 621
ROUTINE ======================== github.com/xtls/xray-core/app/dispatcher.(*DefaultDispatcher).DispatchLink in /home/kuma/work/xray-core/app/dispatcher/default.go
         0        200 (flat, cum) 32.21% of Total
         .          .    324:func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.Destination, outbound *transport.Link) error {
         .          .    325:   if !destination.IsValid() {
         .          .    326:           return errors.New("Dispatcher: Invalid destination.")
         .          .    327:   }
         .          .    328:   outbounds := session.OutboundsFromContext(ctx)
         .          .    329:   if len(outbounds) == 0 {
         .          .    330:           outbounds = []*session.Outbound{{}}
         .          .    331:           ctx = session.ContextWithOutbounds(ctx, outbounds)
         .          .    332:   }
         .          .    333:   ob := outbounds[len(outbounds)-1]
         .          .    334:   ob.OriginalTarget = destination
         .          .    335:   ob.Target = destination
         .          .    336:   content := session.ContentFromContext(ctx)
         .          .    337:   if content == nil {
         .          .    338:           content = new(session.Content)
         .          .    339:           ctx = session.ContextWithContent(ctx, content)
         .          .    340:   }
         .          .    341:   outbound = WrapLink(ctx, d.policy, d.stats, outbound)
         .          .    342:   sniffingRequest := content.SniffingRequest
         .          .    343:   if !sniffingRequest.Enabled {
         .          .    344:           d.routedDispatch(ctx, outbound, destination)
         .          .    345:   } else {
         .          .    346:           cReader := &cachedReader{
         .          .    347:                   reader: outbound.Reader.(buf.TimeoutReader),
         .          .    348:           }
         .          .    349:           outbound.Reader = cReader
         .          .    350:           result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network)
         .          .    351:           if err == nil {
         .          .    352:                   content.Protocol = result.Protocol()
         .          .    353:           }
         .          .    354:           if err == nil && d.shouldOverride(ctx, result, sniffingRequest, destination) {
         .          .    355:                   domain := result.Domain()
         .          .    356:                   errors.LogInfo(ctx, "sniffed domain: ", domain)
         .          .    357:                   destination.Address = net.ParseAddress(domain)
         .          .    358:                   protocol := result.Protocol()
         .          .    359:                   if resComp, ok := result.(SnifferResultComposite); ok {
         .          .    360:                           protocol = resComp.ProtocolForDomainResult()
         .          .    361:                   }
         .          .    362:                   isFakeIP := false
         .          .    363:                   if fkr0, ok := d.fdns.(dns.FakeDNSEngineRev0); ok && fkr0.IsIPInIPPool(ob.Target.Address) {
         .          .    364:                           isFakeIP = true
         .          .    365:                   }
         .          .    366:                   if sniffingRequest.RouteOnly && protocol != "fakedns" && protocol != "fakedns+others" && !isFakeIP {
         .          .    367:                           ob.RouteTarget = destination
         .          .    368:                   } else {
         .          .    369:                           ob.Target = destination
         .          .    370:                   }
         .          .    371:           }
         .        200    372:           d.routedDispatch(ctx, outbound, destination)
         .          .    373:   }
         .          .    374:
         .          .    375:   return nil
         .          .    376:}
         .          .    377:
ROUTINE ======================== github.com/xtls/xray-core/app/dispatcher.(*DefaultDispatcher).routedDispatch in /home/kuma/work/xray-core/app/dispatcher/default.go
         0        201 (flat, cum) 32.37% of Total
         .          .    434:func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination) {
         .          .    435:   outbounds := session.OutboundsFromContext(ctx)
         .          .    436:   ob := outbounds[len(outbounds)-1]
         .          .    437:
         .          .    438:   var handler outbound.Handler
         .          .    439:
         .          .    440:   routingLink := routing_session.AsRoutingContext(ctx)
         .          .    441:   inTag := routingLink.GetInboundTag()
         .          .    442:   isPickRoute := 0
         .          .    443:   if forcedOutboundTag := session.GetForcedOutboundTagFromContext(ctx); forcedOutboundTag != "" {
         .          .    444:           ctx = session.SetForcedOutboundTagToContext(ctx, "")
         .          .    445:           if h := d.ohm.GetHandler(forcedOutboundTag); h != nil {
         .          .    446:                   isPickRoute = 1
         .          .    447:                   errors.LogInfo(ctx, "taking platform initialized detour [", forcedOutboundTag, "] for [", destination, "]")
         .          .    448:                   handler = h
         .          .    449:           } else {
         .          .    450:                   errors.LogError(ctx, "non existing tag for platform initialized detour: ", forcedOutboundTag)
         .          .    451:                   common.Close(link.Writer)
         .          .    452:                   common.Interrupt(link.Reader)
         .          .    453:                   return
         .          .    454:           }
         .          .    455:   } else if d.router != nil {
         .          .    456:           if route, err := d.router.PickRoute(routingLink); err == nil {
         .          .    457:                   outTag := route.GetOutboundTag()
         .          .    458:                   if h := d.ohm.GetHandler(outTag); h != nil {
         .          .    459:                           isPickRoute = 2
         .          .    460:                           if route.GetRuleTag() == "" {
         .          .    461:                                   errors.LogInfo(ctx, "taking detour [", outTag, "] for [", destination, "]")
         .          .    462:                           } else {
         .          .    463:                                   errors.LogInfo(ctx, "Hit route rule: [", route.GetRuleTag(), "] so taking detour [", outTag, "] for [", destination, "]")
         .          .    464:                           }
         .          .    465:                           handler = h
         .          .    466:                   } else {
         .          .    467:                           errors.LogWarning(ctx, "non existing outTag: ", outTag)
         .          .    468:                           common.Close(link.Writer)
         .          .    469:                           common.Interrupt(link.Reader)
         .          .    470:                           return // DO NOT CHANGE: the traffic shouldn't be processed by default outbound if the specified outbound tag doesn't exist (yet), e.g., VLESS Reverse Proxy
         .          .    471:                   }
         .          .    472:           } else {
         .          .    473:                   errors.LogInfo(ctx, "default route for ", destination)
         .          .    474:           }
         .          .    475:   }
         .          .    476:
         .          .    477:   if handler == nil {
         .          .    478:           handler = d.ohm.GetDefaultHandler()
         .          .    479:   }
         .          .    480:
         .          .    481:   if handler == nil {
         .          .    482:           errors.LogInfo(ctx, "default outbound handler not exist")
         .          .    483:           common.Close(link.Writer)
         .          .    484:           common.Interrupt(link.Reader)
         .          .    485:           return
         .          .    486:   }
         .          .    487:
         .          .    488:   ob.Tag = handler.Tag()
         .          .    489:   if accessMessage := log.AccessMessageFromContext(ctx); accessMessage != nil {
         .          .    490:           if tag := handler.Tag(); tag != "" {
         .          .    491:                   if inTag == "" {
         .          .    492:                           accessMessage.Detour = tag
         .          .    493:                   } else if isPickRoute == 1 {
         .          .    494:                           accessMessage.Detour = inTag + " ==> " + tag
         .          .    495:                   } else if isPickRoute == 2 {
         .          .    496:                           accessMessage.Detour = inTag + " -> " + tag
         .          .    497:                   } else {
         .          .    498:                           accessMessage.Detour = inTag + " >> " + tag
         .          .    499:                   }
         .          .    500:           }
         .          .    501:           log.Record(accessMessage)
         .          .    502:   }
         .          .    503:
         .        201    504:   handler.Dispatch(ctx, link)
         .          .    505:}
ROUTINE ======================== github.com/xtls/xray-core/app/proxyman/outbound.(*Handler).Dispatch in /home/kuma/work/xray-core/app/proxyman/outbound/handler.go
         0        201 (flat, cum) 32.37% of Total
         .          .    180:func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) {
         .          .    181:   outbounds := session.OutboundsFromContext(ctx)
         .          .    182:   ob := outbounds[len(outbounds)-1]
         .          .    183:   content := session.ContentFromContext(ctx)
         .          .    184:   if h.senderSettings != nil && h.senderSettings.TargetStrategy.HasStrategy() && ob.Target.Address.Family().IsDomain() && (content == nil || !content.SkipDNSResolve) {
         .          .    185:           strategy := h.senderSettings.TargetStrategy
         .          .    186:           if ob.Target.Network == net.Network_UDP && ob.OriginalTarget.Address != nil {
         .          .    187:                   strategy = strategy.GetDynamicStrategy(ob.OriginalTarget.Address.Family())
         .          .    188:           }
         .          .    189:           ips, err := internet.LookupForIP(ob.Target.Address.Domain(), strategy, nil)
         .          .    190:           if err != nil {
         .          .    191:                   errors.LogInfoInner(ctx, err, "failed to resolve ip for target ", ob.Target.Address.Domain())
         .          .    192:                   if h.senderSettings.TargetStrategy.ForceIP() {
         .          .    193:                           err := errors.New("failed to resolve ip for target ", ob.Target.Address.Domain()).Base(err)
         .          .    194:                           session.SubmitOutboundErrorToOriginator(ctx, err)
         .          .    195:                           common.Interrupt(link.Writer)
         .          .    196:                           common.Interrupt(link.Reader)
         .          .    197:                           return
         .          .    198:                   }
         .          .    199:
         .          .    200:           } else {
         .          .    201:                   unchangedDomain := ob.Target.Address.Domain()
         .          .    202:                   ob.Target.Address = net.IPAddress(ips[dice.Roll(len(ips))])
         .          .    203:                   errors.LogInfo(ctx, "target: ", unchangedDomain, " resolved to: ", ob.Target.Address.String())
         .          .    204:           }
         .          .    205:   }
         .          .    206:   if ob.Target.Network == net.Network_UDP && ob.OriginalTarget.Address != nil && ob.OriginalTarget.Address != ob.Target.Address {
         .          .    207:           link.Reader = &buf.EndpointOverrideReader{Reader: link.Reader, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address}
         .          .    208:           link.Writer = &buf.EndpointOverrideWriter{Writer: link.Writer, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address}
         .          .    209:   }
         .          .    210:   if h.mux != nil {
         .          .    211:           test := func(err error) {
         .          .    212:                   if err != nil {
         .          .    213:                           err := errors.New("failed to process mux outbound traffic").Base(err)
         .          .    214:                           session.SubmitOutboundErrorToOriginator(ctx, err)
         .          .    215:                           errors.LogInfo(ctx, err.Error())
         .          .    216:                           common.Interrupt(link.Writer)
         .          .    217:                           common.Interrupt(link.Reader)
         .          .    218:                   }
         .          .    219:           }
         .          .    220:           if ob.Target.Network == net.Network_UDP && ob.Target.Port == 443 {
         .          .    221:                   switch h.udp443 {
         .          .    222:                   case "reject":
         .          .    223:                           test(errors.New("XUDP rejected UDP/443 traffic").AtInfo())
         .          .    224:                           return
         .          .    225:                   case "skip":
         .          .    226:                           goto out
         .          .    227:                   }
         .          .    228:           }
         .          .    229:           if h.xudp != nil && ob.Target.Network == net.Network_UDP {
         .          .    230:                   if !h.xudp.Enabled {
         .          .    231:                           goto out
         .          .    232:                   }
         .          .    233:                   test(h.xudp.Dispatch(ctx, link))
         .          .    234:                   return
         .          .    235:           }
         .          .    236:           if h.mux.Enabled {
         .          .    237:                   test(h.mux.Dispatch(ctx, link))
         .          .    238:                   return
         .          .    239:           }
         .          .    240:   }
         .          .    241:out:
         .        201    242:   err := h.proxy.Process(ctx, link, h)
         .          .    243:   var errC error
         .          .    244:   if err != nil {
         .          .    245:           errC = errors.Cause(err)
         .          .    246:           if goerrors.Is(errC, io.EOF) || goerrors.Is(errC, io.ErrClosedPipe) || goerrors.Is(errC, context.Canceled) {
         .          .    247:                   err = nil
ROUTINE ======================== github.com/xtls/xray-core/common/mux.(*Server).DispatchLink in /home/kuma/work/xray-core/common/mux/server.go
         0        200 (flat, cum) 32.21% of Total
         .          .     62:func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *transport.Link) error {
         .          .     63:   if dest.Address != muxCoolAddress {
         .        200     64:           return s.dispatcher.DispatchLink(ctx, dest, link)
         .          .     65:   }
         .          .     66:   worker, err := NewServerWorker(ctx, s.dispatcher, link)
         .          .     67:   if err != nil {
         .          .     68:           return err
         .          .     69:   }

list Process

Details

(pprof) list Process
Total: 621
ROUTINE ======================== github.com/xtls/xray-core/proxy/dokodemo.(*DokodemoDoor).Process in /home/kuma/work/xray-core/proxy/dokodemo/dokodemo.go
         0        200 (flat, cum) 32.21% of Total
         .          .     73:func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error {
         .          .     74:   errors.LogDebug(ctx, "processing connection from: ", conn.RemoteAddr())
         .          .     75:   // forward to TCP if from UNIX
         .          .     76:   if network == net.Network_UNIX {
         .          .     77:           network = net.Network_TCP
         .          .     78:   }
         .          .     79:   dest := net.Destination{
         .          .     80:           Network: network,
         .          .     81:           Address: d.address,
         .          .     82:           Port:    d.port,
         .          .     83:   }
         .          .     84:
         .          .     85:   if !d.config.FollowRedirect {
         .          .     86:           host, port, err := net.SplitHostPort(conn.LocalAddr().String())
         .          .     87:           if dest.Address == nil {
         .          .     88:                   if err != nil {
         .          .     89:                           dest.Address = net.DomainAddress("localhost")
         .          .     90:                   } else {
         .          .     91:                           if strings.Contains(host, ".") {
         .          .     92:                                   dest.Address = net.LocalHostIP
         .          .     93:                           } else {
         .          .     94:                                   dest.Address = net.LocalHostIPv6
         .          .     95:                           }
         .          .     96:                   }
         .          .     97:           }
         .          .     98:           if dest.Port == 0 {
         .          .     99:                   dest.Port = net.Port(common.Must2(strconv.Atoi(port)))
         .          .    100:           }
         .          .    101:           if d.portMap != nil && d.portMap[port] != "" {
         .          .    102:                   h, p, _ := net.SplitHostPort(d.portMap[port])
         .          .    103:                   if len(h) > 0 {
         .          .    104:                           dest.Address = net.ParseAddress(h)
         .          .    105:                   }
         .          .    106:                   if len(p) > 0 {
         .          .    107:                           dest.Port = net.Port(common.Must2(strconv.Atoi(p)))
         .          .    108:                   }
         .          .    109:           }
         .          .    110:   }
         .          .    111:
         .          .    112:   destinationOverridden := false
         .          .    113:   if d.config.FollowRedirect {
         .          .    114:           outbounds := session.OutboundsFromContext(ctx)
         .          .    115:           if len(outbounds) > 0 {
         .          .    116:                   ob := outbounds[len(outbounds)-1]
         .          .    117:                   if ob.Target.IsValid() {
         .          .    118:                           dest = ob.Target
         .          .    119:                           destinationOverridden = true
         .          .    120:                   }
         .          .    121:           }
         .          .    122:           iConn := stat.TryUnwrapStatsConn(conn)
         .          .    123:           if tlsConn, ok := iConn.(tls.Interface); ok && !destinationOverridden {
         .          .    124:                   if serverName := tlsConn.HandshakeContextServerName(ctx); serverName != "" {
         .          .    125:                           dest.Address = net.DomainAddress(serverName)
         .          .    126:                           destinationOverridden = true
         .          .    127:                           ctx = session.ContextWithMitmServerName(ctx, serverName)
         .          .    128:                   }
         .          .    129:                   if tlsConn.NegotiatedProtocol() != "h2" {
         .          .    130:                           ctx = session.ContextWithMitmAlpn11(ctx, true)
         .          .    131:                   }
         .          .    132:           }
         .          .    133:   }
         .          .    134:   if !dest.IsValid() || dest.Address == nil {
         .          .    135:           return errors.New("unable to get destination")
         .          .    136:   }
         .          .    137:
         .          .    138:   inbound := session.InboundFromContext(ctx)
         .          .    139:   inbound.Name = "dokodemo-door"
         .          .    140:   inbound.CanSpliceCopy = 1
         .          .    141:   inbound.User = &protocol.MemoryUser{
         .          .    142:           Level: d.config.UserLevel,
         .          .    143:   }
         .          .    144:
         .          .    145:   ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
         .          .    146:           From:   conn.RemoteAddr(),
         .          .    147:           To:     dest,
         .          .    148:           Status: log.AccessAccepted,
         .          .    149:           Reason: "",
         .          .    150:   })
         .          .    151:   errors.LogInfo(ctx, "received request for ", conn.RemoteAddr())
         .          .    152:
         .          .    153:   var reader buf.Reader
         .          .    154:   if dest.Network == net.Network_TCP {
         .          .    155:           reader = buf.NewReader(conn)
         .          .    156:   } else {
         .          .    157:           reader = buf.NewPacketReader(conn)
         .          .    158:   }
         .          .    159:
         .          .    160:   var writer buf.Writer
         .          .    161:   if network == net.Network_TCP {
         .          .    162:           writer = buf.NewWriter(conn)
         .          .    163:   } else {
         .          .    164:           // if we are in TPROXY mode, use linux's udp forging functionality
         .          .    165:           if !destinationOverridden {
         .          .    166:                   writer = &buf.SequentialWriter{Writer: conn}
         .          .    167:           } else {
         .          .    168:                   back := conn.RemoteAddr().(*net.UDPAddr)
         .          .    169:                   if !dest.Address.Family().IsIP() {
         .          .    170:                           if len(back.IP) == 4 {
         .          .    171:                                   dest.Address = net.AnyIP
         .          .    172:                           } else {
         .          .    173:                                   dest.Address = net.AnyIPv6
         .          .    174:                           }
         .          .    175:                   }
         .          .    176:                   addr := &net.UDPAddr{
         .          .    177:                           IP:   dest.Address.IP(),
         .          .    178:                           Port: int(dest.Port),
         .          .    179:                   }
         .          .    180:                   var mark int
         .          .    181:                   if d.sockopt != nil {
         .          .    182:                           mark = int(d.sockopt.Mark)
         .          .    183:                   }
         .          .    184:                   pConn, err := FakeUDP(addr, mark)
         .          .    185:                   if err != nil {
         .          .    186:                           return err
         .          .    187:                   }
         .          .    188:                   writer = NewPacketWriter(pConn, &dest, mark, back)
         .          .    189:                   defer writer.(*PacketWriter).Close() // close fake UDP conns
         .          .    190:           }
         .          .    191:   }
         .          .    192:
         .        200    193:   if err := dispatcher.DispatchLink(ctx, dest, &transport.Link{
         .          .    194:           Reader: reader,
         .          .    195:           Writer: writer},
         .          .    196:   ); err != nil {
         .          .    197:           return errors.New("failed to dispatch request").Base(err)
         .          .    198:   }
ROUTINE ======================== github.com/xtls/xray-core/proxy/loopback.(*Loopback).Process in /home/kuma/work/xray-core/proxy/loopback/loopback.go
         0        200 (flat, cum) 32.21% of Total
         .          .     25:func (l *Loopback) Process(ctx context.Context, link *transport.Link, _ internet.Dialer) error {
         .          .     26:   outbounds := session.OutboundsFromContext(ctx)
         .          .     27:   ob := outbounds[len(outbounds)-1]
         .          .     28:   if !ob.Target.IsValid() {
         .          .     29:           return errors.New("target not specified.")
         .          .     30:   }
         .          .     31:   ob.Name = "loopback"
         .          .     32:   destination := ob.Target
         .          .     33:
         .          .     34:   errors.LogInfo(ctx, "opening connection to ", destination)
         .          .     35:
         .          .     36:   input := link.Reader
         .          .     37:   output := link.Writer
         .          .     38:
         .          .     39:   var conn net.Conn
         .          .     40:   err := retry.ExponentialBackoff(2, 100).On(func() error {
         .          .     41:           dialDest := destination
         .          .     42:
         .          .     43:           content := new(session.Content)
         .          .     44:           content.SkipDNSResolve = true
         .          .     45:
         .          .     46:           ctx = session.ContextWithContent(ctx, content)
         .          .     47:
         .          .     48:           inbound := &session.Inbound{}
         .          .     49:           originInbound := session.InboundFromContext(ctx)
         .          .     50:           if originInbound != nil {
         .          .     51:                   // get a shallow copy to avoid modifying the inbound tag in upstream context
         .          .     52:                   *inbound = *originInbound
         .          .     53:           }
         .          .     54:
         .          .     55:           inbound.Tag = l.config.InboundTag
         .          .     56:
         .          .     57:           ctx = session.ContextWithInbound(ctx, inbound)
         .          .     58:
         .          .     59:           rawConn, err := l.dispatcherInstance.Dispatch(ctx, dialDest)
         .          .     60:           if err != nil {
         .          .     61:                   return err
         .          .     62:           }
         .          .     63:
         .          .     64:           var readerOpt cnc.ConnectionOption
         .          .     65:           if dialDest.Network == net.Network_TCP {
         .          .     66:                   readerOpt = cnc.ConnectionOutputMulti(rawConn.Reader)
         .          .     67:           } else {
         .          .     68:                   readerOpt = cnc.ConnectionOutputMultiUDP(rawConn.Reader)
         .          .     69:           }
         .          .     70:
         .          .     71:           conn = cnc.NewConnection(cnc.ConnectionInputMulti(rawConn.Writer), readerOpt)
         .          .     72:           return nil
         .          .     73:   })
         .          .     74:   if err != nil {
         .          .     75:           return errors.New("failed to open connection to ", destination).Base(err)
         .          .     76:   }
         .          .     77:   defer conn.Close()
         .          .     78:
         .          .     79:   requestDone := func() error {
         .          .     80:           var writer buf.Writer
         .          .     81:           if destination.Network == net.Network_TCP {
         .          .     82:                   writer = buf.NewWriter(conn)
         .          .     83:           } else {
         .          .     84:                   writer = &buf.SequentialWriter{Writer: conn}
         .          .     85:           }
         .          .     86:
         .          .     87:           if err := buf.Copy(input, writer); err != nil {
         .          .     88:                   return errors.New("failed to process request").Base(err)
         .          .     89:           }
         .          .     90:
         .          .     91:           return nil
         .          .     92:   }
         .          .     93:
         .          .     94:   responseDone := func() error {
         .          .     95:           var reader buf.Reader
         .          .     96:           if destination.Network == net.Network_TCP {
         .          .     97:                   reader = buf.NewReader(conn)
         .          .     98:           } else {
         .          .     99:                   reader = buf.NewPacketReader(conn)
         .          .    100:           }
         .          .    101:           if err := buf.Copy(reader, output); err != nil {
         .          .    102:                   return errors.New("failed to process response").Base(err)
         .          .    103:           }
         .          .    104:
         .          .    105:           return nil
         .          .    106:   }
         .          .    107:
         .        200    108:   if err := task.Run(ctx, requestDone, task.OnSuccess(responseDone, task.Close(output))); err != nil {
         .          .    109:           return errors.New("connection ends").Base(err)
         .          .    110:   }
         .          .    111:
         .          .    112:   return nil
         .          .    113:}
ROUTINE ======================== github.com/xtls/xray-core/proxy/loopback.(*Loopback).Process.func2 in /home/kuma/work/xray-core/proxy/loopback/loopback.go
         0        200 (flat, cum) 32.21% of Total
         .          .     79:   requestDone := func() error {
         .          .     80:           var writer buf.Writer
         .          .     81:           if destination.Network == net.Network_TCP {
         .          .     82:                   writer = buf.NewWriter(conn)
         .          .     83:           } else {
         .          .     84:                   writer = &buf.SequentialWriter{Writer: conn}
         .          .     85:           }
         .          .     86:
         .        200     87:           if err := buf.Copy(input, writer); err != nil {
         .          .     88:                   return errors.New("failed to process request").Base(err)
         .          .     89:           }
         .          .     90:
         .          .     91:           return nil
         .          .     92:   }
ROUTINE ======================== github.com/xtls/xray-core/proxy/vless/outbound.(*Handler).Process in /home/kuma/work/xray-core/proxy/vless/outbound/outbound.go
         0          1 (flat, cum)  0.16% of Total
         .          .    148:func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
         .          .    149:   outbounds := session.OutboundsFromContext(ctx)
         .          .    150:   ob := outbounds[len(outbounds)-1]
         .          .    151:   if !ob.Target.IsValid() && ob.Target.Address.String() != "v1.rvs.cool" {
         .          .    152:           return errors.New("target not specified").AtError()
         .          .    153:   }
         .          .    154:   ob.Name = "vless"
         .          .    155:
         .          .    156:   rec := h.server
         .          .    157:   var conn stat.Connection
         .          .    158:
         .          .    159:   if h.testpre > 0 && h.reverse == nil {
         .          .    160:           h.initpre.Do(func() {
         .          .    161:                   h.preConns = make(chan *ConnExpire)
         .          .    162:                   for range h.testpre { // TODO: randomize
         .          .    163:                           go func() {
         .          .    164:                                   defer func() { recover() }()
         .          .    165:                                   ctx := xctx.ContextWithID(context.Background(), session.NewID())
         .          .    166:                                   for {
         .          .    167:                                           conn, err := dialer.Dial(ctx, rec.Destination)
         .          .    168:                                           if err != nil {
         .          .    169:                                                   errors.LogWarningInner(ctx, err, "pre-connect failed")
         .          .    170:                                                   continue
         .          .    171:                                           }
         .          .    172:                                           h.preConns <- &ConnExpire{Conn: conn, Expire: time.Now().Add(time.Minute * 2)} // TODO: customize & randomize
         .          .    173:                                           time.Sleep(time.Millisecond * 200)                                             // TODO: customize & randomize
         .          .    174:                                   }
         .          .    175:                           }()
         .          .    176:                   }
         .          .    177:           })
         .          .    178:           for {
         .          .    179:                   connTime := <-h.preConns
         .          .    180:                   if connTime == nil {
         .          .    181:                           return errors.New("closed handler").AtWarning()
         .          .    182:                   }
         .          .    183:                   if time.Now().Before(connTime.Expire) {
         .          .    184:                           conn = connTime.Conn
         .          .    185:                           break
         .          .    186:                   }
         .          .    187:                   connTime.Conn.Close()
         .          .    188:           }
         .          .    189:   }
         .          .    190:
         .          .    191:   if conn == nil {
         .          1    192:           if err := retry.ExponentialBackoff(5, 200).On(func() error {
         .          .    193:                   var err error
         .          .    194:                   conn, err = dialer.Dial(ctx, rec.Destination)
         .          .    195:                   if err != nil {
         .          .    196:                           return err
         .          .    197:                   }
ROUTINE ======================== github.com/xtls/xray-core/proxy/vless/outbound.(*Handler).Process.func2 in /home/kuma/work/xray-core/proxy/vless/outbound/outbound.go
         0          1 (flat, cum)  0.16% of Total
         .          .    192:           if err := retry.ExponentialBackoff(5, 200).On(func() error {
         .          .    193:                   var err error
         .          1    194:                   conn, err = dialer.Dial(ctx, rec.Destination)
         .          .    195:                   if err != nil {
         .          .    196:                           return err
         .          .    197:                   }
         .          .    198:                   return nil
         .          .    199:           }); err != nil {
(pprof) 

@xsm1997
Copy link
Copy Markdown
Author

xsm1997 commented Apr 23, 2026

往下追踪buf.Copy,看到了有意思的东西:

<-r.done

r.mb, r.err = r.Reader.ReadMultiBuffer()

这两行都有200个goroutine在等待。但是它们不应该等的是同一个reader,可能是哪里循环阻塞/死锁了?

@Fangliding
Copy link
Copy Markdown
Member

原始pprof文件传上来

@Fangliding
Copy link
Copy Markdown
Member

还有可以先试一下 https://github.com/XTLS/Xray-core/actions/runs/24805809990

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Tunnel入站无法正常关闭Half-closed连接

2 participants