Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,21 @@ public class MysqlDagActionStore implements DagActionStore {
protected final DataSource dataSource;
private final DBStatementExecutor dbStatementExecutor;
private final String tableName;
private static final String UTF8_BINARY_COLLATION = " CHARACTER SET utf8mb4 COLLATE utf8mb4_bin";
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE flow_group = ? AND flow_name = ? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?)";

protected static final String INSERT_STATEMENT = "INSERT INTO %s (flow_group, flow_name, flow_execution_id, job_name, dag_action) "
+ "VALUES (?, ?, ?, ?, ?)";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?";
private static final String GET_ALL_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, job_name, dag_action FROM %s";
private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" +
"flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, "
+ "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
+ "job_name varchar(" + ServiceConfigKeys.MAX_JOB_NAME_LENGTH + ") NOT NULL, "
+ "dag_action varchar(50) NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
"flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")" + UTF8_BINARY_COLLATION
+ " NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")" + UTF8_BINARY_COLLATION
+ " NOT NULL, flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ")"
+ UTF8_BINARY_COLLATION + " NOT NULL, "
+ "job_name varchar(" + ServiceConfigKeys.MAX_JOB_NAME_LENGTH + ")" + UTF8_BINARY_COLLATION + " NOT NULL, "
+ "dag_action varchar(50)" + UTF8_BINARY_COLLATION
+ " NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id,job_name,dag_action))";

// Deletes rows older than retention time period (in seconds) to prevent this table from growing unbounded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
protected final String tableName;
protected final GsonSerDe<List<JobExecutionPlan>> serDe;
private final JobExecutionPlanDagFactory jobExecPlanDagFactory;
// utf8mb3 keeps the long dag_node_id primary key under MySQL's 3072-byte index limit while supporting em-dash names.
private static final String UTF8_BINARY_COLLATION = " CHARACTER SET utf8 COLLATE utf8_bin";

// todo add a column that tells if it is a running dag or a failed dag
protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
+ "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, "
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ")" + UTF8_BINARY_COLLATION + " NOT NULL, "
+ "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ")" + UTF8_BINARY_COLLATION + " NOT NULL, "
+ "dag_node JSON NOT NULL, "
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ "PRIMARY KEY (dag_node_id), "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
default auto-update/initialization values
- We desire millisecond level precision and denote that with `(3)` for the TIMESTAMP types
*/
private static final String UTF8_BINARY_COLLATION = " CHARACTER SET utf8mb4 COLLATE utf8mb4_bin";
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "job_name varchar("
+ ServiceConfigKeys.MAX_JOB_NAME_LENGTH + ") NOT NULL, dag_action varchar(100) NOT NULL, "
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")" + UTF8_BINARY_COLLATION
+ " NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")" + UTF8_BINARY_COLLATION
+ " NOT NULL, " + "job_name varchar(" + ServiceConfigKeys.MAX_JOB_NAME_LENGTH + ")" + UTF8_BINARY_COLLATION
+ " NOT NULL, dag_action varchar(100)" + UTF8_BINARY_COLLATION + " NOT NULL, "
+ "event_timestamp TIMESTAMP(3) NOT NULL, "
+ "lease_acquisition_timestamp TIMESTAMP(3) NULL, "
+ "PRIMARY KEY (flow_group,flow_name,job_name,dag_action))";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -76,10 +77,20 @@ public class DagProcUtils {
*/
public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
Dag.DagId dagId) throws IOException {
submitNextNodes(dagManagementStateStore, dag, dagId, DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS);
}

public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
Dag.DagId dagId, DagActionStore.LeaseParams leaseParams) throws IOException {
submitNextNodes(dagManagementStateStore, dag, dagId, getStoreInsertTimeMillis(leaseParams));
}

private static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
Dag.DagId dagId, long storeInsertTimeMillis) throws IOException {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagUtils.getNext(dag);
if (nextNodes.size() == 1) {
Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId);
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId, storeInsertTimeMillis);
} else {
for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
JobExecutionPlan jobExecutionPlan = dagNode.getValue();
Expand All @@ -98,9 +109,21 @@ public static void submitNextNodes(DagManagementStateStore dagManagementStateSto
*/
public static void submitJobToExecutor(DagManagementStateStore dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode,
Dag.DagId dagId) {
submitJobToExecutor(dagManagementStateStore, dagNode, dagId,
DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS);
}

public static void submitJobToExecutor(DagManagementStateStore dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode,
Dag.DagId dagId, DagActionStore.LeaseParams leaseParams) {
submitJobToExecutor(dagManagementStateStore, dagNode, dagId, getStoreInsertTimeMillis(leaseParams));
}

private static void submitJobToExecutor(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode, Dag.DagId dagId, long storeInsertTimeMillis) {
DagUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = DagUtils.getJobExecutionPlan(dagNode);
JobSpec jobSpec = DagUtils.getJobSpec(dagNode);
stampDagActionStoreInsertTimeMillis(jobSpec, storeInsertTimeMillis);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);

String specExecutorUri = DagUtils.getSpecExecutorUri(dagNode);
Expand Down Expand Up @@ -166,6 +189,22 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId);
}

