tencent cloud

腾讯云智能数智人

产品简介
产品概述
产品功能
产品优势
购买指南
价格指南
购买指引
使用代金券购买流程
退费说明
数智人形象介绍
形象类别介绍
基础形象库
形象和声音复刻指引
数智人平台操作指引
访问平台
形象生产与资产管理
数智人会话互动应用与管理
播报数智人视频生成与管理
运营管理与分析
服务端 API 接入
数智人 API 接入模式概述
数智人 aPaas 接口调用方式
2D真人小样本形象、声音定制 API 文档
视频生成服务 API 文档
交互数智人服务 API 文档
个人资产管理 API 文档
客户端 SDK 接入
整体介绍
3D 端渲染 SDK 接入
2D 端渲染 SDK 接入
数智人 SSML 标记语言规范
相关协议
隐私协议
DSA(Data Sharing Agreement)
常见问题

最佳实践 demo

PDF
聚焦模式
字号
最后更新时间: 2025-10-20 10:54:02

ASR/ChatGPT/数智人整合 Demo

本 Demo 展示了如何通过 ASR/ChatGPT,与数智人进行交互。

alt text



交互数智人实践教程概述

交互数智人 aPaaS 接口的调用建议参考以下流程:
1. 初始化 API 调用对象:设置 API 调用的基础信息,包括 AppKey、Access Token 以及 HTTP 和 WebSocket 的域名。
2. 创建视频流:使用形象资产 ID 或数智人平台项目 ID 创建视频流,指定视频流协议和驱动类型。
3. 检测会话状态:循环检查会话状态,直到会话状态变为已就绪(SessionStatus=1)。
4. 开启会话:调用 API 以开启会话。
5. 创建长链接通道:建立长连接通道,以便进行持续的通信。启动两个线程,一个用于定期获取下行信息,另一个用于定期发送心跳指令,以保持连接活跃。
6. 发送驱动指令:根据不同的场景(参见本文档:1.数智人绑定对话服务场景、2.用户自行接入大模型场景、3.音频驱动场景),发送相应的驱动指令,详细的适用场景与驱动指令的关系参考驱动指令说明
7. 关闭长连接通道和会话:根据不同场景的情况关闭长连接通道和会话。
其中步骤需要根据不同的场景来选择不同的驱动指令,其他步骤的逻辑基本一致。以下是该流程的代码 Demo:
Go
Python
package main

import (
"encoding/base64"
"fmt"
"io"
"log"
"os"
"strings"
"time"

uuid "github.com/satori/go.uuid"
"k8s.io/apimachinery/pkg/util/rand"
)

