示例项目

另一种线程策略

在上一个示例中,我们看到了基于“每个连接一个线程”模式构建的多线程服务器。在今天的示例中,我们将最新体育赛事资讯、实时赔率分析及在线投注平台探索另一种模式,即工作线程模式。在此模式中,服务器由一个或多个在服务器生命周期内运行的线程组成。这些线程处理可供它们执行的任何工作。完成任务后,工作线程不会退出;相反,它将检查是否有更多的工作可供它去做。

工作队列

工作线程模式的一个重要组件是任务队列数据结构,它可以存储要由工作线程处理的任务。每个工作线程由一个无限循环组成。在循环的每次迭代中,工作将检查任务队列,以查看是否有任何工作可供它执行。如果队列上有可用的工作,则worker将从队列中删除一个任务,处理该任务,然后返回队列检查是否有更多的工作。

由于任务队列将由多个工作线程共享,因此我们必须在设置任务队列数据结构时小心翼翼,以使该结构及其方法线程安全。

使用信号量

在上一讲中,我们看到了我们可以用互斥锁在代码的临界区上加锁。互斥锁将阻止其他线程运行相同的代码,直到设置锁的线程离开临界区并解锁互斥锁。

使用互斥锁的另一种选择是使用信号量。信号量是一种存储单个无符号整数的数据类型。信号量提供线程安全的函数来增加和减少整数。

为了使用信号量函数,我们包含了semaphore.h头文件。

# include < semaphore.h >

要创建一个信号量,我们使用sem_init()函数:

sem_t信号;sem_init(信号量0 1);

sem_init()的第三个参数是信号量的初始值。第二个参数是一个选项设置,用于指定信号量是否在线程或进程之间共享。该选项的值为0表示我们只想在同一进程中的线程之间共享信号量。

为了增加和减少信号量,我们使用sem_post()和sem_wait()函数。sem_post()函数将信号量加1并立即返回。如果信号量当前具有非零值,sem_wait()将信号量递减并立即返回。如果信号量的值为0,sem_wait()将阻塞,直到信号量具有非零值:然后它将尝试减少信号量并返回。

信号量的一个简单用途是复制互斥锁的行为。互斥码

//在线程之外:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//在我们的线程代码中:pthread_mutex_lock(&mutex);//执行一些操作pthread_mutex_unlock(&mutex);

是否等同于信号量代码

//在线程外执行:sem_t信号量;sem_init(信号量0 1);//在我们的线程代码中:sem_wait(&semaphore);//做一些事情sem_post(&semaphore);

任务队列

在工作线程版本的web服务器赢博体育程序中,我们需要一个任务队列来存储客户端套接字连接的文件描述符。当客户端到达时,服务器将把该客户端套接字的文件描述符放在任务队列上,然后返回监听新的客户端连接。工作线程将由无限循环组成,这些循环检查任务队列中是否有文件描述符要服务。

任务队列将被实现为一个queue结构体:

#define QUEUE_SIZE 16 typedef struct {int d[QUEUE_SIZE];int面前;int返回;sem_t互斥对象;sem_t槽;sem_t项目;}队列;

下面的函数是我们将与队列一起使用的方法:

队列* queueCreate ();Void enqueue(queue* q,int fd);Int dequeue(queue* q);

为了使队列线程安全,我们将使用三个独立的信号量。互斥信号量将实现一个简单的互斥,它将保护方法中代码的关键部分。items信号量将维护当前存储在队列中的文件描述符的数量。slots信号量将对队列中有多少空闲空间可用来存储额外的文件描述符进行计数。

下面是这些方法的代码:

queue* queueCreate() {queue* q = (queue*) malloc(sizeof(queue));Q ->front = 0;Q ->back = 0;sem_init (q - >互斥0 1);sem_init (q - >槽0 QUEUE_SIZE);sem_init (q - >项目,0,0);返回问;}无效enqueue(queue* q,int fd) {sem_wait(&q->插槽);sem_wait (q - >互斥);Q ->d[Q ->back] = fd;q->back = (q->back+1)%QUEUE_SIZE;sem_post (q - >互斥);sem_post (q - >项目);} int dequeue(queue* q) {int fd;sem_wait (q - >项目);sem_wait (q - >互斥);Fd = q->d[q->front];q->front = (q->front+1)%QUEUE_SIZE;sem_post (q - >互斥);sem_post (q - >槽);返回fd;}

这三个信号量将合作调节与队列的交互。例如,考虑dequeue()方法。我们在该方法中做的第一件事是与items信号量交互,它可以告诉我们队列中有多少项。我们首先尝试递减这个信号量,因为我们即将从队列中删除一个项目。如果队列中当前有一些项,则减量立即成功,然后进行下一步。如果当前队列中没有项目,则对sem_wait()的调用将阻塞,直到项目信号量超过0。

如果项目可用,我们锁定互斥锁,继续执行从队列中删除文件描述符的代码,然后解锁互斥锁。在退出该方法时,我们增加了slots信号量,因为我们刚刚通过删除一个条目在队列中创建了一个额外的空闲插槽。

服务器代码

在服务器的main()函数中,我们首先设置任务队列和工作线程:

//设置队列queue* q = queueCreate();//设置工作线程pthread_t w1,w2;pthread_create (w1, NULL, workerThread q);pthread_create (w2, NULL, workerThread q);

然后我们运行通常的代码来设置服务器套接字并开始监听客户端连接。接受客户端连接的代码只是简单地获取在任务队列上创建的每个新客户端套接字,供工作人员处理,然后返回侦听新连接:

当(1){struct sockaddr_in客户端;Int new_socket, c = sizeof(struct sockaddr_in);New_socket = accept(server_socket, (struct sockaddr *) &client, (socklen_t*)&c);If (new_socket != -1) {enqueue(q,new_socket);}}

工作线程的thread函数非常简单:

void* workerThread(void *arg) {queue* q = (queue*) arg;While (1) {int fd = dequeue(q);serveRequest (fd);}返回NULL;}

与前面一样,与客户机的交互由serveRequest()函数处理,该函数与我们在第一个版本的服务器程序中看到的函数相同。

编译多文件项目

这个版本的web服务器的另一个新特性是在项目中使用多个源文件。队列数据结构在一个名为queue.h的头文件和一个源代码文件queue.c中实现。服务器的代码位于一个单独的源代码文件miniweb.c中。

处理具有多个源代码文件的项目的最佳方法是为项目设置一个makefile。这是我为这个项目设置的makefile:

miniweb: miniweb。o队列。O GCC miniweb。o队列。O -pthread -o miniwebminiweb.c -c -g -pthread queue.h0: queue.c queue.h GCC queue.c

该项目的主要目标,miniweb可执行文件,依赖于从源文件编译的目标文件。要从单个源代码文件编译目标文件,我们使用gcc -c选项编译源代码文件,该选项告诉编译器将C代码编译为目标代码,而不将代码链接到可执行文件中。

miniweb可执行文件的build命令将对目标文件进行操作,并将它们链接在一起,使miniweb可执行。

注意,在赢博体育这些构建命令中都使用了-pthread选项。任何使用POSIX线程的项目都需要该选项。

一旦你为一个项目设置了一个makefile,你就可以使用Visual Studio Code中的makefile扩展来编译和运行这个项目。单击Code中的Makefile选项卡以访问构建、运行和调试项目的按钮。