目录

细粒度锁线程安全队列实现


包含同步语义的简单实现

template <typename T>
class ThreadSafeQueue
{
public:
    void Push(T new_value)
    {
        std::lock_guard<std::mutex> lk(m_mtx);
        m_queue.push(std::move(new_value));
        m_cond.notify_one(); // 1
    }

    void WaitAndPop(T &value) // 2
    {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cond.wait(lk, [this]
                    { return !m_queue.empty(); });
        value = std::move(m_queue.front());
        m_queue.pop();
    }

    std::shared_ptr<T> WaitAndPop() // 3
    {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cond.wait(lk, [this]
                    { return !m_queue.empty(); }); // 4
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(m_queue.front())));
        m_queue.pop();
        return res;
    }

    bool TryPop(T &value)
    {
        std::lock_guard<std::mutex> lk(m_mtx);
        if (m_queue.empty())
            return false;
        value = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }

    std::shared_ptr<T> TryPop()
    {
        std::lock_guard<std::mutex> lk(m_mtx);
        if (m_queue.empty())
            return std::shared_ptr<T>(); // 5
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(m_queue.front())));
        m_queue.pop();
        return res;
    }

    bool Empty() const
    {
        std::lock_guard<std::mutex> lk(m_mtx);
        return m_queue.empty();
    }

private:
    mutable mutex m_mtx;
    queue<T> m_queue;
    condition_variable m_cond;
};

这个版本是最为简单的实现版本,直接用的stl库中的队列来实现,所有成员函数公用一把锁来实现线程安全,需要注意的点有以下几点:

  1. 条件变量产生的虚假唤醒,你可以通过手动while循环来避免,也可以通过在wait后面加上谓词条件(lamda表达式)
  2. 锁需要设置为mutable,保证const版本的成员函数可用

但这个实现有非常大的隐患和不足!

隐患

如果在调用WaitAndPop函数时发生了异常,由于可能有其他的线程也在调用WaitAndPop发生等待,而由于每次notify一个线程,一旦构造 std::shared_ptr的过程中发生异常,那么其他的线程将会陷入永久的等待!

解决方法: 由于异常发生在内存的申请过程中,我们如果把 std::queue 中直接存入 shared_ptr 那么就不会有这个问题。

改写后的代码如下:

template <typename T>
class ThreadSafeQueue
{
public:
    void Push(T new_value)
    {
        auto data = std::make_shared(std::move(new_value));
        std::lock_guard<std::mutex> lk(m_mtx);
        m_queue.push(data);
        m_cond.notify_one(); // 1
    }

    void WaitAndPop(T &value) // 2
    {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cond.wait(lk, [this]
                    { return !m_queue.empty(); });
        value = std::move(*m_queue.front());
        m_queue.pop();
    }

    std::shared_ptr<T> WaitAndPop() // 3
    {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cond.wait(lk, [this]
                    { return !m_queue.empty(); }); // 4
        std::shared_ptr<T> res = m_queue.front();
        m_queue.pop();
        return res;
    }

    bool TryPop(T &value)
    {
        std::lock_guard<std::mutex> lk(m_mtx);
        if (m_queue.empty())
            return false;
        value = std::move(*m_queue.front());
        m_queue.pop();
        return true;
    }

    std::shared_ptr<T> TryPop()
    {
        std::lock_guard<std::mutex> lk(m_mtx);
        if (m_queue.empty())
            return std::shared_ptr<T>(); // 5
        std::shared_ptr<T> res = m_queue.front();
        m_queue.pop();
        return res;
    }

    bool Empty() const
    {
        std::lock_guard<std::mutex> lk(m_mtx);
        return m_queue.empty();
    }

private:
    mutable mutex m_mtx;
    queue<std::shared_ptr<T>> m_queue;
    condition_variable m_cond;
};

这个版本的代码不仅是预防了异常安全,同样性能也得到了很好的优化,Push 过程的内存申请过程可以放到临界区以外,提高了并发度。

设计细粒度锁队列提高并发