func main() {
//第一步:初始化api调用对象
var api = NewVHApi(
"your appkey", //your appkey
"your accesstoken", //your accesstoken
"https://gw.tvs.qq.com", //http domain
"wss://gw.tvs.qq.com") //wss domain
//第二步:使用形象资产ID或数智人项目ID创建视频流(使用形象资产ID可以指定音色相关参数,对话场景这些也没有明确;使用数智人项目ID里面包含了音色信息以及对话服务配置)
//这里使用数智人项目ID创建视频流,以webrtc协议为例
createSessionRsp, err := api.CreateSession(CreateSessionReq{
UserId: "your user id",
VirtualmanProjectId: "your virtualman project id",
Protocol: "webrtc", //可选:webrtc、rtmp、trtc
DriverType: 1, //drivetype=1代表支持文本驱动指令;drivetype=3代表支持音频和文本驱动指令
})
if err != nil {
log.Fatalf("err:%v", err)
}
fmt.Println("获取流播放地址", createSessionRsp.PlayStreamAddr)
for {
var userInput string
fmt.Print("是否成功复制地址(输入'y'确定): ")
fmt.Scanln(&userInput)
if strings.ToLower(userInput) == "y" {
break
}
}
//第三步:循环检测会话状态直到SessionStatus=1(已就绪)
sessionStatus := createSessionRsp.SessionStatus
for sessionStatus != 1 {
time.Sleep(time.Second * 1)
statSessionRsp, err := api.StatSession()
if err != nil {
log.Fatalf("err:%v", err)
}
sessionStatus = statSessionRsp.SessionStatus
}
//第四步:开启会话
_, err = api.StartSession()
if err != nil {
log.Fatalf("err:%v", err)
}

//第五步:创建长链接通道
err = api.CommandChannel()
if err != nil {
log.Fatalf("err:%v", err)
}
// 开启协程每过3s获取下行信息
go func() {
for {
time.Sleep(time.Second * 3)
fmt.Println("\\n-----开始调用长连接下行信息-----")
api.ReadWssMessage()
}
}()
//开启协程每过1分钟发送心跳指令
go func() {
for {
time.Sleep(time.Minute * 1)
fmt.Println("\\n-----开始调用长连接发送心跳指令-----")
api.SendHeartBeatCommand()
}
}()

//第六步:发送驱动指令
//根据不同的场景选择不同的函数发送驱动指令,如大模型对话,调用以下函数(更多场景请见下文)
largeModelDialogue()
//第七步:关闭长连接通道和会话
//90s后关闭长连接通道(根据自己的场景来选择断连时间)
time.Sleep(time.Second * 90)
api.CloseSession()
}
import base64
import time
import uuid
import random
import logging
import sys
import threading
from websocket import WebSocketApp # 需要安装websockets库: pip install websocket-client

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class VHApi:
def __init__(self, appkey, accesstoken, http_domain, wss_domain):
self.appkey = appkey
self.accesstoken = accesstoken
self.http_domain = http_domain
self.wss_domain = wss_domain
self.session_id = None
self.ws = None
self.session_status = 0 # 0: 未就绪, 1: 已就绪

def create_session(self, user_id, virtualman_project_id, protocol="rtmp", driver_type=1):
"""创建视频流会话"""
# 在实际实现中,这里应该发送HTTP请求到API
logger.info(f"创建会话 - 用户ID: {user_id}, 项目ID: {virtualman_project_id}")
# 模拟API响应
self.session_id = str(uuid.uuid4())
play_stream_addr = f"{self.http_domain}/stream/{self.session_id}"
return {
"PlayStreamAddr": play_stream_addr,
"SessionStatus": 0
}

def stat_session(self):
"""查询会话状态"""
# 在实际实现中,这里应该发送HTTP请求到API
logger.info(f"查询会话状态 - 会话ID: {self.session_id}")
# 模拟状态更新
# 实际应用中应该从API响应中获取真实状态
return {"SessionStatus": self.session_status}

def start_session(self):
"""开启会话"""
# 在实际实现中,这里应该发送HTTP请求到API
logger.info(f"开启会话 - 会话ID: {self.session_id}")
return True

