Python 策略模式(灵活调用各个不同平台的直播SDK)

Python 策略模式(灵活调用各个不同平台的直播SDK)

Main.png

说明:

使用策略模式封装了不同直播平台的SDK。

代码目录结构:

├── __init__.py
├── context.py
├── live.py
├── live_aliyun.py
├── live_aliyun_edge.py
└── live_ucloud.py

context.py

环境类(Context):

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Created by lizhiwei at 2018/12/17
import logging

from .live_aliyun import LiveAliyun
from .live_aliyun_edge import LiveAliyunEdge
from .live_ucloud import LiveUcloud

logger = logging.getLogger('debug')


class LiveContext(object):
    """Context of the strategy pattern for live-streaming platforms.

    Looks up the strategy class registered for ``platform`` in
    ``LIVE_PLATFORM``, instantiates it for ``channel`` and forwards every
    URL / storage request to that strategy instance.
    """

    # Lower-case platform name -> strategy class.
    LIVE_PLATFORM = {
        "aliyun": LiveAliyun,
        "ucloud": LiveUcloud,
        'aliyun_edge': LiveAliyunEdge
    }

    def __init__(self, platform="", channel=""):
        """Create the strategy for ``platform`` bound to ``channel``.

        :param platform: key of ``LIVE_PLATFORM``; matched case-insensitively.
        :param channel: channel (stream) name handed to the strategy.
        :raises KeyError: if ``platform`` is not a registered platform.
        """
        self._platform = platform.lower()
        self._channel = channel
        self._live = self._create_live(self._platform, channel)

    def _create_live(self, platform, channel):
        """Instantiate the strategy registered under ``platform``.

        :raises KeyError: with a descriptive message when ``platform`` is
            not a key of ``LIVE_PLATFORM``.
        """
        try:
            live_cls = self.LIVE_PLATFORM[platform]
        except KeyError:
            # Same exception type as the plain dict lookup, but with a
            # message that tells the caller what the valid choices are.
            raise KeyError(
                "unsupported live platform %r, expected one of: %s"
                % (platform, ", ".join(sorted(self.LIVE_PLATFORM)))
            )
        return live_cls(channel)

    @property
    def channel(self):
        """The channel (stream) name the strategy operates on."""
        return self._channel

    @channel.setter
    def channel(self, value):
        """Change the channel and keep the strategy in sync with it."""
        self._channel = value
        # The strategies read ``self.channel`` when building URLs, so the
        # new value has to reach the strategy, not only the context.
        self._live.channel = value

    @property
    def platform(self):
        """The lower-case name of the active platform."""
        return self._platform

    @platform.setter
    def platform(self, value):
        """Switch platform and rebuild the strategy for the current channel.

        :raises KeyError: if ``value`` is not a registered platform; the
            context is left unchanged in that case.
        """
        platform = value.lower()
        live = self._create_live(platform, self._channel)
        self._platform = platform
        self._live = live

    @property
    def pusher(self):
        """Push URL as returned by the strategy's ``get_pusher()``."""
        return self._live.get_pusher()

    @property
    def rtmp_player(self):
        """RTMP play URL as returned by the strategy's ``get_rtmp_player()``."""
        return self._live.get_rtmp_player()

    @property
    def hls_player(self):
        """HLS play URL as returned by the strategy's ``get_hls_player()``."""
        return self._live.get_hls_player()

    @property
    def flv_player(self):
        """FLV play URL as returned by the strategy's ``get_flv_player()``."""
        return self._live.get_flv_player()

    @property
    def players(self):
        """All play URLs as returned by the strategy's ``get_players()``."""
        return self._live.get_players()

    def record_pusher(self, record_id):
        """Return the strategy's recording push URL for ``record_id``."""
        return self._live.get_record_pusher(record_id=record_id)

    def get_obj_url(self, name, expires=3600):
        """Return the strategy's download URL for the stored object ``name``.

        ``expires`` is passed through positionally; whether it is honoured
        is up to the strategy.
        """
        return self._live.get_obj_url(name, expires)

    def put_stream_file(self, local, target, mime_type='image/jpeg'):
        """Forward a stream upload of ``local`` to ``target`` to the strategy."""
        return self._live.put_stream_file(local=local, target=target, mime_type=mime_type)

    def upload_file(self, local, target, mime_type='image/jpeg'):
        """Forward a file upload of ``local`` to ``target`` to the strategy.

        ``mime_type`` is accepted for signature symmetry with
        ``put_stream_file`` but is not forwarded: the strategy's
        ``upload_file`` only takes ``local`` and ``target``.
        """
        return self._live.upload_file(local, target)

