org.jboss.netty.handler.queue
类 BufferedWriteHandler

java.lang.Object
  继承者 org.jboss.netty.channel.SimpleChannelHandler
      继承者 org.jboss.netty.handler.queue.BufferedWriteHandler
所有已实现的接口:
ChannelDownstreamHandler, ChannelHandler, ChannelUpstreamHandler

public class BufferedWriteHandler
extends SimpleChannelHandler

模拟缓冲写操作.该处理器存储所有写请求到一个为绑定的Queue并当flush()方法被调用时刷新它们到下游.

这里有个演示用法的例子:

 BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
 ChannelPipeline p = ...;
 p.addFirst("buffer", bufferedWriter);
 
 ...
 
 Channel ch = ...;
 
 // msg1, 2, and 3被存储到bufferedWriter的队列里.
 ch.write(msg1);
 ch.write(msg2);
 ch.write(msg3);
 
 // 需要时被刷新.
 bufferedWriter.flush();
 

自动刷新

当关联的Channel被断开或关闭时写请求队列会自动刷新.否则不会刷新该队列.这意味着在增加更多的队列大小前你必须调用 flush().你可以通过继承该处理器实现自己自动刷新的策略:
 public class AutoFlusher extends BufferedWriteHandler {
 
     private final AtomicLong bufferSize = new AtomicLong();
 
     @Override
     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
         super.writeRequested(ctx, e);
 
         ChannelBuffer data = (ChannelBuffer) e.getMessage();
         int newBufferSize = bufferSize.addAndGet(data.readableBytes());
 
         // 如果获得大于8KiB则刷新队列.
         if (newBufferSize > 8192) {
             flush();
             bufferSize.set(0);
         }
     }
 }
 

合并刷新

如果队列里有两个或更多的写请求并且他们的消息类型都是ChannelBuffer,那么它们可以被合并到一个单一的写请求以节省系统调用数量.
 合并前:                           合并后:
 +-------+-------+-------+        +-------------+
 | Req C | Req B | Req A |------\\| Request ABC |
 | "789" | "456" | "123" |------//| "123456789" |
 +-------+-------+-------+        +-------------+
 
这个特性默认是关闭的.当你创建该处理器或调用flush(boolean)时可以重写默认值.当你调用构造方法时指定了true,那么调用 flush()会一直合并该队列. 否则,你每次刷新都必须调用参数为true的flush(boolean)来打开该特性.

合并的缺点是ChannelFuture和它原始写请求关联的ChannelFutureListener 收到的通知会比实际写出的通知要迟.当合并写请求完全被写时它们会被通知.

以下例子实现了合并策略以减少可写通道的写请求数量:

 public class ConsolidatingAutoFlusher extends BufferedWriteHandler {
 
     public ConsolidatingAutoFlusher() {
         // 默认允许合并.
         super(true);
     }
 
     @Override
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         ChannelConfig cfg = e.getChannel().getConfig();
         if (cfg instanceof NioSocketChannelConfig) {
             // 更低的水印增加合并机会.
             cfg.setWriteBufferLowWaterMark(0);
         }
         super.channelOpen(e);
     }
 
     @Override
     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         super.writeRequested(ctx, et);
         if (e.getChannel().isWritable()) {
             flush();
         }
     }
 
     @Override
     public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         if (e.getChannel().isWritable()) {
             flush();
         }
     }
 }
 

写优先权

你可以在该处理器的构造方法指定一个未绑定优先权的队列来实现写的优先权.在设计适当策略检测flush()方法多久被调用一次时就会需要. 例如,你可以使用HashedWheelTimer每秒周期性的调用flush().


嵌套类摘要
 
从接口 org.jboss.netty.channel.ChannelHandler 继承的嵌套类/接口
ChannelHandler.Sharable
 
构造方法摘要
BufferedWriteHandler()
          创建一个默认未绑定BlockingQueue实现且没有缓冲合并的新实例.