def command_channel(self):
"""创建长连接通道"""
logger.info(f"创建长连接通道 - {self.wss_domain}/ws/{self.session_id}")
# 实际应用中应该实现WebSocket连接
self.ws = WebSocketApp(
f"{self.wss_domain}/ws/{self.session_id}",
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# 启动WebSocket线程
threading.Thread(target=self.ws.run_forever, daemon=True).start()
time.sleep(1) # 等待连接建立
return None

def on_message(self, ws, message):
"""处理收到的消息"""
logger.info(f"收到消息: {message}")

def on_error(self, ws, error):
"""处理错误"""
logger.error(f"WebSocket错误: {error}")

def on_close(self, ws, close_status_code, close_msg):
"""处理连接关闭"""
logger.info(f"WebSocket连接关闭: {close_status_code} {close_msg}")

def read_wss_message(self):
"""读取下行信息"""
# 在实际实现中,应该从WebSocket连接读取消息
logger.info("读取下行信息...")
# 这里可以实现消息读取逻辑

def send_heartbeat_command(self):
"""发送心跳指令"""
if self.ws and self.ws.sock and self.ws.sock.connected:
logger.info("发送心跳指令")
# 实际应用中应该发送真实的心跳数据
self.ws.send('{"type": "heartbeat", "timestamp": %d}' % int(time.time()))
else:
logger.warning("WebSocket连接未建立,无法发送心跳")

def close_session(self):
"""关闭会话"""
logger.info(f"关闭会话 - 会话ID: {self.session_id}")
if self.ws:
self.ws.close()
# 在实际实现中,这里应该发送关闭会话的HTTP请求

def large_model_dialogue():
"""大模型对话处理"""
logger.info("开始大模型对话交互")
# 实际应用中应该实现对话逻辑
# 这里可以添加用户输入和发送消息的逻辑

def main():
# 第一步: 初始化API调用对象
api = VHApi(
"your appkey", # 你的appkey
"your accesstoken", # 你的accesstoken
"https://gw.tvs.qq.com", # HTTP域名
"wss://gw.tvs.qq.com" # WSS域名
)
# 第二步: 创建视频流会话
try:
create_session_rsp = api.create_session(
user_id="your user id",
virtualman_project_id="your virtualman project id",
protocol="webrtc", # 可选: webrtc、rtmp、trtc
driver_type=1 # 1: 支持文本驱动; 3: 支持音频和文本驱动
)
logger.info(f"获取流播放地址: {create_session_rsp['PlayStreamAddr']}")
# 等待用户确认
while True:
user_input = input("是否成功复制地址(输入'y'确定): ").strip().lower()
if user_input == 'y':
break
except Exception as e:
logger.error(f"创建会话错误: {e}")
return
# 第三步: 循环检测会话状态直到就绪
api.session_status = create_session_rsp["SessionStatus"]
while api.session_status != 1:
time.sleep(1)
try:
stat_session_rsp = api.stat_session()
api.session_status = stat_session_rsp["SessionStatus"]
logger.info(f"会话状态: {api.session_status} (等待就绪...)")
except Exception as e:
logger.error(f"查询会话状态错误: {e}")
return
# 第四步: 开启会话
try:
api.start_session()
except Exception as e:
logger.error(f"开启会话错误: {e}")
return
# 第五步: 创建长连接通道
try:
api.command_channel()
except Exception as e:
logger.error(f"创建长连接通道错误: {e}")
return
# 开启协程每3秒获取下行信息
def read_message_loop():
while True:
time.sleep(3)
print("\\n-----开始调用长连接下行信息-----")
api.read_wss_message()
threading.Thread(target=read_message_loop, daemon=True).start()
# 开启协程每分钟发送心跳指令
def heartbeat_loop():
while True:
time.sleep(60)
print("\\n-----开始调用长连接发送心跳指令-----")
api.send_heartbeat_command()
threading.Thread(target=heartbeat_loop, daemon=True).start()
# 第六步: 发送驱动指令
large_model_dialogue()
# 第七步: 90秒后关闭连接和会话
try:
time.sleep(90)
api.close_session()
logger.info("会话已关闭")
except Exception as e:
logger.error(f"关闭会话错误: {e}")

if __name__ == "__main__":
main()



数智人绑定对话服务实践教程

数智人绑定的对话服务目前主要有如下两种配置:
1. 云小微对话(客服机器人对话)
2. 腾讯行业大模型对话
说明:
请到 数智人的项目创建及管理 绑定数智人对话服务,只能 使用数智人平台项目创建会话 方式。
对话服务推荐采用 文本驱动指令(WebSocket 长连接) ,以下是这两种配置的代码示例:
Go
Python
// 客服机器人对话
func robotDialogue() {
fmt.Println("\\n-----云小微对话(客服机器人对话)-----")
err := api.SendTextDriveWssCommand(TextDriveWssCommand{
Data: &TextWssData{
Text: "微信简介", //根据客服机器人问答知识库回答
ChatCommand: "", //使用客服对话or大模型对话
},
})
if err != nil {
log.Fatalf("err:%v", err)
}
}

# 在VHApi类中添加发送文本驱动命令的方法
def send_text_drive_wss_command(self, text, chat_command=""):
"""发送文本驱动指令到WebSocket"""
if not self.ws or not self.ws.sock or not self.ws.sock.connected:
raise Exception("WebSocket连接未建立")
# 构造发送的数据结构
command = {
"Data": {
"Text": text,
"ChatCommand": chat_command
},
"Type": "text_drive" # 假设需要指定命令类型
}
try:
import json
self.ws.send(json.dumps(command))
logger.info(f"已发送文本驱动指令: {text}")
return True
except Exception as e:
logger.error(f"发送文本驱动指令失败: {e}")
raise


# 客服机器人对话函数
def robot_dialogue(api):
"""云小微对话(客服机器人对话)"""
print("\\n-----云小微对话(客服机器人对话)-----")
try:
# 发送文本驱动指令,查询"微信简介"
api.send_text_drive_wss_command(
text="微信简介", # 根据客服机器人问答知识库回答
chat_command="" # 使用客服对话or大模型对话
)
except Exception as e:
logger.error(f"错误: {e}")
sys.exit(1)


# 在主函数中调用时,需要修改原来的largeModelDialogue()调用
# 改为:robot_dialogue(api)

Go
Python
// 腾讯行业大模型对话
func largeModelDialogue() {
fmt.Println("\\n-----腾讯行业大模型对话-----")
err := api.SendTextDriveWssCommand(TextDriveWssCommand{
Data: &TextWssData{
Text: "你是谁", //可以在模型管理里面自定义配置问答
ChatCommand: "", //使用客服对话or大模型对话
ChatRoundId: strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32], //使用腾讯行业大模型对话服务的数智人项目需要传入此字段,用于区分多轮对话
},
})
if err != nil {
log.Fatalf("err:%v", err)
}
}
# 扩展VHApi类的send_text_drive_wss_command方法以支持多轮对话ID
def send_text_drive_wss_command(self, text, chat_command="", chat_round_id=None):
"""发送文本驱动指令到WebSocket,支持多轮对话ID"""
if not self.ws or not self.ws.sock or not self.ws.sock.connected:
raise Exception("WebSocket连接未建立")
# 构造发送的数据结构
command_data = {
"Text": text,
"ChatCommand": chat_command
}
# 如果提供了多轮对话ID,则添加到数据中
if chat_round_id:
command_data["ChatRoundId"] = chat_round_id
command = {
"Data": command_data,
"Type": "text_drive"
}
try:
import json
self.ws.send(json.dumps(command))
logger.info(f"已发送大模型对话指令: {text} (对话ID: {chat_round_id})")
return True
except Exception as e:
logger.error(f"发送大模型对话指令失败: {e}")
raise


# 腾讯行业大模型对话函数
def large_model_dialogue(api):
"""腾讯行业大模型对话"""
print("\\n-----腾讯行业大模型对话-----")
try:
# 生成32位的ChatRoundId(UUID去除横杠后取前32位)
chat_round_id = str(uuid.uuid4()).replace("-", "")[:32]
# 发送文本驱动指令
api.send_text_drive_wss_command(
text="你是谁", # 可在模型管理中自定义配置问答
chat_command="", # 区分客服对话或大模型对话
chat_round_id=chat_round_id # 用于区分多轮对话
)
except Exception as e:
logger.error(f"错误: {e}")
sys.exit(1)


# 在主函数中调用方式:
# large_model_dialogue(api)


用户自行接入大模型实践教程

用户自行接入大模型: 指用户自行接入了 ChatGPT、腾讯混元等大模型,输出通常为流式文本。
该场景推荐使用 流式文本驱动指令 中的非子句模式(isSentence=false),非子句模式下服务端会对客户端发送的文本重新组句,能很好的配合大模型的流式输出,用户接入大模型的场景完整 Demo 参考 ASR/ChatGPT/数智人整合 Demo
以下是该场景对应的代码示例:
Go
Python
// 用户自行接入大模型来驱动数智人的场景
func SendNotSentenceStreamText() {
//模拟大模型的返回结果
rspText := "我是一个数智人,我在测试发送流式文本非子句模式。"
//模拟大模型生成流式文本
streamChan := streamResponse(rspText, time.Millisecond*500)
reqId := strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32]
seq := 1
for char := range streamChan {
api.SendStreamTextDriveWssCommand(StreamTextDriveWssCommand{
ReqId: reqId,
Data: &StreamTextWssData{
Text: char,
Seq: seq,
},
})
seq++
}
// 发送final数据包
api.SendStreamTextDriveWssCommand(StreamTextDriveWssCommand{
ReqId: reqId,
Data: &StreamTextWssData{
Seq: 1,
IsFinal: true,
},
})
}

