个人博客:https://blog.N0tExpectErr0r.cn

小专栏:https://xiaozhuanlan.com/N0tExpectErr0r

从前面的 OkHttp 源码解析中我们可以知道,OkHttp 中的 I/O 都不是通过我们平时所使用的 IOStream 来实现,而是使用了 Okio 这个第三方库,那它与寻常的 IOStream 有什么区别呢?让我们来分析一下它的源码。

Okio 中有两个非常重要的接口——Sink 以及 Source,它们都继承了 Closeable,其中 Sink 对应了我们原来所使用的 OutputStream,而 Source 则对应了我们原来所使用的 InputStream

Okio 的入口就是Okio 类,它是一个工厂类,可以通过它内部的一些 static 方法来创建 SinkSource 等对象。

Sink

Sink 实际上只是一个接口,让我们看看 Sink 中有哪些方法:

public interface Sink extends Closeable, Flushable {void write(Buffer source, long byteCount) throws IOException;@Override void flush() throws IOException;Timeout timeout();@Override void close() throws IOException;
}

可以看到,它主要包含了 writeflushtimeoutclose 这几个方法,我们可以通过 Okio.sink 方法基于 OutputStream 获取一个 Sink

private static Sink sink(final OutputStream out, final Timeout timeout) {if (out == null) throw new IllegalArgumentException("out == null");if (timeout == null) throw new IllegalArgumentException("timeout == null");return new Sink() {@Override public void write(Buffer source, long byteCount) throws IOException {checkOffsetAndCount(source.size, 0, byteCount);while (byteCount > 0) {timeout.throwIfReached();Segment head = source.head;int toCopy = (int) Math.min(byteCount, head.limit - head.pos);out.write(head.data, head.pos, toCopy);head.pos += toCopy;byteCount -= toCopy;source.size -= toCopy;if (head.pos == head.limit) {source.head = head.pop();SegmentPool.recycle(head);}}}@Override public void flush() throws IOException {out.flush();}@Override public void close() throws IOException {out.close();}@Override public Timeout timeout() {return timeout;}@Override public String toString() {return "sink(" + out + ")";}};
}

这里构建并实现了一个 Sink 的匿名内部类并返回,主要实现了它的 write 方法,剩余方法都是简单地转调到 OutputStream 的对应方法。

write 方法中,首先进行了一些状态检验,这里貌似在 Timeout 类中实现了对超时的处理,我们稍后再分析。之后从 Buffer 中获取了一个 Segment,并从中取出数据,计算出写入的量后将其写入 Sink 所对应的 OutputStream

Segment 采用了一种类似链表的形式进行连接,看来 Buffer 中维护了一个 Segment 链表,代表了数据的其中一段。这里将 Buffer 中的数据分段取出并写入了 OutputStream 中。

最后,通过 SegmentPool.recycle 方法对当前 Segment 进行回收。

从上面的代码中我们可以获取到如下信息:

  1. Buffer 其实就是内存中的一段数据的抽象,其中通过 Segment 以链表的形式保存用于存储数据。
  2. Segment 存储数据采用了分段的存储方式,因此获取数据时需要分段从 Segment 中获取数据。
  3. 有一个 SegmentPool 池用于实现 Segment 的复用。
  4. Segment 的使用有点类似链表。

Source

SourceSink 一样,也仅仅是一个接口:

public interface Source extends Closeable {long read(Buffer sink, long byteCount) throws IOException;Timeout timeout();@Override void close() throws IOException;
}

Okio 中可以通过 source 方法根据 InputStream 创建一个 Source

private static Source source(final InputStream in, final Timeout timeout) {if (in == null) {throw new IllegalArgumentException("in == null");} else if (timeout == null) {throw new IllegalArgumentException("timeout == null");} else {return new Source() {public long read(Buffer sink, long byteCount) throws IOException {if (byteCount < 0L) {throw new IllegalArgumentException("byteCount < 0: " + byteCount);} else if (byteCount == 0L) {return 0L;} else {try {timeout.throwIfReached();Segment tail = sink.writableSegment(1);int maxToCopy = (int)Math.min(byteCount, (long)(8192 - tail.limit));int bytesRead = in.read(tail.data, tail.limit, maxToCopy);if (bytesRead == -1) {return -1L;} else {tail.limit += bytesRead;sink.size += (long)bytesRead;return (long)bytesRead;}} catch (AssertionError var7) {if (Okio.isAndroidGetsocknameError(var7)) {throw new IOException(var7);} else {throw var7;}}}}public void close() throws IOException {in.close();}public Timeout timeout() {return timeout;}public String toString() {return "source(" + in + ")";}};}
}

这里构建并实现了 Source 的一个匿名内部类并返回,应该就是 Source 的默认实现了。

