CompletableFuture并行处理List分批数据demo
作者:丰木
这篇文章主要介绍了CompletableFuture并行处理List分批数据实现实例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
CompletableFuture并行处理List分批数据
import cn.hutool.core.util.RandomUtil; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.collections.CollectionUtils; import org.junit.Test; import org.springframework.util.StopWatch; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * @author nieweijun * @since 2022/2/24 10:53 */ public class TestCf { private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(512)); @Test public void testTotalAmount() { final BigDecimal[] sum = {new BigDecimal(0)}; StopWatch stopWatch = new StopWatch(); stopWatch.start(); // 1. 假设 sku库存对象的id是0到10000, 先初始化数据id List<SkuInventory> skuInventories = IntStream.rangeClosed(1, 10000).boxed() .map(i -> SkuInventory.builder().id(String.valueOf(i)).build()) .collect(Collectors.toList()); // 2. 查询所有库存, 计算价值总额 // 模拟每一条数据:批量/每次100个 List<List<SkuInventory>> partitionsList = Lists.partition(skuInventories, 100); // 3. 查算:使用异步 List<CompletableFuture> cfList = new ArrayList<>(); CompletableFuture[] cfArray = new CompletableFuture[partitionsList.size()]; partitionsList.stream().forEach(partition -> { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> queryBatch(partition), executor); cfList.add(future); }); CompletableFuture.allOf(cfList.toArray(cfArray)).join(); // 全执行完 // 4.统计 skuInventories.stream().forEach(e -> { BigDecimal multiply = BigDecimal.valueOf(e.getCount()).multiply(e.getPrice()); sum[0] = sum[0].add(multiply); }); stopWatch.stop(); System.out.println("结果是: " + sum[0]); System.out.println("耗时毫秒数为: " + stopWatch.getTotalTimeMillis()); } // 模拟查询数据; 设置数据值 private void queryBatch(List<SkuInventory> partList) { if (CollectionUtils.isEmpty(partList)) { return; } long millisCost = RandomUtil.randomInt(100, 1000); // 模拟查询耗时 try { TimeUnit.MILLISECONDS.sleep(millisCost); } catch (InterruptedException e) { e.printStackTrace(); } // 模拟赋值: 每个数量为10, 价格1.0 partList.stream().forEach(e -> { e.setCount(10); e.setPrice(new BigDecimal("1.0")); }); System.out.println("==========一批partList处理完成耗时[" + millisCost + "ms]=========="); } // sku库存对象 @Builder @NoArgsConstructor @AllArgsConstructor @Data static class SkuInventory { /** id */ private String id; /** 库存数量 */ private Integer count; /** 单价 */ private BigDecimal price; } }
以上就是CompletableFuture并行处理List分批数据的详细demo,更多关于CompletableFuture处理List数据的资料请关注脚本之家其它相关文章!