Steps:
1. Create Event Hub
2. Configure Event Hub with Send/Receive policy (this step will provide the keys to connect)
3. Encode the key
4. Ensure to use proper partition
5. Ensure to use the proper Consumer group in the Receiver.
6. AMQP libraries
7. Sender code
8. Receiver code
Create Event Hub:
Configure Event Hub with rules:
Encode the keys:
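Collect the keys from the Event Hub configuration created in the steps above, then URL-encode them.
// java.net.URLEncoder; the single-argument encode(String) overload is deprecated
String myKey = "myKey=";
String encodedKey = URLEncoder.encode(myKey, "UTF-8");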
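As a minimal sketch of this step, the snippet below encodes a key and places it in an amqps:// URI of the shape used in the servicebus.properties entries later in this post. The class KeyEncoder, its method names, and the sample key "ab+c/d=" are assumptions made up for illustration; only URLEncoder comes from the JDK.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper; class and method names are illustrative only.
final class KeyEncoder {

    // URL-encode the raw policy key so it can be embedded in the amqps:// URI.
    static String encodeKey(String rawKey) {
        try {
            return URLEncoder.encode(rawKey, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform must support.
            throw new IllegalStateException(e);
        }
    }

    // Assemble a connection URI: amqps://<policy>:<encoded key>@<namespace>.servicebus.windows.net
    static String connectionUri(String policyName, String rawKey, String namespace) {
        return "amqps://" + policyName + ":" + encodeKey(rawKey)
                + "@" + namespace + ".servicebus.windows.net";
    }

    public static void main(String[] args) {
        // "ab+c/d=" is a made-up value standing in for a real key.
        System.out.println(encodeKey("ab+c/d="));
        // prints ab%2Bc%2Fd%3D
        System.out.println(connectionUri("SendRule", "ab+c/d=", "xyz-ns"));
        // prints amqps://SendRule:ab%2Bc%2Fd%3D@xyz-ns.servicebus.windows.net
    }
}
```

Keys are Base64 text, so characters such as "+", "/" and "=" are the ones most likely to break the URI if left unencoded.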
Ensure to use proper partition:
This is an important step, and it is where Event Hub differs from other AMQP implementations. Event Hub creates partitions for parallel processing, so ensure that your sender and receiver use the correct partitions. The partitions can be specified in a properties file.
The partition information appears in the queue.EventHub entries of the servicebus.properties files below.
Tips:
1. Partition while sending: your default connection to the queue (event hub name only, no partition) will go through.
2. Partition while receiving: your default connection will not go through, and you will receive this error:
Exception in thread "main" javax.jms.JMSException: Invalid EventHub address. It must be either of the following. Sender: <EventHubName>. Partition Sender: <EventHubName>/Partitions/<PartitionNumber>. Partition Receiver: <EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/<PartitionNumber>. TrackingId:59faccff54a742ba8eabf5a9d07ecb4b_G8,TimeStamp:12/28/2015 3:18:51 AM
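The three address forms listed in the error message can be built with a few string helpers. This is only a sketch: EventHubAddress and its method names are hypothetical, and the hub name "xyz" and consumer group "$Default" are the example values used in this post.

```java
// Hypothetical helper that builds the address forms named in the error message.
final class EventHubAddress {

    // Sender: <EventHubName>
    static String sender(String eventHub) {
        return eventHub;
    }

    // Partition Sender: <EventHubName>/Partitions/<PartitionNumber>
    static String partitionSender(String eventHub, int partition) {
        return eventHub + "/Partitions/" + partition;
    }

    // Partition Receiver:
    // <EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/<PartitionNumber>
    static String partitionReceiver(String eventHub, String consumerGroup, int partition) {
        return eventHub + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partition;
    }

    public static void main(String[] args) {
        System.out.println(sender("xyz"));
        // prints xyz
        System.out.println(partitionSender("xyz", 0));
        // prints xyz/Partitions/0
        System.out.println(partitionReceiver("xyz", "$Default", 0));
        // prints xyz/ConsumerGroups/$Default/Partitions/0
    }
}
```

The resulting strings are what goes on the right-hand side of a queue.EventHub entry.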
Ensure to use the proper Consumer group in the Receiver:
The consumer group appears in the queue.EventHub entry of the receiver's servicebus.properties below.
AMQP libraries:
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-common</artifactId>
<version>0.32</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client</artifactId>
<version>0.32</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<version>0.32</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1.1</version>
</dependency>
Sender code:
· servicebus.properties:
connectionfactory.SBCF = amqps://SendRule(your policy):<<url encoded value>>@<your name space>.servicebus.windows.net/?sync-publish=false
queue.EventHub = <<your event hub>>/Partitions/0
e.g.
connectionfactory.SBCF = amqps://SendRule:nju%asadfasdf@xyz-ns.servicebus.windows.net/?sync-publish=false
queue.EventHub = xyz/Partitions/0
· Initialize connection factory:
// http://people.apache.org/~rgodfrey/qpid-java-amqp-1-0-client-jms.html
// Configure JNDI environment
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
env.put(Context.PROVIDER_URL, "servicebus.properties");
Context context = new InitialContext(env);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Destination queue = (Destination) context.lookup("EventHub");
// Create Connection
Connection connection = cf.createConnection();
· Create a session:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
· Create Message Producer:
MessageProducer sender = session.createProducer(queue);
· Send Message:
// messageCode is the String payload to send
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageCode.getBytes("UTF-8"));
sender.send(message);
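The sender writes the payload as UTF-8 bytes, so the receiver has to decode with the same charset rather than the platform default. A small sketch of that round trip, using only the JDK; PayloadCodec and the sample text are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing the encode/decode pair used on each side.
final class PayloadCodec {

    // Sender side: turn the text into the bytes written to the BytesMessage.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Receiver side: decode the bytes read from the BytesMessage.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "cafe" with an accented final letter, written with a Unicode escape.
        String original = "caf\u00e9";
        byte[] body = encode(original);

        // The accented letter takes two bytes in UTF-8.
        System.out.println(body.length);
        // prints 5

        // Decoding with the matching charset restores the text.
        System.out.println(decode(body).equals(original));
        // prints true

        // Decoding with a different charset does not.
        System.out.println(new String(body, StandardCharsets.ISO_8859_1).equals(original));
        // prints false
    }
}
```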
Receiver code:
· servicebus.properties:
connectionfactory.SBCFR = amqps://ReceiveRule:<<your encoded key>>@xyz-ns.servicebus.windows.net
queue.EventHub = xyz/ConsumerGroups/$Default/Partitions/0
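Since a receiver address without a consumer group and partition is rejected with the "Invalid EventHub address" error, it can help to check the queue entry before connecting. The sketch below is one way to do that with the JDK only; ReceiverAddressCheck, its methods, and the regular expression are assumptions derived from the address form in the error message, not part of any Event Hub or Qpid API.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical pre-flight check for the receiver's queue entry.
final class ReceiverAddressCheck {

    // <EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/<PartitionNumber>
    private static final Pattern RECEIVER =
            Pattern.compile("[^/]+/ConsumerGroups/[^/]+/Partitions/\\d+");

    // True only when the address has the partition receiver form.
    static boolean isReceiverAddress(String address) {
        return address != null && RECEIVER.matcher(address.trim()).matches();
    }

    // Read the value of "queue.<queueName>" from properties-file text.
    static String queueAddress(String propertiesText, String queueName) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("queue." + queueName);
    }

    public static void main(String[] args) {
        String text = "queue.EventHub = xyz/ConsumerGroups/$Default/Partitions/0\n";
        System.out.println(isReceiverAddress(queueAddress(text, "EventHub")));
        // prints true
        System.out.println(isReceiverAddress("xyz"));
        // prints false
    }
}
```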
· Initialize connection factory:
Same as above, but look up "SBCFR" instead of "SBCF".
· Create a session:
Same as above
· Create Message Consumer:
MessageConsumer consumer = session.createConsumer(queue);
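· Receive Message:
o Implement a listener that reads the message
class Listener implements MessageListener
{
    @Override
    public void onMessage(Message arg0) {
        try {
            // The sender above writes a BytesMessage, so cast before reading
            BytesMessage message = (BytesMessage) arg0;
            int messageLength = (int) message.getBodyLength();
            if (messageLength != 0) {
                byte[] readBytes = new byte[messageLength];
                message.readBytes(readBytes);
                // onMessage returns void, so handle the text here instead of returning it
                System.out.println(new String(readBytes, "UTF-8"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
o Register the listener
Listener myListener = new Listener();
consumer.setMessageListener(myListener);
o Important: start the connection; messages are not delivered until it is started
connection.start();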
References: