S
- 通常是Enum
的状态类型;
如果状态管理未被使用,请使用Void
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ByteToMessageDecoder
的一个特殊变体,它可以在阻塞I / O范例中实现非阻塞解码器。
最大的区别ReplayingDecoder
和ByteToMessageDecoder
是ReplayingDecoder
,您可以实现decode()
种decodeLast()
方法就像已经获得所有所需的字节,而不是检查所需的字节的可用性。 例如,以下ByteToMessageDecoder
实施:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder
{
@Override
protected void decode(ChannelHandlerContext
ctx,
ByteBuf
buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) {
return;
}
buf.markReaderIndex();
int length = buf.readInt();
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
out.add(buf.readBytes(length));
}
}
与ReplayingDecoder
一样被简化为:
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder
<Void
> {
protected void decode(ChannelHandlerContext
ctx,
ByteBuf
buf) throws Exception {
out.add(buf.readBytes(buf.readInt()));
}
}
ReplayingDecoder
传递一个专门的ByteBuf
实现,当缓冲区中没有足够的数据时,该实现会抛出某种类型的Error
。 在上面的IntegerHeaderFrameDecoder
,当您调用buf.readInt()
时,您只是假定缓冲区中将有4个或更多字节。 如果缓冲区中真的有4个字节,它将按照您的预期返回整数头。 否则, Error
将会被提升,控制权将被返回到ReplayingDecoder
。 如果ReplayingDecoder
捕获到Error
,则它会将缓冲区的readerIndex
回退到'初始'位置(即缓冲区的开始位置),并在缓冲区接收到更多数据时再次调用decode(..)
方法。
请注意, ReplayingDecoder
总是抛出相同的缓存Error
实例,以避免创建新的Error
并为每次抛掷填充其堆栈跟踪的开销。
以简单性为代价, ReplayingDecoder
强制您有一些限制:
decode(..)
方法来解码单个消息。 例如,下面的代码将不起作用: public class MyDecoder extends ReplayingDecoder
<Void
> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf
buf, List<Object> out) throws Exception {
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// This assertion will fail intermittently since values.offer()
// can be called more than two times!
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
正确的实现如下所示,您还可以使用下一节详细介绍的“检查点”功能。 public class MyDecoder extends ReplayingDecoder
<Void
> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf
buf, List<Object> out) throws Exception {
// Revert the state of the variable that might have been changed
// since the last partial decode.
values.clear();
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// Now we know this assertion will never fail.
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
幸运的是,使用checkpoint()
方法可以显着提高复杂解码器实现的性能。 checkpoint()
方法更新缓冲区的'初始'位置,以便ReplayingDecoder
将缓冲区的readerIndex倒readerIndex
您调用checkpoint()
方法的最后位置。
checkpoint(T)
与Enum
虽然您可以使用checkpoint()
方法并checkpoint()
管理解码器的状态,但管理解码器状态的最简单方法是创建一个代表解码器当前状态的Enum
类型,并在状态更改时调用checkpoint(T)
方法。 根据您想要解码的消息的复杂程度,您可以拥有任意数量的状态:
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder
<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext
ctx,
ByteBuf
buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
checkpoint()
带参数调用checkpoint()
管理解码器状态的另一种方法是自己管理它。
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder
<Void
> {
private boolean readLength;
private int length;
@Override
protected void decode(ChannelHandlerContext
ctx,
ByteBuf
buf, List<Object> out) throws Exception {
if (!readLength) {
length = buf.readInt();
readLength = true;
checkpoint();
}
if (readLength) {
ByteBuf frame = buf.readBytes(length);
readLength = false;
checkpoint();
out.add(frame);
}
}
}
如果你打算写一个协议复用器,你可能会想更换ReplayingDecoder
与另一个(协议检测) ReplayingDecoder
, ByteToMessageDecoder
或MessageToMessageDecoder
(实际协议解码器)。 通过调用ChannelPipeline.replace(ChannelHandler, String, ChannelHandler)
不可能实现这一点 ,但需要一些额外的步骤:
public class FirstDecoder extends ReplayingDecoder
<Void
> {
@Override
protected void decode(ChannelHandlerContext
ctx,
ByteBuf
buf, List<Object> out) {
...
// Decode the first message
Object firstMessage = ...;
// Add the second decoder
ctx.pipeline().addLast("second", new SecondDecoder());
if (buf.isReadable()) {
// Hand off the remaining data to the second decoder
out.add(firstMessage);
out.add(buf.readBytes(super.actualReadableBytes()));
} else {
// Nothing to hand off
out.add(firstMessage);
}
// Remove the first decoder (me)
ctx.pipeline().remove(this);
}
ByteToMessageDecoder.Cumulator
ChannelHandler.Sharable
COMPOSITE_CUMULATOR, MERGE_CUMULATOR
Modifier | Constructor and Description |
---|---|
protected |
ReplayingDecoder()
创建一个没有初始状态的新实例(即:
null )。
|
protected |
ReplayingDecoder(S initialState)
用指定的初始状态创建一个新实例。
|
Modifier and Type | Method and Description |
---|---|
protected void |
callDecode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<java.lang.Object> out)
调用一次数据应该从给定的 ByteBuf 解码。
|
protected void |
checkpoint()
存储内部累积缓冲区的阅读器位置。
|
protected void |
checkpoint(S state)
存储内部累积缓冲区的读取器位置并更新当前的解码器状态。
|
protected S |
state()
返回此解码器的当前状态。
|
protected S |
state(S newState)
设置此解码器的当前状态。
|
actualReadableBytes, channelInactive, channelRead, channelReadComplete, decode, decodeLast, discardSomeReadBytes, handlerRemoved, handlerRemoved0, internalBuffer, isSingleDecode, setCumulator, setDiscardAfterReads, setSingleDecode, userEventTriggered
channelActive, channelRegistered, channelUnregistered, channelWritabilityChanged, exceptionCaught
ensureNotSharable, handlerAdded, isSharable
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
handlerAdded
protected ReplayingDecoder()
null
)。
protected ReplayingDecoder(S initialState)
protected void checkpoint()
protected void checkpoint(S state)
protected S state()
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<java.lang.Object> out)
ByteToMessageDecoder
ByteBuf
解码。
只要解码应该发生,该方法将调用ByteToMessageDecoder.decode(ChannelHandlerContext, ByteBuf, List)
。
callDecode
在课程
ByteToMessageDecoder
ctx
- 这是ByteToMessageDecoder
所属的ChannelHandlerContext
in
- 从中读取数据的ByteBuf
out
- the
List
to which decoded messages should be added
Copyright © 2008–2018 The Netty Project. All rights reserved.