java-io / Okio

Okio是对java.io和java.nio的补充,它使访问,存储和处理数据变得更加容易。Sink代表的输出流,Source代表的是输入流。


传统io

InputStream 和 OutputStream 区别:
内存 ———OutputStream——> (write写入到)外部存储(文件)
外部存储(文件) ———IntputStream——> (read读出到)内存

装饰器设计模式应用场景有个很重要的特点:装饰器类会附加跟原始类相关的增强功能

实现机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class FileInputStream extends InputStream{

}
class BufferedInputStream extends FilterInputStream {

}
class FilterInputStream extends InputStream {
protected volatile InputStream in;
protected FilterInputStream(InputStream in) {
this.in = in;
}
public int read() throws IOException {
return in.read();
}
public void close() throws IOException {
in.close();
}
//...
}

BufferedInputStream就是这个装饰器类,它提供的增强功能:增加缓存功能。通过提供一块缓冲区,输入流可以先放到这个缓冲区里面,然后再输出到目的地(内存或网络)。它的好处就减少和内存的读取交互次数,毕竟频繁的读取交互是比较耗费性能的。

举个例子解释下缓冲区是如何提升性能的:
假设有一个8K大小的文件,如果仅使用InputStream来读取,每次读取1K,则需要读取8次,也就需要和文件交互8次。但如果使用缓冲流BufferedInputStream来读取,在第一次读取文件的时候,就会从文件中一次性读取8K(BufferedInputStream中默认缓冲区大小)数据到缓冲区中,虽然最后还是得会从缓冲区每次读取1K,共读取8次,但是这8次是从缓冲区读,远比直接与文件交互的性能高。另外可见的是,当文件数据越大,通过缓冲区的方式效率提升越明显。


OKio代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_okio);
createFile();
readFromFile();
}

private void readFromFile() {
if (file.exists()) {
try {
source = Okio.source(file);
bufferedSource = Okio.buffer(source);
String data = bufferedSource.readString(StandardCharsets.UTF_8);
source.close();
Log.d(TAG, "readFromFile: " + data);
} catch (Exception e) {
e.printStackTrace();
}
}
}

private void createFile() {
File f = new File(pathName);
if (!f.exists()) {
f.mkdirs();
}
writeData(new File(fileName));
}

private void writeData(File file) {
Log.d(TAG, "writeData: ");
try {
sink = Okio.appendingSink(file);
bufferedSink = Okio.buffer(sink);
bufferedSink.writeUtf8("stew1");
bufferedSink.writeUtf8("\n");
bufferedSink.writeUtf8("stew2");
bufferedSink.flush();
sink.close();
} catch (Exception e) {
e.printStackTrace();
}
}

从代码中可以看出,读写文件关键一步要创建出 BufferedSource 或 BufferedSink 对象。有了这两个对象,就可以直接读写文件了。

不过也没比传统IO使用简洁到哪里去。其实是因为这个例子比较简单,前面提到传统IO使用了装饰者设计模式来可以提供增强能力。

把场景稍微变复杂点,那假设需要读取一个整数或浮点数,就需要用DataInputStream来增强,同时为了效率还需要缓存功能,就还要装饰一层BufferInputStream。类似下面这样(核心代码):

1
2
3
4
fileStream = new FileInputStream(path); 
binStream = new BufferedInputStream(fileStream);
dataInputStream = new DataInputStream(binStream);
dataInputStream.readInt();

但Okio为我们提供的BufferedSink和BufferedSource就具有以上基本所有的功能,不需要再串上一系列的装饰类。类似下面这样(核心代码):

1
2
3
Source source = Okio.source(new File(path));
BufferedSource bufferedSource = Okio.buffer(source)) {
bufferedSource.readInt()

Sink

1
2
3
4
5
6
7
8
9
10
public interface Sink extends Closeable, Flushable{
void write(Buffer source, long byteCount) throws IOException;
@Override void flush() throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}

public interface BufferedSink extends Sink

final class RealBufferedSink implements BufferedSink

Source

1
2
3
4
5
6
7
8
9
public interface Source extends Closeable {
long read(Buffer sink, long byteCount) throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}

public interface BufferedSource extends Source

final class RealBufferedSource implements BufferedSource

Timeout(Okio中的超时机制)

同步超时:

例如当从输入流 Source 读取数据超时后,输入流将被关闭,任务到此结束

主要使用两个判断条件来判断任务是否超时了:
任务设置了结束时间( hasDeadline = true )并且当前已经过了结束时间( deadlineNanoTime )
任务已经过了超时时间( timeoutNanos )

1
2
3
4
5
6
7
public class Timeout {
.....
private boolean hasDeadline;//是否设置了结束时间
private long deadlineNanoTime;//结束时间
private long timeoutNanos;//超时时间
.....
}

sink和source方法中都有timeout.throwIfReached();

1
2
3
4
5
6
7
8
9
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}

