Kafka学习整理七producer和consumer编程实践.docx

上传人:b****1 文档编号:13567523 上传时间:2023-06-15 格式:DOCX 页数:31 大小:25.13KB
下载 相关 举报
Kafka学习整理七producer和consumer编程实践.docx_第1页
第1页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第2页
第2页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第3页
第3页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第4页
第4页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第5页
第5页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第6页
第6页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第7页
第7页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第8页
第8页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第9页
第9页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第10页
第10页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第11页
第11页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第12页
第12页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第13页
第13页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第14页
第14页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第15页
第15页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第16页
第16页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第17页
第17页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第18页
第18页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第19页
第19页 / 共31页
Kafka学习整理七producer和consumer编程实践.docx_第20页
第20页 / 共31页
亲,该文档总共31页,到这儿已超出免费预览范围,如果喜欢就下载吧!
下载资源
资源描述

Kafka学习整理七producer和consumer编程实践.docx

《Kafka学习整理七producer和consumer编程实践.docx》由会员分享,可在线阅读,更多相关《Kafka学习整理七producer和consumer编程实践.docx(31页珍藏版)》请在冰点文库上搜索。

Kafka学习整理七producer和consumer编程实践.docx

Kafka学习整理七producer和consumer编程实践

Kafka学习整理七(producer和consumer编程实践)

实践代码采用kafka-clientsV0.10.0.0编写

一、编写producer

第一步:

使用./kafka-topics.sh命令创建topic及partitions分区数

./kafka-topics.sh--create--zookepper"172.16.49.173:

2181"--topic"producer_test"--partitions10replication-factor31

第二步:

实现org.apache.kafka.clients.producer.Partitioner分区接口,以实现自定义的消息分区

importjava.util.List;

importjava.util.Map;

importorg.apache.kafka.clients.producer.Partitioner;

importmon.Cluster;

importmon.PartitionInfo;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

publicclassMyPartitionimplementsPartitioner{

privatestaticLoggerLOG=LoggerFactory.getLogger(MyPartition.class);

publicMyPartition(){

//TODOAuto-generatedconstructorstub

}

@Override

publicvoidconfigure(Mapconfigs){

//TODOAuto-generatedmethodstub

}

@Override

publicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){

//TODOAuto-generatedmethodstub

Listpartitions=cluster.partitionsForTopic(topic);

intnumPartitions=partitions.size();

intpartitionNum=0;

try{

partitionNum=Integer.parseInt((String)key);

}catch(Exceptione){

partitionNum=key.hashCode();

}

LOG.info("themessagesendTotopic:

"+topic+"andthepartitionNum:

"+partitionNum);

returnMath.abs(partitionNum%numPartitions);

}

@Override

publicvoidclose(){

//TODOAuto-generatedmethodstub

}

}

12345678910111213141516171819202122232425262728293031323334353637383940414243

第三步:

编写producer

importjava.util.Properties;

importorg.apache.kafka.clients.producer.Callback;

importorg.apache.kafka.clients.producer.KafkaProducer;

importorg.apache.kafka.clients.producer.ProducerRecord;

importorg.apache.kafka.clients.producer.RecordMetadata;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

