彩神网

当前位置:彩神网 > 媒体报道 > >> 浏览文章

[精通函数式编程](六) Stream高并发实战

前言:

作者简介:小明java问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构

Java领域优质创作者、阿里云专家博主、华为云享专家

如果此文还不错的话,还请关注、点赞、收藏三连支持一下博主哦

本文导读

我们上讲看Stream接口提供大量API可以方便的处理元素,这讲Stream高并发(并发、并行、多线程)、ForkJoin线程池框架的实战

一、并行流(parallelStream、parallel、sequential)

并行流就是把一个内容拆分成多个数据块来执行,用不同的线程分别处理每个数据块的流

parallelStream、.parallel都可以将流转换成并行流,.parallel的粒度更小。要注意的是顺序流(.stream)调用parallel方法不意味本身有什么实际变化,它内部设置了一个boolean,表示调用parallel之后的操作都是并行的

对并行流调用后可以使用.sequential变成顺序流。

Listcollect10=orderInfos.stream.parallel.map(OrderInfo::getOrderId).collect(toList);BigDecimaltotalSubOrderAmt1=orderInfos.stream.map(OrderInfo::getSubOrderInfoList).flatMap(subOrderInfos->subOrderInfos.stream).filter(subOrderInfo->null!=subOrderInfo.getSubOrderAmt).parallel.map(SubOrderInfo::getSubOrderAmt).sequential.reduce(BigDecimal.ZERO,BigDecimal::add);Listcollect11=orderInfos.parallelStream.map(OrderInfo::getOrderId).collect(toList);

并行流不是一定并行,多线程中保证原子操作会有对象的可变状态,当多个线程共享对象时,共享的可变状态会不断被线程锁住,会影响并行流和并行计算

所以我们是否可以使用并行流需要避免共享变量,而且当较小的数据量的时候并行处理的开销不一定会小于计算开销,同时集合的数据结构也会对并行流有影响,ArrayList拆分的开销,要小于LinkedList(前者底层是数组可以直接在内存拆分,后者底层是链表,内存不连续需要遍历拆分,ArrayList、HashSet、TreeSet可拆分性好,LinkedList可拆分性极差)

二、Fork/Jion框架

并行流的背后的原理是java7里面的分支/合并框架,分支/合并框架的目的就是以递归的方式,将数据块(任务)并行的拆分成更小的数据块,然后将每个子任务合并成整体,ExecutorService接口的实现,ExecutorService把子任务分配给线程池ForkJionPool中的工作线程。

要把任务提交到这个ForkJionPool线程池,必须创建RecursiveTask的实现类,要定义RecursiveTask只需要实现他的抽象方法compute

创建一个ForkJoinTask,并把对象传给我们自定义的ForkJoinCalculator,创建一个ForkJoinPool并把任务传递给他的调用方法invoke,返回值就是ForkJoinCalculator定义的结果

扩展RecursiveTask,穿件ForkJoin框架,创建起始位置和终止位置,实现compute方法,实现fork、join金额累加

publicstaticvoidmain(String[]args){ListorderInfos=Arrays.asList(newOrderInfo("123",BigDecimal.ONE),newOrderInfo("456",BigDecimal.TEN),newOrderInfo("789",BigDecimal.TEN));//创建一个ForkJoinTask,并把对象传给我们自定义的ForkJoinCalculatorForkJoinCalculatororderInfoForkJoinTask=newForkJoinCalculator(orderInfos,0,orderInfos.size);//创建一个ForkJoinPool并把任务传递给他的调用方法,返回值就是ForkJoinCalculator定义的结果System.out.println(newForkJoinPool.invoke(orderInfoForkJoinTask));}/***扩展RecursiveTask,穿件ForkJoin框架,创建起始位置和终止位置*/staticclassForkJoinCalculatorextendsRecursiveTask{ListorderInfos;intstart;intend;BigDecimalamt;publicForkJoinCalculator(ListorderInfos,intstart,intend){this.orderInfos=orderInfos;this.amt=orderInfos.get(start).getOrderAmt;this.start=start;this.end=end;}@OverrideprotectedBigDecimalcompute{intlength=end-start;//如果大小等于阈值,返回结果if(length

Fork/Jion框架还有几个框架需要注意的,对于一个任务调用join方法,会阻塞调用方,直到出任务结果;不应该在RecursiveTask里面实现ForkJoinPool应该使用compute或fork方法;compute实现需要中断方法并且这里面实现比较困难需要多加练习。

工作窃取,在实际工作中任务差不多被平均分配到ForkJoinPool的所有线程,每个线程都为分配给线程的任务保存一个双向LinkedQueue(双向链式队列),每个人物完成就会从队列的头取一个在进行执行,但是某些线程执行可能过快,此时这个线程会从其他线程的队列尾,取走一个任务执行,这也是Fork/Jion框架能有高性能的原因

总结

本文讲解Stream高并发(并发、并行、多线程)、ForkJoin线程池框架的实战

 

随机文章

相关站点

友情链接

彩神网平台,彩神网官网,彩神网网址,彩神网下载,彩神网app,彩神网开户,彩神网投注,彩神网购彩,彩神网注册,彩神网登录,彩神网邀请码,彩神网技巧,彩神网手机版,彩神网靠谱吗,彩神网走势图,彩神网开奖结果

Powered by 彩神网 @2018 RSS地图 HTML地图