live.py

抽象策略类(Strategy):

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Created by lizhiwei at 2018/12/17


class Live(object):
    """Abstract live-streaming strategy.

    Holds the channel name and declares the operations every platform
    strategy has to provide. Each operation raises ``NotImplementedError``
    here; concrete subclasses override them.
    """

    def __init__(self, channel):
        """Remember the channel (stream) name the strategy works on."""
        self.channel = channel

    def _unsupported(self, method_name):
        """Build the error raised by an operation a subclass did not override."""
        return NotImplementedError(
            "%s does not implement %s()" % (type(self).__name__, method_name)
        )

    def get_pusher(self):
        """Return the URL used to push the live stream."""
        raise self._unsupported("get_pusher")

    def get_rtmp_player(self):
        """Return the RTMP play URL."""
        raise self._unsupported("get_rtmp_player")

    def get_record_pusher(self, record_id):
        """Return the push URL used when recording under ``record_id``."""
        raise self._unsupported("get_record_pusher")

    def get_hls_player(self):
        """Return the HLS play URL."""
        raise self._unsupported("get_hls_player")

    def get_flv_player(self):
        """Return the FLV play URL."""
        raise self._unsupported("get_flv_player")

    def get_players(self):
        """Return all play URLs of the channel."""
        raise self._unsupported("get_players")

    def get_obj_url(self, name, expries):
        """Return a download URL for the stored object ``name``.

        The parameter is spelled ``expries`` (sic) because the subclasses
        use the same spelling; it is kept for keyword compatibility.
        """
        raise self._unsupported("get_obj_url")

    def put_stream_file(self, local, target, mime_type):
        """Upload the stream ``local`` to storage under ``target``."""
        raise self._unsupported("put_stream_file")

    def upload_file(self, local, target):
        """Upload the local file ``local`` to storage under ``target``."""
        raise self._unsupported("upload_file")

live_aliyun.py

阿里云直播(中心推流):

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Created by lizhiwei at 2018/12/17
import logging

from django.conf import settings

from utils import oss_util
from utils.aliyun_auth import add_a_auth
from .live import Live

logger = logging.getLogger('debug')


