首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

通用线程:POSIX 线程详解,第 3 部分 使用条件变量提高效率-4

通用线程:POSIX 线程详解,第 3 部分 使用条件变量提高效率-4

调试时间

在开始调试之前,还需要一个文件。以下是 dbug.h:
dbug.h
1
2
/* dabort(): report the source location of an unrecoverable error and
   terminate the process with abort().

   The body is wrapped in do { ... } while (0) so that "dabort();"
   behaves as exactly one statement; a bare { ... } block followed by
   ';' is a syntax error when used as "if (x) dabort(); else ...".
   The message goes to stderr because abort() is not required to
   flush buffered output, so text written to stdout could be lost. */
#define dabort() \
  do { \
    fprintf(stderr, "Aborting at line %d in source file %s\n", __LINE__, __FILE__); \
    abort(); \
  } while (0)




此代码用于处理工作组代码中的不可纠正错误。
工作组代码

说到工作组代码,以下就是:
workcrew.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "control.h"
#include "queue.h"
#include "dbug.h"
/* Pending tasks for the worker threads.  `control` supplies the
   mutex, condition variable and active flag that guard the queue;
   `work` is the list of queued work nodes. */
struct work_queue {
  data_control control;
  queue work;
};

/* The single, program-wide work queue. */
struct work_queue wq;
/* One unit of work.  Only a job number is carried here; a real
   program would store the data to be processed alongside it.  The
   `next` link comes first so a wnode can be cast to a queue node. */
struct work_node {
  struct node *next;
  int jobnum;
};
typedef struct work_node wnode;
/* Threads that have finished running.  Just before a worker returns
   it appends its own node to `cleanup`; the main thread waits on
   `control` for new entries and joins each thread it finds there. */
struct cleanup_queue {
  data_control control;
  queue cleanup;
};

/* The single, program-wide cleanup queue. */
struct cleanup_queue cq;
/* Per-thread bookkeeping record.  It is handed to the new thread as
   its start argument; when the thread is about to stop it links the
   record onto the cleanup queue, and the main thread then uses `tid`
   to join the thread.  `threadnum` exists only for
   debugging/instructional output.  The `next` link comes first so a
   cnode can be cast to a queue node. */
struct cleanup_node {
  struct node *next;
  int threadnum;
  pthread_t tid;
};
typedef struct cleanup_node cnode;
void *threadfunc(void *myarg) {
  wnode *mywork;
  cnode *mynode;
  mynode=(cnode *) myarg;
  pthread_mutex_lock(&wq.control.mutex);
  while (wq.control.active) {
    while (wq.work.head==NULL && wq.control.active) {
      pthread_cond_wait(&wq.control.cond, &wq.control.mutex);
    }
    if (!wq.control.active)
      break;
    //we got something!
    mywork=(wnode *) queue_get(&wq.work);
    pthread_mutex_unlock(&wq.control.mutex);
    //perform processing...
    printf("Thread number %d processing job %d\n",mynode->threadnum,mywork->jobnum);
    free(mywork);
    pthread_mutex_lock(&wq.control.mutex);
  }
  pthread_mutex_unlock(&wq.control.mutex);
  pthread_mutex_lock(&cq.control.mutex);
  queue_put(&cq.cleanup,(node *) mynode);
  pthread_mutex_unlock(&cq.control.mutex);
  pthread_cond_signal(&cq.control.cond);
  printf("thread %d shutting down...\n",mynode->threadnum);
  return NULL;
   
}
/* Number of worker threads that create_threads() attempts to start. */
#define NUM_WORKERS 4
/* Worker threads created but not yet joined: reset in
   initialize_structs(), incremented in create_threads() and
   decremented in join_threads(). */
