基于Redis的基本数据结构和业务逻辑设计实现用户活跃排行榜。

1.1、场景说明

技术派中,设计了一个社区用户的活跃排行榜,包括日榜和月榜。

用户活跃度计算方式:

  1. 用户每访问一个新的页面, +1分
  2. 对于一篇文章,点赞、收藏, +2分;取消点赞、取消收藏,将之前的活跃分收回。
  3. 文章评论, +3分
  4. 发布一篇审核通过的文章, +10分

榜单:
展示活跃度最高的前三十名用户。

效果如下:

img

1.2、方案设计

使用Redis的ZSet数据结构实现,以下是ZSet的简介:

Redis的ZSet(有序集合)是一种有序的、唯一的数据结构。它类似于Set,但每个元素都关联着一个分数(score),用于进行排序。

ZSet的特点包括:

  1. 有序性:ZSet中的元素按照分数进行排序,可以根据分数进行范围查询、区间获取等操作。
  2. 唯一性:ZSet中的元素是唯一的,不会存在重复的元素。
  3. 快速的插入和删除:ZSet使用了跳跃表(Skip List)和哈希表(Hash Table)的结合体,使得插入和删除操作的时间复杂度为O(log N)。
  4. 高效的查找:通过索引和跳跃表的特性,可以在O(log N)的时间复杂度内查找某个元素。

ZSet常用的操作包括:

  • ZADD:向ZSet中添加一个元素,同时指定其分数。
  • ZREM:从ZSet中移除一个元素。
  • ZRANGE:按照分数的顺序,获取指定范围内的元素。
  • ZSCORE:获取指定元素的分数。
  • ZINCRBY:将指定元素的分数增加一个特定的值。

ZSet广泛应用于排行榜、计分系统、排行榜、时间轴等场景,提供了高效的排序和检索功能。

1.3、排行榜实现

1.3.1、业务实体设计

​ 我们先实现一个更新用户活跃的方法,首先定义一个涵盖该业务场景的参数传递实体 ActivityScoreBo,记录用户活动(是否访问页面、点赞、收藏、评论、关注、发布文章)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Data
@Accessors(chain = true)
public class ActivityScoreBo {
/**
* 访问页面增加活跃度
*/
private String path;

/**
* 目标文章
*/
private Long articleId;

/**
* 评论增加活跃度
*/
private Boolean rate;

/**
* 点赞增加活跃度
*/
private Boolean praise;

/**
* 收藏增加活跃度
*/
private Boolean collect;

/**
* 发布文章增加活跃度
*/
private Boolean publishArticle;

/**
* 被关注的用户
*/
private Long followedUserId;

/**
* 关注增加活跃度
*/
private Boolean follow;
}

有了业务实体,进一步我们需要计算活跃度。活跃度包括日榜和月榜,如下为对应的key生成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 当天活跃度排行榜
*
* @return 当天排行榜key
*/
private String todayRankKey() {
return ACTIVITY_SCORE_KEY + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMMdd"), System.currentTimeMillis());
}

/**
* 本月排行榜
*
* @return 月度排行榜key
*/
private String monthRankKey() {
return ACTIVITY_SCORE_KEY + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMM"), System.currentTimeMillis());
}

1.3.2、计算活跃度

(1)根据传入的业务实体,判断文章浏览、点赞、收藏、关注、排列、发文等行为,并计算对应的字段field和分数score,不同行为的加分机制见上述场景说明。

(2)接着,根据用户ID和当天日期(日为单位)生成一个唯一的活跃度信息的键值(userActionKey)。

(3)通过RedisClient从Redis中获取该userActionKey键对应的field值(ans)。

(4)如果ans为null,说明之前没有加分记录,执行加分操作:

    • 如果加分数大于0,将加分记录保存到Redis的hash结构中,并设置有效期为一个月。/注:【加分记录使用Redis的Hash结构存储,key为userActionKey,字段为field,value为分数score】
    • 更新当天和当月的活跃度排行榜,使用Redis的zIncrBy函数。
    • 如果新的活跃度得分大于等于加分数,更新日活跃榜单和月活跃榜单的有效期。

(5)如果ans大于0,说明之前该field已经加过分,继续判断:

    • 如果分数小于0,说明是减分行为,应从Redis中删除加分记录。
    • 更新当天和当月的活跃度排行榜。

幂等策略:

在上述加分操作中,为了防止重复加活跃度,我们做了一个幂等操作。

就是将用户的每个加分项,都记录下来,在执行具体加分时,基于此来做幂等判定 。

因此,我们对每个用户维护一个活跃更新的操作历史记录表,保存在redis的hash数据结构中,每天一个记录。

1
2
3
key: activity_rank_{user_id}_{年月日} 
field: 活跃度更新key
value: 添加的活跃度

思考:

1、事务问题:虽然单个redis操作是原子性的,但多次的redis操作,存在事务问题。

2、并发问题:没有做并发,幂等无法100%生效,依然可能存在重复添加/扣减活跃度的情况

问题一:

通过最终一致性(Eventual Consistency)来解决多次Redis操作的事务问题是一种常见的方法。最终一致性是指在分布式系统中,经过一段时间后,系统的所有副本最终会达到一致的状态。

在Redis中,可以使用以下方法来实现最终一致性:

  1. 批量操作:将多个操作组合成一个批量操作,减少网络往返的次数。例如,使用管道(Pipeline)来发送多个命令,然后一次性获取它们的响应。
  2. 异步操作:将操作异步化,即将操作放入消息队列或任务队列中,由后台线程或其他服务异步处理。这样可以避免阻塞主线程,并允许操作在不同的时间点执行。
  3. 回滚机制:在执行操作之前,先将相关数据备份或记录下来。如果操作失败,可以使用备份数据进行回滚操作。
  4. 重试机制:如果某个操作失败,可以进行重试,直到操作成功或达到最大重试次数。
  5. 业务层面的补偿机制:如果操作失败,可以通过业务逻辑来进行补偿操作,以达到一致性。

需要注意的是,最终一致性并不能提供强一致性的保证,因此在某些场景下可能会出现数据不一致的情况。在选择使用最终一致性来解决事务问题时,需要根据具体的业务需求和数据一致性要求来评估和权衡。

问题二:

通过加锁解决并发问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public void addActivityScore(Long userId, ActivityScoreBo activityScore) {
if (userId == null) {
return;
}

// 1. 计算活跃度(正为加活跃,负为减活跃)
String field;
int score = 0;
if (activityScore.getPath() != null) {
field = "path_" + activityScore.getPath();
score = 1;
} else if (activityScore.getArticleId() != null) {
field = activityScore.getArticleId() + "_";
if (activityScore.getPraise() != null) {
field += "praise";
score = BooleanUtils.isTrue(activityScore.getPraise()) ? 2 : -2;
} else if (activityScore.getCollect() != null) {
field += "collect";
score = BooleanUtils.isTrue(activityScore.getCollect()) ? 2 : -2;
} else if (activityScore.getRate() != null) {
// 评论回复
field += "rate";
score = BooleanUtils.isTrue(activityScore.getRate()) ? 3 : -3;
} else if (BooleanUtils.isTrue(activityScore.getPublishArticle())) {
// 发布文章
field += "publish";
score += 10;
}
} else if (activityScore.getFollowedUserId() != null) {
field = activityScore.getFollowedUserId() + "_follow";
score = BooleanUtils.isTrue(activityScore.getFollow()) ? 2 : -2;
} else {
return;
}

final String todayRankKey = todayRankKey();
final String monthRankKey = monthRankKey();
// 2. 幂等:判断之前是否有更新过相关的活跃度信息
final String userActionKey = ACTIVITY_SCORE_KEY + userId + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMMdd"), System.currentTimeMillis());
Integer ans = RedisClient.hGet(userActionKey, field, Integer.class);
if (ans == null) {
// 2.1 之前没有加分记录,执行具体的加分
if (score > 0) {
// 记录加分记录
RedisClient.hSet(userActionKey, field, score);
// 个人用户的操作记录,保存一个月的有效期,方便用户查询自己最近31天的活跃情况
RedisClient.expire(userActionKey, 31 * DateUtil.ONE_DAY_SECONDS);

// 更新当天和当月的活跃度排行榜
Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score);
RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score);
if (log.isDebugEnabled()) {
log.info("活跃度更新加分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns);
}
if (newAns <= score) {
// 日活跃榜单,保存31天;月活跃榜单,保存1年
RedisClient.expire(todayRankKey, 31 * DateUtil.ONE_DAY_SECONDS);
RedisClient.expire(monthRankKey, 12 * DateUtil.ONE_MONTH_SECONDS);
}
}
} else if (ans > 0) {
// 2.2 之前已经加过分,因此这次减分可以执行
if (score < 0) {
Boolean oldHave = RedisClient.hDel(userActionKey, field);
if (BooleanUtils.isTrue(oldHave)) {
Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score);
RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score);
if (log.isDebugEnabled()) {
log.info("活跃度更新减分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns);
}
}
}
}
}

1.3.3、触发活跃度更新

前面只是提供了一个增加活跃度的方法,但是什么时候调用它呢?

我们这里借助之前实现 Event/Listenter方式来处理活跃度更新。

文章/用户的相关操作事件监听,并更新对应的活跃度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class UserActivityListener {
@Autowired
private UserActivityRankService userActivityRankService;

/**
* 用户操作行为,增加对应的积分
*
* @param msgEvent
*/
@EventListener(classes = NotifyMsgEvent.class)
@Async
public void notifyMsgListener(NotifyMsgEvent msgEvent) {
switch (msgEvent.getNotifyType()) {
case COMMENT:
case REPLY:
CommentDO comment = (CommentDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo()
.setRate(true).setArticleId(comment.getArticleId()));
break;
case COLLECT:
UserFootDO foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(true).setArticleId(foot.getDocumentId()));
break;
case CANCEL_COLLECT:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(false).setArticleId(foot.getDocumentId()));
break;
case PRAISE:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(true).setArticleId(foot.getDocumentId()));
break;
case CANCEL_PRAISE:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(false).setArticleId(foot.getDocumentId()));
break;
case FOLLOW:
UserRelationDO relation = (UserRelationDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(true).setArticleId(relation.getUserId()));
break;
case CANCEL_FOLLOW:
relation = (UserRelationDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(false).setArticleId(relation.getUserId()));
break;
default:
}
}

