Saturday, April 28, 2012

Using gproc and cowboy to broadcast messages via websockets in Erlang

Recently, I have been doing a fair bit of reading around event-based programming and some of the design patterns associated with it, namely the Actor Model.  The basic concept is that instead of dealing with threads, locks and shared memory, processes share no memory (data is immutable) and thus there is no need for locks.  Instead of threads, Erlang, for example, will spawn tons of lightweight processes, and if you want them to communicate you do this by message passing, not shared memory.  The result is that this can produce much better concurrency on multi-core systems for a bunch of reasons, one of them being that message passing between ultra-lightweight processes is much cheaper than managing and context switching between OS threads.  Erlang has its own scheduler that by default starts one scheduler thread per core and manages all the processes on each core for you, allowing you to scale with the number of cores with zero development (it's part of the VM).
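
To make the message passing idea concrete, here is a minimal sketch using only the Erlang standard library.  The module name actor_sketch and the message shapes are made up for illustration; nothing here is specific to cowboy or gproc.

```erlang
-module(actor_sketch).
-export([run/0]).

%% Spawn a process, send it a message, and wait for the reply.
%% No memory is shared: the message is copied into the receiver's mailbox.
run() ->
    Parent = self(),
    Child = spawn(fun() -> echo_loop() end),
    Child ! {Parent, hello},
    receive
        {Child, Reply} -> Reply
    after 1000 ->
        timeout
    end.

%% Wait for one message and reply to whoever sent it.
echo_loop() ->
    receive
        {From, Msg} -> From ! {self(), {echoed, Msg}}
    end.
```

Calling actor_sketch:run() from the shell spawns the child, sends it hello and returns whatever the child sends back.  The two processes only ever talk through their mailboxes.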

Scaling across cores is actually one of the key reasons I recently started exploring these design patterns.  CPU clock speeds have somewhat leveled off over the last 12-24 months and companies like Intel and AMD are focusing more on adding cores, rather than raw clock speed.  What this means is that vertical scaling is not really increasing at the rate it once was and we have to think about programming for a true multi-core environment.  To explore Erlang, I took a real use case I had that I thought would be perfect for maximizing concurrency on a box: websockets.  The real-time web is inevitable and upon us, and with the dizzying amount of "event based, asynchronous" JavaScript frameworks, it should be obvious that this is the path of modern web application development - pushing data from the server to the client.

Now, websockets is a relatively new technology, and I am not even 100% positive that the RFCs are finalized as they have changed a few times already.  However, doing true bidirectional communication between the browser and the server is something that I was very much interested in, so the research began...

In researching libraries, there seemed to be a few contenders. Misultin and Cowboy seemed to be the most prevalent that supported websockets so I dug in.  About a month ago there was a thread about Misultin stopping development (I am sure someone will pick it up) and while the benchmarks I read looked impressive, Cowboy seemed to have a much stronger and active development community.

So, my first task was to figure out how to build a websocket server that would establish a native websocket connection with the browser, reply to a message sent to it and also allow a background process to shoot a message to a bunch of connected clients.  All but the last are pretty much straight out of the box.  Luckily, the creator of cowboy pointed out the gproc library, a process registry that lets you register a set of processes (in our case, one per websocket connection) under a name to be referenced later.  Whenever a websocket connection is created, we register that connection's process and give it a name (the same name, in fact, for every connection).  Then, anywhere in our code we can send a message to that gproc name, which delivers it to every process that was registered with that name.  As long as our websocket handler recognizes this message, we pass it on through to the browser.  Enough talk, let's look at some code...

First things first: the way cowboy works is that you need to register a "handler".  This is essentially a callback module, called when a connection is made, that needs to implement a certain set of functions.  To implement a handler in cowboy you need a module that looks like this:

-module(my_handler).
-export([init/3, handle/2, terminate/2]).

%% Called once per request; the third element becomes the handler state.
init({tcp, http}, Req, _Opts) ->
    {ok, Req, undefined_state}.

%% Reply to every request with a plain 200.
handle(Req, State) ->
    {ok, Req2} = cowboy_http_req:reply(200, [], <<"Hello World!">>, Req),
    {ok, Req2, State}.

terminate(_Req, _State) ->
    ok.


Now whenever an HTTP request is made, we will pass through init/3 and then handle/2.  Here you can access all the necessary request params (query, host, headers, etc) and take some action.

Next up is our generic websocket code in the module.  The first thing we need to do is add some code to init to "upgrade" the connection to websockets.  The websocket specification says that a browser that supports websockets initiates the connection with a request carrying a header that looks like "Upgrade: websocket".  So in our init we look for this header, which allows us to tell cowboy to switch to a websocket connection instead of HTTP:



init({_Any, http}, Req, []) ->
    case cowboy_http_req:header('Upgrade', Req) of
        %% No Upgrade header: stay on plain HTTP.
        {undefined, Req2} -> {ok, Req2, undefined};
        %% Upgrade requested: hand the connection over to the websocket protocol.
        {<<"websocket">>, _Req2} -> {upgrade, protocol, cowboy_http_websocket};
        {<<"WebSocket">>, _Req2} -> {upgrade, protocol, cowboy_http_websocket}
    end.
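
The init above matches two spellings of the header value.  RFC 6455 treats the Upgrade value as ASCII case-insensitive, so another option is to normalize the value before comparing.  The helper below is only a sketch: the module and function names are made up, and string:lowercase/1 needs OTP 20 or later, which is newer than the cowboy version used in this post.

```erlang
-module(upgrade_check).
-export([is_websocket/1]).

%% Returns true when the Upgrade header value asks for websockets,
%% regardless of case.  'undefined' means the header was absent.
is_websocket(undefined) ->
    false;
is_websocket(Value) when is_binary(Value) ->
    string:lowercase(Value) =:= <<"websocket">>.
```

With a helper like this, init could branch on is_websocket(Value) instead of listing each spelling as its own case clause.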

The upgrade tuple returned from init tells cowboy to switch the handler from HTTP to websockets.  Our next step is to do a couple of things: first, define a "key" that gproc will use to store all the processes, and second, register each new websocket connection against this key.  This will allow us to later reference every connection by this key so we can broadcast the message.  So first let's do a simple define of our key:

-define(WSKey,{pubsub,wsbroadcast}).

Now, when using websockets in cowboy there are a few functions in the callback handler module: websocket_init, websocket_handle, websocket_info and websocket_terminate.  We are going to focus on websocket_init (new connection) to register the connection and websocket_info, which deals with Erlang messages sent to the connection's process.  First, let's register the connection in websocket_init:


websocket_init(_Any, Req, []) ->
    Req2 = cowboy_http_req:compact(Req),
    %% Register this connection's process under the shared broadcast key.
    gproc:reg({p, l, ?WSKey}),
    {ok, Req2, undefined, hibernate}.

Here we call gproc:reg(), which takes a single key tuple with 3 elements: p is the type (property), l is the scope (local) and ?WSKey is the name to register this process against.  Since every connection will use the same key we now have an easy way to store and reference every connection.  Next, we update websocket_info to handle the broadcast message being sent to the connection's process.


websocket_info(Info, Req, State) ->
    case Info of
        {_PID, ?WSKey, Msg} ->
            {reply, {text, Msg}, Req, State, hibernate};
        _ ->
            {ok, Req, State, hibernate}
    end.

Here we match on an Erlang message, delivered to the connection's process, of the format {_PID,?WSKey,Msg}.  Once we match, we simply pass the payload over the websocket connection as a text frame; any other message is ignored.  Our last step is to actually broadcast a message in this format.  Now, here is where you need to roll your own logic to determine how you want to ingest messages (queue, web service, etc), but once your app gets the message you are waiting for, all you need is one line to send it:

gproc:send({p, l, ?WSKey}, {self(), ?WSKey, Msg})


This simply sends message "Msg" to all the processes that were registered with the key ?WSKey and then gets handled in websocket_info and thus passed over the websocket connection.
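
If it helps to see the pattern without any dependencies, here is a rough stand-in for the register-then-broadcast idea using only ETS.  This is not gproc's implementation and every name in it (mini_pubsub, the table name, demo/0) is made up for illustration.  Unlike gproc, it does not clean up entries when a registered process dies.

```erlang
-module(mini_pubsub).
-export([start/0, subscribe/1, publish/2, demo/0]).

-define(TAB, mini_pubsub_tab).

%% Create a public bag table: many {Key, Pid} rows may share one key.
start() ->
    ets:new(?TAB, [bag, public, named_table]),
    ok.

%% Register the calling process under Key.
subscribe(Key) ->
    true = ets:insert(?TAB, {Key, self()}),
    ok.

%% Send Msg to every process registered under Key; returns how many.
publish(Key, Msg) ->
    Pids = [Pid || {_, Pid} <- ets:lookup(?TAB, Key)],
    lists:foreach(fun(Pid) -> Pid ! {self(), Key, Msg} end, Pids),
    length(Pids).

%% Two subscribers register, one message is published, both report it back.
%% The table is named, so call this only once per calling process.
demo() ->
    start(),
    Parent = self(),
    Key = {pubsub, wsbroadcast},
    Sub = fun() ->
              subscribe(Key),
              Parent ! ready,
              receive {_From, Key, Msg} -> Parent ! {got, Msg} end
          end,
    [spawn(Sub) || _ <- [1, 2]],
    [receive ready -> ok end || _ <- [1, 2]],
    2 = publish(Key, <<"hi">>),
    [receive {got, M} -> M end || _ <- [1, 2]].
```

The subscribers play the role of the websocket handler processes, and the receive clause has the same {Pid, Key, Msg} shape that websocket_info matches on.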

Now, you can obviously get clever and put some logic in websocket_init to determine which key the user should get so that you can segment your users.  You can also use the handle/2 function to do things like accept a query param like ?channel=X, which would allow you to subscribe to certain keys via an HTTP call from the client side.  A lot of possibilities here.
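
As a rough sketch of the segmenting idea, the key can be built from a channel name instead of being a fixed macro.  The module name ws_channels and the {pubsub, Channel} naming scheme are assumptions for illustration; only gproc:reg/1 and gproc:send/2 are real gproc calls, and both need the gproc application to be running.

```erlang
-module(ws_channels).
-export([key/1, subscribe/1, publish/2]).

%% Build the gproc property key for a channel name (hypothetical scheme).
key(Channel) ->
    {p, l, {pubsub, Channel}}.

%% Register the calling process, e.g. from websocket_init, on one channel.
subscribe(Channel) ->
    gproc:reg(key(Channel)).

%% Send Msg to every process registered on that channel.
publish(Channel, Msg) ->
    gproc:send(key(Channel), {self(), {pubsub, Channel}, Msg}).
```

With keys like this, websocket_info would need to match on {_PID, {pubsub, _Channel}, Msg} rather than on the single ?WSKey.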

Well, hopefully this helps someone get going with websockets.  In my next post I am going to take this example to the next level and wire up an AMQP connection to consume messages in real-time and then broadcast those consumed messages over the connection.  Stay tuned...





12 comments:

Will said...

Fantastic, this is exactly what I needed for sending messages to all connected websocket clients. Great stuff and really easy!

andy said...

Hi
And how is it possible to get list of all processes registered with some key?

Dberg said...

Well the send() method basically does this

lists:foreach(fun(Pid) ->
Pid ! Msg
end, lookup_pids(Key)),

so you can probably just call gproc:lookup_pids(Key) which in turn just calls the select

ets:select(?TAB, [{{{Key,'_'}, '$1', '_'},[],['$1']}])

Eduardo said...

From the gproc webpage:

subscribe(EventType) ->
%% Gproc notation: {p, l, Name} means {(p)roperty, (l)ocal, Name}
gproc:reg({p, l, {?MODULE, EventType}}).

P is for property, not process

Dberg said...

Thanks, Fixed.

Atul said...

Waiting for second part with AMQP broker. I am working on integration with RabbitMQ so i am waiting eagerly for second part of this tutorial..

Dberg said...

Atul,

I am working on an updated post as well as the open source version of the product. In the meantime you can checkout a blogpost picked up by vmware here http://blogs.vmware.com/vfabric/2013/03/scaling-real-time-comments-huffpost-live-with-rabbitmq.html that outlines my talk i gave at Erlang SF 2013. it covers some of the RabbitMQ integration that we did.

Atul Singh said...

Thanks Adam for the reply. Actually I have implemented it using RabbitMQ and Gproc however I am just worried as GPROC will keep it in ETS table and loop through all the connected user to send the message. Will it not be slow when user volume is really high??

Is there any other approach.. I like kaazing for doing this stuff as kaazing will take all the load and only one connection will be open with Server... but unfortunately its expensive :)

Dberg said...

Scanning the ets table on an anchored ordered select should not be too expensive. Here is a very simple test i wrote for 1 million records https://gist.github.com/denen99/5682399

And here is the result on my laptop

> test:start().
Table created
Load data complete
runselect took 0.042299 seconds
foldl took 0.06906 seconds
ok

So scanning 1,000,000 ets rows took about 40ms on my laptop. You probably can't get much more than 1,000,000 connections on a single node so this is a theoretical max you would need to test for.

Atul Singh said...

Hey Adam.. Thanks a lot mate... that will be helpful for me. Good news is I tweaked gen_bunny code little bit now I am able to connect to RabbitMQ using cowboy and each user can directly subscribed to RabbitMQ via cowboy using websocket..

I appreciate all your inputs.. Thanks again....

Dberg said...

we wrote our own rabbitMQ consumer and store connections in an ETS table. Then you can map subscriptions via websockets to a channel subscription stored in the ETS table

Atul Singh said...

Thats cool