linux中编写并发队列类

这篇文章主要介绍了linux中编写并发队列类,功能有:并发阻塞队列、有超时限制、有大小限制

设计并发队列

代码如下:

#include <pthread.h>

#include <list>

using namespace std;

template <typename T>

class Queue

{

public:

Queue( )

{

pthread_mutex_init(&_lock, NULL);

}

~Queue( )

{

pthread_mutex_destroy(&_lock);

}

void push(const T& data);

T pop( );

private:

list<T> _list;

pthread_mutex_t _lock;

};

template <typename T>

void Queue<T>::push(const T& value )

{

pthread_mutex_lock(&_lock);

_list.push_back(value);

pthread_mutex_unlock(&_lock);

}

template <typename T>

T Queue<T>::pop( )

{

if (_list.empty( ))

{

throw "element not found";

}

pthread_mutex_lock(&_lock);

T _temp = _list.front( );

_list.pop_front( );

pthread_mutex_unlock(&_lock);

return _temp;

}

上述代码是有效的。但是,请考虑这样的情况:您有一个很长的队列(可能包含超过 100,000 个元素),而且在代码执行期间的某个时候,从队列中读取数据的线程远远多于添加数据的线程。因为添加和取出数据操作使用相同的互斥锁,所以读取数据的速度会影响写数据的线程访问锁。那么,使用两个锁怎么样?一个锁用于读取操作,另一个用于写操作。给出修改后的 Queue 类。

代码如下:

template <typename T>

class Queue

{

public:

Queue( )

{

pthread_mutex_init(&_rlock, NULL);

pthread_mutex_init(&_wlock, NULL);

}

~Queue( )

{

pthread_mutex_destroy(&_rlock);

pthread_mutex_destroy(&_wlock);

}

void push(const T& data);

T pop( );

private:

list<T> _list;

pthread_mutex_t _rlock, _wlock;

};

template <typename T>

void Queue<T>::push(const T& value )

{

pthread_mutex_lock(&_wlock);

_list.push_back(value);

pthread_mutex_unlock(&_wlock);

}

template <typename T>

T Queue<T>::pop( )

{

if (_list.empty( ))

{

throw "element not found";

}

pthread_mutex_lock(&_rlock);

T _temp = _list.front( );

_list.pop_front( );

pthread_mutex_unlock(&_rlock);

return _temp;

}

设计并发阻塞队列

目前,如果读线程试图从没有数据的队列读取数据,仅仅会抛出异常并继续执行。但是,这种做法不总是我们想要的,读线程很可能希望等待(即阻塞自身),直到有数据可用时为止。这种队列称为阻塞的队列。如何让读线程在发现队列是空的之后等待?一种做法是定期轮询队列。但是,因为这种做法不保证队列中有数据可用,它可能会导致浪费大量 CPU 周期。推荐的方法是使用条件变量,即 pthread_cond_t 类型的变量。

代码如下:

template <typename T>

class BlockingQueue

{

public:

BlockingQueue ( )

{

pthread_mutexattr_init(&_attr);

// set lock recursive

pthread_mutexattr_settype(&_attr,PTHREAD_MUTEX_RECURSIVE_NP);

pthread_mutex_init(&_lock,&_attr);

pthread_cond_init(&_cond, NULL);

}

~BlockingQueue ( )

{

pthread_mutex_destroy(&_lock);

pthread_cond_destroy(&_cond);

}

void push(const T& data);

bool push(const T& data, const int seconds); //time-out push

T pop( );

T pop(const int seconds); // time-out pop

private:

list<T> _list;

pthread_mutex_t _lock;

pthread_mutexattr_t _attr;

pthread_cond_t _cond;

};

template <typename T>

T BlockingQueue<T>::pop( )

{

pthread_mutex_lock(&_lock);

while (_list.empty( ))

{

pthread_cond_wait(&_cond, &_lock) ;

}

T _temp = _list.front( );

_list.pop_front( );

pthread_mutex_unlock(&_lock);

return _temp;

}

