Monday, February 2, 2015

Apache Kafka: A simple producer

If you are into big data and analytics, you have probably heard of Apache Kafka lately.

I have, and I set out to learn this cool technology.

Before proceeding further, let me share a little bit about Apache Kafka (excerpt taken from its official website):
Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.

In summary, this is what I did:
(1) Installed Hortonworks HDP 2.2 (with Ambari)
(2) Installed Apache Kafka (and all required components) through Ambari
(3) Configured 3 Kafka brokers.
(4) Downloaded a large sample data set (about 10GB).
NOTE: If you would like to download some sample data sets, you can refer here.
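(5) Wrote a simple producer (see below). It treats each run of non-empty lines in the data file as one record (records are separated by blank lines), joins the record's lines with "~~", and sends it to the "datatest" topic.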
(6) Executed the producer to load data into Kafka.
(7) Executed the console consumer to check on the data.
E.g.: kafka-console-consumer.sh --topic datatest --zookeeper hdp1:2181 --from-beginning


import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties; 
import java.io.*;
import kafka.producer.*;
import kafka.javaapi.producer.*;
 
/**
 * Simple producer (legacy {@code kafka.javaapi} API) that loads a text file into a Kafka topic.
 *
 * <p>The input file is read as a sequence of records separated by empty lines. The non-empty
 * lines of each record are joined with {@link #DELIMITER} and sent to {@link #TOPIC} as one
 * message.
 */
public class KafkaProducer {
 private final static String TOPIC = "datatest";
 private final static String DELIMITER = "~~";

 /**
  * Reads {@code /opt/data/movies.txt} and publishes one message per record.
  *
  * <p>An {@link IOException} while reading is printed to standard output. The file and the
  * producer are always closed, even when reading fails.
  *
  * @param argv command-line arguments (unused)
  */
 public static void main(String[] argv)
 {
  Properties properties = new Properties();
  properties.put("metadata.broker.list","hdp1:6667,hdp2:6667,hdp3:6667");
  properties.put("serializer.class","kafka.serializer.StringEncoder");

  ProducerConfig producerConfig = new ProducerConfig(properties);
  // Fully qualified, presumably to avoid ambiguity between the kafka.producer.* and
  // kafka.javaapi.producer.* wildcard imports.
  kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);

  try
  {
   // NOTE(review): FileReader decodes with the platform default charset; confirm the data
   // file's encoding matches.
   BufferedReader br = new BufferedReader(new FileReader(new File("/opt/data/movies.txt")));
   try
   {
    long sent = sendRecords(br, producer);
    System.out.println("Sent " + sent + " messages to topic " + TOPIC);
   }
   finally
   {
    // Release the file handle whether or not sending succeeded.
    br.close();
   }
  }
  catch (IOException fie)
  {
   System.out.println(fie);
  }
  finally
  {
   // Always close the producer, even if an unchecked exception escapes the send loop.
   producer.close();
  }
 }

 /**
  * Groups the reader's lines into blank-line-separated records and sends each record.
  *
  * <p>Runs of several empty lines are treated as a single separator, so no empty or null
  * message is ever sent. A trailing record that is not followed by an empty line is still sent.
  *
  * @param br source of lines; not closed by this method
  * @param producer producer used to send each record to {@link #TOPIC}
  * @return the number of messages sent
  * @throws IOException if reading from {@code br} fails
  */
 private static long sendRecords(BufferedReader br,
   kafka.javaapi.producer.Producer<String, String> producer) throws IOException
 {
  // Only non-empty lines are appended, so an empty builder means "no record in progress".
  StringBuilder record = new StringBuilder();
  long sent = 0;
  String line;

  while ((line = br.readLine()) != null)
  {
   if (line.length() != 0)
   {
    if (record.length() != 0)
    {
     record.append(DELIMITER);
    }
    record.append(line);
   }
   else if (record.length() != 0)
   {
    // An empty line terminates the current record.
    send(producer, record.toString());
    sent++;
    record.setLength(0);
   }
  }

  // Flush the last record when the file does not end with an empty line.
  if (record.length() != 0)
  {
   send(producer, record.toString());
   sent++;
  }
  return sent;
 }

 /** Sends a single un-keyed string message to {@link #TOPIC}. */
 private static void send(kafka.javaapi.producer.Producer<String, String> producer, String payload)
 {
  producer.send(new KeyedMessage<String, String>(TOPIC, payload));
 }
}

No comments:

Post a Comment