【Linux】线程ID与互斥、同步(锁、条件变量)
线程ID及进程地址空间布局
运行代码,这个很大的数字就是线程id。
通过 ps -aL 指令,看到LWP跟线程id是不一样的。
给用户提供的线程的id,不是内核中的lwp,而是pthread库维护的一个唯一值。
我们把上面的数字转换成十六进制,可以看出这是一个地址
理解库
动态库被运行时加载,动态库没被加载前在磁盘中。
pthread库本质是一个文件。
我们刚刚写的可执行程序,它也是个文件,所以他也在磁盘中。可执行程序内部用线程库来创建多线程。
程序运行时,会变成一个进程,加载到内存中,内存中就有该进程的代码和数据。创建线程时,要先把库加载到内存中,然后再映射到该进程的地址空间才能用。映射要映射到堆栈之间的共享区。如果有多个多线程进程,它只需要把共享区的代码,经过页表映射到已经加载到内存的库,此时多个进程就可以使用同一个库里的方法来创建线程。
Linux只维护轻量级进程,linux中的pcb里与执行流相关的属性都是轻量级进程的属性,所有的属性都是围绕lwp展开的。我们在用户层的概念是线程,要的是线程的id,与线程相关的内容在Linux中是没有的,它没有维护。所以这部分属性由库来进行维护。
为了更好的管理线程,创建线程时,库会为我们的每一个线程申请一个内存块(描述线程的相关结构体字段属性)。
未来要找一个线程的所有属性,只要找到线程控制块的地址即可。所以pthread_t id就是一个地址。
pthread_t类型的线程ID,本质就是线程属性集合的起始虚拟地址 ---- 在pthread库中维护。
由上图可得,一个全局变量,本身就是被所有线程共享的。
如果我们想让两个线程各自私有一份变量,g++有一个编译选项 __thread
用__thread修饰这个全局变量即可。运行后,主线程和新线程gval的地址也不一样了。这种情况叫线程的局部存储,原始代码里只看到一个gval,但是他们用的是各自的gval。
这种情况只在Linux中有效。__thread只能用来修饰内置类型。
线程简单封装
#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
namespace ThreadMoudle
{
// 线程要执行的方法,后面我们随时调整
typedef void (*func_t)(const std::string &name); // 函数指针类型
class Thread
{
public:
void Excute()
{
std::cout << _name << " is running" << std::endl;
_isrunning = true;
_func(_name);
_isrunning = false;
}
public:
Thread(const std::string &name, func_t func):_name(name), _func(func)
{
std::cout << "create " << name << " done" << std::endl;
}
static void *ThreadRoutine(void *args) // 变成static,内部没有this指针,pthread_create就能匹配上了
{
Thread *self = static_cast<Thread*>(args); // 获得了当前对象
self->Excute();
return nullptr;
}
bool Start()
{
int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);//传this,线程函数才能拿到_func方法
if(n != 0) return false;
return true;
}
std::string Status()
{
if(_isrunning) return "running";
else return "sleep";
}
void Stop()
{
if(_isrunning)
{
::pthread_cancel(_tid);
_isrunning = false;
std::cout << _name << " Stop" << std::endl;
}
}
void Join()
{
::pthread_join(_tid, nullptr);
std::cout << _name << " Joined" << std::endl;
}
std::string Name()
{
return _name;
}
~Thread()
{
}
private:
std::string _name;
pthread_t _tid;
bool _isrunning;
func_t _func; // 线程要执行的回调函数
};
} // namespace ThreadModle
AI助手
线程互斥
进程线程间的互斥相关背景概念
临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
下面通过抢票代码演示:
void route(const std::string &name)
{
while(true)
{
if(tickets > 0)
{
// 抢票过程
usleep(1000); // 1ms -> 抢票花费的时间
printf("who: %s, get a ticket: %d\n", name.c_str(), tickets);
tickets--;
}
else
{
break;
}
}
}
int main()
{
Thread t1("thread-1", route);
Thread t2("thread-2", route);
Thread t3("thread-3", route);
Thread t4("thread-4", route);
t1.Start();
t2.Start();
t3.Start();
t4.Start();
t1.Join();
t2.Join();
t3.Join();
t4.Join();
}
AI助手
线程就用我们前面封装的线程。每抢到一张票就--,直到没票为止。
由上面结果可知,抢票抢到负数去了。因此多线程并发访问公共资源时可能会引发异常。
大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
多个线程并发的操作共享变量,会带来一些问题。
为什么会出现抢到负数的问题呢?
判断的过程是计算,由cpu来做。判断的过程:数据要先从内存移动到对应的寄存器中,然后再进行逻辑判断,然后才能得到结果,最后cpu再决定要执行if还是else。
之前讲过,cpu内寄存器只有一套,但是寄存器里的数据可以有多套。
这里有四个线程进行抢票, 如果一个线程在判断到一半时被切换了,他需要把寄存器里的数值带走,等到被唤醒的时候,他又要把数值恢复。
比如:假设票只剩一张了,线程a已经在判断完了,当他准备抢票的时候,他被切换了。此时他就把上下文数据保存。线程b被唤醒了,他也来进行票数判断,因为刚刚线程a还没来得及抢票,票数没--,所以线程b也判断成功,他也要抢票。线程a此时被唤醒,就往后执行代码进行抢票,然后--,票数就变成0。线程b又被唤醒,票数又--,就变成负数了。
如何解决上面的问题呢?加锁!
锁
pthread_mutex_t是互斥锁类型。
互斥锁在任何时刻,只允许一个线程进行资源访问。
有了锁,我们往往需要初始化和销毁锁,初始化有两种做法:
如果定义的是全局或者静态的锁,可以只使用pthread_mutex_t 锁的名字 =PTHREAD_MUTEX_INITIALIZER
如果定义的这把锁是动态申请的,比如new或栈上开辟的,必须使用pthread_mutex_init函数来进行初始化。参数1就是你自己定义的锁,参数2是属性,直接设为nullptr即可。
用完锁后,还需要销毁,用pthread_mutex_destroy函数,参数是锁的地址。如果锁是静态或者全局的,我们不需要destroy,全局的或者静态的变量会随着进程的运行而一直存在,进程结束他也就自动释放了。初始化和销毁的返回值,成功返回0,失败返回-1。
一旦有了锁,我们就需要对临界区进行保护, 就需要加锁和解锁。要对某个区域加锁,就要调用pthread_mutex_lock函数来加锁,参数就是你定义的锁。要解锁,就用pthread_mutex_unlock函数。
lock的情况:
互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量, 那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
所谓对临界区资源进行保护,本质是对临界区代码进行保护。
把前面抢票的代码改一下,加锁。运行结果发现不会再抢到负数了,而且运行时间明显比之前要长。
加锁的范围,粒度要尽量小,即临界区尽量小。
所有线程申请锁,前提是所有线程都看得到这把锁,因此锁本身也是共享资源。所以加锁的过程必须是原子的
原子性:要么不做,要么完成了。没有中间状态,就是原子性。
如果线程申请锁失败了,线程就要被阻塞
如果线程申请锁成功了,就继续往后运行,执行临界区代码。
在执行临界区代码期间,线程可以被切换。假设线程1正在执行临界区代码,此时线程被切换了,其他线程也无法进入临界区,因为线程1并没有释放锁。
结论:我这个线程访问临界区,对其他线程来说是原子的。因为对于其他线程,我要么没有申请锁,要么释放了锁,这样对他们才有意义。
下面用局部的锁,使用前,先对线程重新封装
上面是增加的内容,下面是完整的代码
#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
namespace ThreadMoudle
{
class ThreadData
{
public:
ThreadData(const std::string &name,pthread_mutex_t *lock):_name(name),_lock(lock)
{}
public:
std::string _name;
pthread_mutex_t* _lock;
};
// 线程要执行的方法,后面我们随时调整
typedef void (*func_t)(ThreadData* td); // 函数指针类型
class Thread
{
public:
void Excute()
{
std::cout << _name << " is running" << std::endl;
_isrunning = true;
_func(_td);
_isrunning = false;
}
public:
Thread(const std::string &name, func_t func,ThreadData* td):_name(name), _func(func),_td(td)
{
std::cout << "create " << name << " done" << std::endl;
}
static void *ThreadRoutine(void *args) // 变成static,内部没有this指针,pthread_create就能匹配上了
{
Thread *self = static_cast<Thread*>(args); // 获得了当前对象
self->Excute();
return nullptr;
}
bool Start()
{
int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);//传this,线程函数才能拿到_func方法
if(n != 0) return false;
return true;
}
std::string Status()
{
if(_isrunning) return "running";
else return "sleep";
}
void Stop()
{
if(_isrunning)
{
::pthread_cancel(_tid);
_isrunning = false;
std::cout << _name << " Stop" << std::endl;
}
}
void Join()
{
::pthread_join(_tid, nullptr);
std::cout << _name << " Joined" << std::endl;
delete _td;
}
std::string Name()
{
return _name;
}
~Thread()
{
}
private:
std::string _name;
pthread_t _tid;
bool _isrunning;
func_t _func; // 线程要执行的回调函数
ThreadData* _td;
};
} // namespace ThreadModle
AI助手
运行代码,结果如下图,4个线程的锁都是同一个地址,说明他们访问的都是同一把锁。
最后再把线程函数代码换成抢票的代码即可,结果跟用全局的一样。
因此锁的使用既可以用全局的,也可以以参数的形式传递给线程。
锁简单封装
因为是临时对象,在循环结束后会自动调用析构函数销毁。
互斥量实现原理
经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
Linux线程同步
同步概念与竞态条件
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
如上图,线程2一直抢到票,其他线程一直抢不到,这时候就需要线程同步
条件变量
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
举例:有A,B,C三个人,一个盘子。B拿出苹果放到盘子上,另外两人就可以到盘子上拿。为了在放苹果的时候,其他人不能来拿,就要加锁,盘子就是临界区。因为另外两人想拿苹果,就一直申请锁,导致B放不了苹果。此时就需要一个铃铛。A,C两人在外面排队,当B放好苹果后就摇铃铛,此时A和C就会根据排队的顺序依次进去拿苹果。
上面的铃铛就是条件变量,人就是线程。摇铃铛后,可以规定是唤醒一个线程还是唤醒全部。
认识接口
条件变量是 pthread_cond_t 的数据类型。它的使用跟前面互斥锁一样,可以定义成局部或者全局的。如果是全局或者静态的,可以直接使用 PTHREAD_COND_INITIALIZER 初始化。
如果是局部的,就用pthread_cond_init 函数初始化,使用完了就destroy销毁掉。
线程条件不满足时,线程就要等待,要在指定的条件变量上等待。
cond:要在这个条件变量上等待
等待完成后,就要进行唤醒。
pthread_cond_signal 表示唤醒一个线程。 pthread_cond_broadcast 表示唤醒所有线程。
条件变量接口使用例子:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
const int num = 5;
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
void *Wait(void *args)
{
std::string name = static_cast<const char *>(args);
while (true)
{
pthread_mutex_lock(&gmutex);
pthread_cond_wait(&gcond, &gmutex); // 这里就是线程等待的位置
usleep(10000);
std::cout << "I am : " << name << std::endl;
pthread_mutex_unlock(&gmutex);
// usleep(100000);
}
}
int main()
{
pthread_t threads[num];
for (int i = 0; i < num; i++)
{
char *name = new char[1024];
snprintf(name, 1024, "thread-%d", i + 1);
pthread_create(threads + i, nullptr, Wait, (void *)name);
usleep(10000);
}
sleep(1);
// 唤醒其他线程
while (true)
{
// pthread_cond_signal(&gcond);
pthread_cond_broadcast(&gcond);
std::cout << "唤醒一个线程...." << std::endl;
sleep(2);
}
for (int i = 0; i < num; i++)
{
pthread_join(threads[i], nullptr);
}
return 0;
}
AI助手
运行结果:
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点:
解耦
支持并发
支持忙闲不均
为了方便记忆,这里有一个“321”原则:
一个交易场所(一段内存空间)
两种角色(生产、消费角色)
三种关系(生产和生产、消费和消费 、生产和消费)前两种是互斥关系,最后一种是互斥和同步的关系
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出
BlockQueue.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
const static int defaultcap = 5;
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _block_queue.size() == _max_cap;
}
bool IsEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap = defaultcap) : _max_cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
// 假设:2个消费者
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty()) // while可以保证代码的鲁棒性(健壮性),不用if,因为如果只有一个生产品
{ //,有两个消费者,一次性唤醒两个消费者的话,他们会竞争锁,其中一个拿完东西后,另一个才能重新拿到锁
//另一个拿到锁后,不用if的话,就跳过判断了,此时队列是空的,就会异常,所以用while
// 添加尚未满足,但是线程被异常唤醒的情况,叫做伪唤醒!
pthread_cond_wait(&_c_cond, &_mutex); // 两个消费者都在这里等待了
}
// 1. 没有空 || 2. 被唤醒了
*out = _block_queue.front();
_block_queue.pop();
// if(_block_queue.size() > hight_water)
// pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_p_cond);
}
// 一个生产者
void Equeue(const T &in)
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
// 满了,生产者不能生产,必须等待
// 可是在临界区里面啊!
// 被调用的时候:除了让自己继续排队等待,还会自己释放传入的锁
// 函数返回的时候,不就还在临界区了!
// 返回时:必须先参与锁的竞争,重新加上锁,该函数才会返回!
pthread_cond_wait(&_p_cond, &_mutex);
}
// 1. 没有满 || 2. 被唤醒了
_block_queue.push(in); // 生产到阻塞队列
pthread_mutex_unlock(&_mutex);
// 让消费者消费
pthread_cond_signal(&_c_cond);//解锁和唤醒的顺序可以交换,不影响
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _block_queue; // 临界资源
int _max_cap;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 生产者条件变量
pthread_cond_t _c_cond; // 消费者条件变量
// int low_water = _max_cap/3
// int hight_water _max_cap/3*2
};
AI助手
main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <unistd.h>
void *Consumer(void *args)
{
BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
while(true)
{
// 1. 获取数据
task_t t;
bq->Pop(&t);
// 2. 处理数据
// t.Excute();
t();
// std::cout << "Consumer -> " << t.result() << std::endl;
}
}
void *Productor(void *args)
{
srand(time(nullptr) ^ getpid());
BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
while(true)
{
// 1. 构建数据/任务
// int x = rand() % 10 + 1; // [1, 10]
// usleep(x * 1000);
// int y = rand() % 10 + 1; // [1, 10]
// Task t(x, y);
// 2. 生产数据
bq->Equeue(Download);
std::cout << "Productor -> Download" << std::endl;
sleep(1);
}
}
int main()
{
BlockQueue<task_t> *bq = new BlockQueue<task_t>();
pthread_t c1,c2, p1,p2,p3;
pthread_create(&c1, nullptr, Consumer, bq);
pthread_create(&c2, nullptr, Consumer, bq);
pthread_create(&p1, nullptr, Productor, bq);
pthread_create(&p2, nullptr, Productor, bq);
pthread_create(&p3, nullptr, Productor, bq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
AI助手
Task.hpp
#pragma once
#include<iostream>
#include<functional>
// typedef std::function<void()> task_t;
using task_t = std::function<void()>;
void Download()
{
std::cout << "我是一个下载的任务" << std::endl;
}
// // 要做加法
// class Task
// {
// public:
// Task()
// {
// }
// Task(int x, int y) : _x(x), _y(y)
// {
// }
// void Excute()
// {
// _result = _x + _y;
// }
// void operator ()()
// {
// Excute();
// }
// std::string debug()
// {
// std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
// return msg;
// }
// std::string result()
// {
// std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
// return msg;
// }
// private:
// int _x;
// int _y;
// int _result;
// };
————————————————
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qinjh_/article/details/141596008
版权声明:
作者:SE_Wang
链接:https://www.cnesa.cn/2541.html
来源:CNESA
文章版权归作者所有,未经允许请勿转载。
共有 0 条评论