线程(Thread System in Chez Scheme)

下面介绍Chez Scheme线程系统过程和语法形式。 除了锁,锁增量和锁减量之外,线程系统的功能在非基于Windows的系统上在Posix线程系统(pthreads)之上实现,并在基于Windows的系统上直接使用Windows API。 有关线程创建和交互的基本详细信息,请查阅系统上的相应文档。

大多数原生的Scheme过程都是线程安全 的,这意味着可以从多个线程中同时调用它们。 这包括诸如consmake-string之类的分配操作,诸如carvector-ref之类的访问器,诸如**+sqrt之类的数字运算符以及诸如appendmap**之类的非破坏性的高级原生操作。

简单的变动运算符(例如set-car!,vector-set!和record字段变动器)是线程安全的。 同样,对局部变量的分配,包括(未导出的)库和顶级程序变量的分配也是线程安全的。

大多数I/O操作应被视为具有破坏性,因为它们可能会修改端口的内部结构。

使用没有进行合适的同步策略的非线程安全的运算符可能会破坏它们所操作的对象。 这种损坏可能导致错误的行为,内存故障,甚至导致系统中止的不可恢复的错误。

线程创建

(fork-thread thunk)

thunk 必须是一个接受0个参数的过程。

fork-thread在一个新线程中调用thunk ,并返回一个线程对象。

除了打印它外,fork-thread返回的线程对象无法执行任何操作。

除了使用fork-thread以外,可通过外部代码来创建的线程必须在触及任何Scheme数据或调用任何Scheme过程之前调用Sactivate_thread

(thread? obj)

返回:如果obj 是一个线程对象,返回#t,否则返回#f

(get-thread-id)

返回:当前线程的id

线程id是由线程分配的线程号,并且与get-process-id返回的进程id没有关系,进程id在所有线程中都是相同的。

互斥锁

(make-mutex)

返回:一个新的互斥锁对象

(mutex? obj)

返回:如果obj 是互斥锁,返回#t

(mutex-acquire mutex) | (mutex-acquire mutex block?)

mutex 必须是一个互斥锁。

mutex-acquire获取由mutex 标识的互斥锁。可选的bool参数block? 默认为#t,指定线程是否应阻塞等待互斥锁。block? 如果省略或为true,则线程将阻塞,直到获取了互斥锁为止,并返回未指定的值。

如果block? 为false并且互斥锁当前已属于其他线程,当前线程并不会阻塞,而是,mutex-acquire立即返回值**#f**,以指示互斥锁不可用。 如果block? 为false并且成功获取了互斥锁,mutex-acquire返回#t。

互斥锁在Posix线程术语中是递归的,这意味着调用线程可以使用互斥锁获取(重新)获取它已经拥有的互斥锁(注:锁是可以重入的)。 在这种情况下,释放互斥锁需要执行相等数量的mutex-release

(mutex-release mutex)

mutex 必须是一个互斥锁。

mutex-release释放由mutex 标识的互斥锁。 如果mutex 不属于调用线程(即释放别人的互斥锁),则会导致无法预料的行为。

(with-mutex mutex body1 body2 …)

with-mutex计算表达式mutex ,其必须可以计算为一个互斥锁,获取锁,并计算body1 body2 … ,然后释放锁。无论body是正常返回还是通过控制操作(即可能由于错误而抛到continuation)释放互斥量,这都会导致with-mutex形式的非本地退出。如果控制随后通过continuation调用返回到body,则将重新获取互斥锁。

与直接使用mutex-acquiremutex-release相比,使用with-mutex通常更方便,更安全。

条件

(make-condition)

返回:一个新的条件对象

(thread-condition? obj)

返回:如果obj 是条件对象,返回#t

(condition-wait cond mutex) | (condition-wait cond mutex timeout)

返回:如果调用线程被条件唤醒返回#t,如果调用线程超时等待返回#f

cond 必须是条件对象,并且mutex 必须是互斥锁。 可选参数timeout 是类型为time-durationtime-utc或**#f的时间记录,表示没有超时。 默认为#f**。

condition-wait等待由cond 标识的条件,且等待指定的timeout时长。在调用condition-wait时,调用线程必须已获取互斥锁mutex 。由于调用condition-wait的副作用而释放了mutex 。当稍后通过下述的过程之一从条件变量释放线程或超时到期时,将重新获取mutex ,并返回condition-wait

(condition-signal cond)

cond 必须是一个条件对象。

condition-signal释放所有等待cond 标识的条件对象的线程的其中一个。

(condition-broadcast cond)

cond 必须是一个条件对象。

condition-broadcast释放所有等待cond 标识的条件对象的线程

锁比互斥锁更原生,但是更加灵活和有效。

只要锁在进程共享的内存中分配,它们还可以独立于线程系统使用(包括在Chez Scheme的非线程版本中)以在分离的Scheme进程中执行同步操作。 锁只是一个字长的整数,即iptruptr外部类型,其中包含目标计算机的本机字节序,可能是使用define-ftype定义的大结构的一部分。必须在驻留于Scheme堆外部的内存中显式分配它,并在适当时显式释放。当仅涉及线程时(即,当不涉及多个进程时),可以通过foreign-alloc分配内存。 当涉及多个进程时,应在进程共享的某个区域中分配该锁。

使用ftype-init-lock! 初始化后,进程或线程可以尝试通过**ftype-lock!ftype-spin-lock!锁定lock 。一旦锁已被锁定并且在解锁之前,即使通过最近锁定它的进程或线程,再一次尝试锁定该锁也会失败。 任何进程或线程可以使用ftype-unlock!**来解锁lock,而不仅仅是通过最近锁定锁的进程或线程来解锁。

锁机制提供的结构很少,并且分配和使用中的错误可能导致内存错误,死锁和其他问题。 因此,通常建议仅将锁用作更高级别抽象的一部分,以确保按规范方式使用锁。

(define lock
  (make-ftype-pointer uptr
    (foreign-alloc (ftype-sizeof uptr))))

(ftype-init-lock! uptr () lock)
(ftype-lock! uptr () lock)  #t
(ftype-lock! uptr () lock)  #f
(ftype-unlock! uptr () lock)
(ftype-spin-lock! uptr () lock)
(ftype-lock! uptr () lock)  #f
(ftype-unlock! uptr () lock)

(ftype-init-lock! ftype-name (a …) fptr-expr) | (ftype-init-lock! ftype-name (a …) fptr-expr index)

(ftype-lock! ftype-name (a …) fptr-expr) | (ftype-lock! ftype-name (a …) fptr-expr index)

(ftype-spin-lock! ftype-name (a …) fptr-expr) | (ftype-spin-lock! ftype-name (a …) fptr-expr index)

(ftype-unlock! ftype-name (a …) fptr-expr) | (ftype-unlock! ftype-name (a …) fptr-expr index)

它们每个的语法都类似于ftype-set!,尽管带有隐式的val-expr 。 特别是,对fptr-expr 和访问器a … 的限制和处理是相似的,但有一个重要的限制:最后一个访问器所指定的字段(该格式在其上进行操作)必须是一个字长的整数,即 ,iptr,uptr或具有本地字节序的等效项。

**ftype-init-lock!**应该在使用任何其他运算符之前用于初始化锁; 如果不这样做,则其他操作符的行为是不确定的。

ftype-lock! 可用于加锁。如果发现在操作时锁已解锁,则将其锁定并返回#t;如果发现该锁已加锁,则返回#f且不更改该锁。

ftype-spin-lock! 也可以用来加锁。 如果在操作时发现该锁已解锁,则将其锁定并返回; 如果发现该锁已锁定,它将一直等待直到锁被解锁,然后再锁定并返回。如果没有其他进程或者线程来释放该锁,该操作不会返回且不能以正常的方式(包括GC)中断。也不保证公平,因此即使其他进程正在主动锁定和释放该锁,进程也可能无限期挂起。

ftype-unlock! 用于解锁。如果发现锁已被锁定,则将其解锁并返回。 否则,它将返回而不更改锁。

原子操作Locked increment and decrement

当需要原子递增或递减时,可以使用此处描述的锁定操作。

(ftype-locked-incr! ftype-name (a …) fptr-expr) | (ftype-locked-incr! ftype-name (a …) fptr-expr index)

返回:如果更新的值为0,则为#t,否则为#f

(ftype-locked-decr! ftype-name (a …) fptr-expr) | (ftype-locked-decr! ftype-name (a …) fptr-expr index)

返回:如果更新的值为0,则为#t,否则为#f

它们每个的语法都类似于ftype-set!,尽管带有隐式的val-expr 。 特别是,对fptr-expr 和访问器a … 的限制和处理是相似的,但有一个重要的限制:最后一个访问器所指定的字段(该格式在其上进行操作)必须是一个字长的整数,即 ,iptr,uptr或具有本地字节序的等效项。

ftype-locked-incr! 自动读取指定字段的值,将值加1,然后将新值写回该字段。 同样,ftype-locked-decr! 原子读取指定字段的值,从该值中减去1,然后将新值写回到该字段中。 如果新值为0,则两者都返回#t,否则返回#f。

引用计数

在Scheme堆之外管理内存的应用程序可以利用Scheme存储管理系统通过ftype guardians 执行引用计数。 在引用计数的内存管理系统中,每个对象都保存着指向它的指针计数。 当创建一个新的指针时,该计数增加;而在删除指针时,该计数减小。 当计数达到零时,不再需要该对象,并且可以将其占用的内存用于其他目的。

