网站建设制作设计营销公司四川,如何查看网站备案信息,广州网站优化快速获取排名,黄石网络推广公司想要将dify接入钉钉机器人中进行问答#xff0c;首先需要创建应用#xff0c;添加企业机器人#xff0c;创建消息卡片#xff0c;开发机器人回复消息的代码#xff0c;最后接入dify#xff0c;使用dify的工作流进行回答。
一、机器人准备
1、创建应用#xff1a;创建应…想要将dify接入钉钉机器人中进行问答首先需要创建应用添加企业机器人创建消息卡片开发机器人回复消息的代码最后接入dify使用dify的工作流进行回答。一、机器人准备1、创建应用创建应用 - 钉钉开放平台2、添加应用能力添加应用能力 - 钉钉开放平台3、配置企业机器人配置企业机器人 - 钉钉开放平台4、新建卡片模板打字机效果流式 AI 卡片 - 钉钉开放平台二、开发代码1、安装官方SDKpip install dingtalk_stream loguru2、修改模板代码以下为官方模板代码根据自己的需求修改消息接收和回复逻辑import os import logging import asyncio import argparse from loguru import logger from dingtalk_stream import AckMessage import dingtalk_stream from http import HTTPStatus from dashscope import Generation from typing import Callable def define_options(): parser argparse.ArgumentParser() parser.add_argument( --client_id, destclient_id, defaultos.getenv(DINGTALK_APP_CLIENT_ID), helpapp_key or suite_key from https://open-dev.digntalk.com, ) parser.add_argument( --client_secret, destclient_secret, defaultos.getenv(DINGTALK_APP_CLIENT_SECRET), helpapp_secret or suite_secret from https://open-dev.digntalk.com, ) options parser.parse_args() return options async def call_with_stream(request_content: str, callback: Callable[[str], None]): messages [{role: user, content: request_content}] responses Generation.call( Generation.Models.qwen_turbo, messagesmessages, result_formatmessage, # set the result to be message format. streamTrue, # set stream output. incremental_outputTrue, # get streaming output incrementally. ) full_content # with incrementally we need to merge output. length 0 for response in responses: if response.status_code HTTPStatus.OK: full_content response.output.choices[0][message][content] full_content_length len(full_content) if full_content_length - length 20: await callback(full_content) logger.info( f调用流式更新接口更新内容current_length: {length}, next_length: {full_content_length} ) length full_content_length else: raise Exception( fRequest id: {response.request_id}, Status code: {response.status_code}, error code: {response.code}, error message: {response.message} ) await callback(full_content) logger.info( fRequest Content: {request_content}\nFull response: {full_content}\nFull response length: {len(full_content)} ) return full_content async def handle_reply_and_update_card(self: dingtalk_stream.ChatbotHandler, incoming_message: dingtalk_stream.ChatbotMessage): # 卡片模板 ID card_template_id 8aebdfb9-28f4-4a98-98f5-396c3dde41a0.schema # 该模板只用于测试使用如需投入线上使用请导入卡片模板 json 到自己的应用下 content_key content card_data {content_key: } card_instance dingtalk_stream.AICardReplier( self.dingtalk_client, incoming_message ) # 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards card_instance_id await card_instance.async_create_and_deliver_card( card_template_id, card_data ) # 再流式更新卡片: https://open.dingtalk.com/document/isvapp/api-streamingupdate async def callback(content_value: str): return await card_instance.async_streaming( card_instance_id, content_keycontent_key, content_valuecontent_value, appendFalse, finishedFalse, failedFalse, ) try: full_content_value await call_with_stream( incoming_message.text.content, callback ) await card_instance.async_streaming( card_instance_id, content_keycontent_key, content_valuefull_content_value, appendFalse, finishedTrue, failedFalse, ) except Exception as e: self.logger.exception(e) await card_instance.async_streaming( card_instance_id, content_keycontent_key, content_value, appendFalse, finishedFalse, failedTrue, ) class CardBotHandler(dingtalk_stream.ChatbotHandler): def __init__(self, logger: logging.Logger logger): super(dingtalk_stream.ChatbotHandler, self).__init__() if logger: self.logger logger async def process(self, callback: dingtalk_stream.CallbackMessage): incoming_message dingtalk_stream.ChatbotMessage.from_dict(callback.data) self.logger.info(f收到消息{incoming_message}) if incoming_message.message_type ! text: self.reply_text(俺只看得懂文字喔~, incoming_message) return AckMessage.STATUS_OK, OK asyncio.create_task(handle_reply_and_update_card(self, incoming_message)) return AckMessage.STATUS_OK, OK def main(): options define_options() credential dingtalk_stream.Credential(options.client_id, options.client_secret) client dingtalk_stream.DingTalkStreamClient(credential) client.register_callback_handler( dingtalk_stream.ChatbotMessage.TOPIC, CardBotHandler() ) client.start_forever() if __name__ __main__: main()3、接入dify根据dify创建的工作流的API文档进行调用如果想进行多轮对话可以保存返回的conversation_idfull_content length 0 timeout aiohttp.ClientTimeout(total60) async with aiohttp.ClientSession(timeouttimeout) as session: try: async with session.post(DIFY_CHAT_URL, headersheaders, jsonpayload) as resp: if resp.status ! 200: error_text await resp.text() raise Exception(fDify API error: {resp.status}, {error_text}) async for line in resp.content: line line.decode(utf-8).strip() if not line or not line.startswith(data:): continue data_str line[5:].strip() # 去掉 data: if data_str [DONE]: break try: data json.loads(data_str) if conversation_id in data and not conv_id: # 首次创建对话保存 ID update_conversation_id(user_id, data[conversation_id]) except json.JSONDecodeError: continue if answer in data: chunk data[answer] full_content chunk # 策略1每次收到 chunk 都回调 # await callback(full_content) # 策略2累积一定长度再回调 if len(full_content) - length 20: await callback(full_content) logger.info( f调用流式更新接口更新内容current_length: {length}, next_length: {len(full_content)} ) length len(full_content) # 最终回调确保完整内容被处理 if full_content and len(full_content) length: await callback(full_content) logger.info( fRequest Content: {request_content}\nFull response: {full_content}\nFull response length: {len(full_content)} ) return full_content except Exception as e: logger.error(f调用 Dify 流式接口出错: {e}) raise4、拓展继续开发不仅可以进行文字对话还能够进行图片问答先调用self.get_image_download_url方法获取钉钉上传的图像再调用v1/files/upload接口即可上传图像到dify