BufferedWriteHandler(boolean consolidateOnFlush)
          创建一个默认未绑定BlockingQueue实现的新实例.
BufferedWriteHandler(java.util.Queue<MessageEvent> queue)
          创建一个指定线程安全的未绑定Queue且没有缓冲合并的新实例.请注意指定一个绑定了的Queue或一个非线程安全的 Queue将会导致意想不到的行为.
BufferedWriteHandler(java.util.Queue<MessageEvent> queue, boolean consolidateOnFlush)
          创建一个指定线程安全的未绑定Queue且没有缓冲合并的新实例.请注意指定一个绑定了的Queue或一个非线程安全的 Queue将会导致意想不到的行为.
 
方法摘要
 void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
          当Channel.close()被调用时调用.
 void disconnectRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
          当Channel.disconnect()被调用时调用.
 void flush()
          发送队列里的写请求到下游.
 void flush(boolean consolidateOnFlush)
          发送队列里的写请求到下游.
 boolean isConsolidateOnFlush()
           
 void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
          保存所有写请求到队列中以便在调用flush()时可以真实的被写.
 
从类 org.jboss.netty.channel.SimpleChannelHandler 继承的方法
bindRequested, channelBound, channelClosed, channelConnected, channelDisconnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, connectRequested, exceptionCaught, handleDownstream, handleUpstream, messageReceived, setInterestOpsRequested, unbindRequested, writeComplete
 
从类 java.lang.Object 继承的方法
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

构造方法详细信息

BufferedWriteHandler

public BufferedWriteHandler()
创建一个默认未绑定BlockingQueue实现且没有缓冲合并的新实例.


BufferedWriteHandler

public BufferedWriteHandler(java.util.Queue<MessageEvent> queue)
创建一个指定线程安全的未绑定Queue且没有缓冲合并的新实例.请注意指定一个绑定了的Queue或一个非线程安全的 Queue将会导致意想不到的行为.


BufferedWriteHandler

public BufferedWriteHandler(boolean consolidateOnFlush)
创建一个默认未绑定BlockingQueue实现的新实例.

参数:
consolidateOnFlush - 如果是true则当调用flush()时缓冲写请求被合并到一个单一的写请求

BufferedWriteHandler

public BufferedWriteHandler(java.util.Queue<MessageEvent> queue,
                            boolean consolidateOnFlush)
创建一个指定线程安全的未绑定Queue且没有缓冲合并的新实例.请注意指定一个绑定了的Queue或一个非线程安全的 Queue将会导致意想不到的行为.

参数:
consolidateOnFlush - 如果是true则当调用flush()时缓冲写请求被合并到一个单一的写请求
方法详细信息

isConsolidateOnFlush

public boolean isConsolidateOnFlush()

flush

public void flush()
发送队列里的写请求到下游.


flush

public void flush(boolean consolidateOnFlush)
发送队列里的写请求到下游.

参数:
consolidateOnFlush - 如果是true则当调用flush()时缓冲写请求被合并到一个单一的写请求

writeRequested

public void writeRequested(ChannelHandlerContext ctx,
                           MessageEvent e)
                    throws java.lang.Exception
保存所有写请求到队列中以便在调用flush()时可以真实的被写.

覆盖:
SimpleChannelHandler 中的 writeRequested
抛出:
java.lang.Exception

disconnectRequested

public void disconnectRequested(ChannelHandlerContext ctx,
                                ChannelStateEvent e)
                         throws java.lang.Exception
从类 SimpleChannelHandler 复制的描述
Channel.disconnect()被调用时调用.

覆盖:
SimpleChannelHandler 中的 disconnectRequested
抛出:
java.lang.Exception

closeRequested

public void closeRequested(ChannelHandlerContext ctx,
                           ChannelStateEvent e)
                    throws java.lang.Exception
从类 SimpleChannelHandler 复制的描述
Channel.close()被调用时调用.

覆盖:
SimpleChannelHandler 中的 closeRequested
抛出:
java.lang.Exception