Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

客户端主动断开链接时服务端未执行完的任务需要清理吗 #1684

Open
povs2012 opened this issue Jan 10, 2025 · 13 comments
Open

Comments

@povs2012
Copy link

我用workflow做服务端推送,在process里创建一个条件任务,在客户端主动关闭链接时,如果这个条件任务一直没有信号触发,那是否这个task永远不会完成了,我看series的callback一直没有执行

    SeriesWork* series_work = series_of(task);
    task->get_connection()->set_context(nullptr, [series_work,](void*)
    {
        log_info("connection closed");
        series_work->cancel(); // 这里加不加cancel都不行
    });
    series_work->set_context(ctx);
    series_work->set_callback([this](const SeriesWork* series)
    {
           // 这里一直未执行
    }

要一直等到我服务端下次触发定时发心跳触发条件任务才会执行到callback

@povs2012
Copy link
Author

我后来发现需要在链接断开时先cancel任务,再触发一次条件任务才会真正执行到callback

@Barenboim
Copy link
Contributor

Barenboim commented Jan 10, 2025

是的,cancel一个series无法打断一个运行中的任务,只能在任务结束之后取消后续的任务。你现在这个做法有一些风险吧,连接并不与series绑定,你并不能保证cancel时series还在不在。除非你保证都是短连接,这个做法好像是安全的。

比较好的判断client是否关闭了连接的做法,是在中间task callback里,调用server task的closed()接口,如果连接已经关了就cancel series,或不再加入新任务。

@Barenboim
Copy link
Contributor

我大概明白你的实际问题了。你希望cancel的时候,已经dispatch但是未被signal的WFConditional任务,可以像series里还没有被调起的任务一样被取消。你这个想法还是比较自然而且比较有趣。目前dispatch了的conditional就是一个运行中的任务无法打断,必须接收signal才可以结束。我想想你这个问题可不可以解决。

@povs2012
Copy link
Author

是的,我是觉得如果客户端断开,定时任务、WFConditional任务、WFCounterTask这类的任务都应该取消,否则不注意很容易出现任务永远不完成,会出现资源泄漏

@Barenboim
Copy link
Contributor

Barenboim commented Jan 13, 2025

是的,我是觉得如果客户端断开,定时任务、WFConditional任务、WFCounterTask这类的任务都应该取消,否则不注意很容易出现任务永远不完成,会出现资源泄漏

你对workflow的各种控制组件玩得很熟啊,而且这个想法也很有意思。但我想了一会感觉好像比较难搞,特别是对于非命名的counter,conditioal,直接把任务干掉无法保证其它人不会再对它进行count或signal。而非命名的timer,虽然也是可以cancel的,但实现起来非常复杂,而且可能需要增加不必要的锁。

但我现在也比较关注你这个需求怎么才可以得到解决。因为你现在这个写法是不严密的。在连接关闭时取消series这个做法,如果确实取消了那没有什么问题。但是如果没有取消,就需要在server task的callback里把context设置回去。注意只有在process和server task callback里可以拿到连接并修改上下文。所以做法如下:

void process(WFHttpTask *server_task)
{
    SeriesWork *series = series_of(server_task);

    // 设置链接上下文和deleter,把series当作上下文。
    auto deleter = [](void*context) {
        log_info("connection closed");
        if (context)  // 这里需要判断空,是因为task callback里会置空这个context,但无法原子的置空deleter
        {
            ((SeriesWork *)context)->cancel();
            // 处理正在等待的timer,counter或conditional,最好是用命名组件,这样没有并发的问题需要处理。
        }
    });
    server_task->get_connection()->set_context(series, deleter);

    // 设置的callback,回复结束后会被调用。如果series被cancel成功,不会被调用。
    server_task->set_callback([deleter](WFHttpTask *server_task) {
        SeriesWork *series = series_of(server_task);
        WFConnection *conn = server_task->get_connection();
        if (conn) // 在callback里设置连接上下文,需要检查链接是否还存在
            conn->test_set_context(series, nullptr, deleter);  // 因为原子性问题,deleter无法置为nullptr
    });

    ...
}

我认为以上代码可以正确处理你的需求,但依然需要你手动给正在等待的conditional等组件发信号。

比较复杂的是callback里设置(清除)连接上下文的操作。server task回复成功之后,连接就可能被复用了,所以清除上下文要判断一下是否已经被设置过了。这里通过比较context是否还是本series来实现。但deleter还是需要设置成原值而不是nullptr,因为做不到联合的原子性(可查看一下WFConnection.h里的实现)。我考虑加两个只修改context不动deleter的函数。

