From 55bef0924407c3bd43c5d72a0b936f041ca3ce4f Mon Sep 17 00:00:00 2001 From: Ringo Wantanabe Date: Fri, 5 Apr 2019 02:53:05 +0100 Subject: [PATCH] Initial commit --- README | 15 ++++++ cl-channel.asd | 13 +++++ cl-channel.lisp | 126 +++++++++++++++++++++++++++++++++++++++++++++ cl-dispatcher.lisp | 54 +++++++++++++++++++ package.lisp | 9 ++++ 5 files changed, 217 insertions(+) create mode 100644 README create mode 100644 cl-channel.asd create mode 100644 cl-channel.lisp create mode 100644 cl-dispatcher.lisp create mode 100644 package.lisp diff --git a/README b/README new file mode 100644 index 0000000..6025450 --- /dev/null +++ b/README @@ -0,0 +1,15 @@ +Go-like channels for atomically passing information between threads. + +(make-channel) ; make new channel +(make-channel 2) ; make new channel with max size of 2 (untested) + +(-> chan item) ; send item to channel +(<- chan) ; receive from channel (values item is-not-closed) +(release chan) ; close channel +(closed chan) ; is channel closed + + +(make-dispatcher) ; make dispatcher +(hook disp name lambda) ; add hook +(sig name (optional value)) ; signal name in parallel +(sig-serial name (optional value)) ; signal name in serial diff --git a/cl-channel.asd b/cl-channel.asd new file mode 100644 index 0000000..35141d9 --- /dev/null +++ b/cl-channel.asd @@ -0,0 +1,13 @@ +;;;; cl-channel.asd + +(asdf:defsystem #:cl-channel + :description "golang-like channels for CL" + :author "Avril " + :license "None" + :version "0.0.1" + :serial t + :depends-on ( :flan-utils + :bt-semaphore ) + :components ((:file "package") + (:file "cl-channel") + (:file "cl-dispatcher"))) diff --git a/cl-channel.lisp b/cl-channel.lisp new file mode 100644 index 0000000..1ae8278 --- /dev/null +++ b/cl-channel.lisp @@ -0,0 +1,126 @@ + + +(in-package :cl-channel) + +(flan-utils:enable-all-readers) + +(defstruct %queue + internal + mutex) + +[ +(defun make-queue (&optional (from nil)) + (let ((q (make-%queue))) + (setf (%queue-internal q) from) + (setf (%queue-mutex q) (bt:make-lock)) + q)) + +(defun queue-> (q i) + (bt:with-lock-held ((%queue-mutex q)) + (setf (%queue-internal q) (reverse (cons i (%queue-internal q)))))) + +(defun queue-poll (q) + (bt:with-lock-held ((%queue-mutex q)) + (list-length (%queue-internal q)))) + +(defun queue<- (q) + (bt:with-lock-held ((%queue-mutex q)) + (if (< (list-length (%queue-internal q)) 1) (values nil nil) + (values (pop (%queue-internal q)) t)))) + +(defun queue-clear (q) + (bt:with-lock-held ((%queue-mutex q)) + (setf (%queue-internal q) nil))) +] + +(defstruct %channel + internal + mutex + rel-send + rel-recv + rel-close + max + closed) + +(defmacro %atomic (chan &body body) + `(bt:with-lock-held ((%channel-mutex ,chan)) + ,@body)) + +(defun sigall (sem) + (bt-sem:signal-semaphore sem (bt-sem:semaphore-waiters sem))) + +[ +(defun make-channel (&optional (max 0)) + (let ((c (make-%channel))) + (setf (%channel-internal c) (make-queue)) + (setf (%channel-mutex c) (bt:make-lock)) + (setf (%channel-rel-send c) (bt-sem:make-semaphore)) + (setf (%channel-rel-recv c) (bt-sem:make-semaphore)) + (setf (%channel-rel-close c) (bt-sem:make-semaphore)) + (setf (%channel-max c) max) + (setf (%channel-closed c) nil) + c)) + +(defun closed (chan) (%atomic chan + (%channel-closed chan))) + +(defun poll (chan) + (%atomic chan + (queue-poll (%channel-internal chan)))) + +(defun <- (chan) + (let ((out nil) + (rout nil)) + (loop while (and (null out) ¬(closed chan)) do + (progn + (if (> (poll chan) 0) + (%atomic chan + (when (> (queue-poll (%channel-internal chan)) 0) + (setf out t) + (setf rout (queue<- (%channel-internal chan))) + (bt-sem:signal-semaphore (%channel-rel-send chan) 1))) + (bt-sem:wait-on-semaphore (%channel-rel-recv chan))))) + (if (closed chan) (values nil nil) (values rout t)))) + +(defun -> (chan item) + (loop while (and ¬(closed chan) (> (%channel-max chan) 0) (%atomic chan (>= (queue-poll (%channel-internal chan)) (%channel-max chan)))) + do (bt-sem:wait-on-semaphore (%channel-rel-send))) + (let ((lv (%atomic chan + (if ¬(if (or (%channel-closed chan) (and (> (%channel-max chan) 0) (>= (queue-poll (%channel-internal chan)) (%channel-max chan)))) + nil + (progn + (queue-> (%channel-internal chan) item) + (bt-sem:signal-semaphore (%channel-rel-recv chan) 1) + t)) + (if (%channel-closed chan) nil 'reset) + t)))) + (if (eq lv 'reset) + (-> chan item) + lv))) + +(defun release (chan) + (%atomic chan + (setf (%channel-closed chan) t) + (sigall (%channel-rel-recv chan)) + (sigall (%channel-rel-send chan)))) + +(defun poll (chan) + (%atomic chan + (queue-poll (%channel-internal chan)))) + +(defmacro make () + `(make-channel)) +] + +(defun test () + (let ((chan (make-channel))) + $(progn + (loop while ¬(closed chan) do (let ((val (<- chan))) + (pprint val) + (pprint ".") + (when (string-equal val "CLOSE") + (release chan)))) + (print "Thread end") + (print ".")) + (loop while ¬(closed chan) do (-> chan (write-to-string (read))))) + (print "End")) diff --git a/cl-dispatcher.lisp b/cl-dispatcher.lisp new file mode 100644 index 0000000..837e2f0 --- /dev/null +++ b/cl-dispatcher.lisp @@ -0,0 +1,54 @@ + + +(in-package :cl-dispatcher) + +(flan-utils:enable-all-readers) + +(defstruct %dispatcher + hooks + lock) + +(defmacro %atomic (disp &body thing) + `(bt:with-lock-held ((%dispatcher-lock disp)) + ,@thing)) + +[ +(defun make-dispatcher () + (let ((d (make-%dispatcher))) + (setf (%dispatcher-hooks d ) nil) + (setf (%dispatcher-lock d) (bt:make-lock)) + d)) + +(defmacro make () + `(make-dispatcher)) + +(defun hook (disp name lam) + (%atomic disp + (if (assoc name (%dispatcher-hooks disp)) + (push lam (cdr (assoc name (%dispatcher-hooks disp)))) + (push (cons name (list lam)) (%dispatcher-hooks disp))))) + +(defun sig (disp name &optional (x nil)) + (%atomic disp + (let ((hooks (assoc name (%dispatcher-hooks disp)))) + (if (null hooks) + nil + (mapc #'(lambda (y) $(funcall y x)) (cdr hooks)))))) + +(defun sig-serial (disp name &optional (x nil)) + (%atomic disp + (let ((hooks (assoc name (%dispatcher-hooks disp)))) + (if (null hooks) + nil + (mapc #'(lambda (y) (funcall y x)) (cdr hooks)))))) +] + +(defun test () + (let ((d (make))) + (hook d "test" (lambda (x) (print x))) + (hook d "test" (lambda (x) (print (cons "!!" x)))) + (hook d "test" (lambda (x) (print "HELLO"))) + + (sig-serial d "test" 'uwu) + (print 'signalled))) + diff --git a/package.lisp b/package.lisp new file mode 100644 index 0000000..e21a3cb --- /dev/null +++ b/package.lisp @@ -0,0 +1,9 @@ +;;;; package.lisp + +(defpackage #:cl-channel + (:use #:cl) + (:nicknames :channel)) + +(defpackage #:cl-dispatcher + (:use #:cl) + (:nicknames :dispatcher))