class LiveAliyun(Live):
    """Aliyun live strategy that pushes through the center node.

    Reference: http://www.voidcn.com/article/p-yrdycvtt-bpk.html
    """

    # Credentials and endpoints, read from Django settings when the class
    # is defined.
    ak_id = settings.ALIYUN_AK_ID
    ak_secret = settings.ALIYUN_AK_SECRET
    centre_live_domain = settings.ALIYUN_CENTRE_LIVE_DOMAIN
    live_record_oss_bucket = settings.ALIYUN_CENTRE_LIVE_RECORD_OSS_BUCKET
    live_record_region = settings.ALIYUN_CENTRE_LIVE_RECORD_REGION
    app_name = settings.ALIYUN_CENTRE_LIVE_APPNAME
    a_auth_key = settings.ALIYUN_CENTRE_A_AUTH_KEY
    auth_exp = settings.ALIYUN_AUTH_EXP
    oss_auth_exp = settings.ALIYUN_OSS_URL_EXPRIES

    def __init__(self, channel):
        """Bind the strategy to ``channel``."""
        super(LiveAliyun, self).__init__(channel=channel)

    def _sign(self, url):
        """Return ``url`` signed by ``add_a_auth`` with this class's key and expiry."""
        return add_a_auth(uri=url, key=self.a_auth_key, exp=self.auth_exp)

    def get_pusher(self):
        """Return the signed center-node push URL.

        Center push, full address format:
            rtmp://video-center.alivecdn.com/{AppName}/{StreamName}?vhost={yourdomain}
        e.g.
            rtmp://video-center.alivecdn.com/ra/raa?vhost=live.rabbitslive.com&auth_key=1545213384-0-0-ad382da6a9c36ce34c06a0c538d5f244

        Edge push (for comparison, not produced by this class):
            rtmp://live.overseas.rabbitslive.com/rest/123?auth_key=1545716465-0-0-a7541fe9b19ddf594044b5ee6483f533
        """
        # Center push.
        url = "rtmp://video-center.alivecdn.com/{AppName}/{StreamName}?vhost={LiveDomain}".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.centre_live_domain,
        )
        # Pass the URL as an argument so it is only formatted when DEBUG
        # logging is actually enabled.
        logger.debug("pusher url: %s", url)
        return self._sign(url)

    def get_rtmp_player(self):
        """Return the signed RTMP play URL."""
        # NOTE(review): the unsigned URL ends with a bare "?", unlike the
        # HLS/FLV URLs below; how add_a_auth treats that is not visible
        # here, so the string is left untouched. Confirm it is intended.
        url = "rtmp://{LiveDomain}/{AppName}/{StreamName}?".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.centre_live_domain,
        )
        return self._sign(url)

    def get_hls_player(self):
        """Return the signed HLS (.m3u8) play URL."""
        url = "http://{LiveDomain}/{AppName}/{StreamName}.m3u8".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.centre_live_domain,
        )
        return self._sign(url)

    def get_flv_player(self):
        """Return the signed HTTP-FLV play URL."""
        url = "http://{LiveDomain}/{AppName}/{StreamName}.flv".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.centre_live_domain,
        )
        return self._sign(url)

    def get_players(self):
        """Return a dict with the signed play URLs keyed "rtmp", "flv", "hls"."""
        return {
            "rtmp": self.get_rtmp_player(),
            "flv": self.get_flv_player(),
            "hls": self.get_hls_player(),
        }

    def get_record_pusher(self, record_id):
        """Return the signed push URL of the recording app for ``record_id``.

        The app name is prefixed with ``record_`` and the stream name is
        suffixed with ``_{record_id}``.
        """
        url = "rtmp://video-center.alivecdn.com/record_{AppName}/{StreamName}_{record_id}?vhost={LiveDomain}".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.centre_live_domain,
            record_id=record_id
        )
        logger.debug("pusher url: %s", url)
        return self._sign(url)

    def get_obj_url(self, name, expries):
        """Return the OSS URL of object ``name`` in the record bucket.

        NOTE(review): ``expries`` is ignored; the expiry always comes from
        ``settings.ALIYUN_OSS_URL_EXPRIES``. Confirm this is intended.
        """
        return oss_util.get_obj_url(
            access_key_id=self.ak_id,
            access_key_secret=self.ak_secret,
            region=self.live_record_region,
            name=name,
            bucket_name=self.live_record_oss_bucket,
            expires=int(self.oss_auth_exp)
        )

    def put_stream_file(self, local, target, mime_type):
        """Upload ``local`` to the record bucket as ``target`` via ``oss_util.put_obj``.

        ``mime_type`` is accepted for interface compatibility but is not
        passed on. Returns ``None``.
        """
        logger.debug('input: %s', local)
        logger.debug('target: %s', target)
        oss_util.put_obj(
            access_key_id=self.ak_id,
            access_key_secret=self.ak_secret,
            region=self.live_record_region,
            bucket_name=self.live_record_oss_bucket,
            input=local,
            target=target
        )

    def upload_file(self, local, target):
        """Upload the file ``local`` as ``target``, then delete ``local``.

        The local file is only removed after ``put_obj_from_file`` has
        returned; if the upload raises, the file is left in place.
        """
        import os

        oss_util.put_obj_from_file(
            access_key_id=self.ak_id,
            access_key_secret=self.ak_secret,
            region=self.live_record_region,
            bucket_name=self.live_record_oss_bucket,
            local_file=local,
            target=target
        )
        os.remove(local)

live_aliyun_edge.py

阿里云边缘推流(适用于海外地区):

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Created by lizhiwei at 2018/12/17
import logging

from django.conf import settings

from utils import oss_util
from utils.aliyun_auth import add_a_auth
from .live import Live

logger = logging.getLogger('debug')


