时间:2021-7-24来源:本站原创作者:佚名
头部白癜风的治疗 https://m-mip.39.net/nk/mipso_4595402.html

Yar支持HTTP和TCP俩种Transporter,HTTP的是基于CURL,PHP中的Yar默认就是走的HTTPTransporter,这个大家应该都不陌生,但是基于TCP的,可能大家会用的少一些。

事实上,我6年前也写过一个C的Yarserver框架,叫做Yar-c,代码地址在Yar-CatGithub,它提供了服务启动,worker进程管理,Yar打包协议等。当时我们用这个框架,实现了高性能的微博白名单等服务,以供PHP端使用YarClient来调用。

只不过,YarC需要用C来写Handle,可能对于不少PHPer来说,会稍微有点陌生,那今天我们尝试用PHP来写一个TCP的Server,来介绍下如何实现对YarRPC协议的处理,这个例子可以方便的结合Swoole等异步PHP框架,实现一个高性能的YarTCPServer。这个过程中,会让大家了解Yar的RPC通信协议,以及捎带了解下Socket编程。

我们今天还是用“白名单”服务作为例子,我们提供一个接口,接受RPC客户端的请求,参数是一个用户ID,返回bool,表示是否在白名单:

functionquery(intid):bool;

首先,我们建立一个文件yar_server,为了方便的直接执行,我们在文件写下:

#!/bin/envphp7?phpclassWhiteList{}

然后,通过chmoda+x给这个文件增加可执行的权限。

第一步我们需要处理服务的启动参数处理,接受一个参数S表示要监听的IP和端口,值的格式是host:port,我们使用PHP的getopt函数来处理命令行参数:

classWhiteList{protectedhost;publicfunction__construct(){options=getOpt("S:");if(!isset(options["S"])){this-usage();}}protectedfunctionusage(){exit("Usage:yar_server-Shostname:port\n");}}

这样,当用户启动yar_server的时候,没有指定S参数,我们就退出,并提示Usage。我们还需要另外一个配置,就是指向一个词表文件,词表文件中每一行是一个在白名单中的用户ID,我们用F表示:

classWhiteList{protectedhost;protecteddicts;publicfunction__construct(){options=getOpt("S:F:");if(!isset(options["S"])

!isset(options["F"])){this-usage();}this-host=options["S"];this-dicts=options["F"];}protectedfunctionusage(){exit("Usage:yar_server-Fpath_to_dict-Shostname:port\n");}}

好了,现在启动参数处理完成,当然为了简单,我省去了对输入参数的有效性检查。

接下来,我们需要完成俩个函数,第一个是读取-F指定的词表文件,把所有的用户ID读入到一个数组中,因为我们的这个服务会是常驻进行,所以不用担心性能,它只会在启动阶段处理这个词表文件:

protectedfunctionloadDict(){this-ids=array();fp=fopen(this-dicts,"r");while(!feof(fp)){line=trim(fgets(fp));if(line){this-ids[line]=true;}}fclose(fp);echo"Loadingdictsuccessfully,",count(this-ids),"loaded\n";returnthis;}

因为用户ID是整型,所以我们把它当作Hashtable的key,这样在将来查找的时候,使用isset会非常高效。需要注意的是因为文件处理不是我们今天要讲的重点,也就省去了对文件存在行,可读性,合法性的检查。

好了,接下来是重点了,我们要启动一个IPV4TCPSocket服务,监听在host指定的地方,为了方便大家了解SocketAPI,我们不采用PHP的Stream系列函数,而是采用PHP直接包装的Socket系列API,首先我们用socket_create创建一个Socket套接字:

protectedfunctionlisten(){socket=socket_create(AF_INET,SOCK_STREAM,SOL_TCP);if(socket==false){thrownewException("socket_create()failed:reason:".socket_strerror(socket_last_error()));}}

然后,我们需要使用socket_bind绑定这个Socket到我们需要监听的地址,并且使用socket_listen来监听请求:

protectedfunctionlisten(){socket=socket_create(AF_INET,SOCK_STREAM,SOL_TCP);if(socket==false){thrownewException("socket_create()failed:reason:".socket_strerror(socket_last_error()));}list(hostname,port)=explode(":",this-host);if(socket_bind(socket,hostname,port)==false){thrownewException("socket_bind()failed:reason:".socket_strerror(socket_last_error()));}if(socket_listen(socket,64)===false){thrownewException("socket_listen()failed:reason:".socket_strerror(socket_last_error()));}echo"StartingYar_Serverat{this-host}\nPresssCtrl+Ctoquit\n";this-socket=socket;returnthis;}

好了,如果一切没问题,接下来我们就可以socket_accept来监听请求了,默认的socket是阻塞模式,如果没有请求,进程会一直阻塞等待,对于高性能的服务来说,最好采用非阻塞+select或者epoll的模式来同时处理多个请求,但是我们的这个例子主要是为了介绍Yar的协议,所以还是采用简单的阻塞模式。

接下来,我们来编写真正的RPC处理部分,首先我们通过accept接受一个请求,然后读取请求的的内容,分析请求头中的YarRPCHeader信息,YarRPC的协议头定义如下:

typedefstruct_yar_header{uint32_tid;//transactioniduint16_tversion;//protoclversionuint32_tmagic_num;//defaultis:0x80DFEC60uint32_treserved;unsignedcharprovider[32];//reqeustfromwhounsignedchartoken[32];//requesttoken,usedforauthenticationuint32_tbody_len;//requestbodylen}

其中,magic_num是用来验证请求有效性的一个特殊值,合法的YarRPC请求都会设置这个值为0x80DFEC60(我很想告诉你为啥是这个值,但我真不记得当时我为啥用这个数字了),这个头部是82个字节,可能有同学会问,不对啊一看这个Struct不应该是82啊,那是因为头部申明的时候采用pack模式,也就是不对齐,所以确实是82个字节.

这里有一个需要注意的是,0x80DFEC60如果你是自在32位的系统上的话,这个值超过了PHP的最大有符号整数的表示范围,类似我之前的这篇文章介绍的PHP_INT_MIN,PHP会自动转换成浮点数,所以如果是在32位系统上,你不能直接定义0x80DFEC60,而是需要这么写来定义这个值:

pack("H*","80DFEC60");

provider是一个字符串,标明了客户端的名字,比如对于Yar扩展的Yar_Client就是"YarPHPCient-x.x.x"。

token在设计的最初是为了做APIkey验证的,但是后来没用上,因为大部分都是内网应用,可以有多种办法来保证请求来源的合法性。

id是一个唯一请求id,这个是为了排查请求问题的,version默认为0,或者1,目前我没有升级过协议头,所以这个暂时我们也不用关心,reserved可以用来传递一些请求参数,比如客户端可以说明是否保持连接。

body_len是我们需要关心的,这个字段表明了这次请求,请求体一共多大(不包括Yar协议头部)。

所有的这些数字,都是以网络字节序传递的,我们采用PHP处理二进制流的unpack函数来解析读取进来的二进制流:

protectedfunctionparseHeader(header){returnunpack("Nid/nversion/Nmagic_num/Nreserved/A32provider/A32token/Nbody_len",header);}

这个函数会返回一个上面说到的头部结构体的数组。

对应的我们也需要使用pack来实现生成YarHeader的方法:

constYAR_MAGIC_NUM=0x80DFEC60;protectedfunctiongenHeader(id,len){bin=pack("NnNNA32A32N",id,0,self::YAR_MAGIC_NUM,0,"YarPHPTCPServer","",len);returnbin;}

如刚才说的,我们需要在接受一个请求以前,验证请求的合法性:

protectedfunctionvalidRequest(header){if(header["magic_num"]!=self::YAR_MAGIC_NUM){returnfalse;}returntrue;}

所以大概请求的处理整个逻辑框架是:

protectedfunctionaccept(){while((conn=socket_accept(this-socket))){buf=socket_read(conn,self::HEADER_SIZE,PHP_BINARY_READ);if(buf===false){socket_shutdown(conn);continue;}if(!this-validHeader(header=this-parseHeader(buf))){output=this-response(1,"illegalYarRPCrequest");gotoresponse;}buf=socket_read(conn,header["body_len"],PHP_BINARY_READ);if(buf===false){output=this-response(1,"insufficientrequestbody");gotoresponse;}if(!this-validPackager(buf)){output=this-response(1,"unsupportedpackager");gotoresponse;}buf=substr(buf,8);/*跳过打包信息的8个字节*/request=this-parseRequest(buf);if(request==false){this-response(1,"malformedrequestbody");gotoresponse;}status=this-handle(request,ret);output=this-response(status,ret);response:socket_write(conn,output,strlen(output));socket_shutdown(conn);/*关闭写*/}}

现在整体的框架就算完成了,我们需要完成handle,response方法就可以了,handle是要根据用户的请求中的m,来调用指定的方法。

protectedfunctionhandle(request,ret){if(request["m"]=="query"){ret=this-query(...request["p"]);}else{ret="unsupportedmethod".request["m"]."";return1;}return0;}

现在来实现query方法本身,这个会很简单,就检查下id是不是在白名单数组:

protectedfunctionquery(id){returnisset(this-ids[id]);}

好了,接下来我们要完成response方法,这个方法是打包一个符合Yar协议的返回体,包括82个字节的头部,8个字节的打包信息,以及序列化后的响应体,我们需要根据status不同,来选择设置响应体中的r还是e字段:

protectedfunctionresponse(status,ret){body=array();body["i"]=0;body["s"]=status;if(status==0){body["r"]=ret;}else{body["e"]=ret;}packed=serialize(body);header=this-genHeader(0,strlen(packed)+8);returnheader.str_pad("PHP",8,"\0").packed;}

好了,马上就要大功告成了,我们最后完成启动方法和析构函数(关闭socket):

publicfunctionrun(){this-loadDict()-listen()-accept();}publicfunction__destruct(){if(this-socket){socket_close(this-socket);}}

现在一切就绪,我们最后在文件末尾加入:

(newWhitelist)-run();

在测试之前,我们先准备一个测试词表,比如1到的id:

seq110user_id.dict

然后启动服务,监听在本机的端口:

./yar_server-Fuser_id.dict-S.0.0.1:Loadingdictsuccessfully,loadedStartingYar_Serverat.0.0.1:PresssCtrl+Ctoquit

不错,服务启动成功,然后我们使用Yar扩展来编写客户端(你需要首先安装好Yar扩展),测试下用户id和99的调用效果:

?phpyar=newYar_Client("tcp://.0.0.1:");var_dump(yar-query(""));var_dump(yar-query("99"));?

和调用HTTP的Yar服务不同,此处我们应该使用tcp://做地址头,表示这是一个TCP的服务。

来,运行一下看看:

php7client.phpbool(true)bool(false)

看起来不错,符合预期!

你也可以尝试故意构造一些错误的可能,比如调用不存在的方法之类的,来看看服务器的反应,这个例子的代码你可以在这里找到。

到这里我就算介绍完了如何采用PHP来编写Yar的TCP服务,大家应该可以很方便的把这个例子修改完善成自己希望的格式,或者嵌入Swoole。

还是要再次说明,因为本文的主要目的是为了介绍YarRPC通信协议,所以在服务管理这块并没有做的很完善,比如socket_accept,socket_read/write等都默认采用了阻塞模式,也没有加入超时设计,服务进程也只有一个,这个如果真的想用做实际服务的话,还是需要一些功课的,不过我相信你有兴趣的话,都是可以搞定的。:)

当然,最简单的是,你可以直接使用Yar-C服务框架来编写CYarTCP服务。

在这里也有一个Yar-CServer的例子yar_serverinC.

enjoy!

来源:
转载请注明原文网址:http://www.coolofsoul.com/jsyy/jsyy/22628.html
------分隔线----------------------------