spring boot整合spring-kafka实现发送接收消息实例代码
前言
由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用.
没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大.功能一直没实现.文档写的也不是很漂亮,也可能是刚起步,有很多的问题.我这里只能放弃了,使用了spring-kafka.
实现方法
pom.xml文件如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 | <? xml version = "1.0" encoding = "UTF-8" ?> < project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > < modelVersion >4.0.0</ modelVersion > < groupId >org.linuxsogood.sync</ groupId > < artifactId >linuxsogood-sync</ artifactId > < version >1.0.0-SNAPSHOT</ version > < parent > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-parent</ artifactId > < version >1.4.0.RELEASE</ version > </ parent > < properties > < java.version >1.8</ java.version > <!-- 依赖版本 --> < mybatis.version >3.3.1</ mybatis.version > < mybatis.spring.version >1.2.4</ mybatis.spring.version > < mapper.version >3.3.6</ mapper.version > < pagehelper.version >4.1.1</ pagehelper.version > </ properties > < dependencies > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-web</ artifactId > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-jdbc</ artifactId > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-aop</ artifactId > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-freemarker</ artifactId > </ dependency > <!--<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version>2.0.1.RELEASE</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>4.3.1.RELEASE</version> <scope>compile</scope> </dependency>--> < dependency > < groupId >org.springframework.kafka</ groupId > < artifactId >spring-kafka</ artifactId > < version >1.1.0.RELEASE</ version > </ dependency > <!--<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>1.1.0.RELEASE</version> </dependency>--> < dependency > < groupId >junit</ groupId > < artifactId >junit</ artifactId > < version >4.12</ version > < scope >test</ scope > </ dependency > < dependency > < groupId >org.assertj</ groupId > < artifactId >assertj-core</ artifactId > < version >3.5.2</ version > </ dependency > < dependency > < groupId >org.hamcrest</ groupId > < artifactId >hamcrest-all</ artifactId > < version >1.3</ version > < scope >test</ scope > </ dependency > < dependency > < groupId >org.mockito</ groupId > < artifactId >mockito-all</ artifactId > < version >1.9.5</ version > < scope >test</ scope > </ dependency > < dependency > < groupId >org.springframework</ groupId > < artifactId >spring-test</ artifactId > < version >4.2.3.RELEASE</ version > < scope >test</ scope > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-test</ artifactId > < scope >test</ scope > </ dependency > < dependency > < groupId >mysql</ groupId > < artifactId >mysql-connector-java</ artifactId > </ dependency > < dependency > < groupId >com.microsoft.sqlserver</ groupId > < artifactId >sqljdbc4</ artifactId > < version >4.0.0</ version > </ dependency > < dependency > < groupId >com.alibaba</ groupId > < artifactId >druid</ artifactId > < version >1.0.11</ version > </ dependency > <!--Mybatis--> < dependency > < groupId >org.mybatis</ groupId > < artifactId >mybatis</ artifactId > < version >${mybatis.version}</ version > </ dependency > < dependency > < groupId >org.mybatis</ groupId > < artifactId >mybatis-spring</ artifactId > < version >${mybatis.spring.version}</ version > </ dependency > <!--<dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.1.1</version> </dependency>--> <!-- Mybatis Generator --> < dependency > < groupId >org.mybatis.generator</ groupId > < artifactId >mybatis-generator-core</ artifactId > < version >1.3.2</ version > < scope >compile</ scope > < optional >true</ optional > </ dependency > <!--分页插件--> < dependency > < groupId >com.github.pagehelper</ groupId > < artifactId >pagehelper</ artifactId > < version >${pagehelper.version}</ version > </ dependency > <!--通用Mapper--> < dependency > < groupId >tk.mybatis</ groupId > < artifactId >mapper</ artifactId > < version >${mapper.version}</ version > </ dependency > < dependency > < groupId >com.alibaba</ groupId > < artifactId >fastjson</ artifactId > < version >1.2.17</ version > </ dependency > </ dependencies > < repositories > < repository > < id >repo.spring.io.milestone</ id > < name >Spring Framework Maven Milestone Repository</ name > < url >https://repo.spring.io/libs-milestone</ url > </ repository > </ repositories > < build > < finalName >mybatis_generator</ finalName > < plugins > < plugin > < groupId >org.mybatis.generator</ groupId > < artifactId >mybatis-generator-maven-plugin</ artifactId > < version >1.3.2</ version > < configuration > < verbose >true</ verbose > < overwrite >true</ overwrite > </ configuration > </ plugin > < plugin > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-maven-plugin</ artifactId > < configuration > < mainClass >org.linuxsogood.sync.Starter</ mainClass > </ configuration > </ plugin > </ plugins > </ build > </ project > |
orm层使用了MyBatis,又使用了通用Mapper和分页插件.
kafka消费端配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | import org.linuxsogood.sync.listener.Listener; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { @Value ( "${kafka.broker.address}" ) private String brokerAddress; @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency( 3 ); factory.getContainerProperties().setPollTimeout( 3000 ); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this .brokerAddress); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false ); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100" ); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000" ); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class ); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class ); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group" ); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ); return propsMap; } @Bean public Listener listener() { return new Listener(); } } |
生产者的配置.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaProducerConfig { @Value ( "${kafka.broker.address}" ) private String brokerAddress; @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this .brokerAddress); props.put(ProducerConfig.RETRIES_CONFIG, 0 ); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 ); props.put(ProducerConfig.LINGER_MS_CONFIG, 1 ); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432 ); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer. class ); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer. class ); return props; } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } } |
监听,监听里面,写的就是业务逻辑了,从kafka里面得到数据后,具体怎么去处理. 如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成只有一个监听能处理其中的消息,另外监听就不能处理消息了.也即是kafka的分布式消息处理方式.
在同一个group里的监听,共同处理接收到的消息,会根据一定的算法来处理.如果不在一个组,但是监听的是同一个topic的话,就会形成广播模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | import com.alibaba.fastjson.JSON; import org.linuxsogood.qilian.enums.CupMessageType; import org.linuxsogood.qilian.kafka.MessageWrapper; import org.linuxsogood.qilian.model.store.Store; import org.linuxsogood.sync.mapper.StoreMapper; import org.linuxsogood.sync.model.StoreExample; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import java.util.List; import java.util.Optional; public class Listener { private static final Logger LOGGER = LoggerFactory.getLogger(Listener. class ); @Autowired private StoreMapper storeMapper; /** * 监听kafka消息,如果有消息则消费,同步数据到新烽火的库 * @param record 消息实体bean */ @KafkaListener (topics = "linuxsogood-topic" , group = "sync-group" ) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); try { MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper. class ); CupMessageType type = messageWrapper.getType(); //判断消息的数据类型,不同的数据入不同的表 if (CupMessageType.STORE == type) { proceedStore(messageWrapper); } } catch (Exception e) { LOGGER.error( "将接收到的消息保存到数据库时异常, 消息:{}, 异常:{}" ,message.toString(),e); } } } /** * 消息是店铺类型,店铺消息处理入库 * @param messageWrapper 从kafka中得到的消息 */ private void proceedStore(MessageWrapper messageWrapper) { Object data = messageWrapper.getData(); Store cupStore = JSON.parseObject(data.toString(), Store. class ); StoreExample storeExample = new StoreExample(); String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName(); storeExample.createCriteria().andStoreNameEqualTo(storeName); List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample); org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store(); org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore); //如果查询不到记录则新增 if (stores.size() == 0 ) { storeMapper.insert(store); } else { store.setStoreId(stores.get( 0 ).getStoreId()); storeMapper.updateByPrimaryKey(store); } } } |
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。
微信公众号搜索 “ 脚本之家 ” ,选择关注
程序猿的那些事、送书等活动等着你
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!
相关文章
SpringBoot基于Swagger2构建API文档过程解析
这篇文章主要介绍了SpringBoot基于Swagger2构建API文档过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2019-11-11Spring中的@ExceptionHandler异常拦截器
这篇文章主要介绍了Spring中的@ExceptionHandler异常拦截器,Spring的@ExceptionHandler可以用来统一处理方法抛出的异常,给方法加上@ExceptionHandler注解,这个方法就会处理类中其他方法抛出的异常,需要的朋友可以参考下2024-01-01mybatis plus新增(insert)数据获取主键id的问题
这篇文章主要介绍了mybatis plus新增(insert)数据获取主键id的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2023-03-03springboot定时任务@Scheduled执行多次的问题
这篇文章主要介绍了springboot定时任务@Scheduled执行多次问题的解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-10-10
最新评论