java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java项目互传输数据

java通过RESTful API实现两个项目之间相互传输数据

作者:Luffy船长

一些特殊场景中,两个项目发布在不同的服务器,并且由于服务器限制特殊情况ip无法相通时进行开放接口方式进行数据传输,下面我们就来看看java通过RESTful API实现这一需求吧

项目场景

一些特殊场景中,两个项目发布在不同的服务器,并且由于服务器限制特殊情况ip无法相通时进行开放接口方式进行数据传输

问题描述

两个服务器之间ip无法相互访问,数据没法进行数据传输

解决方案

通过RESTful API方式定义开放接口实现数据传输

1.开放接口定义

@RequestMapping(value = "/getss", produces = "application/json;charset=UTF-8")
public void getTestChunked(HttpServletResponse response) {
    log.info("getTest分块传输接口调用");
    long startTime = System.currentTimeMillis();
    OutputStream outputStream = null;
    JsonGenerator generator = null;

    // 调用 GetTime 类的 getFSKTime 方法
    GetTime getTime = new GetTime();
    HashMap<Object, Object> timeData = getTime.getFSKTime();

    try {
        // 获取查询参数
        String formattedDate = (String) timeData.get("formattedDate");
        String formattedEnd = (String) timeData.get("formattedEnd");
        log.info("查询时间范围:{} ~ {}", formattedDate, formattedEnd);

        // 获取所有数据
        DataSourceUtil.setDB("db2");
        List<HbCcsPolicyDataRc> dataList = synchronizationService.getSynchronization(formattedDate, formattedEnd);

        if (dataList == null) {
            dataList = Collections.emptyList();
        }

        log.info("查询完成:共{}条数据,准备分块传输", dataList.size());

        // 设置响应头
        response.setContentType("application/json;charset=UTF-8");
        response.setHeader("Connection", "keep-alive");
        response.setHeader("Transfer-Encoding", "chunked");
        response.setHeader("X-Total-Count", String.valueOf(dataList.size()));
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Pragma", "no-cache");

        // 使用Jackson流式API分块写入响应
        ObjectMapper objectMapper = new ObjectMapper();
        outputStream = response.getOutputStream();
        generator = objectMapper.getFactory().createGenerator(outputStream);

        generator.writeStartArray(); // 开始数组

        int chunkSize = 500; // 减小每块的大小,避免缓冲区溢出
        int totalSize = dataList.size();

        for (int i = 0; i < totalSize; i++) {
            // 检查客户端是否仍然连接
            try {
                response.getOutputStream(); // 这将抛出异常如果客户端断开
            } catch (IOException e) {
                log.warn("客户端已断开连接,终止传输");
                break;
            }

            // 写入单个对象
            objectMapper.writeValue(generator, dataList.get(i));

            // 每chunkSize条数据刷新一次缓冲区
            if ((i + 1) % chunkSize == 0) {
                generator.flush();
                log.info("已传输{}条数据,进度: {}%", i + 1, (i + 1) * 100 / totalSize);

                // 添加小延迟,避免 overwhelming 客户端
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        generator.writeEndArray(); // 结束数组
        generator.flush();

        log.info("分块传输完成:共{}条数据,耗时{}ms",
                dataList.size(), System.currentTimeMillis() - startTime);

    } catch (ClientAbortException e) {
        log.warn("客户端中止了连接: {}", e.getMessage());
    } catch (Exception e) {
        log.error("getTest分块传输接口执行失败", e);
        if (!response.isCommitted()) {
            try {
                response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value(), "数据获取失败");
            } catch (IOException ex) {
                log.error("发送错误响应失败", ex);
            }
        }
    } finally {
        // 确保资源被正确关闭
        try {
            if (generator != null) {
                generator.close();
            }
        } catch (IOException e) {
            log.warn("关闭JsonGenerator时发生错误", e);
        }
        try {
            if (outputStream != null) {
                outputStream.close();
            }
        } catch (IOException e) {
            log.warn("关闭OutputStream时发生错误", e);
        }

        // 帮助GC回收大对象
        data1 = null;
        System.gc();
    }
}

2.开放接口调用

public void fetchDataFromServer1() {
    log.info("===== 开始执行 fetchDataFromServer1(分块传输模式)=====");
    RestTemplate chunkedRestTemplate = createChunkedRestTemplate();

    String url = server1BaseUrl + "/getReceive/getsss";
    log.info("调用 getTestChunked 分块传输接口,URL: {}", url);

    try {
        // 使用自定义的ResponseExtractor来处理流式响应
        List<HbCcsPolicyDataRc> dataList = chunkedRestTemplate.execute(
                url,
                HttpMethod.GET,
                null,
                new ResponseExtractor<List<HbCcsPolicyDataRc>>() {
                    @Override
                    public List<HbCcsPolicyDataRc> extractData(ClientHttpResponse response) throws IOException {
                        return processStreamingResponse(response, HbCcsPolicyDataRc.class);
                    }
                }
        );

        log.info("===== fetchDataFromServer1 完成,共处理{}条数据 =====", dataList != null ? dataList.size() : 0);
        getCCSCroData();
    } catch (Exception e) {
        log.error("fetchDataFromServer1 整体失败", e);
        throw new RuntimeException("fetchDataFromServer1 失败:" + e.getMessage(), e);
    } finally {
        // 帮助GC回收资源
        System.gc();
    }
}
// 修改流式响应处理方法
private <T> List<T> processStreamingResponse(ClientHttpResponse response, Class<T> valueType) throws IOException {
    List<T> dataList = new ArrayList<>();
    ObjectMapper objectMapper = getConfiguredObjectMapper();
    InputStream inputStream = response.getBody();

    try (JsonParser parser = objectMapper.getFactory().createParser(inputStream)) {
        if (parser.nextToken() != JsonToken.START_ARRAY) {
            throw new IOException("Expected data to start with an Array");
        }

        int count = 0;
        long lastLogTime = System.currentTimeMillis();

        while (parser.nextToken() != JsonToken.END_ARRAY) {
            T record = objectMapper.readValue(parser, valueType);
            dataList.add(record);
            count++;

            // 每1000条或每30秒日志输出一次
            long currentTime = System.currentTimeMillis();
            if (count % 1000 == 0 || currentTime - lastLogTime > 30000) {
                log.info("已解析{}条数据", count);
                lastLogTime = currentTime;
            }

            // 定期批量处理,避免内存占用过高
            if (count % 5000 == 0) {
                processBatchData(dataList, valueType);
                dataList.clear(); // 清空列表,避免内存占用过高
                System.gc(); // 建议垃圾回收
            }
        }

        // 处理最后一批数据
        if (!dataList.isEmpty()) {
            processBatchData(dataList, valueType);
        }
    }

    log.info("共解析{}条数据", dataList.size());
    return dataList;
}

// 批量处理数据的方法
private <T> void processBatchData(List<T> dataList, Class<T> valueType) {
    try {
        DataSourceUtil.setDB("db3");

        if (valueType == HbCcsPolicyDataRc.class) {
            dmService.setHbCcsPolicyDataRc((List<HbCcsPolicyDataRc>) dataList);
        } else if (valueType == HbCcsCrosssellPEMIUM.class) {
            dmService.setHbCcsCrosssellPEMIUM((List<HbCcsCrosssellPEMIUM>) dataList);
        } else if (valueType == HbCcsLpDataRc.class) {
            dmService.HbCcsLpDataRc((List<HbCcsLpDataRc>) dataList);
        }

        log.info("成功处理一批数据,数量: {}", dataList.size());
    } catch (Exception e) {
        log.error("处理批量数据时发生错误", e);
        // 这里可以添加重试逻辑或错误记录
    }
}


// 6. 工具方法:复用 JSON 解析器配置(避免重复设置容错属性)
private ObjectMapper getConfiguredObjectMapper() {
    ObjectMapper objectMapper = new ObjectMapper();
    // 容错配置:忽略未知字段、允许单值数组
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
    return objectMapper;
}





// 创建专用的分块传输RestTemplate
private RestTemplate createChunkedRestTemplate() {
    SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();

    // 设置超时时间(单位:毫秒)
    factory.setConnectTimeout(120000); // 2分钟连接超时
    factory.setReadTimeout(3600000);   // 1小时读取超时(大数据传输需要更长时间)

    // 禁用缓冲
    factory.setBufferRequestBody(false);

    RestTemplate restTemplate = new RestTemplate(factory);

    // 配置消息转换器
    List<HttpMessageConverter<?>> converters = new ArrayList<>();

    // 字符串转换器
    StringHttpMessageConverter stringConverter = new StringHttpMessageConverter(StandardCharsets.UTF_8);
    stringConverter.setWriteAcceptCharset(false);
    converters.add(stringConverter);

    // JSON转换器
    MappingJackson2HttpMessageConverter jsonConverter = new MappingJackson2HttpMessageConverter();
    jsonConverter.setSupportedMediaTypes(Collections.singletonList(MediaType.APPLICATION_JSON));
    converters.add(jsonConverter);

    restTemplate.setMessageConverters(converters);

    return restTemplate;
}

3.核心配置

这个配置两个项目的yml都需要加

server1:
  base-url: http://127.0.0.1:8060#根据情况而变
  auth:
    username: admin
    password: admin

server:
  max-http-header-size: 1000000
  servlet:
    multipart:
      max-file-size: 1000MB
      max-request-size: 1000MB
  tomcat:
    max-swallow-size: 1000MB
    max-http-form-post-size: 1000MB
    threads:
      max: 200
      min-spare: 20
    keep-alive-timeout: 300000
@Value("${server1.base-url}")
private String server1BaseUrl;

这段代码加到controller类中这是将yml配置文件的路径注册到controller了

这种适合小数据传输,大数据传输需要配置服务器的Nginx配置需要将接口配置上尽量统一接口前缀不然可能会报错具体就不展示了

到此这篇关于java通过RESTful API实现两个项目之间相互传输数据的文章就介绍到这了,更多相关java项目互传输数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文