This is actually one of the key reasons I recently started exploring these design patterns. CPU clock speeds have somewhat leveled off over the last 12-24 months and companies like Intel and AMD are focusing more on adding cores, rather than raw clock speed. What this means, is that vertical scaling is not really increasing at the rate it once was and we have to think about programming for a true multi-core environment. To explore Erlang, I took a real use case I had that I thought would be perfect for maximizing concurrency on a box, websockets. The real-time web is inevitable and upon us, and with the dizzying amount of "event based, asynchronous" javascript frameworks, it should be obvious that this is the path of modern web application development - pushing data from the server to the client.
Now, websockets is a relatively new technology, and I am not even 100% positive that the RFCs are finalized as they have changed a few times already. However, doing true bidirectional communication between the browser and the server is something that I was very much interested in, so the research began....
In researching libraries, there seemed to be a few contenders. Misultin and Cowboy seemed to be the most prevalent that supported websockets so I dug in. About a month ago there was a thread about Misultin stopping development (I am sure someone will pick it up) and while the benchmarks I read looked impressive, Cowboy seemed to have a much stronger and active development community.
So, my first task was to figure out how to build a websocket server that would establish a native websocket connection with the browser, reply to a message sent to it and also allow a background process to shoot a message to a bunch of connected clients. All but the latter are pretty much straight out of the box. Luckily, the creator of cowboy pointed out the module gproc, which enables you to keep a "process dictionary" of a set of processes (in our case a websocket connection) and give it a name to be referenced later. Whenever a websocket connection is created, we register that connection and give it a name (the same name in fact, for every connection). Then, anywhere in our code we can pass a message to that gproc name which will then fire the message over all the websocket connections that were registered with that name. As long as our websocket handler recognizes this message, we pass it on through. Enough talk, lets look at some code....
First things first, the way cowboy works, is you need to register a "handler". This is essentially a module that gets called when a connection is made (a callback) that needs to implement a certain set of functions. To implement a handler in cowboy you need a function that looks like this:
-module(my_handler).
-export([init/3, handle/2, terminate/2]).
init({tcp, http}, Req, Opts) ->
{ok, Req, undefined_state}.
handle(Req, State) ->
{ok, Req2} = cowboy_http_req:reply(200, [], <<"Hello World!">>, Req),
{ok, Req2, State}.
terminate(Req, State) ->
ok.
Now whenever an http call is made, we will pass through init/3 and then handle/2. Here you can access all the necessary http req params (query, host, header, etc) and take some action.
Next up is our generic websocket code in the module. The first thing we need to do is add some code to init to "upgrade" the connection to websockets. The websocket specification says that if the browser supports websockets, one of the things it initiates the connection with is a request with a header that looks like "Upgrade: websocket". So in our init we look for this which allows us to tell cowboy to switch to a websocket connection instead of HTTP:
init({_Any,http}, Req, []) ->
case cowboy_http_req:header('Upgrade', Req) of
{undefined, Req2} -> {ok, Req2, undefined};
{<<"websocket">>, _Req2} -> {upgrade, protocol, cowboy_http_websocket};
{<<"WebSocket">>, _Req2} -> {upgrade, protocol, cowboy_http_websocket}
end.
Now, this well tell cowboy to change the handler from http to websockets. Our next step is to do a couple of things, first to define a "key" that gproc will use to store all the processes and secondly to register each new websocket connection against this key. This will allow us to later reference every connection by this key so we can broadcast the message. So first lets do a simple define of our key
-define(WSKey,{pubsub,wsbroadcast}).
Now, when using websockets in cowboy there are a few functions in the callback hander module, websocket_init, websocket_handle, websocket_info, websocket_terminate. We are going to focus on websocket_init (new connection) to register the connection and websocket_info which deals with messages over the pipe. First, lets register the connection in websocket_init
websocket_init(_Any, Req, []) ->
Req2 = cowboy_http_req:compact(Req),
gproc:reg({p, l, ?WSKey}),
{ok, Req2, undefined, hibernate}.
Here we call gproc:reg() which takes 3 parameters, p is the (property), l is the scope (local) and ?WSKey is our key to register this process against. Since every connection will use the same key we now have an easy way to store and reference every connection. Next, we update websocket_info to handle the message being sent over the connection.
websocket_info(Info, Req, State) ->
case Info of
{_PID,?WSKey,Msg} ->
{reply, {text, Msg}, Req, State, hibernate};
_ ->
{ok, Req, State, hibernate}
end.
Here we match on a message being sent over the websocket connection of the format {_PID,?WSKey,Msg}. Once we match, we simply pass the message over the connection. Our last step is to actually broadcast a message that contains this format. Now, here is where you need to roll your own logic to determine how you want to ingest messages (queue, webservice, etc), but once your app gets the message you are waiting for all you need is one line to send the message:
gproc:send({p, l, ?WSKey}, {self(), ?WSKey, Msg})
This simply sends message "Msg" to all the processes that were registered with the key ?WSKey and then gets handled in websocket_info and thus passed over the websocket connection.
Now, you can obviously get clever and put some logic in websocket_init to determine which key the user should get so that you can segment your users. You can also use the handle/2 method to do things like accept a query param like ?channel=X which would allow you to subscribe to certain keys via an HTTP call from the client side. A lot of possibilities here.
Well, hopefully this helps someone get going with websockets, in my next post I am going to take this example to the next level and wire up an AMQP connection to consume messages in real-time and then broadcast those consumed messages over the connection. Stay tuned...
init({_Any,http}, Req, []) ->
case cowboy_http_req:header('Upgrade', Req) of
{undefined, Req2} -> {ok, Req2, undefined};
{<<"websocket">>, _Req2} -> {upgrade, protocol, cowboy_http_websocket};
{<<"WebSocket">>, _Req2} -> {upgrade, protocol, cowboy_http_websocket}
end.
Now, this well tell cowboy to change the handler from http to websockets. Our next step is to do a couple of things, first to define a "key" that gproc will use to store all the processes and secondly to register each new websocket connection against this key. This will allow us to later reference every connection by this key so we can broadcast the message. So first lets do a simple define of our key
-define(WSKey,{pubsub,wsbroadcast}).
Now, when using websockets in cowboy there are a few functions in the callback hander module, websocket_init, websocket_handle, websocket_info, websocket_terminate. We are going to focus on websocket_init (new connection) to register the connection and websocket_info which deals with messages over the pipe. First, lets register the connection in websocket_init
websocket_init(_Any, Req, []) ->
Req2 = cowboy_http_req:compact(Req),
gproc:reg({p, l, ?WSKey}),
{ok, Req2, undefined, hibernate}.
Here we call gproc:reg() which takes 3 parameters, p is the (property), l is the scope (local) and ?WSKey is our key to register this process against. Since every connection will use the same key we now have an easy way to store and reference every connection. Next, we update websocket_info to handle the message being sent over the connection.
websocket_info(Info, Req, State) ->
case Info of
{_PID,?WSKey,Msg} ->
{reply, {text, Msg}, Req, State, hibernate};
_ ->
{ok, Req, State, hibernate}
end.
Here we match on a message being sent over the websocket connection of the format {_PID,?WSKey,Msg}. Once we match, we simply pass the message over the connection. Our last step is to actually broadcast a message that contains this format. Now, here is where you need to roll your own logic to determine how you want to ingest messages (queue, webservice, etc), but once your app gets the message you are waiting for all you need is one line to send the message:
gproc:send({p, l, ?WSKey}, {self(), ?WSKey, Msg})
This simply sends message "Msg" to all the processes that were registered with the key ?WSKey and then gets handled in websocket_info and thus passed over the websocket connection.
Now, you can obviously get clever and put some logic in websocket_init to determine which key the user should get so that you can segment your users. You can also use the handle/2 method to do things like accept a query param like ?channel=X which would allow you to subscribe to certain keys via an HTTP call from the client side. A lot of possibilities here.
Well, hopefully this helps someone get going with websockets, in my next post I am going to take this example to the next level and wire up an AMQP connection to consume messages in real-time and then broadcast those consumed messages over the connection. Stay tuned...