template <typename T>

void BlockingQueue <T>::push(const T& value )

{

pthread_mutex_lock(&_lock);

const bool was_empty = _list.empty( );

_list.push_back(value);

pthread_mutex_unlock(&_lock);

if (was_empty)

pthread_cond_broadcast(&_cond);

}

并发阻塞队列设计有两个要注意的方面:

1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 会释放至少一个等待条件变量的线程,这个线程不一定是等待时间最长的读线程。尽管使用 pthread_cond_signal 不会损害阻塞队列的功能,但是这可能会导致某些读线程的等待时间过长。

2.可能会出现虚假的线程唤醒。因此,在唤醒读线程之后,要确认列表非空,然后再继续处理。强烈建议使用基于 while 循环的 pop()。

设计有超时限制的并发阻塞队列

在许多系统中,如果无法在特定的时间段内处理新数据,就根本不处理数据了。例如,新闻频道的自动收报机显示来自金融交易所的实时股票行情,它每 n 秒收到一次新数据。如果在 n 秒内无法处理以前的一些数据,就应该丢弃这些数据并显示最新的信息。根据这个概念,我们来看看如何给并发队列的添加和取出操作增加超时限制。这意味着,如果系统无法在指定的时间限制内执行添加和取出操作,就应该根本不执行操作。

代码如下:

template <typename T>

bool BlockingQueue <T>::push(const T& data, const int seconds)

{

struct timespec ts1, ts2;

const bool was_empty = _list.empty( );

clock_gettime(CLOCK_REALTIME, &ts1);

pthread_mutex_lock(&_lock);

clock_gettime(CLOCK_REALTIME, &ts2);

if ((ts2.tv_sec – ts1.tv_sec) <seconds)

{

was_empty = _list.empty( );

_list.push_back(value);

}

pthread_mutex_unlock(&_lock);

if (was_empty)

pthread_cond_broadcast(&_cond);

}

template <typename T>

T BlockingQueue <T>::pop(const int seconds)

{

struct timespec ts1, ts2;

clock_gettime(CLOCK_REALTIME, &ts1);

pthread_mutex_lock(&_lock);

clock_gettime(CLOCK_REALTIME, &ts2);

// First Check: if time out when get the _lock

if ((ts1.tv_sec – ts2.tv_sec) < seconds)

{

ts2.tv_sec += seconds; // specify wake up time

while(_list.empty( ) && (result == 0))

{

result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;

}

if (result == 0) // Second Check: if time out when timedwait

{

T _temp = _list.front( );

_list.pop_front( );

pthread_mutex_unlock(&_lock);

return _temp;

}

}

pthread_mutex_unlock(&lock);

throw "timeout happened";

}

设计有大小限制的并发阻塞队列

最后,讨论有大小限制的并发阻塞队列。这种队列与并发阻塞队列相似,但是对队列的大小有限制。在许多内存有限的嵌入式系统中,确实需要有大小限制的队列。

对于阻塞队列,只有读线程需要在队列中没有数据时等待。对于有大小限制的阻塞队列,如果队列满了,写线程也需要等待。

代码如下:

template <typename T>

class BoundedBlockingQueue

{

public:

BoundedBlockingQueue (int size) : maxSize(size)

{

pthread_mutex_init(&_lock, NULL);

pthread_cond_init(&_rcond, NULL);

pthread_cond_init(&_wcond, NULL);

_array.reserve(maxSize);

}

~BoundedBlockingQueue ( )

{

pthread_mutex_destroy(&_lock);

pthread_cond_destroy(&_rcond);

pthread_cond_destroy(&_wcond);

}

void push(const T& data);

T pop( );

private:

vector<T> _array; // or T* _array if you so prefer

int maxSize;

pthread_mutex_t _lock;

pthread_cond_t _rcond, _wcond;

};