前面的简单版本,有个非常明显的不足,几乎没有任何并发的性能,因为所有的成员函数都必须加锁,临界区非常的大,这哪里是并发,这都强行变成了同步执行,那这样肯定不行啊,我们找找原因。

这个原因很简单,由于我们是通过stl内部的queue封装所实现的,我们的任何的成员函数操作实现都必须访问到这个共享变量,一旦变量被共享,要实现线程安全那就必须加锁同步,这便是原因所在了。

这就是我们现在要做的事情,把锁的粒度减少,实际就是把变量的共享和操作细分。

细粒度锁队列实现

实现简单队列

在这之前我们先自己实现一个简单的队列,如下:

template<typename T>
class Queue
{
private:
    struct node
    {
        T data_;
        std::unique_ptr <node> next_;

        node(T data) : data_(std::move(data))
        {}
    };

    std::unique_ptr <node> m_head;
    node *m_tail{};

public:
    Queue() = default;

    Queue(const Queue &other) = delete;

    Queue &operator=(const Queue &other) = delete;

    std::shared_ptr <T> TryPop()
    {
        if (!m_head)
        {
            return nullptr;
        }
        auto ret = std::make_shared<T>(std::move(m_head->data_));
        auto oldHead = std::move(m_head); 
        m_head = std::move(oldHead->next_); //这里把next资源进行转移,防止oldHead析构后导致整个链表析构
        return ret;
    }

    void Push(T new_value)
    {
        auto p = std::make_unique<node>(new_value);
        auto *new_tail = p.get();
        if (m_tail)
        {//如果队列不为空
            m_tail->next_ = std::move(p);
        } else
        {//队列为空则需要特殊处理
            m_head = std::move(p);
        }
        m_tail = new_tail;
    }
};
  • next指针为啥不用原始指针?嗯,其实应该要用原始指针的,这里偷个懒,为了不写delete语句,用的unique_ptr,在使用这个独占指针的时候记得要转移所有权,否则会出现连环析构的现象!
  • 由于使用了unique_ptr管理next_指针,那么析构的时候会自动完成,但是会有个问题,如果队列中的数据量大的话,整个函数栈会爆掉,我亲自测试了下,大概存入的数据量达到1e4级别就会爆栈。。。但是没关系,我们将他用作并发编程中的队列时,用于生产消费的队列里的空闲任务一般也不会到达这个量级,当然有空的话也可以改进然后优化。

分析并发设计

我们再来简单分析下这个内存共享的情况pop操作需要用到head,push操作需要用到head和tail。但是有个严重的问题:除了这两个内存被共享外,由于未采用空头节点,两个成员函数内用 next_ 指针访问到的内存都可能发生共享(对应 m_tail->next_oldHead->next_)。这样的话很难在保证细粒度的情况下实现线程安全了。。。这样下去的实现还不如之前的。

通过分离数据实现并发

前面的隐患已经分析清楚了,如何解决它?你可以使用预分配一个虚拟节点(无数据),确保这个节点永远在队列的最后,用来分离头尾指针能访问的节点”的办法,走出这个困境。这样通过 pop 和 push 操作通过 next_ 指针访问到的数据就永远不可能是同一个数据了。 代码如下:

template<typename T>
class Queue
{
private:
    struct node
    {
        std::shared_ptr<T> data_;
        std::unique_ptr <node> next_;
    };

    std::unique_ptr <node> m_head;
    node *m_tail;

public:
    Queue():m_head(new node),m_tail(m_head.get()){}; //初始化空节点

    Queue(const Queue &other) = delete;

    Queue &operator=(const Queue &other) = delete;

    std::shared_ptr <T> TryPop()
    {
        if (m_head.get() == m_tail)
        {
            return nullptr;
        }
        auto ret = m_head->data_;
        auto oldHead = std::move(m_head); 
        m_head = std::move(oldHead->next_);
        return ret;
    }

