Simple pub/sub messaging for the web

Ruby server


Both the server and client support an extension mechanism that lets you intercept messages as they pass in and out. This lets you modify messages for any purpose you like, including messages on /meta/* channels that are used by the protocol. An extension is just an object that has either an incoming() or outgoing() method (or both). These methods should accept a message and a callback function, and should call the function with the message once they have made any modifications.

Extensions use a callback instead of simply returning the modified message since this allows you to use asynchronous logic to make your modifications.
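
As a rough illustration of why the callback matters, the sketch below defers its work and only lets the message continue once that work has run. The `AsyncStamp` class, the `checked` field and the `scheduler` object are all hypothetical; only the `incoming(message, callback)` shape comes from the text above. In a real application the scheduler would be whatever asynchronous mechanism you already use.

```ruby
# Hypothetical extension that finishes its work some time after
# incoming() has returned.
class AsyncStamp
  # scheduler: any object responding to call(&block) that runs the block
  # later. This is an assumption for the sketch, not a Faye API.
  def initialize(scheduler)
    @scheduler = scheduler
  end

  def incoming(message, callback)
    # Return immediately; the message is held until the callback fires.
    @scheduler.call do
      message['ext'] ||= {}
      message['ext']['checked'] = true
      callback.call(message)
    end
  end
end

# A toy scheduler that queues blocks and runs them on demand.
queue     = []
scheduler = lambda { |&block| queue << block }

delivered = []
ext = AsyncStamp.new(scheduler)
ext.incoming({ 'channel' => '/chat' }, lambda { |m| delivered << m })

before = delivered.length   # the callback has not fired yet
queue.each(&:call)          # run the deferred work
after  = delivered.length   # now the message has been passed on
```

Had the extension been required to return the modified message, it would have had to block until the deferred work completed.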

As an example, suppose we want to authenticate subscription messages by checking an authentication token against a list we’re keeping in a file on disk. Clients subscribe to channels by sending a message to the /meta/subscribe channel with the channel they want to subscribe to in the subscription field. Let’s say our authentication file contains a JSON object that maps channels to required tokens:

// tokens.json
{
  "/users/jcoglan/updates": "rt6utrb",
  "/artists/mclusky/news":  "99taaec"
}

The server can validate subscription messages by checking that they have the right auth token attached. By convention, data added by extensions is stored in the message’s ext field.

require 'json'

class ServerAuth
  def incoming(message, callback)
    # Let non-subscribe messages through
    unless message['channel'] == '/meta/subscribe'
      return callback.call(message)
    end

    # Get subscribed channel and auth token
    subscription = message['subscription']
    msg_token    = message['ext'] && message['ext']['authToken']

    # Find the right token for the channel
    @file_content ||= File.read('./tokens.json')

    registry = JSON.parse(@file_content)
    token    = registry[subscription]

    # Add an error if the tokens don't match
    if token != msg_token
      message['error'] = 'Invalid subscription auth token'
    end

    # Call the server back now we're done
    callback.call(message)
  end
end

# Register the extension with your server object
server.add_extension(ServerAuth.new)


If you add an error property to a message, the server will not process the message further and will simply return it to the sender, effectively blocking the subscription attempt. You should always make sure your extension calls the callback, as failing to do so could block delivery of other messages in the same request.
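
One way to make sure the callback is always reached is to wrap the risky part of the extension and fall through to a single `callback.call` at the end. This is only a sketch: the `GuardedExtension` class, its `validate` helper and the error text are hypothetical.

```ruby
class GuardedExtension
  def incoming(message, callback)
    begin
      validate(message)
    rescue StandardError
      # Error text is illustrative; pick whatever suits your application.
      message['error'] = '500::Extension failed'
    end

    # Reached on every path, so other messages in the request are not held up.
    callback.call(message)
  end

  private

  # Hypothetical check that may raise.
  def validate(message)
    if message['channel'] == '/meta/subscribe' && message['subscription'].nil?
      raise ArgumentError, 'missing subscription'
    end
  end
end

results = []
done    = lambda { |m| results << m }
ext     = GuardedExtension.new

ext.incoming({ 'channel' => '/meta/subscribe' }, done)
ext.incoming({ 'channel' => '/meta/subscribe', 'subscription' => '/foo' }, done)
```

Both calls reach `done`: the first message comes back with an error attached, the second passes through untouched.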

When implementing authentication, remember that a message published to channel /foo/bar/qux will be routed to subscriptions to /foo/bar/qux, /foo/bar/*, /foo/bar/**, /foo/** and /**. Take appropriate measures in your extensions to correctly authenticate subscriptions.
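
The routing rule above can be sketched as a small helper that reports which protected channels a subscription would receive messages from. The names `pattern_covers?` and `covered_channels` are hypothetical, and the sketch assumes wildcards only appear as the last segment of a pattern.

```ruby
# True if a message published to `channel` would be routed to a
# subscription on `pattern`:
#   /foo/*  covers exactly one further segment
#   /foo/** covers one or more further segments
def pattern_covers?(pattern, channel)
  p_segs = pattern.split('/')
  c_segs = channel.split('/')
  prefix = p_segs[0...-1]

  case p_segs.last
  when '*'
    c_segs.length == p_segs.length && c_segs[0...-1] == prefix
  when '**'
    c_segs.length >= p_segs.length && c_segs[0, prefix.length] == prefix
  else
    pattern == channel
  end
end

# Protected channels that a subscription would receive messages from.
def covered_channels(subscription, protected_channels)
  protected_channels.select { |ch| pattern_covers?(subscription, ch) }
end

guarded = ['/users/jcoglan/updates', '/artists/mclusky/news']
covered = covered_channels('/users/**', guarded)
```

An authentication extension could then require a valid token for every channel in `covered`, or simply reject wildcard subscriptions that cover any protected channel.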

On the client side, you’ll need to make sure the client sends the right auth token to satisfy the server. We do this by adding an outgoing extension on the client side.

class ClientAuth
  def outgoing(message, callback)
    # Again, leave non-subscribe messages alone
    unless message['channel'] == '/meta/subscribe'
      return callback.call(message)
    end

    # Add ext field if it's not present
    message['ext'] ||= {}

    # Set the auth token
    message['ext']['authToken'] = 'rt6utrb'

    # Carry on and send the message to the server
    callback.call(message)
  end
end

# Register the extension with your client object
client.add_extension(ClientAuth.new)


If an extension has an added() method, that will be called when the extension is added to its host. To remove an extension, call:

server.remove_extension(extension)
# Calls extension.removed() if defined
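
The lifecycle hooks might look like the sketch below. `LoggingExtension` and `ToyHost` are hypothetical: `ToyHost` only imitates the behaviour described above so the example can run on its own, and the hooks accept any arguments because this sketch makes no assumption about what the real host passes to them.

```ruby
class LoggingExtension
  attr_reader :events

  def initialize
    @events = []
  end

  # Called when the extension is attached to a host.
  def added(*_args)
    @events << :added
  end

  # Called when the extension is detached again.
  def removed(*_args)
    @events << :removed
  end

  def incoming(message, callback)
    callback.call(message)
  end
end

# Stand-in for a server or client; not part of Faye.
class ToyHost
  def initialize
    @extensions = []
  end

  def add_extension(ext)
    @extensions << ext
    ext.added(self) if ext.respond_to?(:added)
  end

  def remove_extension(ext)
    return unless @extensions.delete(ext)
    ext.removed(self) if ext.respond_to?(:removed)
  end
end

host = ToyHost.new
ext  = LoggingExtension.new
host.add_extension(ext)
host.remove_extension(ext)
```

Hooks like these are a natural place to open and close resources the extension depends on, such as a file handle or a connection.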

Accessing request data

On the server side, you can access details of the request that delivered a message by writing extension methods that take three arguments:

class ServerExt
  def incoming(message, request, callback)
    if request && request.env['HTTP_ORIGIN'] != 'http://example.com'
      message['error'] = '403::Forbidden origin'
    end
    callback.call(message)
  end
end

If an extension method takes three arguments, the second will be a Rack::Request object when the message was delivered over HTTP or WebSocket. For messages sent by a local server-side client, request will be nil.

You should not use the request object to send a response yourself; you should only use its properties to make a decision about how to handle the message, for example by checking the Origin or Cookie headers.

If you use Cookie for authorization, you must implement CSRF protection since Faye allows cross-origin connections.
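
One possible shape for such protection is to combine an origin allowlist with a token that the client must echo back in the message. This is a sketch under several assumptions: the `CsrfGuard` class, the `csrf_token` cookie name and the `csrfToken` ext field are hypothetical, and `FakeRequest` merely stands in for the `env` and `cookies` accessors of a Rack::Request so the example runs without Rack.

```ruby
class CsrfGuard
  ALLOWED_ORIGINS = ['http://example.com'].freeze

  def incoming(message, request, callback)
    # Local server-side clients have no request; let them through.
    return callback.call(message) if request.nil?

    origin       = request.env['HTTP_ORIGIN']
    cookie_token = request.cookies['csrf_token']
    msg_token    = message['ext'] && message['ext']['csrfToken']

    if !ALLOWED_ORIGINS.include?(origin)
      message['error'] = '403::Forbidden origin'
    elsif cookie_token.nil? || cookie_token != msg_token
      # A page on another origin cannot read the cookie, so it cannot
      # copy the token into the message.
      message['error'] = '403::Invalid CSRF token'
    end

    callback.call(message)
  end
end

# Stand-in exposing the two accessors used above.
FakeRequest = Struct.new(:env, :cookies)

seen  = []
done  = lambda { |m| seen << m }
guard = CsrfGuard.new

good = FakeRequest.new({ 'HTTP_ORIGIN' => 'http://example.com' }, { 'csrf_token' => 'abc' })
evil = FakeRequest.new({ 'HTTP_ORIGIN' => 'http://evil.test' },   { 'csrf_token' => 'abc' })

guard.incoming({ 'channel' => '/chat', 'ext' => { 'csrfToken' => 'abc' } }, good, done)
guard.incoming({ 'channel' => '/chat' }, good, done)
guard.incoming({ 'channel' => '/chat', 'ext' => { 'csrfToken' => 'abc' } }, evil, done)
guard.incoming({ 'channel' => '/chat' }, nil, done)
```

Only the first and last messages pass without an error; the second lacks the token and the third comes from an origin outside the allowlist.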

To write extensions you’ll need to know what kinds of messages are used by the Bayeux protocol; see the specification for more details.