
package mainimport ("encoding/base64""fmt""io""log""os""strings""time"uuid "github.com/satori/go.uuid""k8s.io/apimachinery/pkg/util/rand")func main() {//第一步:初始化api调用对象var api = NewVHApi("your appkey", //your appkey"your accesstoken", //your accesstoken"https://gw.tvs.qq.com", //http domain"wss://gw.tvs.qq.com") //wss domain//第二步:使用形象资产ID或数智人项目ID创建视频流(使用形象资产ID可以指定音色相关参数,对话场景这些也没有明确;使用数智人项目ID里面包含了音色信息以及对话服务配置)//这里使用数智人项目ID创建视频流,以webrtc协议为例createSessionRsp, err := api.CreateSession(CreateSessionReq{UserId: "your user id",VirtualmanProjectId: "your virtualman project id",Protocol: "webrtc", //可选:webrtc、rtmp、trtcDriverType: 1, //drivetype=1代表支持文本驱动指令;drivetype=3代表支持音频和文本驱动指令})if err != nil {log.Fatalf("err:%v", err)}fmt.Println("获取流播放地址", createSessionRsp.PlayStreamAddr)for {var userInput stringfmt.Print("是否成功复制地址(输入'y'确定): ")fmt.Scanln(&userInput)if strings.ToLower(userInput) == "y" {break}}//第三步:循环检测会话状态直到SessionStatus=1(已就绪)sessionStatus := createSessionRsp.SessionStatusfor sessionStatus != 1 {time.Sleep(time.Second * 1)statSessionRsp, err := api.StatSession()if err != nil {log.Fatalf("err:%v", err)}sessionStatus = statSessionRsp.SessionStatus}//第四步:开启会话_, err = api.StartSession()if err != nil {log.Fatalf("err:%v", err)}//第五步:创建长链接通道err = api.CommandChannel()if err != nil {log.Fatalf("err:%v", err)}// 开启协程每过3s获取下行信息go func() {for {time.Sleep(time.Second * 3)fmt.Println("\\n-----开始调用长连接下行信息-----")api.ReadWssMessage()}}()//开启协程每过1分钟发送心跳指令go func() {for {time.Sleep(time.Minute * 1)fmt.Println("\\n-----开始调用长连接发送心跳指令-----")api.SendHeartBeatCommand()}}()//第六步:发送驱动指令//根据不同的场景选择不同的函数发送驱动指令,如大模型对话,调用以下函数(更多场景请见下文)largeModelDialogue()//第七步:关闭长连接通道和会话//90s后关闭长连接通道(根据自己的场景来选择断连时间)time.Sleep(time.Second * 90)api.CloseSession()}
import base64
import time
import uuid
import random
import logging
import sys
import threading

from websocket import WebSocketApp  # requires: pip install websocket-client

# Configure logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class VHApi:
    """Demo client for the digital-human session API.

    The HTTP calls (create / stat / start / close session) are simulated and
    only log what they would do. The WebSocket command channel is a real
    ``websocket-client`` connection.
    """

    def __init__(self, appkey, accesstoken, http_domain, wss_domain):
        self.appkey = appkey
        self.accesstoken = accesstoken
        self.http_domain = http_domain
        self.wss_domain = wss_domain
        self.session_id = None
        self.ws = None
        self.session_status = 0  # 0: not ready, 1: ready

    def create_session(self, user_id, virtualman_project_id, protocol="rtmp", driver_type=1):
        """Create a video-stream session (simulated).

        A real implementation should send an HTTP request to the API. The mock
        generates a random session ID and derives the play address from it.

        Returns:
            dict with ``PlayStreamAddr`` and ``SessionStatus`` (always 0 here).
        """
        logger.info(f"创建会话 - 用户ID: {user_id}, 项目ID: {virtualman_project_id}")
        # Simulated API response.
        self.session_id = str(uuid.uuid4())
        play_stream_addr = f"{self.http_domain}/stream/{self.session_id}"
        return {
            "PlayStreamAddr": play_stream_addr,
            "SessionStatus": 0,
        }

    def stat_session(self):
        """Query the session status (simulated).

        The mock reports the session as ready (status 1) as soon as it is
        polled. Previously it echoed ``self.session_status`` unchanged. That
        value stays 0, so ``main``'s readiness loop could never finish.
        A real implementation should return the status from the API response.

        Returns:
            dict with ``SessionStatus``.
        """
        logger.info(f"查询会话状态 - 会话ID: {self.session_id}")
        # Simulated state transition; replace with the real API response.
        self.session_status = 1
        return {"SessionStatus": self.session_status}

    def start_session(self):
        """Start the session (simulated; a real implementation sends an HTTP request)."""
        logger.info(f"开启会话 - 会话ID: {self.session_id}")
        return True

    def command_channel(self):
        """Open the long-lived WebSocket command channel on a daemon thread."""
        logger.info(f"创建长连接通道 - {self.wss_domain}/ws/{self.session_id}")
        self.ws = WebSocketApp(
            f"{self.wss_domain}/ws/{self.session_id}",
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )
        # Run the WebSocket event loop on a daemon thread so it does not block exit.
        threading.Thread(target=self.ws.run_forever, daemon=True).start()
        time.sleep(1)  # crude wait for the connection to come up
        return None

    def on_message(self, ws, message):
        """WebSocket callback: log each received message."""
        logger.info(f"收到消息: {message}")

    def on_error(self, ws, error):
        """WebSocket callback: log errors."""
        logger.error(f"WebSocket错误: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """WebSocket callback: log connection close."""
        logger.info(f"WebSocket连接关闭: {close_status_code} {close_msg}")

    def read_wss_message(self):
        """Placeholder for reading downlink messages.

        Incoming messages are already delivered to ``on_message``.
        """
        logger.info("读取下行信息...")

    def send_heartbeat_command(self):
        """Send a heartbeat over the WebSocket if it is connected."""
        if self.ws and self.ws.sock and self.ws.sock.connected:
            logger.info("发送心跳指令")
            try:
                # A real application should send the real heartbeat payload.
                self.ws.send('{"type": "heartbeat", "timestamp": %d}' % int(time.time()))
            except Exception as e:
                # The connection can drop between the check above and send().
                # Log it rather than let the exception kill the heartbeat thread.
                logger.error(f"发送心跳失败: {e}")
        else:
            logger.warning("WebSocket连接未建立,无法发送心跳")

    def close_session(self):
        """Close the WebSocket; a real implementation also sends the close-session HTTP request."""
        logger.info(f"关闭会话 - 会话ID: {self.session_id}")
        if self.ws:
            self.ws.close()


def large_model_dialogue(api=None):
    """Placeholder for the large-model dialogue step.

    Args:
        api: Optional VHApi client, so callers may use
            ``large_model_dialogue(api)``. This placeholder does not use it.
    """
    logger.info("开始大模型对话交互")
    # A real implementation would read user input and send driver commands here.


def main():
    """Run the demo: create, await, start, connect, drive, then close the session."""
    # Step 1: initialize the API client.
    api = VHApi(
        "your appkey",  # your appkey
        "your accesstoken",  # your accesstoken
        "https://gw.tvs.qq.com",  # HTTP domain
        "wss://gw.tvs.qq.com",  # WSS domain
    )

    # Step 2: create the video-stream session.
    try:
        create_session_rsp = api.create_session(
            user_id="your user id",
            virtualman_project_id="your virtualman project id",
            protocol="webrtc",  # one of: webrtc, rtmp, trtc
            driver_type=1,  # 1: text driven; 3: audio and text driven
        )
        logger.info(f"获取流播放地址: {create_session_rsp['PlayStreamAddr']}")

        # Wait for the user to confirm the address was copied.
        while True:
            user_input = input("是否成功复制地址(输入'y'确定): ").strip().lower()
            if user_input == 'y':
                break
    except Exception as e:
        logger.error(f"创建会话错误: {e}")
        return

    # Step 3: poll until the session is ready.
    api.session_status = create_session_rsp["SessionStatus"]
    while api.session_status != 1:
        time.sleep(1)
        try:
            stat_session_rsp = api.stat_session()
            api.session_status = stat_session_rsp["SessionStatus"]
            logger.info(f"会话状态: {api.session_status} (等待就绪...)")
        except Exception as e:
            logger.error(f"查询会话状态错误: {e}")
            return

    # Step 4: start the session.
    try:
        api.start_session()
    except Exception as e:
        logger.error(f"开启会话错误: {e}")
        return

    # Step 5: open the long-lived command channel.
    try:
        api.command_channel()
    except Exception as e:
        logger.error(f"创建长连接通道错误: {e}")
        return

    # Background thread: read downlink messages every 3 seconds.
    def read_message_loop():
        while True:
            time.sleep(3)
            print("\n-----开始调用长连接下行信息-----")
            api.read_wss_message()

    threading.Thread(target=read_message_loop, daemon=True).start()

    # Background thread: send a heartbeat every minute.
    def heartbeat_loop():
        while True:
            time.sleep(60)
            print("\n-----开始调用长连接发送心跳指令-----")
            api.send_heartbeat_command()

    threading.Thread(target=heartbeat_loop, daemon=True).start()

    # Step 6: send driver commands.
    large_model_dialogue(api)

    # Step 7: close the connection and session after 90 seconds.
    try:
        time.sleep(90)
        api.close_session()
        logger.info("会话已关闭")
    except Exception as e:
        logger.error(f"关闭会话错误: {e}")


if __name__ == "__main__":
    main()
// 客服机器人对话func robotDialogue() {fmt.Println("\\n-----云小微对话(客服机器人对话)-----")err := api.SendTextDriveWssCommand(TextDriveWssCommand{Data: &TextWssData{Text: "微信简介", //根据客服机器人问答知识库回答ChatCommand: "", //使用客服对话or大模型对话},})if err != nil {log.Fatalf("err:%v", err)}}
# Method to add to the VHApi class: send a text-drive command over the WebSocket.
def send_text_drive_wss_command(self, text, chat_command=""):
    """Send a text-drive command over the WebSocket command channel.

    Args:
        text: Text that drives the digital human.
        chat_command: Selects customer-service vs. large-model dialogue;
            empty by default.

    Returns:
        True once the command has been passed to ``self.ws.send``.

    Raises:
        Exception: if the WebSocket connection is not established. Any error
            from ``self.ws.send`` is logged and re-raised.
    """
    import json

    if not self.ws or not self.ws.sock or not self.ws.sock.connected:
        raise Exception("WebSocket连接未建立")

    # Build the payload.
    command = {
        "Data": {
            "Text": text,
            "ChatCommand": chat_command,
        },
        # NOTE(review): the "Type" field is an assumption carried over from the
        # original snippet; confirm it against the API specification.
        "Type": "text_drive",
    }
    try:
        self.ws.send(json.dumps(command))
        logger.info(f"已发送文本驱动指令: {text}")
        return True
    except Exception as e:
        logger.error(f"发送文本驱动指令失败: {e}")
        raise


# Customer-service robot dialogue.
def robot_dialogue(api):
    """Customer-service robot dialogue (云小微对话).

    Sends the question "微信简介", which is answered from the robot's Q&A
    knowledge base. Logs the error and exits with status 1 if sending fails.

    Args:
        api: Client exposing ``send_text_drive_wss_command``.
    """
    # "\n" (not the previous "\\n") so the banner starts on a fresh line.
    print("\n-----云小微对话(客服机器人对话)-----")
    try:
        api.send_text_drive_wss_command(
            text="微信简介",  # answered from the robot's Q&A knowledge base
            chat_command="",  # selects customer-service vs. large-model dialogue
        )
    except Exception as e:
        logger.error(f"错误: {e}")
        sys.exit(1)


# To use it from main, replace the original large_model_dialogue() call with:
# robot_dialogue(api)
// 腾讯行业大模型对话func largeModelDialogue() {fmt.Println("\\n-----腾讯行业大模型对话-----")err := api.SendTextDriveWssCommand(TextDriveWssCommand{Data: &TextWssData{Text: "你是谁", //可以在模型管理里面自定义配置问答ChatCommand: "", //使用客服对话or大模型对话ChatRoundId: strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32], //使用腾讯行业大模型对话服务的数智人项目需要传入此字段,用于区分多轮对话},})if err != nil {log.Fatalf("err:%v", err)}}
# Extended VHApi.send_text_drive_wss_command that also carries a multi-turn
# dialogue ID.
def send_text_drive_wss_command(self, text, chat_command="", chat_round_id=None):
    """Send a text-drive command, optionally tagged with a dialogue-round ID.

    Args:
        text: Text that drives the digital human.
        chat_command: Selects customer-service vs. large-model dialogue.
        chat_round_id: Multi-turn dialogue ID; added to the payload as
            ``ChatRoundId`` only when truthy.

    Returns:
        True once the command has been passed to ``self.ws.send``.

    Raises:
        Exception: if the WebSocket connection is not established. Any error
            from ``self.ws.send`` is logged and re-raised.
    """
    import json

    if not self.ws or not self.ws.sock or not self.ws.sock.connected:
        raise Exception("WebSocket连接未建立")

    command_data = {
        "Text": text,
        "ChatCommand": chat_command,
    }
    # Only include the dialogue-round ID when one was supplied.
    if chat_round_id:
        command_data["ChatRoundId"] = chat_round_id

    command = {
        "Data": command_data,
        "Type": "text_drive",
    }
    try:
        self.ws.send(json.dumps(command))
        logger.info(f"已发送大模型对话指令: {text} (对话ID: {chat_round_id})")
        return True
    except Exception as e:
        logger.error(f"发送大模型对话指令失败: {e}")
        raise


# Tencent industry large-model dialogue.
def large_model_dialogue(api):
    """Tencent industry large-model dialogue.

    Sends "你是谁" with a fresh 32-character ``chat_round_id``, which
    distinguishes dialogue rounds. Logs the error and exits with status 1 if
    sending fails.

    Args:
        api: Client exposing ``send_text_drive_wss_command``.
    """
    # "\n" (not the previous "\\n") so the banner starts on a fresh line.
    print("\n-----腾讯行业大模型对话-----")
    try:
        # uuid4().hex is the UUID's 32 hex digits without dashes.
        chat_round_id = uuid.uuid4().hex
        api.send_text_drive_wss_command(
            text="你是谁",  # Q&A can be customized under model management
            chat_command="",  # selects customer-service vs. large-model dialogue
            chat_round_id=chat_round_id,  # distinguishes dialogue rounds
        )
    except Exception as e:
        logger.error(f"错误: {e}")
        sys.exit(1)


# Usage from main:
# large_model_dialogue(api)
// 用户自行接入大模型来驱动数智人的场景func SendNotSentenceStreamText() {//模拟大模型的返回结果rspText := "我是一个数智人,我在测试发送流式文本非子句模式。"//模拟大模型生成流式文本streamChan := streamResponse(rspText, time.Millisecond*500)reqId := strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32]seq := 1for char := range streamChan {api.SendStreamTextDriveWssCommand(StreamTextDriveWssCommand{ReqId: reqId,Data: &StreamTextWssData{Text: char,Seq: seq,},})seq++}// 发送final数据包api.SendStreamTextDriveWssCommand(StreamTextDriveWssCommand{ReqId: reqId,Data: &StreamTextWssData{Seq: 1,IsFinal: true,},})}func streamResponse(text string, delay time.Duration) <-chan string {ch := make(chan string)go func() {for _, char := range text {//模拟大模型流式返回延时time.Sleep(delay)ch <- string(char)}close(ch)}()return ch}
import time
import uuid
import json
import logging


# Extend the VHApi class with a streaming-text send method.
class VHApi:
    # ... keep the other methods defined earlier ...

    def send_stream_text_drive_wss_command(self, req_id, text="", seq=1, is_final=False):
        """Send one fragment of a streaming-text drive command.

        Args:
            req_id: Request ID shared by every fragment of one stream.
            text: Fragment text; left empty for the final packet.
            seq: Fragment sequence number.
            is_final: True for the packet that ends the stream.

        Returns:
            True after the fragment has been sent.

        Raises:
            Exception: if the WebSocket is not connected. Errors raised while
                sending are logged and re-raised.
        """
        if not (self.ws and self.ws.sock and self.ws.sock.connected):
            raise Exception("WebSocket连接未建立")

        payload = {
            "ReqId": req_id,
            "Data": {"Text": text, "Seq": seq, "IsFinal": is_final},
            "Type": "stream_text_drive",
        }
        try:
            self.ws.send(json.dumps(payload))
            if not is_final:
                logging.info(f"已发送流式文本 - ReqId: {req_id}, Seq: {seq}, 内容: {text}")
            else:
                logging.info(f"已发送最终数据包 - ReqId: {req_id}, Seq: {seq}")
            return True
        except Exception as exc:
            logging.error(f"发送流式文本失败: {exc}")
            raise


def stream_response(text, delay=0.5):
    """Simulate a model streaming its answer (a generator in place of a Go channel).

    Yields the characters of ``text`` one by one, sleeping ``delay`` seconds
    before each.
    """
    for character in text:
        time.sleep(delay)  # simulated generation latency
        yield character


def send_not_sentence_stream_text(api):
    """Drive the digital human with your own model's output (non-sentence streaming).

    Each character of a simulated model response is sent as its own fragment
    (sequence numbers starting at 1) under a single request ID. A final packet
    with the next sequence number and ``is_final=True`` follows.

    Args:
        api: Client exposing ``send_stream_text_drive_wss_command``.
    """
    # Simulated model output, streamed at 500 ms per character.
    rsp_text = "我是一个数智人,我在测试发送流式文本非子句模式。"
    fragments = stream_response(rsp_text, delay=0.5)

    # 32-character request ID: the UUID's hex digits without dashes.
    req_id = uuid.uuid4().hex

    sent = 0
    for sent, fragment in enumerate(fragments, start=1):
        api.send_stream_text_drive_wss_command(
            req_id=req_id,
            text=fragment,
            seq=sent,
        )

    # Final packet: uses the sequence number following the last fragment.
    api.send_stream_text_drive_wss_command(
        req_id=req_id,
        seq=sent + 1,
        is_final=True,
    )


# Usage: call from main when needed.
# send_not_sentence_stream_text(api)
func SendAudio() {/*音频格式:格式-PCM,采样率-16kHz,采样位深-16bits,声道-单声道。1.如果是麦克风实时收音发送,每录制160ms(5120B)的数据即可发出,中间不需要间隔等待;如果是读取离线音频文件发送,片包大小为160ms(5120B),片包发送间隔为120ms。2.最后一个片包大小按实际发送(需小于160ms)。3.当数据包发送完毕后,必须再发送一个IsFinal=true的空数据包(Audio字段填空串)结束当次音频驱动使数字人回到静默状态。4.发送音频实时率要介于[0.75,1],小于0.75会触发限速,大于1会导致画面卡顿。比如160ms音频片包大小,发送间隔不能低于120ms,不能高于160ms。*/chunkSize := 5120 //块大小-5120BsleepBase := 120 //间隔msseq := 1reqId := strings.Replace(uuid.NewV4().String(), "-", "", -1)[:32]//读取音频二进制文件voiceFile, err := os.Open("your_audio_file.pcm")if err != nil {log.Fatalf("err:%v", err)}defer voiceFile.Close()buffer := make([]byte, chunkSize)for {n, err := voiceFile.Read(buffer)if err != nil && err != io.EOF {fmt.Println("Error reading file:", err)break}if n == 0 { // 文件读取完毕break}err = api.SendVoiceDriveCommand(VoiceDriveCommand{ReqId: reqId,Data: &VoiceData{Audio: base64.StdEncoding.EncodeToString(buffer), //声音二进制数据编码Base64后的值。Seq: seq,IsFinal: false,},})if err != nil {log.Fatalf("err:%v", err)}seq++ // 递增序列号// 等待120ms到160ms的随机时间sleepDuration := time.Duration(sleepBase+rand.Intn(41)) * time.Millisecondtime.Sleep(sleepDuration)}// 发送final数据包api.SendVoiceDriveCommand(VoiceDriveCommand{ReqId: reqId,Data: &VoiceData{Seq: seq,IsFinal: true,},})}
import base64
import time
import random
import uuid
import logging
import sys


# Extend the VHApi class with a voice-drive send method.
class VHApi:
    # ... keep the other methods defined earlier ...

    def send_voice_drive_command(self, req_id, audio="", seq=1, is_final=False):
        """Send one audio fragment, or the closing packet, as a voice-drive command.

        Args:
            req_id: Request ID shared by every fragment of one audio stream.
            audio: Base64-encoded audio; empty for the final packet.
            seq: Fragment sequence number.
            is_final: True for the packet that ends the stream.

        Returns:
            True after the command has been sent.

        Raises:
            Exception: if the WebSocket is not connected. Errors raised while
                sending are logged and re-raised.
        """
        if not (self.ws and self.ws.sock and self.ws.sock.connected):
            raise Exception("WebSocket连接未建立")

        payload = {
            "ReqId": req_id,
            "Data": {"Audio": audio, "Seq": seq, "IsFinal": is_final},
            "Type": "voice_drive",
        }
        try:
            import json

            self.ws.send(json.dumps(payload))
            if not is_final:
                # NOTE(review): len(audio) is the Base64 length, not the raw byte count.
                logging.info(f"已发送音频分片 - ReqId: {req_id}, Seq: {seq}, 大小: {len(audio)}B")
            else:
                logging.info(f"已发送音频最终数据包 - ReqId: {req_id}, Seq: {seq}")
            return True
        except Exception as exc:
            logging.error(f"发送音频分片失败: {exc}")
            raise


def send_audio(api):
    """Stream a local PCM file to the digital human as voice-drive commands.

    Expected audio format: PCM, 16 kHz sample rate, 16-bit samples, mono.
    The file is sent in 5120-byte (160 ms) chunks with a random 120-160 ms
    gap between chunks, to keep the real-time factor within [0.75, 1]. An
    empty ``is_final`` packet closes the stream. Logs the error and exits
    with status 1 if the file is missing or anything else fails.

    Args:
        api: Client exposing ``send_voice_drive_command``.
    """
    chunk_size = 5120  # bytes per chunk (160 ms of audio)
    sleep_base = 120  # minimum gap between chunks, in ms
    seq = 1
    req_id = uuid.uuid4().hex  # 32-character request ID

    try:
        with open("your_audio_file.pcm", "rb") as voice_file:
            # iter() with a b"" sentinel stops once read() hits end of file.
            for chunk in iter(lambda: voice_file.read(chunk_size), b""):
                api.send_voice_drive_command(
                    req_id=req_id,
                    audio=base64.b64encode(chunk).decode('utf-8'),
                    seq=seq,
                    is_final=False,
                )
                seq += 1
                # Random 120-160 ms gap keeps the real-time factor in [0.75, 1].
                time.sleep((sleep_base + random.randint(0, 40)) / 1000.0)

        # Closing packet: empty audio marks the end of this voice drive.
        api.send_voice_drive_command(
            req_id=req_id,
            audio="",
            seq=seq,
            is_final=True,
        )
    except FileNotFoundError:
        logging.error("音频文件未找到: your_audio_file.pcm")
        sys.exit(1)
    except Exception as exc:
        logging.error(f"发送音频时出错: {exc}")
        sys.exit(1)


# Usage: call from main when needed.
# send_audio(api)
| Demo 内容 | 语言 | 最新版本文件名 | Demo 下载链接 |
| --- | --- | --- | --- |
| ASR/ChatGPT/数智人整合 Demo | Python | asr_chatgpt_vh_demo_20240812.zip | |
文档反馈