Friday, June 5, 2009

Scala Actor + Message Driven Bean (MDB) using JMS

 

I wanted to integrate Scala actors with MDBs. The reason is that Scala actors, as good as they are:

1) Are not transactional

2) Cannot send messages across a network

This led me to investigate what Java EE has to offer: the Java Message Service (JMS).

In many of the JMS setups I have seen, messages end up being consumed by a Message Driven Bean (an EJB), so that is what I’m going to do. An actor will send a message to an MDB, and the MDB will reply.

To use JMS with Scala actors, we can use Spring. Spring offers a utility for creating Message Driven POJOs, meaning that any Java object can be turned into a message-receiving object. Since Scala actors are really just Java objects on steroids, I figured we could use Spring to equip plain old Scala actors with the ability to receive JMS messages. This receiving actor will be called a “wormhole actor”, as it bridges the JMS world and the Scala world.

OK, let us start. First we need to create a Spring XML file. This file contains several beans:

This bean tells Spring how to reach the JNDI service of the Java EE server where the MDB resides (the settings below are for a GlassFish / Sun application server running locally):

    <bean id="myJndiTemplate" class="org.springframework.jndi.JndiTemplate" scope="singleton">
        <property name="environment">
            <map>
                <entry key="java.naming.factory.initial" value="com.sun.appserv.naming.S1ASCtxFactory"/>
                <entry key="java.naming.provider.url" value="iiop://127.0.0.1:3700"/>
            </map>
        </property>
    </bean>

This bean looks up the JMS connection factory from JNDI:

    <bean id="jmsQueueFactory"
               class="org.springframework.jndi.JndiObjectFactoryBean"
                abstract="false" lazy-init="default" autowire="default"
                dependency-check="default">
        <property name="jndiName">
            <value>jms/QueueFactory</value>
        </property>
        <property name="resourceRef">
            <value>false</value>
        </property>
        <property name="jndiTemplate" ref="myJndiTemplate"/>
    </bean>

These beans look up the request and response queues, respectively:

    <bean id="jmsRequestQueue"
                class="org.springframework.jndi.JndiObjectFactoryBean"
                abstract="false" lazy-init="default" autowire="default"
                dependency-check="default">
        <property name="jndiName">
            <value>jms/RequestQueue</value>
        </property>
        <property name="resourceRef">
            <value>false</value>
        </property>
        <property name="jndiTemplate" ref="myJndiTemplate"/>
    </bean>

    <bean id="jmsResponseQueue"
                class="org.springframework.jndi.JndiObjectFactoryBean"
                abstract="false" lazy-init="default" autowire="default"
                dependency-check="default">
        <property name="jndiName">
            <value>jms/ResponseQueue</value>
        </property>
        <property name="resourceRef">
            <value>false</value>
        </property>
        <property name="jndiTemplate" ref="myJndiTemplate"/>
    </bean>

We then customise Spring’s JmsTemplate with the beans configured above. Note that we will use JmsTemplate to send JMS messages later:   

    <bean id="jmsTemplate"
                class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsQueueFactory"/>
        <property name="defaultDestination" ref="jmsRequestQueue"/>
    </bean>

Last but not least, we create a Message Driven POJO, which is the WormholeActor. Note that the WormholeActor listens to the response queue:


    <bean id="messageListener" class="org.azrul.osconf.jmsActors.WormholeActor" />

    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsQueueFactory"/>
        <property name="destination" ref="jmsResponseQueue"/>
        <property name="messageListener" ref="messageListener" />
    </bean>
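
To make the role of the listener container more concrete, here is a minimal in-memory sketch in plain Java. It is not how Spring implements DefaultMessageListenerContainer: a BlockingQueue stands in for the JMS response queue, and the names TinyListenerContainer and TextListener are hypothetical, made up for this illustration. It only shows the idea that a container owns a polling thread and pushes each incoming message into an ordinary object.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class ListenerContainerSketch {

    /** Hypothetical stand-in for javax.jms.MessageListener. */
    interface TextListener {
        void onMessage(String text);
    }

    /** Hypothetical stand-in for a listener container: one thread, one queue, one listener. */
    static final class TinyListenerContainer {
        private final Thread worker;

        TinyListenerContainer(BlockingQueue<String> destination, TextListener listener) {
            worker = new Thread(() -> {
                try {
                    while (true) {
                        // Block until a message arrives, then hand it to the plain object.
                        listener.onMessage(destination.take());
                    }
                } catch (InterruptedException stopRequested) {
                    Thread.currentThread().interrupt(); // stop() was called; let the thread end
                }
            });
            worker.setDaemon(true);
        }

        void start() { worker.start(); }

        void stop() { worker.interrupt(); }
    }

    /** Puts the given texts on a queue and returns what the listener saw, in order. */
    static List<String> deliver(List<String> texts) {
        BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(texts.size());

        TinyListenerContainer container = new TinyListenerContainer(responseQueue, text -> {
            received.add(text);
            done.countDown();
        });
        container.start();
        try {
            for (String text : texts) {
                responseQueue.put(text);
            }
            done.await(); // wait until the listener has handled every message
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while delivering", e);
        } finally {
            container.stop();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver(java.util.Arrays.asList("hello", "world")));
    }
}
```

In the real setup, the jmsContainer bean plays the part of TinyListenerContainer and the messageListener bean plays the part of the lambda passed to it.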

Next, we create a WormholeActor class in Scala:

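package org.azrul.osconf.jmsActors

import javax.jms.{MapMessage, Message, MessageListener, Session}
import org.springframework.context.support.ClassPathXmlApplicationContext
import org.springframework.jms.core.{JmsTemplate, MessageCreator}
import scala.actors.Actor

class WormholeActor extends Actor with MessageListener {
    // Load the Spring context once, on first use, instead of on every message.
    // It must be lazy: the context itself instantiates a WormholeActor (the
    // messageListener bean), so loading it eagerly would recurse forever.
    private lazy val context =
        new ClassPathXmlApplicationContext(Array[String]("/org/azrul/osconf/jmsActors/client-context.xml"))
    private lazy val jmsTemplate = context.getBean("jmsTemplate").asInstanceOf[JmsTemplate]

    // Scala side: forward every String received as an actor message to the JMS request queue.
    def act(): Unit = {
        loop {
            react {
                case msg: String =>
                    jmsTemplate.send(new MessageCreator() {
                        def createMessage(session: Session): Message = {
                            val message = session.createMapMessage()
                            message.setString("MESSAGE", msg)
                            message
                        }
                    })
            }
        }
    }

    // JMS side: called by Spring's listener container for each message on the response queue.
    def onMessage(message: Message): Unit = {
        val responseMsg = message.asInstanceOf[MapMessage]
        println("-------From Listener------")
        println(responseMsg.getString("MESSAGE"))
    }
}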

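It has two primary methods: act, which reacts to Scala actor messages, and onMessage, which reacts to JMS messages. When the actor receives a Scala message, it forwards it to the JMS world through the request queue defined in our Spring config. Note that Spring creates its own WormholeActor instance for the messageListener bean, so responses are printed by that instance rather than by the actor you started yourself.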

Here is the MDB:

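import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

@MessageDriven(mappedName = "jms/RequestQueue", activationConfig = {
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MyMDBQueueBean implements MessageListener {

    public MyMDBQueueBean() {
    }

    public void onMessage(Message message) {
        Connection connection = null;
        try {
            MapMessage requestMsg = (MapMessage) message;
            System.out.println("-------From MDB------");
            System.out.println(requestMsg.getString("MESSAGE"));
            System.out.println("----------------------");

            // Look up the connection factory and the response queue, then send the reply.
            Context ctx = new InitialContext();
            ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("jms/QueueFactory");
            Queue queue = (Queue) ctx.lookup("jms/ResponseQueue");
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            MapMessage responseMsg = session.createMapMessage();

            responseMsg.setString("MESSAGE", "READ:" + requestMsg.getString("MESSAGE"));
            producer.send(responseMsg);

        } catch (NamingException ex) {
            Logger.getLogger(MyMDBQueueBean.class.getName()).log(Level.SEVERE, null, ex);
        } catch (JMSException ex) {
            Logger.getLogger(MyMDBQueueBean.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            // Closing the connection also closes its session and producer.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                    Logger.getLogger(MyMDBQueueBean.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }
}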

The MDB listens to the request queue and, for each message, sends a reply (the original text prefixed with READ:) to the response queue. Of course, in your application you may want to do some business logic between request and response. In our case, we just do this:

 

System.out.println("-------From MDB------");
System.out.println(requestMsg.getString("MESSAGE"));
System.out.println("----------------------");
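
To see where real business logic would slot in, here is a small in-memory sketch of the request/response round trip in plain Java. It is only an illustration under stated assumptions: two BlockingQueues stand in for jms/RequestQueue and jms/ResponseQueue, a Map stands in for a MapMessage, and the names serveOne, roundTrip and mapMessage are hypothetical helpers invented for this example. The business logic is passed in as a function, so the "READ:" prefix used by the MDB above is just one possible choice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.UnaryOperator;

public class RequestReplySketch {

    /** Builds a one-entry map, standing in for a MapMessage with a "MESSAGE" field. */
    static Map<String, String> mapMessage(String text) {
        Map<String, String> message = new HashMap<>();
        message.put("MESSAGE", text);
        return message;
    }

    /**
     * Stand-in for the MDB's onMessage: take one request, apply the business
     * logic, and put the result on the response queue.
     * Returns false if the request queue was empty.
     */
    static boolean serveOne(BlockingQueue<Map<String, String>> requests,
                            BlockingQueue<Map<String, String>> responses,
                            UnaryOperator<String> businessLogic) {
        Map<String, String> request = requests.poll();
        if (request == null) {
            return false;
        }
        String reply = businessLogic.apply(request.get("MESSAGE"));
        responses.add(mapMessage(reply));
        return true;
    }

    /** Runs one full round trip and returns the text that comes back. */
    static String roundTrip(String text) {
        BlockingQueue<Map<String, String>> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Map<String, String>> responseQueue = new LinkedBlockingQueue<>();

        requestQueue.add(mapMessage(text));                       // the actor's side: send
        serveOne(requestQueue, responseQueue, s -> "READ:" + s);  // the MDB's side: reply
        Map<String, String> response = responseQueue.poll();      // the listener's side: receive
        return response == null ? null : response.get("MESSAGE");
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints READ:hello
    }
}
```

Replacing the lambda passed to serveOne is the in-memory equivalent of putting your own logic between reading requestMsg and building responseMsg in the MDB.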

 

To bootstrap it all, we need a Scala main function that will send a message to the wormhole actor:

object Main {
    def main(args: Array[String]): Unit = {
        val wormholeActor = new WormholeActor
        wormholeActor.start()
        wormholeActor ! "I’m John Connor and if you’re listening to this you are the RESISTANCE!"
    }
}

Next, deploy the MDB to the application server and run the Main object.

Note: NetBeans makes it easy to create an MDB. Follow this tutorial, under the topic “Creating the NewMessage Message-Driven Bean”.

Complete source code for Actor (password: qwerty)

Dependencies:
