Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions src/datasets/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use crate::python_runner;
use crate::runner_sse;
use crate::source_language::{classify_runtime_extension, SourceLanguage};
use crate::sync::discovery::{
discover_project_log_refs, ProjectLogRefDiscoveryResult, ProjectLogRefScope,
discover_project_log_refs, ProjectLogRefDiscoveryOptions, ProjectLogRefDiscoveryResult,
ProjectLogRefScope,
};
use crate::sync::{
artifact_base_dir, artifact_spec_dir, create_jsonl_file_writer, epoch_seconds, read_json_file,
Expand Down Expand Up @@ -280,10 +281,12 @@ pub async fn run(base: BaseArgs, args: PipelineArgs) -> Result<()> {
&args.runner,
&source_project.id,
&source,
refs,
args.transform.max_concurrency(),
Some(&attachment_dir),
None,
TransformInput {
refs,
max_concurrency: args.transform.max_concurrency(),
attachment_dir: Some(&attachment_dir),
row_writer: None,
},
)
.await?;
let row_count = transform_response.rows.len();
Expand Down Expand Up @@ -1216,10 +1219,12 @@ async fn transform_refs(base: &BaseArgs, args: PipelineTransformArgs) -> Result<
&args.runner,
&source_project.id,
&source,
refs,
args.transform.max_concurrency(),
Some(&attachment_dir),
Some(&mut writer as &mut dyn Write),
TransformInput {
refs,
max_concurrency: args.transform.max_concurrency(),
attachment_dir: Some(&attachment_dir),
row_writer: Some(&mut writer as &mut dyn Write),
},
)
.await?;
writer.flush().context("failed to flush transform output")?;
Expand Down Expand Up @@ -1256,16 +1261,26 @@ async fn transform_refs(base: &BaseArgs, args: PipelineTransformArgs) -> Result<
)
}

/// Inputs for a single `transform_source_refs` call, bundled into one value.
///
/// `transform_source_refs` takes this as its final `input` argument and
/// destructures it into its four fields before doing any work.
struct TransformInput<'a> {
/// The refs handed to the transform, owned by this struct.
refs: Vec<Value>,
/// Concurrency setting for the transform; the visible callers pass
/// `args.transform.max_concurrency()`.
// NOTE(review): how this limit is applied is not visible here; confirm in the function body.
max_concurrency: usize,
/// Optional attachment directory. When `Some`, `transform_source_refs`
/// creates it with `fs::create_dir_all` before continuing.
attachment_dir: Option<&'a Path>,
/// Optional borrowed writer. One visible caller passes `None`; the other
/// passes its own writer and flushes it after the call returns.
// NOTE(review): presumably transformed rows are written here as they are produced; verify.
row_writer: Option<&'a mut dyn Write>,
}

async fn transform_source_refs(
base: &BaseArgs,
runner: &PipelineRunnerArgs,
source_project_id: &str,
source: &PipelineSourceInspect,
refs: Vec<Value>,
max_concurrency: usize,
attachment_dir: Option<&Path>,
mut row_writer: Option<&mut dyn Write>,
input: TransformInput<'_>,
) -> Result<PipelineTransformResponse> {
let TransformInput {
refs,
max_concurrency,
attachment_dir,
mut row_writer,
} = input;
if let Some(attachment_dir) = attachment_dir {
fs::create_dir_all(attachment_dir)
.with_context(|| format!("failed to create {}", attachment_dir.display()))?;
Expand Down Expand Up @@ -1849,11 +1864,13 @@ async fn discover_refs(
let result = discover_project_log_refs(
&client,
&ctx,
&project.id,
filter.as_ref(),
project_log_ref_scope(scope),
limit,
options.page_size,
ProjectLogRefDiscoveryOptions {
project_id: &project.id,
filter: filter.as_ref(),
scope: project_log_ref_scope(scope),
target: limit,
page_size: options.page_size,
},
|reference| {
write_jsonl_value(&mut writer, &reference.to_value())?;
writer.flush().context("failed to flush discovery output")?;
Expand Down
45 changes: 28 additions & 17 deletions src/sync/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ pub(crate) struct ProjectLogRefDiscoveryResult {
pub(crate) pages: usize,
}

/// Query options for `discover_project_log_refs`, passed as a single struct.
///
/// The function destructures this into `project_id`, `filter`, `scope`,
/// `target` and `page_size` before it starts collecting refs.
pub(crate) struct ProjectLogRefDiscoveryOptions<'a> {
/// Borrowed ID of the project whose log refs are discovered.
pub(crate) project_id: &'a str,
/// Optional borrowed filter value.
// NOTE(review): presumably applied to the discovery query; the query-building code is not visible here.
pub(crate) filter: Option<&'a Value>,
/// Selects how discovered refs are handled; the function branches on this
/// (a `ProjectLogRefScope::Trace` arm is visible).
pub(crate) scope: ProjectLogRefScope,
/// Cap on collected results. In `Trace` scope, once this many distinct root
/// span IDs have been recorded, refs with a new root span ID are skipped.
// NOTE(review): how `target` is applied in other scopes is not visible here; confirm.
pub(crate) target: usize,
/// Page size for discovery requests.
// NOTE(review): presumably rows fetched per page; verify against the paging loop.
pub(crate) page_size: usize,
}

#[derive(Debug, Deserialize)]
struct DiscoveryBtqlResponse {
data: Vec<Map<String, Value>>,
Expand All @@ -56,16 +64,19 @@ struct DiscoveryBtqlResponse {
pub(crate) async fn discover_project_log_refs<F>(
client: &ApiClient,
ctx: &LoginContext,
project_id: &str,
filter: Option<&Value>,
scope: ProjectLogRefScope,
target: usize,
page_size: usize,
options: ProjectLogRefDiscoveryOptions<'_>,
mut on_ref: F,
) -> Result<ProjectLogRefDiscoveryResult>
where
F: FnMut(ProjectLogRef) -> Result<()>,
{
let ProjectLogRefDiscoveryOptions {
project_id,
filter,
scope,
target,
page_size,
} = options;
let mut seen = HashSet::new();
let mut trace_roots = Vec::new();
let mut trace_refs_by_root_span_id = HashMap::new();
Expand Down Expand Up @@ -94,19 +105,19 @@ where
}
ProjectLogRefScope::Trace => {
let root_span_id = reference.root_span_id.clone();
if !trace_refs_by_root_span_id.contains_key(&root_span_id) {
if trace_roots.len() >= target {
continue;
match trace_refs_by_root_span_id.entry(root_span_id) {
std::collections::hash_map::Entry::Vacant(entry) => {
if trace_roots.len() >= target {
continue;
}
trace_roots.push(entry.key().clone());
entry.insert(reference);
}
std::collections::hash_map::Entry::Occupied(mut entry) => {
if better_trace_origin_ref(entry.get(), &reference) {
entry.insert(reference);
}
}
trace_roots.push(root_span_id.clone());
trace_refs_by_root_span_id.insert(root_span_id, reference);
continue;
}
let should_replace = trace_refs_by_root_span_id
.get(&root_span_id)
.is_none_or(|current| better_trace_origin_ref(current, &reference));
if should_replace {
trace_refs_by_root_span_id.insert(root_span_id, reference);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/topics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ enum TopicMapCommands {
#[command(alias = "view")]
Show(TopicMapViewArgs),
/// Update a configured Topics topic map by name or function ID
Set(TopicMapSetArgs),
Set(Box<TopicMapSetArgs>),
}

#[derive(Debug, Clone, Args)]
Expand Down
Loading