template <typename T>

void BoundedBlockingQueue <T>::push(const T& value )

{

pthread_mutex_lock(&_lock);

const bool was_empty = _array.empty( );

while (_array.size( ) == maxSize)

{

pthread_cond_wait(&_wcond, &_lock);

}

_array.push_back(value);

pthread_mutex_unlock(&_lock);

if (was_empty)

pthread_cond_broadcast(&_rcond);

}

template <typename T>

T BoundedBlockingQueue<T>::pop( )

{

pthread_mutex_lock(&_lock);

const bool was_full = (_array.size( ) == maxSize);

while(_array.empty( ))

{

pthread_cond_wait(&_rcond, &_lock) ;

}

T _temp = _array.front( );

_array.erase( _array.begin( ));

pthread_mutex_unlock(&_lock);

if (was_full)

pthread_cond_broadcast(&_wcond);

return _temp;

}

要注意的第一点是,这个阻塞队列有两个条件变量而不是一个。如果队列满了,写线程等待 _wcond 条件变量;读线程在从队列中取出数据之后需要通知所有线程。同样,如果队列是空的,读线程等待 _rcond 变量,写线程在把数据插入队列中之后向所有线程发送广播消息。如果在发送广播通知时没有线程在等待 _wcond 或 _rcond,会发生什么?什么也不会发生;系统会忽略这些消息。还要注意,两个条件变量使用相同的互斥锁。

(0)

相关推荐

  • Linux Shell多进程并发以及并发数控制

    本文小编为大家详细的讲解shell多进程并发,在大部分用户眼中,所谓的多进程 只不过是将多个任务放到后台执行而已,一起来看看吧具体的内容吧. 1. 基础知识准备 1.1. linux后台进程 Unix ...

  • Linux中IP隧道的分析与建议

    操作方法 01 Linux中IP隧道的分析与建议 ----随着计算机网络的日益普及,网络的安全成为目前的热门话题.本文以Linux 2.0.34(RedHat5.2采用)为基础,对隧道技术进行分析,并 ...

  • 在Linux中添加普通新用户

    在Linux中添加普通新用户 ,超级用户(也称为“root”)是一个具有修改系统中任何文件权力的特别账号。在日常工作中,最好不要使用超级用户账号进入系统,因为任何错误操作都可能导致巨大的损失。由于超级 ...

  • Linux中QQ软件的安装和配置

    很多朋友的机器上都安装了Linux,并且想在Linux环境中使用QQ。下面就将本人在Red Hat 9.0上配置QQ的成功经验共享出来。 选择QQ软件 腾迅公司并没有推出专门应用于Linux下的QQ软 ...

  • linux中grep命令的使用

    linux中grep命令的使用 grep (global search regular expression(RE) and print out the line,全面搜索正则表达式并把行打印出来)是 ...

  • Linux中find常见用法示例

    ·find path -option [ -print ] [ -exec -ok command ] {} ; find命令的参数; pathname: find命令所查找的目录路径.例如用.来表示 ...

  • Linux中如何对服务器进行压力测试

    http_load是基于Linux平台的一种性能测工具.它是以并行复用的方式运行,仅适用于Web页面的性能测试,不适用于访问数据库,而且测试结果分析是有限的,平台依赖Linux .http_load可 ...

  • Linux中 如何查看Ubuntu内存信息? 查看内存信息的命令

    分享一些在Linux中,查看内存信息的命令,不明觉厉,希望以后用的到 1. 查看内存插槽数,已使用插槽数,每条内存多大 sudo dmidecode|grep -P -A 5 "Memory/s+De ...

  • 在UNIX或Linux中TTY是什么意思

    在UNIX或Linux中TTY是什么意思,TTYN跟TTYP分别具体指的是什么意思,二者有什么差别? TTY 在Linux中,TTY也许是跟终端有关系的最为混乱的术语。TTY是TeleTYpe的一个老 ...