# Source code for iclientpy.rest.cmd.wxreceiver.server

from http.server import HTTPServer,BaseHTTPRequestHandler,HTTPStatus
from typing import List,Callable
from urllib import parse
from .WXBizMsgCrypt import WXBizMsgCrypt
import xml.etree.ElementTree as ET
import logging
import time


# Obtain the logger from the logging registry instead of instantiating
# ``logging.Logger`` directly: a directly constructed Logger has no parent, so
# it ignores handlers and levels configured on the root/package loggers and is
# not the object returned by ``logging.getLogger(__name__)`` elsewhere.
logger = logging.getLogger(__name__)

# Plain-text XML reply template. WXHookReceiver.send_wx_msg fills it in with
# str.format() before handing the result to EncryptMsg. Placeholders:
# {ToUserName}, {sCorpID}, {CreateTime} and {content}. MsgId and AgentID are
# fixed literal values in every reply.
_response_msg_format = '''<xml>
<ToUserName><![CDATA[{ToUserName}]]></ToUserName>
<FromUserName><![CDATA[{sCorpID}]]></FromUserName>
<CreateTime>{CreateTime}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{content}]]></Content>
<MsgId>1234567890123456</MsgId><AgentID>1</AgentID></xml>'''


def _query_values(path:str, param_names:List[str]):
    """Extract selected query-string parameters from a request path.

    :param path: request target, e.g. ``/hook?nonce=abc&timestamp=1``.
    :param param_names: names of the parameters to fetch.
    :return: tuple of the parameter values, in the order of ``param_names``.
    :raises KeyError: if a requested name is not present in the query string
        (``parse_qsl`` drops parameters with blank values by default, so
        those count as absent).
    """
    query_string = parse.urlsplit(path).query
    lookup = {}
    # Later occurrences of a repeated key overwrite earlier ones.
    for key, value in parse.parse_qsl(query_string):
        lookup[key] = value
    return tuple(lookup[wanted] for wanted in param_names)

from abc import abstractmethod

class WXHookReceiver(BaseHTTPRequestHandler):
    """HTTP request handler for an encrypted message callback endpoint.

    GET requests are answered with the echo string produced by
    ``WXBizMsgCrypt.VerifyURL``; POST requests are decrypted, passed to the
    message callback, and the callback's return value is sent back as an
    encrypted text reply.

    Subclasses supply the collaborators through ``get_wxcrypt``,
    ``get_sCorpID`` and ``get_msg_fun``. These hooks are decorated with
    ``@abstractmethod``, but the class does not use an ABC metaclass, so the
    decorator is documentation only and is not enforced at instantiation.
    """

    @abstractmethod
    def get_wxcrypt(self) -> 'WXBizMsgCrypt':
        """Return the crypto helper used to verify, decrypt and encrypt."""
        pass

    @abstractmethod
    def get_sCorpID(self) -> str:
        """Return the corp ID placed in the reply's ``FromUserName``."""
        pass

    @abstractmethod
    def get_msg_fun(self) -> Callable[[str, str], str]:
        """Return the callback ``(content, from_user_name) -> reply text``."""
        pass

    def call_msg_fun(self, *args, **kwargs):
        """Invoke the message callback, returning ``'Error'`` if it raises.

        Only ``Exception`` subclasses are swallowed (and logged with a
        traceback); ``KeyboardInterrupt`` and ``SystemExit`` propagate so the
        server can still be stopped.
        """
        try:
            return self.get_msg_fun()(*args, **kwargs)
        except Exception:
            logger.exception('unknown error.')
            return 'Error'

    def _reject(self, status, log_format, *log_args):
        """Log a warning and answer the current request with an error status.

        ``send_error`` writes a complete response and marks the connection
        for closing, so the client always receives an HTTP reply and no
        unread request body is left on a kept-alive connection.
        """
        logger.warning(log_format, *log_args)
        self.send_error(status)

    def send_wx_msg(self, msg: str, to_user_name, sReqNonce):
        """Encrypt ``msg`` as a text reply to ``to_user_name`` and send it.

        :param msg: reply text inserted into the XML template.
        :param to_user_name: recipient placed in ``ToUserName``.
        :param sReqNonce: nonce of the incoming request, passed to
            ``EncryptMsg``.

        If ``EncryptMsg`` returns a non-zero code, a warning is logged and a
        500 response is sent instead of the reply.
        """
        plain_reply = _response_msg_format.format(
            content=msg,
            ToUserName=to_user_name,
            sCorpID=self.get_sCorpID(),
            CreateTime=str(int(time.time())),
        )
        ret, encrypted_reply = self.get_wxcrypt().EncryptMsg(plain_reply, sReqNonce)
        if ret != 0:
            self._reject(HTTPStatus.INTERNAL_SERVER_ERROR, 'EncryptMsg failed:%s', ret)
            return
        self.send(encrypted_reply)

    def send(self, msg: str, status=HTTPStatus.OK):
        """Write ``msg`` as a UTF-8 ``text/plain`` response with ``status``."""
        byte_content = msg.encode('utf-8')
        self.send_response(status)
        # Content-Length must be the encoded byte count, not len(msg).
        self.send_header('Content-Length', str(len(byte_content)))
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.end_headers()
        self.wfile.write(byte_content)

    def do_GET(self):
        """Handle the URL verification request.

        Reads ``msg_signature``, ``timestamp``, ``nonce`` and ``echostr``
        from the query string and replies with the echo string returned by
        ``VerifyURL``. A missing parameter or a non-zero ``VerifyURL`` code
        results in a 400 response.
        """
        try:
            sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr = _query_values(
                self.path, ['msg_signature', 'timestamp', 'nonce', 'echostr'])
        except KeyError as missing:
            self._reject(HTTPStatus.BAD_REQUEST, 'missing query parameter:%s', missing)
            return
        ret, sEchoStr = self.get_wxcrypt().VerifyURL(
            sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)
        if ret != 0:
            self._reject(HTTPStatus.BAD_REQUEST, 'VerifyURL failed:%s', ret)
            return
        self.send(str(sEchoStr, 'utf8'))

    def do_POST(self):
        """Handle an incoming encrypted message and send the encrypted reply.

        The body is decrypted with ``DecryptMsg``; the ``Content`` and
        ``FromUserName`` elements of the decrypted XML are passed to the
        message callback. A missing query parameter, a non-zero
        ``DecryptMsg`` code, or a message lacking either element results in
        a 400 response.
        """
        try:
            sReqMsgSig, sReqTimeStamp, sReqNonce = _query_values(
                self.path, ['msg_signature', 'timestamp', 'nonce'])
        except KeyError as missing:
            self._reject(HTTPStatus.BAD_REQUEST, 'missing query parameter:%s', missing)
            return
        body_length = int(self.headers.get('Content-Length', 0))
        sReqData = self.rfile.read(body_length).decode('utf-8')
        ret, sMsg = self.get_wxcrypt().DecryptMsg(sReqData, sReqMsgSig, sReqTimeStamp, sReqNonce)
        if ret != 0:
            self._reject(HTTPStatus.BAD_REQUEST, 'DecryptMsg failed:%s', ret)
            return
        xml_tree = ET.fromstring(str(sMsg, 'utf-8'))
        content_node = xml_tree.find('Content')
        sender_node = xml_tree.find('FromUserName')
        # find() returns None for absent elements; reject instead of crashing
        # on ``None.text``.
        if content_node is None or sender_node is None:
            self._reject(HTTPStatus.BAD_REQUEST, 'message lacks Content or FromUserName')
            return
        content = content_node.text  # type:str
        from_user_name = sender_node.text
        self.send_wx_msg(self.call_msg_fun(content, from_user_name), from_user_name, sReqNonce)
def start_server(msg_fun: Callable[[str, str], str], sToken:str, sEncodingAESKey:str, sCorpID:str, port:int , bind_ip: str = '0.0.0.0', server_kls = HTTPServer):
    """Serve the callback endpoint until the server is stopped.

    :param msg_fun: callback ``(content, from_user_name) -> reply text``.
    :param sToken: token passed to ``WXBizMsgCrypt``.
    :param sEncodingAESKey: AES key passed to ``WXBizMsgCrypt``.
    :param sCorpID: corp ID passed to ``WXBizMsgCrypt`` and used as the
        sender of replies.
    :param port: port to listen on (converted with ``int``).
    :param bind_ip: address to bind to.
    :param server_kls: server class, called as
        ``server_kls((bind_ip, port), Handler)`` and used as a context
        manager.

    This call blocks in ``serve_forever()``.
    """
    # A single crypto helper is shared by every request handler instance.
    crypt = WXBizMsgCrypt(sToken, sEncodingAESKey, sCorpID)

    class Handler(WXHookReceiver):
        protocol_version = "HTTP/1.1"

        def get_wxcrypt(self):
            return crypt

        def get_sCorpID(self):
            return sCorpID

        def get_msg_fun(self):
            return msg_fun

    listen_address = (bind_ip, int(port))
    with server_kls(listen_address, Handler) as server:
        server.serve_forever()