Steps:
1. Create Event Hub
2. Configure Event Hub with Send/Receive policy (this step will provide the keys to connect)
3. Encode the key
4. Use the correct partition
5. Use the correct consumer group in the receiver
6. AMQP libraries
7. Sender code
8. Receiver code
Create Event Hub:
Follow steps in https://azure.microsoft.com/en-us/documentation/articles/event-hubs-csharp-ephcs-getstarted/
Configure Event Hub with rules:
Follow Step 6 in https://azure.microsoft.com/en-us/documentation/articles/event-hubs-csharp-ephcs-getstarted/
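Encode the keys:
Collect the keys from the Event Hub configuration in the steps above, then URL-encode them so they can be placed in the connection URL.
String myKey = "myKey=";
// Keep the encoded result; the single-argument encode(String) is deprecated.
String encodedKey = URLEncoder.encode(myKey, "UTF-8");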
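As a runnable illustration of the encoding step, here is a minimal sketch. The class name KeyEncoder, the method encodeKey, and the sample key are hypothetical and are not part of any Azure or Qpid API; only java.net.URLEncoder is a real library call.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration only.
final class KeyEncoder {

    // Percent-encodes a shared access key so that characters such as
    // '+', '/' and '=' do not break the amqps:// connection URL.
    static String encodeKey(String rawKey) {
        try {
            return URLEncoder.encode(rawKey, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a standard JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Placeholder value, not a real credential.
        String rawKey = "abc+def/ghi=";
        System.out.println(encodeKey(rawKey)); // abc%2Bdef%2Fghi%3D
    }
}
```

The encoded value is what goes between the policy name and the @ sign in the connection URL.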
Use the correct partition:
This is an important step, and it is where Event Hub differs from other AMQP implementations. Event Hub creates partitions for parallel processing, so ensure that your sender and receiver use the correct partitions. The partition can be specified in a properties file: it is the trailing /Partitions/<PartitionNumber> part of the queue.EventHub entry in the servicebus.properties files below.
Tips:
1. Partition while sending: your connection to the queue will go through. The error message below lists both a plain sender address and a partition sender address as valid.
2. Partition while receiving: a default connection (an address without a consumer group and partition) will not go through, and you will receive this error:
Exception in thread "main" javax.jms.JMSException: Invalid EventHub address. It must be either of the following. Sender: <EventHubName>. Partition Sender: <EventHubName>/Partitions/<PartitionNumber>. Partition Receiver: <EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/<PartitionNumber>. TrackingId:59faccff54a742ba8eabf5a9d07ecb4b_G8,TimeStamp:12/28/2015 3:18:51 AM
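The three address forms named in the error message can be sketched as small string builders. This is only an illustration: the class EventHubAddress and its methods are hypothetical, and the formats are copied from the exception text rather than from any official API.

```java
// Hypothetical helper for illustration only; it just builds address strings.
final class EventHubAddress {

    // Sender: <EventHubName>
    static String sender(String eventHubName) {
        return eventHubName;
    }

    // Partition Sender: <EventHubName>/Partitions/<PartitionNumber>
    static String partitionSender(String eventHubName, int partition) {
        return eventHubName + "/Partitions/" + partition;
    }

    // Partition Receiver:
    // <EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/<PartitionNumber>
    static String partitionReceiver(String eventHubName, String consumerGroup, int partition) {
        return eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partition;
    }

    public static void main(String[] args) {
        System.out.println(sender("xyz"));
        System.out.println(partitionSender("xyz", 0));
        System.out.println(partitionReceiver("xyz", "$Default", 0));
    }
}
```

The resulting strings are the values used for the queue.EventHub entry in servicebus.properties.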
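Use the correct consumer group in the receiver:
The consumer group is the /ConsumerGroups/<ConsumerGroupName> part of the queue.EventHub entry in the receiver's servicebus.properties, shown in the Receiver code section below ($Default in that example).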
AMQP libraries:
<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-amqp-1-0-common</artifactId>
    <version>0.32</version>
</dependency>
<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-amqp-1-0-client</artifactId>
    <version>0.32</version>
</dependency>
<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-amqp-1-0-client-jms</artifactId>
    <version>0.32</version>
</dependency>
<dependency>
    <groupId>org.apache.geronimo.specs</groupId>
    <artifactId>geronimo-jms_1.1_spec</artifactId>
    <version>1.1.1</version>
</dependency>
Sender code:
· servicebus.properties:
connectionfactory.SBCF = amqps://SendRule(your policy):<<url encoded value>>@<your namespace>.servicebus.windows.net/?sync-publish=false
queue.EventHub = <<your event hub>>/Partitions/0
e.g.
connectionfactory.SBCF = amqps://SendRule:nju%asadfasdf@xyz-ns.servicebus.windows.net/?sync-publish=false
queue.EventHub = xyz/Partitions/0
· Initialize connection factory:
// http://people.apache.org/~rgodfrey/qpid-java-amqp-1-0-client-jms.html
// Configure JNDI environment
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
env.put(Context.PROVIDER_URL, "servicebus.properties");
Context context = new InitialContext(env);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Destination queue = (Destination) context.lookup("EventHub");
// Create Connection
Connection connection = cf.createConnection();
· Create a session:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
· Create Message Producer:
MessageProducer sender = session.createProducer(queue);
· Send Message:
// messageCode is the String payload to send
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageCode.getBytes("UTF-8"));
sender.send(message);
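The connectionfactory.SBCF value in the sender's servicebus.properties follows a fixed layout, so it can be assembled from its parts. The sketch below is an illustration under assumptions: the class ConnectionUrl and its method are hypothetical, and the URL layout is copied from the properties example in this post rather than from an official specification.

```java
// Hypothetical helper for illustration only; it only concatenates strings.
final class ConnectionUrl {

    // Builds the value for connectionfactory.SBCF using the layout
    // amqps://<policy>:<url encoded key>@<namespace>.servicebus.windows.net/?sync-publish=false
    static String senderUrl(String policyName, String urlEncodedKey, String namespace) {
        return "amqps://" + policyName + ":" + urlEncodedKey
                + "@" + namespace + ".servicebus.windows.net/?sync-publish=false";
    }

    public static void main(String[] args) {
        // Placeholder values, not real credentials.
        String url = senderUrl("SendRule", "abc%2Bdef%2Fghi%3D", "xyz-ns");
        System.out.println("connectionfactory.SBCF = " + url);
        System.out.println("queue.EventHub = xyz/Partitions/0");
    }
}
```

The key passed in must already be URL-encoded, otherwise characters such as / or + in the key would be read as part of the URL structure.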
Receiver code:
· servicebus.properties:
connectionfactory.SBCFR = amqps://ReceiveRule:<<your encoded key>>@xyz-ns.servicebus.windows.net
queue.EventHub = xyz/ConsumerGroups/$Default/Partitions/0
· Initialize connection factory:
Same as above, but look up "SBCFR" instead of "SBCF"
· Create a session:
Same as above
· Create Message Consumer:
MessageConsumer consumer = session.createConsumer(queue);
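· Receive Message:
o Implement a listener
class Listener implements MessageListener {
    @Override
    public void onMessage(Message arg0) {
        try {
            // Assumes the payload was sent as a BytesMessage, as in the sender above.
            String body = readMessage((BytesMessage) arg0);
            // Process body here.
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
o Read message (a helper method inside Listener)
String readMessage(BytesMessage message) throws JMSException, UnsupportedEncodingException {
    int messageLength = (int) message.getBodyLength();
    if (messageLength != 0) {
        byte[] readBytes = new byte[messageLength];
        message.readBytes(readBytes);
        // Decode with the same charset the sender used.
        return new String(readBytes, "UTF-8");
    }
    return "";
}
o Register the listener
Listener myListener = new Listener();
consumer.setMessageListener(myListener);
o Important: Start the connection
connection.start();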
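The sender writes the payload as UTF-8 bytes, so the receiver should decode with the same charset instead of the platform default. A minimal round-trip sketch of that idea follows; the class PayloadCodec is hypothetical and uses only the Java standard library, with no JMS types involved.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class PayloadCodec {

    // What the sender does before writeBytes(...).
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the receiver does after readBytes(...).
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "caf\u00e9";
        byte[] wire = encode(original);
        // The accented character takes two bytes in UTF-8, so 4 chars become 5 bytes.
        System.out.println(wire.length);
        System.out.println(decode(wire).equals(original));
    }
}
```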
References:
· https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/