Introducing Dragonmark Circulate August 16, 2014
Distributed Communicating Sequential Processes (CSP)
Communicating Sequential Processes (CSP) provides excellent patterns for building concurrent systems. Clojure's core.async provides a Clojure implementation of CSP in a single address space.
However, very few programs run in a single address space. Web applications run in a combination of the browser and one or more servers. Very often, applications will span a cluster of servers.
Dragonmark Circulate provides a mechanism for distributing core.async channels across address spaces while providing the same semantics to all the address spaces.
Some macros

I've written some macros to make writing core.async code easier and more linear.
gofor

The gofor macro allows you to build a comprehension around core.async message sends and receives. Here's an example of gofor:
(dc/gofor
  :let [b 44]
  [root (get-service my-root {:service 'root})]
  [added (add root {:service 'wombat2 :channel (chan) :public true})]
  [b (list root)]
  :let [a (+ 1 1)]
  (reset! my-atom b)
  :error (reset! my-atom (str "Got an error " &err " for frogs " &var)))
I've blogged about gofor; please see that post for more information about the macro. The key take-away from the gofor macro is timeouts and error handling. Because the timeout mechanism is automatic (defaulting to 30 seconds), the macro will time out if a distributed system is unavailable.
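For contrast, here's roughly what gofor automates for each step: a hand-rolled core.async request with an explicit 30-second timeout. This is a sketch, not gofor's actual expansion; `my-root` and the message shape (including the `:_answer` reply key) are assumptions for illustration.

```clojure
(require '[clojure.core.async :as async :refer [chan go >! alts! timeout]])

;; One request/response step with a manual timeout. `my-root` is assumed
;; to be a service channel that replies on the supplied :_answer channel.
(go
  (let [answer (chan)]
    (>! my-root {:_cmd :get-service :service 'root :_answer answer})
    (let [[v port] (alts! [answer (timeout 30000)])]
      (if (= port answer)
        (println "got service:" v)
        (println "timed out")))))
```

Writing this by hand for every binding in a comprehension gets tedious fast, which is the motivation for the macro.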
build-service: services from functions

It's pretty easy to convert a set of functions in a namespace into a core.async channel that will respond to and route messages to the marked functions.
(sc/defn ^:service get-42 :- sc/Num
"Hello"
([] 42)
([x :- sc/Num] (+ x 42)))
(sc/defn ^:service plus-one :- sc/Num
[x :- sc/Num]
(+ 1 x))
The above functions are marked with ^:service metadata. Calling the build-service macro in the current namespace yields a channel that responds to messages formatted by the gofor macro. For example:

(def service-channel (dc/build-service))

Thus, in a gofor comprehension:
(dc/gofor
  :let [service (dc/build-service)]
  [x (get-42 service)]
  [y (plus-one service {:x x})]
  (reset! atom-42 [x y])
  :error (reset! atom-42 &err))
Note, too, that by using Prismatic Schema in the function definitions, one gets nice, typed documentation by sending the _commands command:

[docs (_commands service)]
:let [_ (println "Commands:\n" docs)]
And the resulting documentation:
Commands:
{get-42 Inputs: ([] [x :- sc/Num])
Returns: sc/Num
Hello, plus-one Inputs: [x :- sc/Num]
Returns: sc/Num}
The root context for a distributed service

The build-root-channel function returns a channel that by default responds to the commands add, remove, list, and get-service. The add and remove commands only respond to messages with {:local true} in their metadata, so that only messages sent locally can add or remove services; it's a security feature.
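Here's a sketch of how such a locality check might work (this is not Dragonmark's actual implementation; the `:_cmd` message key and the handler shape are assumptions for illustration):

```clojure
(require '[clojure.core.async :refer [chan go-loop <!]])

(defn guarded-root
  "Returns a channel whose add/remove commands are honored only when
  the incoming message carries {:local true} metadata."
  []
  (let [in       (chan)
        services (atom {})]
    (go-loop []
      (when-let [msg (<! in)]
        (let [local? (:local (meta msg))]
          (case (:_cmd msg)
            :add    (when local?
                      (swap! services assoc (:service msg) (:channel msg)))
            :remove (when local?
                      (swap! services dissoc (:service msg)))
            nil))
        (recur)))
    in))
```

Because metadata is dropped during serialization, a message arriving from a remote transport can never carry the {:local true} mark, so remote peers cannot mutate the service registry.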
One can add a service to the root channel:

(dc/gofor
  [_ (add b-root {:service '42
                  :public true
                  :channel service-for-b})])

The 42 service is another channel.
We can also list services and get services (as bindings inside a gofor):

service-list (list b-root-proxy)
service-42 (get-service b-root-proxy {:service '42})
Doing it distributed

So, we have a root service that will respond to local messages that add services (or remove services). We tie it all together by creating a transport:

b-transport (dc/build-transport b-root message-source message-sink)

Where b-root is a root channel created via build-root-channel, message-source is a channel that will receive String content from remote transports, and message-sink is a channel to which serialized message strings will be sent.
The source and sink represent abstractions over transporting Strings across address spaces: a source/sink pair may represent a web-socket connection, a pair of message queues, etc. This means that Dragonmark can abstract messages across address spaces by serializing messages and sending them to the sink, and by receiving messages from the source, deserializing them, and processing them.
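Because the source and sink are just channels carrying Strings, two "address spaces" can be simulated in a single process by cross-wiring a pair of plain channels. This is a sketch; `a-root` and `b-root` are assumed to be root channels created via build-root-channel:

```clojure
(require '[clojure.core.async :refer [chan]])

;; Two channels of serialized Strings, one for each direction.
(def a->b (chan))   ; written by A's sink, read as B's source
(def b->a (chan))   ; written by B's sink, read as A's source

;; Each transport reads from the other side's sink channel.
(def a-transport (dc/build-transport a-root b->a a->b))
(def b-transport (dc/build-transport b-root a->b b->a))
```

In a real deployment, the same build-transport call would be fed by a web-socket reader/writer or a pair of message queues instead.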
A bit of serial magic

Dragonmark uses Cognitect Transit for serialization. It includes a custom serializer for core.async channels: when a channel is serialized, a GUID for the channel is created (or looked up if the channel has already been encountered), and on the other side, when the GUID is deserialized, a proxy channel is created. When a message is sent to the proxy, it is serialized and sent to the target in the target channel's address space. This makes sending a message to a remote channel as simple as sending a message to a local channel: just send a message that includes a "reply" channel. The recipient channel does work and sends a response back to the reply channel. This works the same in a local system and across address spaces.
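The channel-to-GUID half of this scheme can be sketched with a Transit write handler (this is the idea only, not Dragonmark's actual handler; the "chan" tag and the atom-based registry are assumptions):

```clojure
(require '[cognitect.transit :as transit])
(import '[clojure.core.async.impl.channels ManyToManyChannel])

;; Registry mapping channels already seen to their GUIDs, so the same
;; channel always serializes to the same identifier.
(def chan->guid (atom {}))

(defn guid-for [ch]
  (or (get @chan->guid ch)
      (let [g (str (java.util.UUID/randomUUID))]
        (swap! chan->guid assoc ch g)
        g)))

;; Serialize any core.async channel as a tagged GUID string. This map
;; would be passed as the :handlers option to transit/writer.
(def chan-write-handler
  {ManyToManyChannel (transit/write-handler "chan" guid-for)})
```

A matching read handler on the other side would look up (or create) the proxy channel for each incoming GUID.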
Works well with gofor

This works particularly well with gofor. When a message is sent to a channel, an _answer channel is created and sent with the message. The response is sent to the answer channel (which is a proxy in the local address space) and the answer is populated into the variable. The temporary answer channel is then closed, which triggers a close message to the remote system that closes the proxy channel and removes it.
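That per-step lifecycle (fresh reply channel, one take, then close) can be sketched in plain core.async; the `:_answer` key matches the convention described above, while the `request` helper itself is hypothetical:

```clojure
(require '[clojure.core.async :refer [chan go >! <! close!]])

(defn request
  "Send msg to a service channel with a fresh :_answer reply channel,
  wait for one reply, then close the temporary channel. Across a
  transport, the close also tells the remote side to drop its proxy.
  Returns a channel that yields the reply."
  [service msg]
  (go
    (let [answer (chan)]
      (>! service (assoc msg :_answer answer))
      (let [reply (<! answer)]
        (close! answer)
        reply))))
```

Every binding step in a gofor comprehension follows this pattern, which is why proxy channels don't accumulate on the remote side.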
All together now
So, pulling it all together, we get:
(dc/gofor
  [_ (add b-root {:service '42
                  :public true
                  :channel service-for-b})]
  [_ (inc b-root-proxy)]
  [answer (get b-root-proxy)
   service-list (list b-root-proxy)
   service-42 (get-service b-root-proxy {:service '42})]
  [answer2 (get-42 service-42)]
  (do
    (reset! res [answer (into #{} service-list) answer2])
    (reset! done true))
  :error
  (do
    (reset! res &err)
    (reset! done true)))
All of the messages to the b-root-proxy are serialized, sent across the faux address space, and processed by the remote b-root handler. This includes asking for the service list, getting a service, and invoking the service.
Where to from here?
Over the next few months, I plan to build transports. The first will be a web socket transport, so that Om apps don't need to explicitly talk HTTP but can instead do all their service communication via channels. The second will be a RabbitMQ transport, so that it's easy to build a distributed back end.
What needs to happen in Dragonmark itself?
- Improved error detection/reporting... not just timeouts.
- Improved mechanisms for discovery of local services and network services.
- Maybe enhance services to support REST endpoints as well, so one only writes the service once and it works via both channels and HTTP.