基于Redis的基本数据结构和业务逻辑设计实现用户活跃排行榜。
1.1、场景说明
技术派中,设计了一个社区用户的活跃排行榜,包括日榜和月榜。
用户活跃度计算方式:
- 用户每访问一个新的页面, +1分
- 对于一篇文章,点赞、收藏, +2分;取消点赞、取消收藏,将之前的活跃分收回。
- 文章评论, +3分
- 发布一篇审核通过的文章, +10分
榜单:
展示活跃度最高的前三十名用户。
效果如下:
1.2、方案设计
使用Redis的ZSet数据结构实现,以下是ZSet的简介:
Redis的ZSet(有序集合)是一种有序的、唯一的数据结构。它类似于Set,但每个元素都关联着一个分数(score),用于进行排序。
ZSet的特点包括:
- 有序性:ZSet中的元素按照分数进行排序,可以根据分数进行范围查询、区间获取等操作。
- 唯一性:ZSet中的元素是唯一的,不会存在重复的元素。
- 快速的插入和删除:ZSet使用了跳跃表(Skip List)和哈希表(Hash Table)的结合体,使得插入和删除操作的时间复杂度为O(log N)。
- 高效的查找:通过索引和跳跃表的特性,可以在O(log N)的时间复杂度内查找某个元素。
ZSet常用的操作包括:
- ZADD:向ZSet中添加一个元素,同时指定其分数。
- ZREM:从ZSet中移除一个元素。
- ZRANGE:按照分数的顺序,获取指定范围内的元素。
- ZSCORE:获取指定元素的分数。
- ZINCRBY:将指定元素的分数增加一个特定的值。
ZSet广泛应用于排行榜、计分系统、排行榜、时间轴等场景,提供了高效的排序和检索功能。
1.3、排行榜实现
1.3.1、业务实体设计
我们先实现一个更新用户活跃的方法,首先定义一个涵盖该业务场景的参数传递实体 ActivityScoreBo,记录用户活动(是否访问页面、点赞、收藏、评论、关注、发布文章)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @Data @Accessors(chain = true) public class ActivityScoreBo {
private String path;
private Long articleId;
private Boolean rate;
private Boolean praise;
private Boolean collect;
private Boolean publishArticle;
private Long followedUserId;
private Boolean follow; }
|
有了业务实体,进一步我们需要计算活跃度。活跃度包括日榜和月榜,如下为对应的key生成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
private String todayRankKey() { return ACTIVITY_SCORE_KEY + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMMdd"), System.currentTimeMillis()); }
private String monthRankKey() { return ACTIVITY_SCORE_KEY + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMM"), System.currentTimeMillis()); }
|
1.3.2、计算活跃度
(1)根据传入的业务实体,判断文章浏览、点赞、收藏、关注、排列、发文等行为,并计算对应的字段field和分数score,不同行为的加分机制见上述场景说明。
(2)接着,根据用户ID和当天日期(日为单位)生成一个唯一的活跃度信息的键值(userActionKey)。
(3)通过RedisClient从Redis中获取该userActionKey键对应的field值(ans)。
(4)如果ans为null,说明之前没有加分记录,执行加分操作:
- 如果加分数大于0,将加分记录保存到Redis的hash结构中,并设置有效期为一个月。/注:【加分记录使用Redis的Hash结构存储,key为userActionKey,字段为field,value为分数score】
- 更新当天和当月的活跃度排行榜,使用Redis的zIncrBy函数。
- 如果新的活跃度得分大于等于加分数,更新日活跃榜单和月活跃榜单的有效期。
(5)如果ans大于0,说明之前该field已经加过分,继续判断:
- 如果分数小于0,说明是减分行为,应从Redis中删除加分记录。
- 更新当天和当月的活跃度排行榜。
幂等策略:
在上述加分操作中,为了防止重复加活跃度,我们做了一个幂等操作。
就是将用户的每个加分项,都记录下来,在执行具体加分时,基于此来做幂等判定 。
因此,我们对每个用户维护一个活跃更新的操作历史记录表,保存在redis的hash数据结构中,每天一个记录。
1 2 3
| key: activity_rank_{user_id}_{年月日} field: 活跃度更新key value: 添加的活跃度
|
思考:
1、事务问题:虽然单个redis操作是原子性的,但多次的redis操作,存在事务问题。
2、并发问题:没有做并发,幂等无法100%生效,依然可能存在重复添加/扣减活跃度的情况
问题一:
通过最终一致性(Eventual Consistency)来解决多次Redis操作的事务问题是一种常见的方法。最终一致性是指在分布式系统中,经过一段时间后,系统的所有副本最终会达到一致的状态。
在Redis中,可以使用以下方法来实现最终一致性:
- 批量操作:将多个操作组合成一个批量操作,减少网络往返的次数。例如,使用管道(Pipeline)来发送多个命令,然后一次性获取它们的响应。
- 异步操作:将操作异步化,即将操作放入消息队列或任务队列中,由后台线程或其他服务异步处理。这样可以避免阻塞主线程,并允许操作在不同的时间点执行。
- 回滚机制:在执行操作之前,先将相关数据备份或记录下来。如果操作失败,可以使用备份数据进行回滚操作。
- 重试机制:如果某个操作失败,可以进行重试,直到操作成功或达到最大重试次数。
- 业务层面的补偿机制:如果操作失败,可以通过业务逻辑来进行补偿操作,以达到一致性。
需要注意的是,最终一致性并不能提供强一致性的保证,因此在某些场景下可能会出现数据不一致的情况。在选择使用最终一致性来解决事务问题时,需要根据具体的业务需求和数据一致性要求来评估和权衡。
问题二:
通过加锁解决并发问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public void addActivityScore(Long userId, ActivityScoreBo activityScore) { if (userId == null) { return; }
String field; int score = 0; if (activityScore.getPath() != null) { field = "path_" + activityScore.getPath(); score = 1; } else if (activityScore.getArticleId() != null) { field = activityScore.getArticleId() + "_"; if (activityScore.getPraise() != null) { field += "praise"; score = BooleanUtils.isTrue(activityScore.getPraise()) ? 2 : -2; } else if (activityScore.getCollect() != null) { field += "collect"; score = BooleanUtils.isTrue(activityScore.getCollect()) ? 2 : -2; } else if (activityScore.getRate() != null) { field += "rate"; score = BooleanUtils.isTrue(activityScore.getRate()) ? 3 : -3; } else if (BooleanUtils.isTrue(activityScore.getPublishArticle())) { field += "publish"; score += 10; } } else if (activityScore.getFollowedUserId() != null) { field = activityScore.getFollowedUserId() + "_follow"; score = BooleanUtils.isTrue(activityScore.getFollow()) ? 2 : -2; } else { return; }
final String todayRankKey = todayRankKey(); final String monthRankKey = monthRankKey(); final String userActionKey = ACTIVITY_SCORE_KEY + userId + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMMdd"), System.currentTimeMillis()); Integer ans = RedisClient.hGet(userActionKey, field, Integer.class); if (ans == null) { if (score > 0) { RedisClient.hSet(userActionKey, field, score); RedisClient.expire(userActionKey, 31 * DateUtil.ONE_DAY_SECONDS);
Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score); RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score); if (log.isDebugEnabled()) { log.info("活跃度更新加分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns); } if (newAns <= score) { RedisClient.expire(todayRankKey, 31 * DateUtil.ONE_DAY_SECONDS); RedisClient.expire(monthRankKey, 12 * DateUtil.ONE_MONTH_SECONDS); } } } else if (ans > 0) { if (score < 0) { Boolean oldHave = RedisClient.hDel(userActionKey, field); if (BooleanUtils.isTrue(oldHave)) { Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score); RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score); if (log.isDebugEnabled()) { log.info("活跃度更新减分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns); } } } } }
|
1.3.3、触发活跃度更新
前面只是提供了一个增加活跃度的方法,但是什么时候调用它呢?
我们这里借助之前实现 Event/Listenter方式来处理活跃度更新。
文章/用户的相关操作事件监听,并更新对应的活跃度:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class UserActivityListener { @Autowired private UserActivityRankService userActivityRankService;
@EventListener(classes = NotifyMsgEvent.class) @Async public void notifyMsgListener(NotifyMsgEvent msgEvent) { switch (msgEvent.getNotifyType()) { case COMMENT: case REPLY: CommentDO comment = (CommentDO) msgEvent.getContent(); userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo() .setRate(true).setArticleId(comment.getArticleId())); break; case COLLECT: UserFootDO foot = (UserFootDO) msgEvent.getContent(); userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(true).setArticleId(foot.getDocumentId())); break; case CANCEL_COLLECT: foot = (UserFootDO) msgEvent.getContent(); userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(false).setArticleId(foot.getDocumentId())); break; case PRAISE: foot = (UserFootDO) msgEvent.getContent(); userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(true).setArticleId(foot.getDocumentId())); break; case CANCEL_PRAISE: foot = (UserFootDO) msgEvent.getContent(); userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(false).setArticleId(foot.getDocumentId())); break; case FOLLOW: UserRelationDO relation = (UserRelationDO) msgEvent.getContent(); userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(true).setArticleId(relation.getUserId())); break; case CANCEL_FOLLOW: relation = (UserRelationDO) msgEvent.getContent(); userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(false).setArticleId(relation.getUserId())); break; default: } }
@Async @EventListener(ArticleMsgEvent.class) public void publishArticleListener(ArticleMsgEvent<ArticleDO> event) { ArticleEventEnum type = event.getType(); if (type == ArticleEventEnum.ONLINE) { userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPublishArticle(true).setArticleId(event.getContent().getId())); } }
}
|
接下来,就是对应方法触发后事件更新了,包括ArticleMsgEvent、NotifyMsgEvent等。
另外,针对用户浏览页面的活跃度触发,我们在 Filte/Inteceptor 层实现,通过GlobalViewInterceptor的preHandle方法实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (handler instanceof HandlerMethod) { HandlerMethod handlerMethod = (HandlerMethod) handler; Permission permission = handlerMethod.getMethod().getAnnotation(Permission.class); if (permission == null) { permission = handlerMethod.getBeanType().getAnnotation(Permission.class); }
if (permission == null || permission.role() == UserRole.ALL) { if (ReqInfoContext.getReqInfo() != null) { SpringUtil.getBean(UserActivityRankService.class).addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPath(ReqInfoContext.getReqInfo().getPath())); } return true; } } return true; }
|
1.3.4、排行榜查询
接下来就是将这个榜单展示给用户看。
基本流程如下:
1、从redis中获取topN的用户+评分
2、查询用户的信息
3、根据用户评分进行排序,并更新每个用户的排名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public List<RankItemDTO> queryRankList(ActivityRankTimeEnum time, int size) { String rankKey = time == ActivityRankTimeEnum.DAY ? todayRankKey() : monthRankKey(); List<ImmutablePair<String, Double>> rankList = RedisClient.zTopNScore(rankKey, size); if (CollectionUtils.isEmpty(rankList)) { return Collections.emptyList(); }
Map<Long, Integer> userScoreMap = rankList.stream().collect(Collectors.toMap(s -> Long.valueOf(s.getLeft()), s -> s.getRight().intValue())); List<SimpleUserInfoDTO> users = userService.batchQuerySimpleUserInfo(userScoreMap.keySet());
List<RankItemDTO> rank = users.stream() .map(user -> new RankItemDTO().setUser(user).setScore(userScoreMap.getOrDefault(user.getUserId(), 0))) .sorted((o1, o2) -> Integer.compare(o2.getScore(), o1.getScore())) .collect(Collectors.toList());
IntStream.range(0, rank.size()).forEach(i -> rank.get(i).setRank(i + 1)); return rank; }
|
其中核心方法是Redis的zRangeWithScores
,用以获取指定排名的用户和对应分数。
zrevrange
是Redis中的一个有序集合操作命令,用于按照分数从大到小的顺序获取有序集合中指定范围内的成员。
命令语法如下:
ZREVRANGE key start stop [WITHSCORES]
参数说明:
key
:有序集合的键名。
start
:指定范围的起始位置,从0开始计数,表示成员的排名。
stop
:指定范围的结束位置,从0开始计数,表示成员的排名。
WITHSCORES
(可选):如果提供了该参数,命令会返回成员和对应的分数,以一个成员和一个分数交替排列的方式返回结果。
示例:
假设有一个有序集合名为 myset
,包含以下成员和对应的分数:
“member1” -> 10
“member2” -> 20
“member3” -> 30
“member4” -> 40
“member5” -> 50
使用 ZREVRANGE myset 0 2
命令,将返回范围为0到2的成员:
- “member5”
- “member4”
- “member3”
使用 ZREVRANGE myset 0 2 WITHSCORES
命令,将返回范围为0到2的成员和对应的分数:
- “member5”
- “50”
- “member4”
- “40”
- “member3”
- “30”
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static List<ImmutablePair<String, Double>> zTopNScore(String key, int n) { return template.execute(new RedisCallback<List<ImmutablePair<String, Double>>>() { @Override public List<ImmutablePair<String, Double>> doInRedis(RedisConnection connection) throws DataAccessException { Set<RedisZSetCommands.Tuple> set = connection.zRangeWithScores(keyBytes(key), -n, -1); if (set == null) { return Collections.emptyList(); } return set.stream() .map(tuple -> ImmutablePair.of(toObj(tuple.getValue(), String.class), tuple.getScore())) .sorted((o1, o2) -> Double.compare(o2.getRight(), o1.getRight())).collect(Collectors.toList()); } }); }
|