索引:
基于list的实现方式基于publish/subscribe实战消息队列简介
消息队列:是消息的顺序集合。比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给PV+1。用户量非常大的时候,没有办法实时插入PV。结合Redis消息队列的实现,也是每个用户访问的时候发送ajax到控制器,这个时候redis每次rpushpvlog,相当于直接往数组后面插入一万个行为,接下来用一个脚本运输处理pvlog,setpv查看时候getpv,如果想处理用户请求时间等等,同样可以这样异步处理。
常见场景和解决的问题:
应对流量峰值异步消费(不定速的插入,生产和匀速的处理,消费)解耦应用(不同来源的生产和同步去向的消费,基于publish/subscribe实现),即消息队列作为消息池,同时往里面写入的可能有多种数据,根据不同的场景来进行消费。redis实现消息队列原理使用redis实现的最主要优势是简单快捷,性能没有kafka高,但是安装简便,kafka性能高但是比较重,如果消息队列不是很多,比如说一个博客计算pv,那么kafka可能比整个项目还要大。
实现方式:
方法一:基于list的实现方式
Screen-Shot--04-06-at-7.01.54-PM.png
核心代码没有用消息队列的方式,使用incrBy大概上限在万
?php$redis=newRedis();$redis-connect(.0.0.1,);$res-select(0);$key=pv:index;//看一下是不是没有,如果没有的话就设置成0if(false===$redis-get($key)){$redis-set($key,0);}//如果有了就增加1注意incrBy的上线大概在万$redis-incrBy($key,1);`用list来实现
//用list消息队列实现$key=listpv:index;$redis-rPush($key,1);//后台的cron来实现//先连接redis$redis=newRedis();$redis-connect(.0.0.1,);$res-select(0);$key=pv:index;//用一个死循环while(true){if(false!==$redis-lPop($key)){$redis-incrBy(pv:index);}}//然后在terminal中无线循环这个脚本基于list来实现消息队列的特点:
与水库相似的地方:
水库的容量决定承载能力—redis的容量决定业务的承载能力每一滴随只可能经过一个闸门—每条消息只能被一个消费者消费与水库不同的地方:
水库用于蓄水—一般要把消息全部消费掉不要的随扔掉—处理失败的消息要做容错
方法二:基于publish/subscribe
Screen-Shot--04-06-at-7.53.22-PM.png
频道固定,生产者和消费者不固定,可能一对多,也可能多对一,也可能多对多。命令行的实现方式:
首先起两个rediscli,一个作为订阅,一个作为发布
订阅者:SUBSCRIBEchannel1channel2
发布者:PUBLISHchannel1helloChannelPUBLISHchannel2helloChannel2
如果这个时候发布到一个没有被订阅的channel,那么这条消息就会丢失。如果有多个订阅了同一个channel,但有信息发布到同一个channel的时候,他们都会受到代码实现:发布者:
?php//发布者$redis=newRedis();$redis-connect(.0.0.1,);$res=$redis-publish(c1,helloc1);echoclientsreadingc1:{$res}\n;$res=$redis-publish(c2,helloc2);echoclientsreadingc2:{$res}\n;$res=$redis-publish(c3,helloc3);echoclientsreadingc3:{$res}\n;监听者:
?php$redis=newRedis();$redis-connect(.0.0.1,);//超时控制$redis-setOption(Redis::OPT_READ_TIMEOUT,-1);//订阅$redis-subscribe([c1,c2],function(Redis$instance,$channel,$message){echoreceivedmessageform{$channel}:{$message}\n;})```php###实战部分-生成内容页的质量分:实现三个功能:-统计首页、列表页、内容页的PV-统计浏览时间超过5s的内容页-内容页的PV+1分,浏览时间超过5s+5分,不超过5秒-1分,生成内容页的质量分前端部分:```javascriptscript//ajax访问ajax.php,给内容页增加PV$get(ajax.php?action=pvfrom=articleaid=?=$aid?);//如果页面打开时间超过5秒,则发出统计setTimeout(function(){$.get(ajax.php?action=get5aid=?=$aid?);},);/script后端部分:发布的实现:
?php$action=$_GET[action];$redis=newRedis();$redis-connect(.0.0.1,);//首页的PV$channelPvIndex=pv:index;//内容页的pv$channelPvList=pv:list;//内容页的PV$channelPvArticle=pv:article;//内容页浏览超过5秒$channelGT5=gt5:article;if(pv===$action){$from=$_GET[from];if(index===$from){$redis-publish($channelPVIndex,1);}elseif(list===$from){$tid=intval($_GET[tid]);$redis-publish($channelPvList,$tid);}elseif(articel===$_GET[aid]){$aid=intval($_GET[aid]);$redis-publish($channelPvArticle,$aid);}}elseif(gt5==$action){$aid=intval($GET[aid]){$redis-publish($channelGT5,$aid);}}else{//unknownaction}订阅的实现:
?php//订阅的实现$redis=newRedis();$redis-connect(.0.0.1,);//首页的PV$channelPvIndex=pv:index;//内容页的pv$channelPvList=pv:list;//内容页的PV$channelPvArticle=pv:article;//内容页浏览超过5秒$channelGT5=gt5:article;//频道和PV的key的映射$keyMap=[$channelPVIndex=realtimepv:index,$channelPvList=realtimepv:list,$channelPvArticle=realtimepv:article];$redis-setOption(Redis::OPT_READ_TIMEOUT,-1);$redis-subscribe([$channelPVIndex,$channelPvList,$channelPvArticle,$channelGT5],function(Redis$instance,$channel,$message){//注意在subscribe的回调中只能够执行订阅、取消订阅、模式订阅、模式取消订阅,无法执行incrBy//尝试取消订阅命令(证明上面一句话)//$instance-unsubscribe([$channelName]);//因此要想incryBy只能重新实例化一个redis$redis2=newRedis();$redis2-connect(.0.0.1,);global$keyMap;//这里可以使用闭包实现if(!isset($$keyMap[$channelName])){$realTimePvKey=$keyMap[$channelName];//映射过来$redis2-incrBy($realTimePvKey,1);}})注意在subscribe的回调中只能够执行订阅、取消订阅、模式订阅、模式取消订阅,无法执行incrBy,因此要想incryBy只能重新实例化一个redis
计算质量分:
?php//订阅的实现$redis=newRedis();$redis-connect(.0.0.1,);//内容页的PV$channelPvArticle=pv:article;//内容页浏览超过5秒$channelGT5=gt5:article;$redis-setOption(Redis::OPT_READ_TIMEOUT,-1);$redis-subscribe([$channelPVIndex,$channelPvList,$channelPvArticle,$channelGT5],function(Redis$instance,$channel,$message){/***使用pv和gt5的数量计算文章的质量分*1.如何计算?=gt5*6*2.以什么形式保存?HASH*gt5:int*score:int*/$redis2=newRedis();$redis2-connect(.0.0.1,);global$keyMap;//这里可以使用闭包实现if(gt5:article===$channelName){echo${channelName}\n;$key=realtimescore:.intval($message);$res=$redis2-hIncrBy($keymgt5,1);echo${channelName}\n;if($res){$score=$res*6;$redis2-hSet($key,score,$score);echo{$score}\n;}else{//报警}}})本人文章均为原创,转载请注明出处。