if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}

waitUntilNotified方法中用到了timeoutNanos

异步超时:

Okio对于socket的封装,使用了AsyncTimeout,是继承于Timeout所实现的一个异步超时类,用于任务超时后,关闭 Socket


Segment

Segment字面翻译就是片段,Okio将数据也就是Buffer分割成一块块的片段,同时segment拥有前置节点和后置节点,构成一个双向循环链表

这样采取分片使用链表连接,片中使用数组存储,兼具读的连续性,以及写的可插入性,对比单一使用链表或者数组,是一种折中的方案,读写更快,而且有个好处根据需求改动分片的大小来权衡读写的业务操作

1
2
3
4
5
6
7
8
9
static final int SIZE = 8192;
static final int SHARE_MINIMUM = 1024;
final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment pre;
Segment next;

SIZE就是一个segment的最大字节数
SHARE_MINIMUM和共享内存有关
data就是保存的字节数组
pos,limit就是开始和结束点的index
shared和owner用来设置状态判断是否可写,一个有共享内存的segment是不能写入的
pre,next就是前置后置节点。

SegmentPool ,segment的对象池

1
2
3
static final long MAX_SIZE = 64 * 1024;
static Segment next;
static long byteCount;

池子的上限是64k,相当于8个segment,next这个节点可以看出SegmentPool是按照单链表的方式进行存储的,byteCount则是目前已有的大小。

take(),recycle(Segment segment),操作中都有加锁逻辑

如果要回收的segment有前后引用或者是共享的,就不能被回收,所以要回收前先将引用置空,同样这里也加了锁,以免那个同时回收超过池子最大的大小,然后就是将回收的插到表头的操作。所以SegmentPool无论是回收和取对象都是在表头操作。


Okio围绕着两个封装了许多功能的类型来组建的:——ByteStrings和Buffers

不可变的ByteString

ByteString不仅是不可变的,同时在内部有两个filed,分别是byte数据,以及String的数据,这样能够让这个类在Byte和String转换上基本没有开销,同样的也需要保存两份引用,这是明显的空间换时间的方式

Buffer

Buffer这个类实际上就是整个读和写的核心,括 RealBufferedSource和RealBufferedSink 实际上都只是一个代理,里面的操作全部都是通过Buffer来完成的

1
2
3
public class Buffer implements BufferedSource, BufferedSink, Cloneable { 
long size;
Segment head;

Buffer一共实现了三个接口,读,写,以及clone。
clone是一种对象生成的方式,是除了常规的new·关键字以及反序列化之外的一种方式,主要分为深拷贝和浅拷贝两种,Buffer采用的是深拷贝的方式

对应实现的clone方法,如果整个Buffer的size为null,也就是没有数据,那么就返回一个新建的Buffer对象,如果不为空就是遍历所有的segment并且都创建一个对应的Segment,这样clone出来的对象就是一个全新的毫无关系的对象。

举例:

writeShort用来给Buffer中写入一个short的数据,首先通过writableSegment拿到一个能够有2个字节空间的segment,tail中的data就是字节数组,limit则是数据的尾部索引,写数据就是在尾部继续往后写,直接设置在data通过limit自增后的index,然后重置尾部索引,并且buffer的size大小加2。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

//整型:
//byte:1个字节 8位 -128~127
//short :2个字节 16位
//int :4个字节 32位
//long:8个字节 64位

//浮点型:
//float:4个字节 32 位
//double :8个字节 64位

@Override public Buffer writeShort(int s) {
Segment tail = writableSegment(2);
byte[] data = tail.data;
int limit = tail.limit;
data[limit++] = (byte) ((s >>> 8) & 0xff);
data[limit++] = (byte) (s & 0xff);
tail.limit = limit;
size += 2;
return this;
}

readShort:如果short被segment分隔开 通过readByte来一个个字节读

读的方法相对于写的方法就复杂一些,因为buffer是分块的,读数据的过程就有可能是跨segment的,比如前面一个字节,下一个segment一个字节,这种情况就转化为readbyte,读两个字节后合成一个short对象,对于连续的读可以直接通过pos索引自增达到目的,读完后Buffer的size减2。

并且会有当前的segment会出现读完后数据为null的情况,此时头部索引pos和尾部索引limit就重合了,通过pop方法可以把这个segment分离出来,并且将下一个segment设置为Buffer的head,然后将分离出来的segment回收到对象池中。