tencent cloud

最佳实践 demo
最后更新时间:2025-10-20 10:54:02
最佳实践 demo
最后更新时间: 2025-10-20 10:54:02

ASR/ChatGPT/数智人整合 Demo

本 Demo 展示了如何通过 ASR/ChatGPT,与数智人进行交互。

alt text



交互数智人实践教程概述

交互数智人 aPaaS 接口的调用建议参考以下流程:
1. 初始化 API 调用对象:设置 API 调用的基础信息,包括 AppKey、Access Token 以及 HTTP 和 WebSocket 的域名。
2. 创建视频流:使用形象资产 ID 或数智人平台项目 ID 创建视频流,指定视频流协议和驱动类型。
3. 检测会话状态:循环检查会话状态,直到会话状态变为已就绪(SessionStatus=1)。
4. 开启会话:调用 API 以开启会话。
5. 创建长链接通道:建立长连接通道,以便进行持续的通信。启动两个线程,一个用于定期获取下行信息,另一个用于定期发送心跳指令,以保持连接活跃。
6. 发送驱动指令:根据不同的场景(参见本文档:1.数智人绑定对话服务场景、2.用户自行接入大模型场景、3.音频驱动场景),发送相应的驱动指令,详细的适用场景与驱动指令的关系参考驱动指令说明
7. 关闭长连接通道和会话:根据不同场景的情况关闭长连接通道和会话。
其中步骤需要根据不同的场景来选择不同的驱动指令,其他步骤的逻辑基本一致。以下是该流程的代码 Demo:
Go
Python
package main

import (
"encoding/base64"
"fmt"
"io"
"log"
"os"
"strings"
"time"

uuid "github.com/satori/go.uuid"
"k8s.io/apimachinery/pkg/util/rand"
)

func main() {
//第一步:初始化api调用对象
var api = NewVHApi(
"your appkey", //your appkey
"your accesstoken", //your accesstoken
"https://gw.tvs.qq.com", //http domain
"wss://gw.tvs.qq.com") //wss domain
//第二步:使用形象资产ID或数智人项目ID创建视频流(使用形象资产ID可以指定音色相关参数,对话场景这些也没有明确;使用数智人项目ID里面包含了音色信息以及对话服务配置)
//这里使用数智人项目ID创建视频流,以webrtc协议为例
createSessionRsp, err := api.CreateSession(CreateSessionReq{
UserId: "your user id",
VirtualmanProjectId: "your virtualman project id",
Protocol: "webrtc", //可选:webrtc、rtmp、trtc
DriverType: 1, //drivetype=1代表支持文本驱动指令;drivetype=3代表支持音频和文本驱动指令
})
if err != nil {
log.Fatalf("err:%v", err)
}
fmt.Println("获取流播放地址", createSessionRsp.PlayStreamAddr)
for {
var userInput string
fmt.Print("是否成功复制地址(输入'y'确定): ")
fmt.Scanln(&userInput)
if strings.ToLower(userInput) == "y" {
break
}
}
//第三步:循环检测会话状态直到SessionStatus=1(已就绪)
sessionStatus := createSessionRsp.SessionStatus
for sessionStatus != 1 {
time.Sleep(time.Second * 1)
statSessionRsp, err := api.StatSession()
if err != nil {
log.Fatalf("err:%v", err)
}
sessionStatus = statSessionRsp.SessionStatus
}
//第四步:开启会话
_, err = api.StartSession()
if err != nil {
log.Fatalf("err:%v", err)
}

//第五步:创建长链接通道
err = api.CommandChannel()
if err != nil {
log.Fatalf("err:%v", err)
}
// 开启协程每过3s获取下行信息
go func() {
for {
time.Sleep(time.Second * 3)
fmt.Println("\\n-----开始调用长连接下行信息-----")
api.ReadWssMessage()
}
}()
//开启协程每过1分钟发送心跳指令
go func() {
for {
time.Sleep(time.Minute * 1)
fmt.Println("\\n-----开始调用长连接发送心跳指令-----")
api.SendHeartBeatCommand()
}
}()

//第六步:发送驱动指令
//根据不同的场景选择不同的函数发送驱动指令,如大模型对话,调用以下函数(更多场景请见下文)
largeModelDialogue()
//第七步:关闭长连接通道和会话
//90s后关闭长连接通道(根据自己的场景来选择断连时间)
time.Sleep(time.Second * 90)
api.CloseSession()
}
import base64
import time
import uuid
import random
import logging
import sys
import threading
from websocket import WebSocketApp # 需要安装websockets库: pip install websocket-client

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class VHApi:
def __init__(self, appkey, accesstoken, http_domain, wss_domain):
self.appkey = appkey
self.accesstoken = accesstoken
self.http_domain = http_domain
self.wss_domain = wss_domain
self.session_id = None
self.ws = None
self.session_status = 0 # 0: 未就绪, 1: 已就绪

