产品动态
产品公告
智能字幕功能 | 描述 | 支持输入类型 |
语音识别(ASR)生成字幕 | 通过 ASR 语音识别,将对白转换为字幕文件,并进行大模型翻译。 支持配置热词库和术语库,以提高语音识别和大模型翻译的准确率。 支持将字幕压制渲染到视频画面中。 | 音频文件、视频文件、直播流、实时音频流 |
文本识别(OCR)生成字幕 | 通过 OCR 文本识别,将画面上的文字提取为字幕文件,并进行大模型翻译。 | 视频文件(且画面上带有硬字幕) |
翻译字幕文件 | 输入需为字幕文件,通过大模型翻译为多语种,生成新字幕文件。 | 字幕文件(支持 WebVTT 、SRT 格式) |






{
  "InputInfo": {
    "Type": "URL",
    "UrlInputInfo": {
      "Url": "https://test-1234567.cos.ap-guangzhou.myqcloud.com/video/test.mp4" // 替换成需要处理的视频 URL
    }
  },
  "SmartSubtitlesTask": {
    "Definition": 122, //122为预设中文源视频-生成中英文字幕模板ID,可替换为您的自定义智能字幕模板ID
    "UserExtPara": "" //扩展参数
  },
  "OutputStorage": {
    "CosOutputStorage": {
      "Bucket": "test-1234567",
      "Region": "ap-guangzhou"
    },
    "Type": "COS"
  },
  "OutputDir": "/output/",
  "Action": "ProcessMedia",
  "Version": "2019-06-12"
}
{
  "InputInfo": {
    "Type": "COS",
    "CosInputInfo": {
      "Bucket": "facedetectioncos-125*****11",
      "Region": "ap-guangzhou",
      "Object": "/video/123.mp4"
    }
  },
  "ScheduleId": 12345, //替换为自定义编排ID,12345为填写示例,不具备实际意义
  "Action": "ProcessMedia",
  "Version": "2019-06-12"
}
参数名称 | 参数类型 | 功能描述 | 参考值 |
need_wordlist | int | 是否返回字词时间戳(部分模板不支持,不返回),默认不返回。 1:返回字词时间戳。 | 1 |
accurate_mode | int | 是否开启精准模式,精准模式是一个可选项,提供更加精准的时间戳,默认不开启。 1:开启精准模式。 | 1 |
adapt_words | string | 腾讯云|10,媒体处理|10 |
{
  "MediaProcessTask": {
    "TranscodeTaskSet": [
      {
        "Definition": 100040, //转码模板ID;需要替换为您需要的转码模板
        "OverrideParameter": { //覆盖参数;用于灵活覆盖转码模板中的部分参数
          "SubtitleTemplate": { //字幕压制配置
            "Path": "https://test-1234567.cos.ap-nanjing.myqcloud.com/mps_autotest/subtitle/1.vtt",
            "StreamIndex": 2,
            "FontType": "simkai.ttf",
            "FontSize": "10px",
            "FontColor": "0xFFFFFF",
            "FontAlpha": 0.9
          }
        }
      }
    ]
  },
  "InputInfo": { //输入信息
    "Type": "URL",
    "UrlInputInfo": {
      "Url": "https://test-1234567.cos.ap-nanjing.myqcloud.com/mps_autotest/subtitle/123.mkv"
    }
  },
  "OutputStorage": { //输出存储桶
    "Type": "COS",
    "CosOutputStorage": {
      "Bucket": "test-1234567",
      "Region": "ap-nanjing"
    }
  },
  "OutputDir": "/mps_autotest/output2/", //输出路径
  "Action": "ProcessMedia",
  "Version": "2019-06-12"
}