它除了 read 方法其他都只是简单地调用了 InputStream 的对应方法,我们重点看 read 方法:

首先它进行了一些相关的状态检测,之后通过 sink.writeableSegment 获取到了一个可以写入的 Segment。之后从 InputStream 中读取数据向 Segment 中写入,读取的大小被限制为了 8192 个字节。

Buffer

BufferSinkSource 中都担任了一个十分重要的地位,它对应了我们内存中存储的数据,对这些数据进行了抽象。下面让我们对 Buffer 进行分析:

Buffer 虽然是我们内存中数据的抽象,但数据实际上并不是存储在 Buffer 中的,它在内部维护了一个 Segment 的循环链表,Segment 才是真正存储数据的地方。它通过 Segment 将数据分成了几段,通过链表进行连接。在 Buffer 内部封装了许多 I/O 操作,都是在对 Segment 中的数据进行处理。

为什么要使用 Segment 对数据进行分段存储而不直接存储整个数据呢?由于数据是分段存放的,这些段中的某一部分可能与另一个 Buffer 中的数据恰好是相同的,此时就体现出了 Segment 的灵活性,我们不需要将数据拷贝到另一个 Buffer 中,只需要将其 Segment 指向这个重复段的 Segment 即可。同时,对于一些如将数据从 Source 转移到 Sink 中这种情况,也不需要进行拷贝,只需要将链表指向我们的 Segment 即可,极大地提高了效率,同时节省了内存空间。

Segment

我们首先看一下存储数据的 Segment,它代表了数据中的一段,是一个双向的循环链表,主要有以下的参数:

final class Segment {// Segment 存储数据的大小static final int SIZE = 8192;// 进行数据共享的最小字节数static final int SHARE_MINIMUM = 1024;// 存储数据的字节数组final byte[] data;// 用户读取数据的下一个起始位置int pos;// 可以被写入的下一个起始位置int limit;// 数据是否已被共享boolean shared;// 该字节数组是否属于该Segmentboolean owner;// 链表指针Segment next;// 链表指针Segment prev;//...
}

可以看到,其中 pos 代表了下一次读取的起始位置,而 limit 代表了下一次写入的起始位置,我们可以根据它们两个值将整个 Segment 的空间分为如图的三段:

其中已读区域的数据我们以后都不会再用到,已写入区域的数据正在等待读取,而空闲区域还没有填入数据,可以进行写入。

共享机制

同时,Segment 还支持了对数据的共享,通过 sharedowner 字段分别表明了数据是否已被共享以及其是否属于当前 Segment。同时它提供了两种拷贝方式: sharedCopy 以及 unsharedCopy

unsharedCopy 返回了一个新的 Segment,并将 data 数组通过 clone 方法拷贝到了新 Segment 中:

/** Returns a new segment that its own private copy of the underlying byte array. */
final Segment unsharedCopy() {return new Segment(data.clone(), pos, limit, false, true);
}

sharedCopy 同样返回了一个新的 Segment,但其 data 数组是与新 Segment 进行共享的:

/*** Returns a new segment that shares the underlying byte array with this. Adjusting pos and limit* are safe but writes are forbidden. This also marks the current segment as shared, which* prevents it from being pooled.*/
final Segment sharedCopy() {shared = true;return new Segment(data, pos, limit, true, false);
}

同时通过注释我们可以看到,当数据共享后,为了保证安全性,禁止了写入操作。同时将被拷贝的 Segment 也标记为了 shared,从而防止其被回收。

这样的设计同样是为了减少拷贝,从而提高 I/O 的效率。

合并与分割

Segment 还支持了与前一个 Segment 的合并以及对自身的分割操作,从而使得使用者能够更灵活地操作。

合并操作会在当前 Segment 与它的前一个节点都没有超过其大小的一半时,将二者的数据进行合并,并将当前 Segment 进行回收,从而增大内存的利用效率:

/*** Call this when the tail and its predecessor may both be less than half* full. This will copy data so that segments can be recycled.*/
public final void compact() {if (prev == this) throw new IllegalStateException();// 上一个节点的数据不是可以写入的(是共享数据),取消合并if (!prev.owner) return;// 计算当前节点与前一个节点的剩余空间int byteCount = limit - pos;int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);// 没有足够的写入空间时,不进行合并åif (byteCount > availableByteCount) return;// 进行合并,将当前节点数据写入前一节点writeTo(prev, byteCount);// 从链表中删除当前节点,并进行回收pop();SegmentPool.recycle(this);
}

而分割操作则会将 Segment 中的数据分割为 [pos, pos+byteCount)[pos+byteCount, limit) 的两段:

