-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathCloudQueryCLI.java
More file actions
126 lines (112 loc) · 5.02 KB
/
CloudQueryCLI.java
File metadata and controls
126 lines (112 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package io.kestra.plugin.cloudquery;
import java.util.HashMap;
import java.util.List;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.PluginProperty;
/**
 * Task that runs a list of CloudQuery CLI commands through a {@link CommandsWrapper}.
 *
 * <p>Commands are interpreted by {@code /bin/sh -c}; a {@code cloudquery} alias pointing to
 * {@code /app/cloudquery} is declared before the user commands run.
 */
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Run CloudQuery CLI commands",
    description = "Executes provided CloudQuery CLI commands in order using /bin/sh -c inside the task runner container; honors rendered env vars and input files."
)
@Plugin(
    examples = {
        @Example(
            title = "Run a CloudQuery sync from CLI. You need an [API key](https://docs.cloudquery.io/docs/deployment/generate-api-key) to download plugins. You can add the API key as an environment variable called `CLOUDQUERY_API_KEY`.",
            full = true,
            code = """
                id: cloudquery_sync_cli
                namespace: company.team
                tasks:
                  - id: hn_to_duckdb
                    type: io.kestra.plugin.cloudquery.CloudQueryCLI
                    env:
                      CLOUDQUERY_API_KEY: "{{ secret('CLOUDQUERY_API_KEY') }}"
                    inputFiles:
                      config.yml: |
                        kind: source
                        spec:
                          name: hackernews
                          path: cloudquery/hackernews
                          version: v3.0.13
                          tables: ["*"]
                          backend_options:
                            table_name: cq_cursor
                            connection: "@@plugins.duckdb.connection"
                          destinations:
                            - "duckdb"
                          spec:
                            item_concurrency: 100
                            start_time: "{{ now() | dateAdd(-1, 'DAYS') }}"
                        ---
                        kind: destination
                        spec:
                          name: duckdb
                          path: cloudquery/duckdb
                          version: v4.2.10
                          write_mode: overwrite-delete-stale
                          spec:
                            connection_string: hn.db
                    commands:
                      - cloudquery sync config.yml --log-console"""
        )
    }
)
public class CloudQueryCLI extends AbstractCloudQueryCommand implements RunnableTask<ScriptOutput>, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface {
    /** Shell commands handed to the command wrapper; required. */
    @Schema(
        title = "Commands to execute",
        description = "Shell commands executed sequentially; include the cloudquery binary (aliased to /app/cloudquery) and any arguments."
    )
    @NotNull
    @PluginProperty(group = "main")
    protected Property<List<String>> commands;

    /** Namespace files configuration, forwarded to the command wrapper. */
    @PluginProperty(group = "source")
    private NamespaceFiles namespaceFiles;

    /** Input files definition, forwarded as-is to the command wrapper. */
    @PluginProperty(group = "source")
    private Object inputFiles;

    /** Output file patterns; rendered before being handed to the command wrapper. */
    @PluginProperty(group = "destination")
    private Property<List<String>> outputFiles;

    /**
     * Builds a {@link CommandsWrapper} from this task's configuration and runs it.
     *
     * @param runContext the run context used to render properties and execute the commands
     * @return the output produced by the command wrapper
     * @throws Exception if rendering fails, if no container image can be resolved, or if the
     *     wrapped execution fails
     */
    @Override
    public ScriptOutput run(RunContext runContext) throws Exception {
        var renderedOutputFiles = runContext.render(this.outputFiles).asList(String.class);
        // Render the environment once and reuse the result instead of rendering it twice.
        var renderedEnv = runContext.render(this.getEnv()).asMap(String.class, String.class);
        var containerImage = runContext.render(this.getContainerImage()).as(String.class).orElseThrow();

        CommandsWrapper wrapper = new CommandsWrapper(runContext)
            .withWarningOnStdErr(true)
            .withDockerOptions(injectDefaults(getDocker()))
            .withTaskRunner(this.getTaskRunner())
            .withContainerImage(containerImage)
            .withInterpreter(Property.ofValue(List.of("/bin/sh", "-c")))
            .withBeforeCommands(Property.ofValue(List.of("alias cloudquery='/app/cloudquery'")))
            .withCommands(this.commands)
            // An empty rendered map is swapped for a fresh HashMap, as before.
            .withEnv(renderedEnv.isEmpty() ? new HashMap<>() : renderedEnv)
            // Forward namespace files so the declared property is not silently ignored.
            .withNamespaceFiles(this.namespaceFiles)
            .withInputFiles(inputFiles)
            // The wrapper receives null rather than an empty list when no output files are set.
            .withOutputFiles(renderedOutputFiles.isEmpty() ? null : renderedOutputFiles);

        return wrapper.run();
    }

    /**
     * Ensures the Docker options carry an entry point before delegating to the parent defaults.
     *
     * @param original the configured Docker options, may be {@code null}
     * @return {@code null} when {@code original} is {@code null}; otherwise the result of the
     *     parent {@code injectDefaults} applied to options whose entry point is set
     */
    @Override
    protected DockerOptions injectDefaults(DockerOptions original) {
        if (original == null) {
            return null;
        }

        DockerOptions options = original;
        if (options.getEntryPoint() == null || options.getEntryPoint().isEmpty()) {
            // NOTE(review): presumably the blank entry point keeps the image's own entry point
            // from intercepting the /bin/sh -c interpreter used by run() — confirm.
            options = options.toBuilder().entryPoint(List.of("")).build();
        }
        return super.injectDefaults(options);
    }
}