S - 通常是Enum的状态类型;
如果状态管理未被使用,请使用Void
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ByteToMessageDecoder的一个特殊变体,它可以在阻塞I / O范例中实现非阻塞解码器。
最大的区别ReplayingDecoder和ByteToMessageDecoder是ReplayingDecoder ,您可以实现decode()种decodeLast()方法就像已经获得所有所需的字节,而不是检查所需的字节的可用性。 例如,以下ByteToMessageDecoder实施:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) {
return;
}
buf.markReaderIndex();
int length = buf.readInt();
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
out.add(buf.readBytes(length));
}
}
与ReplayingDecoder一样被简化为:
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<Void> {
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf) throws Exception {
out.add(buf.readBytes(buf.readInt()));
}
}
ReplayingDecoder传递一个专门的ByteBuf实现,当缓冲区中没有足够的数据时,该实现会抛出某种类型的Error 。 在上面的IntegerHeaderFrameDecoder ,当您调用buf.readInt()时,您只是假定缓冲区中将有4个或更多字节。 如果缓冲区中真的有4个字节,它将按照您的预期返回整数头。 否则, Error将会被提升,控制权将被返回到ReplayingDecoder 。 如果ReplayingDecoder捕获到Error ,则它会将缓冲区的readerIndex回退到'初始'位置(即缓冲区的开始位置),并在缓冲区接收到更多数据时再次调用decode(..)方法。
请注意, ReplayingDecoder总是抛出相同的缓存Error实例,以避免创建新的Error并为每次抛掷填充其堆栈跟踪的开销。
以简单性为代价, ReplayingDecoder强制您有一些限制:
decode(..)方法来解码单个消息。 例如,下面的代码将不起作用: public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// This assertion will fail intermittently since values.offer()
// can be called more than two times!
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
} 正确的实现如下所示,您还可以使用下一节详细介绍的“检查点”功能。 public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
// Revert the state of the variable that might have been changed
// since the last partial decode.
values.clear();
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// Now we know this assertion will never fail.
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
} 幸运的是,使用checkpoint()方法可以显着提高复杂解码器实现的性能。 checkpoint()方法更新缓冲区的'初始'位置,以便ReplayingDecoder将缓冲区的readerIndex倒readerIndex您调用checkpoint()方法的最后位置。
checkpoint(T)与Enum 虽然您可以使用checkpoint()方法并checkpoint()管理解码器的状态,但管理解码器状态的最简单方法是创建一个代表解码器当前状态的Enum类型,并在状态更改时调用checkpoint(T)方法。 根据您想要解码的消息的复杂程度,您可以拥有任意数量的状态:
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
checkpoint()带参数调用checkpoint() 管理解码器状态的另一种方法是自己管理它。
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<Void> {
private boolean readLength;
private int length;
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
if (!readLength) {
length = buf.readInt();
readLength = true;
checkpoint();
}
if (readLength) {
ByteBuf frame = buf.readBytes(length);
readLength = false;
checkpoint();
out.add(frame);
}
}
}
如果你打算写一个协议复用器,你可能会想更换ReplayingDecoder与另一个(协议检测) ReplayingDecoder , ByteToMessageDecoder或MessageToMessageDecoder (实际协议解码器)。 通过调用ChannelPipeline.replace(ChannelHandler, String, ChannelHandler)不可能实现这一点 ,但需要一些额外的步骤:
public class FirstDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) {
...
// Decode the first message
Object firstMessage = ...;
// Add the second decoder
ctx.pipeline().addLast("second", new SecondDecoder());
if (buf.isReadable()) {
// Hand off the remaining data to the second decoder
out.add(firstMessage);
out.add(buf.readBytes(super.actualReadableBytes()));
} else {
// Nothing to hand off
out.add(firstMessage);
}
// Remove the first decoder (me)
ctx.pipeline().remove(this);
}
ByteToMessageDecoder.CumulatorChannelHandler.SharableCOMPOSITE_CUMULATOR, MERGE_CUMULATOR| Modifier | Constructor and Description |
|---|---|
protected |
ReplayingDecoder()
创建一个没有初始状态的新实例(即:
null )。
|
protected |
ReplayingDecoder(S initialState)
用指定的初始状态创建一个新实例。
|
| Modifier and Type | Method and Description |
|---|---|
protected void |
callDecode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<java.lang.Object> out)
调用一次数据应该从给定的 ByteBuf解码。
|
protected void |
checkpoint()
存储内部累积缓冲区的阅读器位置。
|
protected void |
checkpoint(S state)
存储内部累积缓冲区的读取器位置并更新当前的解码器状态。
|
protected S |
state()
返回此解码器的当前状态。
|
protected S |
state(S newState)
设置此解码器的当前状态。
|
actualReadableBytes, channelInactive, channelRead, channelReadComplete, decode, decodeLast, discardSomeReadBytes, handlerRemoved, handlerRemoved0, internalBuffer, isSingleDecode, setCumulator, setDiscardAfterReads, setSingleDecode, userEventTriggeredchannelActive, channelRegistered, channelUnregistered, channelWritabilityChanged, exceptionCaughtensureNotSharable, handlerAdded, isSharableclone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waithandlerAddedprotected ReplayingDecoder()
null )。
protected ReplayingDecoder(S initialState)
protected void checkpoint()
protected void checkpoint(S state)
protected S state()
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<java.lang.Object> out)
ByteToMessageDecoder
ByteBuf解码。
只要解码应该发生,该方法将调用ByteToMessageDecoder.decode(ChannelHandlerContext, ByteBuf, List) 。
callDecode在课程
ByteToMessageDecoder
ctx - 这是ByteToMessageDecoder所属的ChannelHandlerContext
in - 从中读取数据的ByteBuf
out - the
List to which decoded messages should be added
Copyright © 2008–2018 The Netty Project. All rights reserved.