int numthreads;
void join_threads(void) {
  cnode *curnode;
  printf("joining threads...\n");
  while (numthreads) {
    pthread_mutex_lock(&cq.control.mutex);
    /* below, we sleep until there really is a new cleanup node.  This
       takes care of any false wakeups... even if we break out of
       pthread_cond_wait(), we don't make any assumptions that the
       condition we were waiting for is true.  */
    while (cq.cleanup.head==NULL) {
      pthread_cond_wait(&cq.control.cond,&cq.control.mutex);
    }
    /* at this point, we hold the mutex and there is an item in the
       list that we need to process.  First, we remove the node from
       the queue.  Then, we call pthread_join() on the tid stored in
       the node.  When pthread_join() returns, we have cleaned up
       after a thread.  Only then do we free() the node, decrement the
       number of additional threads we need to wait for and repeat the
       entire process, if necessary */
      curnode = (cnode *) queue_get(&cq.cleanup);
      pthread_mutex_unlock(&cq.control.mutex);
      pthread_join(curnode->tid,NULL);
      printf("joined with thread %d\n",curnode->threadnum);
      free(curnode);
      numthreads--;
  }
}
/* Start NUM_WORKERS worker threads running threadfunc().
   Each thread receives a freshly allocated cnode holding its thread
   number and id; numthreads is incremented for every thread that was
   actually started.
   Returns 0 on success, or 1 as soon as an allocation or
   pthread_create() fails.  Threads started before the failure keep
   running and remain counted in numthreads. */
int create_threads(void) {
  int x;
  cnode *curnode;
  for (x=0; x<NUM_WORKERS; x++) {
    curnode=malloc(sizeof *curnode);
    if (!curnode)
      return 1;
    curnode->threadnum=x;
    if (pthread_create(&curnode->tid, NULL, threadfunc, curnode)) {
      /* No thread took ownership of the node, so nothing else will
         ever free it. */
      free(curnode);
      return 1;
    }
    printf("created thread %d\n",x);
    numthreads++;
  }
  return 0;
}
/* Prepare the global state: reset the thread counter, initialise the
   work queue and the cleanup queue together with their control
   structures, then mark the work queue active.  Aborts via dabort()
   if either control structure cannot be initialised. */
void initialize_structs(void) {
  numthreads=0;
  if (control_init(&wq.control))
    dabort();
  queue_init(&wq.work);
  if (control_init(&cq.control)) {
    /* undo the first, successful initialisation before bailing out */
    control_destroy(&wq.control);
    dabort();
  }
  /* Initialise the cleanup list itself (the work list was already
     set up above). */
  queue_init(&cq.cleanup);
  control_activate(&wq.control);
}
/* Release the control structures of both global queues, cleanup
   queue first and work queue second. */
void cleanup_structs(void)
{
  control_destroy(&cq.control);
  control_destroy(&wq.control);
}
/* Program entry point: set up the queues, start the workers, enqueue
   16000 numbered jobs, let the workers run for two seconds, then
   deactivate the work queue, join all workers and tear down.
   Returns 0 on normal completion; aborts via dabort() if the workers
   cannot all be started. */
int main(void) {
  int x;
  wnode *mywork;
  initialize_structs();
  /* CREATION */

  if (create_threads()) {
    printf("Error starting threads... cleaning up.\n");
    /* Workers that did start keep looping while wq.control.active is
       set, so the queue must be deactivated before joining them;
       otherwise join_threads() would block forever.  This assumes
       control_deactivate() clears the flag and wakes the waiters, as
       the normal shutdown path below also relies on. */
    control_deactivate(&wq.control);
    join_threads();
    dabort();
  }
  /* Queue all jobs under a single lock acquisition, then wake every
     waiting worker at once. */
  pthread_mutex_lock(&wq.control.mutex);
  for (x=0; x<16000; x++) {
    mywork=malloc(sizeof *mywork);
    if (!mywork) {
      printf("ouch! can't malloc!\n");
      break;
    }
    mywork->jobnum=x;
    queue_put(&wq.work,(node *) mywork);
  }
  pthread_mutex_unlock(&wq.control.mutex);
  pthread_cond_broadcast(&wq.control.cond);
  printf("sleeping...\n");
  /* Give the workers time to drain the queue.
     NOTE(review): jobs still queued when the queue is deactivated are
     neither processed nor freed. */
  sleep(2);
  printf("deactivating work queue...\n");
  control_deactivate(&wq.control);
  /* CLEANUP  */
  join_threads();
  cleanup_structs();
  return 0;
}

返回列表