Spring Cloud Gateway
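Spring Cloud Gateway is a new Spring Cloud project: a gateway built on Spring 5.0, Spring Boot 2.0, and Project Reactor. It aims to provide a simple, effective, and unified way to manage API routing in a microservice architecture.
As the gateway of the Spring Cloud ecosystem, Spring Cloud Gateway is intended to replace Netflix Zuul. Besides unified routing, it provides the basic gateway functions through a filter chain, for example security, monitoring/metrics, and rate limiting.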
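Key concepts:
Route: the basic building block of the gateway. It is defined by an ID, a destination URI, a collection of predicates, and a collection of filters. A route is matched if its predicates evaluate to true.
Predicate: a Java 8 Predicate whose input type is a ServerWebExchange. It can match on anything from the HTTP request, such as headers or parameters.
Filter: an instance of org.springframework.cloud.gateway.filter.GatewayFilter, which can be used to modify requests and responses.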
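To make the relationship between a route and its predicates concrete, here is a minimal plain-Java sketch. RouteSketch, Request, Route, and fooRoute are hypothetical stand-ins invented for illustration, not Spring Cloud Gateway classes, and the target URI is a placeholder. Filters are left out to keep the example small.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified stand-ins for the gateway concepts; not Spring classes.
final class RouteSketch {

    /** A request reduced to the two things this sketch matches on. */
    static final class Request {
        final String path;
        final Map<String, String> headers;

        Request(String path, Map<String, String> headers) {
            this.path = path;
            this.headers = headers;
        }
    }

    /** A route: an ID, a target URI, and a predicate that decides whether it matches. */
    static final class Route {
        final String id;
        final String uri;
        final Predicate<Request> predicate;

        Route(String id, String uri, Predicate<Request> predicate) {
            this.id = id;
            this.uri = uri;
            this.predicate = predicate;
        }

        boolean matches(Request request) {
            return predicate.test(request);
        }
    }

    /** Combines two predicates with and(): the route matches only if both are true. */
    static Route fooRoute() {
        Predicate<Request> pathIsFoo = r -> r.path.startsWith("/foo/");
        Predicate<Request> hasToken = r -> r.headers.containsKey("token");
        // Placeholder URI, assumed for the example only.
        return new Route("path_route", "http://example.invalid:8000", pathIsFoo.and(hasToken));
    }
}
```

The real gateway evaluates predicates against a ServerWebExchange rather than a hand-rolled request type, but the matching rule is the same idea: every predicate has to hold.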
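How it works:
Clients send requests to Spring Cloud Gateway. If the Gateway Handler Mapping finds a route that matches the request, the request is handed to the Gateway Web Handler. The handler then runs the request through the filter chain specific to that route, forwards it to the actual service that executes the business logic, and returns the response. In the diagram from the official documentation, the filters are divided by a dotted line because a filter may run logic before the proxied request is sent ("pre") or after it ("post").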
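The "pre" and "post" ordering described above can be sketched in plain Java. This is a synchronous simplification under stated assumptions: the real gateway chain is reactive and returns Mono<Void>, and the names FilterChainSketch, Filter, Chain, and named are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {

    /** Hypothetical filter contract: do some work, then hand over to the rest of the chain. */
    interface Filter {
        void filter(List<String> log, Chain chain);
    }

    /** Walks the filter list; once it is exhausted, stands in for the proxied call. */
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void proceed(List<String> log) {
            if (index < filters.size()) {
                filters.get(index).filter(log, new Chain(filters, index + 1));
            } else {
                log.add("proxy request");
            }
        }
    }

    /** A filter that records when its "pre" and "post" parts run. */
    static Filter named(String name) {
        return (log, chain) -> {
            log.add("pre " + name);   // runs before the proxied request
            chain.proceed(log);
            log.add("post " + name);  // runs after the proxied request returns
        };
    }

    /** Runs two filters, A then B, and returns the recorded order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        new Chain(Arrays.asList(named("A"), named("B")), 0).proceed(log);
        return log;
    }
}
```

Calling FilterChainSketch.run() records the "pre" steps in filter order and the "post" steps in reverse order, which is why a filter that rewrites the response has to be ordered ahead of the filter that writes it.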
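Features of Spring Cloud Gateway:
Built on Spring Framework 5, Project Reactor, and Spring Boot 2.0
Dynamic routing
Predicates and Filters that are specific to routes
Hystrix circuit breaker integration
Spring Cloud DiscoveryClient integration
Easy-to-write Predicates and Filters
Rate limiting
Path rewriting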
Quick start
Use spring-boot 2.1.1.RELEASE together with Spring Cloud version Greenwich.M3:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Greenwich.M3</spring-cloud.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Add the following dependencies:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
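Note that Spring Cloud Gateway is built on WebFlux and is not compatible with Spring MVC. Hystrix is included here for circuit breaking, while the rate limiting shown later relies on Redis. The reactive Redis starter uses Lettuce rather than Jedis as its underlying client.
Route predicates
Next comes the configuration. Routes and filters can be hard-coded in Java or declared in a YAML configuration file. We start with the configuration-file approach.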
application.yml
server.port: 8082
spring:
  application:
    name: gateway
  cloud:
    gateway:
      routes:
      - id: path_route
        uri: :8000
        order: 0
        predicates:
        - Path=/foo/**
        filters:
        - StripPrefix=1
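The example above selects the target URI by request path: a request to /foo/bar is forwarded to :8000/bar. Without the StripPrefix=1 filter the target would be :8000/foo/bar; StripPrefix=1 removes the first segment of the path before the request is forwarded.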
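The effect of StripPrefix can be sketched in a few lines of plain Java. This is an illustration of the idea only, not the gateway's actual implementation; StripPrefixSketch and stripPrefix are hypothetical names, and edge cases such as trailing slashes are ignored.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class StripPrefixSketch {

    /** Drops the first {@code parts} non-empty segments of {@code path}. */
    static String stripPrefix(String path, int parts) {
        String remaining = Arrays.stream(path.split("/"))
                .filter(segment -> !segment.isEmpty()) // ignore the empty segment before the leading slash
                .skip(parts)                           // remove the requested number of segments
                .collect(Collectors.joining("/"));
        return "/" + remaining;
    }
}
```

With this sketch, stripPrefix("/foo/bar", 1) yields "/bar", matching the behavior described for the route above, and a count of 0 leaves the path unchanged.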
For the other route predicates and filters, see the official documentation:
https://cloud.spring.io/spring-cloud-static/spring-cloud-gateway/2.1.0.RC2/single/spring-cloud-gateway.html#gateway-how-it-works
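Next, let's look at some of the features a gateway should provide.
Modifying the response body
The APIs routed through the gateway return bodies in different formats, and the gateway itself produces its own responses for rate limiting, authentication, and circuit breaking or fallback. To unify the format of all these responses, the gateway has to rewrite the response body. The filter code is as follows: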
package org.gateway.filter.global;

import java.nio.charset.StandardCharsets;

import org.gateway.response.Response;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;

import com.alibaba.fastjson.JSON;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class WrapperResponseFilter implements GlobalFilter, Ordered {

    @Override
    public int getOrder() {
        // -1 is response write filter, must be called before that
        return -2;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpResponse originalResponse = exchange.getResponse();
        DataBufferFactory bufferFactory = originalResponse.bufferFactory();
        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                if (body instanceof Flux) {
                    Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
                    // Each DataBuffer is wrapped on its own, so this assumes the
                    // upstream body arrives as a single buffer.
                    return super.writeWith(fluxBody.map(dataBuffer -> {
                        // probably should reuse buffers
                        byte[] content = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(content);
                        // Release the original buffer's memory.
                        DataBufferUtils.release(dataBuffer);
                        String rs = new String(content, StandardCharsets.UTF_8);
                        Response response = new Response();
                        response.setCode("1");
                        response.setMessage("Request succeeded");
                        response.setData(rs);
                        byte[] newRs = JSON.toJSONString(response).getBytes(StandardCharsets.UTF_8);
                        // Reset Content-Length; otherwise the client does not receive the message.
                        originalResponse.getHeaders().setContentLength(newRs.length);
                        return bufferFactory.wrap(newRs);
                    }));
                }
                // if body is not a flux. never got there.
                return super.writeWith(body);
            }
        };
        // replace response with decorator
        return chain.filter(exchange.mutate().response(decoratedResponse).build());
    }
}
Note that the order must be less than -1 so that this filter runs before NettyWriteResponseFilter.
With a filter like this in place, the response format can be unified.
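The reason the filter resets Content-Length can be shown with a small plain-Java sketch: wrapping the body changes its size, and the header counts encoded bytes rather than characters. EnvelopeSketch, wrap, and contentLength are hypothetical names, and the hand-built envelope is a simplification of what a JSON library would produce (it escapes only backslashes and double quotes).

```java
import java.nio.charset.StandardCharsets;

final class EnvelopeSketch {

    /** Wraps a raw body in a fixed JSON envelope; escapes only backslashes and double quotes. */
    static String wrap(String body) {
        String escaped = body.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"code\":\"1\",\"data\":\"" + escaped + "\"}";
    }

    /** Content-Length is the number of UTF-8 bytes, not the number of characters. */
    static int contentLength(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

A two-byte body such as "ok" grows to 24 bytes once wrapped, so a Content-Length left over from the upstream response would no longer describe what the gateway actually sends.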
Authentication
The following is a simple authentication filter:
package org.gateway.filter.global;

import java.nio.charset.StandardCharsets;

import org.gateway.response.Response;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;

import com.alibaba.fastjson.JSON;

import reactor.core.publisher.Mono;

@Component
public class AuthFilter implements GlobalFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Demo check only: the "token" header must equal the literal value "token".
        String token = exchange.getRequest().getHeaders().getFirst("token");
        if ("token".equals(token)) {
            return chain.filter(exchange);
        }
        // Otherwise reject with 401 and a JSON body in the unified format.
        ServerHttpResponse response = exchange.getResponse();
        Response data = new Response();
        data.setCode("401");
        data.setMessage("Illegal request");
        byte[] datas = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
        DataBuffer buffer = response.bufferFactory().wrap(datas);
        response.setStatusCode(HttpStatus.UNAUTHORIZED);
        response.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
        return response.writeWith(Mono.just(buffer));
    }
}
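Rate limiting
Spring Cloud Gateway provides the rate-limiting filter factory RequestRateLimiterGatewayFilterFactory and the implementation class RedisRateLimiter, which uses the token bucket algorithm. The official implementation does not necessarily meet our needs, so we write our own filter (largely the same as the official one) that only changes the response returned when a request is rejected.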
package org.gateway.limiter;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.gateway.response.Response;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;

import com.alibaba.fastjson.JSON;

import reactor.core.publisher.Mono;

/**
 * User Request Rate Limiter filter. See https://stripe.com/blog/rate-limiters and
 */
public class RateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory<RateLimiterGatewayFilterFactory.Config> {

    public static final String KEY_RESOLVER_KEY = "keyResolver";

    private final RateLimiter defaultRateLimiter;
    private final KeyResolver defaultKeyResolver;

    public RateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter,
            KeyResolver defaultKeyResolver) {
        super(Config.class);
        this.defaultRateLimiter = defaultRateLimiter;
        this.defaultKeyResolver = defaultKeyResolver;
    }

    public KeyResolver getDefaultKeyResolver() {
        return defaultKeyResolver;
    }

    public RateLimiter getDefaultRateLimiter() {
        return defaultRateLimiter;
    }

    @SuppressWarnings("unchecked")
    @Override
    public GatewayFilter apply(Config config) {
        // Fall back to the defaults when the route does not configure its own.
        KeyResolver resolver = (config.keyResolver == null) ? defaultKeyResolver : config.keyResolver;
        RateLimiter<Object> limiter = (config.rateLimiter == null) ? defaultRateLimiter : config.rateLimiter;
        return (exchange, chain) -> {
            Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
            return resolver.resolve(exchange).flatMap(key ->
                // TODO: if key is empty?
                limiter.isAllowed(route.getId(), key).flatMap(response -> {
                    // Copy the limiter's headers onto the response.
                    for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
                        exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
                    }
                    if (response.isAllowed()) {
                        return chain.filter(exchange);
                    }
                    // Rejected: answer with a JSON body in the unified format.
                    ServerHttpResponse rs = exchange.getResponse();
                    Response data = new Response();
                    data.setCode("101");
                    data.setMessage("Too many requests");
                    byte[] datas = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
                    DataBuffer buffer = rs.bufferFactory().wrap(datas);
                    // Config.statusCode defaults to 429 Too Many Requests.
                    rs.setStatusCode(config.getStatusCode());
                    rs.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
                    return rs.writeWith(Mono.just(buffer));
                }));
        };
    }

    public static class Config {
        private KeyResolver keyResolver;
        private RateLimiter rateLimiter;
        private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS;

        public KeyResolver getKeyResolver() {
            return keyResolver;
        }

        public Config setKeyResolver(KeyResolver keyResolver) {
            this.keyResolver = keyResolver;
            return this;
        }

        public RateLimiter getRateLimiter() {
            return rateLimiter;
        }

        public Config setRateLimiter(RateLimiter rateLimiter) {
            this.rateLimiter = rateLimiter;
            return this;
        }

        public HttpStatus getStatusCode() {
            return statusCode;
        }

        public Config setStatusCode(HttpStatus statusCode) {
            this.statusCode = statusCode;
            return this;
        }
    }
}
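For readers unfamiliar with the token bucket algorithm that RedisRateLimiter is based on, the following is a minimal in-memory sketch in plain Java. It is not the Redis Lua script the gateway uses. TokenBucketSketch and tryAcquire are hypothetical names, the field names replenishRate and burstCapacity are borrowed from the RedisRateLimiter settings, and the clock is passed in as a parameter so the behavior is easy to follow.

```java
final class TokenBucketSketch {
    private final double replenishRate; // tokens added per second
    private final double burstCapacity; // maximum number of tokens the bucket can hold
    private double tokens;
    private long lastRefillSeconds;

    TokenBucketSketch(double replenishRate, double burstCapacity, long nowSeconds) {
        this.replenishRate = replenishRate;
        this.burstCapacity = burstCapacity;
        this.tokens = burstCapacity; // the bucket starts full
        this.lastRefillSeconds = nowSeconds;
    }

    /** Refills for the elapsed time (capped at capacity), then takes one token if available. */
    synchronized boolean tryAcquire(long nowSeconds) {
        long elapsed = Math.max(0, nowSeconds - lastRefillSeconds);
        tokens = Math.min(burstCapacity, tokens + elapsed * replenishRate);
        lastRefillSeconds = nowSeconds;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }
}
```

With a rate of 1 token per second and a capacity of 2, two requests in the same second pass, a third is rejected, and one more is allowed a second later. The gateway keeps this state in Redis so that all gateway instances share the same buckets.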
Rate limiting also needs a key that decides what is being limited: the IP, the API, or the user. So we define a custom KeyResolver:
package org.gateway.limiter;

import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.web.server.ServerWebExchange;

import com.alibaba.fastjson.JSON;

import reactor.core.publisher.Mono;

public class CustomKeyResolver implements KeyResolver {

    public static final String BEAN_NAME = "customKeyResolver";

    @Override
    public Mono<String> resolve(ServerWebExchange exchange) {
        return Mono.just(getKey(exchange));
    }

    /**
     * Builds the rate-limit key from the request path and the "biz" query parameter.
     *
     * @param exchange the current server exchange
     * @return the LimitKey serialized as a JSON string
     */
    private String getKey(ServerWebExchange exchange) {
        LimitKey limitKey = new LimitKey();
        limitKey.setApi(exchange.getRequest().getPath().toString());
        limitKey.setBiz(exchange.getRequest().getQueryParams().getFirst("biz"));
        return JSON.toJSONString(limitKey);
    }
}
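To illustrate why the choice of key matters, here is a small plain-Java sketch in which every distinct key gets its own budget. It is a deliberate simplification: a fixed count per key with no refill, held in memory instead of Redis. KeyedLimiterSketch, key, and isAllowed are hypothetical names, and the "api|biz" key format is an assumption made for the example (the resolver above serializes a LimitKey to JSON instead).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class KeyedLimiterSketch {
    private final int limitPerKey;
    private final Map<String, AtomicInteger> used = new ConcurrentHashMap<>();

    KeyedLimiterSketch(int limitPerKey) {
        this.limitPerKey = limitPerKey;
    }

    /** Hypothetical key format: the request path plus the "biz" parameter. */
    static String key(String api, String biz) {
        return api + "|" + biz;
    }

    /** Counts the request against its key and allows it while the count is within the limit. */
    boolean isAllowed(String key) {
        int count = used.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
        return count <= limitPerKey;
    }
}
```

Requests that resolve to the same key share one budget, while a request with a different biz value on the same path is counted separately.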
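Finally, RedisRateLimiter also needs to be rewritten, because the stock implementation checks only a single key and does not support multi-level rate limiting. The code is as follows: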
/**
 * This uses a basic token bucket algorithm and relies on the fact that Redis scripts
 * execute atomically. No other operations can run between fetching the count and
 * writing the new count.
 */
@Override
public Mono<Response> isAllowed(String routeId, String id) {
    if (!this.initialized.get()) {
        throw new IllegalStateException("RedisRateLimiter is not initialized");
    }
    LimitConfig limitConfig = getLimitConfig(routeId);
    if (limitConfig == null