TaskNotifyConfig 参数配置事件回调。当任务处理完成后,会通过配置的回调信息回调任务结果,您可以通过 ParseNotification 解析事件通知结果。
{
  "Url": "http://5000-wenzhen.liveplay.myqcloud.com/live/123.flv",
  "AiRecognitionTask": {
    "Definition": 10101 //10101为预设中文字幕模板ID,可替换为您的自定义智能识别模板ID
  },
  "OutputStorage": {
    "CosOutputStorage": {
      "Bucket": "6c0f30dfvodgzp*****0800-10****53",
      "Region": "ap-guangzhou"
    },
    "Type": "COS"
  },
  "OutputDir": "/6c0f30dfvodgzp*****0800/0d1409d3456551**********652/",
  "TaskNotifyConfig": {
    "NotifyType": "URL",
    "NotifyUrl": "http://****.qq.com/callback/qtatest/?token=*****"
  },
  "Action": "ProcessLiveStream",
  "Version": "2019-06-12"
}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Sample client that streams a PCM file over a signed WebSocket connection.

The script builds a WebSocket URL carrying a TC3-HMAC-SHA256 signature, sends
the PCM file as a sequence of binary ``AudioPacket`` frames and logs every
message received from the server.
"""
import argparse
import struct
import time
import os
import signal
import sys
import hashlib
import hmac
import random
from urllib.parse import urlencode, urlunsplit, quote
import asyncio
import logging
import json

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Duration of audio carried by one packet, in milliseconds.
PCM_DUR_MS = 40
# Bytes of PCM per millisecond of audio used for pacing and timestamps
# (presumably 16 kHz, 16-bit mono -- TODO confirm against the service docs).
PCM_BYTES_PER_MS = 32


class AudioPacket:
    """One binary audio frame sent to the server.

    Wire layout produced by :meth:`marshal` (all integers big-endian):
    format (1 byte), end flag (1 byte), timestamp (8 bytes), length of the
    UTF-8 encoded ``audio_src_id`` (2 bytes), the encoded ``audio_src_id``,
    length of ``ext_data`` (2 bytes), ``ext_data``, then the raw ``data``.
    """

    def __init__(self, format=1, is_end=False, timestamp=0,
                 audio_src_id="123456", ext_data=b'', data=b''):
        self.format = format
        self.is_end = is_end
        self.timestamp = timestamp
        self.audio_src_id = audio_src_id
        self.ext_data = ext_data
        self.data = data

    def marshal(self):
        """Serialize the packet to its binary wire format and return bytes."""
        # Encode first: the length field must count the bytes that follow,
        # not the characters, or any non-ASCII id corrupts the frame.
        audio_src_bytes = self.audio_src_id.encode('utf-8')
        header = struct.pack(
            '>BBQH',
            self.format,
            1 if self.is_end else 0,
            self.timestamp,
            len(audio_src_bytes),
        )
        ext_len = struct.pack('>H', len(self.ext_data))
        return header + audio_src_bytes + ext_len + self.ext_data + self.data


def sha256hex(s):
    """Return the SHA-256 hex digest of ``s`` (str is encoded as UTF-8)."""
    if isinstance(s, str):
        s = s.encode('utf-8')
    return hashlib.sha256(s).hexdigest()


def hmacsha256(s, key):
    """Return the raw HMAC-SHA256 digest of ``s`` under ``key``.

    Both arguments may be str (encoded as UTF-8) or bytes.
    """
    if isinstance(s, str):
        s = s.encode('utf-8')
    if isinstance(key, str):
        key = key.encode('utf-8')
    return hmac.new(key, s, hashlib.sha256).digest()


def generate_random_number(digits):
    """Return a random integer that has exactly ``digits`` decimal digits."""
    low = 10 ** (digits - 1)
    high = (10 ** digits) - 1
    return random.randint(low, high)


def generate_url_v3(args):
    """Build the WebSocket URL, including its TC3-HMAC-SHA256 signature.

    ``args`` must provide ``addr``, ``appid``, ``lang``, ``dstLang``,
    ``frame``, ``timeout``, ``secretId``, ``secretKey`` and ``ssl``.
    Returns the complete ``ws://`` or ``wss://`` URL as a string.
    """
    query_params = {}
    if args.dstLang:
        query_params["transSrc"] = args.lang
        query_params["transDst"] = args.dstLang
    else:
        query_params["asrDst"] = args.lang
    query_params["fragmentNotify"] = "1" if args.frame else "0"
    query_params["timeoutSec"] = str(args.timeout)

    timestamp = int(time.time())
    expire_timestamp = timestamp + 3600
    query_params["timeStamp"] = str(timestamp)
    query_params["expired"] = str(expire_timestamp)
    query_params["secretId"] = args.secretId
    query_params["nonce"] = str(generate_random_number(10))

    # Sort keys and build canonical query string
    canonical_query = "&".join(
        "{}={}".format(k, quote(query_params[k], safe=''))
        for k in sorted(query_params)
    )

    # Build canonical request
    # NOTE(review): every "\\n" below is the two-character sequence
    # backslash + "n", not a newline. TC3 signing normally joins fields with
    # real newlines; confirm against the service before changing this, since
    # it alters the resulting signature.
    path = "/wss/v1/{}".format(args.appid)
    http_method = "post"
    canonical_uri = path
    canonical_headers = "content-type:application/json; charset=utf-8\\nhost:{}\\n".format(args.addr)
    signed_headers = "content-type;host"
    canonical_request = "{}\\n{}\\n{}\\n{}\\n{}\\n".format(
        http_method,
        canonical_uri,
        canonical_query,
        canonical_headers,
        signed_headers,
    )

    # Build string to sign
    date = time.strftime("%Y-%m-%d", time.gmtime(timestamp))
    credential_scope = "{}/mps/tc3_request".format(date)
    hashed_canonical = sha256hex(canonical_request)
    algorithm = "TC3-HMAC-SHA256"
    string_to_sign = "{}\\n{}\\n{}\\n{}".format(
        algorithm,
        timestamp,
        credential_scope,
        hashed_canonical,
    )

    # Calculate signature: derive the signing key step by step, then sign.
    secret_date = hmacsha256(date, "TC3" + args.secretKey)
    secret_service = hmacsha256("mps", secret_date)
    secret_signing = hmacsha256("tc3_request", secret_service)
    signature = hmac.new(
        secret_signing,
        string_to_sign.encode('utf-8'),
        hashlib.sha256,
    ).hexdigest()

    # Add signature to query params (after signing, so it is not itself signed)
    query_params["signature"] = signature

    # Build final URL
    scheme = "wss" if args.ssl else "ws"
    return urlunsplit((scheme, args.addr, path, urlencode(query_params), ""))


async def receive_messages(websocket, stop_event):
    """Log every message received on ``websocket`` until stopped or closed."""
    try:
        while not stop_event.is_set():
            message = await websocket.recv()
            if isinstance(message, bytes):
                try:
                    message = message.decode('utf-8')
                except UnicodeDecodeError:
                    message = str(message)
            logger.info("Received: %s", message)
    except Exception as e:
        # Best effort: any receive failure ends the receiver and is logged.
        logger.info("Connection closed: %s", e)


def _build_arg_parser():
    """Create the command-line parser for the sample client."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--addr", default="mps.cloud.tencent.com", help="websocket service address")
    parser.add_argument("--file", default="./wx_voice.pcm", help="pcm file path")
    parser.add_argument("--appid", default="121313131", help="app id")
    parser.add_argument("--lang", default="zh", help="language")
    parser.add_argument("--dstLang", default="", help="destination language")
    parser.add_argument("--frame", action="store_true", help="enable frame notify")
    parser.add_argument("--secretId", default="123456", help="secret id")
    parser.add_argument("--secretKey", default="123456", help="secret key")
    parser.add_argument("--ssl", action="store_true", help="use SSL")
    parser.add_argument("--timeout", type=int, default=10, help="timeout seconds")
    parser.add_argument("--wait", type=int, default=700, help="wait seconds after end")
    return parser


async def _stream_pcm(fd, websocket, stop_event, wait_seconds):
    """Send ``fd`` as paced audio packets, then wait for trailing results.

    After the end-of-stream packet has been sent the connection is kept open
    for up to ``wait_seconds`` so the receiver can log remaining messages.
    Returns early whenever ``stop_event`` is set.
    """
    chunk = bytearray(PCM_DUR_MS * PCM_BYTES_PER_MS)
    pkt = AudioPacket()
    wait_until = 0

    while not stop_event.is_set():
        n = fd.readinto(chunk)
        if n < len(chunk):
            # Short read: this is the last packet of the stream.
            pkt.is_end = True
            wait_until = time.time() + wait_seconds
        # Send only the bytes just read; the tail of the buffer still holds
        # audio from the previous chunk.
        pkt.data = bytes(chunk[:n])
        await websocket.send(pkt.marshal())
        logger.info("Sent ts %d", pkt.timestamp)
        pkt.timestamp += n // PCM_BYTES_PER_MS
        # Pace sending at real-time speed.
        await asyncio.sleep(PCM_DUR_MS / 1000)
        if pkt.is_end:
            break
    else:
        # Interrupted before the end of the file was reached.
        return

    while not stop_event.is_set():
        if time.time() > wait_until:
            logger.info("Finish")
            return
        await asyncio.sleep(0.1)


