java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > CompletableFuture处理List数据

CompletableFuture并行处理List分批数据demo

作者:丰木

这篇文章主要介绍了CompletableFuture并行处理List分批数据实现实例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

 CompletableFuture并行处理List分批数据

import cn.hutool.core.util.RandomUtil;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.junit.Test;
import org.springframework.util.StopWatch;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * @author nieweijun
 * @since 2022/2/24 10:53
 */
public class TestCf {
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(512));
    @Test
    public void testTotalAmount() {
        final BigDecimal[] sum = {new BigDecimal(0)};
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        // 1. 假设 sku库存对象的id是0到10000, 先初始化数据id
        List<SkuInventory> skuInventories = IntStream.rangeClosed(1, 10000).boxed()
                .map(i -> SkuInventory.builder().id(String.valueOf(i)).build())
                .collect(Collectors.toList());
        // 2. 查询所有库存, 计算价值总额
        // 模拟每一条数据:批量/每次100个
        List<List<SkuInventory>> partitionsList = Lists.partition(skuInventories, 100);
        // 3. 查算:使用异步
        List<CompletableFuture> cfList = new ArrayList<>();
        CompletableFuture[] cfArray = new CompletableFuture[partitionsList.size()];
        partitionsList.stream().forEach(partition -> {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> queryBatch(partition), executor);
            cfList.add(future);
        });
        CompletableFuture.allOf(cfList.toArray(cfArray)).join(); // 全执行完
        // 4.统计
        skuInventories.stream().forEach(e -> {
            BigDecimal multiply = BigDecimal.valueOf(e.getCount()).multiply(e.getPrice());
            sum[0] = sum[0].add(multiply);
        });
        stopWatch.stop();
        System.out.println("结果是: " + sum[0]);
        System.out.println("耗时毫秒数为: " + stopWatch.getTotalTimeMillis());
    }
    // 模拟查询数据; 设置数据值
    private void queryBatch(List<SkuInventory> partList) {
        if (CollectionUtils.isEmpty(partList)) {
            return;
        }
        long millisCost = RandomUtil.randomInt(100, 1000);
        // 模拟查询耗时
        try {
            TimeUnit.MILLISECONDS.sleep(millisCost);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 模拟赋值: 每个数量为10, 价格1.0
        partList.stream().forEach(e -> {
            e.setCount(10);
            e.setPrice(new BigDecimal("1.0"));
        });
        System.out.println("==========一批partList处理完成耗时[" + millisCost + "ms]==========");
    }
    // sku库存对象
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    static class SkuInventory {
        /** id */
        private String id;
        /** 库存数量 */
        private Integer count;
        /** 单价 */
        private BigDecimal price;
    }
}

以上就是CompletableFuture并行处理List分批数据的详细demo,更多关于CompletableFuture处理List数据的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文