I have set out to learn this cool technology.
Before proceeding further, let me share a little bit about Apache Kafka (excerpt taken from its official website):
> Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.
In summary, this is what I did:
(1) Installed Hortonworks HDP 2.2 (with Ambari).
(2) Installed Apache Kafka (and all required components) through Ambari
(3) Configured 3 Kafka brokers.
(4) Downloaded a large sample data set (about 10GB).
NOTE: If you would like to download some sample data sets, you can refer to the link here.
(5) Wrote a simple producer (see below).
(6) Executed the producer to load data into Kafka.
(7) Executed the console consumer to check on the data.
E.g.: kafka-console-consumer.sh --topic datatest --zookeeper hdp1:2181 --from-beginning
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.*;
import kafka.producer.*;
/**
 * Loads a text file into the Kafka topic {@value #TOPIC}, one message per record.
 *
 * <p>A record is a run of consecutive non-empty lines; records are separated by one or more
 * empty lines. The lines of a record are joined with {@value #DELIMITER} to form the message
 * payload. Uses the Kafka 0.8 "old" producer API ({@code kafka.javaapi.producer.Producer}).
 */
public class KafkaProducer {
    /** Topic that every record is published to. */
    private static final String TOPIC = "datatest";
    /** Separator placed between the lines of one multi-line record. */
    private static final String DELIMITER = "~~";
    /** Input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT = "/opt/data/movies.txt";

    /**
     * Reads the input file and sends each record to Kafka.
     *
     * @param argv optional; {@code argv[0]} overrides the input file path
     *             (defaults to {@value #DEFAULT_INPUT})
     */
    public static void main(String[] argv) {
        String inputPath = (argv.length > 0) ? argv[0] : DEFAULT_INPUT;

        Properties properties = new Properties();
        properties.put("metadata.broker.list", "hdp1:6667,hdp2:6667,hdp3:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        // Fully qualified: both imported kafka packages declare a type named Producer.
        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);

        long sent = 0;
        // Explicit UTF-8 instead of FileReader's platform-dependent default charset;
        // try-with-resources guarantees the file handle is released.
        try (BufferedReader reader =
                Files.newBufferedReader(Paths.get(inputPath), StandardCharsets.UTF_8)) {
            String record;
            while ((record = readRecord(reader)) != null) {
                producer.send(new KeyedMessage<String, String>(TOPIC, record));
                sent++;
            }
        } catch (IOException e) {
            System.err.println("Failed to read " + inputPath + ": " + e);
        } finally {
            // Always release producer resources, even if send() throws.
            producer.close();
        }
        System.out.println("Sent " + sent + " messages to topic " + TOPIC);
    }

    /**
     * Reads the next record from {@code reader}: consecutive non-empty lines joined with
     * {@link #DELIMITER}. Leading and repeated empty lines are skipped, so a record is never
     * empty, and a final record that is not followed by an empty line is still returned.
     *
     * @param reader source of lines
     * @return the next record, or {@code null} when the input is exhausted
     * @throws IOException if reading fails
     */
    private static String readRecord(BufferedReader reader) throws IOException {
        StringBuilder record = null;
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.length() == 0) {
                if (record != null) {
                    return record.toString();
                }
                // Blank line with no pending record: skip it rather than emit a null payload.
                continue;
            }
            if (record == null) {
                record = new StringBuilder(line);
            } else {
                record.append(DELIMITER).append(line);
            }
        }
        // EOF: flush a trailing record that had no blank line after it.
        return (record == null) ? null : record.toString();
    }
}
|
No comments:
Post a Comment