定义

什么是Stream流,Java doc中是这样写的

A sequence of elements supporting sequential and parallel aggregate operations

翻译一下就是一个支持顺序和并行聚合操作的元素序列。
可以把它理解成一个迭代器,但是只能遍历一次,就像是流水一样,要处理的元素在流中传输,并且可以在流中设置多个处理节点,元素在经过每个节点后会被节点的逻辑所处理。比如可以进行过滤、排序、转换等操作。

Stream流的使用可以分为三个步骤:

  • 数据源,创建流
  • 中间操作,可以有多个,生成一个新的流
  • 终端操作,只能有一个,放在最后,代表流中止。

Stream流有几个特点:
1、Stream流一般不会改变数据源,只会生成一个新的数据流。
2、Stream流不会存储数据,只会根据设置的操作节点处理数据。
3、Stream流是延迟执行的,只有在调用终端操作后才会进行流转。

看一下Stream的结构

使用

数据源生成流

  • 如果是集合的话,可以直接使用stream()创建流。
  • 如果是数组的话,可以使用Arrays.stream()Stream.of()来创建流。
// 集合生成流
List<String> strList = new ArrayList<>();
Stream<String> stream = strList.stream();//数据生成流
String[] strs = new String[]{"1","2","3"};
Stream<String> stream1 = Arrays.stream(strs);
Stream<String> stream2 = Stream.of(strs);

中间操作

在上边Stream定义中,返回是Stream类型的大多数都是中间操作,入参大多数都是函数式编程,不熟悉的可以看看这篇Java函数式编程。常用的中间操作有

  • 过滤操作 filter()
Arrays.stream(strs).filter(s -> s.equals("1"));
  • 排序操作 sorted()
Arrays.stream(strs).sorted();
  • 去重操作 distinct()
Arrays.stream(strs).distinct();
  • 映射操作,将流中元素转换成新的元素

    • mapToInt()转换成Integer类型
    • mapToLong()转换成Long类型
    • mapToDouble()转换成Double类型
    • map() 自定义转换类型,这是一个使用频率非常高的方法。
//将字符串转换成Integer
Arrays.stream(strs).mapToInt(s -> Integer.valueOf(s));
//将字符串转换成Long
Arrays.stream(strs).mapToLong(s -> Long.valueOf(s));
//将字符串转换成Doublde
Arrays.stream(strs).mapToDouble(s -> Double.valueOf(s));
//自定义转换的类型
Arrays.stream(strs).map(s -> new BigDecimal(s));

中间操作是可以有多个的,我们可以根据业务功能组合多个中间操作,比如求数组中字符串包含s的字符串长度排序

Arrays.stream(strs).filter(e->e.contains("s")).map(String::length).sorted();

终端操作

终端操作,表示结束流操作,是在流的最后,常用的有

  • 统计 count()
long count = Arrays.stream(strs).count();
// count=3
  • 获取最小值 min()
// 将字符串转换成Interger类型再比较大小OptionalInt min = Arrays.stream(strs).mapToInt(Integer::valueOf).min();System.out.println(min.getAsInt());// 1
  • 获取最大值 max()
 OptionalInt max = Arrays.stream(strs).mapToInt(Integer::valueOf).max();System.out.println(max.getAsInt());// 3
  • 匹配

    • anyMatch(),只要有一个匹配就返回true
    • allMatch(),只有全部匹配才返回true
    • noneMatch(),只要有一个匹配就返回 false
boolean all = Arrays.stream(strs).allMatch(s -> s.equals("2"));
boolean any = Arrays.stream(strs).anyMatch(s -> s.equals("2"));
boolean none = Arrays.stream(strs).noneMatch(s -> s.equals("2"));
// all = false
// any = true
// none = false
  • 组合 reduce()将Stream 中的元素组合起来,有两种用法

    • Optional reduce(BinaryOperator accumulator) 没有起始值只有运算规则
    • T reduce(T identity, BinaryOperator accumulator),有运算起始值和运算规则、返回的是和起始值一样的类型
Integer[] integers = new Integer[]{1,2,3};
Optional<Integer> reduce1 = Arrays.stream(integers).reduce((i1, i2) -> i1 + i2);
Integer reduce2 = Arrays.stream(integers).reduce(100, (i1, i2) -> i1 + i2);
// reduce1.get() = 6
// reduce2 = 106
  • 转换 collect(),转换作用是将流再转换成集合或数组,这也是一个使用频率非常高的方法。
    collect()一般配合Collectors使用,Collectors 是一个收集器的工具类,内置了一系列收集器实现,比如toList() 转换成list集合,toMap()转换成Map,toSet()转换成Set集合,joining() 将元素收集到一个可以用分隔符指定的字符串中。
