Faye

Simple pub/sub messaging for the web

Ruby server

Engines

In Faye, the ‘engine’ is the back-end of the server; the engine implements the messaging semantics and deals with state and storage, and the server maps the Bayeux protocol onto the engine’s functionality. See the architecture overview for more information.

The default engine that’s included with Faye stores all state in-process, which makes it fast but makes it impossible to run a single Faye service across multiple front-end web servers. The following engines are available as separate libraries:

  • faye-redis – Uses Redis to store state and distribute messages between any number of Faye server processes, letting you increase your connection capacity.

Creating your own engine

The pluggable engine architecture allows anyone to write a new back-end for Faye that works the way they want. Engines are passed in when a server is instantiated; for example, the Redis engine is set up like so:

server = Faye::RackAdapter.new(
  :mount   => '/faye',
  :timeout => 25,
  :engine  => {
    :type  => Faye::Redis,
    :host  => 'localhost',
    :port  => 6379
  }
)

The :engine field has a :type setting that maps to the engine class, and further options that are used to configure that particular engine. Faye then uses the following API to interact with the engine; this is what you must implement if you are writing your own engine.

Engine.create(proxy, options)

The object passed in the :type field (Faye::Redis in the example above) must respond to create(proxy, options) and return an object that implements the rest of the API described below. options is a hash containing the :engine settings supplied by the user; in the example above it would be {:host => 'localhost', :port => 6379}.
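As a rough sketch of the factory method described above, an engine class might look like the following. MemoryEngine is a hypothetical name used only for illustration, and a plain Object stands in for the proxy that Faye would supply.

```ruby
# Hypothetical engine skeleton; MemoryEngine is an illustrative name,
# not a class that ships with Faye.
class MemoryEngine
  # Faye calls this with the proxy and the user's :engine settings.
  def self.create(proxy, options)
    new(proxy, options)
  end

  attr_reader :options

  def initialize(proxy, options)
    @proxy   = proxy    # used later to talk back to the server
    @options = options  # e.g. {:host => 'localhost', :port => 6379}
  end
end

# Object.new stands in for the real proxy in this sketch.
engine = MemoryEngine.create(Object.new, {:host => 'localhost', :port => 6379})
```

Such a class would then be passed as the :type setting in the :engine field.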

proxy is an object that intermediates between the server and the engine, and provides methods the engine needs to communicate with the server. In particular it provides the following:

  • proxy.deliver(client_id, messages) – Tells the server to send messages to the given client_id. Will do nothing if there is no open connection for client_id, so you should check whether a connection is open using proxy.has_connection?(client_id) first. messages is an array of hashes representing Bayeux messages.
  • proxy.has_connection?(client_id) – Returns true iff the server currently holds an open /meta/connect request for the given client_id. This can be used when deciding whether to flush a message queue.
  • proxy.generate_id – Returns a random valid Faye client ID as a string. It does not guarantee that the ID is unique; this is the responsibility of the engine.
  • proxy.trigger(event_type, *args) – Triggers events that the user can listen to using on(). The engine is required to emit these events at certain times as detailed below, e.g. proxy.trigger(:disconnect, client_id).
  • proxy.timeout – Returns the server’s connection timeout in seconds. Can be used to set timeouts for inactive clients.
  • proxy.debug, proxy.info, proxy.warn, proxy.error – Let the engine log messages using the standard set of log levels, e.g. proxy.info('Created new client: ?', client_id). Each ? in the message is replaced with the value of the corresponding extra parameter, formatted as necessary.

The proxy also emits two events to let you know when a particular client connects and disconnects from the server. These correspond with the return value of proxy.has_connection?(client_id) changing between true and false. You can listen to these events using the on() method.

  • proxy.on('connection:open') { |client_id| } – Fires when the client with ID client_id connects to the server, either by opening a socket or sending a /meta/connect request.
  • proxy.on('connection:close') { |client_id| } – Fires when the client with ID client_id disconnects from the server, either by closing a socket or by the server sending a /meta/connect response. Note that this does not mean the client’s session has ended; it only means the client does not currently have an open network connection to the server.
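To illustrate the two connection events, here is a minimal sketch of an engine that keeps a set of currently connected clients. StubProxy and its emit method are hypothetical test stand-ins for the real proxy (emit only simulates the server firing an event), and MemoryEngine is an illustrative name.

```ruby
require 'set'

# Hypothetical stand-in for the proxy. `emit` is NOT part of the proxy API;
# it only simulates the server firing an event in this sketch.
class StubProxy
  def initialize
    @listeners = Hash.new { |hash, event| hash[event] = [] }
  end

  def on(event, &listener)
    @listeners[event] << listener
  end

  def emit(event, *args)
    @listeners[event].each { |listener| listener.call(*args) }
  end
end

class MemoryEngine
  attr_reader :online

  def initialize(proxy)
    @online = Set.new
    # Mirror the value proxy.has_connection?(client_id) would report.
    proxy.on('connection:open')  { |client_id| @online.add(client_id) }
    proxy.on('connection:close') { |client_id| @online.delete(client_id) }
  end
end

proxy  = StubProxy.new
engine = MemoryEngine.new(proxy)
proxy.emit('connection:open', 'abc')
online_after_open = engine.online.include?('abc')
proxy.emit('connection:close', 'abc')
online_after_close = engine.online.include?('abc')
```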

The Faye engine API

The object returned by Engine.create(proxy, options) must implement the following interface to be a valid Faye engine. Note that return values are delivered via a block, which allows engine implementations to be asynchronous. Engines do not have to operate asynchronously, but should avoid doing anything that would block the event loop for too long.

The engine should assume that all validation required by the Bayeux protocol has already been performed by the server for all incoming messages. The engine does not need to reimplement any of this; it only needs to implement the messaging semantics.

There are automated tests in the Faye project that you can use to make sure your engine conforms to the spec. See the faye-redis project for an example of using these tests.

engine.create_client { |client_id| ... }

Should generate and return a new unique Faye client ID. You can use proxy.generate_id to generate valid IDs, but the engine must make sure a unique value is assigned. After choosing an ID and allocating any required storage for it, the ID should be yielded to the block.

The engine should call proxy.trigger(:handshake, client_id) once a valid client ID has been selected.
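A minimal in-memory sketch of create_client could look like this. StubProxy is a hypothetical stand-in that records triggered events, and its generate_id returns random hex purely for illustration; the real proxy returns IDs in Faye's own format.

```ruby
require 'securerandom'
require 'set'

# Hypothetical stand-in for the proxy; records triggered events.
class StubProxy
  attr_reader :events

  def initialize
    @events = []
  end

  # Assumption: random hex stands in for a real Faye client ID.
  def generate_id
    SecureRandom.hex(16)
  end

  def trigger(*args)
    @events << args
  end
end

class MemoryEngine
  def initialize(proxy)
    @proxy   = proxy
    @clients = {}  # client_id => Set of subscribed channels
  end

  def create_client(&callback)
    client_id = @proxy.generate_id
    # Uniqueness is the engine's job, so retry on a collision.
    client_id = @proxy.generate_id while @clients.key?(client_id)
    @clients[client_id] = Set.new  # allocate storage for the client
    @proxy.trigger(:handshake, client_id)
    callback.call(client_id)
  end
end

proxy   = StubProxy.new
engine  = MemoryEngine.new(proxy)
created = nil
engine.create_client { |client_id| created = client_id }
```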

engine.destroy_client(client_id) { ... }

Should destroy any subscriptions for the given client_id and mark the client_id as disconnected. No further messages will be sent by the client after this happens.

The engine should call proxy.trigger(:disconnect, client_id) in the current process, and should call proxy.trigger(:close, client_id) in all the Faye server processes, after deleting the client’s data. If a block is given it should be called after the client has been deleted.
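For a single-process, in-memory engine, destroy_client might be sketched as follows. Because there is only one process here, :disconnect and :close are both triggered locally; a multi-process engine would have to broadcast the :close event by its own means, which this sketch does not attempt. StubProxy is a hypothetical stand-in, and skipping the events for unknown clients is a design choice of the sketch.

```ruby
require 'set'

# Hypothetical stand-in for the proxy; records triggered events.
class StubProxy
  attr_reader :events

  def initialize
    @events = []
  end

  def trigger(*args)
    @events << args
  end
end

class MemoryEngine
  def initialize(proxy, clients = {})
    @proxy   = proxy
    @clients = clients  # client_id => Set of subscribed channels
  end

  def destroy_client(client_id, &callback)
    # Deleting the entry removes the client and all its subscriptions.
    existed = !@clients.delete(client_id).nil?
    if existed
      @proxy.trigger(:disconnect, client_id)
      @proxy.trigger(:close, client_id)  # only one process in this sketch
    end
    callback.call if callback
  end
end

proxy   = StubProxy.new
clients = {'abc' => Set['/chat']}
engine  = MemoryEngine.new(proxy, clients)
done    = false
engine.destroy_client('abc') { done = true }
```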

engine.client_exists(client_id) { |exists| ... }

Should call the given block with true if the given client_id exists, and false otherwise. This is used by the server when validating incoming messages.

engine.ping(client_id)

The server calls this to tell the engine a connection has been received for the given client_id. You can use this as a heartbeat to check clients are still alive. No return value is expected.

engine.subscribe(client_id, channel) { |successful| ... }

Should store a subscription for the given client_id to the given channel, both of which are strings. If a block is given, it must be called with true or false to indicate whether the subscription was registered successfully.

The engine should call proxy.trigger(:subscribe, client_id, channel) iff the subscription was successful and the client was not previously subscribed to the channel.

engine.unsubscribe(client_id, channel) { |successful| ... }

Should delete a subscription for the given client_id to the given channel, both of which are strings. If a block is given, it must be called with true or false to indicate whether the subscription was destroyed successfully.

The engine should call proxy.trigger(:unsubscribe, client_id, channel) iff the subscription was destroyed and the client was actually subscribed to the channel.
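The "only when something changed" rule for the :subscribe and :unsubscribe events can be sketched with Ruby's Set, whose add? and delete? methods return nil when the set was left unchanged. StubProxy is a hypothetical stand-in, and this in-memory version always reports success to the block.

```ruby
require 'set'

# Hypothetical stand-in for the proxy; records triggered events.
class StubProxy
  attr_reader :events

  def initialize
    @events = []
  end

  def trigger(*args)
    @events << args
  end
end

class MemoryEngine
  def initialize(proxy)
    @proxy    = proxy
    @channels = Hash.new { |hash, id| hash[id] = Set.new }  # client_id => channels
  end

  def subscribe(client_id, channel, &callback)
    # Set#add? returns nil if the channel was already present.
    is_new = !@channels[client_id].add?(channel).nil?
    @proxy.trigger(:subscribe, client_id, channel) if is_new
    callback.call(true) if callback
  end

  def unsubscribe(client_id, channel, &callback)
    # Set#delete? returns nil if the channel was not present.
    was_subscribed = @channels.key?(client_id) &&
                     !@channels[client_id].delete?(channel).nil?
    @proxy.trigger(:unsubscribe, client_id, channel) if was_subscribed
    callback.call(true) if callback
  end
end

proxy  = StubProxy.new
engine = MemoryEngine.new(proxy)
2.times { engine.subscribe('abc', '/chat') }
2.times { engine.unsubscribe('abc', '/chat') }
```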

engine.publish(message, channels)

Given a hash in message and an array of channel names in channels, the engine should deliver the message to all the clients subscribed to the given channels. For example, if the message was published to /msg/foo by the sender, then channels will be ['/msg/foo', '/msg/*', '/msg/**', '/**'].

Each message should be delivered to each subscribed client exactly once, even if the client is subscribed to several channels in the channels array. If the client is currently connected (see proxy.has_connection?) the engine should use proxy.deliver to send the message to the client. Otherwise the message should be queued until the next time the client connects.

On receipt of a message, the engine should call proxy.trigger(:publish, message['clientId'], message['channel'], message['data']).

Where applicable, the engine must make sure that any published message reaches all the clients that should receive it, whichever Faye server instance they are connected to. How the message is routed to other Faye processes is up to the engine.
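The exactly-once delivery rule can be sketched by taking the union of the subscriber sets for all the channels before delivering. This is a single-process illustration only; routing to other Faye processes is left out. StubProxy is a hypothetical stand-in for the proxy, and the subscribers table is assumed to map channel names to sets of client IDs.

```ruby
require 'set'

# Hypothetical stand-in for the proxy; records deliveries and events.
class StubProxy
  attr_reader :delivered, :events

  def initialize(connected)
    @connected = connected
    @delivered = []
    @events    = []
  end

  def has_connection?(client_id)
    @connected.include?(client_id)
  end

  def deliver(client_id, messages)
    @delivered << [client_id, messages]
  end

  def trigger(*args)
    @events << args
  end
end

class MemoryEngine
  attr_reader :queues

  def initialize(proxy, subscribers)
    @proxy       = proxy
    @subscribers = subscribers  # channel => Set of client IDs
    @queues      = Hash.new { |hash, id| hash[id] = [] }
  end

  def publish(message, channels)
    # A set union removes clients that match several channels.
    recipients = channels.reduce(Set.new) do |all, channel|
      all | @subscribers.fetch(channel, Set.new)
    end
    recipients.each do |client_id|
      if @proxy.has_connection?(client_id)
        @proxy.deliver(client_id, [message])
      else
        @queues[client_id] << message  # held until the client reconnects
      end
    end
    @proxy.trigger(:publish, message['clientId'], message['channel'], message['data'])
  end
end

subscribers = {'/msg/foo' => Set['a', 'b'], '/msg/*' => Set['a']}
proxy   = StubProxy.new(['a'])  # only client 'a' is connected
engine  = MemoryEngine.new(proxy, subscribers)
message = {'clientId' => 'sender', 'channel' => '/msg/foo', 'data' => {'text' => 'hi'}}
engine.publish(message, ['/msg/foo', '/msg/*', '/msg/**', '/**'])
```

Here client a matches two channels but receives the message once, while the disconnected client b has it queued.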

engine.empty_queue(client_id)

The server calls this when a client connects to flush any queued messages for the client. The engine should check the client is connected, and if there are any queued messages for it, use proxy.deliver(client_id, messages) to send them to the client.
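A minimal sketch of empty_queue, assuming queued messages are kept in a hash from client ID to an array of messages. StubProxy is a hypothetical stand-in for the proxy.

```ruby
# Hypothetical stand-in for the proxy; records deliveries.
class StubProxy
  attr_reader :delivered

  def initialize(connected)
    @connected = connected
    @delivered = []
  end

  def has_connection?(client_id)
    @connected.include?(client_id)
  end

  def deliver(client_id, messages)
    @delivered << [client_id, messages]
  end
end

class MemoryEngine
  def initialize(proxy, queues)
    @proxy  = proxy
    @queues = queues  # client_id => Array of queued messages
  end

  def empty_queue(client_id)
    # Leave the queue untouched while the client is not connected.
    return unless @proxy.has_connection?(client_id)
    messages = @queues.delete(client_id)
    return if messages.nil? || messages.empty?
    @proxy.deliver(client_id, messages)
  end
end

queued = {'channel' => '/chat', 'data' => 'hello'}
queues = {'a' => [queued], 'b' => [queued]}
proxy  = StubProxy.new(['a'])  # only client 'a' is connected
engine = MemoryEngine.new(proxy, queues)
engine.empty_queue('a')
engine.empty_queue('b')
```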

engine.disconnect

This is called when the server is shut down. The engine should close any connections it has to external processes like databases and cancel any timeouts it has pending.

Disconnecting inactive clients

When a client makes a connection, the engine will receive a call to engine.ping(client_id). This tells the engine the client is still alive. Sometimes, clients fail to send an explicit disconnect message when they shut down, and the engine must detect such clients and delete them. If a client goes much longer than proxy.timeout without sending a ping, the engine should do the following in this order:

  • Destroy all subscriptions for the client, calling proxy.trigger(:unsubscribe, client_id, channel) for each subscription
  • Delete the client, calling proxy.trigger(:disconnect, client_id) in the current process and proxy.trigger(:close, client_id) in all processes
  • Delete any other state associated with the client’s session

The strategy for cleaning up inactive clients is up to the engine, but the important point is it will not be explicitly initiated by the server; the engine must make sure this clean-up happens by itself and must trigger events so the user can monitor these timeouts.
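One possible clean-up strategy is a periodic sweep over the last ping time of each client. In the sketch below, sweep, GRACE_FACTOR and the optional now arguments are hypothetical names introduced for illustration; the factor of 2 is an assumption standing in for "much longer than proxy.timeout", and a real engine would call sweep from a timer of its choosing. StubProxy is a hypothetical stand-in for the proxy.

```ruby
require 'set'

# Hypothetical stand-in for the proxy; records triggered events.
class StubProxy
  attr_reader :events, :timeout

  def initialize(timeout)
    @timeout = timeout
    @events  = []
  end

  def trigger(*args)
    @events << args
  end
end

class MemoryEngine
  GRACE_FACTOR = 2  # assumption: how much longer than proxy.timeout to wait

  def initialize(proxy, channels = {})
    @proxy     = proxy
    @channels  = channels  # client_id => Set of subscribed channels
    @last_ping = {}        # client_id => Time of the last ping
  end

  # The optional `now` argument exists only to make the sketch testable.
  def ping(client_id, now = Time.now)
    @last_ping[client_id] = now
  end

  # Hypothetical helper: removes every client whose last ping is too old.
  def sweep(now = Time.now)
    cutoff  = now - GRACE_FACTOR * @proxy.timeout
    expired = @last_ping.select { |_id, seen| seen < cutoff }.keys
    expired.each do |client_id|
      (@channels.delete(client_id) || []).each do |channel|
        @proxy.trigger(:unsubscribe, client_id, channel)
      end
      @last_ping.delete(client_id)
      @proxy.trigger(:disconnect, client_id)
      @proxy.trigger(:close, client_id)  # only one process in this sketch
    end
  end
end

start  = Time.at(0)
proxy  = StubProxy.new(25)
engine = MemoryEngine.new(proxy, {'a' => Set['/chat'], 'b' => Set['/chat']})
engine.ping('a', start)
engine.ping('b', start + 100)
engine.sweep(start + 120)  # cutoff is start + 70, so only 'a' has expired
```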