上面的代码还比较复杂,涉及workflow比较高级的玩法。但基于你对熟悉程度看,应该可以理解。

@Barenboim
Copy link
Contributor

Barenboim commented Jan 13, 2025

3aeb5a0
看一下这个commit。如果用这个最新代码,callback里可以就可以直接写成:

conn->test_set_context(series, nullptr);

不用再捕获deleter。
如果有什么好的想法欢迎继续讨论。

@povs2012
Copy link
Author

povs2012 commented Jan 13, 2025

其实我的原始需求很简单,就是我后台有一个消息总线服务,客户端发起一个订阅请求,会向消息总线注册订阅的主题,消息总线定时刷新收消息,收到消息发一个signal使WFHttpTask *server_task push消息给客户端(基本上就是WFrest的sse实现模式),因为我不想每一个请求都创建定时任务收发消息,所以我自身从消息总线上收发消息的定时器是全局唯一的,如果是基于libuv这类的tcp长链接的,一般就是设置个connection的回调,在连接建立时订阅,在链接断开时取消订阅,非常直观。我想的是workflow是不是也可以提供一个回调,保证哪怕客户端断开链接也能正常响应的回调,现在的task的callback并不能保证调用

@Barenboim
Copy link
Contributor

你说“task的callback并不能保证调用”这句话我不太能理解。task的callback是否调用是个确定的事件。你说的task是指这个server task吗?你希望在callback里做什么事情呢?

@povs2012
Copy link
Author

我说错了,不是task的callback,是series_of(task)->set_callback不一定调用,我是觉得这个callbck要能客户端取消时也调用,这样就可以在这个callback里清理一些资源了;就是我上面的例子,因为我在WFHttpTask *server_task的process里创建了一个WFConditional任务等待接收消息总线的消息,因为客户端主动断开了连接,此时已经没人订阅这个消息了,理论上此时应该可以安全的取消对应主题的订阅了,但是SeriesWork 的callback不一定什么时候才会调用,导致不能及时清理

@Barenboim
Copy link
Contributor

Barenboim commented Jan 13, 2025

我说错了,不是task的callback,是series_of(task)->set_callback不一定调用,我是觉得这个callbck要能客户端取消时也调用,这样就可以在这个callback里清理一些资源了;就是我上面的例子,因为我在WFHttpTask *server_task的process里创建了一个WFConditional任务等待接收消息总线的消息,因为客户端主动断开了连接,此时已经没人订阅这个消息了,理论上此时应该可以安全的取消对应主题的订阅了,但是SeriesWork 的callback不一定什么时候才会调用,导致不能及时清理

不是啊,series的callback是一定会调用的,无论是否cancel。而且这个我在注释里也写了:

    /* Cancel a running series. Typically, called in the callback of a task
     * that belongs to the series. All subsequent tasks in the series will be
     * destroyed immediately and recursively (ParallelWork), without callback.
     * But the callback of this canceled series will still be called. */
     virtual void cancel() { this->canceled = true; }

这个必须是会调用的。只有被cancel的series里,没有执行的task是直接删除的。如果你的series callback没有调用,看看是否是series还没有完成(有一个task正在执行,包括正在等信号的conditional)。

@Barenboim
Copy link
Contributor

看起来主要问题还是conditional没有办法直接结束,这个还是只能手动处理。

@povs2012
Copy link
Author

是的,因为这个conditional等在了一个信号上,这个信号不确定什么时候会触发,除非我单独设计个短时心跳定时触发这个信号才能保证及时触发,但其实客户端已经断开了,在断开时就可以进行资源的清理了,没用必要等到下次心跳或下次来对应主题的消息。

@Barenboim
Copy link
Contributor

是的,因为这个conditional等在了一个信号上,这个信号不确定什么时候会触发,除非我单独设计个短时心跳定时触发这个信号才能保证及时触发,但其实客户端已经断开了,在断开时就可以进行资源的清理了,没用必要等到下次心跳或下次来对应主题的消息。

那你可以在你上面的connection deleter里做这个触发。我后面那个程序是解决长连接情况下的问题,但感觉你这个业务不太可能连接还要复用。所以你在deleter里触发信号,应该可以。series cancel可能都没有必要调用了(process里可能需要调用一下server_task->noreply())。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants