当前位置: 首页 > 工具软件 > wechatpy > 使用案例 >

python3.6 Django 微信 OAuth 网页授权接入 Wechatpy 框架

范承教
2023-12-01

公众号 OAuth 网页授权接入(wechatpy)
微信网页授权(官方文档)
基本配置

wechatconfig.py

import json
import time

import requests
from wechatpy import WeChatClient
from wechatpy.client.api.jsapi import WeChatJSAPI
from wechatpy.client.api.media import WeChatMedia
from wechatpy.oauth import WeChatOAuth
from wechatpy.utils import random_string, to_text
from wx_libs.sign import Sign

基本配置

# Official-account credentials. AppId / AppSecret are passed to WeChatClient and
# WeChatOAuth below; TOKEN is the value checked by the server-verification view.
# NOTE(review): these are left blank here; presumably they are filled in per
# deployment -- consider loading them from the environment instead of source.
AppSecret = ""
TOKEN = ""
EncodingAESKey = ""
AppId = ""
DOMAIN = ""
# wx.config(...) JavaScript template. The {{...}} placeholders are meant to be
# substituted before the snippet reaches the browser.
# The note inside jsApiList must be a JavaScript comment ("//"); a Python-style
# "#" is a syntax error once the snippet is evaluated by the browser.
# NOTE(review): appId is an empty literal in the template -- confirm whether it
# should be a placeholder as well.
WechatJSConfig = '''
wx.config({  
    debug: false,
    appId: \'\',
    timestamp: {{timestamp}},
    nonceStr: \'{{nonceStr}}\',
    signature: \'{{signature}}\',
    jsApiList:[  // 前后端分离直接 放前端就好
    \'chooseImage\',
    \'previewImage\',
    \'uploadImage\',
    \'downloadImage\',
    \'onMenuShareTimeline\',
    \'chooseWXPay\',
    \'onMenuShareAppMessage\',
    \'checkJsApi\',
    \'translateVoice\',
    \'startRecord\',
    \'stopRecord\',
    \'onVoiceRecordEnd\',
    \'playVoice\',
    \'onVoicePlayEnd\',
    \'pauseVoice\',
    \'stopVoice\',
    \'uploadVoice\',
    \'downloadVoice\',
    \'getNetworkType\',
    \'openLocation\',
    \'getLocation\',
    \'hideOptionMenu\',
    \'showOptionMenu\',
    \'closeWindow\',
    \'hideMenuItems\',
    \'showMenuItems\',
    \'hideAllNonBaseMenuItem\',
    \'showAllNonBaseMenuItem\',
    ]
});
'''
# Not referenced by the code visible in this file; presumably WeChat Pay
# settings (API key, merchant id, payment notify URL) -- TODO confirm.
ApiKey = ""
MchID = ""
NotifyUrl = ""

# NOTE: no WeChatClient / WeChatJSAPI object (and no JS-SDK ticket) is created at
# import time. Each helper below builds its own client instead, so a client is
# only constructed when that specific API is actually called.

# Module-level OAuth helper for the official-account web-authorization flow
# (used to obtain user information).
oauthClient = WeChatOAuth(
    redirect_uri=DOMAIN,
    app_id=AppId,
    secret=AppSecret,
)


def get_mediaApiClient():
    """Return a ``WeChatMedia`` API wrapper bound to a freshly created client.

    The original note describes its purpose as uploading temporary media.

    :return: a ``WeChatMedia`` instance built on ``Client()``.
    """
    return WeChatMedia(client=Client())


def get_wechat_config(request):
    """Build the values the front end needs to call ``wx.config`` (JS-SDK).

    A new client is created, a JS-SDK ticket is fetched through it, and the
    page URL posted by the front end in the ``uri`` field is signed together
    with a fresh timestamp and a 32-character nonce.

    :param request: Django request; ``request.POST['uri']`` is the URL of the
        page that will call ``wx.config``.
    :return: dict with the keys ``signature``, ``AppId``, ``jsapi_ticket``,
        ``timestamp``, ``nonceStr`` and ``url`` (the URL that was signed).
    """
    js_api = WeChatJSAPI(client=Client())

    # The page URL comes from the front end rather than from
    # DOMAIN + request.get_full_path(), because the page being configured is
    # not this endpoint.
    page_url = request.POST.get('uri')
    if page_url:
        # WeChat signs the URL without "#" and everything after it; leaving
        # the fragment in produces an "invalid signature" on hash-routed pages.
        page_url = page_url.split('#', 1)[0]

    timestamp = to_text(int(time.time()))
    nonce_str = random_string(32)
    jsapi_ticket = js_api.get_jsapi_ticket()
    signature = js_api.get_jsapi_signature(nonce_str,
                                           jsapi_ticket,
                                           timestamp,
                                           page_url)

    # NOTE(review): the raw jsapi_ticket is returned to the caller as in the
    # original; confirm the front end really needs it, since the signature
    # alone is what wx.config consumes.
    return {
        'signature': signature,
        "AppId": AppId,
        "jsapi_ticket": jsapi_ticket,
        "timestamp": timestamp,
        "nonceStr": nonce_str,
        'url': page_url,
    }


