Tuesday, July 22, 2008

JMS JRuby Producer and Consumer

So , after scouring the internet trying to find 1 consolidated location that demonstrated an easy way to use JMS inside of JRuby , I finally used some internet examples and some research on the SUN JMS API website to get this working.  The biggest thing I had to figure out was how to properly structure the message from the message producer.  This is a basic setup using activemq 5.1 , jruby 1.1.2 and some basic ruby skills.  Here is what I did.

 1 - fire up out of the box activemq installation.  Literally that easy, just run bin/activemq & 
 2 - copy activemq-all-5.1.0.jar from activemq installation directory to the same directory as your JRuby scripts
 3 - create the 2 scripts below , consumer.rb and producer.rb and you are off and running.  For the Rails enthusiasts , I can imagine a very nice replacement for ActiveMessaging where you create a simple JMS library that will do an async post of messages to the queue, very nice indeed.
4 - Run each script in a separate window (jruby consumer.rb  AND jruby producer.rb).  Producer.rb will simply give you a ">" prompt to type some text to demonstrate the concept


Here is the code I ended up with:


consumer.rb
------------------------
require "java"
require "activemq-all-5.1.0.jar"

include_class "org.apache.activemq.ActiveMQConnectionFactory"
include_class "org.apache.activemq.util.ByteSequence"
include_class "org.apache.activemq.command.ActiveMQBytesMessage"
include_class "javax.jms.MessageListener"
include_class "javax.jms.Session"

class MessageHandler
include javax.jms.Session
include javax.jms.MessageListener

def onMessage(serialized_message)
message_body = serialized_message.get_content.get_data.inject("") { |body, byte| body << byte }
puts message_body
end

def run
factory = ActiveMQConnectionFactory.new("tcp://localhost:61616")
connection = factory.create_connection();
session = connection.create_session(false, Session::AUTO_ACKNOWLEDGE);
queue = session.create_queue("test1-queue");

consumer = session.create_consumer(queue);
consumer.set_message_listener(self);

connection.start();
puts "Listening..."
end
end

handler = MessageHandler.new
handler.run

producer.rb
--------------
require "java"
require "activemq-all-5.1.0.jar"
require 'readline'

include_class "org.apache.activemq.ActiveMQConnectionFactory"
include_class "org.apache.activemq.util.ByteSequence"
include_class "org.apache.activemq.command.ActiveMQBytesMessage"
include_class "javax.jms.MessageListener"
include_class "javax.jms.Session"

class MessageHandler
include javax.jms.Session
include javax.jms.MessageListener

def initialize
factory = ActiveMQConnectionFactory.new("tcp://localhost:61616")
connection = factory.create_connection();
@session = connection.create_session(false, Session::AUTO_ACKNOWLEDGE);
queue = @session.create_queue("test1-queue");

@producer = @session.create_producer(queue);
end

def send_message(line)
puts "received input of #{line}"
m = @session.createTextMessage() ;
m.set_text(line)
@producer.send(m)
end

end

handler = MessageHandler.new
loop do
line = Readline::readline('> ', true)
handler.send_message(line)
end

Friday, July 11, 2008

User based memcached namespaces

So i ran into this interesting caching architecture problem where I needed a caching model such that each user had their own cache key, but one record update would invalidate every users' cache entries.  Since memcache does not support wildcards, and looping through cache keys is about as inefficient as it gets, after some memcached FAQ reading and some thought, I concluded the following cache model which is so simple, but is certainly very valuable.

Say you have a user who has many results, in a one to many relationship and can only see the results he/she has permission for.  This means every user can potentially have a different result set, hence a different cache value so 1 key per user.  We can very easily create a cache key called userid_results where userid is the unique ID of the user making the request.  Works well....until you add a new result.  If we have 1000 users, how do we invalidate all of those cache keys?  The following mechanism solves that issue:

  1. Create a version cache key named "results_version".  This cache key will simply hold the current version of the results table.  Whenever we insert a new record we increment the version.
  2. Create  a cache key that uses this version.  So in your code you will have to first fetch the version key (memcache->get(results_version)). Then use this value to generate the user cache key like user_results_results_version which would end up something like user_results_1003.  
  3. When a new record is inserted we increment the results_version key.  This sets it to 1004 in our example
  4. The next request for content will try to access user_results_1004, but this cache key does not exist yet so we will force a cache_miss, hit our DB or content source, then cache the result.

     Adam