文章目录:

1.承接Future和FutureTask

2.CompletableFuture四大静态方法

2.1 runAsync(Runnable runnable)

2.2 runAsync(Runnable runnable, Executor executor)

2.3 supplyAsync(Suppliersupplier)

2.4 supplyAsync(Supplier supplier, Executor executor)

3.CompletableFuture减少阻塞和轮询

4.电商小案例——简单应用CompletableFuture

5.CompletableFuture常用API

5.1 获得结果和触发计算

5.2 对计算结果进行处理

5.3 对计算结果进行消费

5.4 CompletableFuture和线程池(thenRun、thenRunAsync)

5.5 对计算速度进行选用

5.6 对计算结果进行合并


1.承接Future和FutureTask

在上一篇文章中和大家分享了Future和FutureTask,也在文末指明了这二者的缺点,链接:Java——聊聊JUC中的Future和FutureTask

由此引出了异步编程界的大佬 CompletableFuture。

由图可知,二者在都实现了 Future 接口,而 CompletableFuture 又实现了 CompletionStage 这个接口,强大之处就在这里。

CompletableFuture 提供了非常强大的针对Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调方式处理计算结果,也提供了转换和组合 CompletableFuture 的一系列方法,它可以代表一个明确完成的 Future,也可以代表一个完成阶段 CompletionStage。


2.CompletableFuture四大静态方法

我们现在都知道了 CompletableFuture 是一个类,学Java的最清楚了,给我个类,老子就 new,你别说别的,老子就是new,new就完事了,但是翻了翻jdk官方文档,打脸了。。。

他告诉我们对于 CompletableFuture 的无参构造,其实是创建了一个不完整的 CompletableFuture,那还new个屁啊。     所以这就需要它提供的四大静态方法来获取 CompletableFuture 对象了。(实际开发中常用的是第二组),对于这两组,我们都可以单独的传入 Runnable 接口或者 Supplier 供给型接口,也可以结合线程池传入。如果没有传入线程池参数,那么将使用默认的  ForkJoinPool.commonPool()  ;如果指定了线程池参数,将以我们自定义的为主。

2.1 runAsync(Runnable runnable)

package com.szh.demo;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class CompletableFutureDemo1 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(completableFuture.get());}
}

可以看到,这是没有返回值、没有指定线程池的静态方法,所以使用默认的 ForkJoinPool.commonPool,返回值因为没有所以就是 null。

2.2 runAsync(Runnable runnable, Executor executor)

package com.szh.demo;import java.util.concurrent.*;public class CompletableFutureDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}, threadPool);System.out.println(completableFuture.get());threadPool.shutdown();}
}

这里,我们自定义了线程池,所以就使用我们自己定义的了。

2.3 supplyAsync(Supplier<U> supplier)

package com.szh.demo;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class CompletableFutureDemo3 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "hello supplyAsync";});System.out.println(completableFuture.get());}
}

这是有返回值的一组,不指定线程池就使用默认的,get可以正常获取到异步线程的执行结果。

2.4 supplyAsync(Supplier<U> supplier, Executor executor)

package com.szh.demo;import java.util.concurrent.*;public class CompletableFutureDemo4 {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "hello supplyAsync";}, threadPool);System.out.println(completableFuture.get());threadPool.shutdown();}
}

这里,我们自定义了线程池,所以就使用我们自己定义的了。  get可以正常获取到异步线程的执行结果。


3.CompletableFuture减少阻塞和轮询

package com.szh.demo;import java.util.Objects;
import java.util.concurrent.*;public class CompletableFutureDemo5 {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(1);try {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " ---- come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}
//                if (result > 2) {
//                    int ans = 10 / 0;
//                }System.out.println("出结果了:" + result);return result;}, threadPool).whenComplete((v, e) -> {if (Objects.isNull(e)) {System.out.println("计算完成:" + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getMessage());return -1;});System.out.println(Thread.currentThread().getName() + " 线程在忙其他事情....");} catch (Exception e) {e.printStackTrace();} finally {threadPool.shutdown();}}
}

这里的异步线程计算结果需要3秒,而main线程在忙自己的业务,当异步线程计算完毕之后,会自动回调 whenComplete 方法。

将代码中注释的部分打开,出现异常之后,当异步线程计算结果超过2之后,就会发生异常。

总结:

  1. 异步线程执行任务结束时,会自动回调某个对象的方法。(上面的案例是 whenComplete )
  2. 主线程设置好回调之后,不再关心异步线程的任务执行的究竟怎样,异步任务之间可以顺序执行。
  3. 异步线程执行任务出异常时,会自动回调某个对象的方法。(上面的案例是 exceptionally )

4.电商小案例——简单应用CompletableFuture

对于同一款产品,同时搜索出本产品在各大电商平台的售价,案例中的产品就拿 mysql 书籍为例。

解决方案:

  1. step by step:一步一步执行,按部就班,先查京东,再查当当,最后查淘宝。
  2. all in:异步线程执行,万箭齐发,京东、当当、淘宝同时多任务同时查询。
package com.szh.demo;import lombok.Getter;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;@Getter
class NetMail {private String netMailName;public NetMail(String netMailName) {this.netMailName = netMailName;}//根据商品名称模拟一个随机商品价格,睡了一秒模拟在电商网站中搜索的耗时//这里偷懒,价格这块就使用double简单点,不再使用BigDecimal了public double calculatePrice(String productName) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}
}public class CompletableFutureDemo6 {static List<NetMail> netMailList = Arrays.asList(new NetMail("jd"),new NetMail("dangdang"),new NetMail("taobao"),new NetMail("tmail"),new NetMail("pdd"));//step by step,一家一家搜索public static List<String> getPrice(List<NetMail> netMailList, String productName) {return netMailList.stream().map(netMail -> String.format(productName + " in %s price is %.2f", netMail.getNetMailName(), netMail.calculatePrice(productName))).collect(Collectors.toList());}//CompletableFuture,万箭齐发同时搜索//List<NetMail> ---> List<CompletableFuture<String>> ---> List<String>public static List<String> getPriceByCompletableFuture(List<NetMail> netMailList, String productName) {return netMailList.stream()//开启异步多线程模式同时搜索,将每一个搜索任务都映射成一个CompletableFuture异步任务.map(netMail -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMail.getNetMailName(), netMail.calculatePrice(productName)))).collect(Collectors.toList()).stream()//将异步多线程的执行结果再次映射成新的List流//join方法和get方法是一样的,获取到异步线程的执行结果,区别是join方法不会产生异常.map(CompletableFuture::join).collect(Collectors.toList());}public static void main(String[] args) {//方式一耗时long startTime1 = System.currentTimeMillis();List<String> list1 = getPrice(netMailList, "Redis");list1.forEach(System.out::println);long endTime1 = System.currentTimeMillis();System.out.println("step by step原始方式耗时:" + (endTime1 - startTime1) + " ms");System.out.println("---------------------------------------");//方式二耗时long startTime2 = System.currentTimeMillis();List<String> list2 = getPriceByCompletableFuture(netMailList, "Redis");list2.forEach(System.out::println);long endTime2 = System.currentTimeMillis();System.out.println("CompletableFuture方式耗时:" + (endTime2 - startTime2) + " ms");}
}

step by step 就是一个一个搜索,代码中模拟搜索一次就是1秒,所以五家电商平台就搜索了5次,大概5秒。

all in 就是开启异步多线程,在五家电商平台同时搜索,所以整体就是这1秒完事。


5.CompletableFuture常用API

5.1 获得结果和触发计算

package com.szh.demo;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;public class CompletableFutureDemo7 {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "abc";});//等待拿到结果之后再走人System.out.println(completableFuture.get());//异步线程需要2秒才可以计算完成,主线程这边最多等待1秒,过期不候//System.out.println(completableFuture.get(1, TimeUnit.SECONDS));//和get()方法一样,只是join不会抛出异常//System.out.println(completableFuture.join());//如果主线程这边立刻获取的时候,异步线程还没有计算完成,则返回getNow中设定的备胎值//打开下面的try-catch语句,主线程会等待3秒之后再去获取异步线程的计算结果,而异步线程只需要2秒就可以计算完成,所以主线程可以拿到异步线程的计算结果
//        try {
//            TimeUnit.SECONDS.sleep(3);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        System.out.println(completableFuture.getNow("xyz"));//根据主线程、异步线程的执行时间来决定是否打断异步线程的计算过程,直接返回complete设定的值//异步线程计算耗时2秒,主线程等待3秒,则不会打断,正常拿到异步线程的计算结果,false//异步线程计算耗时2秒,主线程等待1秒,则会打断,此时直接输出主线程complete设定的值,truetry {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//System.out.println(completableFuture.complete("rst") + " " + completableFuture.join());}
}

public T get() 方法执行结果如下:↓↓↓

