智能云端素材库后端-图片协同编辑

协同编辑方案设计

每个执行动作都是一个事件。 执行用户将事件发送给服务器,然后服务器将事件转发给其他用户。这个过程中执行用户为事件生产者,其他用户为事件消费者。

事件包括:

  • 用户A建立连接,其他用户显示“用户A加入编辑”。
  • 用户A执行编辑操作,其他用户同步显示执行结果。
  • 用户A断开连接,其他用户显示“用户A退出编辑”

编辑操作仅包括旋转和放大缩小。

如何解决并发冲突

协同编辑操作是并发的,可以会存在并发冲突。

一个解决方案是同一时间只允许一个用户进行编辑,其他用户只能实时查看编辑状态。某一用户进行编辑时,会进行加锁。这种方法实现简单,但效率低下。

OT算法

实时协同 OT 算法(Operational Transformation)广泛应用于在线文档协作等场景。

OT算法的核心概念包括操作、转化、因果一致性。基本实现思路为将操作统一收集,按顺序执行,之后给所有用户返回统一的结果。

WebSocket

WebSocket 是一种 全双工通信协议,让客户端(比如浏览器)和服务器之间能够保持实时、持续的连接。

WebSocket 的主要作用是 实现实时数据传输。

WebSocket 和 HTTP 是两种不同的通信协议,但它们是紧密相关的,都是基于 TCP 协议、都可以在同样的端口上工作(比如 80 和 443)。WebSocket 的连接需要通过 HTTP 协议发起一个握手(称为 HTTP Upgrade 请求),这个握手请求是 WebSocket 建立连接的前提,表明希望切换协议;服务器如果支持 WebSocket,会返回一个 HTTP 101 状态码,表示协议切换成功。握手完成后,HTTP 协议的作用结束,通信会切换为 WebSocket 协议,双方可以开始全双工通信。

WebSocket类似客户端和服务器之间的代理,用来保存和转发会话信息。

协同编辑功能实现

功能基本的实现思路为:

  • 客户端向服务器发送建立连接请求
  • 服务器进行校验,并作出响应
  • 客户端向服务器发送编辑请求
  • 服务器做出响应
  • 客户端断开

需要的数据模型为编辑请求信息类、响应消息类、消息枚举类、编辑操作枚举类等。

权限校验

权限校验通过实现 HandshakeInterceptor 接口中的beforeHandshake方法,来自定义一个 WebSocket 拦截器实现。校验成功后,将用户信息保存在attributes中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Component
@Slf4j
public class WsHandshakeInterceptor implements HandshakeInterceptor {

@Resource
private UserService userService;

@Resource
private PictureService pictureService;

@Resource
private SpaceService spaceService;

@Resource
private SpaceUserAuthManager spaceUserAuthManager;

@Override
public boolean beforeHandshake(@NotNull ServerHttpRequest request, @NotNull ServerHttpResponse response, @NotNull WebSocketHandler wsHandler, @NotNull Map<String, Object> attributes) {
if (request instanceof ServletServerHttpRequest) {
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
// 获取请求参数
String pictureId = servletRequest.getParameter("pictureId");
if (StrUtil.isBlank(pictureId)) {
log.error("缺少图片参数,拒绝握手");
return false;
}
User loginUser = userService.getLoginUser(servletRequest);
if (ObjUtil.isEmpty(loginUser)) {
log.error("用户未登录,拒绝握手");
return false;
}
// 校验用户是否有该图片的权限
Picture picture = pictureService.getById(pictureId);
if (picture == null) {
log.error("图片不存在,拒绝握手");
return false;
}
Long spaceId = picture.getSpaceId();
Space space = null;
if (spaceId != null) {
space = spaceService.getById(spaceId);
if (space == null) {
log.error("空间不存在,拒绝握手");
return false;
}
if (space.getSpaceType() != SpaceTypeEnum.TEAM.getValue()) {
log.info("不是团队空间,拒绝握手");
return false;
}
}
List<String> permissionList = spaceUserAuthManager.getPermissionList(space, loginUser);
if (!permissionList.contains(SpaceUserPermissionConstant.PICTURE_EDIT)) {
log.error("没有图片编辑权限,拒绝握手");
return false;
}
// 设置 attributes
attributes.put("user", loginUser);
attributes.put("userId", loginUser.getId());
attributes.put("pictureId", Long.valueOf(pictureId)); // 记得转换为 Long 类型
}
return true;
}

@Override
public void afterHandshake(@NotNull ServerHttpRequest request, @NotNull ServerHttpResponse response, @NotNull WebSocketHandler wsHandler, Exception exception) {
}
}

WebSocket 处理器

可能同时有多个协同编辑任务,因此需要同时保存这些会话。

可以使用两个map来管理用户和用户会话集合,以pictureId为key,用户或用户会话集合为值。 因为多个任务并发执行,使用 ConcurrentHashMap<>()

1
2
3
4
5
// 每张图片的编辑状态,key: pictureId, value: 当前正在编辑的用户 ID
private final Map<Long, Long> pictureEditingUsers = new ConcurrentHashMap<>();

// 保存所有连接的会话,key: pictureId, value: 用户会话集合
private final Map<Long, Set<WebSocketSession>> pictureSessions = new ConcurrentHashMap<>();

其他功能主要是消息处理。初步采用同一时刻仅允许一个用户进行编辑。

Disruptor 优化

当前问题

虽然不同任务可以并发,但一个任务占据单个线程,若并发量提高则导致响应时间较长。

可以专门使用一个线程异步处理消息,为了保证消息的顺序处理,需要一个队列进行维护。

使用线程池足以解决问题。为了提高性能,可以使用 Disruptor 无锁队列来减少线程上下文的切换。Disruptor 还可以通过优雅停机机制,在服务停止前执行完所有的任务,再退出服务,防止消息丢失。

Disruptor

数据结构使用环形缓冲区,依赖 CAS(Compare-And-Swap)和内存屏障的无锁技术进行并发控制。

实现时主要通过事件、生成者、消费者来实现。

实现

在初始化 Disruptor 时,引入自定义事件和消费者处理器。 广播消息方法修改为生产者生成时间。

具体细节为:

  • 定义事件类型 PictureEditEvent
  • 定义消费者 PictureEditEventWorkHandler implements WorkHandler<PictureEditEvent>
  • 添加 Disruptor 配置类,用于初始化 Disruptor,在此绑定事件类型和消费者。 同时在close方法上添加@PreDestroy注解,调用shutdown方法实现优雅停机。
  • 定义生产者,在环形缓冲区中添加事件

上一节中广播方法修改为生产事件。