class LiveAliyunEdge(Live):
    """Aliyun live strategy that pushes through the edge node.

    Reference: http://www.voidcn.com/article/p-yrdycvtt-bpk.html
    """

    # Credentials and endpoints, read from Django settings when the class
    # is defined. Push and play use separate domains.
    ak_id = settings.ALIYUN_AK_ID
    ak_secret = settings.ALIYUN_AK_SECRET
    push_domain = settings.ALIYUN_EDGE_LIVE_PUSH_DOMAIN
    play_domain = settings.ALIYUN_EDGE_LIVE_PLAY_DOMAIN
    live_record_oss_bucket = settings.ALIYUN_EDGE_LIVE_RECORD_OSS_BUCKET
    live_record_region = settings.ALIYUN_EDGE_LIVE_RECORD_REGION
    app_name = settings.ALIYUN_EDGE_LIVE_APPNAME
    a_auth_key = settings.ALIYUN_EDGE_A_AUTH_KEY
    auth_exp = settings.ALIYUN_AUTH_EXP
    oss_auth_exp = settings.ALIYUN_OSS_URL_EXPRIES

    def __init__(self, channel):
        """Bind the strategy to ``channel``."""
        super(LiveAliyunEdge, self).__init__(channel=channel)

    def _sign(self, url):
        """Return ``url`` signed by ``add_a_auth`` with this class's key and expiry."""
        return add_a_auth(uri=url, key=self.a_auth_key, exp=self.auth_exp)

    def get_pusher(self):
        """Return the signed edge push URL.

        Center push (for comparison, not produced by this class):
            rtmp://video-center.alivecdn.com/{AppName}/{StreamName}?vhost={yourdomain}
        e.g.
            rtmp://video-center.alivecdn.com/ra/raa?vhost=live.rabbitslive.com&auth_key=1545213384-0-0-ad382da6a9c36ce34c06a0c538d5f244

        Edge push:
            rtmp://live.overseas.rabbitslive.com/rest/123?auth_key=1545716465-0-0-a7541fe9b19ddf594044b5ee6483f533
        """
        # Edge push: the stream goes straight to the push domain.
        url = "rtmp://{LiveDomain}/{AppName}/{StreamName}".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.push_domain,
        )
        # Pass the URL as an argument so it is only formatted when DEBUG
        # logging is actually enabled.
        logger.debug("pusher url: %s", url)
        return self._sign(url)

    def get_rtmp_player(self):
        """Return the signed RTMP play URL on the play domain."""
        url = "rtmp://{LiveDomain}/{AppName}/{StreamName}".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.play_domain,
        )
        return self._sign(url)

    def get_hls_player(self):
        """Return the signed HLS (.m3u8) play URL on the play domain."""
        url = "http://{LiveDomain}/{AppName}/{StreamName}.m3u8".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.play_domain,
        )
        return self._sign(url)

    def get_flv_player(self):
        """Return the signed HTTP-FLV play URL on the play domain."""
        url = "http://{LiveDomain}/{AppName}/{StreamName}.flv".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.play_domain,
        )
        return self._sign(url)

    def get_players(self):
        """Return a dict with the signed play URLs keyed "rtmp", "flv", "hls"."""
        return {
            "rtmp": self.get_rtmp_player(),
            "flv": self.get_flv_player(),
            "hls": self.get_hls_player(),
        }

    def get_record_pusher(self, record_id):
        """Return the signed recording push URL for ``record_id``.

        Recording is pushed through the Japan center node, e.g.
            rtmp://video-center-jp.alivecdn.com/AppName/StreamName?vhost=live.overseas.publish.rabbitslive.com
        with the app name prefixed by ``record_`` and the stream name
        suffixed by ``_{record_id}``.
        """
        # The Japan node uses center push for recording.
        # NOTE(review): vhost is filled with play_domain although the
        # example URL above shows a publish domain - confirm which one the
        # center node expects.
        # TODO: recording through the Japan node was reported to fail; a
        # fallback to the mainland center node (video-center.alivecdn.com)
        # was considered as a temporary workaround.
        url = "rtmp://video-center-jp.alivecdn.com/record_{AppName}/{StreamName}_{record_id}?vhost={LiveDomain}".format(
            AppName=self.app_name,
            StreamName=self.channel,
            LiveDomain=self.play_domain,
            record_id=record_id
        )
        return self._sign(url)

    def get_obj_url(self, name, expries):
        """Return the OSS URL of object ``name`` in the edge record bucket.

        NOTE(review): ``expries`` is ignored; the expiry always comes from
        ``settings.ALIYUN_OSS_URL_EXPRIES``. Confirm this is intended.
        """
        return oss_util.get_obj_url(
            access_key_id=self.ak_id,
            access_key_secret=self.ak_secret,
            region=self.live_record_region,
            name=name,
            bucket_name=self.live_record_oss_bucket,
            expires=int(self.oss_auth_exp)
        )

    def put_stream_file(self, local, target, mime_type):
        """Upload ``local`` to the record bucket as ``target`` via ``oss_util.put_obj``.

        ``mime_type`` is accepted for interface compatibility but is not
        passed on. Returns ``None``.
        """
        logger.debug('input: %s', local)
        logger.debug('target: %s', target)
        oss_util.put_obj(
            access_key_id=self.ak_id,
            access_key_secret=self.ak_secret,
            region=self.live_record_region,
            bucket_name=self.live_record_oss_bucket,
            input=local,
            target=target
        )

    def upload_file(self, local, target):
        """Upload the file ``local`` as ``target``, then delete ``local``.

        The local file is only removed after ``put_obj_from_file`` has
        returned; if the upload raises, the file is left in place.
        """
        import os

        oss_util.put_obj_from_file(
            access_key_id=self.ak_id,
            access_key_secret=self.ak_secret,
            region=self.live_record_region,
            bucket_name=self.live_record_oss_bucket,
            local_file=local,
            target=target
        )
        os.remove(local)

live_ucloud.py

UCloud 直播:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Created by lizhiwei at 2018/12/17
import time

from django.conf import settings

from utils.ucloud_util import auth_key, private_download_url, put_stream_file, post_upload_file
from .live import Live


class LiveUcloud(Live):
    """UCloud live strategy.

    Domains, access point and keys are read from Django settings on every
    call.
    """

    def __init__(self, channel):
        """Bind the strategy to ``channel``."""
        super(LiveUcloud, self).__init__(channel=channel)

    def get_pusher(self):
        """Return the RTMP push URL ``rtmp://PUSH_DOMAIN/ACCESS_POINT/channel``.

        NOTE(review): the URL is returned unsigned and carries no
        timestamp - confirm that the push endpoint needs neither.
        """
        url = "rtmp://{0}/{1}/{2}".format(
            settings.PUSH_DOMAIN,
            settings.ACCESS_POINT,
            self.channel)
        return url

    def get_rtmp_player(self):
        """Return the RTMP play URL signed by ``auth_key`` with ``LIVE_KEY``."""
        url = "rtmp://{0}/{1}/{2}".format(
            settings.LIVE_DOMAIN,
            settings.ACCESS_POINT,
            self.channel)
        return auth_key(url, settings.LIVE_KEY)

    def get_record_pusher(self, record_id):
        """Return the push URL that asks the platform to record the stream.

        The recording file name is ``{ACCESS_POINT}_{channel}_{record_id}``.
        """
        url = "rtmp://{0}/{1}/{2}?record=true&filename={1}_{2}_{3}".format(
            settings.PUSH_DOMAIN,
            settings.ACCESS_POINT,
            self.channel,
            record_id)
        return url

    def get_hls_player(self):
        """Return the HLS playlist URL signed by ``auth_key`` with ``LIVE_KEY``."""
        url = "http://{0}/{1}/{2}/playlist.m3u8".format(
            settings.HLS_DOMAIN,
            settings.ACCESS_POINT,
            self.channel)
        return auth_key(url, settings.LIVE_KEY)

    def get_flv_player(self):
        """Return the FLV URL signed by ``auth_key`` with ``LIVE_KEY``.

        Served from ``HLS_DOMAIN``, like the HLS playlist.
        """
        url = "http://{0}/{1}/{2}/playlist.flv".format(
            settings.HLS_DOMAIN,
            settings.ACCESS_POINT,
            self.channel)
        return auth_key(url, settings.LIVE_KEY)

    def get_players(self):
        """Return a dict with the play URLs keyed "rtmp", "flv", "hls"."""
        return {
            "rtmp": self.get_rtmp_player(),
            "flv": self.get_flv_player(),
            "hls": self.get_hls_player(),
        }

    def get_obj_url(self, name, expries=3600):
        """Return the private download URL of object ``name``.

        Returns ``None`` when ``private_download_url`` reports a status
        other than 200. ``expries`` is accepted for interface
        compatibility but is not passed on.
        """
        status, photo_url = private_download_url(
            settings.UCLOUD_PUB_KEY, settings.UCLOUD_PRIV_KEY,
            settings.UCLOUD_BUCKET_DOMAIN,
            name)
        if status == 200:
            return photo_url
        return None

    def put_stream_file(self, local, target, mime_type):
        """Upload the stream ``local`` under key ``target``.

        Returns whatever the ucloud ``put_stream_file`` helper returns.
        """
        return put_stream_file(
            settings.UCLOUD_PUB_KEY, settings.UCLOUD_PRIV_KEY,
            settings.UCLOUD_BUCKET_DOMAIN,
            key=target,
            stream=local,
            mime_type=mime_type
        )

    def upload_file(self, local, target):
        """Upload the file ``local`` under key ``target``.

        The local file is deleted only when ``post_upload_file`` returns a
        truthy status; otherwise it is kept.
        """
        resp_status = post_upload_file(
            settings.UCLOUD_PUB_KEY, settings.UCLOUD_PRIV_KEY,
            settings.UCLOUD_BUCKET_DOMAIN,
            key=target,
            local_file=local
        )
        if resp_status:
            import os
            os.remove(local)
-------------The End -------------