copy some async stuff from flan-utils

master
Avril 6 years ago
parent 111f317d20
commit 38b01ee387
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1,12 +1,15 @@
Go-like channels for atomically passing information between threads.
=== Go-like channels for atomically passing information between threads, and some other stuff. ===
(channel:make-channel) ; make new channel
(channel:make-channel 2) ; make new channel with max size of 2 (untested)
(channel:-> chan item) ; send item to channel
(channel:<- chan) ; receive from channel (values item is-not-closed)
(channel:<- chan) ; receive from channel (values item is-not-closed), blocks until there is one
(channel:release chan) ; close channel
(channel:closed chan) ; is channel closed
(channel:¬closed chan) ; (not (closed chan))
--- Signalling ---
(dispatcher:make-dispatcher) ; make dispatcher
@ -14,6 +17,7 @@ Go-like channels for atomically passing information between threads.
(dispatcher:sig name (optional value)) ; signal name in parallel
(dispatcher:sig-serial name (optional value)) ; signal name in serial
--- Atomic operations ---
(box:make &optional value) ; make a box with default value

@ -0,0 +1,82 @@
(in-package #:async-tools)
(defun val (v) v)
(defun sexpr-reader (stream char &key (func 'val))
"Read next token only if S expression, else return as is"
(if (char= (peek-char t stream t nil t) #\()
(values (funcall func (read stream t nil t)) t)
(let ((*readtable* (copy-readtable)))
(set-macro-character char nil)
(values (read-from-string (concatenate 'string (string char) (write-to-string (read stream t nil t)))) nil))))
(defun async-reader (stream char)
(multiple-value-bind (thing okay) (sexpr-reader stream char)
(if okay
(cons 'async (list thing))
thing)))
(defstruct promise
thread
value
ended)
(defmacro async (&body form)
"Run body in seperate thread, returns promise."
`(let ((end (make-promise)))
(setf (promise-value end) nil)
(setf (promise-ended end) nil)
(setf (promise-thread end)
(bt:make-thread
#'(lambda ()
(setf (promise-value end) (progn ,@form))
(setf (promise-ended end) t))))
end))
(defun wait (promise)
"Wait for promise to complete, returns the last value evaluated and T if the thread exited cleanly"
(bt:join-thread (promise-thread promise))
(value promise))
(defun alive? (promise)
"Is the thread alive?"
(bt:thread-alive-p (promise-thread promise)))
(defun value (promise)
"The current value of the promise and T if the thread exited cleanly, (returns nil nil if it has not terminated yet)"
(values (promise-value promise) (clean? promise)))
(defun ended? (promise)
"Has the promise completed?"
(promise-ended promise))
(defun thread (promise)
"The thread for the promise"
(promise-thread promise))
(defun clean? (promise)
"Did the thread exit, and did it do so cleanly?"
(and (ended? promise) (not (alive? promise))))
(defun kill (promise)
"Kill thread. Unclean exit."
(bt:destroy-thread (promise-thread promise)))
(mapc 'export '(
promise
promise-p
wait
alive?
thread
value
ended?
clean?
async))
(defmacro enable-reader ()
"Turn on reader macroi $(form) to run (async form)"
'(eval-when (:compile-toplevel :load-toplevel :execute)
(set-macro-character #\$ 'async-reader)))
(export 'enable-reader)

@ -8,6 +8,7 @@
:serial t
:depends-on ( :bt-semaphore )
:components ((:file "package")
(:file "async-tools")
(:file "cl-channel")
(:file "cl-box")
(:file "cl-dispatcher")))

@ -7,6 +7,7 @@
internal
mutex)
(async-tools:enable-reader)
(mapc 'export (list
(defun make-queue (&optional (from nil))
@ -65,6 +66,9 @@
(defun closed (chan) (%atomic chan
(%channel-closed chan)))
(defun ¬closed (chan)
(not (closed chan)))
(defun poll (chan)
(%atomic chan
(queue-poll (%channel-internal chan))))
@ -114,19 +118,19 @@
))
#|(defun test ()
(defun test ()
(let ((chan (make-channel)))
$(progn
(loop while ¬(closed chan) do (let ((val (<- chan)))
$(progn (loop while (not (closed chan)) do (let ((val (<- chan)))
(pprint val)
(pprint ".")
(when (string-equal val "CLOSE")
(release chan))))
(print "Thread end")
(print "."))
$(loop while ¬(closed chan) do
$(loop while (not (closed chan)) do
(progn
(sleep 2)
(-> chan 'teste)))
(loop while ¬(closed chan) do (-> chan (write-to-string (read)))))
(print "End"))|#
(loop while (not (closed chan)) do (-> chan (write-to-string (read)))))
(print "End"))

@ -32,7 +32,7 @@
(let ((hooks (assoc name (%dispatcher-hooks disp))))
(if (null hooks)
nil
(mapcar #'(lambda (y) (bt:make-thread (funcall y x))) (cdr hooks))))))
(mapcar #'(lambda (y) (bt:make-thread (lambda () (funcall y x)))) (cdr hooks))))))
(defun sig-serial (disp name &optional (x nil))
(%atomic disp
@ -40,13 +40,15 @@
(if (null hooks)
nil
(mapc #'(lambda (y) (funcall y x)) (cdr hooks))))))
))
(defun test ()
(let ((d (make)))
(let ((d (make-dispatcher)))
(hook d "test" (lambda (x) (print x)))
(hook d "test" (lambda (x) (print (cons "!!" x))))
(hook d "test" (lambda (x) (print "HELLO")))
(sig-serial d "test" 'uwu)
(print 'signalled)))

@ -1,5 +1,8 @@
;;;; package.lisp
(defpackage #:async-tools
(:use #:cl))
(defpackage #:cl-channel
(:use #:cl)
(:nicknames :channel))

Loading…
Cancel
Save