首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Perl 中的线程(3)共享与同步

Perl 中的线程(3)共享与同步

threads::shared和现有大多数线程模型不同,在 Perl ithreads 线程模型中,默认情况下任何数据结构都不是共享的。当一个新线程被创建以后,它就已经包含了当前所有数据结构的一份私有拷贝,新建线程中对这份拷贝的数据结构的任何操作都不会在其他线程中有效。因此,如果需要使用任何共享的数据,都必须显式地申明。threads::shared 包可以用来实现线程间共享数据的目的。
清单 9. 在线程中申明和使用共享数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/perl
#

use threads;
use threads::shared;
use strict;

my $var   :shared  = 0;       # use :share tag to define
my @array :shared = (); # use :share tag to define
my %hash = ();
share(%hash);                  # use share() funtion to define


sub start {
$var = 100;

@array[0] = 200;
@array[1] = 201;

$hash{'1'} = 301;
$hash{'2'} = 302;
}

sub verify {
   sleep(1);                      # make sure thread t1 execute firstly
   printf("var = $var\n");     # var=100

for(my $i = 0; $i < scalar(@array); $i++) {
       printf("array[$i] = $array[$i]\n");    # array[0]=200; array[1]=201
}

foreach my $key ( sort( keys(%hash) ) ) {
printf("hash{$key} = $hash{$key}\n"); # hash{1}=301; hash{2}=302
}
}

my $t1 = threads->create( \&start );
my $t2 = threads->create( \&verify );

$t1->join();
$t2->join();




锁多线程间既然有了共享的数据,那么就必须对共享数据进行小心地访问,否则,冲突在所难免。Perl ithreads 线程模型中内置的 lock 方法实现了线程间共享数据的锁机制。有趣的是,并不存在一个 unlock 方法用来显式地解锁,锁的生命周期以代码块为单位,也就是说,当 lock 操作所在的代码块执行结束之后,也就是锁被隐式释放之时。例如
清单 10. 线程中的锁机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use threads::shared;

# in thread 1
{
   lock( $share );        # lock for 3 seconds
   sleep(3);               # other threads can not lock again
}
# unlock implicitly now after the block

# in thread 2
{
   lock($share);          # will be blocked, as already locked by thread 1
   $share++;               # after thread 1 quit from the block
}
# unlock implicitly now after the block




上面的示例中,我们在 thread 1 中使用 lock 方法锁住了一个普通的标量,这会导致 thread 2 在试图获取 $share 变量的锁时被阻塞,当 thread 1 从调用 lock 的代码块中退出时,锁被隐式地释放,从而 thread 2 阻塞结束,lock 成功以后,thread 2 才可以执行 $share++ 的操作。对于数组和哈希表来说,lock 必须用在整个数据结构上,而不是用在数组或哈希表的某一个元素上。例如
清单 11. 在数组或哈希表上使用锁机制
1
2
3
4
5
6
7
8
9
10
11
12
13
use threads;
use threads::shared;

{
   lock(@share);          # the array has been locked
   lock(%hash);           # the hash has been locked
   sleep(3);               # other threads can not lock again
}

{
   lock($share[1]);     # error will occur
   lock($hash{key});    # error will occur
}




假如一个线程对某一个共享变量实施了锁操作,在它没有释放锁之前,如果另外一个线程也对这个共享变量实施锁操作,那么这个线程就会被阻塞,阻塞不会被自动中止而是直到前一个线程将锁释放为止。这样的模式就带来了我们常见的死锁问题。例如
清单 12. 线程中的死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use threads;
use threads::shared;
# in thread 1
{
   lock($a);              # lock for 3 seconds
   sleep(3);              # other threads can not lock again
   lock($b);              # dead lock here
}

# in thread 2
{
   lock($b);              # will be blocked, as already locked by thread 1
   sleep(3);              # after thread 1 quit from the block
   lock($a);              # dead lock here
}




死锁常常是多线程程序中最隐蔽的问题,往往难以发现与调试,也增加了排查问题的难度。为了避免在程序中死锁的问题,在程序中我们应该尽量避免同时获取多个共享变量的锁,如果无法避免,那么一是要尽量使用相同的顺序来获取多个共享变量的锁,另外也要尽可能地细化上锁的粒度,减少上锁的时间。
信号量Thread::Semaphore 包为线程提供了信号量的支持。你可以创建一个自己的信号量,并通过 down 操作和 up 操作来实现对资源的同步访问。实际上,down 操作和 up 操作对应的就是我们所熟知的 P 操作和 V 操作。从内部实现上看,Thread::Semaphore 本质上就是加了锁的共享变量,无非是把这个加了锁的共享变量封装成了一个线程安全的包而已。由于信号量不必与任何变量绑定,因此,它非常灵活,可以用来控制你想同步的任何数据结构和程序行为。例如
清单 13. 线程中的信号量
1
2
3
4
5
6
7
8
use threads;
use threads::shared;
use Thread::Semaphore;

my $s = Thread::Semaphore->new();
$s->down();                # P operation
...
$s->up();                  # V operation




从本质上说,信号量是一个共享的整型变量的引用。默认情况下,它的初始值为 1,down 操作使它的值减 1,up 操作使它的值加 1。当然,你也可以自定义信号量初始值和每次 up 或 down 操作时信号量的变化。例如
清单 14. 线程中的信号量
1
2
3
4
5
6
7
8
9
10
use threads;
use Thread::Semaphore;

my $s = Thread::Semaphore->new(5);
printf("s = " . ${$s} . "\n");         # s = 5
$s->down(3);
printf("s = " . ${$s} . "\n");         # s = 2
...
$s->up(4);
printf("s = " . ${$s} . "\n");         # s = 6




线程队列Thread:ueue 包为线程提供了线程安全的队列支持。与信号量类似,从内部实现上看,Thread:ueue 也是把一个通过锁机制实现同步访问的共享队列封装成了一个线程安全的包,并提供统一的使用接口。Thread:ueue 在某些情况下可以大大简化线程间通信的难度和成本。例如在生产者 - 消费者模型中,生产者可以不断地在线程队列上做 enqueue 操作,而消费者只需要不断地在线程队列上做 dequeue 操作,这就很简单地实现了生产者和消费者之间同步的问题。例如
清单 15. 生产者 - 消费者模型中对线程队列的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/perl
#

use threads;
use Thread:ueue;

my $q = Thread:ueue->new();

sub produce {
   my $name = shift;
   while(1) {
       my $r = int(rand(100));
       $q->enqueue($r);
       printf("$name produce $r\n");
       sleep(int(rand(3)));
   }
}

sub consume {
   my $name = shift;
   while(my $r = $q->dequeue()) {
       printf("consume $r\n");
   }
}

my $producer1 = threads->create(\&produce, "producer1");
my $producer2 = threads->create(\&produce, "producer2");
my $consumer1 = threads->create(\&consume, "consumer2");

$producer1->join();
$producer2->join();
$consumer1->join();

返回列表