func streamResponse(text string, delay time.Duration) <-chan string {
ch := make(chan string)
go func() {
for _, char := range text {
//模拟大模型流式返回延时
time.Sleep(delay)
ch <- string(char)
}
close(ch)
}()
return ch
}
import time
import uuid
import json
import logging

# 扩展VHApi类,添加流式文本发送方法
class VHApi:
# ... 保留之前的其他方法 ...

def send_stream_text_drive_wss_command(self, req_id, text="", seq=1, is_final=False):
"""发送流式文本驱动指令"""
if not self.ws or not self.ws.sock or not self.ws.sock.connected:
raise Exception("WebSocket连接未建立")
# 构造流式文本命令结构
command = {
"ReqId": req_id,
"Data": {
"Text": text,
"Seq": seq,
"IsFinal": is_final
},
"Type": "stream_text_drive"
}
try:
self.ws.send(json.dumps(command))
if is_final:
logging.info(f"已发送最终数据包 - ReqId: {req_id}, Seq: {seq}")
else:
logging.info(f"已发送流式文本 - ReqId: {req_id}, Seq: {seq}, 内容: {text}")
return True
except Exception as e:
logging.error(f"发送流式文本失败: {e}")
raise


def stream_response(text, delay=0.5):
"""模拟大模型流式返回文本(替代Go的channel)"""
for char in text:
time.sleep(delay) # 模拟生成延迟
yield char # 用生成器替代Go的channel流式返回


def send_not_sentence_stream_text(api):
"""用户自行接入大模型驱动数智人(流式文本非子句模式)"""
# 模拟大模型返回结果
rsp_text = "我是一个数智人,我在测试发送流式文本非子句模式。"
# 模拟大模型生成流式文本(500毫秒间隔)
stream_gen = stream_response(rsp_text, delay=0.5)
# 生成32位请求ID
req_id = str(uuid.uuid4()).replace("-", "")[:32]
seq = 1
# 发送流式文本
for char in stream_gen:
api.send_stream_text_drive_wss_command(
req_id=req_id,
text=char,
seq=seq
)
seq += 1
# 发送最终数据包
api.send_stream_text_drive_wss_command(
req_id=req_id,
seq=seq, # 这里原Go代码用了1,实际应该用递增后的序号更合理,保留原逻辑可改为1
is_final=True
)


# 调用方式:在主函数中根据需要调用
# send_not_sentence_stream_text(api)


音频驱动场景实践教程