/**
* 发布文章,更新对应的积分
*
* @param event
*/
@Async
@EventListener(ArticleMsgEvent.class)
public void publishArticleListener(ArticleMsgEvent<ArticleDO> event) {
ArticleEventEnum type = event.getType();
if (type == ArticleEventEnum.ONLINE) {
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPublishArticle(true).setArticleId(event.getContent().getId()));
}
}

}

接下来,就是对应方法触发后事件更新了,包括ArticleMsgEvent、NotifyMsgEvent等。

img

另外,针对用户浏览页面的活跃度触发,我们在 Filte/Inteceptor 层实现,通过GlobalViewInterceptor的preHandle方法实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Permission permission = handlerMethod.getMethod().getAnnotation(Permission.class);
if (permission == null) {
permission = handlerMethod.getBeanType().getAnnotation(Permission.class);
}

if (permission == null || permission.role() == UserRole.ALL) {
if (ReqInfoContext.getReqInfo() != null) {
// 用户活跃度更新
SpringUtil.getBean(UserActivityRankService.class).addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPath(ReqInfoContext.getReqInfo().getPath()));
}
return true;
}
}
return true;
}

1.3.4、排行榜查询

接下来就是将这个榜单展示给用户看。

基本流程如下:

1、从redis中获取topN的用户+评分

2、查询用户的信息

3、根据用户评分进行排序,并更新每个用户的排名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public List<RankItemDTO> queryRankList(ActivityRankTimeEnum time, int size) {
String rankKey = time == ActivityRankTimeEnum.DAY ? todayRankKey() : monthRankKey();
// 1. 获取topN的活跃用户
List<ImmutablePair<String, Double>> rankList = RedisClient.zTopNScore(rankKey, size);
if (CollectionUtils.isEmpty(rankList)) {
return Collections.emptyList();
}

// 2. 查询用户对应的基本信息
// 构建userId -> 活跃评分的map映射,用于补齐用户信息
Map<Long, Integer> userScoreMap = rankList.stream().collect(Collectors.toMap(s -> Long.valueOf(s.getLeft()), s -> s.getRight().intValue()));
List<SimpleUserInfoDTO> users = userService.batchQuerySimpleUserInfo(userScoreMap.keySet());

// 3. 根据评分进行排序
List<RankItemDTO> rank = users.stream()
.map(user -> new RankItemDTO().setUser(user).setScore(userScoreMap.getOrDefault(user.getUserId(), 0)))
.sorted((o1, o2) -> Integer.compare(o2.getScore(), o1.getScore()))
.collect(Collectors.toList());

// 4. 补齐每个用户的排名
IntStream.range(0, rank.size()).forEach(i -> rank.get(i).setRank(i + 1));
return rank;
}

其中核心方法是Redis的zRangeWithScores,用以获取指定排名的用户和对应分数。

zrevrange 是Redis中的一个有序集合操作命令,用于按照分数从大到小的顺序获取有序集合中指定范围内的成员。

命令语法如下:
ZREVRANGE key start stop [WITHSCORES]
参数说明:

  • key :有序集合的键名。
  • start :指定范围的起始位置,从0开始计数,表示成员的排名。
  • stop :指定范围的结束位置,从0开始计数,表示成员的排名。
  • WITHSCORES (可选):如果提供了该参数,命令会返回成员和对应的分数,以一个成员和一个分数交替排列的方式返回结果。

示例:
假设有一个有序集合名为 myset ,包含以下成员和对应的分数:
“member1” -> 10
“member2” -> 20
“member3” -> 30
“member4” -> 40
“member5” -> 50
使用 ZREVRANGE myset 0 2 命令,将返回范围为0到2的成员:

  1. “member5”
  2. “member4”
  3. “member3”
    使用 ZREVRANGE myset 0 2 WITHSCORES 命令,将返回范围为0到2的成员和对应的分数:
  4. “member5”
  5. “50”
  6. “member4”
  7. “40”
  8. “member3”
  9. “30”
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static List<ImmutablePair<String, Double>> zTopNScore(String key, int n) {
return template.execute(new RedisCallback<List<ImmutablePair<String, Double>>>() {
@Override
public List<ImmutablePair<String, Double>> doInRedis(RedisConnection connection) throws DataAccessException {
Set<RedisZSetCommands.Tuple> set = connection.zRangeWithScores(keyBytes(key), -n, -1);
if (set == null) {
return Collections.emptyList();
}
return set.stream()
.map(tuple -> ImmutablePair.of(toObj(tuple.getValue(), String.class), tuple.getScore()))
.sorted((o1, o2) -> Double.compare(o2.getRight(), o1.getRight())).collect(Collectors.toList());
}
});
}


本站总访问量