问题导读:
1、mysql及hbase之间的数据如何同步?
2、同步代码各模块如何开发?
3、如何创建同步任务配置-mysql->mysql?
4、如何删除同步任务配置?
Datax概述
1.概述
2.功能清单
CRUD增删改查 、启动任务、停止任务
3.说明:本项目只支持mysql及hbase之间的数据同步
代码模块
配置文件
pom.xml
复制代码
DataxDolphinschedulerController
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.net.URLEncoder;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
-
- import javax.servlet.http.HttpServletRequest;
-
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.geespace.microservices.builder.dto.ProcessDto;
- import com.geespace.microservices.builder.dto.SyncConfigDto;
- import com.geespace.microservices.builder.enums.DictionaryEnum;
- import com.geespace.microservices.builder.request.ConfigAddForm;
- import com.geespace.microservices.builder.request.ConfigSelectForm;
- import com.geespace.microservices.builder.request.ConfigUpdateForm;
- import com.geespace.microservices.builder.response.BizCode;
- import com.geespace.microservices.builder.response.DolphinschedulerResponse;
- import com.geespace.microservices.builder.response.Msg;
- import com.geespace.microservices.builder.response.PageResult;
- import com.geespace.microservices.builder.response.ReturnResult;
- import com.geespace.microservices.builder.service.SyncConfigService;
- import com.geespace.microservices.builder.tools.JsonTools;
-
- import lombok.extern.slf4j.Slf4j;
-
- import org.apache.commons.httpclient.HttpException;
- import org.apache.commons.httpclient.NameValuePair;
- import org.apache.commons.httpclient.methods.PostMethod;
- import org.springframework.beans.BeanUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.http.HttpEntity;
- import org.springframework.http.HttpHeaders;
- import org.springframework.http.HttpMethod;
- import org.springframework.http.ResponseEntity;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.util.StringUtils;
- import org.springframework.validation.annotation.Validated;
- import org.springframework.web.bind.annotation.DeleteMapping;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.PutMapping;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestMethod;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.client.RestTemplate;
-
- /**
- * 迁移dolphinscheduler调度器
- *
- * @author: liudz
- * @date: 2021/5/7
- */
- @Slf4j
- @RestController
- @RequestMapping("/dolphinscheduler/v1")
- public class DataxDolphinschedulerController {
- @Autowired
- private RestTemplate restTemplate;
- @Value("${dolphinscheduler.token}")
- String token;
- @Value("${dolphinscheduler.address}")
- String address;
- public static final int ZERO = 0;
- public static final int SUCCESS = 200;
- public static final String CREATE = "create";
- public static final String UPDATE = "update";
- public static final String ADD = "add";
- public static final String DELETE = "delete";
- public static final String ONLINE = "ONLINE";
- public static final String OFFLINE = "OFFLINE";
- public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
- public static final int SIX = 6;
- public static final int EIGHTY = 80;
- public static final int THREE = 3;
- @Autowired
- private SyncConfigService syncConfigService;
-
-
- /**
- * 创建任务-创建用户下唯一工作流,无则创建有则并排添加
- * @param request request
- * @param form 任务参数
- * @author liudz
- * @date 2021/5/8
- * @return 执行结果
- **/
- @PostMapping("/project/process/datax")
- @Transactional(rollbackFor = Exception.class)
- public ReturnResult operatorDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigAddForm form) {
- Long userId = Long.valueOf(request.getUserPrincipal().getName());
- form.setUserId(userId);
-
- ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.addConfig(form);
- if (dataxTaskReturnResult.getCode() != SUCCESS) {
- return dataxTaskReturnResult;
- }
- log.info("--(1)addDataxTaskResult--success");
- form.setId(dataxTaskReturnResult.getData().getId());
- if (dataxTaskReturnResult.getCode() == SUCCESS) {
- Boolean verifyResult = verifyProcessExist(userId + "-dataxTask", form.getProjectName());
- log.info("--(2)verifyProcessExist--success:{}", verifyResult);
- if (!verifyResult) {
- ProcessDto processDto = packageProcessParam(
- "create", userId + "-dataxTask", dataxTaskReturnResult.getData(), null);
- log.info("--(3)packageProcessParam--success");
- processDto.setProjectName(form.getProjectName());
- processDto.setProjectId(form.getProjectId());
- dataxTaskReturnResult = createProcess(processDto);
- } else {
- //获取用户下唯一工作流ID
- DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
- JSONObject processJson = new JSONObject();
- log.info("--(3)getUserProcess--success:{}", processInfoList);
- List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
- for (Map<String, Object> map : list) {
- if (map.get("name").equals(userId + "-dataxTask")) {
- processJson.fluentPutAll(map);
- }
- }
- ProcessDto processDto = packageProcessParam(
- "add", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);
- processDto.setId(processJson.getInteger("id"));
- log.info("--(4)packageProcessParam--success");
- if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
- releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",
- processDto.getId(), 0);
- log.info("--(5)releaseProcessDefinition--OFFLINE--success");
- }
- dataxTaskReturnResult = updateProcess(form, processDto);
- }
- }
- return dataxTaskReturnResult;
- }
- /**
- * 更新任务
- * @param request request
- * @param form 任务参数
- * @author liudz
- * @date 2021/5/8
- * @return 执行结果
- **/
- @PutMapping("/project/process/datax")
- @Transactional(rollbackFor = Exception.class)
- public ReturnResult updateDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigUpdateForm form) {
- Long userId = Long.valueOf(request.getUserPrincipal().getName());
- form.setUserId(userId);
- ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.updateConfig(form);
- log.info("--(1)updateDataxTaskResult--mysql--success");
- if (dataxTaskReturnResult.getCode() == SUCCESS) {
- //获取用户下唯一工作流ID
- DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
- JSONObject processJson = new JSONObject();
- log.info("--(2)getUserProcess--success:{}", processInfoList);
- List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
- for (Map<String, Object> map : list) {
- if (map.get("name").equals(userId + "-dataxTask")) {
- processJson.fluentPutAll(map);
- }
- }
- ProcessDto processDto = packageProcessParam(
- "update", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);
- processDto.setProjectName(form.getProjectName());
- processDto.setProjectId(form.getProjectId());
- processDto.setId(processJson.getInteger("id"));
- log.info("--(3)packageProcessParam--success");
- if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
- releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",
- processDto.getId(), 0);
- log.info("--(4)releaseProcessDefinition--OFFLINE--success");
- }
- ConfigAddForm configAddForm = new ConfigAddForm();
- BeanUtils.copyProperties(form, configAddForm);
- return updateProcess(configAddForm, processDto);
- }
- return dataxTaskReturnResult;
- }
- /**
- * 删除任务
- * @param request request
- * @param projectName 项目名称
- * @param id 任务ID
- * @author liudz
- * @date 2021/5/8
- * @return 执行结果
- **/
- @DeleteMapping("/project/process/datax/{projectName}/{id}")
- @Transactional(rollbackFor = Exception.class)
- public ReturnResult deleteDataxTask(HttpServletRequest request, @PathVariable("projectName") String projectName,
- @PathVariable("id") Long id) {
- Long userId = Long.valueOf(request.getUserPrincipal().getName());
- SyncConfigDto syncConfigDto = new SyncConfigDto();
- syncConfigDto.setId(id);
- ConfigAddForm configAddForm = new ConfigAddForm();
- configAddForm.setProjectName(projectName);
- ReturnResult dataxTaskReturnResult = syncConfigService.delete(id, userId);
- log.info("--(1)deleteDataxTask--mysql--success");
- if (dataxTaskReturnResult.getCode() == SUCCESS) {
- //获取用户下唯一工作流ID
- DolphinschedulerResponse processInfoList = getUserProcess(projectName);
- JSONObject processJson = new JSONObject();
- log.info("--(2)getUserProcess--success:{}", processInfoList);
- List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
- for (Map<String, Object> map : list) {
- if (map.get("name").equals(userId + "-dataxTask")) {
- processJson.fluentPutAll(map);
- }
- }
- ProcessDto processDto = packageProcessParam(
- "delete", userId + "-dataxTask", syncConfigDto, processJson);
- processDto.setId(processJson.getInteger("id"));
- log.info("--(3)packageProcessParam--success");
- if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
- releaseProcessDefinition(projectName, userId + "-dataxTask",
- processDto.getId(), 0);
- log.info("--(4)releaseProcessDefinition--OFFLINE--success");
- }
- if (JSONObject.parseObject(processDto.getLocations()).keySet().size() == 0) {
- //删除工作流
- deleteProcess(configAddForm, processDto);
- } else {
- //更新工作流
- updateProcess(configAddForm, processDto);
- }
- }
- return dataxTaskReturnResult;
- }
-
- /**
- * 校验工作流是否存在
- *
- * @param processName
- * 工作流名称
- * @param projectName 项目名称
- * @author liudz
- * @date 2021/5/8
- * @return boolean
- **/
- public Boolean verifyProcessExist(String processName, String projectName) {
- HttpHeaders headers = new HttpHeaders();
- headers.set("token", token);
- headers.set("Content-Type", "application/json");
- HttpEntity requestEntity = new HttpEntity(headers);
- ResponseEntity<DolphinschedulerResponse> returnResult =
- restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName
- + "/process/verify-name?name=" + processName,
- HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
- if (returnResult.getBody().getCode() == ZERO) {
- return false;
- }
- return true;
- }
-
- /**
- * 创建工作流
- * @param processDto processDto
- * @author liudz
- * @date 2021/5/7
- * @return 执行结果
- **/
- public ReturnResult createProcess(ProcessDto processDto) {
- try {
- String postURL = address + "/dolphinscheduler/projects/"
- + URLEncoder.encode(processDto.getProjectName(), "utf-8") + "/process/save";
- PostMethod postMethod = new PostMethod(postURL);
- postMethod.setRequestHeader("Content-Type",
- "application/x-www-form-urlencoded;charset=utf-8");
- postMethod.setRequestHeader("token", token);
- NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
- new NameValuePair("name", processDto.getName()),
- new NameValuePair("locations", processDto.getLocations()),
- new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
- postMethod.setRequestBody(data);
- org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
- httpClient.executeMethod(postMethod);
- JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
- log.info("--(5)httpCreateProcess:{}", result);
- if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
- return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
- }
- } catch (Exception e) {
- log.info("请求异常:{}", e);
- }
- return ReturnResult.success();
- }
-
- /**
- * 更新工作流
- * @param vo vo
- * @param processDto processDto
- * @author liudz
- * @date 2021/5/7
- * @return 执行结果
- **/
- public ReturnResult updateProcess(ConfigAddForm vo, ProcessDto processDto) {
- try {
-
- String postURL = address + "/dolphinscheduler/projects/"
- + URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update";
- PostMethod postMethod = new PostMethod(postURL);
- postMethod.setRequestHeader("Content-Type",
- "application/x-www-form-urlencoded;charset=utf-8");
- postMethod.setRequestHeader("token", token);
- // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
- NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
- new NameValuePair("name", processDto.getName()),
- new NameValuePair("locations", processDto.getLocations()),
- new NameValuePair("id", processDto.getId().toString()),
- new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
- postMethod.setRequestBody(data);
- org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
- httpClient.executeMethod(postMethod);
- JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
- log.info("--(5)httpUpdateProcess:{}", result);
- if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
- return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
- }
- } catch (Exception e) {
- log.info("请求异常:{}", e);
- }
- return ReturnResult.success();
- }
- /**
- * 删除工作流
- * @param dto dto
- * @param processDto processDto
- * @author liudz
- * @date 2021/5/7
- * @return 执行结果
- **/
- public DolphinschedulerResponse deleteProcess(ConfigAddForm dto, ProcessDto processDto) {
- HttpHeaders headers = new HttpHeaders();
- headers.set("token", token);
- headers.set("Content-Type", "application/json");
- HttpEntity requestEntity = new HttpEntity(headers);
- ResponseEntity<DolphinschedulerResponse> returnResult =
- restTemplate.exchange(address + "/dolphinscheduler/projects/" + dto.getProjectName()
- + "/process/delete?processDefinitionId=" + processDto.getId(),
- HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
- log.info("--(5)httpDeleteProcess:{}", returnResult);
- return returnResult.getBody();
- }
-
- /**
- * 获取dolphinscheduler上的资源spark可拖拽jar的id
- *
- * @author liudz
- * @date 2021/5/8
- * @return id
- **/
- public Integer getSparkResourceJarId() {
- Integer resourceId = null;
- HttpHeaders headers = new HttpHeaders();
- headers.set("token", token);
- headers.set("Content-Type", "application/json");
- HttpEntity requestEntity = new HttpEntity(headers);
- ResponseEntity<DolphinschedulerResponse> returnResult =
- restTemplate.exchange(address + "/dolphinscheduler/resources/authorize-resource-tree?userId=1",
- HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
- List<Map<String, Object>> list = (List<Map<String, Object>>) returnResult.getBody().getData();
- for (Map<String, Object> map : list) {
- if (map.get("name").equals("big_data02.jar")) {
- resourceId = Integer.valueOf(map.get("id").toString());
- }
- }
- return resourceId;
- }
- /**
- * 获取dolphinscheduler上的某用户下唯一工作流
- * @param projectName 项目名称
- * @author liudz
- * @date 2021/5/8
- * @return id
- **/
- public DolphinschedulerResponse getUserProcess(String projectName) {
- HttpHeaders headers = new HttpHeaders();
- headers.set("token", token);
- headers.set("Content-Type", "application/json");
- HttpEntity requestEntity = new HttpEntity(headers);
- ResponseEntity<DolphinschedulerResponse> returnResult =
- restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list",
- HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
- return returnResult.getBody();
- }
- /**
- * 封装参数
- * @param type 操作类型
- * @param processName 用户工作流名称
- * @param dto 任务参数
- * @param processJson 工作流json
- * @author liudz
- * @date 2021/5/13
- * @return ProcessDto
- **/
- public ProcessDto packageProcessParam(String type, String processName, SyncConfigDto dto, JSONObject processJson) {
- ProcessDto processDto = new ProcessDto();
- processDto.setConnects("[]");
- processDto.setName(processName);
- JSONObject locationsOne = new JSONObject();
- JSONObject locationsTwo = new JSONObject();
- locationsTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("targetarr", "").fluentPut("nodenumber", "0");
- locationsTwo.fluentPut("x", 0).fluentPut("y", 0);
- locationsOne.put("datax-" + dto.getId(), locationsTwo);
-
- // 创建工作流
- if (CREATE.equals(type)) {
- processDto = packageProcessParamOfCreate(processDto, dto, locationsOne);
- } else if (ADD.equals(type)) {
- //工作流添加节点
- processDto = packageProcessParamOfAdd(processDto, dto, processJson, locationsOne, locationsTwo);
- } else if (UPDATE.equals(type)) {
- //更新工作流-只更新参数processDefinitionJson的tasks参数
- processDto = packageProcessParamOfUpdate(processDto, processJson, dto);
- } else if (DELETE.equals(type)) {
- //更新工作流或删除工作流-更新则删除参数processDefinitionJson的tasks参数
- processDto = packageProcessParamOfDelete(processDto, processJson, dto);
- }
- return processDto;
- }
- /**
- * packageProcessParamOfCreate
- * @param processDto 工作流参数
- * @param dto 任务参数
- * @param locationsOne locationsOne
- * @author liudz
- * @date 2021/5/7
- * @return ProcessDto
- **/
- public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, SyncConfigDto dto, JSONObject locationsOne) {
- processDto.setLocations(locationsOne.toString());
- JSONObject processOne = new JSONObject();
- processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);
- JSONObject processTwo = new JSONObject();
- processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());
- processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");
-
- String taskJsonString = dto.getContent().toString();
- processTwo.put("params", JSONObject.parseObject("{"localParams":[],"customConfig":1,"
- + ""json":"" + taskJsonString.replace(""", "\\"") + ""}"));
-
- JSONObject jsonTimeout = new JSONObject();
- jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
- processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
- JSONObject processTree = new JSONObject();
- processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
- JSONObject jsonconditionResult = new JSONObject();
- jsonconditionResult.put("successNode", new ArrayList<>());
- jsonconditionResult.put("failedNode", new ArrayList<>());
- processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
- processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
- processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
- processTwo.fluentPut("preTasks", new ArrayList<>());
- JSONArray processTaskArray = new JSONArray();
- processTaskArray.add(processTwo);
- processOne.put("tasks", processTaskArray);
- processDto.setProcessDefinitionJson(processOne.toString());
- return processDto;
- }
- /**
- * packageProcessParamOfAdd
- * @param processDto 工作流参数
- * @param locationsOne locationsOne
- * @param locationsTwo locationsTwo
- * @param dto 任务参数
- * @param processJson 工作流json
- * @author liudz
- * @date 2021/5/7
- * @return ProcessDto
- **/
- public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, SyncConfigDto dto, JSONObject processJson,
- JSONObject locationsOne, JSONObject locationsTwo) {
- String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));
- Integer x = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("x");
- Integer y = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("y");
- if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {
- locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);
- } else if (x >= ONE_THOUSAND_AND_FIVE_HUNDRED) {
- locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);
- }
- locationsOne = processJson.getJSONObject("locations").fluentPut("datax-" + dto.getId(), locationsTwo);
- processDto.setLocations(locationsOne.toString());
- processDto.setId(processJson.getInteger("id"));
- JSONObject processTwo = new JSONObject();
- processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());
- processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");
- String taskJsonString = dto.getContent().toString().replace("}}", "} }").replace("{{", "{ {");
- processTwo.put("params", JSONObject.parseObject("{"localParams":[],"customConfig":1,"
- + ""json":"" + taskJsonString.replace(""", "\\"") + ""}"));
- JSONObject jsonTimeout = new JSONObject();
- jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
- processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
- JSONObject processTree = new JSONObject();
- processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
- JSONObject jsonconditionResult = new JSONObject();
- jsonconditionResult.put("successNode", new ArrayList<>());
- jsonconditionResult.put("failedNode", new ArrayList<>());
- processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
- processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
- processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
- processTwo.fluentPut("preTasks", new ArrayList<>());
- JSONObject jsonNew = processJson.getJSONObject("processDefinitionJson");
- JSONArray jsonArray = jsonNew.getJSONArray("tasks");
- jsonArray.add(processTwo);
- jsonNew.put("tasks", jsonArray);
- processDto.setProcessDefinitionJson(jsonNew.toString());
- return processDto;
- }
- /**
- * packageProcessParamOfUpdate
- * @param processDto 工作流参数
- * @param dto 任务参数
- * @param processJson 工作流json
- * @author liudz
- * @date 2021/5/7
- * @return ProcessDto
- **/
- public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
- processDto.setLocations(processJson.getString("locations"));
- processDto.setId(processJson.getInteger("id"));
- JSONArray jsonTasksArray = processJson.getJSONObject("processDefinitionJson").getJSONArray("tasks");
- JSONArray copyJsonTasksArray = new JSONArray();
- copyJsonTasksArray.addAll(jsonTasksArray);
- JSONObject processDefinitionJson = new JSONObject();
- String taskJsonString = dto.getContent().toString();
- for (Object object : jsonTasksArray) {
- JSONObject jsonObject = JSONObject.parseObject(object.toString());
- if (Long.valueOf(jsonObject.getString("id").substring(SIX)) == dto.getId()) {
- String json = jsonObject.getString("json");
- json = taskJsonString;
- copyJsonTasksArray.remove(jsonObject);
- jsonObject.getJSONObject("params").put("json", json);
- copyJsonTasksArray.add(jsonObject);
- processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
- processDefinitionJson.put("tasks", copyJsonTasksArray);
- }
- }
- processDto.setProcessDefinitionJson(processDefinitionJson.toString());
- return processDto;
- }
- /**
- * packageProcessParamOfDelete
- * @param processDto 工作流参数
- * @param dto 任务参数
- * @param processJson 工作流json
- * @author liudz
- * @date 2021/5/7
- * @return ProcessDto
- **/
- public ProcessDto packageProcessParamOfDelete(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
- processDto.setId(processJson.getInteger("id"));
- JSONObject locationsJson = processJson.getJSONObject("locations");
- JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
- JSONArray processDefinitionArray = processDefinitionJson.getJSONArray("tasks");
- JSONArray copyProcessDefinitionArray = new JSONArray();
- copyProcessDefinitionArray.addAll(processDefinitionArray);
- if (locationsJson.containsKey(DictionaryEnum.DATAX.getFiledString() + dto.getId())) {
- locationsJson.remove("datax-" + dto.getId());
- for (Object object : processDefinitionArray) {
- if (JSONObject.parseObject(object.toString()).getString("id").equals("datax-" + dto.getId())) {
- copyProcessDefinitionArray.remove(object);
- }
- }
- processDefinitionJson.put("tasks", copyProcessDefinitionArray);
- }
- processDto.setLocations(locationsJson.toString());
- processDto.setProcessDefinitionJson(processDefinitionJson.toString());
- return processDto;
- }
-
- /**
- * 工作流【上线或者下线】
- * @param projectName 项目名称
- * @param processName 用户工作流名称
- * @param processId 工作流ID
- * @param releaseState 上下线状态操作【0:下线,1:上线】
- * @author liudz
- * @date 2021/5/7
- * @return 执行结果
- **/
- public ReturnResult releaseProcessDefinition(String projectName, String processName, Integer processId,
- Integer releaseState) {
- try {
- String postURL = address + "/dolphinscheduler/projects/"
- + URLEncoder.encode(projectName, "utf-8") + "/process/release";
- PostMethod postMethod = new PostMethod(postURL);
- postMethod.setRequestHeader("Content-Type",
- "application/x-www-form-urlencoded;charset=utf-8");
- postMethod.setRequestHeader("token", token);
- // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
- NameValuePair[] data = {new NameValuePair("name", processName),
- new NameValuePair("processId", processId.toString()),
- new NameValuePair("releaseState", releaseState.toString())};
- postMethod.setRequestBody(data);
- org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
- httpClient.executeMethod(postMethod);
- JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
- if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
- return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
- }
- } catch (Exception e) {
- log.info("请求异常:{}", e);
- }
- return ReturnResult.success();
- }
- /**
- * 运行流程实例
- * @param projectName 项目名称
- * @param request request
- * @param id 数据同步任务ID
- * @author liudz
- * @date 2021/5/7
- * @return 执行结果
- **/
- @GetMapping("/project/process/datax/start")
- public DolphinschedulerResponse startProcessDataxTask(
- @RequestParam("projectName") String projectName, @RequestParam("id") Integer id,
- HttpServletRequest request) {
- try {
- Long userId = Long.valueOf(request.getUserPrincipal().getName());
- DolphinschedulerResponse processInfoList = getUserProcess(projectName);
- if (processInfoList.getCode() != ZERO) {
- return processInfoList;
- }
- JSONObject processJson = new JSONObject();
- log.info("--(1)getUserProcess--success:{}", processInfoList);
- List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
- for (Map<String, Object> map : list) {
- if (map.get("name").equals(userId + "-dataxTask")) {
- processJson.fluentPutAll(map);
- }
- }
- if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(OFFLINE)) {
- releaseProcessDefinition(projectName, userId + "-dataxTask",
- processJson.getInteger("id"), 1);
- log.info("--(2)releaseProcessDefinition--ONLINE--success");
- }
- String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8")
- + "/executors/start-process-instance";
- PostMethod postMethod = new PostMethod(postURL);
- postMethod.setRequestHeader("Content-Type",
- "application/x-www-form-urlencoded;charset=utf-8");
- postMethod.setRequestHeader("token", token);
- // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
- NameValuePair[] data = packageNameValuePair(processJson, id);
- postMethod.setRequestBody(data);
- org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
- httpClient.executeMethod(postMethod);
- JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
- log.info("--(2)startProcessInstance--result:{}", result);
- if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
- return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
- }
- } catch (Exception e) {
- log.info("请求异常:{}", e);
- }
- return DolphinschedulerResponse.success();
- }
- /**
- * packageNameValuePair封装参数
- * @param processJson 工作流json
- * @param dragSparkTaskId 任务ID
- * @author liudz
- * @date 2021/5/14
- * @return NameValuePair
- **/
- public NameValuePair[] packageNameValuePair(JSONObject processJson, Integer dragSparkTaskId) {
- NameValuePair[] data = {
- new NameValuePair("failureStrategy", "CONTINUE"),
- new NameValuePair("processDefinitionId", processJson.getString("id")),
- new NameValuePair("processInstancePriority", "MEDIUM"),
- new NameValuePair("warningGroupId", "0"),
- new NameValuePair("warningType", "NONE"),
- new NameValuePair("runMode", "RUN_MODE_SERIAL"),
- new NameValuePair("startNodeList", "datax-" + dragSparkTaskId),
- new NameValuePair("taskDependType", "TASK_POST"),
- new NameValuePair("workerGroup", "default")};
- return data;
- }
-
-
- /**
- * stopProcessDataxTask
- * @param id id
- * @param executeType executeType
- * @param projectName 项目名称
- * @return ReturnResult
- * @author: liudz
- * @author: lty update 2020/5/27
- * @date: 2020/4/28 10:31
- */
- @GetMapping(value = "/project/process/datax/execute/{projectName}/{id}/{executeType}")
- public DolphinschedulerResponse<String> stopProcessDataxTask(@PathVariable("projectName") String projectName,
- @PathVariable("id") Long id, @PathVariable("executeType") String executeType) {
- log.info("--(1)stopProcessDataxTask--begin--projectName:{},id:{},executeType:{}", projectName, id, executeType);
- try {
- HttpHeaders headers = new HttpHeaders();
- headers.set("token", token);
- headers.set("Content-Type", "application/json");
- HttpEntity requestEntity = new HttpEntity(headers);
- ResponseEntity<JSONObject> returnResult = restTemplate.exchange(address + "/"
- + "dolphinscheduler/projects/" + projectName + "/task-instance/list-paging?"
- + "pageNo=1&pageSize=100&taskName=datax-" + id, HttpMethod.GET, requestEntity, JSONObject.class);
- List<Map<String, Object>> list =
- (List<Map<String, Object>>) returnResult.getBody().getJSONObject("data").get("totalList");
- Integer processInstanceId = null;
- for (Map<String, Object> map : list) {
- if (map.get("state").equals("RUNNING_EXEUTION")) {
- processInstanceId = Integer.valueOf(map.get("processInstanceId").toString());
- }
-
- }
- if (StringUtils.isEmpty(processInstanceId)) {
- return DolphinschedulerResponse.error(Msg.TASK_HAS_BEEN_STOPPED);
- }
- log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId);
- String postURL = address + "/dolphinscheduler/projects/"
- + URLEncoder.encode(projectName, "utf-8") + "/executors/execute";
- PostMethod postMethod = new PostMethod(postURL);
- postMethod.setRequestHeader("Content-Type",
- "application/x-www-form-urlencoded;charset=utf-8");
- postMethod.setRequestHeader("token", token);
- NameValuePair[] data = {new NameValuePair("executeType", executeType),
- new NameValuePair("processInstanceId", processInstanceId.toString())};
- postMethod.setRequestBody(data);
- org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
- httpClient.executeMethod(postMethod);
- JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
- if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
- return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
- }
- log.info("--(3)stopProcessSparkTask--success--:{}", result);
- } catch (UnsupportedEncodingException e) {
- log.info("UnsupportedEncodingException:{}", e);
- } catch (HttpException e) {
- log.info("HttpException:{}", e);
- } catch (IOException e) {
- log.info("IOException:{}", e);
- }
- return DolphinschedulerResponse.success();
- }
-
- /**
- * 查询全部同步任务配置(分页)
- *
- * @param form
- * name
- * @param request
- * 含有用户id
- * @return 分页结果
- */
- @RequestMapping(value = "/project/process/datax/list", method = RequestMethod.POST)
- public ReturnResult<PageResult<SyncConfigDto>> findAll(@RequestBody @Validated ConfigSelectForm form,
- HttpServletRequest request) {
- Long userId = Long.valueOf(request.getUserPrincipal().getName());
- return syncConfigService.list(form, userId);
- }
-
- /**
- * 获取同步任务配置
- *
- * @param id
- * 配置id
- * @param request
- * 用户id
- * @return 添加结果
- */
- @RequestMapping(value = "/project/process/datax", method = RequestMethod.GET)
- public ReturnResult<SyncConfigDto> findById(@RequestParam Long id, HttpServletRequest request) {
- Long userId = Long.valueOf(request.getUserPrincipal().getName());
- return syncConfigService.findById(id, userId);
- }
- }
-
复制代码
ConfigAddForm
- package com.geespace.microservices.builder.request;
-
- import javax.validation.constraints.NotEmpty;
- import javax.validation.constraints.NotNull;
-
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
-
- import lombok.Data;
-
- /**
- * @Author: zjr
- * @Date: 2020-05-06 09:42
- * @Version 1.0
- */
- @Data
- public class ConfigAddForm {
-
- /**
- * 配置名称
- */
- @NotEmpty(message = "name不能为空")
- private String name;
- /**
- * 配置描述
- */
- private String description;
- /**
- * 实时/全量/增量
- */
- @NotNull(message = "同步方式不能为空")
- private int syncType;
- /**
- * reader 选择的数据源id
- */
- @NotNull(message = "读取数据源id不能为空")
- private Long readerConfigId;
- /**
- * reader
- */
- @NotEmpty(message = "读取参数不能为空")
- private JSONObject readerParam;
- /**
- * writer 选择的数据源id
- */
- @NotNull(message = "写入数据源id不能为空")
- private Long writerConfigId;
- /**
- * writer
- */
- @NotEmpty(message = "写入参数不能为空")
- private JSONObject writerParam;
- /**
- * reader:column left,writer:column right
- */
- @NotEmpty(message = "字段对照表不能为空")
- private JSONArray columnMap;
-
- private Long userId;
- /**
- * 项目名称
- **/
- String projectName;
- /**
- * 项目id
- **/
- @NotNull(message = "projectId not null")
- Long projectId;
- Long id;
- }
-
复制代码
ConfigUpdateForm
- package com.geespace.microservices.builder.request;
-
- import javax.validation.constraints.NotEmpty;
- import javax.validation.constraints.NotNull;
-
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
-
- import lombok.Data;
-
- /**
- * @Author: zjr
- * @Date: 2020-05-06 09:42
- * @Version 1.0
- */
- @Data
- public class ConfigUpdateForm {
- @NotNull(message = "同步配置id不能为空")
- private Long id;
- /**
- * 配置名称
- */
- @NotEmpty(message = "name不能为空")
- private String name;
- /**
- * 配置描述
- */
- private String description;
- /**
- * 实时/全量/增量
- */
- @NotNull(message = "同步方式不能为空")
- private int syncType;
- /**
- * reader 选择的数据源id
- */
- @NotNull(message = "读取数据源id不能为空")
- private Long readerConfigId;
- /**
- * reader
- */
- @NotEmpty(message = "读取参数不能为空")
- private JSONObject readerParam;
- /**
- * writer 选择的数据源id
- */
- @NotNull(message = "写入数据源id不能为空")
- private Long writerConfigId;
- /**
- * writer
- */
- @NotEmpty(message = "写入参数不能为空")
- private JSONObject writerParam;
- /**
- * reader:column left,writer:column right
- */
- @NotEmpty(message = "字段对照表不能为空")
- private JSONArray columnMap;
-
- private Long userId;
- /**
- * 项目id
- **/
- @NotNull(message = "projectId not null")
- Long projectId;
- /**
- * 项目名称
- **/
- String projectName;
-
- }
-
复制代码
ProcessDto
- package com.geespace.microservices.builder.dto;
-
- import lombok.Data;
- import lombok.EqualsAndHashCode;
- import lombok.ToString;
-
- /**
- * dolphinscheduler调度器中工作流参数
- * @Author: liudz
- * @Date: 2020-03-23
- **/
- @Data
- @EqualsAndHashCode(callSuper = false)
- @ToString(callSuper = true)
- public class ProcessDto {
- /**
- * 流程定义ID
- **/
- private Integer id;
- /**
- * 流程定义节点图标连接信息(json格式)
- **/
- private String connects;
- /**
- * 流程定义节点坐标位置信息(json格式)
- **/
- private String locations;
- /**
- * 流程定义名称
- **/
- private String name;
- /**
- * 流程定义详细信息(json格式)
- **/
- private String processDefinitionJson;
- /**
- * 项目名称
- **/
- String projectName;
- /**
- * 项目id
- **/
- Long projectId;
- }
-
复制代码
SyncConfigDto
- package com.geespace.microservices.builder.dto;
-
- import com.alibaba.fastjson.JSONObject;
-
- import lombok.Data;
-
- /**
- * @Author: zjr
- * @Date: 2020-05-05 17:03
- * @Version 1.0
- */
- @Data
- public class SyncConfigDto {
- private Long id;
-
- /**
- * 配置名称
- */
- private String name;
-
- /**
- * 配置描述
- */
- private String description;
-
- /**
- * 实时/全量/增量
- */
- private int syncType;
- /**
- * json base64
- */
- private JSONObject content;
- /**
- * 项目名称
- **/
- String projectName;
- /**
- * 项目id
- **/
- Long projectId;
- }
-
复制代码
SyncConfigService
- package com.geespace.microservices.builder.service;
-
- import com.geespace.microservices.builder.dto.SyncConfigDto;
- import com.geespace.microservices.builder.request.ConfigAddForm;
- import com.geespace.microservices.builder.request.ConfigSelectForm;
- import com.geespace.microservices.builder.request.ConfigUpdateForm;
- import com.geespace.microservices.builder.response.PageResult;
- import com.geespace.microservices.builder.response.ReturnResult;
-
- /**
- * @Author: zjr
- * @Date: 2020-05-05 13:59
- * @Version 1.0
- */
- public interface SyncConfigService {
- /**
- * 添加同步任务配置
- *
- * @param form
- * 任务配置参数
- * @return 添加结果
- */
- ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form);
-
- /**
- * 修改同步任务配置
- *
- * @param form
- * 任务配置参数(含id)
- * @return 修改结果
- */
- ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form);
-
- /**
- * 查找同步任务配置
- *
- * @param id
- * 同步任务配置id
- * @param userId
- * 用户id
- * @return 查询结果
- */
- ReturnResult<SyncConfigDto> findById(Long id, Long userId);
-
- /**
- * 删除同步任务配置
- *
- * @param id
- * 任务配置id
- * @param userId
- * 用户id
- * @return 删除结果
- */
- ReturnResult delete(Long id, Long userId);
-
- /**
- * 查询全部同步任务配置(分页)
- *
- * @param form
- * name
- * @param userId
- * 用户id
- * @return 分页结果
- */
- ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId);
- }
-
复制代码
SyncConfigServiceImpl
- package com.geespace.microservices.builder.service.impl;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.stream.Collectors;
-
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.geespace.microservices.builder.biz.Contants;
- import com.geespace.microservices.builder.dao.SyncConfigMapper;
- import com.geespace.microservices.builder.dto.ColumnMap;
- import com.geespace.microservices.builder.dto.SyncConfigDto;
- import com.geespace.microservices.builder.entity.SyncConfig;
- import com.geespace.microservices.builder.factory.BaseParamTool;
- import com.geespace.microservices.builder.factory.ParamToolFactory;
- import com.geespace.microservices.builder.request.ConfigAddForm;
- import com.geespace.microservices.builder.request.ConfigSelectForm;
- import com.geespace.microservices.builder.request.ConfigUpdateForm;
- import com.geespace.microservices.builder.response.BizCode;
- import com.geespace.microservices.builder.response.PageResult;
- import com.geespace.microservices.builder.response.ReturnResult;
- import com.geespace.microservices.builder.service.SyncConfigService;
- import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
- import com.geespace.microservices.datasource.response.Response;
- import com.geespace.microservices.datasource.service.JdbcDataSourceService;
- import com.github.pagehelper.PageHelper;
- import com.github.pagehelper.PageInfo;
-
- import lombok.extern.slf4j.Slf4j;
-
- import org.springframework.beans.BeanUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.util.CollectionUtils;
-
- /**
- * @Author: zjr
- * @Date: 2020-05-05 13:59
- * @Version 1.0
- */
- @Service
- @Slf4j
- public class SyncConfigServiceImpl implements SyncConfigService {
- public static final int ZERO = 0;
- public static final String HBASE = "hbase";
- @Autowired
- private SyncConfigMapper syncConfigMapper;
-
- @Autowired
- private JdbcDataSourceService dataSourceService;
-
- @Override
- public ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form) {
- Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), null);
- if (checkResult == ZERO) {
- ColumnMap columnMap = makeColumnMap(form.getColumnMap());
- if (columnMap == null) {
- return ReturnResult.error(BizCode.COLUMN_MATCHING_ERROR);
- }
- // 查询reader数据源 填充reader
- JSONObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());
- // 查询writer数据源 填充writer
- JSONObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());
- JSONArray contentArray = new JSONArray();
- JSONObject content = new JSONObject();
- content.put("reader", reader);
- content.put("writer", writer);
- contentArray.add(content);
- SyncConfig syncConfig = new SyncConfig();
- syncConfig.setContent(packageJob(contentArray));
- syncConfig.setName(form.getName());
- syncConfig.setDescription(form.getDescription());
- syncConfig.setSyncType(form.getSyncType());
- syncConfig.setCreatedTimestamp(System.currentTimeMillis());
- syncConfig.setCreatedUser(form.getUserId());
- syncConfig.setModifiedTimestamp(System.currentTimeMillis());
- syncConfig.setProjectName(form.getProjectName());
- syncConfig.setProjectId(form.getProjectId());
- syncConfigMapper.insert(syncConfig);
- return ReturnResult.success(entityToDto(syncConfig));
- }
- log.error("SyncConfigServiceImpl--addConfig--NAME_IS_EXIST!");
- return ReturnResult.error(BizCode.NAME_IS_EXIST);
- }
-
- @Override
- public ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form) {
- SyncConfig syncConfig = syncConfigMapper.findById(form.getId());
- if (syncConfig == null || syncConfig.getCreatedUser() != form.getUserId()) {
- return ReturnResult.error(BizCode.UPDATE_OBJECT_NOT_EXIST);
- }
- Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), form.getId());
- if (checkResult == ZERO) {
- ColumnMap columnMap = makeColumnMap(form.getColumnMap());
- JSONObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());
- JSONObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());
- JSONArray contentArray = new JSONArray();
- JSONObject content = new JSONObject();
- content.put("reader", reader);
- content.put("writer", writer);
- contentArray.add(content);
- syncConfig.setContent(packageJob(contentArray));
- syncConfig.setName(form.getName());
- syncConfig.setDescription(form.getDescription());
- syncConfig.setSyncType(form.getSyncType());
- syncConfig.setModifiedTimestamp(System.currentTimeMillis());
- syncConfig.setProjectName(form.getProjectName());
- syncConfig.setProjectId(form.getProjectId());
- syncConfigMapper.update(syncConfig);
- return ReturnResult.success(entityToDto(syncConfig));
- }
- log.error("SyncConfigServiceImpl--updateConfig--NAME_IS_EXIST!");
- return ReturnResult.error(BizCode.NAME_IS_EXIST);
- }
-
- @Override
- public ReturnResult<SyncConfigDto> findById(Long id, Long userId) {
- SyncConfig syncConfig = syncConfigMapper.findById(id);
- if (syncConfig == null || syncConfig.getCreatedUser() != userId) {
- return ReturnResult.success(new SyncConfigDto());
- }
- return ReturnResult.success(entityToDto(syncConfig));
- }
-
- @Override
- public ReturnResult delete(Long id, Long userId) {
- log.debug("****id:{},userId:{}****", id, userId);
- SyncConfig syncConfig = syncConfigMapper.findById(id);
- log.debug("****syncConfig:{}****", syncConfig);
- log.debug("****syncConfig != null:{}", syncConfig != null);
- log.debug("****syncConfig.getCreatedUser():{},userId:{},syncConfig.getCreatedUser().equals(userId):{}",
- syncConfig.getCreatedUser(), userId, syncConfig.getCreatedUser().equals(userId));
- if (syncConfig != null && syncConfig.getCreatedUser().equals(userId)) {
- syncConfigMapper.delete(id);
- log.debug("****delete success!");
- }
- return ReturnResult.success();
- }
-
- @Override
- public ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId) {
- SyncConfig syncConfig = new SyncConfig();
- syncConfig.setCreatedUser(userId);
- syncConfig.setName(form.getName());
- syncConfig.setProjectId(form.getProjectId());
- PageHelper.startPage(form.getPageNum(), form.getPageSize());
- PageInfo<SyncConfig> configPageInfo = new PageInfo<>(syncConfigMapper.list(syncConfig));
- PageResult<SyncConfigDto> result = new PageResult<>();
- result.setPageNum(configPageInfo.getPageNum());
- result.setPageSize(configPageInfo.getPageSize());
- result.setTotalCount(configPageInfo.getTotal());
- result.setTotalPage(configPageInfo.getPages());
- List<SyncConfigDto> dtoList =
- configPageInfo.getList().stream().map(this::entityToDto).collect(Collectors.toList());
- result.setList(dtoList);
- return ReturnResult.success(result);
- }
-
- /**
- * 将reader writer对照list查分成2个独立list(保持顺序)
- *
- * @param columnMap
- * [{"reader":"col l1","writer":"col r1"},{"reader":"col l2","writer":"col r2"}]
- * @return object contants reader(list<String>) and writer(list<String>)
- */
- private ColumnMap makeColumnMap(JSONArray columnMap) {
- List<String> readerColumns = new ArrayList<>();
- List<String> writerColumns = new ArrayList<>();
- for (int i = 0; i < columnMap.size(); i++) {
- JSONObject column = columnMap.getJSONObject(i);
- readerColumns.add(column.getString("reader"));
- writerColumns.add(column.getString("writer"));
- }
- if (CollectionUtils.isEmpty(readerColumns) || CollectionUtils.isEmpty(writerColumns)) {
- return null;
- }
- ColumnMap column = new ColumnMap();
- column.setReader(readerColumns);
- column.setWriter(writerColumns);
- return column;
- }
-
- /**
- * 封装reader json
- *
- * @param readerConfigId
- * 数据源id
- * @param readerParam
- * 页面填写reader 配置属性信息(table、where...)
- * @param readerColumns
- * 选择的数据字段
- * @return reader json
- */
- private JSONObject packageReader(Long readerConfigId, JSONObject readerParam, List<String> readerColumns) {
- Response<JdbcDataSourceDto> descrypt = dataSourceService.findDescrypt(readerConfigId);
- if (!descrypt.responseSuccess()) {
- return null;
- }
- JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();
- String sourceType = jdbcDataSource.getSourceType();
- BaseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);
- JSONObject reader = baseParamTool.makeReaderJson(jdbcDataSource, readerParam, readerColumns);
- return reader;
- }
-
- /**
- * 封装writer json
- *
- * @param writerConfigId
- * 数据源id
- * @param writerParam
- * 页面填写writer 配置属性信息(table、where...)
- * @param writerColumns
- * 选择的映射字段
- * @return writer json
- */
- private JSONObject packageWriter(Long writerConfigId, JSONObject writerParam, List<String> writerColumns) {
- Response<JdbcDataSourceDto> descrypt = dataSourceService.findDescrypt(writerConfigId);
- if (!descrypt.responseSuccess()) {
- return null;
- }
- JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();
- String sourceType = jdbcDataSource.getSourceType();
- BaseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);
- JSONObject writer = baseParamTool.makeWriterJson(jdbcDataSource, writerParam, writerColumns);
- return writer;
- }
-
- /**
- * 封装执行job json
- *
- * @param content
- * reader and writer
- * @return job
- */
- private JSONObject packageJob(JSONArray content) {
- JSONObject job = new JSONObject();
- JSONObject setting = new JSONObject();
- JSONObject speed = new JSONObject();
- speed.put("channel", 1);
- JSONObject errorLimit = new JSONObject();
- errorLimit.put("record", 0);
- errorLimit.put("percentage", Contants.PERCENTAGE);
- setting.put("speed", speed);
- setting.put("errorLimit", errorLimit);
- job.put("setting", setting);
- job.put("content", content);
- JSONObject jobContent = new JSONObject();
- jobContent.put("job", job);
- return jobContent;
- }
-
- /**
- * entity转dto
- *
- * @param syncConfig
- * entity
- * @return dto
- */
- private SyncConfigDto entityToDto(SyncConfig syncConfig) {
- SyncConfigDto configDto = new SyncConfigDto();
- BeanUtils.copyProperties(syncConfig, configDto);
- return configDto;
- }
- }
-
复制代码
SyncConfigMapper
- package com.geespace.microservices.builder.dao;
-
- import java.util.List;
-
- import com.geespace.microservices.builder.entity.SyncConfig;
-
- import org.apache.ibatis.annotations.Delete;
- import org.apache.ibatis.annotations.Insert;
- import org.apache.ibatis.annotations.Mapper;
- import org.apache.ibatis.annotations.Result;
- import org.apache.ibatis.annotations.ResultMap;
- import org.apache.ibatis.annotations.Results;
- import org.apache.ibatis.annotations.Select;
- import org.apache.ibatis.annotations.SelectKey;
- import org.apache.ibatis.annotations.SelectProvider;
- import org.apache.ibatis.annotations.Update;
- import org.apache.ibatis.type.JdbcType;
-
- /**
- * @Author: zjr
- * @Date: 2020-05-05 10:40
- * @Version 1.0
- */
- @Mapper
- public interface SyncConfigMapper {
- /**
- * 插入一条数据
- *
- * @param syncConfig
- * 插入对象
- * @return 结果
- */
- @Insert({"insert into sync_config (name, description, content, sync_type, created_timestamp, created_user, ",
- "modified_timestamp,project_id,project_name) values (#{name,jdbcType=VARCHAR},#{description,jdbcType=VARCHAR},",
- "#{content,jdbcType=OTHER, typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler}, ",
- "#{syncType,jdbcType=TINYINT}, #{createdTimestamp,jdbcType=BIGINT}, #{createdUser,jdbcType=BIGINT}, ",
- "#{modifiedTimestamp,jdbcType=BIGINT},#{projectId},#{projectName})"})
- @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)
- Long insert(SyncConfig syncConfig);
-
- /**
- * 更新数据
- *
- * @param syncConfig
- * 插入对象
- * @return 结果
- */
- @Update({"update sync_config set ",
- "name = #{name,jdbcType=VARCHAR}, description = #{description,jdbcType=VARCHAR}, ",
- "sync_type = #{syncType,jdbcType=TINYINT}, content = #{content,jdbcType=OTHER,",
- "typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler},project_id=#{projectId}, ",
- "created_timestamp = #{createdTimestamp,jdbcType=BIGINT}, created_user = #{createdUser,jdbcType=BIGINT}, ",
- "modified_timestamp = #{modifiedTimestamp,jdbcType=BIGINT},project_name=#{projectName},project_id=#{projectId}",
- " where id = #{id,jdbcType=BIGINT}"})
- int update(SyncConfig syncConfig);
-
- /**
- * 删除数据
- *
- * @param id
- * config id
- * @return 影响行数
- */
- @Delete("delete from sync_config where id = #{id,jdbcType=BIGINT}")
- int delete(Long id);
-
- /**
- * 查询
- *
- * @param syncConfig
- * name
- * @return list结果
- */
- @SelectProvider(type = SyncConfigSqlProvider.class, method = "select")
- @Results(id = "resultMap",
- value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
- @Result(column = "name", property = "name", jdbcType = JdbcType.VARCHAR),
- @Result(column = "description", property = "description", jdbcType = JdbcType.VARCHAR),
- @Result(column = "sync_type", property = "syncType", jdbcType = JdbcType.TINYINT),
- @Result(column = "content", property = "content", jdbcType = JdbcType.OTHER,
- typeHandler = com.geespace.microservices.builder.handler.MySqlJsonHandler.class),
- @Result(column = "created_timestamp", property = "createdTimestamp", jdbcType = JdbcType.BIGINT),
- @Result(column = "created_user", property = "createdUser", jdbcType = JdbcType.BIGINT),
- @Result(column = "project_id", property = "projectId", jdbcType = JdbcType.BIGINT),
- @Result(column = "project_name", property = "projectName", jdbcType = JdbcType.VARCHAR),
- @Result(column = "modified_timestamp", property = "modifiedTimestamp", jdbcType = JdbcType.BIGINT)})
- List<SyncConfig> list(SyncConfig syncConfig);
-
- /**
- * id 查询
- *
- * @param id
- * id
- * @return 结果
- */
- @Select({"select id,project_id,project_name,name, description, sync_type, content,"
- + " created_timestamp, created_user, modified_timestamp ",
- "from sync_config where id = #{id,jdbcType=BIGINT}"})
- @ResultMap("resultMap")
- SyncConfig findById(Long id);
-
- /**
- * 校验任务名称唯一性,用于新增功能
- * @author: liudz
- * @param createdUser 用户ID
- * @param name 任务名称
- * @param id 任务ID
- * @date: 2020/7/23
- * @return SparkTask
- */
- @SelectProvider(type = SyncConfigSqlProvider.class, method = "checkNameUnique")
- Integer checkNameUnique(Long createdUser, String name, Long id);
- }
-
复制代码
SyncConfigSqlProvider
- package com.geespace.microservices.builder.dao;
-
- import com.geespace.microservices.builder.entity.SyncConfig;
-
- import org.apache.commons.lang3.StringUtils;
- import org.apache.ibatis.jdbc.SQL;
-
- /**
- * @Author: zjr
- * @Date: 2020-05-22 13:35
- * @Version 1.0
- */
- public class SyncConfigSqlProvider {
- /**
- * 条件查询
- *
- * @param syncConfig
- * name
- * @return sql
- */
- public String select(SyncConfig syncConfig) {
- SQL sql = new SQL();
- sql.SELECT("id,project_id,project_name,name, description, sync_type, content, created_timestamp,"
- + " created_user, modified_timestamp");
- sql.FROM("sync_config");
- sql.WHERE("created_user = #{createdUser,jdbcType=BIGINT}");
- if (!org.springframework.util.StringUtils.isEmpty(syncConfig.getProjectId())) {
- sql.WHERE("project_id=#{projectId}");
- }
- if (!StringUtils.isBlank(syncConfig.getName())) {
- sql.WHERE("name like concat('%', #{name,jdbcType=VARCHAR}, '%')");
- }
- sql.ORDER_BY("id desc");
- return sql.toString();
- }
-
- /**
- * 校验任务名称唯一性,用于新增功能
- *
- * @author: liudz
- * @date 2019/12/3
- * @author: liudz
- * @param createdUser 用户ID
- * @param name 任务名称
- * @param id 任务ID
- * @return sql
- */
- public String checkNameUnique(Long createdUser, String name, Long id) {
- SQL sql = new SQL();
- sql.SELECT("COUNT(name)");
- sql.FROM("sync_config");
- if (!org.springframework.util.StringUtils.isEmpty(id)) {
- sql.WHERE("id != #{id}");
- }
- sql.WHERE("created_user=#{createdUser} and name=#{name}");
- return sql.toString();
- }
- }
-
复制代码
JdbcDataSourceService
复制代码
JdbcDataSourceServiceImpl
复制代码
DataSourceMapper
- package com.geespace.microservices.datasource.dao;
-
- import java.util.List;
-
- import com.geespace.microservices.datasource.entity.JdbcDataSource;
-
- import org.apache.ibatis.annotations.Delete;
- import org.apache.ibatis.annotations.Insert;
- import org.apache.ibatis.annotations.Mapper;
- import org.apache.ibatis.annotations.Options;
- import org.apache.ibatis.annotations.Param;
- import org.apache.ibatis.annotations.Result;
- import org.apache.ibatis.annotations.ResultMap;
- import org.apache.ibatis.annotations.Results;
- import org.apache.ibatis.annotations.Select;
- import org.apache.ibatis.annotations.SelectKey;
- import org.apache.ibatis.annotations.SelectProvider;
- import org.apache.ibatis.annotations.Update;
- import org.apache.ibatis.type.JdbcType;
-
- /**
- * @Author: zjr
- * @Date: 2020-04-07 17:05
- * @Version 1.0
- */
- @Mapper
- public interface DataSourceMapper {
- /**
- * 添加数据源信息
- *
- * @param source
- * 数据源
- * @return id
- */
- @Insert({
- "insert into ge_jdbc_datasource (source_type, source_name, jdbc_url, user_name, password, zk_address, znode, ",
- "database_name, jdbc_driver_class, remark, creator, create_time, update_time, status)",
- "values (#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{jdbcUrl,jdbcType=VARCHAR}, ",
- "#{userName,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR}, #{zkAddress,jdbcType=VARCHAR}, ",
- "#{znode,jdbcType=VARCHAR}, #{databaseName,jdbcType=VARCHAR}, #{jdbcDriverClass,jdbcType=VARCHAR}, ",
- "#{remark,jdbcType=VARCHAR}, #{creator,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, ",
- "#{updateTime,jdbcType=TIMESTAMP}, #{status,jdbcType=TINYINT})"})
- @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)
- int insert(JdbcDataSource source);
-
- /**
- * 修改数据源
- *
- * @param source
- * 全量修改
- * @return 影响行数
- */
- @Update({"update ge_jdbc_datasource",
- "set source_type = #{sourceType,jdbcType=VARCHAR}, source_name = #{sourceName,jdbcType=VARCHAR}, ",
- "jdbc_url = #{jdbcUrl,jdbcType=VARCHAR}, user_name = #{userName,jdbcType=VARCHAR}, ",
- "password = #{password,jdbcType=VARCHAR}, zk_address = #{zkAddress,jdbcType=VARCHAR}, ",
- "znode = #{znode,jdbcType=VARCHAR}, database_name = #{databaseName,jdbcType=VARCHAR}, ",
- "jdbc_driver_class = #{jdbcDriverClass,jdbcType=VARCHAR}, remark = #{remark,jdbcType=VARCHAR}, ",
- "update_time = #{updateTime,jdbcType=TIMESTAMP}, status = #{status,jdbcType=TINYINT}",
- "where id = #{id,jdbcType=BIGINT} and status = 1"})
- int update(JdbcDataSource source);
-
- /**
- * 删除数据源配置
- *
- * @param id
- * 数据源id
- * @return 影响行数
- */
- @Delete("delete from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT}")
- int delete(Long id);
-
- /**
- * 查询用户数据源配置
- *
- * @param creator
- * 创建者id
- * @return 数据源列表
- */
- @Select({"select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} and status = 1",
- " order by id desc"})
- @Results(id = "resultMap",
- value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
- @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),
- @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),
- @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),
- @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),
- @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),
- @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),
- @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),
- @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),
- @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),
- @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),
- @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),
- @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),
- @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),
- @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})
- List<JdbcDataSource> list(Long creator);
-
- /**
- * 查询用户数据源配置
- *
- * @param creator
- * 创建者id
- * @param type
- * 数据源类型
- * @return 数据源列表
- */
- @Select({"<script>", "select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} ",
- "and status = 1 and source_type in ",
- "<foreach item='item' index='index' collection='type' open='(' separator=',' close=')'>",
- "#{item,jdbcType=VARCHAR}", "</foreach>", " order by id desc", "</script>"})
- @ResultMap("resultMap")
- List<JdbcDataSource> listByType(@Param("creator") Long creator, @Param("type") List<String> type);
-
- /**
- * 条件查询数据源
- *
- * @param jdbcDataSource
- * 查询条件
- * @return 查询结果
- */
- @SelectProvider(type = DataSourceSqlProvider.class, method = "select")
- @ResultMap("resultMap")
- List<JdbcDataSource> select(JdbcDataSource jdbcDataSource);
-
- /**
- * id 查找
- *
- * @param id
- * id
- * @return 数据源
- */
- @Options(flushCache = Options.FlushCachePolicy.TRUE)
- @Select("select * from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT} and status = 1")
- @ResultMap("resultMap")
- JdbcDataSource find(Long id);
-
- /**
- * 数据源名称是否存在
- *
- * @param jdbcDataSource
- * 数据源名称
- * @return 数据源
- */
- @Select({"select * from ge_jdbc_datasource ",
- "where source_name = #{sourceName,jdbcType=VARCHAR} and creator = #{creator,jdbcType=BIGINT}"})
- @ResultMap("resultMap")
- JdbcDataSource nameExist(JdbcDataSource jdbcDataSource);
- }
-
复制代码
DataSourceSqlProvider
- package com.geespace.microservices.datasource.dao;
-
- import com.geespace.microservices.datasource.entity.JdbcDataSource;
-
- import org.apache.commons.lang.StringUtils;
- import org.apache.ibatis.jdbc.SQL;
-
- /**
- * @Author: zjr
- * @Date: 2020-04-09 14:20
- * @Version 1.0
- */
- public class DataSourceSqlProvider {
- /**
- * 条件查询sql语句生成
- *
- * @param jdbcDataSource
- * 查询条件
- * @return sql语句
- */
- public String select(JdbcDataSource jdbcDataSource) {
- SQL sql = new SQL();
- sql.SELECT("*");
- sql.FROM("ge_jdbc_datasource");
- sql.WHERE("status = 1");
- if (jdbcDataSource.getCreator() != null) {
- sql.WHERE("creator = #{creator,jdbcType=BIGINT}");
- }
- if (!StringUtils.isBlank(jdbcDataSource.getSourceName())) {
- sql.WHERE("source_name like concat('%', #{sourceName,jdbcType=VARCHAR}, '%')");
- }
- sql.ORDER_BY("id desc");
- return sql.toString();
- }
- }
-
复制代码
MetaDataSourceMapper
- package com.geespace.microservices.datasource.dao;
-
- import java.util.List;
-
- import com.geespace.microservices.datasource.entity.JdbcDataSource;
-
- import org.apache.ibatis.annotations.Mapper;
- import org.apache.ibatis.annotations.Result;
- import org.apache.ibatis.annotations.ResultMap;
- import org.apache.ibatis.annotations.Results;
- import org.apache.ibatis.annotations.Select;
- import org.apache.ibatis.type.JdbcType;
-
- /**
- * 内部数据源,系统配置,和外部数据源保持一致
- * @Author: zjr
- * @Date: 2020-04-07 17:05
- * @Version 1.0
- */
- @Mapper
- public interface MetaDataSourceMapper {
- /**
- * 查询用户数据源配置
- *
- * @return 数据源列表
- */
- @Select("select * from ge_meta_datasource ")
- @Results(id = "resultMap",
- value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
- @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),
- @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),
- @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),
- @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),
- @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),
- @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),
- @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),
- @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),
- @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),
- @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),
- @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),
- @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),
- @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),
- @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})
- List<JdbcDataSource> list();
- /**
- * id 查找
- *
- * @param id
- * id
- * @return 数据源
- */
- @Select("select * from ge_meta_datasource where id = #{id,jdbcType=BIGINT} ")
- @ResultMap("resultMap")
- JdbcDataSource find(Long id);
- }
-
复制代码
DataSourceSelectForm
- package com.geespace.microservices.datasource.form.datasource;
-
- import javax.validation.constraints.NotNull;
-
- import lombok.Data;
-
- /**
- * @Author: zjr
- * @Date: 2020-04-09 14:13
- * @Version 1.0
- */
- @Data
- public class DataSourceSelectForm {
- /**
- * 数据源名称模糊查询
- */
- private String sourceName;
- /**
- * 创建者
- */
- private Long creator;
- /**
- * 页码
- */
- @NotNull(message = "pageSize不能为空")
- private int pageSize;
- /**
- * 每页数据量
- */
- @NotNull(message = "pageNum不能为空")
- private int pageNum;
- }
-
复制代码
DataSourceAddForm
- package com.geespace.microservices.datasource.form.datasource;
-
- import javax.validation.constraints.NotBlank;
-
- import lombok.Data;
-
- /**
- * @Author: zjr
- * @Date: 2020-04-07 17:46
- * @Version 1.0
- */
- @Data
- public class DataSourceAddForm {
- /**
- * 数据源类型
- */
- @NotBlank(message = "数据源类型不能为空")
- private String sourceType;
- /**
- * 数据源名称
- */
- @NotBlank(message = "数据源名称不能为空")
- private String sourceName;
- /**
- * jdbc url
- */
- private String jdbcUrl;
- /**
- * 用户名
- */
- private String userName;
- /**
- * 密码
- */
- private String password;
- /**
- * zk地址
- */
- private String zkAddress;
- /**
- * hbase znode
- */
- private String znode;
- /**
- * 数据库名称
- */
- private String databaseName;
- /**
- * 驱动类
- */
- private String jdbcDriverClass;
- /**
- * 备注
- */
- private String remark;
- /**
- * 创建者
- */
- private Long creator;
-
- }
-
复制代码
DataSourceUpdateForm
- package com.geespace.microservices.datasource.form.datasource;
-
- import javax.validation.constraints.NotBlank;
- import javax.validation.constraints.NotNull;
-
- import lombok.Data;
-
- /**
- * @Author: zjr
- * @Date: 2020-04-07 17:46
- * @Version 1.0
- */
- @Data
- public class DataSourceUpdateForm {
- /**
- * id
- */
- @NotNull(message = "id不能为空")
- private Long id;
- /**
- * 数据源类型
- */
- @NotBlank(message = "数据源类型不能为空")
- private String sourceType;
- /**
- * 数据源名称
- */
- @NotBlank(message = "数据源名称不能为空")
- private String sourceName;
- /**
- * jdbc url
- */
- private String jdbcUrl;
- /**
- * 用户名
- */
- private String userName;
- /**
- * 密码
- */
- private String password;
- /**
- * zk地址
- */
- private String zkAddress;
- /**
- * hbase znode
- */
- private String znode;
- /**
- * 数据库名称
- */
- private String databaseName;
- /**
- * 驱动类
- */
- private String jdbcDriverClass;
- /**
- * 备注
- */
- private String remark;
- /**
- * 创建者
- */
- private Long creator;
- }
-
复制代码
YAPI测试用例
5.1查询全部同步任务配置(分页)
- {
- "pageNum": 1,
- "pageSize": 10,
- "projectId": 28,
- "name": "测试"
- }
复制代码
5.2 创建同步任务配置-mysql->mysql
- {
- "name": "测试同步任务配置-mysql-mysql-1",
- "description": "测试同步任务配置-mysql-mysql-1",
- "projectName": "test测试1",
- "projectId": 28,
- "syncType": 2,
- "readerConfigId": 1,
- "readerParam": {
- "table": "test_test"
- },
- "writerConfigId": 1,
- "writerParam": {
- "table": "test_test_1"
- },
- "columnMap": [
- {
- "reader": "id",
- "writer": "id"
- },
- {
- "reader": "name",
- "writer": "name"
- }
- ]
- }
复制代码
5.3 创建同步任务配置-hbase->hbase
- {
- "name": "测试同步任务配置-hbase-hbase-1",
- "description": "测试同步任务配置-hbase-hbase-1",
- "projectName": "test测试1",
- "projectId": 28,
- "syncType": 2,
- "readerConfigId": 130,
- "readerParam": {
- "table": "test_test"
- },
- "writerConfigId": 130,
- "writerParam": {
- "table": "test_test_1",
- "rowkeyColumns": [
- "f:id",
- "f:name"
- ]
- },
- "columnMap": [
- {
- "reader": "f:id",
- "writer": "f:id"
- },
- {
- "reader": "f:name",
- "writer": "f:name"
- }
- ]
- }
复制代码
5.4 创建同步任务配置-mysql->hbase
- {
- "name": "测试同步任务配置-mysql-hbase-1",
- "description": "测试同步任务配置mysql-hbase-1",
- "projectName": "test测试1",
- "projectId": 28,
- "syncType": 2,
- "readerConfigId": 1,
- "readerParam": {
- "table": "test_test"
- },
- "writerConfigId": 130,
- "writerParam": {
- "table": "test_test_1",
- "rowkeyColumns": [
- "f:id",
- "f:name"
- ]
- },
- "columnMap": [
- {
- "reader": "id",
- "writer": "f:id"
- },
- {
- "reader": "name",
- "writer": "f:name"
- }
- ]
- }
复制代码
5.5 创建同步任务配置-hbase->mysql
- {
- "name": "测试同步任务配置-hbase-mysql-1",
- "description": "测试同步任务配置-hbase-mysql-1",
- "projectName": "test测试1",
- "projectId": 28,
- "syncType": 2,
- "readerConfigId": 130,
- "readerParam": {
- "table": "test_test",
- "rowkeyColumns": [
- "f:id",
- "f:name"
- ]
- },
- "writerConfigId": 1,
- "writerParam": {
- "table": "test_test_1"
- },
- "columnMap": [
- {
- "reader": "f:id",
- "writer": "id"
- },
- {
- "reader": "f:name",
- "writer": "name"
- }
- ]
- }
复制代码
5.6 更新同步任务配置
- {
- "id": 82,
- "name": "测试同步任务配置-mysql-3",
- "description": "测试同步任务配置-mysql-3",
- "projectName": "数据同步任务",
- "projectId": 19,
- "syncType": 2,
- "readerConfigId": 1,
- "readerParam": {
- "table": "test_test"
- },
- "writerConfigId": 1,
- "writerParam": {
- "table": "test_test_1"
- },
- "columnMap": [
- {
- "reader": "id",
- "writer": "id"
- },
- {
- "reader": "name",
- "writer": "name"
- }
- ]
- }
复制代码
5.7 删除同步任务配置
5.8 查询同步任务配置
5.9 执行数据同步任务
5.10 停止数据同步任务
作者:刘大猫.
来源:https://blog.csdn.net/a924382407/article/details/120951230
最新经典文章,欢迎关注公众号
|