【从入门到放弃-Java】并发编程-NIO-Channel
前言
上篇[【從入門到放棄-Java】并發編程-NIO使用]()簡單介紹了nio的基礎使用,本篇將深入源碼分析nio中channel的實現。
簡介
channel即通道,可以用來讀、寫數據,它是全雙工的可以同時用來讀寫操作。這也是它與stream流的最大區別。
channel需要與buffer配合使用,channel通道的一端是buffer,一端是數據源實體,如文件、socket等。在nio中,通過channel的不同實現來處理 不同實體與數據buffer中的數據傳輸。
channel接口:
package java.nio.channels;import java.io.IOException; import java.io.Closeable;/*** A nexus for I/O operations.** <p> A channel represents an open connection to an entity such as a hardware* device, a file, a network socket, or a program component that is capable of* performing one or more distinct I/O operations, for example reading or* writing.** <p> A channel is either open or closed. A channel is open upon creation,* and once closed it remains closed. Once a channel is closed, any attempt to* invoke an I/O operation upon it will cause a {@link ClosedChannelException}* to be thrown. Whether or not a channel is open may be tested by invoking* its {@link #isOpen isOpen} method.** <p> Channels are, in general, intended to be safe for multithreaded access* as described in the specifications of the interfaces and classes that extend* and implement this interface.*** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface Channel extends Closeable {/*** Tells whether or not this channel is open.** @return <tt>true</tt> if, and only if, this channel is open*/public boolean isOpen();/*** Closes this channel.** <p> After a channel is closed, any further attempt to invoke I/O* operations upon it will cause a {@link ClosedChannelException} to be* thrown.** <p> If this channel is already closed then invoking this method has no* effect.** <p> This method may be invoked at any time. If some other thread has* already invoked it, however, then another invocation will block until* the first invocation is complete, after which it will return without* effect. </p>** @throws IOException If an I/O error occurs*/public void close() throws IOException;}常見的channel實現有:
- FileChannel:文件讀寫數據通道
- SocketChannel:TCP讀寫網絡數據通道
- ServerSocketChannel:服務端網絡數據讀寫通道,可以監聽TCP連接。對每一個新進來的連接都會創建一個SocketChannel。
- DatagramChannel:UDP讀寫網絡數據通道
FileChannel
FileChannel是一個抽象類,它繼承了AbstractInterruptibleChannel類,并實現了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel接口。
具體的實現類主要是sun.nio.ch.FileChannelImpl。下面詳細分析下FileChannelImpl中每個方法的具體實現。
open
private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) {//主要記載操作系統維護的文件描述符this.fd = var1;//是否可讀this.readable = var3;//是否可寫this.writable = var4;//是否以追加的方式打開this.append = var5;this.parent = var6;this.path = var2;//底層使用native的read和write來處理文件的this.nd = new FileDispatcherImpl(var5); }//FileInputStream::getChannel 調用 FileChannelImpl.open(fd, path, true, false, this) 獲取只讀channel public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) {return new FileChannelImpl(var0, var1, var2, var3, false, var4); }//FileOutputStream::getChannel 調用 FileChannelImpl.open(fd, path, false, true, append, this) 獲取只寫channel public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) {return new FileChannelImpl(var0, var1, var2, var3, var4, var5); } private FileChannelImpl(FileDescriptor fd, String path, boolean readable,boolean writable, boolean direct, Object parent) {this.fd = fd;//是否可讀this.readable = readable;//是否可寫this.writable = writable;//對于從流創建的channel,在結束時要做不同的清理動作,(openJDK中才有,sun的jdk中沒有)this.parent = parent;//源文件的paththis.path = path;//是否使用DirectIOthis.direct = direct;this.nd = new FileDispatcherImpl();if (direct) {assert path != null;this.alignment = nd.setDirectIO(fd, path);} else {this.alignment = -1;}//當parent不存在時,則注冊一個cleaner,否則交由parent做清理動作。// Register a cleaning action if and only if there is no parent// as the parent will take care of closing the file descriptor.// FileChannel is used by the LambdaMetaFactory so a lambda cannot// be used here hence we use a nested class instead.this.closer = parent != null ? null :CleanerFactory.cleaner().register(this, new Closer(fd)); }// Used by FileInputStream.getChannel(), FileOutputStream.getChannel // and RandomAccessFile.getChannel() public static FileChannel open(FileDescriptor fd, String path,boolean readable, boolean writable,boolean direct, Object parent) {return new FileChannelImpl(fd, path, readable, writable, direct, parent); }- open方法主要是返回一個新new的FileChannelImpl對象,初始化時設置fileDescriptor、readable、writable、append、parent、path等屬性,看變量名很容易理解,在此不贅述變量含義。
read
//實現自SeekableByteChannel接口的方法,將文件中的內容讀取到給定的byteBuffer中 public int read(ByteBuffer dst) throws IOException {//保證讀寫時,channel處于開啟狀態ensureOpen();//判斷是否可讀if (!readable)throw new NonReadableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);int n = 0;int ti = -1;try {//開始阻塞,并注冊為Interruptible,可以被中斷beginBlocking();//將當前線程添加到NativeThreadSet中,并返回索引,方便后續操作。//NativeThreadSet是一個線程安全的本地線程集合,方便管理,用來發送信號ti = threads.add();if (!isOpen())return 0;do {//當未被系統中斷(即讀取完畢)或channel未被關閉,則一直讀,將內容寫入到byteBuffer(dst)中n = IOUtil.read(fd, dst, -1, direct, alignment, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {//把當前線程從set中移出threads.remove(ti);//結束,釋放鎖endBlocking(n > 0);assert IOStatus.check(n);}} }//實現自ScatteringByteChannel接口的方法,將文件中的內容依次讀取到給定的byteBuffer數組中。 public long read(ByteBuffer[] dsts, int offset, int length)throws IOException {if ((offset < 0) || (length < 0) || (offset > dsts.length - length))throw new IndexOutOfBoundsException();//保證讀寫時,channel處于開啟狀態ensureOpen();//判斷是否可讀if (!readable)throw new NonReadableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);long n = 0;int ti = -1;try {//開始阻塞,并注冊為Interruptible,可以被中斷beginBlocking();//將當前線程添加到NativeThreadSet中,并返回索引,方便后續操作。//NativeThreadSet是一個線程安全的本地線程集合,方便管理,用來發送信號ti = threads.add();if (!isOpen())return 0;do {//當未被系統中斷(即讀取完畢)或channel未被關閉,則一直讀,將內容寫入到byteBuffer(dst)中n = IOUtil.read(fd, dsts, offset, length,direct, alignment, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {//把當前線程從set中移出threads.remove(ti);//結束,釋放鎖endBlocking(n > 0);assert IOStatus.check(n);}} }write
//實現自SeekableByteChannel接口的方法,將byteBuffer中的內容寫入到文件中 public int write(ByteBuffer src) throws IOException {//保證寫時,channel處于開啟狀態ensureOpen();//判斷是否可寫if (!writable)throw new NonWritableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);int n = 0;int ti = -1;try {//開始阻塞,并注冊為Interruptible,可以被中斷beginBlocking();//將當前線程添加到NativeThreadSet中,并返回索引,方便后續操作。//NativeThreadSet是一個線程安全的本地線程集合,方便管理,用來發送信號ti = threads.add();if (!isOpen())return 0;do {//當未被系統中斷(即寫入完畢)或channel未被關閉,則一直寫,將內容寫入到文件中n = IOUtil.write(fd, src, -1, direct, alignment, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {//把當前線程從set中移出threads.remove(ti);//結束,釋放鎖assert IOStatus.check(n);}} }//實現自GatheringByteChannel接口的方法,將byteBuffer數組中的內容依次寫入到文件中 public long write(ByteBuffer[] srcs, int offset, int length)throws IOException {if ((offset < 0) || (length < 0) || (offset > srcs.length - length))throw new IndexOutOfBoundsException();//保證寫時,channel處于開啟狀態ensureOpen();//判斷是否可寫if (!writable)throw new NonWritableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);long n = 0;int ti = -1;try {//開始阻塞,并注冊為Interruptible,可以被中斷beginBlocking();//將當前線程添加到NativeThreadSet中,并返回索引,方便后續操作。//NativeThreadSet是一個線程安全的本地線程集合,方便管理,用來發送信號ti = threads.add();if (!isOpen())return 0;do {//當未被系統中斷(即寫入完畢)或channel未被關閉,則一直寫,將內容寫入到文件中n = IOUtil.write(fd, srcs, offset, length,direct, alignment, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {//把當前線程從set中移出threads.remove(ti);//結束,釋放鎖assert IOStatus.check(n);}} }position
//實現自SeekableByteChannel接口的方法,獲取當前channel的position public long position() throws IOException {ensureOpen();synchronized (positionLock) {long p = -1;int ti = -1;try {beginBlocking();ti = threads.add();if (!isOpen())return 0;boolean append = fdAccess.getAppend(fd);do {//append模式下,position在channel的末尾// in append-mode then position is advanced to end before writingp = (append) ? nd.size(fd) : nd.seek(fd, -1);} while ((p == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(p);} finally {threads.remove(ti);endBlocking(p > -1);assert IOStatus.check(p);}} }//實現自SeekableByteChannel接口的方法,設置當前channel的position為newPosition public FileChannel position(long newPosition) throws IOException {ensureOpen();if (newPosition < 0)throw new IllegalArgumentException();synchronized (positionLock) {long p = -1;int ti = -1;try {beginBlocking();ti = threads.add();if (!isOpen())return null;do {//設置當前position為newPositionp = nd.seek(fd, newPosition);} while ((p == IOStatus.INTERRUPTED) && isOpen());return this;} finally {threads.remove(ti);endBlocking(p > -1);assert IOStatus.check(p);}} }size
實現自SeekableByteChannel接口的方法,返回當前實體(文件)的大小
truncate
實現自SeekableByteChannel接口的方法,用來截取文件至newSize大小
force
實現自SeekableByteChannel接口的方法,用來將channel中尚未寫入磁盤的數據強制落盤
transferTo
將fileChannel中的數據傳遞至另一個channel
transferFrom
從其它channel讀取數據至fileChannel
SocketChannel
open
/*** Opens a socket channel.** <p> The new channel is created by invoking the {@link* java.nio.channels.spi.SelectorProvider#openSocketChannel* openSocketChannel} method of the system-wide default {@link* java.nio.channels.spi.SelectorProvider} object. </p>** @return A new socket channel** @throws IOException* If an I/O error occurs*/ public static SocketChannel open() throws IOException {return SelectorProvider.provider().openSocketChannel(); }open方法是調用SelectorProvider中實現了java.nio.channels.spi.SelectorProvider#openSocketChannel的方法,底層實際是new SocketChannelImpl,調用native方法創建socket
connect
public boolean connect(SocketAddress sa) throws IOException {//校驗Address是否合法InetSocketAddress isa = Net.checkAddress(sa);//獲取系統安全管理器SecurityManager sm = System.getSecurityManager();if (sm != null)//校驗IP和端口是否被允許連接sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());InetAddress ia = isa.getAddress();//如果是本機地址,則獲取本機的hostif (ia.isAnyLocalAddress())ia = InetAddress.getLocalHost();try {//加讀鎖readLock.lock();try {//加寫鎖writeLock.lock();try {int n = 0;//是否阻塞boolean blocking = isBlocking();try {//開啟connect前的校驗并設置為ST_CONNECTIONPENDING,如果blocking是true 即阻塞模式,則記錄當前線程的ID,以便接收信號處理。beginConnect(blocking, isa);do {//調用native connect方法n = Net.connect(fd, ia, isa.getPort());} while (n == IOStatus.INTERRUPTED && isOpen());} finally {//結束連接endConnect(blocking, (n > 0));}assert IOStatus.check(n);return n > 0;} finally {//釋放寫鎖writeLock.unlock();}} finally {//釋放讀鎖readLock.unlock();}} catch (IOException ioe) {// connect failed, close the channelclose();throw SocketExceptions.of(ioe, isa);} }configureBlocking
實現自SelectableChannel的接口方法,調用native方法設置socket的阻塞狀態
register
在AbstractSelectableChannel中定義,注冊要監聽的事件。
public final SelectionKey register(Selector sel, int ops, Object att)throws ClosedChannelException {if ((ops & ~validOps()) != 0)throw new IllegalArgumentException();if (!isOpen())throw new ClosedChannelException();synchronized (regLock) {if (isBlocking())throw new IllegalBlockingModeException();synchronized (keyLock) {// re-check if channel has been closedif (!isOpen())throw new ClosedChannelException();SelectionKey k = findKey(sel);if (k != null) {k.attach(att);k.interestOps(ops);} else {// 向Selector中注冊事件// New registrationk = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}return k;}} }read
//實現自ReadableByteChannel接口的方法,從socket中讀取數據至ByteBuffer @Override public int read(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);readLock.lock();try {boolean blocking = isBlocking();int n = 0;try {//檢查channel是否開啟并已經是connected的狀態。如果blocking是true 即阻塞模式,則記錄當前線程的ID,以便接收信號處理。beginRead(blocking);// check if input is shutdownif (isInputClosed)return IOStatus.EOF;//如果是阻塞模式,則一直讀取直到數據讀取完畢;非阻塞模式則直接調用native方法不需要等待。if (blocking) {do {n = IOUtil.read(fd, buf, -1, nd);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = IOUtil.read(fd, buf, -1, nd);}} finally {endRead(blocking, n > 0);if (n <= 0 && isInputClosed)return IOStatus.EOF;}return IOStatus.normalize(n);} finally {readLock.unlock();} }//實現自ScatteringByteChannel接口的方法,從socket中依次讀取數據至ByteBuffer數組 @Override public long read(ByteBuffer[] dsts, int offset, int length)throws IOException {Objects.checkFromIndexSize(offset, length, dsts.length);readLock.lock();try {boolean blocking = isBlocking();long n = 0;try {beginRead(blocking);// check if input is shutdownif (isInputClosed)return IOStatus.EOF;//如果是阻塞模式,則一直讀取直到數據讀取完畢;非阻塞模式則直接調用native方法不需要等待。if (blocking) {do {n = IOUtil.read(fd, dsts, offset, length, nd);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = IOUtil.read(fd, dsts, offset, length, nd);}} finally {endRead(blocking, n > 0);if (n <= 0 && isInputClosed)return IOStatus.EOF;}return IOStatus.normalize(n);} finally {readLock.unlock();} }write
//實現自ReadableByteChannel接口的方法,將ByteBuffer中的數據寫入socket @Override public int write(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);writeLock.lock();try {boolean blocking = isBlocking();int n = 0;try {beginWrite(blocking);//如果是阻塞模式,則一直讀取直到數據讀取完畢;非阻塞模式則直接調用native方法不需要等待。if (blocking) {do {n = IOUtil.write(fd, buf, -1, nd);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = IOUtil.write(fd, buf, -1, nd);}} finally {endWrite(blocking, n > 0);if (n <= 0 && isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();} }@Override public long write(ByteBuffer[] srcs, int offset, int length)throws IOException {Objects.checkFromIndexSize(offset, length, srcs.length);writeLock.lock();try {boolean blocking = isBlocking();long n = 0;try {beginWrite(blocking);//如果是阻塞模式,則一直等待直到數據寫入完畢;非阻塞模式則直接調用native方法不需要等待。if (blocking) {do {n = IOUtil.write(fd, srcs, offset, length, nd);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = IOUtil.write(fd, srcs, offset, length, nd);}} finally {endWrite(blocking, n > 0);if (n <= 0 && isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();} }//實現自ReadableByteChannel接口的方法,將ByteBuffer數組中的數據依次寫入socket /*** Writes a byte of out of band data.*/ int sendOutOfBandData(byte b) throws IOException {writeLock.lock();try {boolean blocking = isBlocking();int n = 0;try {beginWrite(blocking);//如果是阻塞模式,則一直等待直到數據寫入完畢;非阻塞模式則直接調用native方法不需要等待。if (blocking) {do {n = sendOutOfBandData(fd, b);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = sendOutOfBandData(fd, b);}} finally {endWrite(blocking, n > 0);if (n <= 0 && isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();} }ServerSocketChannel
socket
@Override public ServerSocket socket() {synchronized (stateLock) {if (socket == null)socket = ServerSocketAdaptor.create(this);return socket;} }bind
@Override public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {synchronized (stateLock) {ensureOpen();if (localAddress != null)throw new AlreadyBoundException();InetSocketAddress isa = (local == null)? new InetSocketAddress(0): Net.checkAddress(local);SecurityManager sm = System.getSecurityManager();if (sm != null)sm.checkListen(isa.getPort());//綁定前做一些前置處理,如將tcp socket文件描述符轉換成SDPNetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());//綁定IP和地址Net.bind(fd, isa.getAddress(), isa.getPort());//開始監聽,設置socket上最多可以掛起backlog個連接,若backlog小于1 則默認設置50個Net.listen(fd, backlog < 1 ? 50 : backlog);localAddress = Net.localAddress(fd);}return this; }accept
@Override public SocketChannel accept() throws IOException {acceptLock.lock();try {int n = 0;FileDescriptor newfd = new FileDescriptor();InetSocketAddress[] isaa = new InetSocketAddress[1];boolean blocking = isBlocking();try {begin(blocking);do {//阻塞等待接收客戶端鏈接n = accept(this.fd, newfd, isaa);} while (n == IOStatus.INTERRUPTED && isOpen());} finally {end(blocking, n > 0);assert IOStatus.check(n);}if (n < 1)return null;//新接收的socket初始設置為阻塞模式(因此非阻塞模式的每次需要顯示設置)// newly accepted socket is initially in blocking modeIOUtil.configureBlocking(newfd, true);InetSocketAddress isa = isaa[0];//用新接收的socket創建SocketChannelSocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);// check permitted to accept connections from the remote addressSecurityManager sm = System.getSecurityManager();if (sm != null) {try {sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());} catch (SecurityException x) {sc.close();throw x;}}return sc;} finally {acceptLock.unlock();} }ServerSocketChannel并沒有read和write方法,只是繼承了AbstractSelectableChannel,以便在selector中使用
DatagramChannel
open
public DatagramChannelImpl(SelectorProvider sp)throws IOException {super(sp);ResourceManager.beforeUdpCreate();try {//如果不支持IPv6則使用IPv4this.family = Net.isIPv6Available()? StandardProtocolFamily.INET6: StandardProtocolFamily.INET;//設置非流式的socket(tcp是流模式協議,udp是數據報模式協議)this.fd = Net.socket(family, false);this.fdVal = IOUtil.fdVal(fd);} catch (IOException ioe) {ResourceManager.afterUdpClose();throw ioe;} }receive
public SocketAddress receive(ByteBuffer dst) throws IOException {if (dst.isReadOnly())throw new IllegalArgumentException("Read-only buffer");readLock.lock();try {boolean blocking = isBlocking();int n = 0;ByteBuffer bb = null;try {SocketAddress remote = beginRead(blocking, false);boolean connected = (remote != null);SecurityManager sm = System.getSecurityManager();if (connected || (sm == null)) {// connected or no security managerdo {n = receive(fd, dst, connected);} while ((n == IOStatus.INTERRUPTED) && isOpen());if (n == IOStatus.UNAVAILABLE)return null;} else {// Cannot receive into user's buffer when running with a// security manager and not connectedbb = Util.getTemporaryDirectBuffer(dst.remaining());for (;;) {do {n = receive(fd, bb, connected);} while ((n == IOStatus.INTERRUPTED) && isOpen());if (n == IOStatus.UNAVAILABLE)return null;InetSocketAddress isa = (InetSocketAddress)sender;try {sm.checkAccept(isa.getAddress().getHostAddress(),isa.getPort());} catch (SecurityException se) {// Ignore packetbb.clear();n = 0;continue;}bb.flip();dst.put(bb);break;}}//sender:發送方地址, Set by receive0 (## ugh)assert sender != null;return sender;} finally {if (bb != null)Util.releaseTemporaryDirectBuffer(bb);endRead(blocking, n > 0);assert IOStatus.check(n);}} finally {readLock.unlock();} }send
public int send(ByteBuffer src, SocketAddress target)throws IOException {Objects.requireNonNull(src);InetSocketAddress isa = Net.checkAddress(target, family);writeLock.lock();try {boolean blocking = isBlocking();int n = 0;try {//當connect后,remote會設置為連接的地址SocketAddress remote = beginWrite(blocking, false);if (remote != null) {// connectedif (!target.equals(remote)) {throw new AlreadyConnectedException();}do {n = IOUtil.write(fd, src, -1, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());} else {// not connectedSecurityManager sm = System.getSecurityManager();if (sm != null) {InetAddress ia = isa.getAddress();if (ia.isMulticastAddress()) {sm.checkMulticast(ia);} else {sm.checkConnect(ia.getHostAddress(), isa.getPort());}}do {n = send(fd, src, isa);} while ((n == IOStatus.INTERRUPTED) && isOpen());}} finally {endWrite(blocking, n > 0);assert IOStatus.check(n);}return IOStatus.normalize(n);} finally {writeLock.unlock();} }connect
@Override public DatagramChannel connect(SocketAddress sa) throws IOException {InetSocketAddress isa = Net.checkAddress(sa, family);SecurityManager sm = System.getSecurityManager();if (sm != null) {InetAddress ia = isa.getAddress();if (ia.isMulticastAddress()) {sm.checkMulticast(ia);} else {sm.checkConnect(ia.getHostAddress(), isa.getPort());sm.checkAccept(ia.getHostAddress(), isa.getPort());}}readLock.lock();try {writeLock.lock();try {synchronized (stateLock) {ensureOpen();if (state == ST_CONNECTED)throw new AlreadyConnectedException();int n = Net.connect(family,fd,isa.getAddress(),isa.getPort());if (n <= 0)throw new Error(); // Can't happen// connectedremoteAddress = isa;state = ST_CONNECTED;// refresh local addresslocalAddress = Net.localAddress(fd);// flush any packets already received.boolean blocking = isBlocking();if (blocking) {IOUtil.configureBlocking(fd, false);}try {ByteBuffer buf = ByteBuffer.allocate(100);while (receive(buf) != null) {buf.clear();}} finally {if (blocking) {IOUtil.configureBlocking(fd, true);}}}} finally {writeLock.unlock();}} finally {readLock.unlock();}return this; }udp是數據報模式的協議,是沒有connect的。這里的connect實際上是在底層忽略了與其他地址的數據傳輸。
在connect后,就可以像socketChannel似得使用read和write了
總結
本文學習了各種channel的實現,主要是對底層native方法的一些封裝,針對不同屬性的實體(文件、socket),使用對應的channel與byteBuffer傳輸數據。再通過byteBuffer與byte數據進行轉換。
channel的實現中,封裝了大量的native方法,重要的底層實現全在native中,后續可以深入學習下。
本文中出現的byteBuffer和selector將在接下來的文章中,單獨分析。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的【从入门到放弃-Java】并发编程-NIO-Channel的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 首度公开!OceanBase存储系统架构
- 下一篇: 时尚电商新赛道:揭秘 FashionAI