Messing with Server Push

Okay… I'm a fiend. I'm a server-push fiend. I did some amazing semantics and implementation for server push in Lift.

I've been coming up to speed with Clojure via Project Plugh, the first step in building a bunch of tools to use in Clojure-land.

The nifty thing about what I've built so far is that you can run a chat application just by using core.async Channels.

The Chat App

So, the chat app uses AngularJS, modified clang code, and some stuff I've been brewing in Plugh.

The View

First, let's look at the view… it's standard Angular code:

<h2>Chat</h2>

<div ng-controller="Chatter">
	<ul>
		<li ng-repeat="n in chats">{{n}}</li>
	</ul>

	<input ng-model="line"><button ng-click="send()">Send</button>
</div>

Pretty simple.

The client side

Here's the AngularJS controller, written in ClojureScript:

(def server-chan (pc/server-chan "The Chat Server"))

Define the client-side channel that refers to our chat server on the server side.

(def.controller pc/m Chatter [$scope]
  (assoc! $scope :chats (clj->js []))

Define a JavaScript array for chats.

  (assoc! $scope :line "")

And the line variable.

  (defn.scope send [] 
    (let [msg (:line $scope)]
      (go 
        (>! server-chan {:msg msg})))
    (assoc! $scope :line ""))

When the user clicks the Send button, take the current value of line and send it as a message to the server.

  (go
    (let [rc (chan)]
      (>! server-chan {:add rc})
      (while true
        (let [chats (<! rc)]
          (in-scope (doseq [m chats] (.push (:chats $scope) m))))))))

And register ourselves as a listener with the server by creating a local channel, rc, and sending it in an :add message to the server.

Then wait for messages from the server and append them to the chats array within the scope of Angular (so that Angular picks up the changes and displays them).

The server side

Here's the code on the server side:

(go 
    (let [server-chan (make-server-chan "The Chat Server")]

Define the named server-side channel.

      (while true
        (let [v (<! server-chan)]

Get the next message.

          (when-let [n (:add v)]
            (swap! listeners #(conj % n))
            (>! n @chats))

Add the new channel to the listeners and send it the chat history so far.

          (when-let [chat-msg (:msg v)]
            (swap! chats #(conj % chat-msg))
            (doseq [ch @listeners] (>! ch [chat-msg])))))))

Process a chat message: append it to the history and re-send it to all of the listeners.
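To see the dispatch loop run without a browser, here is a minimal in-process sketch of the same idea. It assumes plain core.async channels instead of Plugh's make-server-chan, and the names start-chat-server and alice are made up for illustration. Unlike the original, it also stops looping when the server channel is closed.

```clojure
(require '[clojure.core.async :refer [chan go-loop <! >! <!! >!!]])

(def listeners (atom []))   ; channels registered via an :add message
(def chats (atom []))       ; chat history, oldest first

(defn start-chat-server
  "Hypothetical helper: runs the dispatch loop on `server-chan` and returns it."
  [server-chan]
  (go-loop []
    ;; <! returns nil once the channel is closed, which ends the loop.
    (when-let [v (<! server-chan)]
      (when-let [n (:add v)]
        (swap! listeners conj n)
        (>! n @chats))                 ; send the history to the new listener
      (when-let [chat-msg (:msg v)]
        (swap! chats conj chat-msg)
        (doseq [ch @listeners]
          (>! ch [chat-msg])))         ; fan the new message out
      (recur)))
  server-chan)

;; Usage: a buffered listener channel stands in for a browser client.
(def server (start-chat-server (chan)))
(def alice (chan 10))
(>!! server {:add alice})
(>!! server {:msg "hello"})
(def history (<!! alice))        ; the history at registration time
(def first-update (<!! alice))   ; the first broadcast message
```

The listener channel is buffered here so that a slow reader does not stall the server loop; with unbuffered listener channels, one stuck client would block everybody.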

Boom. That's it.

I want my HTTP

So, how do we send messages across HTTP and deal with stuff?

Client side auto-channel proxies

The first thing we do is change the serializer for a Channel:

(extend-protocol IPrintWithWriter
  cljs.core.async.impl.channels/ManyToManyChannel
  (-pr-writer [chan writer opts]
    (let [guid (find-guid-for-chan chan)]
      (-write writer "#guid-chan\"")
      (-write writer guid)
      (-write writer "\""))))

The above code looks up the guid for the channel. If there's no guid for the channel, one is created. So, we have a text representation for the channel to send to the server.
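The post doesn't show find-guid-for-chan or register-chan, so here is a rough sketch of how such a registry might look, assuming two atoms that hold the mapping in both directions. The atom name chan-to-guid and the use of a random UUID are assumptions, not necessarily what Plugh does. A plain Object stands in for a channel, since all the registry needs is identity-based equality.

```clojure
;; Hypothetical registry: one atom per direction of the mapping.
(defonce guid-to-chan (atom {}))
(defonce chan-to-guid (atom {}))

(defn register-chan
  "Record that `ch` is known under `guid`, in both directions. Returns the guid."
  [ch guid]
  (swap! chan-to-guid assoc ch guid)
  (swap! guid-to-chan assoc guid ch)
  guid)

(defn find-guid-for-chan
  "Return the guid for `ch`, creating and registering one if there is none.
  Simplification: two threads racing on the same new channel could each
  create a guid; a real implementation would need to guard against that."
  [ch]
  (or (get @chan-to-guid ch)
      (register-chan ch (str (java.util.UUID/randomUUID)))))

;; Usage with a stand-in for a channel.
(def example-chan (Object.))
(def example-guid (find-guid-for-chan example-chan))
```

Asking for the guid of the same channel a second time returns the same string, which is what lets both sides of the wire agree on which channel a message is for.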

And when we read channels back, we use:

(defn ^:private guid-chan-reader
  [s]
  (find-chan-for-guid (str s)))
	
(cr/register-tag-parser! "guid-chan" guid-chan-reader)

The channel reader finds the appropriate channel for the guid. But if the guid has not been seen, then we create a proxy back to the server:

(defn find-chan-for-guid [guid]
  (or (get @guid-to-chan guid)
      (let [nc (chan)]
        (register-chan nc guid)
        (go
          (while true
            (let [msg (<! nc)]
              (send-to-server (pr-str {:chan nc :msg msg})))))
        nc)))

And that proxy waits for messages and then sends each message to the server along with the channel guid.

Finally, when we get a message from the server back to the client, the process is reversed:

(fn [me]
  (let [info   (.-data me)
        parsed (cr/read-string info)
        chan   (:chan parsed)
        msg    (:msg parsed)]
    (when (and chan msg)
      (go (>! chan msg)))))

We look up the channel from the message's :chan guid and send a message to that channel.

Server side

The server side is much the same, except the Clojure serialization incantations are marginally different from the ClojureScript incantations.

Here's our writer:

(defmethod print-method 
  clojure.core.async.impl.channels.ManyToManyChannel
  [chan, ^java.io.Writer w]
  (let [guid (find-guid-for-chan chan)]
    (.write w "#guid-chan\"")
    (.write w guid)
    (.write w "\"")))

And our reader (we use the edn reader because it does not execute code from the wire):

(edn/read-string {:readers {'guid-chan find-chan-for-guid}} data)
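To make the round trip concrete, here is a small sketch that prints a value as a #guid-chan tagged literal and reads it back with the edn reader. To keep it dependency-free, a hypothetical StubChan type stands in for a core.async channel, and a single registry atom with a hard-coded guid stands in for the real lookup functions.

```clojure
(require '[clojure.edn :as edn])

;; Stand-in for a core.async channel (an assumption for this sketch).
(deftype StubChan [])

;; Hypothetical one-way registry: guid -> channel.
(def registry (atom {}))

(def ch (StubChan.))
(swap! registry assoc "abc-123" ch)

;; Print the stand-in as a tagged literal, mirroring the writer above.
(defmethod print-method StubChan [c ^java.io.Writer w]
  (let [guid (some (fn [[g v]] (when (identical? v c) g)) @registry)]
    (.write w "#guid-chan\"")
    (.write w (str guid))
    (.write w "\"")))

;; What goes over the wire: a string like {:chan #guid-chan"abc-123", :msg "hello"}
(def wire (pr-str {:chan ch :msg "hello"}))

;; Reading it back maps the guid to the very same object.
(def parsed
  (edn/read-string {:readers {'guid-chan #(get @registry %)}} wire))
```

Because the :readers map is the only place a tag can be turned into an object, anything on the wire that isn't plain data or a known guid is rejected rather than evaluated.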

And here's the complete "read from the wire and dispatch message" code:

(binding [*websocket* ch]
  (let [thing (edn/read-string
                {:readers {'guid-chan find-chan-for-guid}} data)
        chan  (:chan thing)
        msg   (:msg thing)]
    (when (and chan msg)
      (go (>! chan msg)))))

Finally, our to-client proxy code:

(go
 (loop []
   (let [msg (<! nc)]
     (hs/send! socket {:body (pr-str {:chan nc :msg msg})}))
   (recur)))

And that's it.

To Do

There's a lot of stuff yet to do in the code, including dealing with transient web socket drops, de-duping messages, optional sessions, and garbage collection of channel proxies when the channel on the other side closes.

But the key pieces of cross-address-space core.async via HTTP are in Plugh.

Yay!

Thanks

The Clojure community is pretty amazing. Key thanks go out to David Nolen, who maintains the ClojureScript core.async stuff along with dozens of other packages.