public final Segment split(int byteCount) {if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();Segment prefix;// We have two competing performance goals://  - Avoid copying data. We accomplish this by sharing segments.//  - Avoid short shared segments. These are bad for performance because they are readonly and//    may lead to long chains of short segments.// To balance these goals we only share segments when the copy will be large.if (byteCount >= SHARE_MINIMUM) {// 如果拷贝量大于1024字节,通过共享的形式prefix = sharedCopy();} else {// 拷贝量低于1024字节,通过arrayCopy进行拷贝prefix = SegmentPool.take();System.arraycopy(data, pos, prefix.data, 0, byteCount);}// 对limit及pos进行修改prefix.limit = prefix.pos + byteCount;pos += byteCount;prev.push(prefix);return prefix;
}

这里首先对不同数据段的数据进行了处理,如果数据段大于了 1024 字节,则将数据通过共享交给了分割的前一个节点,两端 Segment 公用同一个 data 数组,否则通过拷贝的形式构建一个新的 Segment

为什么这里需要对数据大小的不同采用不同的处理方式呢?我们可以看到上面的注释,里面给出了答案:首先,为了避免拷贝数据带来的性能开销,加入了共享 Segment 的功能。但是由于共享的数据是只读的,如果有很多很短的数据段的话,使用的表现并不会很好,因此只有当拷贝的数据量比较大时,才会进行 Segment 的共享。

之后,将二者的 poslimit 都进行了设置。由于 pos 之前的部分及 limit 之后的部分都不会影响到我们正常的读取和写入,因此我们可以不用关心它们目前的状态,没必要再对它们进行一些如填充零之类的操作。

SegmentPool

同时,Okio 还使用了 SegmentPool 来实现一个对象池,从而避免 Segment 频繁地创建及销毁所带来的性能开销。

SegmentPool 的实现十分简单,它内部维护了一个单链表,用于存储被回收存在池中的 Segment,其最大容量被限制在了 64 k。

当需要 Segment 时,可以通过 take 方法来获取一个被回收的对象:

static Segment take() {synchronized (SegmentPool.class) {if (next != null) {Segment result = next;next = result.next;result.next = null;byteCount -= Segment.SIZE;return result;}}return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}

它会在单链表中找到一个空闲的 Segment 并初始化后返回。若当前链表中没有对象,则会创建一个新的 Segment

Segment 使用完毕时,首先可以通过 Segmentpop 操作将其从链表中移除,之后可以调用 SegmentPool.recycle 方法对其进行回收:

static void recycle(Segment segment) {if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();if (segment.shared) return; // This segment cannot be recycled.synchronized (SegmentPool.class) {if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.byteCount += Segment.SIZE;segment.next = next;segment.pos = segment.limit = 0;next = segment;}
}

回收 Segment 时,不会对只读的 Segment 进行回收,若 Segment 个数超过了上限,则不会对该 Segment 进行回收。

数据转移

Okio 与 java.io 有个很大的不同,体现在 Buffer 的数据转移上,我们可以通过其 copyTo 方法来完成数据的转移。之所以叫转移,因为它相对于复制来说,是有很大的数据提升的。例如我们可以看到两个 Buffer 之间的数据转移是如何进行的:

public final Buffer copyTo(Buffer out, long offset, long byteCount) {if (out == null) throw new IllegalArgumentException("out == null");checkOffsetAndCount(size, offset, byteCount);if (byteCount == 0) return this;out.size += byteCount;// 跳过不进行拷贝的 SegmentSegment s = head;for (; offset >= (s.limit - s.pos); s = s.next) {offset -= (s.limit - s.pos);}for (; byteCount > 0; s = s.next) {// 通过 sharedCopy 将数据拷贝到 copy 中  Segment copy = s.sharedCopy();copy.pos += offset;copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);// 插入 Segmentif (out.head == null) {out.head = copy.next = copy.prev = copy;} else {out.head.prev.push(copy);}byteCount -= copy.limit - copy.pos;offset = 0;}return this;
}

从上面的代码中可以看出,实际上这个过程是通过了 Segment 共享实现的,因此不需要进行拷贝,极大地提高了数据转移的效率。

BufferedSource

我们可以通过 Okio.buffer 方法对一个普通的 Source 进行包装,获取一个具有缓冲能力的 BufferSource,它是一个接口,定义了一系列读取的方法:

public interface BufferedSource extends Source, ReadableByteChannel {@DeprecatedBuffer buffer();Buffer getBuffer();boolean exhausted() throws IOException;void require(long byteCount) throws IOException;boolean request(long byteCount) throws IOException;byte readByte() throws IOException;short readShort() throws IOException;short readShortLe() throws IOException;// ...一系列读取方法long indexOf(byte b, long fromIndex) throws IOException;long indexOf(byte b, long fromIndex, long toIndex) throws IOException;long indexOf(ByteString bytes) throws IOException;long indexOf(ByteString bytes, long fromIndex) throws IOException;long indexOfElement(ByteString targetBytes) throws IOException;long indexOfElement(ByteString targetBytes, long fromIndex) throws IOException;boolean rangeEquals(long offset, ByteString bytes) throws IOException;boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)throws IOException;BufferedSource peek();InputStream inputStream();
}

它主要有两个实现类:BufferRealBufferedSource。其中 RealBufferedSource 显然是我们通过 buffer 方法包装后得到的类,而 Buffer 实际上对 BufferSource 也进行了实现,通过一系列 read 方法可以从 Segment 中读取处对应的数据。而我们的 RealBufferedSource 则是 Source 的一个包装类,并且其维护了一个 Buffer,从而提高 Input 的效率。我们先分析其思路,再来讨论为什么这样能提高 Input 的效率。

我们可以首先看到 RealBufferedSource 的读取方法,这里以 readByteArray 方法举例:

@Override public byte[] readByteArray(long byteCount) throws IOException {require(byteCount);return buffer.readByteArray(byteCount);
}

这里首先调用了 require 方法,之后再从 buffer 中将数据读出,看来在 require 中将数据先读取到了 buffer 中。

我们看到 require 方法:

@Override public void require(long byteCount) throws IOException {if (!request(byteCount)) throw new EOFException();
}

它实际上转调到了 request 方法:

@Override public boolean request(long byteCount) throws IOException {if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);if (closed) throw new IllegalStateException("closed");while (buffer.size < byteCount) {if (source.read(buffer, Segment.SIZE) == -1) return false;}return true;
}

request 方法中,不断地向 buffer 中读取,每次读取 Segment.SIZE 也就是 8192 个字节。也就是它读取的量是 byteCount 以 8192 字节向上取整。为什么它不刚好读取 byteCount 个字节,要读满 8192 个字节呢?

这就是一种预取思想,因为 I/O 操作往往是非常频繁的,如果进行了一次读取,那就很有可能还会进行下一次读取,因此我们预先把它下一次可能读取的部分一起读取出来,这样下次读取时,就不需要再对系统进行请求以获取数据了,可以直接从我们的 buffer 中拿到。这就是为什么说加入 buffer 提高了我们 I/O 的效率。

可能还有人会问,为什么这样能提高 I/O 效率呢,不都是读了一样的量么?这个就涉及到一些操作系统的知识了。在现代的操作系统中,我们的程序往往运行在用户态,而用户态实际上是没有进行 I/O 的权限的,因此往往都是向操作系统发起请求,切换到内核态,再进行 I/O,完成后再次回到用户态。这样的用户态及内核态的切换实际上是非常耗时的,并且这个过程中也伴随着拷贝。因此采用上面的 buffer 可以有效地减少我们的这种系统 I/O 调用,加快我们的效率。

BufferedSink

我们同样可以通过 Okio.buffer 方法对一个普通的 Sink 进行包装,从而获取一个带有 buffer 缓冲能力的 BufferedSinkBufferedSink 也是一个接口 ,内部定义了一系列写入的方法:

public interface BufferedSink extends Sink, WritableByteChannel {Buffer buffer();BufferedSink write(ByteString byteString) throws IOException;BufferedSink write(byte[] source) throws IOException;BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;long writeAll(Source source) throws IOException;BufferedSink write(Source source, long byteCount) throws IOException;// ...一些对 write 的封装@Override void flush() throws IOException;BufferedSink emit() throws IOException;BufferedSink emitCompleteSegments() throws IOException;OutputStream outputStream();
}

BufferedSink 同样有两个实现类:BufferRealBufferedSink,我们可以先看到 RealBufferedSink,它是一个 Sink 的包装类,并且内部维护了一个 Buffer

write

我们先看看其写入方法:

@Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {if (closed) throw new IllegalStateException("closed");buffer.write(source, offset, byteCount);return emitCompleteSegments();
}

这里拿了一个简单的写入 byte[] 的方法进行了举例,它首先将数据写入了 buffer 中,之后调用了 emitCompleteSegments 方法。可以看到这里并没有对 sink真正进行写入,那写入究竟是在哪里进行的呢?我们看看 emitCompleteSegments 方法中做了什么:

@Override public BufferedSink emitCompleteSegments() throws IOException {if (closed) throw new IllegalStateException("closed");long byteCount = buffer.completeSegmentByteCount();if (byteCount > 0) sink.write(buffer, byteCount);return this;
}

