协同编辑方案设计
每个执行动作都是一个事件。 执行用户将事件发送给服务器,然后服务器将事件转发给其他用户。这个过程中执行用户为事件生产者,其他用户为事件消费者。
事件包括:
- 用户A建立连接,其他用户显示“用户A加入编辑”。
- 用户A执行编辑操作,其他用户同步显示执行结果。
- 用户A断开连接,其他用户显示“用户A退出编辑”
编辑操作仅包括旋转和放大缩小。
如何解决并发冲突
协同编辑操作是并发的,可以会存在并发冲突。
一个解决方案是同一时间只允许一个用户进行编辑,其他用户只能实时查看编辑状态。某一用户进行编辑时,会进行加锁。这种方法实现简单,但效率低下。
OT算法
实时协同 OT 算法(Operational Transformation)广泛应用于在线文档协作等场景。
OT算法的核心概念包括操作、转化、因果一致性。基本实现思路为将操作统一收集,按顺序执行,之后给所有用户返回统一的结果。
WebSocket
WebSocket 是一种 全双工通信协议,让客户端(比如浏览器)和服务器之间能够保持实时、持续的连接。
WebSocket 的主要作用是 实现实时数据传输。
WebSocket 和 HTTP 是两种不同的通信协议,但它们是紧密相关的,都是基于 TCP 协议、都可以在同样的端口上工作(比如 80 和 443)。WebSocket 的连接需要通过 HTTP 协议发起一个握手(称为 HTTP Upgrade 请求),这个握手请求是 WebSocket 建立连接的前提,表明希望切换协议;服务器如果支持 WebSocket,会返回一个 HTTP 101 状态码,表示协议切换成功。握手完成后,HTTP 协议的作用结束,通信会切换为 WebSocket 协议,双方可以开始全双工通信。
WebSocket类似客户端和服务器之间的代理,用来保存和转发会话信息。
协同编辑功能实现
功能基本的实现思路为:
- 客户端向服务器发送建立连接请求
- 服务器进行校验,并作出响应
- 客户端向服务器发送编辑请求
- 服务器做出响应
- 客户端断开
需要的数据模型为编辑请求信息类、响应消息类、消息枚举类、编辑操作枚举类等。
权限校验
权限校验通过实现 HandshakeInterceptor
接口中的beforeHandshake
方法,来自定义一个 WebSocket 拦截器实现。校验成功后,将用户信息保存在attributes
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| @Component @Slf4j public class WsHandshakeInterceptor implements HandshakeInterceptor {
@Resource private UserService userService;
@Resource private PictureService pictureService;
@Resource private SpaceService spaceService;
@Resource private SpaceUserAuthManager spaceUserAuthManager;
@Override public boolean beforeHandshake(@NotNull ServerHttpRequest request, @NotNull ServerHttpResponse response, @NotNull WebSocketHandler wsHandler, @NotNull Map<String, Object> attributes) { if (request instanceof ServletServerHttpRequest) { HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); String pictureId = servletRequest.getParameter("pictureId"); if (StrUtil.isBlank(pictureId)) { log.error("缺少图片参数,拒绝握手"); return false; } User loginUser = userService.getLoginUser(servletRequest); if (ObjUtil.isEmpty(loginUser)) { log.error("用户未登录,拒绝握手"); return false; } Picture picture = pictureService.getById(pictureId); if (picture == null) { log.error("图片不存在,拒绝握手"); return false; } Long spaceId = picture.getSpaceId(); Space space = null; if (spaceId != null) { space = spaceService.getById(spaceId); if (space == null) { log.error("空间不存在,拒绝握手"); return false; } if (space.getSpaceType() != SpaceTypeEnum.TEAM.getValue()) { log.info("不是团队空间,拒绝握手"); return false; } } List<String> permissionList = spaceUserAuthManager.getPermissionList(space, loginUser); if (!permissionList.contains(SpaceUserPermissionConstant.PICTURE_EDIT)) { log.error("没有图片编辑权限,拒绝握手"); return false; } attributes.put("user", loginUser); attributes.put("userId", loginUser.getId()); attributes.put("pictureId", Long.valueOf(pictureId)); } return true; }
@Override public void afterHandshake(@NotNull ServerHttpRequest request, @NotNull ServerHttpResponse response, @NotNull WebSocketHandler wsHandler, Exception exception) { } }
|
WebSocket 处理器
可能同时有多个协同编辑任务,因此需要同时保存这些会话。
可以使用两个map来管理用户和用户会话集合,以pictureId为key,用户或用户会话集合为值。 因为多个任务并发执行,使用 ConcurrentHashMap<>()
。
1 2 3 4 5
| private final Map<Long, Long> pictureEditingUsers = new ConcurrentHashMap<>();
private final Map<Long, Set<WebSocketSession>> pictureSessions = new ConcurrentHashMap<>();
|
其他功能主要是消息处理。初步采用同一时刻仅允许一个用户进行编辑。
Disruptor 优化
当前问题
虽然不同任务可以并发,但一个任务占据单个线程,若并发量提高则导致响应时间较长。
可以专门使用一个线程异步处理消息,为了保证消息的顺序处理,需要一个队列进行维护。
使用线程池足以解决问题。为了提高性能,可以使用 Disruptor 无锁队列来减少线程上下文的切换。Disruptor 还可以通过优雅停机机制,在服务停止前执行完所有的任务,再退出服务,防止消息丢失。
Disruptor
数据结构使用环形缓冲区,依赖 CAS(Compare-And-Swap)和内存屏障的无锁技术进行并发控制。
实现时主要通过事件、生成者、消费者来实现。
实现
在初始化 Disruptor 时,引入自定义事件和消费者处理器。 广播消息方法修改为生产者生成时间。
具体细节为:
- 定义事件类型
PictureEditEvent
- 定义消费者
PictureEditEventWorkHandler implements WorkHandler<PictureEditEvent>
- 添加 Disruptor 配置类,用于初始化 Disruptor,在此绑定事件类型和消费者。 同时在
close
方法上添加@PreDestroy
注解,调用shutdown
方法实现优雅停机。
- 定义生产者,在环形缓冲区中添加事件
上一节中广播方法修改为生产事件。