Sunday, December 27, 2015

Send/Receive messages using AMQP in Java in Azure Event Hub

Steps:
1. Create Event Hub
2. Configure Event Hub with Send/Receive policy (this step will provide the keys to connect)
3. Encode the key
4. Ensure to use proper partition
5. Ensure to use the proper Consumer group in the Receiver.
6. AMQP libraries
7. Sender code
8. Receiver code

Create Event Hub:

Configure Event Hub with rules:

Encode the keys:
Collect the keys from EventHub configuration from the above steps.
            String myKey = "myKey=";
            URLEncoder.encode(myKey);

Ensure to use proper partition:
This is an important step. This is unlike other AMQP implementation. Event hub creates partitions for parallel processing, ensure that your sender and receiver uses the correct partitions. The properties could be specified by properties file.
The partition information is highlighted in the servicebus.properties files below.
Tips:
1.       Partition while sending
Your connection to the Queue will go through.
2.       Partition while receiving
Your default connection will not go through and you will receive error.
Exception in thread "main" javax.jms.JMSException: Invalid EventHub address. It must be either of the following. Sender: <EventHubName>. Partition Sender: <EventHubName>/Partitions/<PartitionNumber>. Partition Receiver: <EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/<PartitionNumber>. TrackingId:59faccff54a742ba8eabf5a9d07ecb4b_G8,TimeStamp:12/28/2015 3:18:51 AM

Ensure to use the proper Consumer group in the Receiver:
Highlighted below in servicebus.properties in Receiver below.

AMQP libraries:
            <dependency>
                  <groupId>org.apache.qpid</groupId>
                  <artifactId>qpid-amqp-1-0-common</artifactId>
                  <version>0.32</version>
            </dependency>
            <dependency>
                  <groupId>org.apache.qpid</groupId>
                  <artifactId>qpid-amqp-1-0-client</artifactId>
                  <version>0.32</version>
            </dependency>
            <dependency>
                  <groupId>org.apache.qpid</groupId>
                  <artifactId>qpid-amqp-1-0-client-jms</artifactId>
                  <version>0.32</version>
            </dependency>
            <dependency>
                  <groupId>org.apache.geronimo.specs</groupId>
                  <artifactId>geronimo-jms_1.1_spec</artifactId>
                  <version>1.1.1</version>
            </dependency>

Sender code:
·         servicebus.properties:
connectionfactory.SBCF = amqps://SendRule(your policy):<<url encoded value>>@<your name space>.servicebus.windows.net/?sync-publish=false
queue.EventHub = <<your event hub>>/Partitions/0

e.g.
connectionfactory.SBCF = amqps://SendRule:nju%asadfasdf@xyz-ns.servicebus.windows.net/?sync-publish=false
queue.EventHub = xyz/Partitions/0

·         Initialize connection factory:
            // http://people.apache.org/~rgodfrey/qpid-java-amqp-1-0-client-jms.html
            // Configure JNDI environment
            Hashtable<String, String> env = new Hashtable<String, String>();
            env.put(Context.INITIAL_CONTEXT_FACTORY,
                        "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
            env.put(Context.PROVIDER_URL, "servicebus.properties");
            Context context = new InitialContext(env);

            ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

            Destination queue = (Destination) context.lookup("EventHub");

            // Create Connection
            Connection connection = cf.createConnection();
·         Create a session:
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
·         Create Message Producer:
            sender = session.createProducer(queue);
·         Send Message:
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(messageCode.getBytes("UTF-8"));
            sender.send(message);

Receiver code:
·         servicebus.properties:
connectionfactory.SBCFR = amqps://ReceiveRule:<<your encoded key>>@xyz-ns.servicebus.windows.net
queue.EventHub = xyz/ConsumerGroups/$Default/Partitions/0

·         Initialize connection factory:
Same as above
·         Create a session:
Same as above
·         Create Message Producer:
            consumer = session.createConsumer(queue);
·         Receive Message:
o   Implement a listener
class Listener implements MessageListener {
      @Override
      public void onMessage(Message arg0) {
o   Read message
            int messageLength =  (int) message.getBodyLength();
            if (messageLength != 0) {
                  byte[] readBytes = new byte[messageLength];
                  message.readBytes(readBytes);
                  return new String(readBytes);
            }
o   Register the listener
            Listener myListener = new Listener();
            consumer.setMessageListener(myListener);
o   Important: Start the connection factory
            connection.start();





References:

2 comments:

  1. Thank you for sharing this information. I find this information is easy to understand and very useful.
    Microsoft Azure Online Training

    ReplyDelete
  2. Nice article I was impressed by seeing this blog, it was very interesting and it is Thanks for sharing all the information with us all.very useful for me. This is good information and really helpful for the people who need information about this.
    oracle training in chennai

    oracle training institute in chennai

    oracle training in bangalore

    oracle training in hyderabad

    oracle training

    oracle online training

    hadoop training in chennai

    hadoop training in bangalore

    ReplyDelete