解决Netty解码http请求获取URL乱码问题
作者:天上飞下一毛雪
Netty解码http请求获取URL乱码
解决方案
获取URI时,使用URLDecoder进行解码
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest fhr = (FullHttpRequest) msg; String uri = URLDecoder.decode(fhr.uri().trim().replace("/", "") .replace("\\", ""), "UTF-8"); }
原因
1、URLEncoder.encode和URLDecoder.decode
URL只能使用英文字母、阿拉伯数字和某些标点符号,不能使用其他文字和符号,即
只有字母和数字[0-9a-zA-Z]、一些特殊符号$-_.+!*'()[不包括双引号]、以及某些保留字(空格转换为+),才可以不经过编码直接用于URL,如果URL中有汉字,就必须编码后使用。
URLDecoder
类包含一个decode(String s,String enc)静态方法,它可以将application/x-www-form-urlencoded MIME字符串转成编码前的字符串;URLEncoder
类包含一个encode(String s,String enc)静态方法,它可以将中文字符及特殊字符用转换成application/x-www-form-urlencoded MIME字符串。
2、使用URLEncoder.encode编码
public static String urlEncode(String urlToken) { String encoded = null; try { //用URLEncoder.encode方法会把空格变成加号(+),encode之后在替换一下 encoded = URLEncoder.encode(urlToken, "UTF-8").replace("+", "%20"); } catch (UnsupportedEncodingException e) { logger.error("URLEncode error {}", e); } return encoded; }
3、使用URLEncoder.encode解码
public static String urlEncode(String urlToken) { String decoded = null; try { decoded =URLDecoder.decode(urlToken, "UTF-8"); } catch (UnsupportedEncodingException e) { logger.error("URLEncode error {}", e); } return decoded; }
Netty---编解码(原理)
1.ByteToMessageDecoder
用于将ByteBuf解码成为POJO对象
重要字段:
ByteBuf cumulation; //缓存 private Cumulator cumulator = MERGE_CUMULATOR; //累计器 private boolean singleDecode; private boolean first; //是否第一次解码 private boolean firedChannelRead; //状态码 private byte decodeState = STATE_INIT; private int discardAfterReads = 16; //解码次数阈值,用来删除已读数据 private int numReads; //解码次数
介绍一下累计器:Cumulator类是干什么的
它的本类中的内部类,而且还是一个接口,只提供了方法。它的实现,只有匿名类,所以就是开头的静态两个字段了。
public interface Cumulator { ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in); }
也就是我们默认使用的cumulator->MEGRE_CUMULATOR,我们看看它是如何实现的cumulator接口
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { //参数:ByteBuf的分配器,本类中的ByteBuf,传递过来的ByteBuf @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { if (!cumulation.isReadable() && in.isContiguous()) { 累加的不可读(比如空缓存),且新的是连续的 cumulation.release(); //释放 return in; } try { final int required = in.readableBytes(); //返回可读区域 //可读区域,大于累加器中的可写区域, 或者累加器只能读 if (required > cumulation.maxWritableBytes() || (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || cumulation.isReadOnly()) { return expandCumulation(alloc, cumulation, in); //扩充累计器 } //写入到累计器中 cumulation.writeBytes(in, in.readerIndex(), required); in.readerIndex(in.writerIndex()); //调整in的读指针到写的位置,那么可读区域为0 return cumulation; } finally { in.release(); //释放ByteBuf } } };
这个类的实现方法,很重要,因为下面的ChannelRead()方法的核心就是调用上面的方法,
重要方法:channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { //判断传入的 是否是ByteBuf对象 CodecOutputList out = CodecOutputList.newInstance(); try { first = cumulation == null; //如果为null,说明是第一次 cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg); //判断解码器是否缓存了没有解码完成的半包信息 callDecode(ctx, cumulation, out); //如果为空,说明第一次解析,或者上一次的已经解析完成。 }... } finally { try { if (cumulation != null && !cumulation.isReadable()) { //不为空,不可读,要释放 numReads = 0; cumulation.release(); cumulation = null; } else if (++numReads >= discardAfterReads) {//读取数据的次数大于阈值,则尝试丢弃已读数据 numReads = 0; discardSomeReadBytes(); } int size = out.size(); firedChannelRead |= out.insertSinceRecycled(); //有被添加或者设置,表示已经读过了 fireChannelRead(ctx, out, size); //尝试传递数据 } finally { out.recycle(); } } } else { ctx.fireChannelRead(msg); //其他类型进行传递 } }
先看ctx.alloc()方法就得到的什么,它对应上面cumulator()的第一个参数,返回的自然是Bytebuf的分配器
public ByteBufAllocator alloc() { return channel().config().getAllocator(); //返回ByteBufAllocator,要嘛是池化的,要嘛是非池化 }
如何对msg中的信息,进行转移到本地的cumulator中,
之后调用callDecode进行解码
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) {//可读 int outSize = out.size(); //数量 if (outSize > 0) { //一个一个的把解析出来的结果,传递下去 fireChannelRead(ctx, out, outSize); //传递 out.clear(); //已经传播 的,要清理掉。 if (ctx.isRemoved()) { //上下文被移除了,就不处理了 break; } outSize = 0; } //继续编解码, int oldInputLength = in.readableBytes(); decodeRemovalReentryProtection(ctx, in, out); //解码 ★ if (ctx.isRemoved()) { break; } if (outSize == out.size()) { //没有新生成的消息, if (oldInputLength == in.readableBytes()) { //没有读取数据 break; } else { continue; } } if (oldInputLength == in.readableBytes()) { //解码器没有读取数据 ... } if (isSingleDecode()) { //是否每次只解码一条,就返回 break; ... }
这个方法具体的逻辑就是解码+传播解码出的pojo,传播pojo就是调用context.fire..方法,没什么好看的,我们之前的pipline讲解的时候,已经讲过了事件传播的逻辑,这里我们重点看解码方法
decodeRemovalReentryProtection(),它其实也没有实现解码,功能,我们前面说过,本类只是一个抽象类,具体的解码要交给它的子类,实现类,比如我们之前 章节,解码器的使用部分,我们自定义的Handler继承这个类,它的里面才真正实现了解码的功能。!
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE; //状态,调用子类 解码 try { decode(ctx, in, out); //调用子类解码 } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; //处理完了,设置为初始化 if (removePending) { fireChannelRead(ctx, out, out.size()); out.clear(); handlerRemoved(ctx); } } }
再来看,丢弃已读部分的ByteBuf
protected final void discardSomeReadBytes() { if (cumulation != null && !first && cumulation.refCnt() == 1) { cumulation.discardSomeReadBytes(); } }
它其实是一个入口,具体的实现是在AbstractByteBuf中
public ByteBuf discardSomeReadBytes() { if (readerIndex > 0) { if (readerIndex == writerIndex) { ensureAccessible(); adjustMarkers(readerIndex); writerIndex = readerIndex = 0; return this; } if (readerIndex >= capacity() >>> 1) { setBytes(0, this, readerIndex, writerIndex - readerIndex); writerIndex -= readerIndex; adjustMarkers(readerIndex); readerIndex = 0; return this; } } ensureAccessible(); return this; }
2.FixedLengthFrameDecoder
它是ByteToMessageDecoder的子类,也就是实现了具体的decode,解决半包,粘包问题,通过固定长度的手法。
它的字段只有一个,frameLength,固定的长度大小,
方法也就是构造方法+decoder()
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
调用重载的方法,简单判断一下长度,然后读取
protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { return in.readRetainedSlice(frameLength); //AbstracByteBuf实现的方法 } }
3.MessageToByteEncoder
位于outbound中,功能是将pojo编码成为Byte[]组,
两个字段:
private final TypeParameterMatcher matcher; //类型参数匹配器,针对范型的 private final boolean preferDirect;
第一个字段更重要,是以前没见过的类型,用来处理范型进行匹配的,主要运用在构造方法中。
3.1 TypeParameterMatcher
先看字段,就一个成员Noop,匿名类,实现的是自己!也就实现了match方法,返回true。逻辑简单。
private static final TypeParameterMatcher NOOP = new TypeParameterMatcher() { @Override public boolean match(Object msg) { return true; } };
常用方法:
get(),跟回传进来的Class对象,判断是哪个类型,如果是Object,就是上面NOOP,
public static TypeParameterMatcher get(final Class<?> parameterType) { final Map<Class<?>, TypeParameterMatcher> getCache = InternalThreadLocalMap.get().typeParameterMatcherGetCache(); TypeParameterMatcher matcher = getCache.get(parameterType); //缓存中获取 if (matcher == null) { //未击中 if (parameterType == Object.class) { matcher = NOOP; } else { //内部类,封装Class,match匹配的时候,利用反射,判断是否是这个类的实例 matcher = new ReflectiveMatcher(parameterType); } getCache.put(parameterType, matcher); //放入缓存中 } return matcher; }
内部类,和上面的NOOP逻辑相似
private static final class ReflectiveMatcher extends TypeParameterMatcher { private final Class<?> type; ReflectiveMatcher(Class<?> type) { this.type = type; } @Override //判断 msg是否是type的实现类 public boolean match(Object msg) { return type.isInstance(msg); } }
3.2 write()方法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { //类型匹配 @SuppressWarnings("unchecked") I cast = (I) msg; //类型转换 buf = allocateBuffer(ctx, cast, preferDirect); //分配空间 try { encode(ctx, cast, buf); //调用子类编码方法 } finally { ReferenceCountUtil.release(cast); //释放 } if (buf.isReadable()) { //可读 ctx.write(buf, promise); //传播 } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } ...释放 }
if中的方法,就会调用上方的matcher进行匹配
public boolean acceptOutboundMessage(Object msg) throws Exception { return matcher.match(msg); }
然后分配一个空间,作为ByteBuf
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { //是否是直接内存 return ctx.alloc().ioBuffer(); } else { return ctx.alloc().heapBuffer(); } }
再调用子类,实现类的encode()方法,进行编码,同样也就是调用ByteBuf的写入方法,将对象写进去。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。