Cross Address Space core.async in Clojure/ClojureScript
August 13, 2013
Messing with Server Push
Okay… I'm a fiend. I'm a server-push fiend. I did some amazing semantics and implementation for server push in Lift.
I've been coming up to speed with Clojure via Project Plugh, the first step in my building a bunch of tools to use in Clojure-land.
The nifty thing about what I've built so far is that you can run a chat application just by using core.async Channels.
The Chat App
So, that chat app uses AngularJS, modified clang code, and some stuff I've been brewing in Plugh.
The View
First, let's look at the view… it's standard Angular code:
<h2>Chat</h2>
<div ng-controller="Chatter">
<ul>
<li ng-repeat="n in chats">{{n}}</li>
</ul>
<input ng-model="line"><button ng-click="send()">Send</button>
</div>
Pretty simple.
The client side
Here's the AngularJS controller, written in ClojureScript:
(def server-chan (pc/server-chan "The Chat Server"))
Define the channel that, on the server side, leads to our chat server.
(def.controller pc/m Chatter [$scope]
  (assoc! $scope :chats (clj->js []))
Define a JavaScript array for chats.
  (assoc! $scope :line "")
And the line variable.
  (defn.scope send []
    (let [msg (:line $scope)]
      (go
        (>! server-chan {:msg msg})))
    (assoc! $scope :line ""))
When the user clicks the Send button, take the current value of line and send it as a message to the server.
  (go
    (let [rc (chan)]
      (>! server-chan {:add rc})
      (while true
        (let [chats (<! rc)]
          (in-scope (doseq [m chats] (.push (:chats $scope) m))))))))
And register ourselves as a listener with the server by creating a local Channel, rc, and sending it in an :add message to the server.
Then wait for messages from the server and append them to the chats array within the scope of Angular (so that Angular picks up the changes and displays them).
The server side
Here's the code on the server side:
(go
  (let [server-chan (make-server-chan "The Chat Server")]
Define the named server-side channel.
    (while true
      (let [v (<! server-chan)]
Get the next message.
        (when-let [n (:add v)]
          (swap! listeners #(conj % n))
          (>! n @chats))
Add to the listeners and send the new listener the chats so far.
        (when-let [chat-msg (:msg v)]
          (swap! chats #(conj % chat-msg))
          (doseq [ch @listeners] (>! ch [chat-msg])))))))
Process a chat message and re-send it to all of the listeners.
Boom. That's it.
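Stripped of the channels, the server loop is a tiny state machine over two message shapes, :add and :msg. Here's a minimal sketch of that logic as a pure function, handy for poking at in a REPL without core.async. The function name handle-message, the state map, and the "deliveries" return value are all hypothetical; the post keeps this state in atoms named listeners and chats whose definitions aren't shown, and it sends with >! rather than returning a list.

```clojure
;; Hypothetical pure model of the chat server's message handling.
;; state is {:listeners [...] :chats [...]} (assumed shape; the post uses
;; two atoms, listeners and chats, that it does not define on the page).
(defn handle-message
  "Returns [new-state deliveries], where deliveries is a vector of
  [listener payload] pairs that the real server would send with >!."
  [state v]
  (let [{:keys [add msg]} v
        ;; :add registers a listener and replays the chats so far to it
        [state out] (if add
                      [(update state :listeners conj add)
                       [[add (:chats state)]]]
                      [state []])
        ;; :msg stores the chat line and fans it out to every listener
        [state out] (if msg
                      [(update state :chats conj msg)
                       (into out (map (fn [l] [l [msg]]) (:listeners state)))]
                      [state out])]
    [state out]))

(def empty-state {:listeners [] :chats []})
```

Feeding it {:add :alice}, then {:msg "hi"}, then {:add :bob} shows the two behaviours: existing listeners get each new line as a one-element vector, and a late joiner gets the whole history in one go.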
I want my HTTP
So, how do we send messages across HTTP and deal with stuff?
Client side auto-channel proxies
The first thing we do is change the serializer for a Channel:
(extend-protocol IPrintWithWriter
  cljs.core.async.impl.channels/ManyToManyChannel
  (-pr-writer [chan writer opts]
    (let [guid (find-guid-for-chan chan)]
      (-write writer "#guid-chan\"")
      (-write writer guid)
      (-write writer "\""))))
The above code looks up the guid for the channel. If there's no guid for the channel, one is created. So, we have a text representation for the channel to send to the server.
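The lookup-or-create step could be sketched like this. It's a guess at the shape, not Plugh's actual code: only the names find-guid-for-chan, register-chan, and guid-to-chan appear in this post, while the chan-to-guid atom and both function bodies are assumptions. It's also written for JVM Clojure so it can use java.util.UUID; a ClojureScript version would need some other source of guids.

```clojure
;; Hypothetical two-way registry between channels and guid strings.
(defonce guid-to-chan (atom {}))
(defonce chan-to-guid (atom {}))

(defn register-chan
  "Records the mapping in both directions and returns the guid."
  [ch guid]
  (swap! guid-to-chan assoc guid ch)
  (swap! chan-to-guid assoc ch guid)
  guid)

(defn find-guid-for-chan
  "Returns the guid for ch, minting and registering one on first use.
  The check-then-register is not atomic, which is fine for a sketch."
  [ch]
  (or (get @chan-to-guid ch)
      (register-chan ch (str (java.util.UUID/randomUUID)))))
```

Calling find-guid-for-chan twice on the same channel gives back the same string, which is what lets both sides of the wire agree on which channel a message is for.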
And when we read channels back, we use:
(defn ^:private guid-chan-reader
  [s]
  (find-chan-for-guid (str s)))
(cr/register-tag-parser! "guid-chan" guid-chan-reader)
The channel reader finds the appropriate channel for the guid. But if the guid has not been seen, then we create a proxy back to the server:
(defn find-chan-for-guid [guid]
  (or (get @guid-to-chan guid)
      (let [nc (chan)]
        (register-chan nc guid)
        (go
          (while true
            (let [msg (<! nc)]
              (send-to-server (pr-str {:chan nc :msg msg})))))
        nc)))
And that proxy waits for messages and then sends each message to the server with the channel guid.
Finally, when we get a message from the server back to the client, the process is reversed:
(fn [me]
  (let [info (.-data me)
        parsed (cr/read-string info)
        chan (:chan parsed)
        msg (:msg parsed)]
    (when (and chan msg)
      (go (>! chan msg)))))
We look up the channel from the message's :chan guid and send a message to that channel.
Server side
The server side is much the same, except the Clojure serialization incantations are marginally different from the ClojureScript incantations.
Here's our writer:
(defmethod print-method
  clojure.core.async.impl.channels.ManyToManyChannel
  [chan, ^java.io.Writer w]
  (let [guid (find-guid-for-chan chan)]
    (.write w "#guid-chan\"")
    (.write w guid)
    (.write w "\"")))
And our reader (we use the edn code for reading because it does not execute code from the wire):
(edn/read-string {:readers {'guid-chan find-chan-for-guid}} data)
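To see the writer and reader meet in the middle, here's a small round-trip sketch that runs on plain JVM Clojure with no dependencies. Everything in it other than print-method, pr-str, and edn/read-string is made up for illustration: FakeChan stands in for a real core.async channel, and the registry holds a single hard-coded guid, "chan-1", instead of generated ones.

```clojure
(require '[clojure.edn :as edn])

;; Stand-in for a core.async channel (assumption, keeps the sketch dependency-free).
(deftype FakeChan [])

(def ch (FakeChan.))

;; Hypothetical registry with one pre-registered channel.
(def guid->chan (atom {"chan-1" ch}))

(defn guid-of
  "Reverse lookup: finds the guid whose registered channel is c."
  [c]
  (some (fn [[g x]] (when (identical? x c) g)) @guid->chan))

;; Print the channel as a tagged literal, the same shape the post writes.
(defmethod print-method FakeChan [c ^java.io.Writer w]
  (.write w "#guid-chan\"")
  (.write w ^String (guid-of c))
  (.write w "\""))

(defn read-wire
  "Parses wire data as edn, resolving #guid-chan tags through the registry."
  [data]
  (edn/read-string {:readers {'guid-chan #(get @guid->chan %)}} data))

;; Serialize a message that mentions the channel, then read it back.
(def wire (pr-str {:chan ch :msg "hello"}))
(def decoded (read-wire wire))
```

After the round trip, (:chan decoded) is the very same object as ch: the channel itself never crossed the wire, only its guid did.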
And here's the complete "read from the wire and dispatch message" code:
(binding [*websocket* ch]
  (let [thing (edn/read-string
                {:readers {'guid-chan find-chan-for-guid}} data)
        chan (:chan thing)
        msg (:msg thing)]
    (if (and chan msg)
      (go (>! chan msg)))))
Finally, our to-client proxy code:
(go
  (loop []
    (let [msg (<! nc)]
      (hs/send! socket {:body (pr-str {:chan nc :msg msg})}))
    (recur)))
And that's it.
To Do
There's a lot of stuff yet to do in the code, including dealing with transient web socket drops, de-duping messages, optional sessions, and garbage collection of channel proxies when the channel on the other side closes.
But the key pieces of cross address space core.async via HTTP are in Plugh.
Yay!
Thanks
The Clojure community is pretty amazing. Key thanks go out to David Nolen who maintains the ClojureScript core.async stuff along with dozens of other packages.