diff --git a/README b/README index 46c59fb..8ab7eca 100644 --- a/README +++ b/README @@ -1,12 +1,15 @@ -Go-like channels for atomically passing information between threads. +=== Go-like channels for atomically passing information between threads, and some other stuff. === (channel:make-channel) ; make new channel (channel:make-channel 2) ; make new channel with max size of 2 (untested) (channel:-> chan item) ; send item to channel -(channel:<- chan) ; receive from channel (values item is-not-closed) +(channel:<- chan) ; receive from channel (values item is-not-closed), blocks until there is one (channel:release chan) ; close channel (channel:closed chan) ; is channel closed +(channel:¬closed chan) ; (not (closed chan)) + +--- Signalling --- (dispatcher:make-dispatcher) ; make dispatcher @@ -14,6 +17,7 @@ Go-like channels for atomically passing information between threads. (dispatcher:sig name (optional value)) ; signal name in parallel (dispatcher:sig-serial name (optional value)) ; signal name in serial +--- Atomic operations --- (box:make &optional value) ; make a box with default value diff --git a/async-tools.lisp b/async-tools.lisp new file mode 100644 index 0000000..3ab633c --- /dev/null +++ b/async-tools.lisp @@ -0,0 +1,82 @@ + +(in-package #:async-tools) + +(defun val (v) v) + +(defun sexpr-reader (stream char &key (func 'val)) + "Read next token only if S expression, else return as is" + (if (char= (peek-char t stream t nil t) #\() + (values (funcall func (read stream t nil t)) t) + (let ((*readtable* (copy-readtable))) + (set-macro-character char nil) + (values (read-from-string (concatenate 'string (string char) (write-to-string (read stream t nil t)))) nil)))) + +(defun async-reader (stream char) + (multiple-value-bind (thing okay) (sexpr-reader stream char) + (if okay + (cons 'async (list thing)) + thing))) + +(defstruct promise + thread + value + ended) + +(defmacro async (&body form) + "Run body in seperate thread, returns promise." + `(let ((end (make-promise))) + (setf (promise-value end) nil) + (setf (promise-ended end) nil) + (setf (promise-thread end) + (bt:make-thread + #'(lambda () + (setf (promise-value end) (progn ,@form)) + (setf (promise-ended end) t)))) + end)) + +(defun wait (promise) + "Wait for promise to complete, returns the last value evaluated and T if the thread exited cleanly" + (bt:join-thread (promise-thread promise)) + (value promise)) + +(defun alive? (promise) + "Is the thread alive?" + (bt:thread-alive-p (promise-thread promise))) + +(defun value (promise) + "The current value of the promise and T if the thread exited cleanly, (returns nil nil if it has not terminated yet)" + (values (promise-value promise) (clean? promise))) + +(defun ended? (promise) + "Has the promise completed?" + (promise-ended promise)) + +(defun thread (promise) + "The thread for the promise" + (promise-thread promise)) + +(defun clean? (promise) + "Did the thread exit, and did it do so cleanly?" + (and (ended? promise) (not (alive? promise)))) + +(defun kill (promise) + "Kill thread. Unclean exit." + (bt:destroy-thread (promise-thread promise))) + +(mapc 'export '( + promise + promise-p + wait + alive? + thread + value + ended? + clean? + async)) + +(defmacro enable-reader () + "Turn on reader macroi $(form) to run (async form)" + '(eval-when (:compile-toplevel :load-toplevel :execute) + (set-macro-character #\$ 'async-reader))) + +(export 'enable-reader) diff --git a/cl-channel.asd b/cl-channel.asd index ab12114..f2d6127 100644 --- a/cl-channel.asd +++ b/cl-channel.asd @@ -8,6 +8,7 @@ :serial t :depends-on ( :bt-semaphore ) :components ((:file "package") + (:file "async-tools") (:file "cl-channel") (:file "cl-box") (:file "cl-dispatcher"))) diff --git a/cl-channel.lisp b/cl-channel.lisp index 6a08d70..8ec2956 100644 --- a/cl-channel.lisp +++ b/cl-channel.lisp @@ -7,6 +7,7 @@ internal mutex) +(async-tools:enable-reader) (mapc 'export (list (defun make-queue (&optional (from nil)) @@ -65,6 +66,9 @@ (defun closed (chan) (%atomic chan (%channel-closed chan))) +(defun ¬closed (chan) + (not (closed chan))) + (defun poll (chan) (%atomic chan (queue-poll (%channel-internal chan)))) @@ -114,19 +118,19 @@ )) -#|(defun test () +(defun test () (let ((chan (make-channel))) - $(progn - (loop while ¬(closed chan) do (let ((val (<- chan))) + + $(progn (loop while (not (closed chan)) do (let ((val (<- chan))) (pprint val) (pprint ".") (when (string-equal val "CLOSE") (release chan)))) (print "Thread end") (print ".")) - $(loop while ¬(closed chan) do + $(loop while (not (closed chan)) do (progn (sleep 2) (-> chan 'teste))) - (loop while ¬(closed chan) do (-> chan (write-to-string (read))))) - (print "End"))|# + (loop while (not (closed chan)) do (-> chan (write-to-string (read))))) + (print "End")) diff --git a/cl-dispatcher.lisp b/cl-dispatcher.lisp index 5b1c4b3..cde23ae 100644 --- a/cl-dispatcher.lisp +++ b/cl-dispatcher.lisp @@ -32,7 +32,7 @@ (let ((hooks (assoc name (%dispatcher-hooks disp)))) (if (null hooks) nil - (mapcar #'(lambda (y) (bt:make-thread (funcall y x))) (cdr hooks)))))) + (mapcar #'(lambda (y) (bt:make-thread (lambda () (funcall y x)))) (cdr hooks)))))) (defun sig-serial (disp name &optional (x nil)) (%atomic disp @@ -40,13 +40,15 @@ (if (null hooks) nil (mapc #'(lambda (y) (funcall y x)) (cdr hooks)))))) + )) (defun test () - (let ((d (make))) + (let ((d (make-dispatcher))) (hook d "test" (lambda (x) (print x))) (hook d "test" (lambda (x) (print (cons "!!" x)))) (hook d "test" (lambda (x) (print "HELLO"))) (sig-serial d "test" 'uwu) (print 'signalled))) + diff --git a/package.lisp b/package.lisp index d94bdf7..6bc7f43 100644 --- a/package.lisp +++ b/package.lisp @@ -1,5 +1,8 @@ ;;;; package.lisp +(defpackage #:async-tools + (:use #:cl)) + (defpackage #:cl-channel (:use #:cl) (:nicknames :channel))