def create_session(self, user_id, virtualman_project_id, protocol="rtmp", driver_type=1):
"""创建视频流会话"""
# 在实际实现中,这里应该发送HTTP请求到API
logger.info(f"创建会话 - 用户ID: {user_id}, 项目ID: {virtualman_project_id}")
# 模拟API响应
self.session_id = str(uuid.uuid4())
play_stream_addr = f"{self.http_domain}/stream/{self.session_id}"
return {
"PlayStreamAddr": play_stream_addr,
"SessionStatus": 0
}

def stat_session(self):
"""查询会话状态"""
# 在实际实现中,这里应该发送HTTP请求到API
logger.info(f"查询会话状态 - 会话ID: {self.session_id}")
# 模拟状态更新
# 实际应用中应该从API响应中获取真实状态
return {"SessionStatus": self.session_status}

def start_session(self):
"""开启会话"""
# 在实际实现中,这里应该发送HTTP请求到API
logger.info(f"开启会话 - 会话ID: {self.session_id}")
return True

def command_channel(self):
"""创建长连接通道"""
logger.info(f"创建长连接通道 - {self.wss_domain}/ws/{self.session_id}")
# 实际应用中应该实现WebSocket连接
self.ws = WebSocketApp(
f"{self.wss_domain}/ws/{self.session_id}",
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# 启动WebSocket线程
threading.Thread(target=self.ws.run_forever, daemon=True).start()
time.sleep(1) # 等待连接建立
return None

def on_message(self, ws, message):
"""处理收到的消息"""
logger.info(f"收到消息: {message}")

def on_error(self, ws, error):
"""处理错误"""
logger.error(f"WebSocket错误: {error}")

def on_close(self, ws, close_status_code, close_msg):
"""处理连接关闭"""
logger.info(f"WebSocket连接关闭: {close_status_code} {close_msg}")

def read_wss_message(self):
"""读取下行信息"""
# 在实际实现中,应该从WebSocket连接读取消息
logger.info("读取下行信息...")
# 这里可以实现消息读取逻辑

def send_heartbeat_command(self):
"""发送心跳指令"""
if self.ws and self.ws.sock and self.ws.sock.connected:
logger.info("发送心跳指令")
# 实际应用中应该发送真实的心跳数据
self.ws.send('{"type": "heartbeat", "timestamp": %d}' % int(time.time()))
else:
logger.warning("WebSocket连接未建立,无法发送心跳")

def close_session(self):
"""关闭会话"""
logger.info(f"关闭会话 - 会话ID: {self.session_id}")
if self.ws:
self.ws.close()
# 在实际实现中,这里应该发送关闭会话的HTTP请求

def large_model_dialogue():
"""大模型对话处理"""
logger.info("开始大模型对话交互")
# 实际应用中应该实现对话逻辑
# 这里可以添加用户输入和发送消息的逻辑

def main():
# 第一步: 初始化API调用对象
api = VHApi(
"your appkey", # 你的appkey
"your accesstoken", # 你的accesstoken
"https://gw.tvs.qq.com", # HTTP域名
"wss://gw.tvs.qq.com" # WSS域名
)
# 第二步: 创建视频流会话
try:
create_session_rsp = api.create_session(
user_id="your user id",
virtualman_project_id="your virtualman project id",
protocol="webrtc", # 可选: webrtc、rtmp、trtc
driver_type=1 # 1: 支持文本驱动; 3: 支持音频和文本驱动
)
logger.info(f"获取流播放地址: {create_session_rsp['PlayStreamAddr']}")
# 等待用户确认
while True:
user_input = input("是否成功复制地址(输入'y'确定): ").strip().lower()
if user_input == 'y':
break
except Exception as e:
logger.error(f"创建会话错误: {e}")
return
# 第三步: 循环检测会话状态直到就绪
api.session_status = create_session_rsp["SessionStatus"]
while api.session_status != 1:
time.sleep(1)
try:
stat_session_rsp = api.stat_session()
api.session_status = stat_session_rsp["SessionStatus"]
logger.info(f"会话状态: {api.session_status} (等待就绪...)")
except Exception as e:
logger.error(f"查询会话状态错误: {e}")
return
# 第四步: 开启会话
try:
api.start_session()
except Exception as e:
logger.error(f"开启会话错误: {e}")
return
# 第五步: 创建长连接通道
try:
api.command_channel()
except Exception as e:
logger.error(f"创建长连接通道错误: {e}")
return
# 开启协程每3秒获取下行信息
def read_message_loop():
while True:
time.sleep(3)
print("\\n-----开始调用长连接下行信息-----")
api.read_wss_message()
threading.Thread(target=read_message_loop, daemon=True).start()
# 开启协程每分钟发送心跳指令
def heartbeat_loop():
while True:
time.sleep(60)
print("\\n-----开始调用长连接发送心跳指令-----")
api.send_heartbeat_command()
threading.Thread(target=heartbeat_loop, daemon=True).start()
# 第六步: 发送驱动指令
large_model_dialogue()
# 第七步: 90秒后关闭连接和会话
try:
time.sleep(90)
api.close_session()
logger.info("会话已关闭")
except Exception as e:
logger.error(f"关闭会话错误: {e}")

if __name__ == "__main__":
main()



数智人绑定对话服务实践教程

数智人绑定的对话服务目前主要有如下两种配置:
1. 云小微对话(客服机器人对话)
2. 腾讯行业大模型对话
说明:
请到 数智人的项目创建及管理 绑定数智人对话服务,只能 使用数智人平台项目创建会话 方式。
对话服务推荐采用 文本驱动指令(WebSocket 长连接) ,以下是这两种配置的代码示例:
Go
Python
// 客服机器人对话
func robotDialogue() {
fmt.Println("\\n-----云小微对话(客服机器人对话)-----")
err := api.SendTextDriveWssCommand(TextDriveWssCommand{
Data: &TextWssData{
Text: "微信简介", //根据客服机器人问答知识库回答
ChatCommand: "", //使用客服对话or大模型对话
},
})
if err != nil {
log.Fatalf("err:%v", err)
}
}

# 在VHApi类中添加发送文本驱动命令的方法
def send_text_drive_wss_command(self, text, chat_command=""):
"""发送文本驱动指令到WebSocket"""
if not self.ws or not self.ws.sock or not self.ws.sock.connected:
raise Exception("WebSocket连接未建立")
# 构造发送的数据结构
command = {
"Data": {
"Text": text,
"ChatCommand": chat_command
},
"Type": "text_drive" # 假设需要指定命令类型
}
try:
import json
self.ws.send(json.dumps(command))
logger.info(f"已发送文本驱动指令: {text}")
return True
except Exception as e:
logger.error(f"发送文本驱动指令失败: {e}")
raise


# 客服机器人对话函数
def robot_dialogue(api):
"""云小微对话(客服机器人对话)"""
print("\\n-----云小微对话(客服机器人对话)-----")
try:
# 发送文本驱动指令,查询"微信简介"
api.send_text_drive_wss_command(
text="微信简介", # 根据客服机器人问答知识库回答
chat_command="" # 使用客服对话or大模型对话
)
except Exception as e:
logger.error(f"错误: {e}")
sys.exit(1)


# 在主函数中调用时,需要修改原来的largeModelDialogue()调用
# 改为:robot_dialogue(api)

Go
Python
// 腾讯行业大模型对话
func largeModelDialogue() {
fmt.Println("\\n-----腾讯行业大模型对话-----")
err := api.SendTextDriveWssCommand(TextDriveWssCommand{
Data: &TextWssData{
Text: "你是谁", //可以在模型管理里面自定义配置问答
ChatCommand: "", //使用客服对话or大模型对话
ChatRoundId: strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32], //使用腾讯行业大模型对话服务的数智人项目需要传入此字段,用于区分多轮对话
},
})
if err != nil {
log.Fatalf("err:%v", err)
}
}
# 扩展VHApi类的send_text_drive_wss_command方法以支持多轮对话ID
def send_text_drive_wss_command(self, text, chat_command="", chat_round_id=None):
"""发送文本驱动指令到WebSocket,支持多轮对话ID"""
if not self.ws or not self.ws.sock or not self.ws.sock.connected:
raise Exception("WebSocket连接未建立")
# 构造发送的数据结构
command_data = {
"Text": text,
"ChatCommand": chat_command
}
# 如果提供了多轮对话ID,则添加到数据中
if chat_round_id:
command_data["ChatRoundId"] = chat_round_id
command = {
"Data": command_data,
"Type": "text_drive"
}
try:
import json
self.ws.send(json.dumps(command))
logger.info(f"已发送大模型对话指令: {text} (对话ID: {chat_round_id})")
return True
except Exception as e:
logger.error(f"发送大模型对话指令失败: {e}")
raise


# 腾讯行业大模型对话函数
def large_model_dialogue(api):
"""腾讯行业大模型对话"""
print("\\n-----腾讯行业大模型对话-----")
try:
# 生成32位的ChatRoundId(UUID去除横杠后取前32位)
chat_round_id = str(uuid.uuid4()).replace("-", "")[:32]
# 发送文本驱动指令
api.send_text_drive_wss_command(
text="你是谁", # 可在模型管理中自定义配置问答
chat_command="", # 区分客服对话或大模型对话
chat_round_id=chat_round_id # 用于区分多轮对话
)
except Exception as e:
logger.error(f"错误: {e}")
sys.exit(1)


# 在主函数中调用方式:
# large_model_dialogue(api)


用户自行接入大模型实践教程

用户自行接入大模型: 指用户自行接入了 ChatGPT、腾讯混元等大模型,输出通常为流式文本。
该场景推荐使用 流式文本驱动指令 中的非子句模式(isSentence=false),非子句模式下服务端会对客户端发送的文本重新组句,能很好的配合大模型的流式输出,用户接入大模型的场景完整 Demo 参考 ASR/ChatGPT/数智人整合 Demo
以下是该场景对应的代码示例:
Go
Python
// 用户自行接入大模型来驱动数智人的场景
func SendNotSentenceStreamText() {
//模拟大模型的返回结果
rspText := "我是一个数智人,我在测试发送流式文本非子句模式。"
//模拟大模型生成流式文本
streamChan := streamResponse(rspText, time.Millisecond*500)
reqId := strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32]
seq := 1
for char := range streamChan {
api.SendStreamTextDriveWssCommand(StreamTextDriveWssCommand{
ReqId: reqId,
Data: &StreamTextWssData{
Text: char,
Seq: seq,
},
})
seq++
}
// 发送final数据包
api.SendStreamTextDriveWssCommand(StreamTextDriveWssCommand{
ReqId: reqId,
Data: &StreamTextWssData{
Seq: 1,
IsFinal: true,
},
})
}

func streamResponse(text string, delay time.Duration) <-chan string {
ch := make(chan string)
go func() {
for _, char := range text {
//模拟大模型流式返回延时
time.Sleep(delay)
ch <- string(char)
}
close(ch)
}()
return ch
}
import time
import uuid
import json
import logging

# 扩展VHApi类,添加流式文本发送方法
class VHApi:
# ... 保留之前的其他方法 ...

def send_stream_text_drive_wss_command(self, req_id, text="", seq=1, is_final=False):
"""发送流式文本驱动指令"""
if not self.ws or not self.ws.sock or not self.ws.sock.connected:
raise Exception("WebSocket连接未建立")
# 构造流式文本命令结构
command = {
"ReqId": req_id,
"Data": {
"Text": text,
"Seq": seq,
"IsFinal": is_final
},
"Type": "stream_text_drive"
}
try:
self.ws.send(json.dumps(command))
if is_final:
logging.info(f"已发送最终数据包 - ReqId: {req_id}, Seq: {seq}")
else:
logging.info(f"已发送流式文本 - ReqId: {req_id}, Seq: {seq}, 内容: {text}")
return True
except Exception as e:
logging.error(f"发送流式文本失败: {e}")
raise


def stream_response(text, delay=0.5):
"""模拟大模型流式返回文本(替代Go的channel)"""
for char in text:
time.sleep(delay) # 模拟生成延迟
yield char # 用生成器替代Go的channel流式返回


def send_not_sentence_stream_text(api):
"""用户自行接入大模型驱动数智人(流式文本非子句模式)"""
# 模拟大模型返回结果
rsp_text = "我是一个数智人,我在测试发送流式文本非子句模式。"
# 模拟大模型生成流式文本(500毫秒间隔)
stream_gen = stream_response(rsp_text, delay=0.5)
# 生成32位请求ID
req_id = str(uuid.uuid4()).replace("-", "")[:32]
seq = 1
# 发送流式文本
for char in stream_gen:
api.send_stream_text_drive_wss_command(
req_id=req_id,
text=char,
seq=seq
)
seq += 1
# 发送最终数据包
api.send_stream_text_drive_wss_command(
req_id=req_id,
seq=seq, # 这里原Go代码用了1,实际应该用递增后的序号更合理,保留原逻辑可改为1
is_final=True
)


# 调用方式:在主函数中根据需要调用
# send_not_sentence_stream_text(api)


音频驱动场景实践教程

用户可以使用音频文件来驱动数智人,以下是该场景对应的代码示例:
Go
Python
func SendAudio() {
/*
音频格式:格式-PCM,采样率-16kHz,采样位深-16bits,声道-单声道。
1.如果是麦克风实时收音发送,每录制160ms(5120B)的数据即可发出,中间不需要间隔等待;如果是读取离线音频文件发送,片包大小为160ms(5120B),片包发送间隔为120ms。
2.最后一个片包大小按实际发送(需小于160ms)。
3.当数据包发送完毕后,必须再发送一个IsFinal=true的空数据包(Audio字段填空串)结束当次音频驱动使数字人回到静默状态。
4.发送音频实时率要介于[0.75,1],小于0.75会触发限速,大于1会导致画面卡顿。比如160ms音频片包大小,发送间隔不能低于120ms,不能高于160ms。
*/
chunkSize := 5120 //块大小-5120B
sleepBase := 120 //间隔ms
seq := 1
reqId := strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32]
//读取音频二进制文件
voiceFile, err := os.Open("your_audio_file.pcm")
if err != nil {
log.Fatalf("err:%v", err)
}
defer voiceFile.Close()
buffer := make([]byte, chunkSize)
for {
n, err := voiceFile.Read(buffer)
if err != nil && err != io.EOF {
fmt.Println("Error reading file:", err)
break
}
if n == 0 { // 文件读取完毕
break
}
err = api.SendVoiceDriveCommand(VoiceDriveCommand{
ReqId: reqId,
Data: &VoiceData{
Audio: base64.StdEncoding.EncodeToString(buffer), //声音二进制数据编码Base64后的值。
Seq: seq,
IsFinal: false,
},
})
if err != nil {
log.Fatalf("err:%v", err)
}
seq++ // 递增序列号
// 等待120ms到160ms的随机时间
sleepDuration := time.Duration(sleepBase+rand.Intn(41)) * time.Millisecond
time.Sleep(sleepDuration)
}
// 发送final数据包
api.SendVoiceDriveCommand(VoiceDriveCommand{
ReqId: reqId,
Data: &VoiceData{
Seq: seq,
IsFinal: true,
},
})
}
import base64
import time
import random
import uuid
import logging
import sys