用户可以使用音频文件来驱动数智人,以下是该场景对应的代码示例:
Go
Python
func SendAudio() {
/*
音频格式:格式-PCM,采样率-16kHz,采样位深-16bits,声道-单声道。
1.如果是麦克风实时收音发送,每录制160ms(5120B)的数据即可发出,中间不需要间隔等待;如果是读取离线音频文件发送,片包大小为160ms(5120B),片包发送间隔为120ms。
2.最后一个片包大小按实际发送(需小于160ms)。
3.当数据包发送完毕后,必须再发送一个IsFinal=true的空数据包(Audio字段填空串)结束当次音频驱动使数字人回到静默状态。
4.发送音频实时率要介于[0.75,1],小于0.75会触发限速,大于1会导致画面卡顿。比如160ms音频片包大小,发送间隔不能低于120ms,不能高于160ms。
*/
chunkSize := 5120 //块大小-5120B
sleepBase := 120 //间隔ms
seq := 1
reqId := strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32]
//读取音频二进制文件
voiceFile, err := os.Open("your_audio_file.pcm")
if err != nil {
log.Fatalf("err:%v", err)
}
defer voiceFile.Close()
buffer := make([]byte, chunkSize)
for {
n, err := voiceFile.Read(buffer)
if err != nil && err != io.EOF {
fmt.Println("Error reading file:", err)
break
}
if n == 0 { // 文件读取完毕
break
}
err = api.SendVoiceDriveCommand(VoiceDriveCommand{
ReqId: reqId,
Data: &VoiceData{
Audio: base64.StdEncoding.EncodeToString(buffer), //声音二进制数据编码Base64后的值。
Seq: seq,
IsFinal: false,
},
})
if err != nil {
log.Fatalf("err:%v", err)
}
seq++ // 递增序列号
// 等待120ms到160ms的随机时间
sleepDuration := time.Duration(sleepBase+rand.Intn(41)) * time.Millisecond
time.Sleep(sleepDuration)
}
// 发送final数据包
api.SendVoiceDriveCommand(VoiceDriveCommand{
ReqId: reqId,
Data: &VoiceData{
Seq: seq,
IsFinal: true,
},
})
}
import base64
import time
import random
import uuid
import logging
import sys

# 扩展VHApi类,添加音频驱动命令发送方法
class VHApi:
# ... 保留之前的其他方法 ...

def send_voice_drive_command(self, req_id, audio="", seq=1, is_final=False):
"""发送音频驱动指令"""
if not self.ws or not self.ws.sock or not self.ws.sock.connected:
raise Exception("WebSocket连接未建立")
# 构造音频命令结构
command = {
"ReqId": req_id,
"Data": {
"Audio": audio,
"Seq": seq,
"IsFinal": is_final
},
"Type": "voice_drive"
}
try:
import json
self.ws.send(json.dumps(command))
if is_final:
logging.info(f"已发送音频最终数据包 - ReqId: {req_id}, Seq: {seq}")
else:
logging.info(f"已发送音频分片 - ReqId: {req_id}, Seq: {seq}, 大小: {len(audio)}B")
return True
except Exception as e:
logging.error(f"发送音频分片失败: {e}")
raise


def send_audio(api):
"""发送音频文件到数智人系统
音频格式要求:PCM,采样率16kHz,采样位深16bits,单声道
"""
chunk_size = 5120 # 块大小-5120B(对应160ms音频)
sleep_base = 120 # 基础间隔时间(ms)
seq = 1
# 生成32位请求ID
req_id = str(uuid.uuid4()).replace("-", "")[:32]
try:
# 打开音频文件
with open("your_audio_file.pcm", "rb") as voice_file:
while True:
# 读取音频数据
buffer = voice_file.read(chunk_size)
if not buffer: # 文件读取完毕
break
# 对音频数据进行Base64编码
audio_base64 = base64.b64encode(buffer).decode('utf-8')
# 发送音频分片
api.send_voice_drive_command(
req_id=req_id,
audio=audio_base64,
seq=seq,
is_final=False
)
seq += 1 # 递增序列号
# 等待120ms到160ms的随机时间(保持实时率在0.75-1之间)
sleep_ms = sleep_base + random.randint(0, 40) # 0-40的随机数,总间隔120-160ms
time.sleep(sleep_ms / 1000.0) # 转换为秒
# 发送最终数据包(空音频,标记结束)
api.send_voice_drive_command(
req_id=req_id,
audio="", # 空字符串表示结束
seq=seq,
is_final=True
)
except FileNotFoundError:
logging.error("音频文件未找到: your_audio_file.pcm")
sys.exit(1)
except Exception as e:
logging.error(f"发送音频时出错: {e}")
sys.exit(1)


# 调用方式:在主函数中根据需要调用
# send_audio(api)Python


最佳实践 Demo

Demo 内容
语言
最新版本文件名
Demo 下载链接
ASR/ChatGPT/数智人整合 Demo
Python
asr_chatgpt_vh_demo_20240812.zip
注意:
Demo 运行方式,请仔细阅读 Demo 包中的 README.md。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