前言

webmvc和webflux作为spring framework的两个重要模块,代表了两个IO模型,阻塞式和非阻塞式的。

webmvc是基于servlet的阻塞式模型(一般称为oio),一个请求到达服务器后会单独分配一个线程去处理请求,如果请求包含IO操作,线程在IO操作结束之前一直处于阻塞等待状态,这样线程在等待IO操作结束的时间就浪费了。

webflux是基于reactor的非阻塞模型(一般称为nio),同样,请求到达服务器后也会分配一个线程去处理请求,如果请求包含IO操作,线程在IO操作结束之前不再是处于阻塞等待状态,而是去处理其他事情,等到IO操作结束之后,再通知(得益于系统的机制)线程继续处理请求。

这样线程就有效地利用了IO操作所消耗的时间。

WebFlux 增删改查完整实战 demo

Dao层 (又称 repository 层)

entity(又称 PO对象)

新建User 对象 ,代码如下:

package com.crazymaker.springcloud.reactive.user.info.entity;import com.crazymaker.springcloud.reactive.user.info.dto.User;import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;@Entity
@Table(name = "t_user")
public final class UserEntity extends User
{@Id@Column(name = "id")@GeneratedValue(strategy = GenerationType.IDENTITY)@Overridepublic long getUserId(){return super.getUserId();}@Column(name = "name")public String getName(){return super.getName();}
}

Dao 实现类

@Repository 用于标注数据访问组件,即 DAO 组件。实现代码中使用名为 repository 的 Map 对象作为内存数据存储,并对对象具体实现了具体业务逻辑。JpaUserRepositoryImpl 负责将 PO 持久层(数据操作)相关的封装组织,完成新增、查询、删除等操作。

package com.crazymaker.springcloud.reactive.user.info.dao.impl;import com.crazymaker.springcloud.reactive.user.info.dto.User;
import org.springframework.stereotype.Repository;import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.List;@Repository
@Transactional
public class JpaUserRepositoryImpl
{@PersistenceContextprivate EntityManager entityManager;public Long insert(final User user){entityManager.persist(user);return user.getUserId();}public void delete(final Long userId){Query query = entityManager.createQuery("DELETE FROM UserEntity o WHERE o.userId = ?1");query.setParameter(1, userId);query.executeUpdate();}@SuppressWarnings("unchecked")public List<User> selectAll(){return (List<User>) entityManager.createQuery("SELECT o FROM UserEntity o").getResultList();}@SuppressWarnings("unchecked")public User selectOne(final Long userId){Query query = entityManager.createQuery("SELECT o FROM UserEntity o WHERE o.userId = ?1");query.setParameter(1, userId);return (User) query.getSingleResult();}
}

Service服务层


package com.crazymaker.springcloud.reactive.user.info.service.impl;import com.crazymaker.springcloud.common.util.BeanUtil;
import com.crazymaker.springcloud.reactive.user.info.dao.impl.JpaUserRepositoryImpl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.List;@Slf4j
@Service
@Transactional
public class JpaEntityServiceImpl
{@Resourceprivate JpaUserRepositoryImpl userRepository;@Transactional//增加用户public User addUser(User dto){User userEntity = new UserEntity();userEntity.setUserId(dto.getUserId());userEntity.setName(dto.getName());userRepository.insert(userEntity);BeanUtil.copyProperties(userEntity,dto);return dto;}@Transactional//删除用户public User delUser(User dto){userRepository.delete(dto.getUserId());return dto;}//查询全部用户public List<User> selectAllUser(){log.info("方法 selectAllUser 被调用了");return userRepository.selectAll();}//查询一个用户public User selectOne(final Long userId){log.info("方法 selectOne 被调用了");return userRepository.selectOne(userId);}}

Controller控制层

Spring Boot WebFlux也可以使用注解模式来进行API接口开发。

package com.crazymaker.springcloud.reactive.user.info.controller;import com.crazymaker.springcloud.common.result.RestOut;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import javax.annotation.Resource;/*** Mono 和 Flux 适用于两个场景,即:* Mono:实现发布者,并返回 0 或 1 个元素,即单对象。* Flux:实现发布者,并返回 N 个元素,即 List 列表对象。* 有人会问,这为啥不直接返回对象,比如返回 City/Long/List。* 原因是,直接使用 Flux 和 Mono 是非阻塞写法,相当于回调方式。* 利用函数式可以减少了回调,因此会看不到相关接口。这恰恰是 WebFlux 的好处:集合了非阻塞 + 异步*/
@Slf4j
@Api(value = "用户信息、基础学习DEMO", tags = {"用户信息DEMO"})
@RestController
@RequestMapping("/api/user")
public class UserReactiveController
{@ApiOperation(value = "回显测试", notes = "提示接口使用者注意事项", httpMethod = "GET")@RequestMapping(value = "/hello")@ApiImplicitParams({@ApiImplicitParam(paramType = "query", dataType="string",dataTypeClass = String.class, name = "name",value = "名称", required = true)})public Mono<RestOut<String>> hello(@RequestParam(name = "name") String name){log.info("方法 hello 被调用了");return  Mono.just(RestOut.succeed("hello " + name));}@ResourceJpaEntityServiceImpl jpaEntityService;@PostMapping("/add/v1")@ApiOperation(value = "插入用户" )@ApiImplicitParams({
//                @ApiImplicitParam(paramType = "body", dataType="java.lang.Long", name = "userId", required = false),
//                @ApiImplicitParam(paramType = "body", dataType="用户", name = "dto", required = true)@ApiImplicitParam(paramType = "body",dataTypeClass = User.class, dataType="User", name = "dto",  required = true),})
//    @ApiImplicitParam(paramType = "body", dataType="com.crazymaker.springcloud.reactive.user.info.dto.User",  required = true)public Mono<User> userAdd(@RequestBody User dto){//命令式写法
//        jpaEntityService.delUser(dto);//响应式写法return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));}@PostMapping("/del/v1")@ApiOperation(value = "响应式的删除")@ApiImplicitParams({@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class,name = "dto",  required = true),})public Mono<User> userDel(@RequestBody User dto){//命令式写法//        jpaEntityService.delUser(dto);//响应式写法return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto)));}@PostMapping("/list/v1")@ApiOperation(value = "查询用户")public Flux<User> listAllUser(){log.info("方法 listAllUser 被调用了");//命令式写法 改为响应式 以下语句,需要在流中执行
//        List<User> list = jpaEntityService.selectAllUser();//响应式写法Flux<User> userFlux = Flux.fromIterable(jpaEntityService.selectAllUser());return userFlux;}@PostMapping("/detail/v1")@ApiOperation(value = "响应式的查看")@ApiImplicitParams({@ApiImplicitParam(paramType = "body", dataTypeClass = User.class,dataType="User", name = "dto",  required = true),})public Mono<User> getUser(@RequestBody User dto){log.info("方法 getUser 被调用了");//构造流Mono<User> userMono = Mono.justOrEmpty(jpaEntityService.selectOne(dto.getUserId()));return userMono;}@PostMapping("/detail/v2")@ApiOperation(value = "命令式的查看")@ApiImplicitParams({@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class, name = "dto",  required = true),})        public RestOut<User> getUserV2(@RequestBody User dto){log.info("方法 getUserV2 被调用了");User user = jpaEntityService.selectOne(dto.getUserId());return RestOut.success(user);}}

从返回值可以看出,Mono 和 Flux 适用于两个场景,即:

  • Mono:实现发布者,并返回 0 或 1 个元素,即单对象
  • Flux:实现发布者,并返回 N 个元素,即 List 列表对象

有人会问,这为啥不直接返回对象,比如返回 City/Long/List。原因是,直接使用 Flux 和 Mono 是非阻塞写法,相当于回调方式。利用函数式可以减少了回调,因此会看不到相关接口。这恰恰是 WebFlux 的好处:集合了非阻塞 + 异步。

Mono

Mono 是什么? 官方描述如下:A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

Mono 是响应流 Publisher 具有基础 rx 操作符。可以成功发布元素或者错误。如图所示:

file

Mono 常用的方法有:

  • Mono.create():使用 MonoSink 来创建 Mono
  • Mono.justOrEmpty():从一个 Optional 对象或 null 对象中创建 Mono。
  • Mono.error():创建一个只包含错误消息的 Mono
  • Mono.never():创建一个不包含任何消息通知的 Mono
  • Mono.delay():在指定的延迟时间之后,创建一个 Mono,产生数字 0 作为唯一值

Flux

Flux 是什么? 官方描述如下:A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

Flux 是响应流 Publisher 具有基础 rx 操作符。可以成功发布 0 到 N 个元素或者错误。Flux 其实是 Mono 的一个补充。如图所示:

file

所以要注意:如果知道 Publisher 是 0 或 1 个,则用 Mono。

Flux 最值得一提的是 fromIterable 方法。 fromIterable(Iterable<? extends T> it) 可以发布 Iterable 类型的元素。当然,Flux 也包含了基础的操作:map、merge、concat、flatMap、take,这里就不展开介绍了。

使用配置模式进行WebFlux 接口开发

1 可以编写一个处理器类 Handler代替 Controller , Service 、dao层保持不变。

2 配置请求的路由

处理器类 Handler

处理器类 Handler需要从请求解析参数,并且封装响应,代码如下:

package com.crazymaker.springcloud.reactive.user.info.config.handler;import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import javax.annotation.Resource;import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;@Slf4j
@Component
public class UserReactiveHandler
{@Resourceprivate JpaEntityServiceImpl jpaEntityService;/*** 得到所有用户** @param request* @return*/public Mono<ServerResponse> getAllUser(ServerRequest request){log.info("方法 getAllUser 被调用了");return ok().contentType(APPLICATION_JSON_UTF8).body(Flux.fromIterable(jpaEntityService.selectAllUser()), User.class);}/*** 创建用户** @param request* @return*/public Mono<ServerResponse> createUser(ServerRequest request){// 2.0.0 是可以工作, 但是2.0.1 下面这个模式是会报异常Mono<User> user = request.bodyToMono(User.class);/**Mono 使用响应式的,时候都是一个流,是一个发布者,任何时候都不能调用发布者的订阅方法也就是不能消费它, 最终的消费还是交给我们的Springboot来对它进行消费,任何时候不能调用它的user.subscribe();不能调用block把异常放在统一的地方来处理*/return user.flatMap(dto ->{// 校验代码需要放在这里if (StringUtils.isBlank(dto.getName())){throw new BusinessException("用户名不能为空");}return ok().contentType(APPLICATION_JSON_UTF8).body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))), User.class);});}/*** 根据id删除用户** @param request* @return*/public Mono<ServerResponse> deleteUserById(ServerRequest request){String id = request.pathVariable("id");// 校验代码需要放在这里if (StringUtils.isBlank(id)){throw new BusinessException("id不能为空");}User dto = new User();dto.setUserId(Long.parseLong(id));return ok().contentType(APPLICATION_JSON_UTF8).body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))), User.class);}}

路由配置

package com.crazymaker.springcloud.reactive.user.info.config;import com.crazymaker.springcloud.reactive.user.info.config.handler.UserReactiveHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.WebFilter;import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;@Configuration
public class RoutersConfig
{@BeanRouterFunction<ServerResponse> routes(UserReactiveHandler handler){// 下面的相当于类里面的 @RequestMapping// 得到所有用户return RouterFunctions.route(GET("/user"), handler::getAllUser)// 创建用户.andRoute(POST("/user").and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)// 删除用户.andRoute(DELETE("/user/{id}"), handler::deleteUserById);}@Value("${server.servlet.context-path}")private String contextPath;//处理上下文路径,没有上下文路径,此函数可以忽略@Beanpublic WebFilter contextPathWebFilter(){return (exchange, chain) ->{ServerHttpRequest request = exchange.getRequest();String requestPath = request.getURI().getPath();if (requestPath.startsWith(contextPath)){return chain.filter(exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build());}return chain.filter(exchange);};}
}

集成Swagger

本文主要展示一下如何使用支持WebFlux的Swagger

maven依赖

        <dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>${swagger.version}</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-spring-webflux</artifactId><version>${swagger.version}</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>${swagger.version}</version></dependency>
  • swagger.version目前是3.0.0,Spring 5引入了WebFlux,而当前版本的SpringFox Swagger2(2.9.2)还不支持WebFlux,得使用3.0.0才支持

swagger 配置

package com.crazymaker.springcloud.reactive.user.info.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.util.UriComponentsBuilder;
import springfox.documentation.PathProvider;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.paths.DefaultPathProvider;
import springfox.documentation.spring.web.paths.Paths;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux;@Configuration
@EnableSwagger2WebFlux
public class SwaggerConfig
{@Beanpublic Docket createRestApi(){
//        return new Docket(DocumentationType.OAS_30)return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).pathMapping(servletContextPath)  //注意webflux没有context-path配置,如果不加这句话的话,接口测试时路径没有前缀.select().apis(RequestHandlerSelectors.basePackage("com.crazymaker.springcloud.reactive.user.info.controller")).paths(PathSelectors.any()).build();}@Value("${server.servlet.context-path}")private String servletContextPath;//构建 api文档的详细信息函数private ApiInfo apiInfo(){return new ApiInfoBuilder()//页面标题.title("疯狂创客圈 springcloud + Nginx 高并发核心编程")//描述.description("Zuul+Swagger2  构建  RESTful APIs")//条款地址.termsOfServiceUrl("https://www.cnblogs.com/crazymakercircle/").contact(new Contact("疯狂创客圈", "https://www.cnblogs.com/crazymakercircle/", "")).version("1.0").build();}/*** 重写 PathProvider ,解决 context-path 重复问题* @return*/@Order(Ordered.HIGHEST_PRECEDENCE)@Beanpublic PathProvider pathProvider() {return new DefaultPathProvider() {@Overridepublic String getOperationPath(String operationPath) {operationPath = operationPath.replaceFirst(servletContextPath, "/");UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromPath("/");return Paths.removeAdjacentForwardSlashes(uriComponentsBuilder.path(operationPath).build().toString());}@Overridepublic String getResourceListingPath(String groupName, String apiDeclaration) {apiDeclaration = super.getResourceListingPath(groupName, apiDeclaration);return apiDeclaration;}};}
}

测试

配置模式的 WebFlux Rest接口测试

配置模式的 WebFlux Rest接口只能使用PostMan测试,例子如下:

注意,不能带上下文路径:

http://192.168.68.1:7705/uaa-react-provider/user

注解模式的WebFlux Rest接口测试

swagger 增加界面

CRUD其他的界面,略过

配置大全

静态资源配置

@Configuration
@EnableWebFlux     //使用注解@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {       //继承WebFluxConfigurer //配置静态资源@Overridepublic void addResourceHandlers(ResourceHandlerRegistry registry) {registry.addResourceHandler("/static/**").addResourceLocations("classpath:/static/");registry.addResourceHandler("/file/**").addResourceLocations("file:" + System.getProperty("user.dir") + File.separator + "file" + File.separator);registry.addResourceHandler("/swagger-ui.html**").addResourceLocations("classpath:/META-INF/resources/");registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/");}//配置拦截器//配置编解码...
}

WebFluxSecurity配置

@Configuration
@EnableWebSecurity
public class WebMvcSecurityConfig extends WebSecurityConfigurerAdapter implements
AuthenticationEntryPoint,       //未验证回调
AuthenticationSuccessHandler,       //验证成功回调
AuthenticationFailureHandler,       //验证失败回调
LogoutSuccessHandler {      //登出成功回调@Overridepublic void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException authException) throws IOException, ServletException {sendJson(response, new Response<>(HttpStatus.UNAUTHORIZED.value(), "Unauthorized"));}@Overridepublic void onAuthenticationFailure(HttpServletRequest request, HttpServletResponse response, AuthenticationException exception) throws IOException, ServletException {sendJson(response, new Response<>(1, "Incorrect"));}@Overridepublic void onAuthenticationSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {sendJson(response, new Response<>(0, authentication.getClass().getSimpleName()));}@Overridepublic void onLogoutSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {sendJson(response, new Response<>(0, "Success"));}@Overrideprotected void configure(HttpSecurity http) throws Exception {http.csrf().disable().authorizeRequests().antMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs").permitAll().and().authorizeRequests().antMatchers("/static/**", "/file/**").permitAll().and().authorizeRequests().anyRequest().authenticated().and().logout().logoutUrl("/user/logout")        //虚拟路径,不是控制器定义的路径.logoutSuccessHandler(this).permitAll().and().exceptionHandling().authenticationEntryPoint(this).and().formLogin().usernameParameter("username").passwordParameter("password").loginProcessingUrl("/user/login")      //虚拟路径,不是控制器定义的路径.successForwardUrl("/user/login")     //是控制器定义的路径.failureHandler(this).and().httpBasic().authenticationEntryPoint(this);}@Overrideprotected void configure(AuthenticationManagerBuilder auth) throws Exception {auth.userDetailsService(userDetailService);}

webflux-验证依赖于用户数据服务,需定义实现ReactiveUserDetailsService的Bean

@Configuration
@EnableWebFluxSecurity     //使用注解@EnableWebFluxSecurity
public class WebFluxSecurityConfig implements
WebFilter,      //拦截器
ServerLogoutSuccessHandler,     //登出成功回调
ServerAuthenticationEntryPoint,     //验证入口
ServerAuthenticationFailureHandler,     //验证成功回调
ServerAuthenticationSuccessHandler {        //验证失败回调//实现接口的方法@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {//配置webflux的context-pathServerHttpRequest request = exchange.getRequest();if (request.getURI().getPath().startsWith(contextPath)) {exchange = exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build();}//把查询参数转移到FormData中,不然验证过滤器(ServerFormLoginAuthenticationConverter)接受不到参数if (exchange.getRequest().getMethod() == HttpMethod.POST && exchange.getRequest().getQueryParams().size() > 0) {ServerWebExchange finalExchange = exchange;ServerWebExchange realExchange = new Decorator(exchange) {@Overridepublic Mono<MultiValueMap<String, String>> getFormData() {return super.getFormData().map(new Function<MultiValueMap<String, String>, MultiValueMap<String, String>>() {@Overridepublic MultiValueMap<String, String> apply(MultiValueMap<String, String> stringStringMultiValueMap) {if (stringStringMultiValueMap.size() == 0) {return finalExchange.getRequest().getQueryParams();} else {return stringStringMultiValueMap;}}});}};return chain.filter(realExchange);}return chain.filter(exchange);}@Overridepublic Mono<Void> onLogoutSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {return sendJson(webFilterExchange.getExchange(), new Response<>("登出成功"));}@Overridepublic Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) {return sendJson(exchange, new Response<>(HttpStatus.UNAUTHORIZED.value(), "未验证"));}@Overridepublic Mono<Void> onAuthenticationFailure(WebFilterExchange webFilterExchange, AuthenticationException exception) {return sendJson(webFilterExchange.getExchange(), new Response<>(1, "验证失败"));}@Overridepublic Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {return webFilterExchange.getChain().filter(webFilterExchange.getExchange().mutate().request(t -> t.method(HttpMethod.POST).path("/user/login"))       //转发到自定义控制器.build());}@Beanpublic SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {http.addFilterAfter(this, SecurityWebFiltersOrder.FIRST).csrf().disable().authorizeExchange().pathMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs")       //swagger.permitAll().and().authorizeExchange().pathMatchers("/static/**", "/file/**")      //静态资源.permitAll().and().authorizeExchange().anyExchange().authenticated().and().logout()       //登出.logoutUrl("/user/logout").logoutSuccessHandler(this).and().exceptionHandling()       //未验证回调.authenticationEntryPoint(this).and().formLogin().loginPage("/user/login").authenticationFailureHandler(this)      //验证失败回调.authenticationSuccessHandler(this)     //验证成功回调.and().httpBasic().authenticationEntryPoint(this);      //basic验证,一般用于移动端return http.build();}
}

WebSession配置

@Configuration
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 60) //使用注解@EnableRedisWebSession ,maxInactiveIntervalInSeconds设置数据过期时间,spring.session.timeout不管用
public class RedisWebSessionConfig { //考虑到分布式系统,一般使用redis存储session@Beanpublic LettuceConnectionFactory lettuceConnectionFactory() {return new LettuceConnectionFactory();}}
//单点登录使用ReactiveRedisSessionRepository.getSessionRedisOperations().scan方法查询相同用户名的session,删除其他session即可
public Mono<Map<String, String>> findByPrincipalName(String name) {return reactiveSessionRepository.getSessionRedisOperations().scan(ScanOptions.scanOptions().match(ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:*").build()).flatMap(new Function<String, Publisher<Tuple2<String, Map.Entry<Object, Object>>>>() {@Overridepublic Publisher<Tuple2<String, Map.Entry<Object, Object>>> apply(String s) {return reactiveSessionRepository.getSessionRedisOperations().opsForHash().entries(s).map(new Function<Map.Entry<Object, Object>, Tuple2<String, Map.Entry<Object, Object>>>() {@Overridepublic Tuple2<String, Map.Entry<Object, Object>> apply(Map.Entry<Object, Object> objectObjectEntry) {return Tuples.of(s, objectObjectEntry);}});}}).filter(new Predicate<Tuple2<String, Map.Entry<Object, Object>>>() {@Overridepublic boolean test(Tuple2<String, Map.Entry<Object, Object>> rule) {Map.Entry<Object, Object> t = rule.getT2();String key = "sessionAttr:" + HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY;if (key.equals(t.getKey())) {User sci = (User) ((SecurityContextImpl) t.getValue()).getAuthentication().getPrincipal();return sci.getUsername().equals(name);}return false;}}).collectMap(new Function<Tuple2<String, Map.Entry<Object, Object>>, String>() {@Overridepublic String apply(Tuple2<String, Map.Entry<Object, Object>> rule) {return name;}}, new Function<Tuple2<String, Map.Entry<Object, Object>>, String>() {@Overridepublic String apply(Tuple2<String, Map.Entry<Object, Object>> rule) {return rule.getT1().replace(ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:", "");}});}

对标的 SpringWebMVC配置

@Configuration
@EnableRedisHttpSession    //使用注解@EnableRedisHttpSession
public class RedisHttpSessionConfig { //考虑到分布式系统,一般使用redis存储session@Beanpublic LettuceConnectionFactory redisConnectionFactory() {return new LettuceConnectionFactory();}}
//单点登录使用FindByIndexNameSessionRepository根据用户名查询session,删除其他session即可
Map<String, Session> map = findByIndexNameSessionRepository.findByPrincipalName(name);

文件上传配置

//参数上传
//定义参数bean
@Setter
@Getter
@ToString
@ApiModel
public class QueryBean{@ApiModelProperty(value = "普通参数", required = false, example = "")private String query;@ApiModelProperty(value = "文件参数", required = false, example = "")private FilePart image;       //强调,webflux中使用FilePart作为接收文件的类型
}
//定义接口
@ApiOperation("一个接口")
@PostMapping("/path")
//这里需要使用@ApiImplicitParam显示配置【文件参数】才能使swagger界面显示上传文件按钮
@ApiImplicitParams({@ApiImplicitParam(paramType = "form", //表单参数dataType = "__file", //最新版本使用__file表示文件,以前用的是filename = "image", //和QueryBean里面的【文件参数image】同名value = "文件") //注释
})
public Mono<Response> bannerAddOrUpdate(QueryBean q) {}

WebFlux 执行流程

userAdd方法代码如下:

        public Mono<User> userAdd(@RequestBody User dto){//命令式写法
//        jpaEntityService.delUser(dto);//响应式写法return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));}

由于返回的数据只有一个所以使用的是Mono作为返回数据,使用Mono类静态create方法创建Mono对象,代码如下:

public abstract class Mono<T> implements Publisher<T> {static final BiPredicate EQUALS_BIPREDICATE = Object::equals;public Mono() {}public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {return onAssembly(new MonoCreate(callback));}
...
}

​ 可以到create方法接收一个参数,参数是Consumer对象,通过callback可以看出,这里使用的是callback回调,下面看看Consumer接口的定义:


@FunctionalInterface
public interface Consumer<T> {/*** Performs this operation on the given argument.** @param t the input argument*/void accept(T t);/*** Returns a composed {@code Consumer} that performs, in sequence, this* operation followed by the {@code after} operation. If performing either* operation throws an exception, it is relayed to the caller of the* composed operation.  If performing this operation throws an exception,* the {@code after} operation will not be performed.** @param after the operation to perform after this operation* @return a composed {@code Consumer} that performs in sequence this* operation followed by the {@code after} operation* @throws NullPointerException if {@code after} is null*/default Consumer<T> andThen(Consumer<? super T> after) {Objects.requireNonNull(after);return (T t) -> { accept(t); after.accept(t); };}
}

通过上面的代码可以看出,有两个方法,一个是默认的方法andThen,还有一个accept方法,

Mono.create()方法的参数需要一个实现类,实现Consumer接口;Mono.create方法的参数指向的实例对象, 就是要实现这个accept方法。

例子中,下面的lambda表达式,就是accept方法的实现,实参的类型为 Consumer<MonoSink> , accept的实现为 如下:

cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))

来来来,重复看一下,create方法的实现:

   public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {return onAssembly(new MonoCreate(callback));}

​ 在方法内部调用了onAssembly方法,参数是MonoCreate对象,然后我们看看MonoCreate类,代码如下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//package reactor.core.publisher;import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable.Attr;
import reactor.core.publisher.FluxCreate.SinkDisposable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;final class MonoCreate<T> extends Mono<T> {final Consumer<MonoSink<T>> callback;MonoCreate(Consumer<MonoSink<T>> callback) {this.callback = callback;}public void subscribe(CoreSubscriber<? super T> actual) {MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);actual.onSubscribe(emitter);try {this.callback.accept(emitter);} catch (Throwable var4) {emitter.error(Operators.onOperatorError(var4, actual.currentContext()));}}static final class DefaultMonoSink<T> extends AtomicBoolean implements MonoSink<T>, InnerProducer<T> {final CoreSubscriber<? super T> actual;volatile Disposable disposable;static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, Disposable> DISPOSABLE = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, Disposable.class, "disposable");volatile int state;static final AtomicIntegerFieldUpdater<MonoCreate.DefaultMonoSink> STATE = AtomicIntegerFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, "state");volatile LongConsumer requestConsumer;static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, LongConsumer> REQUEST_CONSUMER = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, LongConsumer.class, "requestConsumer");T value;static final int NO_REQUEST_HAS_VALUE = 1;static final int HAS_REQUEST_NO_VALUE = 2;static final int HAS_REQUEST_HAS_VALUE = 3;DefaultMonoSink(CoreSubscriber<? super T> actual) {this.actual = actual;}public Context currentContext() {return this.actual.currentContext();}@Nullablepublic Object scanUnsafe(Attr key) {if (key != Attr.TERMINATED) {return key == Attr.CANCELLED ? OperatorDisposables.isDisposed(this.disposable) : super.scanUnsafe(key);} else {return this.state == 3 || this.state == 1;}}public void success() {if (STATE.getAndSet(this, 3) != 3) {try {this.actual.onComplete();} finally {this.disposeResource(false);}}}public void success(@Nullable T value) {if (value == null) {this.success();} else {int s;do {s = this.state;if (s == 3 || s == 1) {Operators.onNextDropped(value, this.actual.currentContext());return;}if (s == 2) {if (STATE.compareAndSet(this, s, 3)) {try {this.actual.onNext(value);this.actual.onComplete();} finally {this.disposeResource(false);}}return;}this.value = value;} while(!STATE.compareAndSet(this, s, 1));}}public void error(Throwable e) {if (STATE.getAndSet(this, 3) != 3) {try {this.actual.onError(e);} finally {this.disposeResource(false);}} else {Operators.onOperatorError(e, this.actual.currentContext());}}public MonoSink<T> onRequest(LongConsumer consumer) {Objects.requireNonNull(consumer, "onRequest");if (!REQUEST_CONSUMER.compareAndSet(this, (Object)null, consumer)) {throw new IllegalStateException("A consumer has already been assigned to consume requests");} else {return this;}}public CoreSubscriber<? super T> actual() {return this.actual;}public MonoSink<T> onCancel(Disposable d) {Objects.requireNonNull(d, "onCancel");SinkDisposable sd = new SinkDisposable((Disposable)null, d);if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {Disposable c = this.disposable;if (c instanceof SinkDisposable) {SinkDisposable current = (SinkDisposable)c;if (current.onCancel == null) {current.onCancel = d;} else {d.dispose();}}}return this;}public MonoSink<T> onDispose(Disposable d) {Objects.requireNonNull(d, "onDispose");SinkDisposable sd = new SinkDisposable(d, (Disposable)null);if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {Disposable c = this.disposable;if (c instanceof SinkDisposable) {SinkDisposable current = (SinkDisposable)c;if (current.disposable == null) {current.disposable = d;} else {d.dispose();}}}return this;}public void request(long n) {if (Operators.validate(n)) {LongConsumer consumer = this.requestConsumer;if (consumer != null) {consumer.accept(n);}int s;do {s = this.state;if (s == 2 || s == 3) {return;}if (s == 1) {if (STATE.compareAndSet(this, s, 3)) {try {this.actual.onNext(this.value);this.actual.onComplete();} finally {this.disposeResource(false);}}return;}} while(!STATE.compareAndSet(this, s, 2));}}public void cancel() {if (STATE.getAndSet(this, 3) != 3) {this.value = null;this.disposeResource(true);}}void disposeResource(boolean isCancel) {Disposable d = this.disposable;if (d != OperatorDisposables.DISPOSED) {d = (Disposable)DISPOSABLE.getAndSet(this, OperatorDisposables.DISPOSED);if (d != null && d != OperatorDisposables.DISPOSED) {if (isCancel && d instanceof SinkDisposable) {((SinkDisposable)d).cancel();}d.dispose();}}}}
}

上面的代码比较多,我们主要关注下面两个函数:

MonoCreate(Consumer<MonoSink<T>> callback) {this.callback = callback;}public void subscribe(CoreSubscriber<? super T> actual) {MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);actual.onSubscribe(emitter);try {this.callback.accept(emitter);} catch (Throwable var4) {emitter.error(Operators.onOperatorError(var4, actual.currentContext()));}}

通过上面的代码可以看出,一个是构造器,参数是Consumer,里面进行操作保存了Consumer对象,然后在subscribe方法里面有一句代码是this.callback.accept(emitter),就是在这里进行了接口的回调,回调Consumer的accept方法,这个方法是在调用Mono.create()方法的时候实现了。然后在细看subscribe方法,这里面有一个actual.onSubscribe方法,通过方法名可以知道,这里是订阅了消息。webflux是基于reactor模型,基于事件消息和异步,这里也体现了一个异步。

Mono和Flux的其他用法可以参照上面的源码流程自己看看,就不细说了。

webflux + springboot 整合(史上最全)相关推荐

  1. Java常用工具类整合(史上最全)

    JSON转换工具 package com.taotao.utils;import java.util.List;import com.fasterxml.jackson.core.JsonProces ...

  2. 一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系(史上最全)

    文章很长,建议收藏起来慢慢读!疯狂创客圈总目录 语雀版 | 总目录 码云版| 总目录 博客园版 为您奉上珍贵的学习资源 : 免费赠送 :<尼恩Java面试宝典>持续更新+ 史上最全 + 面 ...

  3. sentinel 史上最全

    文章很长,建议收藏起来,慢慢读! 备注:持续更新中- 推荐1:进大厂.升架构.拿高薪 必备 的 经典图书和资料: 高薪必备1 : <Netty Zookeeper Redis 高并发实战> ...

  4. Redis分布式锁(图解 - 秒懂 - 史上最全)

    文章很长,而且持续更新,建议收藏起来,慢慢读! 高并发 发烧友社群:疯狂创客圈(总入口) 奉上以下珍贵的学习资源: 疯狂创客圈 经典图书 : 极致经典 + 社群大片好评 < Java 高并发 三 ...

  5. Prometheus+Grafana (史上最全)

    尼恩大架构 最强环境 系列文章 一键打造 本地elk 实操环境: ELK日志平台(elasticsearch +logstash+kibana)原理和实操(史上最全) 高级开发必备,架构师必备 一键打 ...

  6. redis 集群 实操 (史上最全、5w字长文)

    文章很长,建议收藏起来慢慢读! 总目录 博客园版 为大家准备了更多的好文章!!!! 推荐:尼恩Java面试宝典(持续更新 + 史上最全 + 面试必备)具体详情,请点击此链接 尼恩Java面试宝典,34 ...

  7. 史上最全阿里 Java 面试题总结及答案

    史上最全阿里 Java 面试题总结及答案 qq_35151346 于 2019-08-06 13:26:53 发布 33740 收藏 817 分类专栏: 面试题 文章标签: 阿里巴巴 面试题 答案 j ...

  8. 架构设计面试题 (史上最全、持续更新、吐血推荐)

    文章很长,建议收藏起来,慢慢读! 高并发学习社群 - 疯狂创客圈奉献给大家: 经典图书 - <Netty Zookeeper Redis 高并发实战> 面试必备 + 大厂必备 + 涨薪必备 ...

  9. SpringCloud gateway (史上最全)

    前言 疯狂创客圈(笔者尼恩创建的高并发研习社群)Springcloud 高并发系列文章,将为大家介绍三个版本的 高并发秒杀: 一.版本1 :springcloud + zookeeper 秒杀 二.版 ...

  10. HR面试题(史上最全、持续更新、吐血推荐)

    文章很长,建议收藏起来,慢慢读! 疯狂创客圈为小伙伴奉上以下珍贵的学习资源: 疯狂创客圈 经典图书 : <Netty Zookeeper Redis 高并发实战> 面试必备 + 大厂必备 ...

最新文章

  1. Hololens2-Unity3D开发(一)
  2. 城市仿真为何成为大势所趋?
  3. boost::function_types::is_member_object_pointer的用法测试程序
  4. boost::detail::reference_content的用法测试程序
  5. Weblogic服务端请求伪造漏洞(SSRF)和反射型跨站请求伪造漏洞(CSS)修复教程...
  6. 【渝粤教育】国家开放大学2018年春季 0050-21T民族理论与民族政策 参考试题
  7. book1复习 使用java理解程序逻辑
  8. np.cross函数详解
  9. 北航计算机2014复试上机题,北航计算机系考研复试上机真题及答
  10. 可以学习的国外课件链接地址(自己收集)
  11. nacos配置中心信息 nacos版本2.0.3
  12. 无极浏览器(教学专用浏览器)官方版 v5.0.0.15
  13. vue中实现axios封装
  14. 学号在java是什么意思_在JAVA程序中增加姓名学号
  15. 计算机c盘崩了,崩溃,C盘爆红了!试试这5款电脑清理工具,每一个都很实用
  16. Rect.OverLaps() 改进
  17. Windows性能监控工具Perfmon使用指南
  18. vue 传参 微信_小猿圈web前端之微信小程序页面间跳转传参方式总结
  19. 8路抢答器,有裁判复位
  20. 侧边栏固定定位到版心两侧

热门文章

  1. 完整的浏览器统计信息-2012年9月
  2. 如何压缩zip格式的文件
  3. 2021-2022学习计划
  4. php常用模板引擎,PHP的常用的几大模板引擎_PHP教程
  5. epplus word html,使用EPPlus(C#)读写Excel
  6. DIV高度自适应浏览器高度方法
  7. 展锐Android 10平台OTA升级
  8. mysql8新建用户_mysql8.0以上添加用户与授权
  9. 手把手教你们Python配置OpenCV环境,小白看一遍就会了☀️《❤️记得收藏❤️》
  10. uni-app提供开箱即用的SSR支持