public T get(long timeout, TimeUnit unit) 打开注释,执行结果如下:↓↓↓

public T join()  打开注释,执行结果如下:↓↓↓

public T getNow(T valueIfAbsent)  打开注释,执行结果如下:↓↓↓ (详情参考上面代码中的注释,写的很详细了)

public boolean complete(T value)  打开注释,执行结果如下:↓↓↓(详情参考上面代码中的注释,写的很详细了)

5.2 对计算结果进行处理

这几个API的计算结果存在依赖关系,多个线程串行化。第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果,作为第二个任务的入参,传递到回调方法中。

下面的代码是针对 handle 方法走的,它和 thenApply 的区别主要在遇到异常的时候。

handle如果遇到异常,那么仅仅是异常所在的这一步出错,而之后的会继续正常执行。(有点类似于 try-catch-finally)

thenApply如果遇到异常,后续的就都不会再执行了。(有点类似于 try-catch)

package com.szh.demo;import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CompletableFutureDemo8 {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(2);CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("111");return 1;}, threadPool).handle((f, e) -> {//int x = 10 / 0;System.out.println("222");return f + 2;}).handle((f, e) -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if (Objects.isNull(e)) {System.out.println("计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println(e.getMessage());return -1;});System.out.println(Thread.currentThread().getName() + " 在忙其他任务");threadPool.shutdown();}
}

出现异常之后

将上述代码中的 handle 方法换成 thenApply之后(二者的函数式接口不一致,所以代码做如下修改),在没有异常情况下,和 handle 执行结果是一样的,就不再截图了。  如果出现异常了,thenApply是下图这种情况:↓↓↓

        .thenApply(f -> {int x = 10 / 0;System.out.println("222");return f + 2;}).thenApply(f -> {System.out.println("333");return f + 3;})

5.3 对计算结果进行消费

package com.szh.demo;import java.util.concurrent.CompletableFuture;/*** thenRun: 任务A执行完再执行任务B,并且B不需要A的结果* thenAccept: 任务A执行完再执行任务B,并且B需要A的结果,但是B无返回值* thenApply: 任务A执行完再执行任务B,并且B需要A的结果,同时B有返回值*/
public class CompletableFutureDemo9 {public static void main(String[] args) {System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());}
}

5.4 CompletableFuture和线程池(thenRunthenRunAsync

如果没有传入自定义线程池,那么大家都用默认的 ForkJoinPool。

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
  • 调用thenRunAsync方法执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。

这样的方法有好多组,thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。

package com.szh.demo;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CompletableFutureDemo10 {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(3);try {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务" + "\t" + Thread.currentThread().getName());return "abcd";}, threadPool).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务" + "\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务" + "\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务" + "\t" + Thread.currentThread().getName());});System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));} catch (Exception e) {e.printStackTrace();} finally {threadPool.shutdown();}}
}

全部都是  thenRun,则会使用我们自定义的线程池。当然了如果全部都是thenRunAsync,也一样使用的是我们自定义的线程池。

将代码中的第一个 thenRun 改成 thenRunAsync,就不一样了。

在源码中是可以看到的,如果你调用的 thenRunAsync,那么他就会给你用默认的线程池。

下面那个布尔方法表示CPU的内核数是否大于1,现在的电脑基本都满足了,所以可以理解为恒定的true。

5.5 对计算速度进行选用

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
package com.szh.demo;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureDemo11 {public static void main(String[] args) {CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {System.out.println("A come in");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "playA";});CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {System.out.println("B come in");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "playB";});CompletableFuture<String> completableFuture = playA.applyToEither(playB, f -> f + " is winner....");System.out.println(completableFuture.join());}
}

5.6 对计算结果进行合并

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
package com.szh.demo;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureDemo12 {public static void main(String[] args) {CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " 启动....");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 10;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " 启动....");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 20;});CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {System.out.println("开始对两个结果进行合并....");return x * y;});System.out.println(result.join());}
}

第二种写法如下

package com.szh.demo;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureDemo12 {public static void main(String[] args) {CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " 启动....");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " 启动....");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 20;}), (x, y) -> {System.out.println("开始对两个结果进行合并....");return x * y;});System.out.println(completableFuture1.join());}
}

