spring boot整合spring-kafka实现发送接收消息实例代码

 更新时间:2017年06月29日 11:34:27   作者:honway  
这篇文章主要给大家介绍了关于spring-boot整合spring-kafka实现发送接收消息的相关资料,文中介绍的非常详细,对大家具有一定的参考学习价值,需要的朋友们下面跟着小编一起来看看吧。

Java技术迷

前言

由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用.

没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大.功能一直没实现.文档写的也不是很漂亮,也可能是刚起步,有很多的问题.我这里只能放弃了,使用了spring-kafka.

实现方法

pom.xml文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
  
 <groupId>org.linuxsogood.sync</groupId>
 <artifactId>linuxsogood-sync</artifactId>
 <version>1.0.0-SNAPSHOT</version>
  
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.4.0.RELEASE</version>
 </parent>
  
 <properties>
  <java.version>1.8</java.version>
  <!-- 依赖版本 -->
  <mybatis.version>3.3.1</mybatis.version>
  <mybatis.spring.version>1.2.4</mybatis.spring.version>
  <mapper.version>3.3.6</mapper.version>
  <pagehelper.version>4.1.1</pagehelper.version>
 </properties>
  
 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-freemarker</artifactId>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-kafka</artifactId>
   <version>2.0.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-core</artifactId>
   <version>4.3.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>-->
  <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>-->
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.assertj</groupId>
   <artifactId>assertj-core</artifactId>
   <version>3.5.2</version>
  </dependency>
  <dependency>
   <groupId>org.hamcrest</groupId>
   <artifactId>hamcrest-all</artifactId>
   <version>1.3</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.mockito</groupId>
   <artifactId>mockito-all</artifactId>
   <version>1.9.5</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-test</artifactId>
   <version>4.2.3.RELEASE</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <dependency>
   <groupId>com.microsoft.sqlserver</groupId>
   <artifactId>sqljdbc4</artifactId>
   <version>4.0.0</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid</artifactId>
   <version>1.0.11</version>
  </dependency>
  
  <!--Mybatis-->
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis</artifactId>
   <version>${mybatis.version}</version>
  </dependency>
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis-spring</artifactId>
   <version>${mybatis.spring.version}</version>
  </dependency>
  <!--<dependency>
   <groupId>org.mybatis.spring.boot</groupId>
   <artifactId>mybatis-spring-boot-starter</artifactId>
   <version>1.1.1</version>
  </dependency>-->
  <!-- Mybatis Generator -->
  <dependency>
   <groupId>org.mybatis.generator</groupId>
   <artifactId>mybatis-generator-core</artifactId>
   <version>1.3.2</version>
   <scope>compile</scope>
   <optional>true</optional>
  </dependency>
  <!--分页插件-->
  <dependency>
   <groupId>com.github.pagehelper</groupId>
   <artifactId>pagehelper</artifactId>
   <version>${pagehelper.version}</version>
  </dependency>
  <!--通用Mapper-->
  <dependency>
   <groupId>tk.mybatis</groupId>
   <artifactId>mapper</artifactId>
   <version>${mapper.version}</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>1.2.17</version>
  </dependency>
 </dependencies>
 <repositories>
  <repository>
   <id>repo.spring.io.milestone</id>
   <name>Spring Framework Maven Milestone Repository</name>
   <url>https://repo.spring.io/libs-milestone</url>
  </repository>
 </repositories>
 <build>
  <finalName>mybatis_generator</finalName>
  <plugins>
   <plugin>
    <groupId>org.mybatis.generator</groupId>
    <artifactId>mybatis-generator-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
     <verbose>true</verbose>
     <overwrite>true</overwrite>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
     <mainClass>org.linuxsogood.sync.Starter</mainClass>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>

orm层使用了MyBatis,又使用了通用Mapper和分页插件.

kafka消费端配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  
import java.util.HashMap;
import java.util.Map;
  
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
  
 @Value("${kafka.broker.address}")
 private String brokerAddress;
  
 @Bean
 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
 ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
 factory.setConsumerFactory(consumerFactory());
 factory.setConcurrency(3);
 factory.getContainerProperties().setPollTimeout(3000);
 return factory;
 }
  
 @Bean
 public ConsumerFactory<String, String> consumerFactory() {
 return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }
  
 @Bean
 public Map<String, Object> consumerConfigs() {
 Map<String, Object> propsMap = new HashMap<>();
 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 return propsMap;
 }
  
 @Bean
 public Listener listener() {
 return new Listener();
 }
}

生产者的配置.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
  
import java.util.HashMap;
import java.util.Map;
  
@Configuration
@EnableKafka
public class KafkaProducerConfig {
  
 @Value("${kafka.broker.address}")
 private String brokerAddress;
  
 @Bean
 public ProducerFactory<String, String> producerFactory() {
 return new DefaultKafkaProducerFactory<>(producerConfigs());
 }
  
 @Bean
 public Map<String, Object> producerConfigs() {
 Map<String, Object> props = new HashMap<>();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
 props.put(ProducerConfig.RETRIES_CONFIG, 0);
 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 return props;
 }
  
 @Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
 return new KafkaTemplate<String, String>(producerFactory());
 }
}

监听,监听里面,写的就是业务逻辑了,从kafka里面得到数据后,具体怎么去处理. 如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成只有一个监听能处理其中的消息,另外监听就不能处理消息了.也即是kafka的分布式消息处理方式.

在同一个group里的监听,共同处理接收到的消息,会根据一定的算法来处理.如果不在一个组,但是监听的是同一个topic的话,就会形成广播模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
 
import java.util.List;
import java.util.Optional;
 
public class Listener {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);
 
 @Autowired
 private StoreMapper storeMapper;
 
 /**
  * 监听kafka消息,如果有消息则消费,同步数据到新烽火的库
  * @param record 消息实体bean
  */
 @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
 public void listen(ConsumerRecord<?, ?> record) {
  Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   try {
    MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
    CupMessageType type = messageWrapper.getType();
    //判断消息的数据类型,不同的数据入不同的表
    if (CupMessageType.STORE == type) {
     proceedStore(messageWrapper);
    }
   } catch (Exception e) {
    LOGGER.error("将接收到的消息保存到数据库时异常, 消息:{}, 异常:{}",message.toString(),e);
   }
  }
 }
 
 /**
  * 消息是店铺类型,店铺消息处理入库
  * @param messageWrapper 从kafka中得到的消息
  */
 private void proceedStore(MessageWrapper messageWrapper) {
  Object data = messageWrapper.getData();
  Store cupStore = JSON.parseObject(data.toString(), Store.class);
  StoreExample storeExample = new StoreExample();
  String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
  storeExample.createCriteria().andStoreNameEqualTo(storeName);
  List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
  org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
  org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
  //如果查询不到记录则新增
  if (stores.size() == 0) {
   storeMapper.insert(store);
  } else {
   store.setStoreId(stores.get(0).getStoreId());
   storeMapper.updateByPrimaryKey(store);
  }
 }
 
}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

蓄力AI

微信公众号搜索 “ 脚本之家 ” ,选择关注

程序猿的那些事、送书等活动等着你

原文链接:http://linuxsogood.org/1572.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!

相关文章

  • SpringBoot基于Swagger2构建API文档过程解析

    SpringBoot基于Swagger2构建API文档过程解析

    这篇文章主要介绍了SpringBoot基于Swagger2构建API文档过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Java自定义协议报文封装 添加Crc32校验的实例

    Java自定义协议报文封装 添加Crc32校验的实例

    下面小编就为大家分享一篇Java自定义协议报文封装 添加Crc32校验的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-01-01
  • Java建造者设计模式详解

    Java建造者设计模式详解

    这篇文章主要为大家详细介绍了Java建造者设计模式,对建造者设计模式进行分析理解,感兴趣的小伙伴们可以参考一下
    2016-02-02
  • Java反转链表测试过程介绍

    Java反转链表测试过程介绍

    这篇文章主要介绍了Java反转链表测试过程,学习过数据结构的小伙伴们,对链表想来是并不陌生。本篇文章将为大家介绍几种在Java语言当中,实现链表反转的几种方法,以下是具体内容
    2023-04-04
  • java显示当前美国洛杉矶时间

    java显示当前美国洛杉矶时间

    这篇文章主要介绍了java显示当前美国洛杉矶时间的方法,也就是当前时间的切换,需要的朋友可以参考下
    2014-02-02
  • Spring中的@ExceptionHandler异常拦截器

    Spring中的@ExceptionHandler异常拦截器

    这篇文章主要介绍了Spring中的@ExceptionHandler异常拦截器,Spring的@ExceptionHandler可以用来统一处理方法抛出的异常,给方法加上@ExceptionHandler注解,这个方法就会处理类中其他方法抛出的异常,需要的朋友可以参考下
    2024-01-01
  • RocketMQ设计之异步刷盘

    RocketMQ设计之异步刷盘

    本文介绍RocketMQ设计之异步刷盘,RocketMQ消息存储到磁盘上,这样既保证断电后恢复,也让存储消息量超出内存限制,RocketMQ为了提高性能,会尽可能保证磁盘顺序写,消息通过Producer写入RocketMQ的时候,有两种方式,上篇介绍了同步刷盘,本文介绍异步刷盘,需要的朋友可以参考下
    2022-03-03
  • mybatis plus新增(insert)数据获取主键id的问题

    mybatis plus新增(insert)数据获取主键id的问题

    这篇文章主要介绍了mybatis plus新增(insert)数据获取主键id的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • springboot定时任务@Scheduled执行多次的问题

    springboot定时任务@Scheduled执行多次的问题

    这篇文章主要介绍了springboot定时任务@Scheduled执行多次问题的解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • 一文了解为什么Java中只有值传递

    一文了解为什么Java中只有值传递

    Java 传参是值传递还是引用传递?这个问题很基础,但是许多人都有点懵。本文就来通过一些示例带大家详细了解一下,需要的可以参考一下
    2022-07-07

最新评论