def get_token():
    """Fetch an access_token with the ``client_credential`` grant.

    HTTP request: ``GET https://api.weixin.qq.com/cgi-bin/token`` with the
    query parameters ``grant_type=client_credential``, ``appid`` and
    ``secret``. This is the account-level API token, not a per-user OAuth
    token.

    :return: the ``access_token`` value from the JSON reply, or ``None`` when
        the reply has no such key (for example an error reply).
    :raises requests.RequestException: on network failure or timeout.
    """
    response = requests.get(
        'https://api.weixin.qq.com/cgi-bin/token',
        # Let requests build and escape the query string.
        params={
            'grant_type': 'client_credential',
            'appid': AppId,
            'secret': AppSecret,
        },
        # Without a timeout a stalled connection would block the caller forever.
        timeout=10,
    )
    return response.json().get('access_token')


def get_ticket():
    """Fetch the JS-SDK ticket through a freshly created client.

    :return: whatever ``WeChatJSAPI.get_jsapi_ticket()`` returns.
    """
    return WeChatJSAPI(client=Client()).get_jsapi_ticket()


def Client():
    """Create a new ``WeChatClient`` from the module-level AppId / AppSecret.

    :return: the newly constructed ``WeChatClient``.
    """
    return WeChatClient(AppId, AppSecret)


views.py

# 1.00 Server verification for WeChat; only called while configuring the server URL
@csrf_exempt
def Handle(request):
    """Answer WeChat's server-verification GET request.

    The ``signature``, ``timestamp`` and ``nonce`` query parameters are
    checked against ``TOKEN``.

    :param request: Django request carrying the query parameters sent by WeChat.
    :return: ``HttpResponse`` echoing ``echostr`` when the signature is valid,
        otherwise an empty response with status 403.
    """
    query = request.GET
    try:
        # Passing this check is what lets the server connect to WeChat.
        check_signature(
            TOKEN,
            query.get('signature', ''),
            query.get('timestamp', ''),
            query.get('nonce', ''),
        )
    except InvalidSignatureException:
        return HttpResponse(status=403)
    else:
        return HttpResponse(query.get('echostr', ''))
r = redis.Redis(host='127.0.0.1', port=6379)  # Redis connection used to cache the access token


# 1.01 Return the JS-SDK signature data
@csrf_exempt
def get_access_token(request):
    """Return the JS-SDK config values, caching an access token in Redis.

    When ``wx:ACCESS_TOKEN`` is not in Redis, a token is fetched with
    ``get_token()`` and stored for 7200 seconds. In both cases the reply body
    is built from ``get_wechat_config(request)``.

    :param request: Django request, forwarded to ``get_wechat_config``.
    :return: ``JsonResponse`` with ``status``/``code``/``WechatJSConfig`` on
        success, or ``status``/``code``/``msg`` on failure.
    """
    # NOTE(review): the cached token is never handed to get_wechat_config in
    # the code shown here; confirm whether this cache is still needed.
    try:
        if not r.get('wx:ACCESS_TOKEN'):
            token = get_token()
            if not token:
                # get_token() returns None when the reply has no access_token;
                # never cache that.
                raise ValueError("WeChat did not return an access_token")
            # set(..., ex=...) avoids setex(), whose argument order differs
            # between redis-py versions (name, time, value in 3.x).
            r.set('wx:ACCESS_TOKEN', token, ex=7200)
        wechat_js_config = get_wechat_config(request)
    except Exception as e:
        logger.error(e)
        # Report failures as JSON so the front end can tell them apart from a
        # successful reply instead of receiving plain text.
        return JsonResponse({
            "status": "failed",
            "code": 400,
            "msg": str(e)
        })
    return JsonResponse({
        "status": "success",
        "code": 200,
        "WechatJSConfig": wechat_js_config
    })
# 1.02 WeChat web-page authorization (OAuth2)
def _wx_user_fields(user_info):
    """Map a WeChat user-info payload onto the Wx_user_info columns auth() writes."""
    return {
        "nickname": user_info["nickname"],
        "headimgurl": user_info["headimgurl"],
        "sex": int(user_info["sex"]),
        # Missing, None and "" are all stored as "".
        "city": user_info.get("city") or "",
        "country": user_info.get("country") or "",
        "province": user_info.get("province") or "",
    }


def _auth_failed(msg, status=200):
    """Build the JSON failure reply used by auth()."""
    return JsonResponse({
        "status": "failed",
        "code": 400,
        "msg": msg
    }, status=status)


def auth(request):
    """Run the WeChat web-authorization flow and persist the user's profile.

    Without a ``code`` query parameter the browser is redirected to WeChat's
    authorize page (scope ``snsapi_userinfo``) with this view's own URL as
    ``redirect_uri``. With a ``code`` it is exchanged for a web access token
    (refreshed once if the validity check fails), the user info is fetched,
    the ``Wx_user_info`` row for that openid is updated or created, and the
    browser is redirected to ``state`` with ``access_token`` and ``openid``
    appended as query parameters.

    ``request.session['user']`` is set to a plain dict (openid plus the stored
    profile fields) so it can be serialized by the session backend.

    :param request: Django request; reads ``code`` and ``state`` from
        ``request.GET``.
    :return: a redirect on success, or a ``JsonResponse`` with
        ``status``/``code``/``msg`` on failure.
    """
    # Local import keeps this view self-contained regardless of module imports.
    from urllib.parse import quote, urlencode

    if request.method != 'GET':
        return _auth_failed("method not allowed", status=405)

    code = request.GET.get('code')
    state = request.GET.get('state')

    if not code:
        # No code yet: send the user to WeChat, which calls this URL back
        # with ?code=...&state=...
        redirect_uri = "https://" + request.get_host() + request.get_full_path()
        oauth_url = (
            "https://open.weixin.qq.com/connect/oauth2/authorize"
            "?appid={0}&redirect_uri={1}&response_type=code&scope={2}&state={3}"
            "#wechat_redirect"
        ).format(AppId, quote(redirect_uri), "snsapi_userinfo", quote(state or "STATE"))
        return redirect(oauth_url)

    if not state:
        # state is the post-login target supplied by the front end; without it
        # there is nowhere to redirect to.
        return _auth_failed("missing state")

    try:
        res = oauthClient.fetch_access_token(code=code)
        access_token, openid = res['access_token'], res['openid']
        # Token and openid are passed explicitly because oauthClient is a
        # module-level object shared by all requests.
        if not oauthClient.check_access_token(openid=openid, access_token=access_token):
            res = oauthClient.refresh_access_token(res['refresh_token'])
            access_token, openid = res['access_token'], res['openid']

        # The user may not be following the official account.
        user_info = oauthClient.get_user_info(openid=openid, access_token=access_token)
        fields = _wx_user_fields(user_info)

        # update() returns the number of matched rows; create only when none exist.
        if not Wx_user_info.objects.filter(openid=user_info['openid']).update(**fields):
            Wx_user_info.objects.create(openid=user_info['openid'], **fields)

        request.session['user'] = dict(fields, openid=user_info['openid'])
    except Exception as e:
        logger.error(e)
        return _auth_failed(str(e))

    # NOTE(review): state comes from the query string and receives the access
    # token; validate it against an allow-list of front-end URLs to prevent an
    # open redirect leaking the token.
    return redirect(state + "?" + urlencode([("access_token", access_token), ("openid", openid)]))
当然,授权获取用户信息也可以写成一个装饰器,这样就不用每次都授权(直接将上面 1.02 稍微修改下就好了)。上面的代码都有写将用户信息存至 session 中,不过 JavaScript 获取不到 session 中的数据;Django 前后端分离的话,单独写一个获取用户信息的接口就 OK 了!时间有限,写得很粗糙,将就下吧。

微信官方获取签名方法
import time
import random
import string
import hashlib


class Sign:
    """Compute a JS-SDK signature for a page URL.

    ``self.ret`` holds the nonce, ticket, timestamp and URL; ``sign()`` adds
    the SHA-1 ``signature`` over those four values.
    """

    def __init__(self, jsapi_ticket, url):
        """Store the ticket and URL together with a new nonce and timestamp.

        :param jsapi_ticket: JS-SDK ticket to sign with.
        :param url: URL of the page that will call ``wx.config``.
        """
        self.ret = {
            'nonceStr': self.__create_nonce_str(),
            'jsapi_ticket': jsapi_ticket,
            'timestamp': self.__create_timestamp(),
            'url': url
        }

    def __create_nonce_str(self):
        """Return 15 random ASCII letters/digits."""
        alphabet = string.ascii_letters + string.digits
        return ''.join(random.choice(alphabet) for _ in range(15))

    def __create_timestamp(self):
        """Return the current Unix time in whole seconds."""
        return int(time.time())

    def sign(self):
        """Add ``signature`` to ``self.ret`` and return the dict.

        The signed string is ``key=value`` pairs joined with ``&``, keys
        lower-cased and taken in sorted order. A ``signature`` left by an
        earlier call is excluded, so repeated calls give the same result.

        :return: ``self.ret`` including the hex SHA-1 ``signature``.
        """
        raw = '&'.join(
            '%s=%s' % (key.lower(), self.ret[key])
            for key in sorted(self.ret)
            if key != 'signature'
        )
        # The string to sign contains the ticket, so it is not printed.
        self.ret['signature'] = hashlib.sha1(raw.encode('utf-8')).hexdigest()
        return self.ret

if __name__ == '__main__':
    # Demo run with placeholder values: Sign requires a ticket and a URL, and
    # calling it without them raises TypeError. In real use the URL must be
    # the actual page URL, not a hard-coded one.
    print(Sign('jsapi_ticket', 'http://example.com').sign())



 类似资料: