一、实现逻辑,PV操作
1.构建基于信号量的流程图,PV操作
a --release-->: b 反之, b --acquire-->: a
a --release-->: c 反之, c --acquire-->: a
b --release-->: e 反之, e --acquire-->: b
c --release-->: d 反之, d --acquire-->: c
d --release-->: e 反之, e --acquire-->: d
2.构建一个线程池,每个类需要一个线程执行
3.基于信号量PV操作,进行调度
二、代码
抽象父类
/**
 * Common base for the workflow modules in this file; each module is submitted
 * to an executor as a {@link Callable}.
 *
 * <p>The concrete modules shown here implement {@code call()} as:
 * {@link #before()} (wait for inputs), run their own logic, then
 * {@link #after()} (signal outputs).
 */
@Data
public abstract class BaseModule implements Callable<Object> {
// Name of this module; the subclasses in this file use it as the key into the
// acquire/release semaphore maps held by params.
protected String componentName;
// Shared scheduling data (module list plus the acquire/release semaphore maps).
protected Params params;
// Value produced by the module; the subclasses in this file set it at the end of call().
protected Object result;
/**
 * Waits until this module may run. The implementations in this file acquire
 * every semaphore registered for the module and then return {@code true}.
 *
 * @return {@code true} when the module should execute its logic
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public abstract boolean before() throws InterruptedException;
/**
 * Runs after the module's own logic. The implementations in this file release
 * every semaphore registered for the module's outgoing edges.
 */
public abstract void after();
/**
 * Marks one input of the module as activated.
 *
 * @param attribute input key; the implementations in this file receive the
 *                  value stored in {@code Config.inputNodeStatus} for a semaphore
 */
public abstract void activate(String attribute);
}
A类
/**
 * Module "A": the conversation entry point of the workflow.
 *
 * <p>It waits on the semaphores registered for its component name (none when it
 * is the root of the graph), prints its banner and then releases one semaphore
 * per outgoing edge so that downstream modules can start.
 */
@Data
public class UserChatModule extends BaseModule implements Callable<Object> {

    /**
     * @param componentName key used to look up this module's semaphores in {@code params}
     * @param params        shared scheduling data (acquire/release semaphore maps)
     */
    public UserChatModule(String componentName, Params params) {
        this.componentName = componentName;
        this.params = params;
    }

