java实现异步线程,回调接口方式
作者:墨笔之风
这篇文章主要介绍了java实现异步线程,回调接口方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
最近在业余时间呢,无意间发现一个问题,使用异步线程推送回调数据
这里小编使用了两个IDEA程序分别模拟接收方和发送方
发送方
package com.slg.util; import com.alibaba.fastjson.JSONObject; import com.google.gson.Gson; import com.slg.entity.dto.SettlementMergerResp; import lombok.extern.slf4j.Slf4j; import org.apache.poi.ss.formula.functions.T; import org.springframework.beans.factory.annotation.Autowired; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * @author Administrator * 异步处理数据推送 */ @Slf4j public class CallbackUtil { // 推送地址(使用另一个IDEA模拟(http://localhost:8848/callback)这个客户端) private static final String INTERFACE_CALLBACK = "http://localhost:8848/callback"; // private static final ExecutorService executorService = Executors.newFixedThreadPool(10); private static final AtomicInteger SUBMITTED_TASKS = new AtomicInteger(0); private static final ExecutorService executorService = CustomThreadPoolUtil.getExecutorService(); /** * @param object * @Description 构建异步线程,回调接口 * @Date 2024/5/29 * 等待线程池关闭完成后再提交任务 shutdown * 如果你想立即关闭线程池 hutdownNow() * 确保不会在关闭线程池后提交任务 isShutdown **/ public static void getCallBackThread(Object object) { log.info("回调接口=======================>正在进行数据推送:{}", object); if (!executorService.isShutdown()) { log.info("准备推送数据: {}", object); SUBMITTED_TASKS.incrementAndGet(); executorService.submit(() -> { try { CallbackUtil.sendCallback(object); log.info("数据成功推送给用户!"); } catch (Exception e) { log.error("推送数据时出现异常: {}", e.getMessage()); } finally { SUBMITTED_TASKS.decrementAndGet(); if (SUBMITTED_TASKS.get() == 0) { log.info("关闭线程池"); executorService.shutdown(); log.info("已关闭线程池"); } } }); SUBMITTED_TASKS.incrementAndGet(); } log.info("回调接口=======================>正在进行数据推送完毕:{}", object); } /** * @param object (测试对象=>可自行模拟) * @Description 数据推送地址 * @Date 2024/5/29 **/ public static void sendCallback(Object object) throws Exception { URL url = new URL(INTERFACE_CALLBACK); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); connection.setDoOutput(true); String jsonPayload = convertToJson(object); try (OutputStream outputStream = connection.getOutputStream()) { outputStream.write(jsonPayload.getBytes()); outputStream.flush(); } int responseCode = connection.getResponseCode(); if (responseCode == HttpURLConnection.HTTP_OK) { // TODO 请求成功,可以根据需要进行进一步处理 } else { // TODO 请求失败,可以根据需要进行错误处理 } connection.disconnect(); } private static String convertToJson(Object object) { return new Gson().toJson(object); } }
接收方
package com.example.demo.controller; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; import lombok.Data; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.util.concurrent.*; /** * @author Administrator * 手动创建线程池 */ @Data public class CustomThreadPoolUtil { public static void main(String[] args) throws IOException { int port = 8848; HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); server.createContext("/callback", (HttpHandler) new CallbackHandler()); server.setExecutor(null); server.start(); System.out.println("服务器已启动,监听端口:" + port); } static class CallbackHandler implements HttpHandler { @Override public void handle(HttpExchange exchange) throws IOException { String requestMethod = exchange.getRequestMethod(); if (requestMethod.equalsIgnoreCase("POST")) { InputStream requestBody = exchange.getRequestBody(); byte[] buffer = new byte[requestBody.available()]; requestBody.read(buffer); String requestData = new String(buffer); System.out.println("接收到的数据:" + requestData); String response = "数据已接收"; exchange.sendResponseHeaders(200, response.getBytes().length); OutputStream outputStream = exchange.getResponseBody(); outputStream.write(response.getBytes()); outputStream.close(); } else { exchange.sendResponseHeaders(405, -1); } } } }
自定义线程池
package com.slg.util; import lombok.Data; import java.util.concurrent.*; /** * @author Administrator * 手动创建线程池 */ @Data public class CustomThreadPoolUtil { // 线程池大小 private static final int CORE_POOL_SIZE = 10; private static final int MAXIMUM_POOL_SIZE = 20; private static final long KEEP_ALIVE_TIME = 60L; private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingQueue<>(); // 自定义线程工厂 private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() { private int count = 0; @Override public Thread newThread(Runnable r) { return new Thread(r, "CustomThreadPool-" + count++); } }; // 创建线程池 private static final ExecutorService executorService = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TIME_UNIT, WORK_QUEUE, THREAD_FACTORY ); public static ExecutorService getExecutorService() { return executorService; } }
测试效果:
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。