这里首先调用 buffer.completeSegmentByteCount 方法获取到了 buffer 中已写入但未被读取的部分字节数(只包括已经被写满了的 Segment 中的),之后调用 sink.write 将其写入到了 sink 中。

这里其实很奇怪,按道理来说 buffer 的作用是通过缓存来进行一些优化,但这个方法将数据写入 buffer 后,数据又立即被写入到了 sink 中。这样相比直接写入到 sink 中,反而会带来性能的损耗啊。这里为什么要这样做呢?

我看到这里时对这段也比较奇怪,但考虑到 Okio 的整体设计来说,应该是把 Buffer 当做了一个数据统一的中转站,将读写的优化统一放在了 Buffer 中进行,因此考虑到整体的一致性,将 RealBufferedSink 也采用了通过 Buffer 中转的方式编写,应该算是一种妥协吧。并且采用 Buffer 还有好处就是,一份数据既可以用于读也可以用于写。

flush

RealBufferedSink 还支持了 flush 操作,通过 flush 方法可以将缓冲区的所有数据写入 sink 中:

@Override public void flush() throws IOException {if (closed) throw new IllegalStateException("closed");if (buffer.size > 0) {sink.write(buffer, buffer.size);}sink.flush();
}

emit

RealBufferedSink 还具有 emit 功能,分别是 emitCompleteSegments 方法及 emit 方法,前者是将所有已填满的 Segment 中已写入未读取的数据写入 sink,后者则是将 buffer 中所有已写入未读取数据写入 sink(类似 flush):

@Override public BufferedSink emitCompleteSegments() throws IOException {if (closed) throw new IllegalStateException("closed");long byteCount = buffer.completeSegmentByteCount();if (byteCount > 0) sink.write(buffer, byteCount);return this;
}@Override public BufferedSink emit() throws IOException {if (closed) throw new IllegalStateException("closed");long byteCount = buffer.size();if (byteCount > 0) sink.write(buffer, byteCount);return this;
}

Timeout 超时机制

Okio 中通过 Timeout 类实现了 SinkSource 的超时机制,在 Sink 的写入与 Source 的读取时对超时进行判断,如果超时则中断写入等操作。其中对于包装了普通 InputStream / OutputStream 的使用了普通的 Timeout,而对于对 Socket 进行了包装的则使用 AsyncTimeout

Timeout

我们先对普通的 Timeout 进行研究,Timeout 中主要有两个值,timeoutdeadline ,分别代表了 wait 的最大等待时间与完成某个工作的超时时间。

deadline

对于 deadline,我们可以通过 deadline 方法进行设定:

/** Set a deadline of now plus {@code duration} time. */
public final Timeout deadline(long duration, TimeUnit unit) {if (duration <= 0) throw new IllegalArgumentException("duration <= 0: " + duration);if (unit == null) throw new IllegalArgumentException("unit == null");return deadlineNanoTime(System.nanoTime() + unit.toNanos(duration));
}

之后,在每个需要检查超时的地方需要调用该 TimeoutthrowIfReached 方法(如 Sinkwrite 方法):

public void throwIfReached() throws IOException {if (Thread.interrupted()) {Thread.currentThread().interrupt(); // Retain interrupted status.throw new InterruptedIOException("interrupted");}if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {throw new InterruptedIOException("deadline reached");}
}

这里很简单,就是进行时间的校验,若达到了设定的时间,则抛出异常从而中断后续操作。

timeout

同时,Timeout 还实现了对 monitor 进行 wait 的超时机制,通过 waitUntilNotified 方法可以等待 monitornotify,若等待的过程超过了 Timeout 所设定的时间或当前线程被中断,则会抛出异常,从而避免一直进行等待。并且,该方法需要在 synchronized 代码块中调用,以保证线程安全

在我们构造了一个 Timeout 后,可以使用 timeout 方法对其 wait 超时时间进行设定:

public Timeout timeout(long timeout, TimeUnit unit) {if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);if (unit == null) throw new IllegalArgumentException("unit == null");this.timeoutNanos = unit.toNanos(timeout);return this;
}

这里主要是将 timeoutNanos 设置为了对应的值。接着我们看到 waitUntilNotified 方法:

/*** Waits on {@code monitor} until it is notified. Throws {@link InterruptedIOException} if either* the thread is interrupted or if this timeout elapses before {@code monitor} is notified. The* caller must be synchronized on {@code monitor}.*/
public final void waitUntilNotified(Object monitor) throws InterruptedIOException {try {boolean hasDeadline = hasDeadline();long timeoutNanos = timeoutNanos();// 没有 timeout 的设定的话,直接调用 monitor 的 wait 方法if (!hasDeadline && timeoutNanos == 0L) {monitor.wait(); // There is no timeout: wait forever.return;}// 计算我们要 wait 的时间long waitNanos;long start = System.nanoTime();// 下面主要就是等待 timeout 与 deadline 中最小的那个if (hasDeadline && timeoutNanos != 0) {long deadlineNanos = deadlineNanoTime() - start;waitNanos = Math.min(timeoutNanos, deadlineNanos);} else if (hasDeadline) {waitNanos = deadlineNanoTime() - start;} else {waitNanos = timeoutNanos;}// wait 相应时间long elapsedNanos = 0L;if (waitNanos > 0L) {long waitMillis = waitNanos / 1000000L;monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));elapsedNanos = System.nanoTime() - start;}// 如果还没有 notify,则抛出异常if (elapsedNanos >= waitNanos) {throw new InterruptedIOException("timeout");}} catch (InterruptedException e) {// 线程如果在这个过程中被 interrupt,则抛出异常Thread.currentThread().interrupt();throw new InterruptedIOException("interrupted");}
}

AsyncTimeout

AsyncTimeoutTimeout 的子类,接下来我们看看 AsyncTimeout 是如何对 Socket 中的超时进行处理的。

首先可以看到 AsyncTimeout 中保存了一个 head 及一个 next 引用,显然这里是有一个链表存储的 AsyncTimeout 队列的:

// AsyncTimeout 队列的头部
static @Nullable AsyncTimeout head;
// 当前节点是否在队列中
private boolean inQueue;
// 下一个节点
private @Nullable AsyncTimeout next;

这里感觉与 MessageQueue 有点相似,猜测 AsyncTimeout 会根据超时的时间按序存储在队列中。

并且从 AsyncTimeout 的 JavaDoc 中可以看到,它需要使用者在异步的事件开始时调用 enter 方法,结束时调用 exit 方法。同时它在背后开辟了一个线程对超时进行定时检查。

enter & exit

让我们先看到 enter 方法:

public final void enter() {if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");long timeoutNanos = timeoutNanos();boolean hasDeadline = hasDeadline// 时间到了就不加入队列了if (timeoutNanos == 0 && !hasDeadline) {return;}inQueue = true;// 开启线程对超时进行检查scheduleTimeout(this, timeoutNanos, hasDeadline);
}

上面主要是将其 inQueue 设置为了 true,之后调用 scheduleTimeout 方法对超时进行定时检查。我们暂时先不关注 scheduleTimeout 的具体实现。

接着我们看到 exit 方法:

/** Returns true if the timeout occurred. */
public final boolean exit() {if (!inQueue) return false;inQueue = false;return cancelScheduledTimeout(this);
}

这里也非常简单,就是将 inQueue 设置为了 false,并调用 cancelScheduledTimeout 方法停止前面的定时校验线程。

scheduleTimeout

我们接下来看看这个定时校验的具体实现,我们先看到 scheduleTimeout 方法:

private static synchronized void scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {// 如果队列中还没有节点,构造一个头节点并启动 Watchdogif (head == null) {head = new AsyncTimeout();new Watchdog().start();}long now = System.nanoTime();// 计算具体超时时间,主要是取 timeout 与 deadline 的最小值if (timeoutNanos != 0 && hasDeadline) {// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,// Math.min() is undefined for absolute values, but meaningful for relative ones.node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);} else if (timeoutNanos != 0) {node.timeoutAt = now + timeoutNanos;} else if (hasDeadline) {node.timeoutAt = node.deadlineNanoTime();} else {throw new AssertionError();}long remainingNanos = node.remainingNanos(now);// 按剩余时间从小到大插入到队列中for (AsyncTimeout prev = head; true; prev = prev.next) {if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {node.next = prev.next;prev.next = node;if (prev == head) {// 插入在队列头部,进行 notify AsyncTimeout.class.notify();}break;}}
}

上面的逻辑主要分为以下几步:

  1. 若队列中还没有节点,构造一个头节点并且启动 WatchdogWatchdog 是一个 Thread 的子类,也就是我们的定时扫描线程。
  2. 计算该 Timeout 的超时时间,取了 timeoutdeadline 的最小值
  3. 将该 timeout 按剩余时间从小到大的顺序插入队列中
  4. 若插入的位置是队列的头部,则进行 notify(这里还无法了解到意图,我们可以往后看看)

cancelScheduledTimeout

接着我们看看 cancelScheduledTimeout 做了些什么:

private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {// Remove the node from the linked list.for (AsyncTimeout prev = head; prev != null; prev = prev.next) {if (prev.next == node) {prev.next = node.next;node.next = null;return false;}}// The node wasn't found in the linked list: it must have timed out!return true;
}

这里很简单,就是将该 AsyncTimeout 从队列中移除,若返回 true,则代表超时已经发生,若返回 false,则代表超时还未发生,该 Timeout 被移除。这个返回值同样反映到了我们的 exit 方法返回值中。

Watchdog

接着我们看看 Watchdog 究竟是如何对超时进行检测的:

private static final class Watchdog extends Thread {public void run() {while (true) {try {AsyncTimeout timedOut;synchronized (AsyncTimeout.class) {timedOut = awaitTimeout();// 找不到要 interrupt 的节点,继续寻找if (timedOut == null) continue;// 队列已空,停止线程if (timedOut == head) {head = null;return;}}// 调用 timeout 方法通知超时timedOut.timedOut();} catch (InterruptedException ignored) {}}}
}

Wachdog 中不断调用 awaitTimeout 方法尝试获取一个可以停止的 Timeout,之后调用了其 timeOut 方法通知外部已超时。

awaitTimeout

我们可以看看 awaitTimeout 做了什么:

static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {AsyncTimeout node = head.next;// 队列为空,wait 直到有新的节点加入if (node == null) {long startNanos = System.nanoTime();AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS? head  // The idle timeout elapsed.: null; // The situation has changed.}long waitNanos = node.remainingNanos(System.nanoTime());// 计算该节点需要 wait 的时间if (waitNanos > 0) {// wait 对应的时间long waitMillis = waitNanos / 1000000L;waitNanos -= (waitMillis * 1000000L);AsyncTimeout.class.wait(waitMillis, (int) waitNanos);return null;}// wait 过后已超时,将其移出队列head.next = node.next;node.next = null;return node;
}

这里主要有以下几步:

  1. 如果队列为空,wait 直到有新的节点加入队列
  2. 计算节点需要 wait 的时间并 wait 对应时间
  3. 时间到后,说明该节点超时,将其移出队列

通过这里的代码,我们就知道为什么前面在链表头部加入节点时需要进行一次 notify 了,主要有两个目的:

  1. 若队列中没有元素,可以通过 notify通知此处有新元素加入队列。
  2. 由于插入在头部,说明其比后面的节点的需要等待时间更少,因此需要停止前一次 wait 来计算该新的 Timeout 所需要的等待时间,并对其进行超时处理。

这里的处理和 Android 中 MessageQueue 的设计还是有异曲同工之妙的,我们可以学习一波。

sink & source

AsyncTimeout 还实现了 sinksource 方法来实现了支持 AsyncTimeout 超时机制的 SinkSource,主要是通过在其各种操作前后分别调用 enterexit。下面以 Sink 为例:

public final Sink sink(final Sink sink) {return new Sink() {@Override public void write(Buffer source, long byteCount) throws IOException {checkOffsetAndCount(source.size, 0, byteCount);while (byteCount > 0L) {// Count how many bytes to write. This loop guarantees we split on a segment boundary.long toWrite = 0L;for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {int segmentSize = s.limit - s.pos;toWrite += segmentSize;if (toWrite >= byteCount) {toWrite = byteCount;break;}}// Emit one write. Only this section is subject to the timeout.boolean throwOnTimeout = false;enter();try {sink.write(source, toWrite);byteCount -= toWrite;throwOnTimeout = true;} catch (IOException e) {throw exit(e);} finally {exit(throwOnTimeout);}}}@Override public void flush() throws IOException {boolean throwOnTimeout = false;enter();try {sink.flush();throwOnTimeout = true;} catch (IOException e) {throw exit(e);} finally {exit(throwOnTimeout);}}@Override public void close() throws IOException {boolean throwOnTimeout = false;enter();try {sink.close();throwOnTimeout = true;} catch (IOException e) {throw exit(e);} finally {exit(throwOnTimeout);}}@Override public Timeout timeout() {return AsyncTimeout.this;}@Override public String toString() {return "AsyncTimeout.sink(" + sink + ")";}
}

比较简单,这里就不做太多解释了。

总结

Okio 是一套基于 java.io 进行了一系列优化的十分优秀的 I/O 库,它通过引入了 Segment 机制大大降低了数据迁移的成本,减少了拷贝的次数,并且对 java.io 繁琐的体系进行了简化,使得整个库更易于使用。在 Okio 中还实现了很多有其它功能的 SourceSink,感兴趣的读者可以自行翻阅一下源码。同时各位可以去回顾一下前面的 OkHttp 源码解析中,OkHttp 是如何使用 Okio 进行 Socket 的数据写入及读取的。

Okio 源码解析 : 一套精简高效的 I/O 库相关推荐

  1. 牛掰!阿里P7大佬爆肝半个月,把安卓源码解析编成了508页的PDF

    前言 有一部分人拥有非常宝贵的精神,因为他们的坚持和贡献,我们中国的技术才一步步前进,一步步走向世界.Android行业一直在进步和发展,也正是因为这些人坚持总结经验和分享. 比如有位阿里P7大牛,每 ...

  2. loraserver 源码解析 (四) lora-gateway-bridge

    lora-gateway-bridge  负责接收 gateway 通过 udp 发送的 packet-forwarder 数据 然后通过 MQTT broker 将报文转发给 LoRa Server ...

  3. Redis源码解析——前言

    今天开启Redis源码的阅读之旅.对于一些没有接触过开源代码分析的同学来说,可能这是一件很麻烦的事.但是我总觉得做一件事,不管有多大多难,我们首先要在战略上蔑视它,但是要在战术上重视它.除了一些高大上 ...

  4. 2022-10-24 ClickHouse 源码解析-查询引擎经典理论

    ClickHouse 源码解析: 综述 ClickHouse 源码解析: MergeTree Write-Path ClickHouse 源码解析: MergeTree Read-Path Click ...

  5. SOFA BOLT源码解析之设计要点-线程模型

    1 设计要点解析 1.1  线程模型 此部分内容主要介绍蚂蚁为什么选择Netty4作为基础网络编程框架,来源于蚂蚁技术团队发布的一篇文章: 文章名称为:蚂蚁通信框架实践: 链接地址为:https:// ...

  6. 微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端

    作者:Cony 导语:微服务开源框架TARS的RPC调用包含客户端与服务端,<微服务开源框架TARS的RPC源码解析>系列文章将从初识客户端.客户端的同步及异步调用.初识服务端.服务端的工 ...

  7. OkHttp 源码解析(4.9.1 版本)

    文章目录 1.OkHttp 简介 2.OkHttp 配置与基本用法 2.1 依赖引入与配置 2.2 基本用法 3.OkHttp 常见对象介绍 4.OkHttp 源码解析 4.1 当我们调用`okhtt ...

  8. KiCAD源码解析(2):根目录CmakeList解析

    KiCAD源码解析(2):根目录CmakeList解析 Kicad根目录CmakeList解析 提示:想学习cmake的看此篇文章也用处多多 根目录CMakeLists.txt解析 KiCAD源码解析 ...

  9. [源码解析] 机器学习参数服务器ps-lite (1) ----- PostOffice

    [源码解析] 机器学习参数服务器ps-lite (1) ----- PostOffice 文章目录 [源码解析] 机器学习参数服务器ps-lite (1) ----- PostOffice 0x00 ...

最新文章

  1. INI文件快速解析java工具包
  2. python游戏-零基础python教程-用Python设计你的第一个小游戏
  3. js作用域链以及全局变量和局部变量
  4. hadoop程序开发 --- python
  5. 再见面试官:你能说说 Spring 框架中 Bean 的生命周期吗?
  6. 徐扬:互联网营销下的移动营销
  7. 老大,我想获取MySQL插入数据的自增ID,该怎么弄?
  8. HDU Calling Extraterrestrial Intelligence Again
  9. PLC跑马灯程序设计
  10. Robotium-基础理论介绍
  11. 摄像头设计工程师面试技巧_系统设计面试准备的5个技巧
  12. 三哥新发现了比金星还厉害的飞行物
  13. 吾生也有涯,而知也无涯,以有涯随无涯,殆己
  14. 阿里双十一,3分01秒破百亿;乐视网称贾跃亭无力履行承诺;法乐第未来宣布解职CFO和CTO丨价值早报
  15. VBA语言入门:一些简单语法在Excel应用实例
  16. 驱动开发思路以及应用程序与驱动程序的区别
  17. 远程开机并不难 用开机棒轻松打开局域网多台电脑
  18. 关于Ubuntu启动activeMq无效果的错误解决方案
  19. HTML5期末大作业:网站——个人网站介绍 (7页面带轮播特效)明星赵丽颖 学生DW网页设计作业源码 web课程设计网页规划与设计 大学生个人网站作业模板...
  20. 最新版Intel HD4000 桌面右键菜单去除方法

热门文章

  1. PHP搭建织梦网站,dedeCMS+PHPStudy帮助新手实现在本地搭建织梦网站
  2. 周杰伦 jay《蒲公英的约定》mp3 下载/试听/MV/在线播放
  3. 计算机二级msoffice操作题有哪些,计算机二级msoffice操作题
  4. 15个常用excel函数公式_Excel常用函数公式应用技巧解读
  5. 自动驾驶车载激光雷达-数据集整理
  6. 做自媒体内容采集有什么套路吗?怎么写出优质的文章?
  7. https的相关知识
  8. anocoda 安装
  9. Matlab:plot函数绘制二维折线图
  10. 求质数算法的 N 种境界[1] - 试除法和初级筛法