(ftype-guardian ftype-name)

ftype-name 必须命名一个ftype。 ftype的第一个基本字段(或在unions的情况下为第一个基本字段)必须是具有本地尾数的字长整数(iptr或uptr)。 假定此字段保存引用计数。

返回新的ftype guardian g ,可以使用其注册ftype-name 类型(或ftype-nam e的某些子类型)的ftype-pointer。 通过使用ftype指针作为参数调用g ,可以向g 注册ftype指针。

ftype guardian不会像普通的guardian那样自动保护其注册的ftype指针免遭回收。而是,对于每个通过普通(非弱,非监护人指针)变得不可访问的已注册的ftype指针,guardian会减少ftype指针指向的对象的引用计数。如果引用计数值为0,则ftype指针将保留并可以从guardian中检索。 但是,如果生成的引用计数值非零,则不会保留ftype指针。假设回收器外部的代码正确维护了引用计数,则从ftype guardian检索的对象(通过不带参数的调用)将确保具有0引用计数。回收器使用等效的ftype-locked-decr!来减少引用计数,以支持由多个进程共享的内存中的非Scheme对象。在这样的系统中,程序本身应使用ftype-locked-incr!ftype-locked-decr! 或非Scheme等效项(例如,第4.8节中所述C语言的scheme.h中的 LOCKED_INCR和LOCKED_DECR宏)来维护引用计数。

下面的示例为ftype对象定义了一个简单的ftype和一个分配器,该对象释放以前分配的且不再可访问的ftype对象。

module (A make-A free-dropped-As)
  (define-ftype A
    (struct
      [refcount uptr]
      [data int]))
  (define g (ftype-guardian A))
  (define free-dropped-As
    (lambda ()
      (let ([a (g)])
        (when a
          (printf "freeing ~s\n" (ftype-ref A (data) a))
          (foreign-free (ftype-pointer-address a))
          (free-dropped-As)))))
  (define make-A
    (lambda (n)
      (free-dropped-As)
      (let ([a (make-ftype-pointer A (foreign-alloc (ftype-sizeof A)))])
        (ftype-set! A (refcount) a 1)
        (ftype-set! A (data) a n)
        (g a)
        a))))

我们可以通过分配,丢弃并立即回收指向A的ftype指针进行测试。

> (do ([i 10 (fx- i 1)])
      ((fx= i 0))
(make-A i)
    (collect))
freeing 10
freeing 9
freeing 8
freeing 7
freeing 6
freeing 5
freeing 4
freeing 3
freeing 2
> (free-dropped-As)
freeing 1

由ftype guardian保护的对象可能包含指向其他对象的指针,这些其他对象的引用计数也应在分配包含对象时增加,并在释放包含对象时减少。

线程参数

(make-thread-parameter object) | (make-thread-parameter object procedure)

创建线程参数后,将在每个当前线程和将来的线程中放置一个单独的位置,以保存参数的内部状态变量的值。 (当该参数变得不可访问时,存储管理器可以消除该位置。)一个线程中对线程参数的更改不会被其他任何线程看到。

创建新线程时(请参阅fork-thread),每个线程参数的当前值(而非位置)都由新线程从派生线程继承。 类似地,当第一次激活通过其他方式创建的线程时(请参见4.8节中的Sactivate_thread),每个线程参数的当前值(而非位置)都由新线程从主(原始)线程继承。

大多数内置参数是线程参数,但有些是全局的。 在他们定义的地方已经标注了是线程的还是全局的。 在非线程版本的Chez Scheme中,内置全局参数和线程参数之间没有区别。

I/O 缓冲区

Chez Scheme为提高效率而缓冲文件I/O操作,但是缓冲的I/O不是线程安全的。 两个线程并发地读写同一个缓冲端口可能会破坏该端口,从而导致缓冲区溢出,并最终导致无效的内存引用。

当以缓冲模式none打开时,可以禁用二进制输出端口上的缓冲。 但是,由于需要支持先行(lookahead),因此无法完全禁用输入端口上的缓冲,并且由于要在字符和字节之间进行转换的代码转换器有时需要先行输入,因此无法完全禁用文本端口(甚至是文本输出端口)上的缓冲。

因此,除非在二进制输出端口开启缓冲模式为none的特殊情况下,否则两个线程绝不应该并发地读写同一个端口。替代方法包括指定一个线程为给定端口执行所有I / O,和为每个线程提供通用端口包装程序,仅在获取互斥量后才将请求转发到端口。

初始的控制台以及当前的输入和输出端口以及transcript端口都是线程安全的,因此多个线程向控制台打印错误和/或调试消息是安全的。即使在同一行内,输出也可能是交错的,但是端口不会损坏。 这些端口的线程安全性是通过为每个I/O操作获取一个互斥锁的高成本来实现的。

实例:有界队列

以下代码摘自文章“A Scheme for native threads1”,它使用许多线程系统功能实现了有界队列。 有界队列具有固定数量的可用插槽。 当队列已满时尝试入队会导致调用线程阻塞。 尝试从空队列中出队会导致调用线程阻塞。

(define-record-type bq
  (fields
    (immutable data)
    (mutable head)
    (mutable tail)
    (immutable mutex)
    (immutable ready)
    (immutable room))
  (protocol
    (lambda (new)
      (lambda (bound)
        (new (make-vector bound) 0 0 (make-mutex)
          (make-condition) (make-condition))))))

(define dequeue!
  (lambda (q)
    (with-mutex (bq-mutex q)
      (let loop ()
        (let ([head (bq-head q)])
          (cond
            [(= head (bq-tail q))
             (condition-wait (bq-ready q) (bq-mutex q))
             (loop)]
            [else
             (bq-head-set! q (incr q head))
             (condition-signal (bq-room q))
             (vector-ref (bq-data q) head)]))))))

(define enqueue!
  (lambda (item q)
    (with-mutex (bq-mutex q)
      (let loop ()
        (let* ([tail (bq-tail q)] [tail^ (incr q tail)])
          (cond
            [(= tail^ (bq-head q))
             (condition-wait (bq-room q) (bq-mutex q))
             (loop)]
			 [else
             (vector-set! (bq-data q) tail item)
             (bq-tail-set! q tail^)
             (condition-signal (bq-ready q))]))))))

(define incr
  (lambda (q i)
    (modulo (+ i 1) (vector-length (bq-data q)))))

下面的代码演示了有界队列在一组线程中的应用,这些线程分别扮演数据的消费者和生产者。

(define job-queue)
(define die? #f)

(define make-job
  (let ([count 0])
    (define fib
      (lambda (n)
	    (if (< n 2)
		  n
          (+ (fib (- n 2)) (fib (- n 1))))))
    (lambda (n)
      (set! count (+ count 1))
      (printf "Adding job #~s = (lambda () (fib ~s))\n" count n)
      (cons count (lambda () (fib n))))))

(define make-producer
  (lambda (n)
    (rec producer
      (lambda ()
        (printf "producer ~s posting a job\n" n)
        (enqueue! (make-job (+ 20 (random 10))) job-queue)
        (if die?
            (printf "producer ~s dying\n" n)
            (producer))))))


(define make-consumer
  (lambda (n)
    (rec consumer
      (lambda ()
        (printf "consumer ~s looking for a job~%" n)
        (let ([job (dequeue! job-queue)])
          (if die?
              (printf "consumer ~s dying\n" n)
              (begin
                (printf "consumer ~s executing job #~s~%" n (car job))
                (printf "consumer ~s computed:  ~s~%" n ((cdr job)))
                (consumer))))))))



(define (bq-test np nc)
  (set! job-queue (make-bq (max nc np)))
  (do ([np np (- np 1)])
      ((<= np 0))
      (fork-thread (make-producer np)))
  (do ([nc nc (- nc 1)])
      ((<= nc 0))
      (fork-thread (make-consumer nc))))

这是示例程序运行后的可能的前几行输出:

> (begin
    (bq-test 3 4)
    (system "sleep 3")
    (set! die? #t))
producer 3 posting a job
Adding job #1 = (lambda () (fib 29))
producer 3 posting a job
Adding job #2 = (lambda () (fib 26))
producer 3 posting a job
Adding job #3 = (lambda () (fib 22))
producer 3 posting a job
Adding job #4 = (lambda () (fib 21))
producer 2 posting a job
Adding job #5 = (lambda () (fib 29))
producer 1 posting a job
Adding job #6 = (lambda () (fib 29))
consumer 4 looking for a job
producer 3 posting a job
Adding job #7 = (lambda () (fib 24))
consumer 4 executing job #1
consumer 3 looking for a job
producer 2 posting a job
Adding job #8 = (lambda () (fib 26))
consumer 3 executing job #2
consumer 3 computed:  121393
consumer 3 looking for a job
producer 1 posting a job
Adding job #9 = (lambda () (fib 26))
...

在“A Scheme for native threads1”中给出了其他示例,包括可悬挂线程的定义和在无法访问时自动终止的线程。


  1. R. Kent Dybvig. A Scheme for native threads. In Symposium in Honor of Mitchell Wand, August 2009. http://www.ccs.neu.edu/events/wand-symposium/↩︎ ↩︎