Implementing Pub/Sub based on AWS technologies

AWS provides two services, Amazon Simple Notification Service (SNS) and Amazon Simple Queue Service (SQS), that in combination can serve as the foundation of a complete pub/sub service.

Existing AWS capabilities

Amazon Simple Notification Service (Amazon SNS) is a web service that enables applications, end users, and devices to instantly send and receive notifications from the cloud. A very simplified SNS architecture is shown in Figure 1.


Figure 1 Amazon SNS based architecture

Here multiple publishing applications can talk to multiple subscribing applications, using an SNS topic as an intermediary. The advantage of this implementation is that publishers and subscribers do not need to know about each other; as a result, SNS supports fully dynamic integration between applications. SNS supports multiple transports for notification delivery, including HTTP, HTTPS, email, SMS and Amazon SQS.

Amazon Simple Queue Service (Amazon SQS) offers reliable and scalable hosted queues for storing messages as they travel between computers. Using Amazon SQS, you can move data between distributed components of your applications that perform different tasks, without losing messages or requiring each component to be always available. Combining SQS with SNS offers two advantages that SNS alone does not provide: it removes temporal coupling (a consumer does not have to be running when a message is published, because the message waits in its queue) and it supports symmetric load balancing of a given consuming application. The latter is achieved by having multiple instances of the same application read messages from the same queue. The overall architecture of the combined SNS/SQS implementation is presented in Figure 2, where one of the subscribing applications is shown as load balanced.


Figure 2 Combining SNS with SQS
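To make these two properties concrete, here is a minimal in-memory sketch in plain Java rather than the AWS SDK; the class names QueueFanOutTopic and QueueWorker are hypothetical. The topic copies each message into every subscribed queue, messages wait in a queue until someone reads them (no temporal coupling), and two workers sharing one queue split its messages between them (symmetric load balancing). Real SQS adds visibility timeouts and explicit message deletion, which this sketch ignores.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory stand-in for an SNS topic whose subscribers are SQS-like queues.
class QueueFanOutTopic {
    private final List<Queue<String>> queues = new ArrayList<>();

    // The topic only knows queues, never the applications that read them.
    void subscribe(Queue<String> queue) {
        queues.add(queue);
    }

    // Each subscribed queue receives its own copy of every message.
    void publish(String message) {
        for (Queue<String> q : queues) {
            q.add(message);
        }
    }
}

// One instance of a consuming application; several instances may share one queue.
class QueueWorker {
    private final Queue<String> queue;
    final List<String> processed = new ArrayList<>();

    QueueWorker(Queue<String> queue) {
        this.queue = queue;
    }

    // Removes and processes the next message, if any. A message taken by one
    // worker is no longer visible to the other workers sharing the queue.
    boolean pollOnce() {
        String message = queue.poll();
        if (message == null) {
            return false;
        }
        processed.add(message);
        return true;
    }
}
```

With four messages published while no consumer is running, both queues hold all four; when two workers then drain the same queue in turn, each processes two of the four messages and the other application's queue is untouched.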

The main drawback of this implementation is that publishers and subscribers must explicitly agree on the names of SNS topics. Additionally, if a given consumer wants to receive information from multiple topics, he needs to subscribe his queue to each of those topics.

Desired Pub/Sub implementation

A typical solution to this problem, implemented by the majority of pub/sub engines, is a tree-based topic organization. The main principles of such an organization are outlined in the OASIS specification Web Services Topics 1.3.

This specification defines topics

“… as a way to organize and categorize a set of Notifications. The Topics mechanism provides a convenient means by which subscribers can reason about Notifications of interest… a Publisher may associate [publication] with one or more Topics. When a Subscriber creates a Subscription, it may supply a Topic filter expression, associating the Subscription with one or more Topics… Each Topic… can have zero or more child Topics, and a child Topic can itself contain further child Topics. A Topic without a parent is termed a root Topic. A particular root Topic and all its descendants form a hierarchy (termed a Topic Tree).”

An example of the topic tree for cell phone sales is presented below (Figure 3).

Figure 3 Example Topic tree

Here a topic tree root represents sales. Sales can be broken down by regions (North America, Europe and Asia Pacific, in our example). Sales for a given region can be further broken down by the phone type, and so on.

Such a structure is important in a pub/sub system because the tree reflects the organization of the data. If a consumer is interested in events about smartphone sales in North America, he subscribes to that specific topic. If he is interested in events about all sales in North America, he subscribes to the North America topic, on the assumption that he will receive all notifications published to its child topics.
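A minimal sketch of this hierarchical delivery, assuming dotted topic paths such as Sales.NorthAmerica.Smartphones (the path syntax and the class name TopicTreeRouter are illustrative assumptions): a message is routed to the subscribers of its own topic and of every ancestor topic. Topic names are compared case-insensitively.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical router: a message published on a dotted topic path reaches the
// subscribers of that topic and of every ancestor topic.
class TopicTreeRouter {
    private final Map<String, List<String>> subscribersByTopic = new HashMap<>();

    void subscribe(String topicPath, String subscriber) {
        subscribersByTopic
            .computeIfAbsent(topicPath.toLowerCase(Locale.ROOT), k -> new ArrayList<>())
            .add(subscriber);
    }

    // For "Sales.NorthAmerica.Smartphones" this checks "sales",
    // "sales.northamerica" and "sales.northamerica.smartphones", in that order.
    List<String> route(String topicPath) {
        List<String> recipients = new ArrayList<>();
        StringBuilder prefix = new StringBuilder();
        for (String segment : topicPath.toLowerCase(Locale.ROOT).split("\\.")) {
            if (prefix.length() > 0) {
                prefix.append('.');
            }
            prefix.append(segment);
            recipients.addAll(subscribersByTopic.getOrDefault(prefix.toString(), List.of()));
        }
        return recipients;
    }
}
```

A consumer interested in all smartphone sales would still have to call subscribe once per region path, which is the limitation discussed next.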

Of course, this approach does not solve all problems. For example, if a consumer wants to receive events about all smartphone sales, he needs to explicitly subscribe to the smartphone sales topic of every region. Such cases are typically addressed through topic tree design: a tree is designed based on the organization of the information and on typical usage patterns. In some cases, multiple topic trees are designed to satisfy different internal needs (see topic namespaces in Web Services Topics 1.3). Another important feature of modern pub/sub architectures is content-based message filtering:

“In a content-based system, messages are only delivered to a subscriber if the attributes or content of those messages match constraints defined by the subscriber. The subscriber is responsible for classifying the messages.”

In other words, subscribers can explicitly specify the content of the messages they are interested in; in the implementation described below, they do so with a list of regular expressions applied to message attributes.
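The following sketch shows one way such content-based filtering can work, assuming message attributes are available as a map of names to string values. The class names are hypothetical. Each constraint requires one attribute to be present and to match its regular expression in full, and a subscription accepts a message only if all of its constraints hold. Compiling each pattern once up front avoids recompiling it for every message.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical subscriber-defined constraint: the named attribute must fully match the regex.
final class AttributeConstraint {
    private final String attribute;
    private final Pattern pattern;   // compiled once, reused for every message

    AttributeConstraint(String attribute, String regex) {
        this.attribute = attribute;
        this.pattern = Pattern.compile(regex);
    }

    boolean test(Map<String, String> attributes) {
        String value = attributes.get(attribute);
        return value != null && pattern.matcher(value).matches();
    }
}

// A message is delivered only if every constraint of the subscription holds.
final class ContentBasedSubscription {
    private final List<AttributeConstraint> constraints;

    ContentBasedSubscription(List<AttributeConstraint> constraints) {
        this.constraints = List.copyOf(constraints);
    }

    boolean wants(Map<String, String> attributes) {
        for (AttributeConstraint c : constraints) {
            if (!c.test(attributes)) {
                return false;
            }
        }
        return true;   // an empty constraint list accepts every message
    }
}
```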

The combination of this filtering and a hierarchical topic structure allows for the creation of very flexible and powerful pub/sub implementations.

In this article we will show how this type of system can be easily built using AWS components.

Proposed Pub/Sub architecture

The proposed architecture is presented in Figure 4. In this architecture, the Pub/Sub server is implemented as a web application running inside a Tomcat container. We also leverage AWS Elastic Load Balancing, which distributes requests across the Pub/Sub server cluster so that the cluster can be expanded and contracted dynamically based on the current load. In addition, the server uses Amazon Relational Database Service (RDS) to store the current configuration, so that additional Pub/Sub instances can be created dynamically. To improve overall performance, we minimize database access by keeping the current topology in memory, which makes the actual message routing very fast. This solution does, however, require a mechanism to notify all servers of any topology change, because behind the load balancer a change request can be processed by any server. This is easily done using Amazon SNS. Finally, we leverage Amazon SQS for delivering notifications to consumers. Note that a given consumer can be listening on multiple queues.


Figure 4 Proposed overall Architecture
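Keeping the topology in memory and refreshing it on change notifications amounts to a cache that is invalidated from outside. A minimal sketch of that idea follows; TopologyCache and its methods are hypothetical names, and the loader stands in for whatever reads the configuration from the database. Routing calls get(), and the SNS-triggered handler calls markUpdated() so that the next get() reloads.

```java
import java.util.function.Supplier;

// Hypothetical in-memory topology cache: routing reads the cached copy, and the
// database is queried again only after a topology-change notification.
final class TopologyCache<T> {
    private final Supplier<T> loader;   // e.g. loads topics, subscriptions and filters from RDS
    private volatile boolean stale = true;
    private T current;

    TopologyCache(Supplier<T> loader) {
        this.loader = loader;
    }

    // Invoked on every server when the SNS change notification arrives.
    void markUpdated() {
        stale = true;
    }

    // Returns the cached topology, reloading it first if a change was signalled.
    synchronized T get() {
        if (stale) {
            stale = false;   // cleared before loading so a change arriving mid-load triggers another reload
            try {
                current = loader.get();
            } catch (RuntimeException e) {
                stale = true;   // keep retrying on the next call if the load failed
                throw e;
            }
        }
        return current;
    }
}
```

Clearing the flag before loading, rather than after, means a change notification that arrives while a reload is in progress is not lost: it causes one more reload on the following get().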

Pub/Sub Server

At the heart of this implementation is a custom Pub/Sub server. The server implementation comprises three main layers – persistence, domain and services.

Persistence

Server persistence is implemented using JPA 2.0 and defines three main entities: topic, subscription and semantic filter.

A topic entity (Listing 1) describes the information stored about a specific topic, including the topic id (internal id for the database), the topic name (a string identifying the topic), a Boolean flag indicating whether this topic is a root topic, references to the topic's parent and children (allowing traversal of the topic hierarchy), and the list of subscriptions associated with the topic.

@Entity
@NamedQueries({
    @NamedQuery(name="Topic.RootTopics",
                query="SELECT t FROM Topic t WHERE t.root = TRUE"),
    @NamedQuery(name="Topic.AllTopics",
                query="SELECT t FROM Topic t")
})
@Table(name = "Topic")
public class Topic {

    @Id @GeneratedValue(strategy=GenerationType.IDENTITY)
    private long id;                     // auto-generated ID

    @Column(name = "name", nullable = false, length = 32)
    private String name;                 // topic name

    @Column(name = "root", nullable = false)
    private Boolean root = false;        // true for the root of a topic tree

    @ManyToOne(fetch=FetchType.LAZY)
    @JoinColumn(name="TOPIC_ID")         // foreign key to the parent topic
    private Topic parent;

    @OneToMany(mappedBy="parent", cascade=CascadeType.ALL, orphanRemoval=true)
    private List<Topic> children;

    @OneToMany(mappedBy="topic", cascade=CascadeType.ALL, orphanRemoval=true)
    private List<Subscription> subscriptions;
    ………………………………………………………………………………………………

Listing 1 Topic Entity

We also define two named queries here that are used to access topics: RootTopics returns the topics flagged as roots (the entry points into topic trees), and AllTopics returns all existing topics.

This entity provides a complete topic definition and can also support multiple topic trees (not part of the example implementation).

A subscription entity (Listing 2) describes the information stored about a specific subscription, including the subscription id (internal id for the database), the SQS queue URL to which notifications for this subscription are delivered, a reference to the topic that this subscription is associated with, and a list of semantic filters. A notification is delivered to a given queue (client) only if all the filters accept the message (see below). If a subscription does not contain semantic filters, all messages from the associated topics are passed directly to the queue.

@Entity
@NamedQueries({
    @NamedQuery(name="Subscription.AllSubscriptions",
	                    query="SELECT s FROM Subscription s")
})
@Table(name = "Subscription")
public class Subscription {

       @Id @GeneratedValue(strategy=GenerationType.IDENTITY)
       private long id;    //Auto generated ID

       @Column(name = "queue",nullable = false, length = 128)
       private String queue;

       @ManyToOne(fetch=FetchType.LAZY)
       @JoinColumn(name="TOPIC_ID")
       private Topic topic;

       @OneToMany(mappedBy="subscription",
                  cascade=CascadeType.ALL, orphanRemoval=true)
       private List<SemanticFilter> filters;
       ……………………………………………………………………………………

Listing 2 Subscription Entity

We also define here a named query that is used for getting all existing subscriptions.

Finally, a semantic filter entity (Listing 3) describes the information stored about a specific semantic filter, including the semantic filter id (internal id for the database), the name of the message attribute that this filter tests, the regular expression that the attribute value must match, and a reference to the subscription that this filter is associated with.

@Entity
@NamedQueries({
    @NamedQuery(name="SemanticFilter.AllSemanticFilters",
	                    query="SELECT sf FROM SemanticFilter sf")
})
@Table(name = "Filter")
public class SemanticFilter {

       @Id @GeneratedValue(strategy=GenerationType.IDENTITY)
       private long id;    //Auto generated ID

       @Column(name = "attribute",nullable = false, length = 32)
       private String attribute;                     // attribute name

       @Column(name = "filter",nullable = false, length = 128)
       private String filter;                       // regex filter

       @ManyToOne(fetch=FetchType.LAZY)
       @JoinColumn(name="SUBSCRIPTION_ID")
       private Subscription subscription;
       ……………………………………………………………………

Listing 3 Semantic filter entity

We also define a named query here that is used for getting all existing semantic filters.

In addition to entities, the persistence layer contains a persistence manager class, responsible for:

Managing database access and transactions

Reading and writing objects to/from the database

Converting between domain objects (see below) and persistence entities

Sending notifications about topology changes

Domain Model

The domain model objects are responsible for supporting the service operations, including subscribing to and publishing data, as well as the actual publishing of notifications to subscribers' queues. Although in our simple implementation we could have merged the domain and persistence models, we chose to keep them separate. The data model of both layers is the same, but the domain objects have additional methods that explicitly support the Pub/Sub implementation.

The filter processing implementation (Listing 4) leverages Java's built-in regular expression support in String.matches, which requires the whole attribute value to match the pattern.

	public boolean accept(String value){
	       if(value == null)
	              return false;
	       return value.matches(_pattern);
	}

Listing 4 Filter processing method

The publication implementation (Listing 5) is a method on the subscription class. Note that this method implements “ANDing” of the semantic filters: a message is sent to the subscription's queue only if every filter accepts it. This limitation can be overcome by having multiple subscriptions for a given client or by extending the subscription implementation to support Boolean functions.

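public void publish(Map<String, String> attributes, String message){

    if((_filters != null) && (_filters.size() > 0)){
        for(DomainSemanticFilter f : _filters){
            String av = attributes.get(f.getField());
            if(av == null)
                return;              // attribute required by this filter is missing
            if(!f.accept(av))
                return;              // filter rejected the attribute value
        }
    }
    // every filter (if any) accepted the message: deliver it to the subscriber's queue
    SQSPublisher.getPublisher().sendMessage(_queue, message);
}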

Listing 5 Publication implementation
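One way to implement the Boolean extension mentioned above is to represent a subscription's filter as a tree of predicates. The sketch below is an assumption, not part of the article's code: FilterExpressions and its methods are hypothetical, attr(...) plays the role of a single semantic filter, and allOf/anyOf provide AND and OR. The current publish method corresponds to a single allOf over all of a subscription's filters.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical filter expressions: leaf predicates test one attribute against a
// regex, and allOf/anyOf combine them into AND/OR trees.
final class FilterExpressions {
    private FilterExpressions() { }

    // Leaf: the attribute must be present and fully match the regex.
    static Predicate<Map<String, String>> attr(String name, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return attributes -> {
            String value = attributes.get(name);
            return value != null && pattern.matcher(value).matches();
        };
    }

    // AND: every part must accept the message.
    static Predicate<Map<String, String>> allOf(List<Predicate<Map<String, String>>> parts) {
        return attributes -> parts.stream().allMatch(p -> p.test(attributes));
    }

    // OR: at least one part must accept the message.
    static Predicate<Map<String, String>> anyOf(List<Predicate<Map<String, String>>> parts) {
        return attributes -> parts.stream().anyMatch(p -> p.test(attributes));
    }
}
```

For example, anyOf(allOf(region = Europe, type = smart.*), type = tablet) accepts European smartphone messages and tablet messages from any region.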

This implementation leverages the SQSPublisher class (Listing 6), which is built on the AWS SDK for Java.

import java.io.IOException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.DeleteQueueRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class SQSPublisher {

	 private static SQSPublisher _publisher;

	 private AmazonSQSClient _sqs;    

	 private SQSPublisher()throws IOException {
	           AWSCredentials credentials = new PropertiesCredentials(
	                             this.getClass().getClassLoader().getResourceAsStream("AwsCredentials.properties"));
	           _sqs = new AmazonSQSClient(credentials);
	 }

	 public String createQueue(String name){
	          CreateQueueRequest request = new CreateQueueRequest(name);
	          return _sqs.createQueue(request).getQueueUrl();
	 }

	 public void sendMessage(String queueURL, String message){
	          SendMessageRequest request = new SendMessageRequest(queueURL, message);
	          _sqs.sendMessage(request);

	 }

	 public void deleteQueue(String queueURL){
	          DeleteQueueRequest request = new DeleteQueueRequest(queueURL);
	          _sqs.deleteQueue(request);
	 }

	 public static synchronized SQSPublisher getPublisher(){
	          if(_publisher == null)
	                   try {
	                              _publisher = new SQSPublisher();
	                   }catch (IOException e) {
	                            e.printStackTrace();
	                   }
	          return _publisher;
	 }

}

Listing 6 SQS publisher

The createQueue and deleteQueue methods of this class can be used by a subscriber to create and destroy SQS queues.

In addition to SQS queues, our implementation leverages SNS for propagating topology (database) changes to all servers. SNS interactions are implemented by the SNSPubSub class (Listing 7), which is also built on the AWS SDK for Java.

import java.io.IOException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.ConfirmSubscriptionRequest;
import com.amazonaws.services.sns.model.ConfirmSubscriptionResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.UnsubscribeRequest;

public class SNSPubSub {

	  private static SNSPubSub _topicPublisher;
	  private static String _topicARN;
	  private static String _endpoint;

	  private AmazonSNSClient _sns;
	  private String _protocol = "http";
	  private String _subscriptionARN;

	  private SNSPubSub()throws IOException {
	            AWSCredentials credentials = new PropertiesCredentials(
	                             this.getClass().getClassLoader().getResourceAsStream("AwsCredentials.properties"));
	            _sns = new AmazonSNSClient(credentials);
	  }

	  public void publish(String message){
	            PublishRequest request = new PublishRequest(_topicARN, message);
	            _sns.publish(request);
	  }

	  public void subscribe(){
	            SubscribeRequest request = new SubscribeRequest(_topicARN, _protocol, _endpoint);
	            _sns.subscribe(request);
	  }

	  public void confirmSubscription(String token){
	            ConfirmSubscriptionRequest request = new ConfirmSubscriptionRequest(_topicARN, token);
	            ConfirmSubscriptionResult result = _sns.confirmSubscription(request);
	            _subscriptionARN = result.getSubscriptionArn();
	  }

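	  public void unSubscribe(){
	            // _subscriptionARN is only set once the subscription has been confirmed
	            if(_subscriptionARN != null){
	                      UnsubscribeRequest request = new UnsubscribeRequest(_subscriptionARN);
	                      _sns.unsubscribe(request);
	                      _subscriptionARN = null;
	            }
	  }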

	  public static void configureSNS(String topicARN, String endpoint){
	             _topicARN = topicARN;
	             _endpoint = endpoint;
	  }

	  public static synchronized SNSPubSub getSNS(){
	             if(_topicPublisher == null){
	                       try{
	                                 _topicPublisher = new SNSPubSub();
	                       }
	                       catch(Exception e){
	                                e.printStackTrace();
	                       }
	             }
	             return _topicPublisher;
	  }
}

Listing 7 SNS Pub/Sub

Using SNS

When using SNS, it is important to remember that subscribing to a topic does not mean that you are ready to listen on it. SNS subscription is a two-step process: when you send a subscribe request, SNS reports the subscription as pending confirmation and sends a confirmation request, carrying a token, to the subscribed endpoint; notifications are delivered only after the subscription has been confirmed. That is why Listing 7 has both subscribe and confirmSubscription methods. A single schema:

       <xsd:complexType name="NotificationType">
	       <xsd:sequence>
			<xsd:element name="Type" type="xsd:string" />
		        <xsd:element name="MessageId" type="xsd:string"/>
		        <xsd:element name="Token" type="xsd:string" minOccurs="0"/>
		        <xsd:element name="TopicArn" type="xsd:string"/>
			<xsd:element name="Message" type="xsd:string"/>
			<xsd:element name="SubscribeURL" type="xsd:string" minOccurs="0"/>
			<xsd:element name="Timestamp" type="xsd:string" />
			<xsd:element name="SignatureVersion" type="xsd:string" />
			<xsd:element name="Signature" type="xsd:string" />
			<xsd:element name="SigningCertURL" type="xsd:string" />
			<xsd:element name="UnsubscribeURL" type="xsd:string" minOccurs="0"/>
		</xsd:sequence>
	</xsd:complexType>

describes both message types: confirmation requests and actual notifications. The two can be distinguished by examining the Type element. If its value is “SubscriptionConfirmation”, the message is a request for subscription confirmation; if it is “Notification”, it is an actual notification.
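On the receiving side, the server's HTTP endpoint has to tell these message types apart. The sketch below is hypothetical: it assumes the body has already been parsed into field names and values, and the Handler callbacks stand in for SNSPubSub.confirmSubscription and PersistenceManager.setUpdated. It also omits verification of the Signature field against the certificate at SigningCertURL, which a production endpoint should perform.

```java
import java.util.Map;

// Hypothetical dispatcher for the server's synch endpoint. It assumes the SNS
// message body has already been parsed into field name -> value pairs.
final class SnsMessageDispatcher {
    interface Handler {
        void confirmSubscription(String token);   // e.g. delegate to SNSPubSub.confirmSubscription
        void topologyChanged(String message);     // e.g. delegate to PersistenceManager.setUpdated
    }

    private SnsMessageDispatcher() { }

    // Routes the message by its Type field and returns that type so callers can log it.
    static String dispatch(Map<String, String> fields, Handler handler) {
        String type = fields.getOrDefault("Type", "");
        switch (type) {
            case "SubscriptionConfirmation":
                handler.confirmSubscription(fields.get("Token"));
                break;
            case "Notification":
                handler.topologyChanged(fields.get("Message"));
                break;
            default:
                // e.g. "UnsubscribeConfirmation": nothing to do in this sketch
                break;
        }
        return type;
    }
}
```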

The domain topic class (DomainTopic) implements two methods (Listing 8) to support publication.

public void publish(Map<String, String> attributes, String message){

	  if(_subscriptions == null)
	            return;
	  for(DomainSubscription ds : _subscriptions)
	            ds.publish(attributes, message);
}

public void processPublications(List<DomainTopic> tList, StringTokenizer st) throws PublicationException{

	 tList.add(this);
	 if(!st.hasMoreTokens())
	            return;
	 String topic = st.nextToken();
	 if(_children != null){              // a leaf topic has no children to match against
	         for(DomainTopic dt : _children){
	                 if(topic.equalsIgnoreCase(dt.getName())){
	                           dt.processPublications(tList, st);
	                           return;
	                 }
	         }
	 }
	 throw new PublicationException("Subtopic " + topic + " is not found in topic " + _name);
}

Listing 8 Topic publishing support

The processPublications method builds the list of topics associated with a given message. Each topic adds itself to the list, takes the next token of the tokenized topic path, and passes the remaining tokens to the child whose name matches that token; if no child matches, a PublicationException is thrown. The topic publish method takes a map of message attributes and tries to publish the message to every subscription associated with the topic.
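A self-contained sketch of the same walk follows, with an in-memory TopicNode class standing in for DomainTopic (a hypothetical simplification that drops subscriptions and persistence). It is written as a loop instead of recursion, but it collects the same root-to-target chain and fails on an unknown subtopic, here with IllegalArgumentException rather than PublicationException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory topic node mirroring the walk performed by processPublications.
final class TopicNode {
    final String name;
    final List<TopicNode> children = new ArrayList<>();

    TopicNode(String name) {
        this.name = name;
    }

    // Adds a child topic and returns it, so trees can be built fluently.
    TopicNode child(String childName) {
        TopicNode c = new TopicNode(childName);
        children.add(c);
        return c;
    }

    // Returns the topics from the root down to the topic named by the dotted path.
    static List<TopicNode> resolve(TopicNode root, String path) {
        String[] tokens = path.split("\\.");
        if (tokens.length == 0 || !tokens[0].equalsIgnoreCase(root.name)) {
            throw new IllegalArgumentException("Unrecognized root topic in " + path);
        }
        List<TopicNode> chain = new ArrayList<>();
        TopicNode current = root;
        chain.add(current);
        for (int i = 1; i < tokens.length; i++) {
            TopicNode next = null;
            for (TopicNode c : current.children) {
                if (c.name.equalsIgnoreCase(tokens[i])) {
                    next = c;
                    break;
                }
            }
            if (next == null) {
                throw new IllegalArgumentException(
                    "Subtopic " + tokens[i] + " is not found in topic " + current.name);
            }
            current = next;
            chain.add(current);   // every topic on the path receives the message
        }
        return chain;
    }
}
```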

The above methods are brought together by the publish method of the DomainManager class (Listing 9). This method first tokenizes the topic string and then, using the processPublications method, builds the list of topics whose subscribers are interested in this message. Once the list is built, it extracts a map of message attributes from the message envelope (we assume an XML message) and publishes the message to every topic in the list.

     public void publish (String topic, String message){
	      StringTokenizer st = new StringTokenizer(topic, ".");
	      List<DomainTopic> topics = new LinkedList<DomainTopic>();
	      DomainTopic root = PersistenceManager.getPersistenceManager().getRoot();
	      try {
	               if(!st.hasMoreTokens())
	                          return;
	               String t = st.nextToken();
	               if(!t.equalsIgnoreCase(root.getName()))
	                                  throw new PublicationException("Unrecognized subtopic name " + topic);
	               root.processPublications(topics, st);
	      }catch (PublicationException e) {
	               e.printStackTrace();
	               return;
	      }
	      MessageType msg = null;
	      try {
	               JAXBElement<MessageType> msgEl = (JAXBElement<MessageType>)
	                       _unmarshaller.unmarshal(new ByteArrayInputStream(message.getBytes()));
	               msg = msgEl.getValue();
	      } catch (JAXBException e) {
	               e.printStackTrace();
	               return;
	      }
	      Map<String, String> attributes = new HashMap<String, String>();
	      MessageEnvelopeType envelope = msg.getEnvelope();
	      if(envelope != null){
	               for(MessageAttributeType attribute : envelope.getAttribute()){
	                       attributes.put(attribute.getName(), attribute.getValue());
	               }
	      }
	      for(DomainTopic t : topics)
	               t.publish(attributes, message);
     }

Listing 9 Publish method implementation
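The article does not show the message schema behind MessageType. As an illustration only, the sketch below extracts the envelope attributes with the JDK's DOM parser instead of JAXB, assuming a hypothetical layout in which each attribute is an attribute element with name and value children. Disallowing DOCTYPE declarations is a common hardening step against XML external entity attacks when parsing messages from external publishers.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical DOM-based extraction of envelope attributes. Assumed message shape:
// <message><envelope><attribute><name>..</name><value>..</value></attribute>...</envelope>...</message>
final class EnvelopeAttributes {
    private EnvelopeAttributes() { }

    static Map<String, String> extract(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Reject DOCTYPE declarations to rule out XML external entity attacks.
            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            Document doc = factory.newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> attributes = new LinkedHashMap<>();
            NodeList envelopes = doc.getElementsByTagName("envelope");
            if (envelopes.getLength() == 0) {
                return attributes;   // no envelope: only subscriptions without filters will match
            }
            NodeList items = ((Element) envelopes.item(0)).getElementsByTagName("attribute");
            for (int i = 0; i < items.getLength(); i++) {
                Element item = (Element) items.item(i);
                String name = childText(item, "name");
                if (name != null) {
                    attributes.put(name, childText(item, "value"));
                }
            }
            return attributes;
        } catch (Exception e) {
            throw new IllegalArgumentException("Malformed message", e);
        }
    }

    // Text of the first descendant element with the given tag, or null if absent.
    private static String childText(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
    }
}
```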

Service Model

Access to the Pub/Sub functionality is implemented using a set of REST services (Listing 10).

@Path("/")
public class PubSubServiceImplementation {

	  // Functional methods
	  @POST
	  @Path("publish")
	  @Consumes("application/text")
	  public void publish (@QueryParam("topic")String topic, String message) throws PublicationException{
	           DomainManager.getDomainManager().publish(topic, message);
	  }

	  @GET
	  @Path("publish")
	  public void publishGet (@QueryParam("topic")String topic, @QueryParam("message")String message) throws PublicationException{
	           DomainManager.getDomainManager().publish(topic, message);
	  }

	  @POST
	  @Path("synch")
	  @Consumes("text/plain")
	  public void getSynchNotification (String message){
	           // Called via SNS when any server changes the topology; marks the in-memory copy as outdated
	           PersistenceManager.setUpdated();
	  }

	  // Configuration methods

	  @GET
	  @Path("root")
	  @Produces("application/json")
	  public TopicType getRoot()throws PublicationException {
	           return DomainManager.getDomainManager().getRoot();
	  }

	  @GET
	  @Path("filters")
	  @Produces("application/json")
	  public FiltersType getFilters() throws PublicationException {
	           return DomainManager.getDomainManager().getFilters();
	  }

	  @POST
	  @Path("filter")
	  @Consumes("application/json")
	  public long addFilter(FilterType filter) throws PublicationException {
	           return DomainManager.getDomainManager().addFilter(filter);
	  }

	  @DELETE
	  @Path("filter/{id}")
	  public void deleteFilter(@PathParam("id")long id) throws PublicationException {
	           DomainManager.getDomainManager().removeFilter(id);
	  }

	  @GET
	  @Path("subscriptions")
	  @Produces("application/json")
	  public SubscriptionsType getSubscriptions() throws PublicationException {
	           return DomainManager.getDomainManager().getSubscriptions();
	  }

	  @POST
	  @Path("subscription")
	  @Consumes("application/json")
	  public long addSubscription(SubscriptionType s) throws PublicationException {
	           return DomainManager.getDomainManager().addSubscription(s, null);
	  }

	  @DELETE
	  @Path("subscription/{id}")
	  public void deleteSubscription(@PathParam("id")long id) throws PublicationException {
	           DomainManager.getDomainManager().removeSubscription(id);
	  }

	  @POST
	  @Path("subscriptionFilters/{sid}")
	  @Consumes("application/json")
	  public long assignFilersToSubscription(@PathParam("sid")long sid, IDsType ids)throws PublicationException{
	           return DomainManager.getDomainManager().assignFilersToSubscription(sid, ids);

	  }    

	  @POST
	  @Path("topic")
	  @Consumes("application/json")
	  public long addTopic(TopicType t) throws PublicationException {
	           return DomainManager.getDomainManager().addTopic(t, null);
	  }

	  @DELETE
	  @Path("topic/{id}")
	  public void deleteTopic(@PathParam("id")long id) throws PublicationException {
	           DomainManager.getDomainManager().removeTopic(id);
	  }

	  @POST
	  @Path("topichierarchy/{tid}")   // hypothetical path; must not duplicate topicsubscription/{tid} used below
	  @Consumes("application/json")
	  public void assignTopicHierarchy(@PathParam("tid")long tid, IDsType ids) throws PublicationException{
	           DomainManager.getDomainManager().assignTopicHierarchy(tid, ids);
	  }

	  @POST
	  @Path("topicsubscription/{tid}")
	  @Consumes("application/json")
	  public long assignTopicSubscriptions(@PathParam("tid")long tid, IDsType ids)throws PublicationException{
	           return DomainManager.getDomainManager().assignTopicSubscriptions(tid, ids);
	  }
}

Listing 10 Pub/Sub Services

These services are used by message publishers (the publish methods), by subscribers (creating and deleting semantic filters and subscriptions, as well as associating filters with subscriptions and subscriptions with topics), internally by the Pub/Sub implementation (the synch service), and by administrative applications.
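As a client-side illustration, the sketch below builds requests for three of these services with the JDK's java.net.http API. The class name PubSubRequests is hypothetical, and so is the base URL (the load balancer address plus whatever context path the web application is deployed under), as is the JSON body for a subscription, since SubscriptionType is not shown.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical client-side helpers that build requests for some of the services in Listing 10.
final class PubSubRequests {
    private final String baseUrl;   // assumed: load balancer address plus the web app context path

    PubSubRequests(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // POST publish?topic=<dotted topic path>, with the XML message as the body.
    HttpRequest publish(String topic, String xmlMessage) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/publish?topic="
                + URLEncoder.encode(topic, StandardCharsets.UTF_8)))
            .header("Content-Type", "application/text")   // media type declared by @Consumes in Listing 10
            .POST(HttpRequest.BodyPublishers.ofString(xmlMessage, StandardCharsets.UTF_8))
            .build();
    }

    // POST subscription; the JSON field names depend on SubscriptionType, which is not shown.
    HttpRequest addSubscription(String subscriptionJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/subscription"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(subscriptionJson, StandardCharsets.UTF_8))
            .build();
    }

    // DELETE subscription/{id}
    HttpRequest deleteSubscription(long id) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/subscription/" + id))
            .DELETE()
            .build();
    }
}
```

A request built this way can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).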

Conclusion

The simple implementation presented here delivers a powerful and extensible pub/sub service while leveraging many existing AWS capabilities and requiring only a minimal amount of custom Java code. It also takes full advantage of the existing AWS deployment support for load balancing and failover.

About the Author

Dr Boris Lublinsky is a Principal Architect at Nokia, where he actively participates in Big Data, SOA, BPM and middleware implementations. Prior to this, Boris was a Principal Architect at Herzum Software, where he designed large-scale SOA systems for clients, and an Enterprise Architect at CNA Insurance, where he was involved in the design and implementation of CNA's integration and SOA strategy, building application frameworks and implementing service-oriented architectures. Boris has over 25 years of experience in enterprise and technical architecture and software engineering. He is an active member of the OASIS SOA RM committee, coauthor of the book Applied SOA: Service-Oriented Architecture and Design Strategies, and author of numerous articles on architecture, programming, Big Data, SOA and BPM.

(via InfoQ.com)