Threads
创建一个新线程:
;; Spawn a new thread that prints a single line and then finishes.
;; The thunk is bound to a local name first, then handed to `thread`.
(let ([announce (lambda ()
                  (displayln "This is a new thread"))])
  (thread announce))
让工作线程循环运行,主线程休眠 2.5 秒后用 kill-thread 将其杀掉:
;; A worker that prints "Working..." every 0.2 s and never returns on its own.
(define worker
  (thread
   (lambda ()
     ;; Tail-recursive heartbeat; only an external kill stops it.
     (define (tick)
       (displayln "Working...")
       (sleep 0.2)
       (tick))
     (tick))))

;; Let the worker run while the current thread sleeps for 2.5 s,
;; then terminate the worker from the outside.
(sleep 2.5)
(kill-thread worker)
如果主线程退出或者被杀掉,其子线程也会随之退出;可以使用 thread-wait 来等待其他线程执行完成。
;; A worker that prints 100 numbered lines (0 through 99) and then finishes.
(define worker
  (thread
   (lambda ()
     (let count ([step 0])
       (when (< step 100)
         (printf "Working hard... ~a~n" step)
         (count (add1 step)))))))

;; Block the current thread until the worker has run to completion.
(thread-wait worker)
(displayln "Worker finished")
Thread Mailboxes
每个线程具备一个邮箱来接受消息,thread-send用于异步地向一个线程发送消息,thread-receive可获取邮箱中最老的消息,如有必要线程会阻塞等待消息到来。 这就是Actor并发模型,Erlang也是采用这种模型。
;; A worker driven entirely by its thread mailbox:
;;   - a number is "processed" (printed) and the worker keeps serving;
;;   - the symbol 'done prints a final line and ends the thread.
;; A message matching neither clause makes `match` raise an error.
(define worker-thread
  (thread
   (lambda ()
     (define (serve)
       (define msg (thread-receive)) ; blocks until a message is queued
       (match msg
         ['done
          (printf "Done~n")]
         [(? number? n)
          (printf "Processing ~a~n" n)
          (serve)]))
     (serve))))

;; Queue the numbers 0..19, then the stop marker; thread-send does not block.
(for ([n (in-range 20)])
  (thread-send worker-thread n))