Java——聊聊JUC中的CompletableFuture相关推荐

  1. Java——聊聊JUC中的Future和FutureTask

    1.回顾Future和FutureTask 首先,Future是一个接口(Java5就诞生了),而FutureTask是它的实现类.而Future接口则是定义了操作异步任务执行的一些方法,提供了一种异 ...

  2. Java——聊聊JUC中的Java内存模型(JMM)

    文章目录: 1.CPU缓存模型 2.Java内存模型Java Memory Model 3.JMM规范下的三大特性 3.1 原子性 3.2 可见性 3.3 有序性 4.JMM规范下,多线程对变量的读写 ...

  3. Java——聊聊JUC中的volatile与内存屏障

    文章目录: 1.volatile内存语义 2.内存屏障 2.1 粗分两种:写屏障 2.2 粗分两种:读屏障 2.3 细分四种 3.volatile可见性介绍 4.volatile无原子性介绍 5.vo ...

  4. java中什么是 伪共享_【Java】聊聊多线程中的伪共享现象

    首页 专栏 java 文章详情 0 聊聊多线程中的伪共享现象 小强大人发布于 1 月 27 日 什么是伪共享? 讲伪共享之前,让我们先乘坐时光机,回到大学课堂,来重温下计算机组成原理的基础知识.我们知 ...

  5. JUC里面的相关分类|| java并发编程中,关于锁的实现方式有两种synchronized ,Lock || Lock——ReentrantLock||AQS(抽象队列同步器)

    JUC分类 java并发编程中,关于锁的实现方式有两种synchronized ,Lock AQS--AbstractQueuedSynchronizer

  6. 多线程总结-JUC中常用的工具类

    本文只记录JUC中较常用到的一些工具类,只是列举其常见的使用方法,至于其实现原理,此处不做说明. CountDownLatch 一个同步工具类,允许一个或多个线程一直等待,直到其他线程运行完成后再执行 ...

  7. 基于《狂神说Java》JUC并发编程--学习笔记

    前言: 本笔记仅做学习与复习使用,不存在刻意抄袭. -------------------------------------------------------------------------- ...

  8. JUC高级一: CompletableFuture

    JUC高级: CompletableFuture 1. 线程基础知识复习 1.1 JUC四大口诀 高内聚低耦合前提下,封装思想 线程->操作---->资源类 判断.干活.通知 防止虚假唤醒 ...

  9. Java之JUC并发编程

    JUC JUC概述 进程与线程 线程的状态 wait / sleep的区别 并发和并行 管程 用户线程与守护线程 用户线程 守护线程 Lock 接口 synchronized Lock锁 lock() ...

最新文章

  1. A股融资融券余额是什么意思?
  2. JMeter重要知识点汇总
  3. oracle如何删除重复数据第一条,oracle删除重复数据保留第一条记录
  4. (转)python协程2:yield from 从入门到精通
  5. 欧拉折线法解常微分方程C语言,05常微分方程数值解.ppt
  6. 关于蚁剑/菜刀无法连接shell的一种可尝试解决方案
  7. ug不能自动启动服务器,没有足够的权限启动系统服务解决方法
  8. poj 3310(并查集判环,图的连通性,树上最长直径路径标记)
  9. 【JAVA线程间通信技术】
  10. 李想称十年后要成为汽车界苹果;雅虎邮箱停服;Linux内核欲采用现代C语言标准 | 极客头条...
  11. vue.js+koa2项目实战(四)搭建koa2服务端
  12. jquery按需加载js和css插件使用说明
  13. c语言 list 使用数组来实现_C|用数组或链表来实现队列这种抽象数据类型
  14. 为什么会出现35岁就失业的魔咒?
  15. Hash冲突的解决方式
  16. Sketch2AE插件(Sketch文件导入AE)最新破解版
  17. 2017年苹果开发者账号申请——账号VISA卡支付流程
  18. 读书、学习、工作和生活中收集的20条经典语录:21-40
  19. 计算机文件夹加密文件,电脑文件夹怎么加密,制作隐私的加密文件夹软件
  20. 《迅雷链精品课》第一课:认识区块链

热门文章

  1. 笔记四:Maven+SSM之整合redis
  2. 华为VR Glass 6DoF 评测
  3. Java数组解析(详解)
  4. M19.单链表删除指定结点--类型总结(链表)
  5. 11.1 - 每日一题 - 408
  6. Educational-Codeforces-Round-61-ABCDF题解
  7. pandapower最优潮流
  8. Java API 最佳开发实践
  9. 会操作excel就会批量证书打印!
  10. JAVA Spring Shiro mybaits 后台管理 二次开发框架 OA ERP CMS 微信 IM即时通讯(websocket)