static void stampDagActionStoreInsertTimeMillis(JobSpec jobSpec, long storeInsertTimeMillis) {
Config jobConfig = jobSpec.getConfig().withoutPath(ConfigurationKeys.DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY);
if (storeInsertTimeMillis != DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS) {
jobConfig = jobConfig.withValue(ConfigurationKeys.DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY,
ConfigValueFactory.fromAnyRef(storeInsertTimeMillis));
}
jobSpec.setConfig(jobConfig);
jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobConfig));
}

private static long getStoreInsertTimeMillis(DagActionStore.LeaseParams leaseParams) {
return leaseParams == null
? DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS
: leaseParams.getStoreInsertTimeMillis();
}

public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
Properties cancelJobArgs = new Properties();
String serializedFuture = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
"DAG could not be compiled for " + getDagId());
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId());
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId(), getDagTask().getLeaseParams());
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
dagManagementStateStore.getDagManagerMetrics().conditionallyMarkFlowAsState(DagUtils.getFlowId(dag.get()),
Dag.FlowState.RUNNING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
// but when reevaluate/resume/launch dag proc found multiple parallel jobs to run next, it creates reevaluate
// dag actions for each of those parallel job and in this scenario there is no job status available.
// If the job status is not present, this job was never launched, submit it now.
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId());
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId(), getDagTask().getLeaseParams());
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
return;
}
Expand Down Expand Up @@ -124,7 +124,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
// dag failed and is also not retryable. in that case if this job's retry passes, overall status of the dag can be
// set to PASS, which would be incorrect.
dag.setFlowEvent(null);
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId());
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId(), getDagTask().getLeaseParams());
} else if (DagProcUtils.isDagFinished(dag)) {
String flowEvent = DagProcUtils.calcFlowStatus(dag);
dag.setFlowEvent(flowEvent);
Expand Down Expand Up @@ -179,7 +179,7 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
if (!DagProcUtils.isDagFinished(dag)) { // this may fail when dag failure option is finish_running and some dag node has failed
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId());
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId(), getDagTask().getLeaseParams());
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
// if it fails here, it will check point the failed dag in the (running) dag store again, which is idempotent
dagManagementStateStore.deleteFailedDag(getDagId());

DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(), getDagId());
DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(), getDagId(), getDagTask().getLeaseParams());

DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, getDagTask().getDagAction());
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.gobblin.config.ConfigBuilder;
Expand All @@ -43,9 +44,11 @@
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.service.modules.orchestration.DagUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.commons.lang.StringUtils;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -125,6 +128,43 @@ public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOExcep
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
}

@Test
public void testSubmitJobToExecutorReplacesStaleStoreInsertTimeMillis()
throws URISyntaxException, IOException, ExecutionException, InterruptedException {
Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
long staleStoreInsertTimeMillis = 1730000000000L;
long freshStoreInsertTimeMillis = staleStoreInsertTimeMillis + 60000L;
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
JobSpec jobSpec = jobExecutionPlan.getJobSpec();
Config staleJobConfig = jobSpec.getConfig().withValue(
ConfigurationKeys.DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY,
ConfigValueFactory.fromAnyRef(staleStoreInsertTimeMillis));
jobSpec.setConfig(staleJobConfig);
jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(staleJobConfig));
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
SpecProducer<Spec> producer = DagUtils.getSpecProducer(dagNode);
Mockito.doReturn(CompletableFuture.completedFuture("urn")).when(producer).addSpec(Mockito.any(Spec.class));
DagActionStore.DagAction dagAction = new DagActionStore.DagAction("flowGroup3", "flowName3", 2345678, "job1",
DagActionStore.DagActionType.REEVALUATE);
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, false,
System.currentTimeMillis(), freshStoreInsertTimeMillis);

DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId, leaseParams);

ArgumentCaptor<Spec> specCaptor = ArgumentCaptor.forClass(Spec.class);
Mockito.verify(producer).addSpec(specCaptor.capture());
JobSpec submittedJobSpec = (JobSpec) specCaptor.getValue();
Assert.assertEquals(
submittedJobSpec.getConfig().getLong(ConfigurationKeys.DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY),
freshStoreInsertTimeMillis);
Assert.assertEquals(
submittedJobSpec.getConfigAsProperties()
.getProperty(ConfigurationKeys.DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY),
String.valueOf(freshStoreInsertTimeMillis));
}

@Test(expectedExceptions = RuntimeException.class)
public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{
Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
Expand Down
Loading