目录

async、packaged_task、promise、future的区别与使用


使用方法

想要更详尽的介绍可以看看这本书,这几个函数牵扯到的内容是并发操作的同步,对应《C++并发编程实战》的第四章,这本书很难啃,有很多地方我也是断断续续看了几遍才有体会。

首先我们来看看让人迷惑的地方:std::ayncstd::package_taskstd::promise 这几个调用都能获得 std::future,那到底有什么区别呢?下面我们挨个来看看基本使用方法,再进行一个小总结。

std::async

前言

async 这个词在其他语言中想必也不陌生,比如 js 里有await和async关键字用于执行异步任务等等,C++中的这个词表示一个函数,实际上也是用于异步执行。但是与js那个实现原理有很大的不同,简单的说就是js那个更加的上层,而C++这个偏底层。导致的区别就是js那两个关键字配合起来只为达到并发的效果,而其中的实现细节是非常复杂的,比如js实际上始终只用到了一个线程来实现了并发,这点类似于协程,具体到内部js是通过事件循环队列来实现的,所以你始终无法确切的理解到它的底层工作。 而C++的async与之相比,就好像一个原始人,需要你传递参数来确定它的工作机制,但和C++标准库的其他api相比,它却是非常高级的存在。

使用

在此之前先简单了解下C++的async使用方式,一个简单的代码如下:

#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
    std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
    do_other_stuff();
    std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

这个函数会返回一个future用于将异步的任务和主线程同步。也就是当你调用future的get方法时,它会等待异步任务完成,并得到对应的返回值。 关于 std::future 也有三个重要的api:wait、wait_for、get,前面两个不返回结果,只等待任务,wait_for可以设置等待的时间限制。

但是请注意,这个任务不一定是按照你想的那样异步执行的,它可能在你调用get方法的时候才执行,这个取决于你传递的第一个参数。

  • std::launch::async:表示会开启一个线程去执行任务。
  • std::launch::deferred:表示延迟调用,只有在外界需要得到结果的时候才调用。
  • 不传参数或者两者相与:表示由C++底层去调度,可能是async也可能是defferred。

实际上async函数在这几个C++的api里是最高级的,它同时拥有了异步与同步的能力。

std::packaged_task

这个api与线程是没有任何关系的,它只是负责把普通的函数或者仿函数包装成方便异步转同步的任务。

比如我们开启一个线程去执行任务,我们又想对外界保有同步获取任务结果的能力,这个时候就需要用 std::package_task 来对现有的任务进行封装了。 如下代码:

以抖音用户的一次点赞行为对应的数据库底层需要调用的api举例子: plusVideoLike:增加对应视频的点赞数目,返回值为自定义的error类型 plusUserLikeList:增加用户喜欢的视频,返回值为自定义的error类型 很明显,这个两个过程互相并不影响,可以使用并发操作进行。

