这篇文章和你一起过下Netty的主发行版本的一些显著的改变和新特性,让你在把你的应用程序转换到新版本的时候有个概念。 项目结构改变Netty的包名从org.jboss.netty改为io.netty,因为我们不在是JBoss.org的一部分了。 二进制JAR包被分为了多个子模块以便用户能够从类路径中去掉非必需的特性。当前的结构如下:
所有的Netty的Jar(除了netty-all外)包现在都是OSGI的bundle,能够用在你喜欢的OSGI容器上。 常用API的变化
Buffer API变化ChannelBuffer → ByteBuf由于上文所提到的结构上的变化,buffer API现在可以作为一个单独的包被使用。为此,ChannelBuffer这个类型名也不那么讲得通了,而应该变更为ByteBuf。 用来创建新buffer的功能类ChannelBuffers被拆分为两个功能类:Unpooled和BufUtil。就像这个名字所暗示的,4.0引入了一个新的池化的ByteBufs,它可以通过ByteBuf的分配器(Allocator)的对应 实现ByteBufAllocator来获得。 大多数的buffer变成了动态的,具备可配置的最大容量在3.x时期,buffer分为固定和动态两种类型。一个固定buffer的容量在创建之后就无法改变,而动态buffer的容量在write*(译者按:writeByte,writeInt,writeLong...)方法需要更多空间时自动扩 容。 从4.0开始,所有buffer都变成了动态的。但是,相对于之前的动态进行了优化。你可以更容易也更安全的对一个buffer的容量进行扩大和缩小。之所以说它容易是因为有一个新的ByteBuf.capacity(int newCapacity)的方法。说它安全是因为你可以设置一个容量的最大值,以防止容量没有限制的增大。 // 不要再使用 dynamicBuffer() - 使用 buffer(). 唯一的例外是那些使用wrappedBuffer方法创建的,包装(warp)了一个buffer或一个byte数组的buffer。你无法扩大它的容量,因为这样会使包装一个已有buffer的目的是去意义——减少内存 的复制。如果你想要在包装了一个buffer之后改变它的容量,你应该重新创建一个拥有足够容量的buffer,然后将你想要包装的那个buffer的内容复制过来。
Pooled ByteBuf前面已经提到Netty引入了pooledByteBufinstances。这在很多方面都很实用,举列 如下:
public interface ByteBufAllocator {要想从一个handler那里获取当前的 ByteBufAllocator,可以使用ChannelHandlerContext.alloc()或Channel.alloc()方法: Channel channel = ...; 一旦一个ByteBuf被写入远程节点,它会再次自动的释放进入释放到池(the pool)里。 默认的ByteBufAllocator为PooledByteBufAllocator.如果你不希望使用buffer pooling或使用你自己的allocator,你可以运用Channel.config ().setAllocator(..),以及一个可供选择的 allocator,比如UnpooledByteBufAllocator。 |
事件对象从ChannelHandler中消失了在3.x时代,所有的I/O操作都会创建一个新的 4.0通过把事件对象替换为直接与类型相对应(译者注:原文为strongly typed,但是我觉得直译为强类型不太容易理解)的方法调用,几乎完全避免了事件对象的创建。3.x中,有类似于 handleUpstream()和handleDownstream()这种能够捕获所有相关类型事件的处理器方法,4.0中你将不会再看到它们的身影了。所有的事件类型现在都有各自对应的处理器方法: ChannelHandlerContext类也被修改来反映上述提到的变化:
// Before: 所有这些变化意味着用户无法去扩展ChannelEvent这个已经不存在的接口了。那用户要怎样才能定义他或她自己的事件类型呢,就像IdleStateEvent?4.0中的ChannelHandler有一个处理器方法叫做 userEventTriggered(),它就是被设计用来满足这种特殊的用户需求。 |
Simplified channel state model在3.x中,当一个新的Channel被创建并连接成功,至少三个ChannelStateEvent会被触发:channelOpen、channelBound以及channelConnected。当一个Channel关闭,则对应channelDisconnected 、channelUnbound以及channelClosed三个事件。 fixme 但是,触发这么多事件的意义并不那么明显。如果在一个Channel进入可读或可写的状态时通知用户,想来会更有帮助。 fixme channelOpen、channelBound和channelConnected被合并为channelActive。channelDisconnected、channelUnbound和 channelClosed被合并为channelInactive。类似的,Channel.isBound()和Channel.isConnected()也被合并为了 Channel.isActive()。 需要注意的是,channelRegistered和channelUnregistered这两个事件与channelOpen和channelClosed具有的意义是不一样的。它们 (channelRegistered和channelUnregistered)是在支持Channel的动态注册、注销以及再注册时被引入的,就像下图所示: fixme |
每个处理器的缓存不像3.x那样在每次读操作都简历一个新堆里的缓存来触发上游的MessageEvent,4.0不会每次都创建新的 缓存。它直接从socket中读取数据到由用户的ChannelInboundByteHandler和ChannelInboundMessageHandler实现创建的入站缓存。 因为由上述处理器创建的入站缓存直到关联的通道关闭前都会重用,所以在上面的GC和内存带宽消耗都能保持较小。同样,当接收到的数据被销毁时用户已经 完成操作,codec的实现就变得更简单和有效了。 |
在创建出站缓存时也是差不多的(不会新建)。用户的ChannelOutBoundBYteHandler和ChannelOutboundMessageHandler来操作。 不需要每条消息都有一个事件4.0里不再有了messageReceived或 public void inboundBufferUpdated(ChannelHandlerContext ctx) {作为选择,用户能够在每个单独的入站(或出站)消息中触发这样的事件来模拟老的行为,尽管相对新方法来说效率更低。 |
消息处理器 vs. 字节处理器在3.x里一个MessageEvent持有一个任意的对象。它能够是一个ChannelBuffer或是一个用户自定义的对象,它们都是同样对待的: @Override在4.0里,它们就分别对待了,因为一个处理器不再处理一个独立的消息,而是处理多种多样的消息: public void inboundBufferUpdated(ChannelHandlerContext ctx) {你可能发现一个ServerChannel的处理器是一个入站缓存是Queue<Channel>的入站处理器是较为有趣的。 处理器适配器大多数用户都发现创建和管理它的生命周期是繁琐的,因此它支持用户扩展预定义好的适配器类来使得更方便:
|
明智的和不易出错的入站流量挂起3.x有一个由Channel.setReadable(boolean)提供的不是很明显的入站流量挂起机制。它引入了在ChannelHandler之间的复杂交互操作,同时处理器由于不正确实现而很容易互相干扰。 4.0里,新的名为read()的出站操作增加了。如果 |
暂停接收传入的连接在3.x里,没有方法让一个用户告诉Netty来厅子接收传入连接,除非是阻塞I/O线程或者关闭服务器socket。在aotu-read标志没有设置的时候,4.0涉及到的read()操作就像一个普通的通道。 半关闭socketTCP和SCTP允许用户关闭一个socket的出站流 3.x没有shutdownOutput()操作。同样,它总是在SocketChannel.read(..)返回-1的时候关闭链接。 要支持半关闭socket,4.0增加了 |
灵活的I/O线程分配在3.x里,一个Channel是由ChannelFactory创建的,同时新创建的Channel会自动注册到一个隐藏的I/O 感谢这个变化(举例来说,分离了ChannelFactory和I/O线程),用户可以注册不同的Channel实现到同一个EventLoopGroup,或者同一个Channel实现到不同的EventLoopGroup。例 如,你可以运行一个NIO服务器socket,NIO UDP socket,以及虚拟机内部的通道在同一个I/O线程里。在编写一个需要最小延迟的代理服务器时这确实很有用。 |
能够从一个已存在的jdk套接字上创建一个Channel3.x没提供方法从已存在的jdk套接字(如java.nio.channels.SocketChannel)创建一个新的通道。现在你可以用4.0这样做了。 取消注册和重新注册一个Channel从/到一个I/O线程一旦一个新的Channel在3.x里创建,它完全绑定到一个单一的I/O线程上,直到它底层的socket关闭。在4.0里,用户能够从I/O线程里取消注册一个Channel来完全控制它底层jdk套接字。 例如,你能够利用Netty提供的高层次无阻塞I/O的优势来解决复杂的协议,然后取消注册Channel并且切换到阻塞模式来在可能的最大吞吐量下传输一个文件。当然,它能够再次注册已经取消了注册的Channel。 java.nio.channels.FileChannel myFile = ...; |
调度任意的任务到一个I/O线程里运行当一个Channel被注册到 public class MyHandler extends ChannelOutboundMessageHandlerAdapter { 简化的关闭releaseExternalResources() |
有两个方法来配置Netty的Channel的
socket参数。第一个是明确地调用ChannelConfig的setter,例如
SocketChannelConfig.setTcpNoDelay(true)。这是最为类型安全的方法。另外一个是调用
ChannelConfig.setOption()方法。有时候你不得不决定在运行时的时候socket要配置什么选项,同时这个方法在这种情况下有点
不切实际。然而,在3.x里它是容易出错的,因为一个用户必需用一对字符串和对象来指定
选项。如果用户调用了错误的选项名或者值,他或她将会赵宇到一个ClassCastException或指定的选项甚至可能会默默地忽略了。
4.0引入了名为ChannelOption的新的类型,它提供了类型安全地访问socket选项。
ChannelConfig cfg = ...;
// Before:
cfg.setOption("tcpNoDelay", true);
cfg.setOption("tcpNoDelay", 0); // Runtime ClassCastException
cfg.setOption("tcpNoDelays", true); // Typo in the option name - ignored silently
// After:
cfg.setOption(ChannelOption.TCP_NODELAY, true);
cfg.setOption(ChannelOption.TCP_NODELAY, 0); // Compile error
在回应用户指令里,你可以附加任意的对象到
Channel和ChannelHandlerContext。一个名为AttributeMap的新接口被加入了,它被Channel和
ChannelHandlerContext继承。作为替代,ChannelLocal和
Channel.attachment被移除。这些属性会在他们关联的Channel被垃圾回收的同时回收。
public class MyHandler extends ChannelInboundMessageHandlerAdapter<MyMessage> {
private static final AttributeKey<MyState> STATE =
new AttributeKey<MyState>("MyHandler.state");
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
ctx.attr(STATE).set(new MyState());
ctx.fireChannelRegistered();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MyMessage msg) {
MyState state = ctx.attr(STATE).get();
}
...
}
bootstrap API已经重头重写,尽管它的目的还是一样;它执行需要使服务器或客户端运行的典型步骤,通常能在样板代码里找到。新的bootstrap同样采取了流畅的接口。
public static void main(String[] args) throws Exception {
// Configure the server.
ServerBootstrap b = new ServerBootstrap();
try {
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.option(ChannelOption.SO_BACKLOG, 100)
.localAddress(8080)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(handler1, handler2, ...);
}
});
// Start the server.
ChannelFuture f = b.bind().sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
b.shutdown();
}
}
ChannelPipelineFactory → ChannelInitializer和你在上面的例子注意到的一样,ChannelPipelineFactory不再存在了。而是由ChannelInitializer来替换,它给予了在Channel和ChannelPipeline的配置的更多控制。 请注意,你不能自己创建一个新的ChannelPipeline。通过观察目前为止的用例报告,Netty项目队伍总结到让用户去创建自己的管道实现或者是继承默认的实现是没有好处的。 因此,ChannelPipeline不再让用户创建。ChannelPipeline由Channel自动创建。 |
ChannelFuture拆分为ChannelFuture和ChannelPromiseChannelFuture已经被拆分为ChannelFuture和ChannelPromise了。这不仅仅是让异步操作里的生产者和消费者间的约定更明显,同样也是得在使用从链中返回的ChannelFuture更加安全,因为 ChannelFuture的状态是不能改变的。 由于这个编号,一些方法现在都采用ChannelPromiser而不是ChannelFuture来改变它的状态。 |
良好定义的线程模型在3.x里并没有良好设计的线程模型,尽管曾经要修复线程模型在3.5的不一致性。4.0定义的一个严格的线程模型来帮助用户编写ChannelHandler而不必担心太多关于线程安全的东西。
|
不再有ExecutionHandler ——它包含到核心里在你加入一个ChannelHandler到一个ChannelPipeline来告诉管道总是通过指定的EventExecutor调用加入的ChannelHander处理器的方法的时候,你可以指定一个 EventExecutor。 Channel ch = ...; EventExecutor是EventLoop的超类,同时也继承了ScheduledExecutorService。 fixme 编码解码器框架变化在编码解码器框架里有实质性的内部改变,因为4.0需要一个处理器来创建和管理它的缓存(看这篇文章的每个处理器缓存部分。)然而,从用户角度来看这些变化都不是很大的。
|
编码解码器嵌入器→ EmbeddedChannel编码解码器嵌入器已经被 io.netty.channel.embedded.EmbeddedByteChannel和EmbeddedMessageChannel替换了。EmbeddedChannel允许用户对任何包含编码解码器的管道进行单元测试。 HTTP编码解码器HTTP解码器现在在每个HTTP消息中总生成多个消息对象: 1 * HttpRequest / HttpResponse 要看更多的细节,请到转到已更新了的HttpSnoopServer例子。如果你希望为一个单一的HTTP消息处理多个消息,你可以把HttpObjectAggregator放入管道里。 HttpObjectAggregator会把多个消息转换为 一个单一的FullHttpRequest或是FullHttpResponse。 传输实现的变化下面是传输协议新加入的东西:
|
用例学习:移植示例Factorial这部分粗略地展示把示例Factorial从3.0移植到4.0的步骤。示例Factorial已经移植到4.0了,它放在io.netty.example.factorial包里。请浏览示例的源代码来看下每一处的变化。 移植服务端
|
大部分和移植服务端差不多,但你要在你编写一个潜在的大数据流时要多注意下。
private void sendNumbers() {
// Do not send more than 4096 numbers.
boolean finished = false;
MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
while (out.size() < 4096) {
if (i <= count) {
out.add(Integer.valueOf(i));
i ++;
} else {
finished = true;
break;
}
}
ChannelFuture f = ctx.flush();
if (!finished) {
f.addListener(numberSender);
}
}
private final ChannelFutureListener numberSender = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
sendNumbers();
}
}
};