Introduction
Messaging is a key part of distributed computing, where communication often has to happen asynchronously. Typically, store-and-forward messaging is used: intermediate brokers store the messages and forward them to their recipients. An event represents a state change in a software system, and processing such events can be used to derive new facts about the system. Topic-based publish and subscribe is used to send and receive events, and complex event processing techniques are used to derive further events from the original ones.
WSO2 Message Broker (WSO2 MB) provides functionality to support both messaging and eventing. First, this article gives an overview of the component architecture. Second, it describes the two main features, messaging and eventing, with relevant sample code [1].
Applies To
WSO2 MB 1.0.0
Architecture Overview
The diagram above shows the composition of WSO2 MB. It consists of three main components: EventBroker, MessageBox and Qpid. These components provide APIs for different types of clients. The WS-Eventing [1] component implements the WS-Eventing specification, so clients can invoke it using WS-Eventing-compatible web service calls. Similarly, the SQS component implements the SQS (Simple Queuing Service [2]) specification, so any SQS client can use it. WSO2 MB is packaged with an embedded Apache Qpid instance, which implements the Advanced Message Queuing Protocol (AMQP). Therefore, any JMS client or AMQP client can talk to WSO2 MB directly as well. As shown in the diagram, both the SQS and the WS-Eventing implementations use the embedded Qpid instance as the underlying broker. This makes it possible to publish messages through the JMS API and receive them with a WS-Eventing client, and vice versa.
User authentication happens in different components. The WS-Eventing service is itself a Carbon admin service, so its authentication happens at the Carbon authentication handler. For SQS, a dedicated SQS authentication handler performs the authentication. For direct invocations of the Qpid server, authentication happens at a Qpid authentication plug-in. When the EventBroker and MessageBox components invoke the Qpid instance, they use a trust delegation technique based on a secret key shared between the Qpid instance and the other Carbon components. At startup, the Qpid instance generates a key and shares it with the other components. When one of these Carbon components invokes Qpid on behalf of a particular user, it sets the shared key as the password. When authenticating, the authentication plug-in checks for this special key and authenticates any user that presents it as the password. Authorization of users for topics and queues happens at the Qpid authorization plug-in. SQS-specific authorization happens at the MessageBox component.
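The shared-key trust delegation described above can be sketched in plain Java. This is only an illustration under stated assumptions: the class name `SharedKeyAuthenticator`, its methods and the in-memory password map are hypothetical and are not taken from the WSO2 MB or Qpid plug-in code.

```java
import java.security.SecureRandom;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of shared-key trust delegation; not the actual Qpid plug-in. */
final class SharedKeyAuthenticator {
    // Generated once at start-up and handed only to trusted in-process components.
    private final String sharedKey;
    // Stand-in for the real user store (assumption for this sketch).
    private final Map<String, String> passwords = new HashMap<>();

    SharedKeyAuthenticator() {
        byte[] raw = new byte[32];
        new SecureRandom().nextBytes(raw);
        this.sharedKey = Base64.getEncoder().encodeToString(raw);
    }

    /** Returns the key that trusted components send as the password. */
    String sharedKey() {
        return sharedKey;
    }

    void addUser(String user, String password) {
        passwords.put(user, password);
    }

    /** Accepts the shared key for any user, otherwise checks the user's own password. */
    boolean authenticate(String user, String password) {
        if (sharedKey.equals(password)) {
            return true; // a trusted component is acting on behalf of this user
        }
        return password != null && password.equals(passwords.get(user));
    }

    public static void main(String[] args) {
        SharedKeyAuthenticator auth = new SharedKeyAuthenticator();
        auth.addUser("alice", "secret");
        System.out.println("own password: " + auth.authenticate("alice", "secret"));
        System.out.println("shared key:   " + auth.authenticate("alice", auth.sharedKey()));
        System.out.println("wrong value:  " + auth.authenticate("alice", "guess"));
    }
}
```

Because anyone holding the key can act as any user, the key must never leave the server process; in this sketch it is random per start-up, which matches the article's statement that it is generated at startup.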
The administrative console of WSO2 MB allows users to assign role-based authorization to topics and user-based authorization to queues. It also supports hierarchical roles and lets users manage subscriptions, message boxes and queues. The following diagram shows the details of the WSO2 MB components.
Messaging
Messaging is a key aspect of distributed computing. It enables different applications to exchange messages asynchronously: one application sends messages to the broker, and other applications receive them from the broker. WSO2 MB supports two APIs for handling messages.
JMS
JMS is a standard Java-based API for sending and receiving messages. There are several ways to use the JMS client API for this. This article shows two simple examples using QueueReceiver and QueueSender.
Receiver
    import java.util.Properties;
    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    Properties initialContextProperties = new Properties();
    initialContextProperties.put("java.naming.factory.initial",
            "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
    String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
    initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
    initialContextProperties.put("queue.myQueue", "myQueue");

    try {
        InitialContext initialContext = new InitialContext(initialContextProperties);
        QueueConnectionFactory queueConnectionFactory =
                (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
        QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
        queueConnection.start();
        QueueSession queueSession =
                queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        // Receive message
        Queue queue = (Queue) initialContext.lookup("myQueue");
        QueueReceiver queueReceiver = queueSession.createReceiver(queue);
        MessageListener messageListener = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("Got the message ==> " + textMessage.getText());
                    // Wake up the main thread, which waits on this listener object
                    synchronized (this) {
                        this.notify();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
        queueReceiver.setMessageListener(messageListener);

        // Block until the listener signals that a message has arrived
        synchronized (messageListener) {
            try {
                messageListener.wait();
            } catch (InterruptedException e) {}
        }

        // Housekeeping
        queueReceiver.close();
        queueSession.close();
        queueConnection.stop();
        queueConnection.close();
    } catch (NamingException e) {
        e.printStackTrace();
    } catch (JMSException e) {
        e.printStackTrace();
    }

First, the receiver creates an initial context with the properties required to look up the JMS objects. Qpid uses AMQP, so the connection string passed to the client connection factory contains the username, password, client ID, virtual host, and the host and port of the broker. It then creates a QueueConnection to communicate with the server.
After that, it creates an auto-acknowledged QueueSession, which is used to create the QueueReceiver. The QueueReceiver listens to myQueue using a MessageListener. The receiver program waits on the MessageListener until it receives the first message.
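Waiting for the first message can also be done with a latch. With plain wait() and notify(), a message that is delivered before the main thread starts waiting would leave that thread blocked, because the notification is not remembered. The following is a minimal sketch of the alternative, assuming a hypothetical helper class `FirstMessageWaiter` whose callback would be invoked from the listener's onMessage method; it uses only the JDK and no JMS classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical helper: remembers the first message and lets another thread wait for it. */
final class FirstMessageWaiter {
    private final CountDownLatch received = new CountDownLatch(1);
    private final AtomicReference<String> firstMessage = new AtomicReference<>();

    /** Callback to call from the message listener with the message text. */
    Consumer<String> listener() {
        return text -> {
            firstMessage.compareAndSet(null, text); // keep only the first message
            received.countDown();                   // remembered even if nobody waits yet
        };
    }

    /** Waits for the first message; returns null on timeout or interruption. */
    String await(long timeout, TimeUnit unit) {
        try {
            return received.await(timeout, unit) ? firstMessage.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        FirstMessageWaiter waiter = new FirstMessageWaiter();
        // Deliver before anyone waits: the latch still records the arrival.
        waiter.listener().accept("My test message");
        System.out.println("Got the message ==> " + waiter.await(1, TimeUnit.SECONDS));
    }
}
```

The timeout is a design choice of this sketch: it keeps the receiver from blocking forever if no message is ever sent.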
Message Sender
    import java.util.Properties;
    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    Properties initialContextProperties = new Properties();
    initialContextProperties.put("java.naming.factory.initial",
            "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
    String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
    initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
    initialContextProperties.put("queue.myQueue", "myQueue");

    try {
        InitialContext initialContext = new InitialContext(initialContextProperties);
        QueueConnectionFactory queueConnectionFactory =
                (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
        QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
        queueConnection.start();
        QueueSession queueSession =
                queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        // Create the message to send
        TextMessage textMessage = queueSession.createTextMessage("My test message");

        // Send message
        Queue queue = (Queue) initialContext.lookup("myQueue");
        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.send(textMessage);

        // Housekeeping
        queueSender.close();
        queueSession.close();
        queueConnection.stop();
        queueConnection.close();
    } catch (NamingException e) {
        e.printStackTrace();
    } catch (JMSException e) {
        e.printStackTrace();
    }

The message sender creates a QueueSession in the same way and uses it to create a QueueSender. It then sends a TextMessage to the broker.
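Both the receiver and the sender use the same connection string and JNDI properties. As a rough sketch, the parts of that string (username, password, client ID, virtual host, broker host and port) can be assembled by a small helper. The class `QpidJndiSettings` and its method names are hypothetical; only the property keys and the URL layout are copied from the listings above.

```java
import java.util.Properties;

/** Hypothetical helper that rebuilds the connection settings used in the samples. */
final class QpidJndiSettings {

    /** Builds a connection URL with the same layout as the one in the listings. */
    static String connectionUrl(String user, String password, String clientId,
                                String virtualHost, String host, int port) {
        return "amqp://" + user + ":" + password + "@" + clientId + "/" + virtualHost
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    /** Builds the JNDI properties for one connection factory and one queue. */
    static Properties jndiProperties(String factoryName, String url, String queueName) {
        Properties properties = new Properties();
        properties.put("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        properties.put("connectionfactory." + factoryName, url);
        properties.put("queue." + queueName, queueName);
        return properties;
    }

    public static void main(String[] args) {
        String url = connectionUrl("admin", "admin", "clientID", "carbon", "localhost", 5672);
        Properties properties = jndiProperties("qpidConnectionfactory", url, "myQueue");
        System.out.println(url);
        System.out.println(properties.getProperty("queue.myQueue"));
    }
}
```

Keeping the pieces as separate parameters makes it easier to change the credentials or the broker address without editing the string in two programs.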
SQS
SQS is a web service that can be used to manage queues and to send and receive messages through them. Unlike a JMS queue, an SQS queue has the concept of a visibility timeout, which is used to deal with network reliability issues. When a message is retrieved from an SQS queue, it is not visible to other users (i.e. other users cannot retrieve it) during the visibility timeout period. If the message is deleted within the visibility timeout period, it is removed from the queue. If the client sends no delete request within that period, the message is put back in the queue so that everyone can access it again. The following code shows how to access an SQS queue using the relevant APIs.
Obtaining the AccessKeyId and the SecretAccessKey
SQS uses a symmetric-key, signature-based authentication mechanism. To access an SQS service, users need an accessKeyID and a secretAccessKey. Users can obtain these keys either from the administrator or by logging in to the WSO2 MB administrative console themselves. The following code shows how to get the keys programmatically using the respective Carbon admin service calls.

    System.setProperty("javax.net.ssl.trustStore",
            "/home/amila/projects/wso2mb-1.0.0/repository/resources/security/wso2carbon.jks");
    System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

    // First log in to the server
    String servicesString = "https://localhost:9443/services/";
    AuthenticationAdminServiceStub stub =
            new AuthenticationAdminServiceStub(servicesString + "AuthenticationAdmin");
    stub._getServiceClient().getOptions().setManageSession(true);
    stub.login("admin", "admin", NetworkUtils.getLocalHostname());

    // Keep the HTTP cookie of the authenticated session
    ServiceContext serviceContext =
            stub._getServiceClient().getLastOperationContext().getServiceContext();
    String sessionCookie = (String) serviceContext.getProperty(HTTPConstants.COOKIE_STRING);

    // Reuse the cookie when calling the MessageBoxAdminService
    MessageBoxAdminServiceStub messageBoxAdminServiceStub =
            new MessageBoxAdminServiceStub(servicesString + "MessageBoxAdminService");
    messageBoxAdminServiceStub._getServiceClient().getOptions().setManageSession(true);
    messageBoxAdminServiceStub._getServiceClient().getOptions()
            .setProperty(HTTPConstants.COOKIE_STRING, sessionCookie);

    SQSKeys sqsKeys = messageBoxAdminServiceStub.getSQSKeys("admin");
    accessKeyID = sqsKeys.getAccessKeyId();
    secretAccessKey = sqsKeys.getSecretAccessKeyId();

First, users have to authenticate to WSO2 MB using the AuthenticationAdminService. WSO2 Carbon keeps the details of the authenticated user in the HTTP session object at the server, so the client needs to get the HTTP cookie of that session. Once authenticated, the client can invoke the MessageBoxAdminService to obtain the required information.
The MessageBoxAdminService must be invoked with the same HTTP cookie so that the call runs in the authenticated session.
Accessing Queues
SQS queues can be accessed using generated clients for the two SQS services, QueueService and MessageQueue. To authenticate properly, every request must carry SOAP headers that include a signature calculated with the secret access key. The following method body is used to add those authentication headers.

    OMFactory factory = OMAbstractFactory.getOMFactory();
    OMNamespace awsNs =
            factory.createOMNamespace("http://security.amazonaws.com/doc/2007-01-01/", "aws");

    OMElement accessKeyId = factory.createOMElement("AWSAccessKeyId", awsNs);
    accessKeyId.setText(accessKeyID);

    OMElement timestamp = factory.createOMElement("Timestamp", awsNs);
    timestamp.setText(new Date().toString());

    // The signature covers the action name followed by the timestamp
    OMElement signature = factory.createOMElement("Signature", awsNs);
    try {
        signature.setText(calculateRFC2104HMAC(action + timestamp.getText(), secretAccessKey));
    } catch (SignatureException e) {
        e.printStackTrace();
    }

    // Replace any headers left over from a previous call
    queueServiceStub._getServiceClient().removeHeaders();
    queueServiceStub._getServiceClient().addHeader(accessKeyId);
    queueServiceStub._getServiceClient().addHeader(timestamp);
    queueServiceStub._getServiceClient().addHeader(signature);

The following code explains how to handle SQS queues with the client API.
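The header code calls calculateRFC2104HMAC, which is not listed in the article. A minimal sketch of such a helper is shown here, assuming HMAC-SHA1 with a Base64-encoded result and UTF-8 encoding of both inputs; the implementation shipped in samples.zip may differ. The class name `HmacSigner` and the key value in `main` are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.SignatureException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Sketch of an RFC 2104 HMAC helper; algorithm and encoding are assumptions. */
final class HmacSigner {
    private static final String HMAC_SHA1 = "HmacSHA1";

    /** Signs the data with the key and returns the Base64-encoded HMAC-SHA1 value. */
    static String calculateRFC2104HMAC(String data, String key) throws SignatureException {
        try {
            Mac mac = Mac.getInstance(HMAC_SHA1);
            mac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), HMAC_SHA1));
            byte[] rawHmac = mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(rawHmac);
        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
            throw new SignatureException("Failed to generate HMAC: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) throws SignatureException {
        // Same input layout as the header code: action name followed by the timestamp.
        String action = "CreateQueue";
        String timestamp = new java.util.Date().toString();
        String exampleSecretKey = "example-secret-key"; // placeholder, not a real key
        System.out.println(calculateRFC2104HMAC(action + timestamp, exampleSecretKey));
    }
}
```

Since the signature covers the action name and the timestamp, it has to be recalculated for every request. The client code that follows calls addSoapHeader before each operation for that reason.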
    // First create a queue
    QueueServiceStub queueServiceStub =
            new QueueServiceStub("http://localhost:9763/services/QueueService");
    CreateQueue createQueue = new CreateQueue();
    createQueue.setQueueName(QUEUE_NAME);
    createQueue.setDefaultVisibilityTimeout(new BigInteger(DEFAULT_VISIBILITY_TIMEOUT));
    // Add the security SOAP header for the action CreateQueue
    addSoapHeader(queueServiceStub, "CreateQueue");
    CreateQueueResponse createQueueResponse = queueServiceStub.createQueue(createQueue);
    String queueID = createQueueResponse.getCreateQueueResult().getQueueUrl().toString();
    System.out.println("Queue created with URL ==>" + queueID);

    // Send a message to the queue
    MessageQueueStub messageQueueStub = new MessageQueueStub(queueID);
    SendMessage sendMessage = new SendMessage();
    sendMessage.setMessageBody("Test Send Message");
    addSoapHeader(messageQueueStub, "SendMessage");
    messageQueueStub.sendMessage(sendMessage);

    // Receive the message back from the queue
    ReceiveMessage receiveMessage = new ReceiveMessage();
    receiveMessage.setMaxNumberOfMessages(new BigInteger(MAX_NUMBER_OF_MESSAGES));
    receiveMessage.setVisibilityTimeout(new BigInteger("2000"));
    addSoapHeader(messageQueueStub, "ReceiveMessage");
    ReceiveMessageResponse receiveMessageResponse = messageQueueStub.receiveMessage(receiveMessage);
    Message_type0[] message_type0s = receiveMessageResponse.getReceiveMessageResult().getMessage();

    // Only print and delete when the server actually returned messages
    if (message_type0s != null) {
        String[] receiptHandlers = new String[message_type0s.length];
        for (int i = 0; i < message_type0s.length; i++) {
            System.out.println("Received message ==> " + message_type0s[i].getBody());
            receiptHandlers[i] = message_type0s[i].getReceiptHandle();
        }

        // Delete the received messages
        DeleteMessage deleteMessage = new DeleteMessage();
        deleteMessage.setReceiptHandle(receiptHandlers);
        addSoapHeader(messageQueueStub, "DeleteMessage");
        messageQueueStub.deleteMessage(deleteMessage);
    }

    // Finally delete the queue
    DeleteQueue deleteQueue = new DeleteQueue();
    addSoapHeader(messageQueueStub, "DeleteQueue");
    messageQueueStub.deleteQueue(deleteQueue);

First, the client creates an SQS queue in WSO2 MB. An SQS queue always has a unique ID, which is used for all subsequent access to the queue, so the CreateQueueResponse message contains this ID for the newly created queue. The client then uses the MessageQueue service stub with the queue ID to perform the other operations. After the client stub is created, messages can be sent to the queue using the sendMessage operation. Similarly, messages can be retrieved using the receiveMessage operation. When messages are retrieved, the server returns a receipt handle for each message. When deleting messages, the receipt handle must be sent with the delete request so that the server can relate the request to the message in the queue. Finally, the client deletes the queue using the deleteQueue operation.
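The interplay of visibility timeout and receipt handles described for SQS queues can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the MessageBox implementation: the class `VisibilityQueue`, its method names and the explicit `now` parameter (used instead of the system clock so that the behaviour is easy to follow) are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory model of a queue with a visibility timeout. */
final class VisibilityQueue {
    private static final class InFlight {
        final String body;
        final long visibleAgainAt;

        InFlight(String body, long visibleAgainAt) {
            this.body = body;
            this.visibleAgainAt = visibleAgainAt;
        }
    }

    private final Deque<String> visible = new ArrayDeque<>();
    private final Map<String, InFlight> inFlight = new HashMap<>();

    void send(String body) {
        visible.addLast(body);
    }

    /** Returns {receiptHandle, body}, or null when no message is visible at time now. */
    String[] receive(long now, long visibilityTimeoutMillis) {
        requeueExpired(now);
        String body = visible.pollFirst();
        if (body == null) {
            return null;
        }
        String receiptHandle = UUID.randomUUID().toString();
        inFlight.put(receiptHandle, new InFlight(body, now + visibilityTimeoutMillis));
        return new String[] {receiptHandle, body};
    }

    /** Deletes a received message; returns false if the handle is unknown or has expired. */
    boolean delete(String receiptHandle, long now) {
        requeueExpired(now);
        return inFlight.remove(receiptHandle) != null;
    }

    /** Puts messages whose timeout has passed back into the visible queue. */
    private void requeueExpired(long now) {
        Iterator<Map.Entry<String, InFlight>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            InFlight message = it.next().getValue();
            if (message.visibleAgainAt <= now) {
                visible.addLast(message.body);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        VisibilityQueue queue = new VisibilityQueue();
        queue.send("Test Send Message");
        String[] first = queue.receive(0, 2000);
        System.out.println("received: " + first[1]);
        System.out.println("hidden at t=1000: " + (queue.receive(1000, 2000) == null));
        String[] again = queue.receive(2000, 2000);
        System.out.println("visible again at t=2000: " + again[1]);
        System.out.println("deleted: " + queue.delete(again[0], 2500));
    }
}
```

In this model a delete request that arrives after the timeout is rejected, because the message has already gone back to the queue. That is a simplification chosen for the sketch.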
Eventing
Any operation in a software system can change its state, and these state changes can produce events. The details of an event can be communicated to other systems by sending them as a message. An intermediate broker can be used to publish these messages to any other system that is interested in them. WSO2 MB supports topic-based subscribers and publishers for this purpose. As in the messaging case, WSO2 MB supports two kinds of clients: JMS and WS-Eventing.
JMS
There are several ways to subscribe to a topic and receive its messages using the JMS API. This article shows an example of a durable subscription using TopicSubscriber and TopicPublisher.
Subscriber
    import java.util.Properties;
    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    Properties initialContextProperties = new Properties();
    initialContextProperties.put("java.naming.factory.initial",
            "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
    String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
    initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
    initialContextProperties.put("topic.myTopic", "myTopic");

    try {
        InitialContext initialContext = new InitialContext(initialContextProperties);
        TopicConnectionFactory topicConnectionFactory =
                (TopicConnectionFactory) initialContext.lookup("qpidConnectionfactory");
        TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
        topicConnection.start();
        TopicSession topicSession =
                topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a durable subscription named mySubscription on myTopic
        Topic topic = (Topic) initialContext.lookup("myTopic");
        TopicSubscriber topicSubscriber =
                topicSession.createDurableSubscriber(topic, "mySubscription");
        MessageListener messageListener = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("Got the message ==> " + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
        topicSubscriber.setMessageListener(messageListener);

        // Give the listener some time to receive messages
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {}

        // Housekeeping
        topicSubscriber.close();
        topicSession.close();
        topicConnection.stop();
        topicConnection.close();
    } catch (NamingException e) {
        e.printStackTrace();
    } catch (JMSException e) {
        e.printStackTrace();
    }

As in the earlier messaging sample, the subscriber first creates an initial context and gets the TopicConnectionFactory from it. It then creates a TopicSession and uses it to create a durable subscription. Apache Qpid also uses queues at the server to handle subscriptions: when a durable subscription is created, Qpid creates a durable queue that receives the messages published for that subscription.
Therefore, the subscription also receives the messages that were published while the subscription client was not connected to the server.
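The idea of backing each durable subscription with its own queue can be modelled in a few lines. The class `DurableTopic` below is a hypothetical, in-memory illustration of that idea only; it does not reflect how Qpid stores or delivers messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical model: one buffering queue per durable subscription name. */
final class DurableTopic {
    // The queue stays in place while the subscriber is disconnected.
    private final Map<String, Queue<String>> durableQueues = new LinkedHashMap<>();

    /** Creates the subscription queue if it does not exist yet. */
    void createDurableSubscription(String subscriptionName) {
        durableQueues.putIfAbsent(subscriptionName, new ArrayDeque<>());
    }

    /** Copies the published message into every subscription queue. */
    void publish(String message) {
        for (Queue<String> queue : durableQueues.values()) {
            queue.add(message);
        }
    }

    /** Called when the subscriber connects again: returns everything buffered so far. */
    List<String> drain(String subscriptionName) {
        List<String> messages = new ArrayList<>();
        Queue<String> queue = durableQueues.get(subscriptionName);
        if (queue != null) {
            String message;
            while ((message = queue.poll()) != null) {
                messages.add(message);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        DurableTopic topic = new DurableTopic();
        topic.createDurableSubscription("mySubscription");
        // The subscriber is offline while these are published.
        topic.publish("Test Message 1");
        topic.publish("Test Message 2");
        // On reconnect, the buffered messages are delivered.
        System.out.println(topic.drain("mySubscription"));
    }
}
```

A non-durable subscription would correspond to dropping the queue as soon as the subscriber disconnects, so messages published in the meantime would be lost.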
Publisher
    import java.util.Properties;
    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    Properties initialContextProperties = new Properties();
    initialContextProperties.put("java.naming.factory.initial",
            "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
    String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
    initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
    initialContextProperties.put("topic.myTopic", "myTopic");

    try {
        InitialContext initialContext = new InitialContext(initialContextProperties);
        TopicConnectionFactory topicConnectionFactory =
                (TopicConnectionFactory) initialContext.lookup("qpidConnectionfactory");
        TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
        topicConnection.start();
        TopicSession topicSession =
                topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

        // Publish a text message to myTopic
        Topic topic = (Topic) initialContext.lookup("myTopic");
        TopicPublisher topicPublisher = topicSession.createPublisher(topic);
        TextMessage textMessage = topicSession.createTextMessage("Test Message");
        topicPublisher.publish(textMessage);

        // Housekeeping
        topicPublisher.close();
        topicSession.close();
        topicConnection.stop();
        topicConnection.close();
    } catch (NamingException e) {
        e.printStackTrace();
    } catch (JMSException e) {
        e.printStackTrace();
    }

The publisher follows the same steps and creates a TopicPublisher to publish messages to the topic.
WS-Eventing
WS-Eventing defines a web service API for an event source. Users subscribe to the event source by providing an event sink URL, and when an event occurs, the event source sends it to that URL. WS-Eventing does not define an API for publishing messages, so WSO2 MB provides its own "publish API" in order to work as an event broker. In addition, WSO2 MB comes with a broker client interface that can be used to invoke the EventBroker service. The following code shows how to use the broker client API to communicate with WSO2 MB.
Subscriber
    private AxisServer axisServer;
    private BrokerClient brokerClient;

    public void start() {
        try {
            // Set the trust store used to access the server over HTTPS
            System.setProperty("javax.net.ssl.trustStore",
                    "/home/amila/projects/branch/carbon/3.2.0/products/mb/1.0.0/modules/distribution/product/target/wso2mb-1.0.0/repository/resources/security/wso2carbon.jks");
            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

            // Start a simple Axis2 server and deploy a service to receive the messages
            this.axisServer = new AxisServer();
            this.axisServer.deployService(EventSinkService.class.getName());

            // Give the simple HTTP server time to start
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
        } catch (AxisFault axisFault) {
            System.out.println("Can not start the server");
        }
    }

    public String subscribe() {
        // Subscribe using the sample Axis2 server created in start()
        try {
            // Initiate the broker client; this authenticates with the back-end service
            this.brokerClient = new BrokerClient(
                    "https://localhost:9443/services/EventBrokerService", "admin", "admin");
            String subscriptionID = this.brokerClient.subscribe(
                    "myTopic", "http://localhost:6060/axis2/services/EventSinkService/receive");
            return subscriptionID;
        } catch (BrokerClientException e) {
            e.printStackTrace();
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        } catch (AuthenticationExceptionException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void unsubscribe(String subscriptionID) {
        try {
            this.brokerClient.unsubscribe(subscriptionID);
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }

    public void stop() {
        try {
            this.axisServer.stop();
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Subscriber subscriber = new Subscriber();
        subscriber.start();
        String subscriptionId = subscriber.subscribe();
        // Wait for messages before unsubscribing
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
        }
        subscriber.unsubscribe(subscriptionId);
        subscriber.stop();
    }

First, the subscriber starts a simple Axis2 server and deploys a service to receive the messages. It then uses that service as the event sink URL when subscribing. After waiting some time to receive messages, it unsubscribes and stops the Axis2 server.
Publisher
    private BrokerClient brokerClient;

    public Publisher() {
        try {
            // Set the trust store used to access the server over HTTPS
            System.setProperty("javax.net.ssl.trustStore",
                    "/home/amila/projects/branch/carbon/3.2.0/products/mb/1.0.0/modules/distribution/product/target/wso2mb-1.0.0/repository/resources/security/wso2carbon.jks");
            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

            // Create a broker client instance; this first authenticates with the back end
            this.brokerClient = new BrokerClient(
                    "https://localhost:9443/services/EventBrokerService", "admin", "admin");
        } catch (AuthenticationExceptionException e) {
            e.printStackTrace();
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
    }

    public void publish() {
        try {
            // Publish the message to the topic
            this.brokerClient.publish("myTopic", getOMElementToSend());
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
    }

    /**
     * Generates a sample payload according to the EventSinkService.
     * @return - sample payload
     */
    private OMElement getOMElementToSend() {
        OMFactory omFactory = OMAbstractFactory.getOMFactory();
        OMNamespace omNamespace =
                omFactory.createOMNamespace("http://wsevent.eventing.sample", "ns1");
        OMElement receiveElement = omFactory.createOMElement("receive", omNamespace);
        OMElement messageElement = omFactory.createOMElement("message", omNamespace);
        messageElement.setText("Test publish message");
        receiveElement.addChild(messageElement);
        return receiveElement;
    }

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publish();
    }

The publisher publishes a message in the form expected by the EventSink service.
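The subscribe, publish and unsubscribe flow used by the two WS-Eventing samples can be summarised with a small in-memory model. The class `MiniEventBroker` is hypothetical: event sinks are plain Java callbacks here, whereas in WSO2 MB they are web service endpoints identified by a URL.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical in-memory model of a topic-based event broker with event sinks. */
final class MiniEventBroker {
    private static final class Subscription {
        final String topic;
        final Consumer<String> sink;

        Subscription(String topic, Consumer<String> sink) {
            this.topic = topic;
            this.sink = sink;
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    /** Registers a sink for a topic and returns the subscription ID. */
    String subscribe(String topic, Consumer<String> sink) {
        String subscriptionId = UUID.randomUUID().toString();
        subscriptions.put(subscriptionId, new Subscription(topic, sink));
        return subscriptionId;
    }

    /** Removes a subscription; returns false if the ID is unknown. */
    boolean unsubscribe(String subscriptionId) {
        return subscriptions.remove(subscriptionId) != null;
    }

    /** Pushes the payload to every sink subscribed to the topic; returns the sink count. */
    int publish(String topic, String payload) {
        int delivered = 0;
        for (Subscription subscription : subscriptions.values()) {
            if (subscription.topic.equals(topic)) {
                subscription.sink.accept(payload);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        MiniEventBroker broker = new MiniEventBroker();
        List<String> received = new ArrayList<>();
        String subscriptionId = broker.subscribe("myTopic", received::add);
        broker.publish("myTopic", "Test publish message");
        broker.unsubscribe(subscriptionId);
        broker.publish("myTopic", "not delivered");
        System.out.println(received);
    }
}
```

The subscription ID plays the same role as the value returned by the broker client's subscribe call in the subscriber sample: it is the only thing the client needs to keep in order to unsubscribe later.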
Conclusion
Messaging and eventing play an important role in distributed computing. WSO2 MB, which addresses this space in the WSO2 Carbon platform, supports several APIs, namely JMS, SQS and WS-Eventing, to suit different purposes. This article described how to use each of these APIs from different types of clients.
References
[1] http://schemas.xmlsoap.org/ws/2004/08/eventing/
[2] http://aws.amazon.com/archives/Amazon-SQS/2317
Author
Amila Suriarachchi, Software Architect, WSO2 Inc.
Attachment | Size |
---|---|
samples.zip | 15.29 KB |