    /**
     * Runs the module once: wait for inputs, execute, signal outputs.
     *
     * @return the constant {@code "a result"}, which is also stored in {@code result}
     */
    @Override
    public Object call() {
        try {
            // A plain "if" is enough: the module executes at most once per call.
            if (before()) {
                System.out.println("我是A类,我就是对话入口");
                after();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        this.setResult("a result");
        return "a result";
    }

    /**
     * Blocks until every incoming semaphore of this module has been acquired.
     *
     * @return always {@code true}
     * @throws InterruptedException if interrupted while waiting on a semaphore
     */
    @Override
    public boolean before() throws InterruptedException {
        Map<String, List<Semaphore>> acquireSemaphoreMap = this.params.getAcquireSemaphoreMap();
        List<Semaphore> semaphoreList = acquireSemaphoreMap.get(this.componentName);
        for (Semaphore semaphore : semaphoreList) {
            // Mark the input attached to this edge, then block until the upstream module signals.
            activate(Config.inputNodeStatus.get(semaphore));
            semaphore.acquire();
        }
        System.out.println("我是A类,我等待信号量的个数为" + semaphoreList.size());
        // For the root node the list is empty, so there is nothing to wait for.
        return true;
    }

    /** Releases one semaphore per outgoing edge so the downstream modules can proceed. */
    @Override
    public void after() {
        Map<String, List<Semaphore>> releaseSemaphoreMap = this.params.getReleaseSemaphoreMap();
        List<Semaphore> semaphoreList = releaseSemaphoreMap.get(this.componentName);
        for (Semaphore semaphore : semaphoreList) {
            semaphore.release();
        }
        System.out.println("我是A类,我释放信号量的个数为" + semaphoreList.size());
    }

    /** This module keeps no input flags, so activation is a no-op. */
    @Override
    public void activate(String attribute) {
    }
}
B类
@Data
public class KnowledgeModule extends BaseModule implements Callable<Object> {
private boolean userChatInput;//用户输入
private boolean trigger; //触发器
public KnowledgeModule(String componentName, Params params) {
this.componentName = componentName;
this.params = params;
}
@Override
public Object call() {
try {
while (before()) {
//执行自己的逻辑
System.out.println("我是B类,我是知识库检索");
if (userChatInput){
System.out.println(" 用户问题激活");
}
if (trigger){
System.out.println(" 触发器激活");
}
after();
break;
}
} catch (Exception e) {
e.printStackTrace();
}
this.setResult("b result");
return "b result";
}
public boolean before() throws InterruptedException {
//Main.s1.acquire();
Map<String, List<Semaphore>> acquireSemaphoreMap = this.params.getAcquireSemaphoreMap();
List<Semaphore> semaphoreList = acquireSemaphoreMap.get(this.componentName);
for (Semaphore semaphore : semaphoreList) {
activate(Config.inputNodeStatus.get(semaphore));//激活节点
semaphore.acquire(); //阻塞等待
}
System.out.println("我是B类,我等待信号量的个数为" + semaphoreList.size());
return true;
}
public void activate(String attribute) {
if (attribute.equals("userChatInput")) {
this.userChatInput = true;
}
if (attribute.equals("trigger")) {
this.trigger = true;
}
}
public void after() {
Map<String, List<Semaphore>> releaseSemaphoreMap = this.params.getReleaseSemaphoreMap();
List<Semaphore> semaphoreList = releaseSemaphoreMap.get(this.componentName);
for (Semaphore semaphore : semaphoreList) {
semaphore.release();
}
System.out.println("我是B类,我释放信号量的个数为" + semaphoreList.size());
// //释放信号量
// Main.s3.release();
}
}
D类
@Data
public class HttpModule extends BaseModule implements Callable<Object> {
private boolean trigger; //触发器
public HttpModule(String componentName, Params params) {
this.componentName = componentName;
this.params = params;
}
@Override
public Object call() {
try {
while (before()) {
//执行自己的逻辑
System.out.println("我是D类,我就是HTTP请求");
if (trigger){
System.out.println(" 触发器激活");
}
after();
break;
}
} catch (Exception e) {
e.printStackTrace();
}
this.setResult("d result");
return "d result";
}
public boolean before() throws InterruptedException {
Map<String, List<Semaphore>> acquireSemaphoreMap = this.params.getAcquireSemaphoreMap();
List<Semaphore> semaphoreList = acquireSemaphoreMap.get(this.componentName);
for (Semaphore semaphore : semaphoreList) {
activate(Config.inputNodeStatus.get(semaphore));//激活节点
semaphore.acquire(); //阻塞等待
}
System.out.println("我是D类,我等待信号量的个数为" + semaphoreList.size());
return true;
}
public void after() {
Map<String, List<Semaphore>> releaseSemaphoreMap = this.params.getReleaseSemaphoreMap();
List<Semaphore> semaphoreList = releaseSemaphoreMap.get(this.componentName);
for (Semaphore semaphore : semaphoreList) {
semaphore.release();
}
System.out.println("我是D类,我释放信号量的个数为" + semaphoreList.size());
}
public void activate(String attribute) {
if (attribute.equals("switch")) {
this.trigger = true;
}
}
}
E类
@Data
public class ChatModule extends BaseModule implements Callable<Object> {
private boolean trigger; //触发器
private boolean prompt;//提示
private boolean history;//历史记录
private boolean userChatInput;//用户输入
private boolean quoteQA;//引用QA
public ChatModule(String componentName, Params params ) {
this.componentName = componentName;
this.params = params;
}
@Override
public Object call() {
try {
while (before()){
//执行自己的逻辑
System.out.println("我是E类,我就是AI对话");
if (trigger){
System.out.println(" 触发器激活");
}
if (prompt){
System.out.println(" 提示激活");
}
if (history){
System.out.println(" 历史记录激活");
}
if (userChatInput){
System.out.println(" 用户输入激活");
}
if (quoteQA){
System.out.println(" 引用QA激活");
}
after();
break;
}
} catch (Exception e) {
e.printStackTrace();
}
this.setResult("e result");
return "e result";
}
public boolean before() throws InterruptedException{
Map<String, List<Semaphore>> acquireSemaphoreMap = this.params.getAcquireSemaphoreMap();
List<Semaphore> semaphoreList = acquireSemaphoreMap.get(this.componentName);
for (Semaphore semaphore : semaphoreList) {
activate(Config.inputNodeStatus.get(semaphore));//激活节点
semaphore.acquire(); //阻塞等待
}
System.out.println("我是E类,我等待信号量的个数为"+semaphoreList.size());
/* Main.s3.acquire();
Main.s5.acquire();*/
return true;
}
public void after(){
//释放信号量
Map<String, List<Semaphore>> releaseSemaphoreMap = this.params.getReleaseSemaphoreMap();
List<Semaphore> semaphoreList = releaseSemaphoreMap.get(this.componentName);
for (Semaphore semaphore : semaphoreList) {
semaphore.release();
}
System.out.println("我是E类,我释放信号量的个数为"+semaphoreList.size());
}
public void activate(String attribute) {
if (attribute.equals("switch")) {
this.trigger = true;
}
if (attribute.equals("prompt")) {
this.prompt = true;
}
if (attribute.equals("history")) {
this.history = true;
}
if (attribute.equals("userChatInput")) {
this.userChatInput = true;
}
if (attribute.equals("quoteQA")) {
this.quoteQA = true;
}
}
}
配置类
/**
 * Process-wide shared state for the workflow demo: the executor that runs the
 * modules and the lookup tables that attach a key to each edge semaphore.
 */
public class Config {
// Fixed pool of 20 threads; Main.dispatch submits one task per module to it.
public static ExecutorService threadPool = Executors.newFixedThreadPool(20);
// Edge semaphore -> key of the target input it feeds; filled by Main.builderGraph and
// read by each module's before(). The value may be null when the edge has no key.
public static Map<Semaphore,String> inputNodeStatus = new HashMap<>(); //to survive a restart, this state would have to be persisted to a database
// Not referenced by the code visible in this file.
public static Map<Semaphore,String> outputNodeStatus = new HashMap<>(); //to survive a restart, this state would have to be persisted to a database
}
/**
 * Scheduling data handed to every module instance: the parsed module list and
 * the two semaphore maps, both keyed by component name.
 */
@Data
public class Params {
// Modules of the workflow being executed (set by Main.dispatch).
private List<Module> queryParams;
// Semaphores each component must acquire before it runs (its incoming edges)
private Map<String, List<Semaphore>> acquireSemaphoreMap;
// Semaphores each component releases after it runs (its outgoing edges)
private Map<String, List<Semaphore>> releaseSemaphoreMap;
}
启动类
package com.example.liteflow.mysql.app.admin.async;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject;
import com.example.liteflow.mysql.app.admin.dto.LineNode;
import com.example.liteflow.mysql.app.admin.dto.Module;
import com.example.liteflow.mysql.app.admin.dto.OutputNode;
import com.example.liteflow.mysql.app.admin.module.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* 功能描述:
*
* @date: 2024/04/07 13:47
*/
public class Main {
//TODO:1.对于资源的消耗问题
//TODO:2.对于资源释放的问题
//TODO:3.每个类内部的逻辑,如何实现。贴合业务逻辑
public static void main(String[] args) throws InterruptedException, ExecutionException {
//todo:模拟数据,根据metadata动态构建整个流程图,通过智能体id查询JSON结构
String metadata1 = "[{\"moduleId\":\"a11\",\"componentName\":\"a\",\"outputs\":[{\"targets\":[{\"moduleId\":\"b\"},{\"moduleId\":\"c\"}]}]},{\"moduleId\":\"b111\",\"componentName\":\"b\",\"outputs\":[{\"targets\":[{\"moduleId\":\"e\"}]}]},{\"moduleId\":\"c111\",\"componentName\":\"c\",\"outputs\":[{\"targets\":[{\"moduleId\":\"d\"}]}]},{\"moduleId\":\"d111\",\"componentName\":\"d\",\"outputs\":[{\"targets\":[{\"moduleId\":\"e\"}]}]},{\"moduleId\":\"e111\",\"componentName\":\"e\",\"outputs\":[{\"targets\":[]}]}]";
String metadata2 = "[{\"moduleId\":\"userChatInput\",\"name\":\"对话入口\",\"componentName\":\"a\",\"outputs\":[{\"targets\":[{\"moduleId\":\"b\",\"key\":\"userChatInput\"},{\"moduleId\":\"c\",\"key\":\"userChatInput\"}]}]},{\"moduleId\":\"chatModule\",\"componentName\":\"e\",\"name\":\"AI对话\",\"outputs\":[{\"key\":\"answerText\",\"label\":\"core.module.output.label.Ai response content\",\"description\":\"core.module.output.description.Ai response content\",\"valueType\":\"string\",\"type\":\"source\",\"targets\":[]},{\"key\":\"finish\",\"label\":\"\",\"description\":\"\",\"valueType\":\"boolean\",\"type\":\"hidden\",\"targets\":[]},{\"key\":\"history\",\"label\":\"core.module.output.label.New context\",\"description\":\"core.module.output.description.New context\",\"valueType\":\"chatHistory\",\"type\":\"source\",\"targets\":[]},{\"key\":\"userChatInput\",\"label\":\"core.module.input.label.user question\",\"type\":\"hidden\",\"valueType\":\"string\",\"targets\":[]}]},{\"moduleId\":\"e0dz6q\",\"componentName\":\"b\",\"name\":\"知识库搜索\",\"outputs\":[{\"key\":\"userChatInput\",\"label\":\"core.module.input.label.user question\",\"type\":\"hidden\",\"valueType\":\"string\",\"targets\":[{\"moduleId\":\"e\",\"key\":\"userChatInput\"}]},{\"key\":\"isEmpty\",\"label\":\"core.module.output.label.Search result empty\",\"type\":\"source\",\"valueType\":\"boolean\",\"targets\":[]},{\"key\":\"unEmpty\",\"label\":\"core.module.output.label.Search result not empty\",\"type\":\"source\",\"valueType\":\"boolean\",\"targets\":[]},{\"key\":\"quoteQA\",\"label\":\"core.module.Dataset quote.label\",\"type\":\"source\",\"valueType\":\"datasetQuote\",\"targets\":[{\"moduleId\":\"e\",\"key\":\"quoteQA\"}]}]},{\"moduleId\":\"qv2xiz\",\"componentName\":\"c\",\"name\":\"知识库搜索\",\"outputs\":[{\"key\":\"userChatInput\",\"label\":\"core.module.input.label.user 
question\",\"type\":\"hidden\",\"valueType\":\"string\",\"targets\":[{\"moduleId\":\"d\",\"key\":\"switch\"}]},{\"key\":\"isEmpty\",\"label\":\"core.module.output.label.Search result empty\",\"type\":\"source\",\"valueType\":\"boolean\",\"targets\":[]},{\"key\":\"unEmpty\",\"label\":\"core.module.output.label.Search result not empty\",\"type\":\"source\",\"valueType\":\"boolean\",\"targets\":[]},{\"key\":\"quoteQA\",\"label\":\"core.module.Dataset quote.label\",\"type\":\"source\",\"valueType\":\"datasetQuote\",\"targets\":[{\"moduleId\":\"d\",\"key\":\"switch\"}]}]},{\"moduleId\":\"b2ujfq\",\"componentName\":\"d\",\"name\":\"HTTP 请求\",\"outputs\":[{\"key\":\"httpRawResponse\",\"label\":\"原始响应\",\"description\":\"HTTP请求的原始响应。只能接受字符串或JSON类型响应数据。\",\"valueType\":\"any\",\"type\":\"source\",\"targets\":[]},{\"key\":\"system_addOutputParam\",\"type\":\"addOutputParam\",\"valueType\":\"any\",\"label\":\"\",\"targets\":[],\"editField\":{\"key\":true,\"description\":true,\"dataType\":true,\"defaultValue\":true},\"defaultEditField\":{\"label\":\"\",\"key\":\"\",\"description\":\"\",\"outputType\":\"source\",\"valueType\":\"string\"}},{\"key\":\"finish\",\"label\":\"\",\"description\":\"\",\"valueType\":\"boolean\",\"type\":\"hidden\",\"targets\":[{\"moduleId\":\"e\",\"key\":\"switch\"}]}]}]";
String metadata3 = "[{\"moduleId\":\"a11\",\"componentName\":\"a\",\"outputs\":[{\"targets\":[{\"moduleId\":\"b\"},{\"moduleId\":\"c\"}]}]},{\"moduleId\":\"b111\",\"componentName\":\"b\",\"outputs\":[{\"targets\":[{\"moduleId\":\"e\"}]}]},{\"moduleId\":\"c111\",\"componentName\":\"c\",\"outputs\":[{\"targets\":[{\"moduleId\":\"d\"}]}]},{\"moduleId\":\"d111\",\"componentName\":\"d\",\"outputs\":[{\"targets\":[{\"moduleId\":\"e\"}]}]},{\"moduleId\":\"e111\",\"componentName\":\"e\",\"outputs\":[{\"targets\":[]}]}]";
process(metadata2);
//process(metadata1);
}
/**
 * Runs one workflow: parses the metadata, wires the semaphore graph and
 * dispatches the modules.
 *
 * @param metadata JSON array describing the modules and their output targets
 * @throws InterruptedException propagated from {@code dispatch}
 * @throws ExecutionException   propagated from {@code dispatch}
 */
public static void process(String metadata) throws InterruptedException, ExecutionException {
    // Step 1: parse the modules and give every component an (initially empty)
    // list of semaphores to acquire and to release.
    final List<Module> modules = JSONObject.parseArray(metadata, Module.class);
    final Map<String, List<Semaphore>> toAcquire = new HashMap<>();
    final Map<String, List<Semaphore>> toRelease = new HashMap<>();
    modules.stream()
            .map(Module::getComponentName)
            .forEach(name -> {
                toAcquire.putIfAbsent(name, CollectionUtil.newArrayList());
                toRelease.putIfAbsent(name, CollectionUtil.newArrayList());
            });

    // Step 2: create one semaphore per edge of the graph.
    builderGraph(modules, toAcquire, toRelease);

    // Step 3: start the module tasks.
    dispatch(modules, toAcquire, toRelease);
}
/**
 * Builds the semaphore graph: for every edge "source -> target" one fair
 * semaphore with zero permits is created, registered as something the source
 * releases and the target acquires, and associated with the target's input key
 * in {@code Config.inputNodeStatus}.
 *
 * @param moduleList          parsed modules; outputs or targets that are {@code null} are skipped
 * @param acquireSemaphoreMap component name -> semaphores it must acquire (filled here)
 * @param releaseSemaphoreMap component name -> semaphores it must release (filled here)
 */
public static void builderGraph(List<Module> moduleList, Map<String, List<Semaphore>> acquireSemaphoreMap, Map<String, List<Semaphore>> releaseSemaphoreMap) {
    for (Module node : moduleList) {
        String componentName = node.getComponentName(); // edge source
        if (node.getOutputs() == null) {
            continue;
        }
        for (OutputNode output : node.getOutputs()) {
            if (output == null || output.getTargets() == null) {
                continue;
            }
            for (LineNode target : output.getTargets()) {
                String targetId = target.getModuleId(); // edge destination
                String key = target.getKey();           // input of the destination fed by this edge
                // e.g. "a --release-->: b 反之, b --acquire-->: a"
                System.out.print(componentName + " --release-->: " + targetId);
                System.out.println(" 反之, " + targetId + " --acquire-->: " + componentName);
                final Semaphore edge = new Semaphore(0, true); // fair, starts with no permits
                Config.inputNodeStatus.put(edge, key);
                // computeIfAbsent: an endpoint that was not pre-registered gets a list
                // on demand instead of failing with a NullPointerException.
                releaseSemaphoreMap.computeIfAbsent(componentName, k -> CollectionUtil.newArrayList()).add(edge);
                acquireSemaphoreMap.computeIfAbsent(targetId, k -> CollectionUtil.newArrayList()).add(edge);
            }
        }
    }
    // Total number of semaphores on the release side ...
    System.out.println("构建基于信号量的流程图完成 总的长度" + releaseSemaphoreMap.values().stream().mapToInt(List::size).sum());
    // ... and on the acquire side (previously the release map was counted twice).
    System.out.println("构建基于信号量的流程图完成 总的长度" + acquireSemaphoreMap.values().stream().mapToInt(List::size).sum());
}
/**
 * Starts one task per module on {@code Config.threadPool} and waits until all
 * of them have finished. Ordering between the tasks is enforced by the
 * semaphores in the two maps, not by submission order.
 *
 * <p>Note: if a module that others depend on is never started (unknown
 * component name), the dependants block on their semaphores and this method
 * does not return.
 *
 * @param moduleList          modules to run
 * @param acquireSemaphoreMap component name -> semaphores it must acquire
 * @param releaseSemaphoreMap component name -> semaphores it must release
 * @throws InterruptedException if interrupted while waiting for a task
 * @throws ExecutionException   if a task terminated with an exception
 */
public static void dispatch(List<Module> moduleList, Map<String, List<Semaphore>> acquireSemaphoreMap, Map<String, List<Semaphore>> releaseSemaphoreMap) throws InterruptedException, ExecutionException {
    // Step 1: hand the semaphore data to every module through a shared Params object.
    Params params = new Params();
    params.setQueryParams(moduleList);
    params.setAcquireSemaphoreMap(acquireSemaphoreMap);
    params.setReleaseSemaphoreMap(releaseSemaphoreMap);

    // Step 2: submit one task per known module.
    List<Future<Object>> pending = new ArrayList<>();
    for (Module module : moduleList) {
        BaseModule task = createModule(module.getComponentName(), params);
        if (task != null) {
            pending.add(Config.threadPool.submit(task));
        }
    }

    // Step 3: wait for every submitted task to complete.
    for (Future<Object> future : pending) {
        future.get();
    }
    System.out.println("主线程执行完毕");
}

/** Maps a component name to a fresh module instance, or {@code null} when the name is unknown. */
private static BaseModule createModule(String componentName, Params params) {
    if (componentName == null) {
        return null; // switch on a null String would throw NullPointerException
    }
    switch (componentName) {
        case "a":
        // The names come from getComponentName(), so the root must be matched as "a";
        // the former label is kept for backward compatibility.
        case "userChatInput":
            return new UserChatModule("a", params);
        case "b":
            return new KnowledgeModule("b", params);
        case "c":
            return new C("c", params);
        case "d":
            return new HttpModule("d", params);
        case "e":
            return new ChatModule("e", params);
        default:
            return null;
    }
}
}
三、执行结果
基于信号量机制实现了简单的工作流引擎,还需继续优化
文章评论