baijiayun.client 源代码

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

import logging
import time

from . import api
from .base import BaseClient
from ..core.utils import Md5Signer, to_text
from ..storage.cache import BaiJiaYunClientCache
from ..storage.memorystorage import MemoryStorage

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Fallback cache backend: BaiJiaYunClient uses this single MemoryStorage
# instance whenever no ``storage`` argument is passed, so every such client
# created in this process shares it.
default_storage = MemoryStorage()


class BaiJiaYunClient(BaseClient):
    """Client for the BaiJiaYun open API.

    Holds the partner credentials, caches the ``partner_key`` obtained from
    the service, and signs every outgoing request with it.

    :param partner_id: partner (account) id
    :param secret_key: secret key paired with ``partner_id``
    :param private_domain: optional private sub-domain; when given, requests
        go to ``https://<private_domain>.at.baijiayun.com/``
    :param cache_prefix: prefix for cache entries, defaults to ``partner_id``
    :param storage: cache storage backend, defaults to the module-level
        ``default_storage``
    :param timeout: forwarded unchanged to ``BaseClient.__init__``
    """

    # API groups exposed as class attributes (e.g. ``client.room``).
    # NOTE(review): how these objects get bound to a client instance is
    # defined in ``api`` / ``BaseClient``, not in this module.
    doc = api.Doc()
    evaluation = api.Evaluation()
    hudong = api.HuDong()
    live = api.Live()
    liveaccount = api.LiveAccount()
    livesetting = api.LiveSetting()
    notice = api.Notice()
    playback = api.PlayBack()
    quiz = api.Quiz()
    room = api.Room()
    roomdata = api.RoomData()
    smallcourse = api.SmallCourse()
    subaccount = api.SubAccount()
    video = api.Video()
    videoaccount = api.VideoAccount()
    videodata = api.VideoData()

    def __init__(self, partner_id, secret_key, private_domain=None,
                 cache_prefix=None, storage=None, timeout=None):
        super(BaiJiaYunClient, self).__init__(timeout)
        self.partner_id = partner_id
        self.secret_key = secret_key
        self.private_domain = private_domain
        self.cache = BaiJiaYunClientCache(
            storage or default_storage,
            cache_prefix or self.partner_id
        )
        if private_domain is not None:
            # Set on the instance so only this client uses the private host.
            self.API_BASE_URL = 'https://%s.at.baijiayun.com/' % private_domain

    @property
    def partner_key(self):
        """The partner key, taken from the cache or fetched on demand."""
        return self.get_partner_key()

    def get_partner_key(self, regenerate=0, force=False):
        """Return the partner key, requesting it when it is not cached.

        :param regenerate: value sent as the ``regenerate`` request field
        :param force: when true, always request the key, ignoring the cache
        :return: the partner key
        """
        partner_key = self.cache.partner_key.get()
        if partner_key is None or force:
            # NOTE(review): if BaseClient._request routes this call through
            # _handle_pre_request, signing it needs the very key being
            # fetched -- confirm against BaseClient.
            partner_key = self._request(
                'POST',
                '/openapi/partner/createkey',
                data={
                    'partner_id': self.partner_id,
                    'secret_key': self.secret_key,
                    'regenerate': regenerate,
                    'timestamp': int(time.time())
                },
                result_processor=lambda x: x['partner_key']
            )
            self.cache.partner_key.set(value=partner_key, ttl=7200)
        return partner_key

    def get_sign(self, data):
        """Compute the signature for ``data``.

        Every item is added to an ``Md5Signer`` as ``key=value`` (values
        converted with ``to_text``); the signer is keyed with
        ``partner_key=<partner key>``.

        :param data: mapping of parameters to sign
        :return: the signer's ``signature``
        """
        # NOTE(review): any ordering of the parameters is presumably done
        # inside Md5Signer -- verify there.
        signer = Md5Signer(delimiter=b'&', key="partner_key=" + self.partner_key)
        for k, v in data.items():
            v = to_text(v)
            signer.add_data("%s=%s" % (k, v))
        return signer.signature

    def add_sign(self, data, timestamp_key="timestamp", sign_key="sign"):
        """Add a timestamp (if missing) and a signature to ``data`` in place.

        :param data: dict of request parameters; it is modified
        :param timestamp_key: key for the timestamp; a falsy value disables
            adding a timestamp
        :param sign_key: key under which the signature is stored
        :return: the same ``data`` dict
        """
        if timestamp_key and timestamp_key not in data:
            data[timestamp_key] = int(time.time())
        # A signature left over from an earlier call must not be signed
        # itself, otherwise signing the same dict twice gives a wrong result
        # (check_sign likewise removes 'sign' before recomputing).
        data.pop(sign_key, None)
        data[sign_key] = self.get_sign(data)
        return data

    def _handle_pre_request(self, method, uri, kwargs):
        """Sign the outgoing parameters before the request is sent.

        For ``POST`` any ``params`` are merged into ``data`` and ``data`` is
        signed; for other methods ``params`` is signed.

        :return: ``(method, uri, kwargs)`` with the signed parameters
        """
        if method == 'POST':
            data = kwargs.setdefault('data', dict())
            data.update(kwargs.pop('params', dict()))
        else:
            data = kwargs.setdefault('params', dict())
        self.add_sign(data)
        return method, uri, kwargs

    def _handle_request_except(self, e, func, *args, **kwargs):
        """Re-raise ``e`` unchanged; no retry is attempted."""
        raise e

    def get_web_sign(self, room_id, user_number, user_name, user_role,
                     user_avatar='', group_id=None):
        """Compute the signature used to enter a live classroom.

        :param room_id: classroom number
        :param user_number: user number; must be numeric, an unsigned
            positive integer of at most 19 digits
        :param user_name: user nickname
        :param user_role: user role -- 0: student, 1: teacher,
            2: teaching assistant
        :param user_avatar: user avatar URL; must be an absolute URL starting
            with http/https
        :param group_id: group number; only signed when not ``None``
        :return: the signature over the given fields
        """
        data = {
            'room_id': room_id,
            'user_number': user_number,
            'user_name': user_name,
            'user_role': user_role,
            'user_avatar': user_avatar
        }
        if group_id is not None:
            data['group_id'] = group_id
        return self.get_sign(data)

    def check_sign(self, data, check_timestamp_second=300):
        """Verify the signature of a callback request.

        :param data: dict with all data of the request, including ``sign``
        :param check_timestamp_second: allowed difference in seconds between
            the request timestamp and the local clock; pass 0 to skip the
            timestamp check
        :return: ``True`` when the signature (and timestamp, if checked) is
            valid, otherwise ``False``
        """
        assert isinstance(data, dict)
        if 'sign' not in data:
            return False
        if check_timestamp_second > 0:
            # Callback payloads usually carry the timestamp as a string, so
            # parse it instead of subtracting directly (which raised
            # TypeError). A missing or malformed value fails verification.
            try:
                timestamp = float(data['timestamp'])
            except (KeyError, TypeError, ValueError):
                return False
            # Written as "not <=" so that a NaN timestamp is rejected too.
            if not abs(timestamp - time.time()) <= check_timestamp_second:
                return False
        # Sign a copy without 'sign' so the caller's dict is left untouched.
        data = data.copy()
        sign = data.pop('sign', None)
        return sign == self.get_sign(data)