Why Java Developers Need Java Messaging System?

This post shares info on Java Messaging System. Java developers are using it in their practice for several reasons. You can read the post and find the reasons.

Why messaging system?

In the electronics world, data is the actual term used for all information. For which data, “Message” term can be used? Let’s take simple scenario, if you are registering your details in the web portal, it takes all your information and store entire data in relational database which will be called as a Data. Once you registered successfully, an email functionality triggers and send an email to you saying that account is successfully created. This email event is called as messaging.

In traditional system, both registering new user and sending email are tightly coupled i.e. both are in same application. This design is extremely responsible for the system’s performance. Imagine 100s of user registering in 15 minutes, the same application should focus creating new user and sending email. This process is too costly. To avoid this issue, messaging systems are come into picture.

 It runs independently and just looks up for the message and doesn’t care about where messages are coming from. If the message arrives, it consumes the message and sends it to destination. This is called as loosely coupled and both applications run independently.

The responsibility of Web Portal is to create new user and to send email to user is messaging system’s responsibility. Messaging system is not only used for sending email. There are many scenarios where we need Messaging system such as notification, news feed etc.

There are many messaging system in the market. Rabbit MQ is one of such popular messaging systems in Java, which includes three roles – publisher, subscriber and queue.

Publisher is the one who sends the message. The sent message will store in temporally in a buffer called queue. Subscriber is the one who receives the message. The Rabbit MQ is having all these players that are required for messaging service.

Why we need Apache Kafka?

Since there are more matured messaging systems available in the world, why we need Apache Kafka? That’s a question that triggers developer’s mind.

The other messaging systems handle only low volume data. But Rabbit MQ handles 20K messages per second. Apache kafka handle 100K messages per second. Kafka provides easy way to create many kafka clusters which is called as broker. This is for maintaining load balance. If one broker gets down, the other automatically takes care.

Apache kafka persists the messages in hard disk and java developers can set life time of message.

Following players are included in the Apache Kafka in messaging system-

  • Topics
  • Producer
  • Brokers
  • Consumer

Topics

Topics hold the stream of messages. For example Topic “A” can hold the message “Hello World”. If we create a topic, kafka creates by default one partition. Partition contains sequence of index in which messages are get stored. Based on our requirement we can create many partitions in Topics as in the diagram.

Topic “A”

Partition 1
0
1
2
3
4
5
Partition 2
0
1
2
Partition 3
0
1

Producer

Producer uses the topics to send the messages to broker.

Consumer

Consumer receives the messages from broker.

Broker

Broker acts like a middleware component for producer and consumer. If producer sends messages, those messages are maintained in broker. Consumers consume messages from broker.

Using kafka cluster we can create multiple brokers which maintain the messages and load balance. The beauty of Kafka broker is that it can handle many read/writes manipulation without any performance tuning.

The entire Kafka cluster runs in Zookeeper.

What is zookeeper?

Zookeeper is an environment that maintains all Kafka broker and its state. If any Kafka broker is down or not available, Zookeeper is liable to notify Consumer/Producer, so Consumer/Producer will start looking into another available Broker.

System diagram for Kafka messaging system:

Kafka messaging system environment

Kafka messaging system environment

Who are Leaders and Followers?

I have already told that we can create many Kafka brokers in which one Kafka broker is primary broker that handles all manipulation between Producer and Consumer. This Kafka is called as Leader and other Kafka brokers are called as followers. Due to some situation the leader Kafka is down or shut down for any reason, zookeeper selects a new leader from available follower, and notifies to Consumer/Producer about the new Kafka leader. Consumer/Producer starts looking into correct Kafka broker.

Kafka messaging system setup

Download zookeeper from following Apache link

http://www.apache.org/dyn/closer.cgi/zookeeper/

After download go to download folder and open conf folder

zookeeper-3.4.6\conf

You can see zoo_sample.cfg

Copy the this file as zoo.cfg

Open zoo.cfg

Change following property with correct path.

Data Dir=<Your path>

Zookeeper store all data in this path

You have done with Zookeeper configuration and this is a time start zookeeper server.

3

Double click highlighted file(zkServer) to start the zookeeper. You can see following console after starting the Zookeeper successfully.

4

Now we have configured Zookeeper for your Kafka server.

Download Kafka from following apache link

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Extract the Kafka download folder as in below screenshot.

5

Go to bin folder and apply following command to start Kafka server.

kafka-server-start F:\Work\kafka\kafka\kafka_2.11-0.9.0.0\config\server.properties

This command starts your Kafka server.

If you want to stop kafka server, go to kafka_2.11-0.9.0.0\bin\windows and click kafka-server-stop

If you are using Linux, go to kafka_2.11-0.9.0.0\bin\

Now we have kafka server that is ready to start. Once you started the kafka server, you can see following console

6

Kafka Producer

publicclass MyProducer

{

                publicstaticvoid main(String[] args) throws Exception{

                                Properties props = new Properties();

                                props.put("bootstrap.servers", "localhost:9092");

                                props.put("acks", "all");

                                props.put("retries", 0);

                                props.put("batch.size", 16384);

                                props.put("buffer.memory", 33554432);

                                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

                                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

                                 Producer<String, String>producer = new KafkaProducer(props);

                                for(inti = 0; i< 5; i++) {

                                                producer.send(new ProducerRecord<String, String>("test-topic", "Test-Key " + (i+1) ,"Test-Value" + (i+1)));

                                                }

                                producer.close();

    }

}

The above code sets necessary configuration for Kafka producer and creates a new topic called “test-topic” which carries the five different messages. This topic message is sent by producer using producer.send method.

Kafka Consumer

publicstaticvoid main(String[] args) {

                //Kafka Consumer configuration

 Properties props = new Properties();

 props.put("bootstrap.servers", "localhost:9092");

 props.put("group.id", "test");

 props.put("enable.auto.commit", "true");

 props.put("auto.commit.interval.ms", "1000");

 props.put("session.timeout.ms", "30000");

 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

                      KafkaConsumer<String, String>consumer = new KafkaConsumer(props);

                consumer.subscribe(Arrays.asList("test-topic"));

while(true){

         ConsumerRecords<String, String>records = consumer.poll(100);

                                for (ConsumerRecord<String, String>record : records)

                                                  System.out.println("Partition Offset = " + record.offset() + " Key = " +  record.key() + " Value = " + record.value());

                                  }

}

In the above code sets, the necessary configuration for kafka Consumer and receiving messages from a producer. The Producer carries the messages via topic called “test-topic”.

Boostrap.servers

It will start up all available brokers in the kafka

Group.id

We can set group name for multiple consumer. In our case our consumer belongs to test group.

enable.auto.commit

It will commit the offset value based on auto.commit.interval.ms

session.timeout.ms

Using this timeout we can find out any failures in Kafka server.

Key.deserializer and value.deserializer

This is for internal string serialization process for message data.

Acks

This property is having three possible value which are 0, 1 and all. If set 0, producer doesn’t wait for acknowledgement. If set 1, producer accepts ack from leader, if set all, producer accepts ack from all replica i.e. if your kafka server is having more than 1 broker, it accepts the ack from all (Both leader and follower).

Retries

Number of try for sending messages to Consumer

batch.size

Buffer size for batch size. Let say producer is sending multiple messages to a partition, he will use the batch.size.

buffer.memory

The buffer.memory used by producer for sending messages.

Hope you have understood what Java Messaging System is and why java developers use it. If you have any question, ask from experts and get answered.