    void Push(T new_value)
    {
        auto data = std::make_shared<T>(std::move(new_value));
        auto p = std::make_unique<node>(new_value); //新的空节点
        m_tail->data_ = data;
        //开始移动补充最后的空节点
        auto* new_tail = p.get();
        m_tail->next_ = std::move(p);
        m_tail = new_tail;
    }
};

现在两个操作共享的内存就只有 m_head 和 m_tail 了,而且在 Push 操作中只使用到了共享内存 m_tail,那么接下来的并发安全实现可以开始细粒度化了,我们用两个互斥锁来实现它。一个互斥锁用于锁住访问m_head的行为,一个用于锁住访问m_tail的行为,具体到代码可以因使用时间的长短对临界区进行进一步缩小。

具体代码如下:

template<typename T>
class ThreadSafeQueue
{
    struct node
    {
        std::shared_ptr <T> data;
        std::unique_ptr <node> next;
    };
    std::mutex m_headMtx;
    std::unique_ptr <node> m_head;
    std::mutex m_tailMtx;
    node *m_tail;

public:
    ThreadSafeQueue() :
            m_head(new node), m_tail(m_head.get())
    {}

    ThreadSafeQueue(const ThreadSafeQueue &other) = delete;

    ThreadSafeQueue &operator=(const ThreadSafeQueue &other) = delete;

    std::shared_ptr <T> TryPop()
    {
        std::unique_ptr <node> old_head = pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();
    }

    void Push(T new_value)
    {
        std::shared_ptr <T> new_data(
                std::make_shared<T>(std::move(new_value)));
        std::unique_ptr <node> p(new node);
        node *const new_tail = p.get();
        
        //开始锁临界区
        std::lock_guard <std::mutex> tail_lock(m_tailMtx);
        m_tail->data = new_data;
        m_tail->next = std::move(p);
        m_tail = new_tail;
    }

private:
    
    node *get_tail()
    {
        std::lock_guard <std::mutex> tail_lock(m_tailMtx);
        return m_tail;
    }

    std::unique_ptr <node> pop_head()
    {
        //这里head一定要先被锁
        std::lock_guard <std::mutex> head_lock(m_headMtx);
        if (m_head.get() == get_tail())
        {
            return nullptr;
        }
        std::unique_ptr <node> old_head = std::move(m_head);
        m_head = std::move(old_head->next);
        return old_head;
    }
};

注意: 当get_tail()调用前,请确保 m_headMtx 已经上锁,这一步也是很重要的哦。如果不这样,调用pop_head()时,就无法确保 get_tail 得到的数据在使用的时候为最新,如下代码,如果进入head_lock临界区后,old_tail被其他线程改了,那么整个操作就不对了。

std::unique_ptr<node> pop_head() // 这是个有缺陷的实现
{
  node* const old_tail=get_tail();  // ① 在m_headMtx范围外获取旧尾节点的值
  std::lock_guard<std::mutex> head_lock(head_mutex);

  if(head.get()==old_tail)  // ②
  {
    return nullptr;
  }
  std::unique_ptr<node> old_head=std::move(head);
  head=std::move(old_head->next);  // ③
  return old_head;
}

再来看看异常安全是否有保证,如果TryPop() 中的对锁的操作会产生异常,由于直到锁获取后才能对数据进行修改。因此,TryPop()是异常安全的。另一方面,Push()可以在堆上新分配出一个T的实例,以及一个node的新实例,这里可能会抛出异常。但是,所有分配的对象都赋给了智能指针,那么当异常发生时,他们就会被释放掉。

并发度分析

TryPop()持有m_tailMtx也只有很短的时间,只为保护对tail的读取。因此,当有数据push进队列后,TryPop()几乎及可以完全并发调用了。同样在执行中,对m_headMtx的持有时间也是极短的。当并发访问时,会增加对TryPop()的访问次数;由于只有一个线程,在同一时间内可以访问pop_head(),且多线程情况下可以删除队列中的旧节点,并且安全的返回数据。

添加条件变量实现同步等待

现在已经实现了细粒度锁的线程安全队列,不过只有TryPop()可以并发访问(且只有一个重载存在)。那么方便的同步的WaitAndPop()呢?

Push实现

向队列中添加新节点是相当简单的——下面的实现与上面的代码差不多。

template<typename T>
void ThreadSafe<T>::Push(T new_value)
{
  auto new_data = std::make_shared<T>(std::move(new_value));
  std::unique_ptr<node> p(new node);
  {//生产临界区
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    tail->data=new_data;
    auto* new_tail=p.get();
    tail->next=std::move(p);
    tail=new_tail;
  }
  data_cond.notify_one();
}

WaitAndPop实现

template<typename T>
class ThreadSafeQueue
{
private:
  node* get_tail()
  {
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    return tail;
  }

  std::unique_ptr<node> pop_head()  // 1
  {
    std::unique_ptr<node> old_head=std::move(head);
    head=std::move(old_head->next);
    return old_head;
  }

  std::unique_lock<std::mutex> wait_for_data()  // 2
  {
    std::unique_lock<std::mutex> head_lock(head_mutex);
    data_cond.wait(head_lock,[&]{return head.get()!=get_tail();});
    return std::move(head_lock);  // 3
  }

  std::unique_ptr<node> wait_pop_head()
  {
    std::unique_lock<std::mutex> head_lock(wait_for_data());  // 4
    return pop_head();
  }

  std::unique_ptr<node> wait_pop_head(T& value)
  {
    std::unique_lock<std::mutex> head_lock(wait_for_data());  // 5
    value=std::move(*head->data);
    return pop_head();
  }
public:
  std::shared_ptr<T> WaitAndPop()
  {
    std::unique_ptr<node> const old_head=wait_pop_head();
    return old_head->data;
  }

  void WaitAndPop(T& value)
  {
    auto _ = wait_pop_head(value);
  }
};

可能大家看到代码好像有点多,实际上都只是为了代码的重用,例如pop_head()①和wait_for_data()②,这些函数分别是删除头结点和等待队列中有数据弹出的。wait_for_data()特别值得关注,因为其不仅等待使用lambda函数对条件变量进行等待,而且它还会将锁的实例返回给调用者③。这就确保了wait_pop_head的线程安全。pop_head()是对TryPop()代码的复用。

TryPop和Empty实现

template<typename T>
class ThreadSafeQueue
{
private:
  std::unique_ptr<node> try_pop_head()
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if(head.get()==get_tail())
    {
      return std::unique_ptr<node>();
    }
    return pop_head();
  }

  std::unique_ptr<node> try_pop_head(T& value)
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if(head.get()==get_tail())
    {
      return std::unique_ptr<node>();
    }
    value=std::move(*head->data);
    return pop_head();
  }
public:
  std::shared_ptr<T> TryPop()
  {
    std::unique_ptr<node> old_head=try_pop_head();
    return old_head?old_head->data:std::shared_ptr<T>();
  }

  bool TryPop(T& value)
  {
    std::unique_ptr<node> const old_head=try_pop_head(value);
    return old_head;
  }

  void Empty()
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    return (head.get()==get_tail());
  }
};

简单测试

  • 一个生产者,三个消费者,数据量15000
次数 v1版本 v3版本 v3原始指针版
第一次 7.31ms 8.51 8.53
第二次 6.61ms 9.26 7.25
第三次 7.60ms 8.90 7.95
  • 一个生产者,三个同时消费生产,一个消费,数据量1500000
次数 v1版本 v3版本 v3原始指针版
第一次 397.90 爆栈 399.68
第二次 398.0 爆栈 362.06
第三次 319.28 爆栈 355.62

我的测试仅限于少量线程,而且任务负担也不重,故测出来的结果竟然是直接封装标准库的队列性能最好(都是在release模式下),我猜大概率是标准库的内存分配器优于我这个简单的内存管理,再加上我测试的线程数量非常少,细粒度的锁并未体现出它的优势。。。

完整代码

代码仓库:thread_safe_queue