
Commit 5863426

Merge pull request #6 from RobertBHamilton/h2
H2 first wave. Support for H2 added as an optional backend database for the DataFlow tool.
2 parents 98753b8 + 7b14ff0 commit 5863426

32 files changed

Lines changed: 912 additions & 276 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
```diff
@@ -16,3 +16,4 @@ scratch
 # JDT-specific (Eclipse Java Development Tools)
 .classpath
 lib
+dataflow.*
```

QUICKSTART.md

Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
# OpenDataFlow

## Overview

OpenDataFlow is a lightweight orchestration utility that runs and coordinates batch jobs over partitioned or time-sliced data so teams can schedule, recover, and migrate large data-processing pipelines without changing their ETL code.

This quickstart uses H2 for simplicity.

It has been tested on Ubuntu and runs in a bash shell.

1. Requirements to run the demo:

- bash and the standard command line utilities
- jq (`sudo apt install -y jq`)
- The DataFlow jar: dataflow-1.0.0.jar
- the decryption passkey in the $PASSKEY environment variable: `export PASSKEY=plugh`
- The two supplied scripts: `utility.sh` and `RunJob`

Have jq on your path, and put dataflow-1.0.0.jar in the same directory as the scripts.

2. Set up the H2 database, schema, and tables:

```
./utility.sh createtables
```

The initial connection to H2 creates the database, schema, and user automatically.
The createtables utility then creates the standard dataflow tables in the database.

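To see what was created, the `sql` subcommand used in step 7 can read the catalog back. This check is illustrative: `SHOW TABLES` is standard H2 SQL, but the exact output format of utility.sh may vary.

```
./utility.sh sql "show tables"
```
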
3. Configure the 'loadbob' job and the datasets that it uses:
```
./utility.sh dml "insert into dataset (datasetid) values ('bobin')"
./utility.sh dml "insert into dataset (datasetid) values ('bobout')"
./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobout','OUT','loadbob')"
./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobin' ,'IN', 'loadbob')"
```
These commands insert enough test data into the schema to simulate a run.

The first two commands register two datasets named 'bobin' and 'bobout'.
The second two commands associate bobin and bobout with the job named 'loadbob' as its input and output datasets respectively.
These inserts only need to be run once, to configure the job and datasets.

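Likewise, you can read the configuration back before going further with the same `sql` subcommand (illustrative; column layout may differ):

```
./utility.sh sql "select * from dataset"
./utility.sh sql "select * from job"
```
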
4. Set a status for the input dataset:

```
./utility.sh dml "insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values ('1.0','bobin','fakejob', 'OUT',now(),'READY')"
```

We give it a fake dataid and specify a fakejob that "produced" it. A status of READY for an OUT data chunk means that it is ready and safe to be consumed.

5. "Write" a `loadbob.sh` to run. In this example it is just a one-liner that outputs some of the automatic environment variables.

```
echo 'echo "running loadbob with dataid $dataid partition of input $bobin_DATASETID"' > loadbob.sh && chmod +x loadbob.sh
```

One important note: the jobid is **inferred** from the name of the script, so if our jobid is 'loadbob' then the script has to be named 'loadbob.sh'. This is mandatory, simply because of the way the RunJob script is written; the intent is to keep things simple so that the only parameter to RunJob is the script name.

6. Run the job with RunJob:

```
RunJob ./loadbob.sh
```

Output should look like this:

```text
Mon Dec 1 04:08:50 PM CST 2025: Launching ./loadbob.sh with dataid 1.0
running loadbob with dataid 1.0 partition of input bobin
Mon Dec 1 04:08:50 PM CST 2025: Job ./loadbob.sh is complete. Updating status
1 rows updated to READY for loadbob and 1.0 1 IN file-local locks released
```
There are two log-style messages confirming the start and end of the loadbob job, plus the one line output by the `loadbob.sh` script itself.
The last line is an informational message indicating that DataFlow has set the final status.

7. Checks:

Run `RunJob ./loadbob.sh` a second time, and confirm that it refuses to do a duplicate run.
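Based on the message in the RunJob script, the refused second run should print something like this (timestamp will differ):

```text
Mon Dec 1 04:09:12 PM CST 2025: no suitable data available for job. Not running it
```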
Then check the data with the utility:

```text
./utility.sh runs
DATAID  DATASETID  JOBID    LOCKTYPE  MODIFIED                    STATUS
------  ---------  -----    --------  --------                    ------
1.0     bobout     loadbob  OUT       2025-12-01 16:08:49.740813  READY
1.0     bobin      fakejob  OUT       2025-12-01 15:46:19.56124   READY
```

Check the data with direct SQL:

```text
utility.sh sql "select * from datastatus"
DATAID  DATASETID  JOBID    LOCKTYPE  MODIFIED                    STATUS
------  ---------  -----    --------  --------                    ------
1.0     bobin      fakejob  OUT       2025-12-01 15:46:19.56124   READY
1.0     bobout     loadbob  OUT       2025-12-01 16:08:49.740813  READY
```

## Remarks

* We started with just the jar file and had to manually create the schema and tables. But if you build the package with Maven, the tests will build the H2 database, schema, tables, user and password for you, and dataflow-1.0.0.jar will be in utilities/target/dataflow-1.0.0.jar.

* Access to the H2 database for testing is through the user ETL and a password that was encrypted using the default passkey 'plugh'. You should encrypt your own password using your own passkey and put it into dataflow.properties as soon as possible.
The encrypted password and the other connection information are in core/src/main/resources/dataflow.properties. You can copy that file to your working directory and modify it; the utilities will use this local copy in preference to the core/ properties file if they find it. The encryption itself is easy because it is one of the functions published by the utility.sh tool (a sketch of this workflow appears at the end of these remarks).

* In normal day-to-day operation you **never** need to update or insert into the datastatus table, not in your code and not manually the way we did in this example. RunJob handles that for you. The inserts into the job and dataset tables are one-time actions to register and configure the datasets or to set up a test case.
In exceptional cases, such as error handling, you may encounter a job in a FAILED state. If you then want the job to run again, you can reset it to RESUBMIT. You can do that with a dml command, like `utility.sh dml "update datastatus set status='RESUBMIT' where jobid='loadbob' and dataid='1.0'"`, though I would just use the endjob utility command:
```
utility.sh endjob loadbob 1.0 RESUBMIT
```
The big advantage is that you don't have to ask the scheduling team to make any changes, and you don't have to worry about command line parameters because there are none. If the job is scheduled to run multiple times a day, it will just catch up the next time it runs, and there are no changes in production at all except for the RESUBMIT status. That means you avoid an enormous amount of red tape and committee meetings just to rerun a job.

* The actual dataset record has fields for things like hostname, database name, schema, table, username and (***encrypted***) password. These all appear as automatic variables to your script. This avoids the temptation to hardcode this metadata, the headaches of maintaining it, possible errors in connection strings, and having to make changes when moving from development to production.
It is possible, and in our opinion best practice, to ***hard-code nothing in your script***: get it all from the metadata that DataFlow provides (see the sketch at the end of these remarks). For one thing, if you have one job producing data and another job consuming it, both jobs now refer to the same named dataset, so there is almost no chance of the second job picking up the wrong data because of a misconfiguration.
Not only that, the framework guarantees that the second job will not have any false starts while the first job is running or in any error state.

* The dataset metadata are not restricted to JDBC connections. They can be repurposed as file system paths, web page URLs, TCP endpoints, what have you. The semantics are entirely up to the consumer (the ETL script), which can do whatever it wants with them; DataFlow itself does not use them at all.

* Some of the other examples in the examples directory illustrate these points.

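As a sketch of the password-encryption workflow mentioned above: the Cryptor invocation is the one shown in the README, the property keys (url, user, schema, encrypted) are the ones the README names, and every value below is a placeholder to adapt to your setup.

```
# Encrypt a new password with your own passkey; the output goes into the
# 'encrypted' field of your local dataflow.properties (values are placeholders).
export PASSKEY=myOwnPasskey
java -cp app/target/app-1.0.0.jar com.hamiltonlabs.dataflow.utility.Cryptor -e "$PASSKEY" "myNewPassword"

# Keep a local dataflow.properties next to the scripts; it takes precedence
# over the copy packaged under core/. The H2 URL shown is illustrative only.
cat > dataflow.properties <<'EOF'
url=jdbc:h2:./dataflow
user=ETL
schema=dataflow
encrypted=<paste the Cryptor output here>
EOF
```
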
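And as a sketch of the "hard-code nothing" approach: only `$dataid` and `$bobin_DATASETID` are demonstrated in this quickstart, so the connection-related variable names below are assumptions about the naming convention; check the examples/ directory for the exact names DataFlow exports.

```
#!/usr/bin/env bash
# loadbob.sh -- metadata-driven ETL step (sketch only).
# $dataid and $bobin_DATASETID are set by RunJob; the *_HOST / *_DATABASE / *_USER
# names below are hypothetical placeholders for the dataset connection fields.
set -euo pipefail

echo "running loadbob with dataid $dataid partition of input $bobin_DATASETID"

# Hypothetical use of the connection metadata instead of hardcoding it:
# psql -h "$bobin_HOST" -d "$bobin_DATABASE" -U "$bobin_USER" \
#      -c "copy staging from '/incoming/${dataid}.csv'"
```
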

README.md

Lines changed: 56 additions & 9 deletions
````diff
@@ -87,23 +87,70 @@ See the examples/ directory for exact variable names and sample scripts that rea
 Security note: the decrypted credential is provided only at runtime in the job process’s environment. Be careful not to echo or persist it in logs. Keep the encryption key used to create the encrypted password secret.
 
 ### Quick start (5 minutes)
+For the quick start we use H2 in file mode. It is fast and easy because H2 creates your database, schema, and user with password automatically on the first connect. We have a utility that creates the H2 tables next.
+For featherweight implementations, H2 will work pretty well. If you need robustness, scalability, and good transaction semantics then you will need Postgres.
+Also see the file QUICKSTART.md for more details.
+
 1. Prerequisites
 - Linux (Ubuntu tested), bash, maven
 - jq (sudo apt install -y jq)
-- PostgreSQL (or run a local container)
-- json-java (json-20250517.jar) and the Postgres JDBC driver (e.g., postgresql-42.7.3.jar)
-- A Postgres database named `dataflow`, schema `dataflow`, and a user `etl`
 
-2. Run a local Postgres (optional)
+2. Build
+mvn package
+
+3. Create the DataFlow tables
+export PASSKEY=plugh (this is the default but it should be changed)
+./utility.sh createtables
+
+Your DataFlow tool is now configured and ready to run.
+
+### Test
+
+The test has a job (loadbob), which takes one input dataset (bobin) and one output dataset (bobout).
+We are not actually testing an ETL job. We want to test that the framework is calling it correctly and supplying the necessary dataid and dataset metadata.
+
+1. Register two datasets
+```
+./utility.sh dml "insert into dataset (datasetid) values ('bobin')"
+./utility.sh dml "insert into dataset (datasetid) values ('bobout')"
+```
+2. Associate them with the loadbob job.
+```
+./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobout','OUT','loadbob')"
+./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobin' ,'IN', 'loadbob')"
+```
+3. Mark the input set as READY (just for the test; in real jobs some other job will have completed and marked it)
+
+```
+./utility.sh dml "insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values ('1.0','bobin','fakejob', 'OUT',now(),'READY')"
+```
+4. The job should now be ready to run, since it has an input dataset in READY status. Perform the test with RunJob:
+```
+RunJob ./loadbob.sh
+```
+
+Output should look like this:
+
+```text
+Mon Dec 1 04:08:50 PM CST 2025: Launching ./loadbob.sh with dataid 1.0
+running loadbob with dataid 1.0 partition of input bobin
+Mon Dec 1 04:08:50 PM CST 2025: Job ./loadbob.sh is complete. Updating status
+1 rows updated to READY for loadbob and 1.0 1 IN file-local locks released
+```
+There are two log-style messages confirming the start and end of the loadbob job, plus the one line output by the `loadbob.sh` script.
+The last line is an informational message indicating that DataFlow has set the final status.
+
+### Postgres
+For this you need a Postgres database up and running. The easiest way is to spin up a container:
+
+1. Run a local Postgres (optional)
 podman run -p 5432:5432 --name pg -e POSTGRES_PASSWORD=secretpass -d docker.io/postgres
 
-3. Build
-mvn package
 
-4. Initialize database
+2. Initialize the database. Create the user ETL and its password.
 Connect with psql and run docs/create_tables.sql. See docs/datamodel.txt for schema notes.
 
-5. Configure
+3. Configure
 Encrypt the DB password with the included Cryptor class:
 java -cp app/target/app-1.0.0.jar com.hamiltonlabs.dataflow.utility.Cryptor -e <key> "<password>"
 Create the file **dataflow.properties** and place the url,user,schema, and encrypted fields. This tells the utility how to access the dataflow database.
@@ -116,7 +163,7 @@ Security note: the decrypted credential is provided only at runtime in the job p
 ```
 Keep the encryption key private.
 
-7. Run your job
+4. Run your job
 Make your ETL script executable (e.g., myETL.sh) and invoke it via:
 RunJob myETL.sh
 
````

RunJob

Lines changed: 20 additions & 11 deletions
```diff
@@ -7,7 +7,6 @@
 # Added to make cronjobs easier to set CLASSPATH
 thisdir=`dirname ${BASH_SOURCE[0]}`
 cd $thisdir
-echo "running from working directory `pwd`"
 
 # support alternate way to pass the key
 if [ -z "$PASSKEY" ];then
@@ -24,7 +23,18 @@ jobid=${jobid%.*} #strip off the extention
 shift 1
 export args="$@"
 #
-export CLASSPATH=bin/postgresql-42.7.3.jar:bin/json-20250517.jar:utility/target/utility-1.0.0.jar
+#export CLASSPATH=bin/postgresql-42.7.3.jar:bin/json-20250517.jar:utility/target/utility-1.0.0.jar
+#export CLASSPATH=bin/json-20250517.jar:bin/h2-2.2.224.jar:utility/target/utility-1.0.0.jar
+# get the jar file either in current directory or in canonical build location
+jarfile="`ls utility/target/dataflow*jar dataflow*jar 2>/dev/null|tail -1`"
+if [ -f "$jarfile" ];then
+export jarc="java -jar $jarfile $PASSKEY "
+else
+echo we need the dataflow-version.jar to run this utility. Cannot continue
+exit
+fi
+export CLASSPATH=$jarfile
+
 
 getEnvJSON(){
 #java com.hamiltonlabs.dataflow.utility.GetJobData $jobid $passkey
@@ -41,7 +51,7 @@ java com.hamiltonlabs.dataflow.utility.Cryptor -d $passkey "$1"
 declareEnv(){
 #stale dataid will result in false executions so make sure we only set it here
 unset dataid
-getEnvJSON|jq -c '.[] | to_entries[]' | while read -r entry; do
+getEnvJSON|tail -1|jq -c '.[] | to_entries[]' | while read -r entry; do
 
 
 key=$(echo "$entry" | jq -r '.key')
@@ -65,23 +75,22 @@ declareEnv(){
 done
 }
 
+# test
 env="`declareEnv`"
 # uncomment if you need to debug echo "$env"
+
 source <(echo "$env")
 if [ -z "$dataid" ];then
 echo "`date`: no suitable data available for job. Not running it"
 else
 
-echo "`date`: Launching $cmd with dataid $dataid"
-# lock our files
-#java com.hamiltonlabs.dataflow.utility.SetJobStart $passkey $jobid $dataid
-echo runs "`./utility.sh runs`"
-eval "$cmd $args"
-if [ $? -eq 0 ];then
+echo "`date`: Launching $cmd with dataid $dataid"
+eval "$cmd $args"
+if [ $? -eq 0 ];then
 echo "`date`: Job $cmd is complete. Updating status"
 java -cp $CLASSPATH:app.jar com.hamiltonlabs.dataflow.utility.SetJobEndStatus $passkey $jobid $dataid READY
-else
+else
 echo "`date`: Job $cmd has failed. Updating status"
 java -cp $CLASSPATH:app.jar com.hamiltonlabs.dataflow.utility.SetJobEndStatus $passkey $jobid $dataid FAILED
-fi
+fi
 fi
```

bin/json-20250517.jar

-80.8 KB
Binary file not shown.

bin/postgresql-42.7.3.jar

-1.04 MB
Binary file not shown.

changes.log

Lines changed: 8 additions & 3 deletions
```diff
@@ -53,7 +53,12 @@ NOTE: for the script we have added a new dependency. It uses jq.
 The messy problems of 11-09 are fixed.
 2025-11-17 Change RunJob to use simpler syntax. Now we just to RunJob myscript.sh and it infers the runid from the script name.
 To use the old method, which allows arbitrary commands like "echo or set", use SRUN jobid cmd
-Updated the help in utility jar, added deleterun function. Loading properties file now as a resource.
+Updated the help in utility jar, added deleterun function. Loading dataflow.properties file now optionally as a resource in jar.
+
+Remove passkey on command line utilities.sh and RunJob entirely and require it to be environment only.
+
+2025-11-21 Added useful functions such as ForceRun and simplified RunJob, and added clear examples, added very useful automatic dataset called 'today' which can be used for daily runs to get an automatic dataid. Merged the service_cleaup mod into main.
+
+2025-11-29 (dev) new "h2" dev branch. Added h2 support, for unit testing and as a light-weight implementation. Created platform module for platform-dependent SQL. Unit test for critical DataFlow.launchJob method.
+
 
-TODO: I think we should remove passkey on command line entirely and force it to be environment only.
-
```
core/dataflow.mv.db

16 KB
Binary file not shown.

core/pom.xml

Lines changed: 15 additions & 0 deletions
```diff
@@ -19,6 +19,21 @@
 <artifactId>json</artifactId>
 <version>20250517</version>
 </dependency>
+<dependency>
+<groupId>com.hamiltonlabs</groupId>
+<artifactId>platform</artifactId>
+<version>1.0.0</version>
+</dependency>
+<dependency>
+<groupId>org.postgresql</groupId>
+<artifactId>postgresql</artifactId>
+<version>42.7.3</version>
+</dependency>
+<dependency>
+<groupId>com.h2database</groupId>
+<artifactId>h2</artifactId>
+<version>2.2.224</version>
+</dependency>
 </dependencies>
 </project>
 
```

core/src/main/java/com/hamiltonlabs/dataflow/core/CredentialProvider.java

Lines changed: 8 additions & 3 deletions
```diff
@@ -2,8 +2,8 @@
 
 import java.util.Properties;
 import java.io.InputStream;
-//import java.io.FileInputStream;
-
+import java.io.FileInputStream;
+import java.io.File;
 /** Provide credentials for all OpenDataFlow access
  * Credentials are user/password. They are returned in a java.util.Properties object.
  *
@@ -47,7 +47,12 @@ public static String getPass(String passphrase,String encryptedPassword) throws
 */
 public static Properties getCredentials(String passphrase,String propertiesPath) throws java.security.GeneralSecurityException,java.io.IOException {
 Properties properties=new Properties();
-// properties.load(new FileInputStream(filepath));
+//properties.load(new FileInputStream(filepath));
+File file=new File(propertiesPath);
+if (file.exists()){
+properties.load(new FileInputStream(file));
+return updateDecrypted(passphrase,properties);
+}
 try (InputStream input = CredentialProvider.class.getClassLoader().getResourceAsStream(propertiesPath)) {
 if (input == null) {
 System.out.printf("Unable to find resource %s\n",propertiesPath);
```