publicclassPartitionTest{

privatestaticLoggerLOG=LoggerFactory.getLogger(PartitionTest.class);

publicstaticvoidmain(String[]args){

//TODOAuto-generatedmethodstub

Propertiesprops=newProperties();

props.put("bootstrap.servers","172.16.49.173:

9092;172.16.49.173:

9093");

props.put("retries",0);

//props.put("batch.size",16384);

props.put("linger.ms",1);

//props.put("buffer.memory",33554432);

props.put("key.serializer","mon.serialization.StringSerializer");

props.put("value.serializer","mon.serialization.StringSerializer");

props.put("partitioner.class","com.goodix.kafka.MyPartition");

KafkaProducerproducer=newKafkaProducer(props);

ProducerRecordrecord=newProducerRecord("producer_test","2223132132",

"test23_60");

producer.send(record,newCallback(){

@Override

publicvoidonCompletion(RecordMetadatametadata,Exceptione){

//TODOAuto-generatedmethodstub

if(e!

=null)

LOG.error("theproducerhasaerror:

"+e.getMessage());

else{

LOG.info("Theoffsetoftherecordwejustsentis:

"+metadata.offset());

LOG.info("Thepartitionoftherecordwejustsentis:

"+metadata.partition());

}

}

});

try{

Thread.sleep(1000);

producer.close();

}catch(InterruptedExceptione1){

//TODOAuto-generatedcatchblock

e1.printStackTrace();

}

}

}1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950

备注:

要先用命令创建topic及partitions分区数;否则在自定义的分区中如果有大于1的情况下,发送数据消息到kafka时会报expiredduetotimeoutwhilerequestingmetadatafrombrokers错误

二、使用OldConsumerHighLevelAPI编写consumer

第一步:

编写具体处理消息的类

importjava.io.UnsupportedEncodingException;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

importkafka.consumer.ConsumerIterator;

importkafka.consumer.KafkaStream;

importkafka.message.MessageAndMetadata;

 

publicclassConsumerworkimplementsRunnable{

privatestaticLoggerLOG=LoggerFactory.getLogger(Consumerwork.class);

@SuppressWarnings("rawtypes")

privateKafkaStreamm_stream;

privateintm_threadNumber;

@SuppressWarnings("rawtypes")

publicConsumerwork(KafkaStreama_stream,inta_threadNumber){

//TODOAuto-generatedconstructorstub

m_threadNumber=a_threadNumber;

m_stream=a_stream;

}

@SuppressWarnings("unchecked")

@Override

publicvoidrun(){

//TODOAuto-generatedmethodstub

ConsumerIteratorit=m_stream.iterator();

while(it.hasNext())

try{

MessageAndMetadatathisMetadata=it.next();

StringjsonStr=newString(thisMetadata.message(),"utf-8");

LOG.info("Thread"+m_threadNumber+":

"+jsonStr);

LOG.info("partion"+thisMetadata.partition()+",offset:

"+thisMetadata.offset());

try{

Thread.sleep(1000);

}catch(InterruptedExceptione){

//TODOAuto-generatedcatchblock

e.printStackTrace();

}

}catch(UnsupportedEncodingExceptione){

//TODOAuto-generatedcatchblock

e.printStackTrace();

}

}

}12345678910111213141516171819202122232425262728293031323334353637383940414243

第二步:

编写启动Consumer主类

mportjava.util.HashMap;

importjava.util.List;

importjava.util.Map;

importjava.util.Properties;

importjava.util.Scanner;

importjava.util.concurrent.ExecutorService;

importjava.util.concurrent.Executors;

importjava.util.concurrent.TimeUnit;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

importkafka.consumer.ConsumerConfig;

importkafka.consumer.KafkaStream;

importkafka.javaapi.consumer.ConsumerConnector;

publicclassConsumerGroup{

privatefinalConsumerConnectorconsumer;

privatefinalStringtopic;

privateExecutorServiceexecutor;

privatestaticLoggerLOG=LoggerFactory.getLogger(ConsumerGroup.class);

publicConsumerGroup(Stringa_zookeeper,Stringa_groupId,Stringa_topic){

consumer=kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));

this.topic=a_topic;

}

publicstaticvoidmain(String[]args){

Scannersc=newScanner(System.in);

System.out.println("请输入zookeeper集群地址(如zk1:

2181,zk2:

2181,zk3:

2181):

");

StringzooKeeper=sc.nextLine();

System.out.println("请输入指定的消费group名称:

");

StringgroupId=sc.nextLine();

System.out.println("请输入指定的消费topic名称:

");

Stringtopic=sc.nextLine();

System.out.println("请输入指定的消费处理线程数:

");

intthreads=sc.nextInt();

LOG.info("Startingconsumerkafkamessageswithzk:

"+zooKeeper+"andthetopicis"+topic);

ConsumerGroupexample=newConsumerGroup(zooKeeper,groupId,topic);

example.run(threads);

try{

Thread.sleep(1000);

}catch(InterruptedExceptionie){

}

//example.shutdown();

}

privatevoidshutdown(){

//TODOAuto-generatedmethodstub

if(consumer!

=null)

consumer.shutdown();

if(executor!

=null)

executor.shutdown();

try{

if(!

executor.awaitTermination(5000,TimeUnit.MILLISECONDS)){

LOG.info("Timedoutwaitingforconsumerthreadstoshutdown,exitinguncleanly");

}

}catch(InterruptedExceptione){

LOG.info("Interruptedduringshutdown,exitinguncleanly");

}

}

privatevoidrun(inta_numThreads){

//TODOAuto-generatedmethodstub

MaptopicCountMap=newHashMap();

topicCountMap.put(topic,newInteger(a_numThreads));

Map>>consumerMap=consumer.createMessageStreams(topicCountMap);

List>streams=consumerMap.get(topic);

//nowlaunchallthethreads

//

executor=Executors.newFixedThreadPool(a_numThreads);

//nowcreateanobjecttoconsumethemessages

//

intthreadNumber=0;

LOG.info("thestreamssizeis"+streams.size());

for(finalKafkaStreamstream:

streams){

executor.submit(newcom.goodix.kafka.oldconsumer.Consumerwork(stream,threadNumber));

//mitOffsets();

threadNumber++;

}

}

privateConsumerConfigcreateConsumerConfig(Stringa_zookeeper,Stringa_groupId){

//TODOAuto-generatedmethodstub

Propertiesprops=newProperties();

props.put("zookeeper.connect",a_zookeeper);

props.put("group.id",a_groupId);

props.put("zookeeper.session.timeout.ms","60000");

props.put("zookeeper.sync.time.ms","200");

props.put("mit.interval.ms","1000");

props.put("auto.offset.reset","smallest");

//props.put("rebalance.max.retries","5");

//props.put("rebalance.backoff.ms","15000");

returnnewConsumerConfig(props);

}

}

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798

1.topicCountMap.put(topic,newInteger(a_numThreads))是告诉Kafka我有多少个线程来处理消息。

(1).这个线程数必须是小等于topic的partition分区数;可以通过./kafka-topics.sh--describe--zookeeper"172.16.49.173:

2181"--topic"producer_test"命令来查看分区的情况

(2).kafka会根据partition.assignment.strategy指定的分配策略来指定线程消费那些分区的消息;这里没有单独配置该项即是采用的默认值range策略(按照阶段平均分配)。

比如分区有10个、线程数有3个,则线程1消费0,1,2,3,线程2消费4,5,6,线程3消费7,8,9。

另外一种是roundrobin(循环分配策略),官方文档中写有使用该策略有两个前提条件的,所以一般不要去设定。

(3).经过测试:

consumerMap.get(topic).size(),应该是获得的目前该topic有数据的分区数

(4).stream即指的是来自一个或多个服务器上的一个或者多个partition的消息。

每一个stream都对应一个单线程处理。

因此,client能够设置满足自己需求的stream数目。

总之,一个stream也许代表了多个服务器partion的消息的聚合,但是每一个partition都只能到一个stream

2.Executors.newFixedThreadPool(a_numThreads)是创建一个创建固定容量大小的缓冲池:

每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。

线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

3.props.put(“auto.offset.reset”,“smallest”)是指定从最小没有被消费offset开始;如果没有指定该项则是默认的为largest,这样的话该consumer就得不到生产者先产生的消息。

4.要使用oldconsumerAPI需要引用kafka_2.11以及kafka-clients。

 

org.apache.kafka

kafka_2.11

0.10.0.0

 

org.apache.kafka

kafka-clients

0.10.0.0

12345678910

三、使用OldSimpleConsumerAPI编写consumer

这是一个更加底层和复杂的API

使用的场景

由于使用该API需要自己控制的项比较多,也比较复杂,官方给出了一些合适的适用场景,也可以理解成为这些场景是HighLevelConsumerAPI不能够做到的

1.针对一个消息读取多次

2.在一个process中,仅仅处理一个topic中的一个partitions

3.使用事务,确保每个消息只被处理一次123

需要处理的事情

1.必须在程序中跟踪offset值

2.必须找出指定TopicPartition中的leadbroker

3.必须处理broker的变动123

使用SimpleConsumer的步骤

首先,你必须知道读哪个topic的哪个partition

然后,找到负责该partition的brokerleader,从而找到存有该partition副本的那个bro

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > 临时分类 > 批量上传

copyright@ 2008-2023 冰点文库 网站版权所有

经营许可证编号:鄂ICP备19020893号-2