Release Notes
お知らせ
スマート字幕機能 | 説明 | 対応する入力タイプ |
音声認識(ASR)による字幕生成 | ASRにより、対話音声を字幕ファイルに変換し、大規模言語モデルによる翻訳を行います。 音声認識とLLM翻訳の精度を向上させるため、ホットワード辞書や用語集の設定をサポートしています。 字幕を映像へ焼き込むレンダリング処理をサポートしています。 | 音声ファイル、動画ファイル、ライブストリーミング、リアルタイム音声ストリーム |
文字認識(OCR)による字幕生成 | OCRにより、画面上の文字を字幕ファイルとして抽出し、大規模言語モデルによる翻訳を行います。 | 動画ファイル(画面に焼き付け字幕があるもの) |
字幕ファイルの翻訳 | 字幕ファイルを入力とし、LLMを通じて多言語に翻訳し、新しい字幕ファイルを生成します。 | 字幕ファイル(WebVTT、SRT形式に対応) |






{"InputInfo": {"Type": "URL","UrlInputInfo": {"Url": "https://test-1234567.cos.ap-guangzhou.myqcloud.com/video/test.mp4" // 処理したいビデオのURLに置き換えてください}},"SmartSubtitlesTask": {"Definition": 122 // 122はプリセットの「中国ソース→中英字幕生成」テンプレートIDです。ご自身のカスタムスマート字幕テンプレートIDに置き換え可能です"UserExtPara": "" //拡張パラメータ},"OutputStorage": {"CosOutputStorage": {"Bucket": "test-1234567","Region": "ap-guangzhou"},"Type": "COS"},"OutputDir": "/output/","Action": "ProcessMedia","Version": "2019-06-12"}
{"InputInfo": {"Type": "COS","CosInputInfo": {"Bucket": "facedetectioncos-125*****11","Region": "ap-guangzhou","Object": "/video/123.mp4"}},"ScheduleId": 12345, //カスタムワークフローIDに置き換えてください。12345は単なる例であり、実際の意味はありません"Action": "ProcessMedia","Version": "2019-06-12"}
パラメータ名 | パラメータタイプ | 機能の説明 | 参考値 |
need_wordlist | int | 単語のタイムスタンプを返すかどうかを指定します(一部のテンプレートは非対応のため返されません)。デフォルトでは返されません。 1:単語のタイムスタンプを返します。 | 1 |
accurate_mode | int | 精確モードを有効にするかどうかを指定します。精確モードはオプション機能で、より精度の高いタイムスタンプを提供します。デフォルトでは無効です。 1:精確モードを有効にします。 | 1 |
adapt_words | string | ホットワードのテキストです。具体的なフォーマットについては、 をご参照ください。ホットワードは不確定なシナリオに適しており、リクエストごとにカスタマイズ可能です。指定できるホットワードの数は128個以内です。 | Tencent Cloud|10,メディア処理|10 |
{"MediaProcessTask": {"TranscodeTaskSet": [{"Definition": 100040, //トランスコードテンプレートID。必要なテンプレートIDに置き換えてください"OverrideParameter": { //パラメータの上書き。トランスコードテンプレート内の一部パラメータを柔軟に上書きできます"SubtitleTemplate": { //字幕焼き込み設定"Path": "https://test-1234567.cos.ap-nanjing.myqcloud.com/mps_autotest/subtitle/1.vtt","StreamIndex": 2,"FontType": "simkai.ttf","FontSize": "10px","FontColor": "0xFFFFFF","FontAlpha": 0.9}}}]},"InputInfo": { //入力情報"Type": "URL","UrlInputInfo": {"Url": "https://test-1234567.cos.ap-nanjing.myqcloud.com/mps_autotest/subtitle/123.mkv"}},"OutputStorage": { //出力バケット"Type": "COS","CosOutputStorage": {"Bucket": "test-1234567","Region": "ap-nanjing"}},"OutputDir": "/mps_autotest/output2/", //出力パス"Action": "ProcessMedia","Version": "2019-06-12"}





TaskNotifyConfigパラメータでイベントコールバックを設定できます。タスク処理が完了すると、設定されたコールバック情報を通じてタスク結果が通知されます。ParseNotificationを使用してイベント通知結果を解析できます。
_トランスコードテンプレート名を追加して生成したストリームアドレス)をプルすると、字幕が表示されます。再生URLの結合ルールについては、再生URLの結合をご参照ください。{"Url": "http://5000-wenzhen.liveplay.myqcloud.com/live/123.flv","AiRecognitionTask": {"Definition": 10101 //10101はプリセットの中国語字幕テンプレートIDです。ご自身のカスタムスマート認識テンプレートIDに置き換え可能です},"OutputStorage": {"CosOutputStorage": {"Bucket": "6c0f30dfvodgzp*****0800-10****53","Region": "ap-guangzhou"},"Type": "COS"},"OutputDir": "/6c0f30dfvodgzp*****0800/0d1409d3456551**********652/","TaskNotifyConfig": {"NotifyType": "URL","NotifyUrl": "http://****.qq.com/callback/qtatest/?token=*****"},"Action": "ProcessLiveStream","Version": "2019-06-12"}
#!/usr/bin/env python3# -*- coding: utf-8 -*-import argparseimport structimport timeimport osimport signalimport sysimport hashlibimport hmacimport randomfrom urllib.parse import urlencode, urlunsplit, quoteimport websocketsimport asyncioimport loggingimport json# Setup logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)class AudioPacket:def __init__(self, format=1, is_end=False, timestamp=0, audio_src_id="123456", ext_data=b'', data=b''):self.format = formatself.is_end = is_endself.timestamp = timestampself.audio_src_id = audio_src_idself.ext_data = ext_dataself.data = datadef marshal(self):"""Serialize audio packet to binary format"""header = struct.pack('>BBQH',self.format,1 if self.is_end else 0,self.timestamp,len(self.audio_src_id))audio_src_bytes = self.audio_src_id.encode('utf-8')ext_len = struct.pack('>H', len(self.ext_data))return header + audio_src_bytes + ext_len + self.ext_data + self.datadef sha256hex(s):"""Calculate SHA256 hex digest"""if isinstance(s, str):s = s.encode('utf-8')return hashlib.sha256(s).hexdigest()def hmacsha256(s, key):"""Calculate HMAC-SHA256"""if isinstance(s, str):s = s.encode('utf-8')if isinstance(key, str):key = key.encode('utf-8')return hmac.new(key, s, hashlib.sha256).digest()def generate_random_number(digits):"""Generate random number with specified digits"""low = 10 ** (digits - 1)high = (10 ** digits) - 1return random.randint(low, high)def generate_url_v3(args):"""Generate WebSocket URL with TC3-HMAC-SHA256 signature"""query_params = {}if args.dstLang:query_params["transSrc"] = args.langquery_params["transDst"] = args.dstLangelse:query_params["asrDst"] = args.langquery_params["fragmentNotify"] = "1" if args.frame else "0"query_params["timeoutSec"] = str(args.timeout)timestamp = int(time.time())expire_timestamp = timestamp + 3600query_params["timeStamp"] = str(timestamp)query_params["expired"] = str(expire_timestamp)query_params["secretId"] = args.secretIdquery_params["nonce"] = str(generate_random_number(10))# Sort keys and build canonical query stringsorted_keys = sorted(query_params.keys())canonical_query = "&".join(["{}={}".format(k, quote(query_params[k], safe=''))for k in sorted_keys])# Build canonical requestpath = "/wss/v1/{}".format(args.appid)http_method = "post"canonical_uri = pathcanonical_headers = "content-type:application/json; charset=utf-8\\nhost:{}\\n".format(args.addr)signed_headers = "content-type;host"canonical_request = "{}\\n{}\\n{}\\n{}\\n{}\\n".format(http_method,canonical_uri,canonical_query,canonical_headers,signed_headers,)# Build string to signdate = time.strftime("%Y-%m-%d", time.gmtime(timestamp))credential_scope = "{}/mps/tc3_request".format(date)hashed_canonical = sha256hex(canonical_request)algorithm = "TC3-HMAC-SHA256"string_to_sign = "{}\\n{}\\n{}\\n{}".format(algorithm,timestamp,credential_scope,hashed_canonical)# Calculate signaturesecret_date = hmacsha256(date, "TC3" + args.secretKey)secret_service = hmacsha256("mps", secret_date)secret_signing = hmacsha256("tc3_request", secret_service)signature = hmac.new(secret_signing,string_to_sign.encode('utf-8'),hashlib.sha256).hexdigest()# Add signature to query paramsquery_params["signature"] = signature# Build final URLscheme = "wss" if args.ssl else "ws"url = urlunsplit((scheme,args.addr,path,urlencode(query_params),""))return urlasync def receive_messages(websocket, stop_event):"""Handle incoming WebSocket messages"""try:while not stop_event.is_set():message = await websocket.recv()if isinstance(message, bytes):try:message = message.decode('utf-8')except UnicodeDecodeError:message = str(message)logger.info("Received: %s", message)except Exception as e:logger.info("Connection closed: %s", e)async def run_client():parser = argparse.ArgumentParser()parser.add_argument("--addr", default="mps.cloud.tencent.com", help="websocket service address")parser.add_argument("--file", default="./wx_voice.pcm", help="pcm file path")parser.add_argument("--appid", default="121313131", help="app id")parser.add_argument("--lang", default="zh", help="language")parser.add_argument("--dstLang", default="", help="destination language")parser.add_argument("--frame", action="store_true", help="enable frame notify")parser.add_argument("--secretId", default="123456", help="secret id")parser.add_argument("--secretKey", default="123456", help="secret key")parser.add_argument("--ssl", action="store_true", help="use SSL")parser.add_argument("--timeout", type=int, default=10, help="timeout seconds")parser.add_argument("--wait", type=int, default=700, help="wait seconds after end")args = parser.parse_args()url = generate_url_v3(args)logger.info("Connecting to %s", url)try:# Python 3.6 compatible websockets connectionwebsocket = await websockets.connect(url, ping_timeout=5)# Handle initial responseinitial_msg = await websocket.recv()try:result = json.loads(initial_msg)if result.get("Code", 0) != 0:logger.error("Handshake failed: %s", result.get("Message", ""))returnlogger.info("TaskId %s handshake success", result.get("TaskId", ""))except ValueError: # json.JSONDecodeError not available in 3.6logger.error("Invalid initial message")return# Setup signal handlerloop = asyncio.get_event_loop()stop_event = asyncio.Event()loop.add_signal_handler(signal.SIGINT, stop_event.set)# Start receiverreceiver_task = asyncio.ensure_future(receive_messages(websocket, stop_event))# Audio processingtry:with open(args.file, "rb") as fd:PCM_DUR_MS = 40pcm = bytearray(PCM_DUR_MS * 32)pkt = AudioPacket(data=pcm)is_end = Falsewait_until = 0while not stop_event.is_set():if is_end:if time.time() > wait_until:logger.info("Finish")breakawait asyncio.sleep(0.1)continue# Read PCM datan = fd.readinto(pkt.data)if n < len(pkt.data):pkt.is_end = Trueis_end = Truewait_until = time.time() + args.wait# Send audio packetawait websocket.send(pkt.marshal())logger.info("Sent ts %d", pkt.timestamp)pkt.timestamp += n // 32await asyncio.sleep(PCM_DUR_MS / 1000)except IOError: # FileNotFoundError not available in 3.6logger.error("Open file error: %s", args.file)return# Cleanupawait asyncio.wait_for(receiver_task, timeout=1)await websocket.close()except Exception as e:logger.error("Connection error: %s", e)returnif __name__ == "__main__":# Python 3.6 compatible asyncio runnerloop = asyncio.get_event_loop()try:loop.run_until_complete(run_client())finally:loop.close()
フィードバック