编程
Producer是一个应用程序,它创建消息并发送它们到Kafka broker中。这些producer在本质上是不同。比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer。这些不同的Producer能够使用不同的语言实现,比如java、C和Python。下面的这部图表解释了消息producer的Kafka API.
下面将详细介绍如果编写一个简单的Producer和Consumer应用程序。
发送简单消息给Kafka broker,Producer端编写类ClusterProducer。
public classClusterProducer extends Thread {
private static final Log log =LogFactory.getLog(ClusterProducer.class);
public void sendData() {
Random rnd = new Random();
Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);
if (props == null) {
log.error("can't loadspecified file " + PropertiesSettings.PRODUCER_FILE_NAME);
return;
}
//set the producer configurationproperties
ProducerConfig config = newProducerConfig(props);
Producer<String, String> producer= new Producer<String, String>(config);
//Send the data
int count = 1;
KeyedMessage<String, String>data;
while (count < 100) {
String sign = "*";
String ip = "192.168.2."+ rnd.nextInt(255);
StringBuffer sb = newStringBuffer();
for (int i = 0; i < count; i++){
sb.append(sign);
}
log.info("set data:" +sb);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
data = new KeyedMessage<String,String>(PropertiesSettings.TOPIC_NAME, ip, sb.toString());
producer.send(data);
count++;
}
producer.close();
}
public void run() {
sendData();
}
public static void main(String[] args) {
new ClusterProducer().sendData();
}
}
定于Consumer获取端,获取对应topic的数据:
public class Consumerextends Thread {
private static final Log log =LogFactory.getLog(Consumer.class);
private final ConsumerConnector consumer;
private final String topic;
public Consumer(String topic) {
consumer =kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfigcreateConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id",KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer>topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, newInteger(1));
Map<String,List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]>stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]>it = stream.iterator();
while (it.hasNext()) {
log.info("+message: " +new String(it.next().message()));
}
}
public static void main(String[] args) {
Consumer client = new Consumer("cluster_statistics_topic");
client.
辅助类:
public interface PropertiesSettings {
final static String CONSUMER_FILE_NAME = "consumer.properties";
final static String PRODUCER_FILE_NAME = "producer.properties";
final static String TOPIC_NAME = "cluster_statistics_topic";
final static String TOPIC_A = "cluster_statistics_topic_A";
}
package com.kafka.utils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* @author JohnLiu
* @version 0.1.0
* @date 2014/8/27
*/
public class PropertiesParser {
private static final Log log = LogFactory.getLog(PropertiesParser.class);
/* properties file type */
Properties props = null;
/* constructor method*/
public PropertiesParser(Properties props) {
this.props = props;
}
/**
* Get the trimmed String value of the property with the given
* <code>name</code>. If the value the empty String (after
* trimming), then it returns null.
*/
public String getStringProperty(String name) {
return getStringProperty(name, null);
}
/**
* Get the trimmed String value of the property with the given
* <code>name</code> or the given default value if the value is
* null or empty after trimming.
*/
public String getStringProperty(String name, String def) {
String val = props.getProperty(name, def);
if (val == null) {
return def;
}
val = val.trim();
return (val.length() == 0) ? def : val;
}
private Properties loadPropertiesFile() {
Properties props = new Properties();
InputStream in;
ClassLoader cl = getClass().getClassLoader();
if (cl == null)
cl = findClassloader();
if (cl == null)
try {
throw new ProcessingException("Unable to find a class loader on the current thread or class.");
} catch (ProcessingException e) {
e.printStackTrace();
}
in = cl.getResourceAsStream(PropertiesSettings.CONSUMER_FILE_NAME);
try {
props.load(in);
} catch (IOException ioe) {
log.error("can't load " + PropertiesSettings.CONSUMER_FILE_NAME, ioe);
}
return props;
}
private ClassLoader findClassloader() {
// work-around set context loader for windows-service started jvms (QUARTZ-748)
if (Thread.currentThread().getContextClassLoader() == null && getClass().getClassLoader() != null) {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
}
return Thread.currentThread().getContextClassLoader();
}
public static Properties getProperties(final String fileName) {
Properties props = new Properties();
InputStream in = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(fileName);
try {
props.load(in);
} catch (IOException ioe) {
log.error("can't load " + fileName, ioe);
}
return props;
}
}
配置参数文件consumer.properties:
zookeeper.connect=bigdata09:2181,bigdata08:2181,bigdata07:2181
group.id=cluster_group
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
配置参数文件producer.properties:
metadata.broker.list=bigdata09:9092,bigdata08:9092,bigdata07:9092
serializer.class=kafka.serializer.StringEncoder
#partitioner.class=com.kafka.producer.SimplePartitioner
request.required.acks=1
分别执行上面的代码,可以发送或者得到对应topic信息。
Enjoy yourself!(*^__^*) ……