最近在项目中使用 Spring Cloud Gateway 时,我遇到了一个关于堆外内存(Off-heap Memory)持续增长的问题。
最初,我发现网关服务启动后,通过 Kubernetes 监控观察到 Pod 的内存使用量呈“阶梯状”上升。即使发生 GC,内存也没有明显回落,直到触及容器的内存上限。起初我并没有特别在意,直到我发现了以下错误:
```
java.lang.OutOfMemoryError: Cannot reserve 13752223 bytes of direct buffer memory (allocated: 265781447, limit: 268435456)
	at java.nio.Bits.reserveMemory(Bits.java:178) ~[?:?]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
	*__checkpoint ⇢ com.abbemobility.framework.cloud.gateway.filter.ActuatorSecurityWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ com.abbemobility.gateway.security.PreAuthorizationFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.web.cors.reactive.CorsWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.authentication.AuthenticationWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ com.abbemobility.gateway.security.PreAuthorizationFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.web.cors.reactive.CorsWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
```
检查了 JVM 参数中 MaxDirectMemorySize 的值,它恰好对应报错信息里的 limit: 268435456(即 256 MB):
```yaml
- '-XX:MaxDirectMemorySize=256M'
```
查看对应的内存监控后发现,Direct Buffers 无论是数量还是内存占用都在持续增长。
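顺带一提,除了看 APM/K8s 的监控面板,也可以直接用 JDK 自带的 BufferPoolMXBean 读取 Direct Buffers 的数量和内存占用。下面是一个最小示意(类名为举例,实际项目中可以挂到定时任务或 Actuator 端点上;它只统计经由 java.nio 分配的直接内存,不过从开头 Bits.reserveMemory 的堆栈看,本例正是走的这条路径):

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectBufferStats {

    public static void main(String[] args) {
        // JDK 通过 BufferPoolMXBean 暴露 direct / mapped 两个缓冲池的统计信息
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("pool=%s, count=%d, used=%d bytes, capacity=%d bytes%n",
                    pool.getName(), pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}
```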
初步排查:定位 Netty 堆外内存

Spring Cloud Gateway 底层依赖 Netty,而 Netty 默认使用直接内存(Direct Memory)来分配 ByteBuf。有没有可能是这块出现问题?
我新增了以下 JVM 参数用于诊断:
```yaml
- '-Dio.netty.leakDetection.level=ADVANCED'
- '-XX:NativeMemoryTracking=summary'
```
同时添加了一个定时任务来监控 Netty 内存使用情况:
```java
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Slf4j
@Configuration
@EnableScheduling
public class NettyMemoryMonitor {

    @Scheduled(fixedRate = 30000)
    public void printNettyMemoryStatus() {
        PooledByteBufAllocatorMetric metric = PooledByteBufAllocator.DEFAULT.metric();
        log.info("【Netty内存监控】已用直接内存: {} MB, 直接内存Arena数: {}, Chunk大小: {} KB",
                metric.usedDirectMemory() / 1024 / 1024,
                metric.numDirectArenas(),
                metric.chunkSize() / 1024);
    }
}
```
经过一段时间的观察,直接内存确实在持续增长(以下日志按时间倒序排列,最新的一条在最上面):
```
【Netty内存监控】已用直接内存: 188 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
【Netty内存监控】已用直接内存: 167 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
【Netty内存监控】已用直接内存: 141 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
【Netty内存监控】已用直接内存: 123 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
【Netty内存监控】已用直接内存: 99 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
【Netty内存监控】已用直接内存: 78 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
【Netty内存监控】已用直接内存: 56 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
【Netty内存监控】已用直接内存: 42 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
【Netty内存监控】已用直接内存: 8 MB, 直接内存Arena数: 2, Chunk大小: 4096 KB
```
此时,我已经确认了问题出在 Netty 的直接内存管理上。
理解 Netty 内存管理机制

Spring Cloud Gateway 底层依赖 Netty,而 Netty 默认使用直接内存(Direct Memory)来分配 ByteBuf。
在原生的 Netty 中,我们通常直接调用 byteBuf.release() 或 ReferenceCountUtil.release(msg)。无论是 Spring 的包装还是 Netty 原生,调用这个 release() 的重要性可以用四个字来概括:生死攸关。
如果不调用它,应用程序最终一定会因为内存泄漏而崩溃。其重要性体现在以下几个核心层面:
拯救迟钝的 JVM 垃圾回收

为了追求极致的网络读写性能(实现零拷贝 Zero-Copy),Netty 默认使用的是堆外内存(Direct Memory),而不是 JVM 堆内存。
JVM 的垃圾回收器(GC)对”堆内内存”的管理非常高效,但对”堆外内存”的感知和回收非常迟钝。
如果完全依赖 JVM GC 来清理堆外内存,内存释放的速度会远远跟不上高并发下网络请求分配内存的速度。
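用一段与 Netty 无关的 JDK 原生代码可以直观感受这一点(仅为示意:堆外内存的释放依赖堆内那个很小的 DirectByteBuffer 包装对象先被 GC 掉):

```java
import java.nio.ByteBuffer;

public class DirectBufferGcSketch {

    public static void main(String[] args) {
        for (int i = 0; i < 10_000; i++) {
            // 每次分配 1 MB 堆外内存;真正的本地内存要等到堆内的 DirectByteBuffer
            // 包装对象被 GC 回收、其关联的 Cleaner 执行之后才会归还
            ByteBuffer buf = ByteBuffer.allocateDirect(1024 * 1024);
            buf.put(0, (byte) 1);
            // 包装对象本身只有几十字节,堆内几乎没有压力,GC 迟迟不会被触发,
            // 于是堆外内存的占用很可能远远跑在 GC 之前
        }
    }
}
```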
触发 Netty 的池化与引用计数机制

既然不能靠 JVM 的 GC,Netty 就自己实现了一套内存管理系统:内存池(Pooled Allocator)+ 引用计数(Reference Counting)。
每当 Netty 分配一块 ByteBuf(在 WebFlux 中被包装成 DataBuffer)给请求使用时,它的引用计数器初始化为 1。
release() 方法的作用,就是明确告诉 Netty:"我用完这块内存了,把计数器减 1"。
当计数器归零时,Netty 会立刻将这块内存回收到它的内存池中,马上就能分配给下一个网络请求使用。
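引用计数的生命周期可以用下面这个脱离网关环境的最小示例来感受(直接使用 Netty 的池化分配器,注释中为预期输出):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class RefCountSketch {

    public static void main(String[] args) {
        // 从池化分配器申请一块直接内存,初始引用计数为 1
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(256);
        System.out.println("allocate 后 refCnt = " + buf.refCnt());   // 1

        buf.retain();                                                  // 又有一个使用方持有,计数 +1
        System.out.println("retain 后 refCnt = " + buf.refCnt());     // 2

        buf.release();                                                 // 计数 -1,尚未归零
        boolean deallocated = buf.release();                           // 计数归零,内存归还内存池
        System.out.println("最终 release 返回 " + deallocated);        // true
    }
}
```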
防止直接内存溢出导致宕机

如果读取了网络数据(比如 HTTP 请求的 Body),处理完业务逻辑后忘记调用 release,会发生什么?
计数器永远不会归零。
这块堆外内存永远不会被退还给 Netty 的内存池。
随着请求量增加,内存池很快被耗尽,Netty 不得不向操作系统申请新的物理内存。
最终,直接内存占用触及 -XX:MaxDirectMemorySize 的上限(或者耗尽操作系统内存),应用抛出致命的 java.lang.OutOfMemoryError: Direct buffer memory,也就是文章开头看到的那个错误。
由于 Spring Cloud Gateway 构建在 Spring WebFlux 之上,Spring 团队提供了对应的工具类方法 DataBufferUtils.release(dataBuffer) 来完成这个释放操作。
当你使用 Spring WebFlux,且底层服务器是 Reactor Netty 时,这个 DataBuffer 本质上包装的就是 Netty 的 ByteBuf。
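举例来说,如果在自定义 Filter 里只是读取请求体内容,一个比较稳妥的写法大致如下(类名、方法名均为示意,并非项目中的真实代码):

```java
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;

import java.nio.charset.StandardCharsets;

public class RequestBodyLoggingSample {

    // 示意方法:逐块读取请求体,并在使用完后立即释放底层 ByteBuf
    Flux<String> readBody(ServerWebExchange exchange) {
        return exchange.getRequest().getBody()
                .map((DataBuffer dataBuffer) -> {
                    try {
                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(bytes);
                        return new String(bytes, StandardCharsets.UTF_8);
                    } finally {
                        // 关键:用完立刻归还,底层 ByteBuf 的引用计数减 1
                        DataBufferUtils.release(dataBuffer);
                    }
                });
    }
}
```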
定位根因:自定义 Filter 中的 DataBuffer 未正确释放

在公司项目的网关层中,我实现了一些自定义的 GlobalFilter 和 GatewayFilter,用于处理请求体解密和响应体脱敏等操作。这些操作需要处理底层的 Flux&lt;DataBuffer&gt;。于是我开始推测:是否在自定义 Filter 中操作了这些 DataBuffer(底层即 Netty 的 ByteBuf)后,没有正确地释放它们,导致直接内存无法被回收?
前面提到的两个 JVM 参数在这里发挥了关键作用:
开启 Netty 内存泄漏检测 (-Dio.netty.leakDetection.level=ADVANCED):在日志中捕获未调用 release() 的 Buffer 分配和最后访问的代码堆栈。
开启本地内存追踪 (-XX:NativeMemoryTracking=summary):观察不同内存区域(如 Direct 区域)的变化情况。
通过日志观察,当发生泄漏并触发垃圾回收时,Netty 的 ResourceLeakDetector 输出了详细的泄漏报告:
```
[ERROR] io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:840)
#2:
	io.netty.buffer.AdvancedLeakAwareByteBuf.getByte(AdvancedLeakAwareByteBuf.java:155)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:417)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:239)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:519)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:458)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280)
	(后续帧与 #1 相同,此处省略)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:671)
	io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:941)
	io.netty.handler.codec.http.HttpObjectDecoder$LineParser.parse(HttpObjectDecoder.java:998)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:376)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:239)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:519)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:458)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280)
	(后续帧与 #1 相同,此处省略)
#4:
	io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:107)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:402)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:239)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:519)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:458)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280)
	(后续帧与 #1 相同,此处省略)
#5:
	Hint: 'reactor.left.httpCodec' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	(后续帧与 #1 相同,此处省略)
#6:
	Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	(后续帧与 #1 相同,此处省略)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
	io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
	io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:785)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:840)
: 1 leak records were discarded because they were duplicates
: 17 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.
```
异常堆栈分析:泄漏发生于 HTTP 响应处理阶段

通过分析异常堆栈,我确定了泄漏发生的具体位置:Spring Cloud Gateway 接收并处理下游微服务响应(Response)的过程中。以下是核心技术线索:
1. HttpClientCodec 证明网关正处于"客户端"角色:异常堆栈中反复出现 io.netty.handler.codec.http.HttpClientCodec$Decoder.decode。在 Spring Cloud Gateway 的架构中,网关接收前端用户请求时使用的是 HttpServerCodec;只有当网关将请求路由给下游微服务,并作为客户端接收下游微服务的返回结果时,才会使用 HttpClientCodec。
2. 网络底层处于入站读取状态(epollInReady):堆栈的底层触发点是 epollInReady,这意味着网关服务器刚刚接收到了一批新的网络数据包,正在从系统内核读取数据。结合上一条,这批数据正是下游服务返回的 HTTP Response 数据流。
3. 触发响应体解码(HttpObjectDecoder):日志显示数据进入了 HttpObjectDecoder,正在解析 HTTP 的 Header 和 Body(例如 readRetainedSlice 等操作)。在这个过程中,Netty 在直接内存(Direct Memory)池中分配了 ByteBuf 来承载这些字节。
4. 移交 Reactor 框架处理(reactor.left.httpCodec):日志中的 Hint: 'reactor.left.httpCodec' will handle the message... 表明 Netty 已经完成了底层的字节读取,并将包含响应体的内存对象交给了 Reactor 响应式流。
修复方案:用 DataBufferUtils.join() 替代 .buffer()

经过上述分析,明确的结论是:在处理微服务响应(Response)的自定义 Filter 中,部分情况下遗漏了释放 Buffer 的操作,这是导致直接内存泄漏的根本原因。
回顾出问题的代码,在拦截并修改响应体时,我使用了 Reactor 的 .buffer() 操作符来收集数据:
```java
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
    if (body instanceof Flux) {
        Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
        return super.writeWith(fluxBody.buffer()
                .map(dataBuffers -> {
                    DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                    DataBuffer join = dataBufferFactory.join(dataBuffers);
                    byte[] content = new byte[join.readableByteCount()];
                    join.read(content);
                    DataBufferUtils.release(join);
                    return bufferFactory.wrap(transforBytes(content, ...));
                }));
    }
    return super.writeWith(body);
}
```
在正常的请求流转中,这段代码确实能正常释放内存。但是,当遇到网络抖动导致下游服务异常或客户端主动断开连接时,流会提前中断。此时 Reactor 会丢弃 .buffer() 正在收集的临时 List<DataBuffer>。由于 Reactor 作为通用响应式框架,它只负责丢弃 Java 对象引用,并不知道这些对象底层关联着需要调用 .release() 的 Netty 堆外内存,这就导致了泄漏。
针对这个问题,我采用了 Spring 官方提供的 DataBufferUtils.join() 来替代通用的 .buffer() 操作符:
```java
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
    if (body instanceof Flux) {
        Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
        return super.writeWith(DataBufferUtils.join(fluxBody)
                .map(dataBuffer -> {
                    byte[] content = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(content);
                    DataBufferUtils.release(dataBuffer);
                    return bufferFactory.wrap(transforBytes(content, ...));
                }));
    }
    return super.writeWith(body);
}
```
为什么 DataBufferUtils.join() 能解决这个问题?

翻开 Spring 源码就一目了然:
```java
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount) {
    Assert.notNull(buffers, "'dataBuffers' must not be null");

    if (buffers instanceof Mono) {
        return (Mono<DataBuffer>) buffers;
    }

    return Flux.from(buffers)
            .collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add)
            .filter(list -> !list.isEmpty())
            .map(list -> list.get(0).factory().join(list))
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
```
可以看到,DataBufferUtils.join() 借助了 Reactor 的 doOnDiscard 机制:通过 .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release) 这一关键钩子,即便流因取消或异常提前中止,已被收集但未能下发的 DataBuffer 也会被逐个调用 release() 释放。这一设计从根源上规避了 Netty 池化直接内存的泄漏风险,是响应式编程中资源管理的典型实践。
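doOnDiscard 的触发时机可以用一个与 DataBuffer 无关的小例子来验证(假设使用 reactor-core 3.4+,用 Integer 代替 DataBuffer,演示流异常中止时 buffer 里尚未下发的元素会回调钩子):

```java
import reactor.core.publisher.Flux;

public class DoOnDiscardSketch {

    public static void main(String[] args) {
        Flux.range(1, 25)
            .map(i -> {
                if (i == 23) {
                    // 模拟流在收集过程中异常中止
                    throw new IllegalStateException("boom");
                }
                return i;
            })
            .buffer(10)
            // 流中止时,buffer 内部尚未下发的元素会走 discard 钩子
            //(此例中预期打印 21、22,具体行为与 Reactor 版本有关)
            .doOnDiscard(Integer.class, i -> System.out.println("discarded: " + i))
            .subscribe(
                list -> System.out.println("received: " + list),
                err -> System.out.println("error: " + err.getMessage()));
    }
}
```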
本以为这次问题得到了根本性解决,然而并没有🤷🏻♀️
下篇预告:《Spring Cloud Gateway 堆外内存泄漏排查(下)》将记录泄漏问题的复现、进一步的修复方案以及最终的验证结果。