# 扩展VHApi类,添加音频驱动命令发送方法
class VHApi:
# ... 保留之前的其他方法 ...

def send_voice_drive_command(self, req_id, audio="", seq=1, is_final=False):
"""发送音频驱动指令"""
if not self.ws or not self.ws.sock or not self.ws.sock.connected:
raise Exception("WebSocket连接未建立")
# 构造音频命令结构
command = {
"ReqId": req_id,
"Data": {
"Audio": audio,
"Seq": seq,
"IsFinal": is_final
},
"Type": "voice_drive"
}
try:
import json
self.ws.send(json.dumps(command))
if is_final:
logging.info(f"已发送音频最终数据包 - ReqId: {req_id}, Seq: {seq}")
else:
logging.info(f"已发送音频分片 - ReqId: {req_id}, Seq: {seq}, 大小: {len(audio)}B")
return True
except Exception as e:
logging.error(f"发送音频分片失败: {e}")
raise


def send_audio(api):
"""发送音频文件到数智人系统
音频格式要求:PCM,采样率16kHz,采样位深16bits,单声道
"""
chunk_size = 5120 # 块大小-5120B(对应160ms音频)
sleep_base = 120 # 基础间隔时间(ms)
seq = 1
# 生成32位请求ID
req_id = str(uuid.uuid4()).replace("-", "")[:32]
try:
# 打开音频文件
with open("your_audio_file.pcm", "rb") as voice_file:
while True:
# 读取音频数据
buffer = voice_file.read(chunk_size)
if not buffer: # 文件读取完毕
break
# 对音频数据进行Base64编码
audio_base64 = base64.b64encode(buffer).decode('utf-8')
# 发送音频分片
api.send_voice_drive_command(
req_id=req_id,
audio=audio_base64,
seq=seq,
is_final=False
)
seq += 1 # 递增序列号
# 等待120ms到160ms的随机时间(保持实时率在0.75-1之间)
sleep_ms = sleep_base + random.randint(0, 40) # 0-40的随机数,总间隔120-160ms
time.sleep(sleep_ms / 1000.0) # 转换为秒
# 发送最终数据包(空音频,标记结束)
api.send_voice_drive_command(
req_id=req_id,
audio="", # 空字符串表示结束
seq=seq,
is_final=True
)
except FileNotFoundError:
logging.error("音频文件未找到: your_audio_file.pcm")
sys.exit(1)
except Exception as e:
logging.error(f"发送音频时出错: {e}")
sys.exit(1)


# 调用方式:在主函数中根据需要调用
# send_audio(api)Python


最佳实践 Demo

Demo 内容
语言
最新版本文件名
Demo 下载链接
ASR/ChatGPT/数智人整合 Demo
Python
asr_chatgpt_vh_demo_20240812.zip
注意:
Demo 运行方式,请仔细阅读 Demo 包中的 README.md。

本页内容是否解决了您的问题?
您也可以 联系销售 提交工单 以寻求帮助。

文档反馈