I set out to learn this cool technology.
Before proceeding further, let me share a little bit about Apache Kafka (excerpt taken from its official website):
Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.
In summary, this is what I did:
(1) Installed Hortonworks HDP 2.2 (with Ambari)
(2) Installed Apache Kafka (and all required components) through Ambari
(3) Configured 3 Kafka brokers.
(4) Downloaded a large sample data set (about 10GB).
NOTE: If you would like to download some sample data sets, you can refer here.
(5) Wrote a simple producer (see below).
(6) Executed the producer to load data into Kafka.
(7) Executed the console consumer to check on the data.
E.g. kafka-console-consumer.sh --topic datatest --zookeeper hdp1:2181 --from-beginning
import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.io.*; import kafka.producer.*; import kafka.javaapi.producer.*; public class KafkaProducer { private final static String TOPIC = "datatest"; private final static String DELIMITER = "~~"; public static void main(String[] argv) { Properties properties = new Properties(); properties.put("metadata.broker.list","hdp1:6667,hdp2:6667,hdp3:6667"); properties.put("serializer.class","kafka.serializer.StringEncoder"); ProducerConfig producerConfig = new ProducerConfig(properties); kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); KeyedMessage<String, String> message = null; try { FileReader fr = new FileReader(new File("/opt/data/movies.txt")); BufferedReader br = new BufferedReader(fr); String s = null; String msg = null; long ctr = 0; while ((s = br.readLine()) != null) { if (s.length() != 0) { if (msg == null) { msg = s; } else { msg = msg + DELIMITER + s; } } else { ctr++; message = new KeyedMessage<String, String>(TOPIC, msg); producer.send(message); msg = null; message = null; } } } catch (IOException fie) { System.out.println(fie); } producer.close(); } } |
No comments:
Post a Comment