async def _shutdown(websocket, receiver_task):
    """Stop the receiver task (if any) and close the connection (if any)."""
    if receiver_task is not None:
        try:
            await asyncio.wait_for(receiver_task, timeout=1)
        except asyncio.TimeoutError:
            # Expected when the receiver is still blocked in recv();
            # wait_for has already cancelled it.
            pass
    if websocket is not None:
        await websocket.close()


async def run_client():
    """Parse arguments, connect, stream the PCM file and log the results."""
    args = _build_arg_parser().parse_args()
    url = generate_url_v3(args)
    logger.info("Connecting to %s", url)

    # Third-party dependency, imported here so that the packet and signing
    # helpers above stay importable when it is not installed.
    import websockets

    websocket = None
    receiver_task = None
    try:
        # Python 3.6 compatible websockets connection
        websocket = await websockets.connect(url, ping_timeout=5)

        # Handle initial response
        initial_msg = await websocket.recv()
        try:
            result = json.loads(initial_msg)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            logger.error("Invalid initial message")
            return
        if result.get("Code", 0) != 0:
            logger.error("Handshake failed: %s", result.get("Message", ""))
            return
        logger.info("TaskId %s handshake success", result.get("TaskId", ""))

        # Setup signal handler
        loop = asyncio.get_event_loop()
        stop_event = asyncio.Event()
        try:
            loop.add_signal_handler(signal.SIGINT, stop_event.set)
        except NotImplementedError:
            # Event loops without signal support: fall back to the default
            # KeyboardInterrupt behaviour instead of aborting the run.
            logger.warning("Signal handlers are not supported on this platform")

        # Start receiver
        receiver_task = asyncio.ensure_future(receive_messages(websocket, stop_event))

        # Audio processing. Only the open() is guarded so that network errors
        # during streaming are not reported as file errors.
        try:
            fd = open(args.file, "rb")
        except IOError:  # FileNotFoundError is an IOError subclass
            logger.error("Open file error: %s", args.file)
            return
        with fd:
            await _stream_pcm(fd, websocket, stop_event, args.wait)
    except Exception as e:
        logger.error("Connection error: %s", e)
    finally:
        # Cleanup runs on every path, including the early returns above.
        await _shutdown(websocket, receiver_task)


if __name__ == "__main__":
    # Python 3.6 compatible asyncio runner
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run_client())
    finally:
        loop.close()
文档反馈