(thread-send worker-thread 'done)

;; Wait until the worker has drained its mailbox and exited.
(thread-wait worker-thread)
Semaphores
信号量用于线程间同步:下面的例子用一个初值为 1 的信号量保证同一时刻只有一个线程在输出。
;; Semaphore with an initial count of 1; it serializes the printf calls below.
(define output-semaphore (make-semaphore 1))

;; make-thread : any -> thread
;; Starts a thread that prints ten lines tagged with `name`, taking the
;; semaphore before each line and releasing it explicitly afterwards.
(define (make-thread name)
  (define (emit-line n)
    (semaphore-wait output-semaphore)
    (printf "thread ~a: ~a~n" name n)
    (semaphore-post output-semaphore))
  (thread
   (lambda ()
     (for ([n (in-range 10)])
       (emit-line n)))))

;; One thread per label, then wait for every one of them to finish.
(define threads
  (for/list ([label (in-list '(A B C))])
    (make-thread label)))
(for-each thread-wait threads)
或者使用call-with-semaphore,不用显式地释放信号量
;; Semaphore with an initial count of 1; it serializes the printf calls below.
(define output-semaphore (make-semaphore 1))

;; make-thread : any -> thread
;; Starts a thread that prints ten lines tagged with `name`. Each line is
;; printed inside `call-with-semaphore`, which waits on the semaphore, runs
;; the thunk, and posts the semaphore again, so there is no explicit release.
(define (make-thread name)
  (define (report n)
    (call-with-semaphore
     output-semaphore
     (lambda ()
       (printf "thread ~a: ~a~n" name n))))
  (thread
   (lambda ()
     (let next ([n 0])
       (when (< n 10)
         (report n)
         (next (add1 n)))))))

;; One thread per label, then wait for every one of them to finish.
(define threads
  (for/list ([label (in-list '(A B C))])
    (make-thread label)))
(for-each thread-wait threads)
Channels
通道用于线程之间传递数据,Golang采取的就是这种方式。
channel-put用来发送数据到通道,channel-get可以从通道中获取数据.
;; Channel on which workers hand finished report strings to the printer.
(define result-channel (make-channel))

;; Printer thread: repeatedly takes one string from result-channel and
;; displays it. It never terminates on its own.
(define result-thread
  (thread
   (lambda ()
     (let drain ()
       (display (channel-get result-channel))
       (drain)))))

;; Channel on which the main thread hands out work items.
(define work-channel (make-channel))

;; make-worker : any -> thread
;; Starts a worker that pulls items from work-channel. For every ordinary
;; item it reports "processed" and continues; on the item DONE it reports
;; "done" and the thread ends.
(define (make-worker thread-id)
  ;; Format a message and pass it to the printer thread.
  (define (report fmt . args)
    (channel-put result-channel (apply format fmt args)))
  (thread
   (lambda ()
     (let next ()
       (define item (channel-get work-channel))
       (cond
         [(equal? item 'DONE)
          (report "Thread ~a done\n" thread-id)]
         [else
          (report "Thread ~a processed ~a\n" thread-id item)
          (next)])))))

(define work-threads (map make-worker '(1 2)))

;; Eight work items followed by one DONE marker per worker.
;; Each channel-put blocks until some worker takes the item.
(for ([item (in-list '(A B C D E F G H DONE DONE))])
  (channel-put work-channel item))
(for-each thread-wait work-threads)

;; The printer only accepts this empty string after it has displayed the
;; previous message, so returning from this put means all output is printed.
(channel-put result-channel "")
缓冲异步通道Buffered Asynchronous Channels
与上述 channel 类似,但缓冲异步通道(BAC)带有缓冲区:put 只有在缓冲区已满时才会阻塞,get 只有在缓冲区为空时才会阻塞。
(require racket/async-channel)
;; Printer thread: displays every message sent to its mailbox, one per line.
;; Receiving `eof` is the shutdown signal; the thread then finishes, so the
;; main thread can wait for all pending output to be printed.
(define print-thread
  (thread
   (lambda ()
     (let loop ()
       (define msg (thread-receive))
       (unless (eof-object? msg)
         (displayln msg)
         (loop))))))

;; safer-printf : string any ... -> void
;; Formats its arguments like `format` and queues the resulting string on the
;; printer thread, so whole lines from different workers never interleave.
;; NOTE: once the printer has been shut down (end of this snippet) it can no
;; longer receive messages, so calls made after that point will fail.
(define (safer-printf . items)
  (thread-send print-thread
               (apply format items)))

;; Buffered channel holding at most 3 pending work items.
(define work-channel (make-async-channel 3))

;; make-worker-thread : any -> thread
;; Starts a worker that takes items from work-channel and reports each one
;; through safer-printf. An `eof` item tells the worker to stop, so `eof`
;; itself cannot be used as a work item.
(define (make-worker-thread thread-id)
  (thread
   (lambda ()
     (let loop ()
       (define item (async-channel-get work-channel))
       (unless (eof-object? item)
         (safer-printf "Thread ~a processing item: ~a" thread-id item)
         (loop))))))

;; Keep the thread handles so the main thread can wait for the workers.
(define worker-threads (map make-worker-thread '(1 2 3)))

(for ([item '(a b c d e f g h i j k l m)])
  (async-channel-put work-channel item))

;; Shutdown: without this the main thread could reach the end of the program
;; while items are still queued or unprinted.
;; 1. One end marker per worker; each worker consumes exactly one and exits.
(for ([w (in-list worker-threads)])
  (async-channel-put work-channel eof))
;; 2. Wait until every worker has handled all items queued before its marker.
(for-each thread-wait worker-threads)
;; 3. The printer handles its mailbox in order, so this marker is seen only
;;    after every report line has been displayed.
(thread-send print-thread eof)
(thread-wait print-thread)
同步事件和sync
sync 会阻塞直到给定事件中的某一个就绪,并返回该事件的同步结果;下面的例子让每个工作线程同时等待通道和一个 3 秒后触发的定时事件(alarm-evt)。
;; The thread running this script; workers report back to its mailbox.
(define main-thread (current-thread))

;; Event that becomes ready 3000 ms after this definition is evaluated.
(define alarm (alarm-evt (+ 3000 (current-inexact-milliseconds))))

;; Channel on which the main thread hands out values.
(define channel (make-channel))

;; make-worker-thread : any -> thread
;; Starts a worker that waits for whichever comes first: a value on `channel`
;; or the alarm. It then sends the main thread one message: the symbol
;; 'alarm if the alarm fired, otherwise a string describing the value it got.
(define (make-worker-thread thread-id)
  (thread
   (lambda ()
     ;; `sync` returns the channel's value, or the alarm event itself.
     (define outcome (sync channel alarm))
     (thread-send
      main-thread
      (if (equal? outcome alarm)
          'alarm
          (format "Thread ~a received ~a" thread-id outcome))))))

(make-worker-thread 1)
(make-worker-thread 2)
(make-worker-thread 3)

;; Only two values are offered to three workers, so one worker is left
;; waiting until the alarm fires.
(channel-put channel 'A)
(channel-put channel 'B)

;; Print every report as it arrives; the first 'alarm message ends the loop.
(let drain ()
  (define msg (thread-receive))
  (cond
    [(equal? msg 'alarm)
     (displayln "Done")]
    [else
     (displayln msg)
     (drain)]))
欢迎加入Racket 隐修会 :731859928(QQ)