UserService::addLike(int uid,int vid){
    auto task1 = std::package_task<error(int)>{plusVideoLike};
    auto task2 = std::package_task<error(int,int)>{plusUserLikeList};
    auto f1 = task1.get_future();
    auto f2 = task2.get_future();
    std::thread t1([&](){
       task1(vid); //video点赞+1 
    });
    std::thread t2([&](){
        task2(uid,vid); //用户点赞视频+1
    });
    
    //得到返回值的同时完成同步操作
    
    if(f1.get() == 某错误){
        
    }
    if(f2.get()== 某错误{
    
    }
}

std::promise

从上面看下来,我们其实不难发现,async和packaged_task好像有某种联系?似乎async是封装了packaged_task的方便操作的api,没错大概就是这样。同样promise也是packaged_task中不可缺少的一环。

std::promise算是C++中提供的方便线程间通信的纽带之一,它的行为有些类似于管道,熟悉Go的朋友,你可以把这个类似于无缓冲的管道,通过promise我们存入数据,通过它的future我们读取数据,这里就类似于管道的读写了,只不过不能通过一个类型进行双向读写,这一点还是没有Golang的方便啊。

线程间通信我们一般都是通过共享内存,而共享内存写起来就很麻烦,为了线程安全以及事件完成的通知还要手动去封装一大堆的东西(用条件变量、信号量之类的去封装),我们大部分场景并不需要这么高的控制粒度,所以标准库帮你封装了一个promise和对应的future进行线程间的通信也是一个选择,当然如果能再进化成Golang中的channl模样,那就更好了(似乎C++20有协程又有管道😁)。

说了这么多,来一个简单的例子吧。

我们再切换到一个场景:实现一个聊天程序的后台服务器,我们举其中较为契合的地方,比如一个聊天室的消息收发,当聊天室里的一个人发送消息后,需要将该消息对聊天室里的其他人进行广播,这个广播的过程,我们可以用并发来进行,但是最好建议不要把每个任务都开启一个新线程去执行,这样服务器迟早崩溃,最好的做法是使用线程池或者直接上协程无所畏惧。这里的执行成功或者失败需要传递到外界,这时我们可以使用promise当然你也可以用package_task将任务进行封装,只不过没有promise灵活(但大部分时候比promise好用,比如这里我还是用package_task好些)

我们把该类命名为ChatRoom类,方法如下:

  • broadcast(int userId,Message msg);用于对消息进行广播

房间内有非常多的用户对应User类,而user有对应的接收消息的方法:

  • receive(Message msg); 接收对应的消息
broadcast(int userId,Message msg){
    vector<future<error>> futures;
    for(auto&& user:Users){
        if(user.id != userId){
           thread_pool.submit([&]{
           package_task<bool(Message)>task{std::bind(User::recive,&user)};
           futures.push_back(task.get_future());
           task(msg);
          });
        }
    }
    bool f;
    do{ //主线程等待完成并进行对应的错误处理,当全都完成任务执行完成
        f = true;
        for(auto&& f:futures){
        if(f.wait_for(std::chrono::seconds(1) == std::future_status::ready){
            ... //对应的错误处理
            continue;
        }
        f = false;
      }
   }while(f);
}

其实这里的线程池返回一个 future 是最好,否则同步操作将不那么优雅。

自顶向下

看完上面的内容,相信你对这些线程同步的api应该有了一定的了解,我们会发现这几个api的应用场景都非常相似,实际上是一层套一层的往上进行封装起来得到对应的功能的。

有了promise,我们可以封装出package_task,有了package_task我们可以封装出async,这都是一环套一环,使用的复杂度不断降低的同时,线程同步的灵活性也在不断降低。当我们需要自己封装一套工具的时候,大概率是用的promise和package_task比较多,而我们只管简单使用时,则最好时使用已经封装好的上层api。比如我们如果要自己实现一个返回future的线程池,那么封装packaged_task是最好的选择

现在我们自顶向下,来简单看看如何实现(以下代码非原创,如有雷同,那就是我抄的😥)。

用packaged_task实现async

std::future<int> my_async(function<int(int i)> task, int i)
{
    std::packaged_task<int(int)> package{task};
    std::future<int> f = package.get_future();

    std::thread t(std::move(package), i);
    t.detach();
    return f;
}

int main()
{
    auto task = [](int i) { std::this_thread::sleep_for(std::chrono::seconds(5)); return i+100; };

    std::future<int> f = my_async(task, 5);
    std::cout << f.get() << std::endl;
    return 0;
}

使用promise实现packaged_task

template <typename> class my_task;

template <typename R, typename ...Args>
class my_task<R(Args...)>
{
    std::function<R(Args...)> fn;
    std::promise<R> pr;             // the promise of the result
public:
    template <typename ...Ts>
    explicit my_task(Ts &&... ts) : fn(std::forward<Ts>(ts)...) { }

    template <typename ...Ts>
    void operator()(Ts &&... ts)
    {
        pr.set_value(fn(std::forward<Ts>(ts)...));  // fulfill the promise
    }

    std::future<R> get_future() { return pr.get_future(); }

    // disable copy, default move
};

总结

  1. 如果需要封装自己的api方便使用,那么packaged_task和promise可能是你最常用的。
  2. 如果想要开箱即用,那么直接async也挺好。