String[] strs = new String[]{"11111", "222", "3"};
//统计每个字符串的长度
List<Integer> lengths = Arrays.stream(strs).map(String::length).collect(Collectors.toList());
String s = Arrays.stream(strs).collect(Collectors.joining(","));
// lengths=[5,3,1]
// s = 11111,222,3

合理的组合Steam操作,可以很大的提升生产力

原理


Stream的实现类中,将Stream划分成了HeadStatelessOpStatefulOpHead控制数据流入,中间操作分为了StatelessOpStatefulOp

StatelessOp代表无状态操作:每个数据的处理是独立的,不会影响或依赖之前的数据。像filter()map()等。

StatefulOp代表有状态操作::处理时会记录状态,比如后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去等这样有状态的操作,像sorted()

现在已下面代码为例,分析一下Stream的原理

 list.stream().filter(e -> e.length() > 1).sorted().filter(e -> e.equals("333")).collect(Collectors.toList());

数据源生成流

首先,进入到list.stream()

//Collection#streamdefault Stream<E> stream() {return StreamSupport.stream(spliterator(), false);}default Spliterator<E> spliterator() {return Spliterators.spliterator(this, 0);}
//StreamSupport#stream
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {Objects.requireNonNull(spliterator);return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
}

将原数据封装成Spliterator,同时生成一个Head,将Spliterator放到Head中。

中间操作

接着分析中间操作.filter(e -> e.length() > 1)的代码

//ReferencePipeline#filter
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}@Overridepublic void accept(P_OUT u) {if (predicate.test(u))downstream.accept(u);}};}};
}

返回的是一个无状态操作StatelessOp,查看StatelessOp的构造函数

// AbstractPipeline#AbstractPipelineAbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {if (previousStage.linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);previousStage.linkedOrConsumed = true;previousStage.nextStage = this;this.previousStage = previousStage;this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);this.sourceStage = previousStage.sourceStage;if (opIsStateful())sourceStage.sourceAnyStateful = true;this.depth = previousStage.depth + 1;}

构造函数中有previousStage.nextStage = this;this.previousStage = previousStage;,相当于将当前的StatelessOp操作拼接到Head后面,构成了一条双向链表。

再看后面的.sorted().filter(e -> e.equals("333")).limit(10),也会将操作添加到了双向链表后面。.sorted()在链表后面添加的是StatefulOp有状态操作。

终端操作

最后走到终端操作.collect(Collectors.toList())。进入到collect()

//ReferencePipeline#collect
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {A container;if (isParallel()&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {container = collector.supplier().get();BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();forEach(u -> accumulator.accept(container, u));}else {container = evaluate(ReduceOps.makeRef(collector));}return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)? (R) container: collector.finisher().apply(container);
}

并发操作先不看,直接看container = evaluate(ReduceOps.makeRef(collector));ReduceOps.makeRef()返回是TerminalOp,代表的是终端操作。

evaluate()

//AbstractPipeline#evaluate
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}

先不管并行,进串行入evaluateSequential()

//ReduceOps#evaluateSequential
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

makeSink()将返回一个Sink实例,并作为参数和 spliterator 一起传入最后一个节点(terminalOp)的 wrapAndCopyInto() 方法

//AbstractPipeline#wrapAndCopyInto
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);return sink;
}final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {sink = p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink<P_IN>) sink;
}

wrapSink()将最后一个节点创建的 Sink 传入,并且看到里面有个 for 循环。这个 for 循环是从最后一个节点开始,到第二个节点结束。每一次循环都是将上一节点的 combinedFlags 和当前的 Sink 包起来生成一个新的 Sink 。这和前面拼接各个操作很类似,只不过拼接的是 Sink 的实现类的实例,方向相反。

到现在整个流水已经拼接完成。真正的数据处理在copyInto()中。

//AbstractPipeline#copyInto
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();}else {copyIntoWithCancel(wrappedSink, spliterator);}
}

Sink中有三个方法:

  • begin:节点开始准备
  • accept: 节点处理数据
  • end: 节点处理结束

Sink与操作是相关的,不同的Sink有不同的职责,无状态操作的 Sink 接收到通知或者数据,处理完了会马上通知自己的下游。有状态操作的 Sink 则像有一个缓冲区一样,它会等要处理的数据处理完了才开始通知下游,并将自己处理的结果传递给下游。

比如filter这种无状态的操作,处理完数据会直接交给下游,而像sorted这种无有状态的操作在begin阶段会先创建一个容器,accept会将流转过来的数据保存起来,最后在执行 end方法时才正在开始排序。排序之后再将数据,采用同样的方式依次传递给下游节点。

wrapAndCopyInto() 返回了 TerminalOps 创建的 Sink,这时候它里面已经包含了最终处理的结果。调用它的 get() 方法就获得了最终的结果。

Steam还可以支持并行流,把list.stream()换成list.parallelStream()即可使用并行操作。

并行过程中,构建操作链的双向链表是不变的,区别实在构建完后的操作

//AbstractPipeline#evaluate
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}

这次进入到 evaluateParallel()

//ReduceOps#evaluateSequential
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

ReduceTask继承自ForkJoinTaskSteam的并行底层用的是ForkJoin框架。

Java8中的Stream流相关推荐

  1. 【Java8新特性】面试官问我:Java8中创建Stream流有哪几种方式?

    写在前面 先说点题外话:不少读者工作几年后,仍然在使用Java7之前版本的方法,对于Java8版本的新特性,甚至是Java7的新特性几乎没有接触过.真心想对这些读者说:你真的需要了解下Java8甚至以 ...

  2. java8中的Stream流式操作总结,List转Map或List转LinkedHashMap使用Collectors.groupingBy用法

    前言背景描述: 开发使用本来是直接使用数据库的依据SQL进行group By获取到数据表的分组的一个字段的字符串,可是后来字符串越来越多越长,导致的最后的后面长度超多1024个汉字就会被截取,所以需要 ...

  3. Java8中的Stream

    Java8 Stream是一个非常好用的工具,结合Lambda表达式,可以非常方便的来操作各种集合. 文章目录 Stream知识图谱 Stream概述 Stream的创建 Stream的使用 遍历/匹 ...

  4. Java8新特性Stream流详解

    陈老老老板 说明:新的专栏,本专栏专门讲Java8新特性,把平时遇到的问题与Java8的写法进行总结,需要注意的地方都标红了,一起加油. 本文是介绍Java8新特性Stream流常用方法超详细教学 说 ...

  5. java8中的Stream用法详解

    项目github地址:bitcarmanlee easy-algorithm-interview-and-practice 欢迎大家star,留言,一起学习进步 1.为什么java8中加入Stream ...

  6. Java中的Stream流以及收集操作

    一.Stream流的生成方式 1.Stream流的使用: <1>:生成流:通过数据源(集合.数组等)生成流 <2>:中间操作:一个流后面可以跟随零个或多个中间操作,其目的主要是 ...

  7. 利用Java8新特性stream流给集合中的某个属性赋值

    今天在编写一个返回对象VO时,需要做一些处理,返回对象VO如下: CollectListVO @Data @JsonIgnoreProperties(ignoreUnknown = true) @Ap ...

  8. 巧用Java8中的Stream,让集合操作6到飞起!!!

    简介 java8也出来好久了,接口默认方法,lambda表达式,函数式接口,Date API等特性还是有必要去了解一下.比如在项目中经常用到集合,遍历集合可以试下lambda表达式,经常还要对集合进行 ...

  9. java8 sum_Java8的Stream流真香,没体验过的永远不会知道!

    虽然现在Oacle官方发布的最新JDK版本已经到了JDK14.但我相信很多团队的生产系统上还是JDK8,甚至有的团队还是JDK7或者JDK6.即便很多团队已经将生产环境升级为JDK8,但是代码却还是老 ...

最新文章

  1. CCAH-CCA-500-4题:Where are Hadoop task log files stored?
  2. python拼写检查_拼写检查 - Python文本处理教程™
  3. 怎么理解python的__init___理解Python中super()和__init__()方法
  4. 开发管理 (3) -项目启动会议
  5. 使用lodash防抖_什么,lodash 的防抖失效了?
  6. 利用Azure Functions和k8s构建Serverless计算平台
  7. C++17新特性学习笔记
  8. 基于小程序·云开发构建高考查分小程序丨实战
  9. Math 对象的扩展
  10. 生成特定分布随机数的方法
  11. menu什么意思中文意思_pipeline什么意思
  12. SQLyog客户端常用快捷键
  13. 红米k30pro工程测试代码_红米K30 PRO代号曝光,确定推出双版本,更强拍照对标荣耀30...
  14. s1200 博图高速脉冲计数值没有变化_如何实现SIMATIC S7-1200的高速计数器(HSC)软件门控制?...
  15. MicroSip客户端编译、运行
  16. Springboot 项目打包 Compilation failure: Compilation failure:
  17. 极大似然估计和交叉熵
  18. 【游戏】——微信打飞机
  19. 【警告】扣扣热键你了解多少
  20. Knowledge Tracing: A Survey阅读笔记

热门文章

  1. GHOST WIN7 X86 OEM 万能预装版 V3.0
  2. Ubuntu 22.04 随便一玩
  3. 输入一个性别、身高和体重值,输出其形体状态(太轻、标准、太重)
  4. Freemaker_入门+深入+开发指南+学习笔记
  5. 以太坊智能合约语言Solidity - 1 走进Solidity
  6. 内网映射代理方案(内网穿透)
  7. 创新电影院垂直社交新功能体验,电影分区放映弥补线下不足
  8. MySQL could not be resolved: Temporary failure in name resolution报错解决方法
  9. ROS机器人开发实践——ROS自定义消息
  10. window家庭版安装沙盒后 卸载沙盒