Merged
Changes from 4 commits
5 changes: 4 additions & 1 deletion .claude/skills/scaffold-snowflake-connector/SKILL.md
@@ -342,7 +342,10 @@ After all identity fields are confirmed, summarize how `buildMemberIdentities()`

### 3b. Organization Mapping

-If Pre-Analysis determined there is no org data (no org-related columns found in any table), confirm: "I don't see any organization columns in the schema. Does this source have org/company data?" — if yes, proceed; if no, skip to 3c.
+If Pre-Analysis determined there is no org data (no org-related columns found in any table): before asking the user, first read existing transformers in `services/apps/snowflake_connectors/src/integrations/` to check whether any of them join an org table using a key that also exists in the user's tables. If a match is found, prompt the user:
+> "I don't see org columns in the tables you provided, but [EXISTING_PLATFORM] sources org data from `{ORG_TABLE}` via `{join_key}` — which also appears in your table. Did you mean to include this? (Recommended)"
+
+If no existing pattern is joinable, ask: "I don't see any org columns. Does this source have org/company data?" — if yes, ask for the table; if no, skip to 3c.

If Pre-Analysis identified org columns:

@@ -0,0 +1,3 @@
INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES
('invited-meeting', 'meetings', false, false, 'User is invited to a meeting', 'Invited to a meeting'),
('attended-meeting', 'meetings', false, false, 'User attends a meeting', 'Attended a meeting');
@@ -89,24 +89,26 @@ export class TransformerConsumer {
 let resolveSkippedCount = 0

 for await (const row of this.s3Service.streamParquetRows(job.s3Path)) {
-  const result = source.transformer.safeTransformRow(row)
-  if (!result) {
+  const results = source.transformer.safeTransformRow(row)
+  if (!results) {
     transformSkippedCount++
     continue
   }

-  const resolved = await this.integrationResolver.resolve(platform, result.segment)
-  if (!resolved) {
-    resolveSkippedCount++
-    continue
+  for (const result of results) {
+    const resolved = await this.integrationResolver.resolve(platform, result.segment)
+    if (!resolved) {
+      resolveSkippedCount++
+      continue
     }

+    await this.emitter.createAndProcessActivityResult(
+      resolved.segmentId,
+      resolved.integrationId,
+      result.activity,
+    )
+    transformedCount++
+  }
Comment on lines +98 to 111
Copilot AI Apr 3, 2026

Now that transformers may return multiple activities per row, this loop resolves the same segment repeatedly when multiple activities share it (common when a row yields multiple activity types). Consider de-duplicating integrationResolver.resolve(...) calls per row (e.g., cache by slug+sourceId) to avoid extra Redis/DB lookups and reduce latency.
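
A minimal sketch of the per-row de-duplication suggested in this comment. The `Segment`, `Resolved`, and `Resolve` shapes and the `withRowCache` helper are hypothetical stand-ins introduced for illustration; they are not names from this PR, and the real `integrationResolver` may already cache internally.

```typescript
// Hypothetical shapes; the real types live in the connector code.
interface Segment { slug: string; sourceId: string }
interface Resolved { segmentId: string; integrationId: string }
type Resolve = (platform: string, segment: Segment) => Promise<Resolved | null>

// Wraps a resolver with a small cache keyed by platform + slug + sourceId.
// The promise itself is cached, so a "not found" (null) result is also
// remembered and is not looked up again.
function withRowCache(resolve: Resolve): Resolve {
  const cache = new Map<string, Promise<Resolved | null>>()
  return (platform, segment) => {
    const key = JSON.stringify([platform, segment.slug, segment.sourceId])
    let pending = cache.get(key)
    if (pending === undefined) {
      pending = resolve(platform, segment)
      cache.set(key, pending)
    }
    return pending
  }
}
```

Under these assumptions, the consumer would create one wrapper per row (inside the `for await` loop, before iterating `results`) so the cache never outlives the row it belongs to.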


-  await this.emitter.createAndProcessActivityResult(
-    resolved.segmentId,
-    resolved.integrationId,
-    result.activity,
-  )
-  transformedCount++
 }

 const skippedCount = transformSkippedCount + resolveSkippedCount
15 changes: 11 additions & 4 deletions services/apps/snowflake_connectors/src/core/transformerBase.ts
@@ -24,10 +24,12 @@ export abstract class TransformerBase {
   abstract readonly platform: PlatformType

   /**
-   * Transform a single raw row from the S3 export into an activity
+   * Transform a single raw row from the S3 export into one or more activities
    * along with routing metadata. Returns null if the row should be skipped.
    */
-  abstract transformRow(row: Record<string, unknown>): TransformedActivity | null
+  abstract transformRow(
+    row: Record<string, unknown>,
+  ): TransformedActivity | TransformedActivity[] | null

private static readonly INDIVIDUAL_NO_ACCOUNT_RE = /^individual\s*(?:[-–?]|with)\s*no\s+account$/i

@@ -104,10 +106,15 @@ export abstract class TransformerBase {

   /**
    * Safe wrapper around transformRow that catches errors and returns null.
+   * Always normalizes the result to an array for consistent consumption.
    */
-  safeTransformRow(row: Record<string, unknown>): TransformedActivity | null {
+  safeTransformRow(row: Record<string, unknown>): TransformedActivity[] | null {
     try {
-      return this.transformRow(row)
+      const result = this.transformRow(row)
+      if (result === null) {
+        return null
+      }
+      return Array.isArray(result) ? result : [result]
     } catch (err) {
       const message = err instanceof Error ? err.message : String(err)
       const stack = err instanceof Error ? err.stack : undefined
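
The normalization contract in `safeTransformRow` can be shown in isolation. This is a sketch only: `Activity` and `normalize` are hypothetical names standing in for `TransformedActivity` and the body of the `try` block.

```typescript
// Hypothetical stand-in for TransformedActivity.
type Activity = { sourceId: string }

// Mirrors the try-block above: null passes through, a single value is
// wrapped, and an array is returned unchanged.
function normalize(result: Activity | Activity[] | null): Activity[] | null {
  if (result === null) {
    return null
  }
  return Array.isArray(result) ? result : [result]
}
```

One edge case follows from this shape: an empty array is returned as `[]`, which is truthy, so the consumer's `if (!results)` check would not count that row as skipped, and its inner loop would not count it as transformed either. `MeetingAttendanceTransformer` in this PR returns `null` when it has no activities, so it does not hit that path.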
11 changes: 11 additions & 0 deletions services/apps/snowflake_connectors/src/integrations/index.ts
@@ -8,6 +8,8 @@ import { PlatformType } from '@crowd/types'

 import { buildSourceQuery as cventBuildSourceQuery } from './cvent/event-registrations/buildSourceQuery'
 import { CventTransformer } from './cvent/event-registrations/transformer'
+import { buildSourceQuery as meetingAttendanceBuildQuery } from './meetings/meeting-attendance/buildSourceQuery'
+import { MeetingAttendanceTransformer } from './meetings/meeting-attendance/transformer'
 import { buildSourceQuery as tncCertificatesBuildQuery } from './tnc/certificates/buildSourceQuery'
 import { TncCertificatesTransformer } from './tnc/certificates/transformer'
 import { buildSourceQuery as tncCoursesBuildQuery } from './tnc/courses/buildSourceQuery'
@@ -20,6 +22,15 @@ export type { BuildSourceQuery, DataSource, PlatformDefinition } from './types'
export { DataSourceName } from './types'

 const supported: Partial<Record<PlatformType, PlatformDefinition>> = {
+  [PlatformType.MEETINGS]: {
+    sources: [
+      {
+        name: DataSourceName.MEETINGS_MEETING_ATTENDANCE,
+        buildSourceQuery: meetingAttendanceBuildQuery,
+        transformer: new MeetingAttendanceTransformer(),
+      },
+    ],
+  },
   [PlatformType.CVENT]: {
     sources: [
       {
@@ -0,0 +1,101 @@
import { IS_PROD_ENV } from '@crowd/common'

const CDP_MATCHED_SEGMENTS = `
cdp_matched_segments AS (
SELECT DISTINCT
s.SOURCE_ID AS sourceId,
s.slug
FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s
WHERE s.PARENT_SLUG IS NOT NULL
AND s.GRANDPARENTS_SLUG IS NOT NULL
AND s.SOURCE_ID IS NOT NULL
)`

const ORG_ACCOUNTS = `
org_accounts AS (
SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
FROM analytics.bronze_fivetran_salesforce.accounts
WHERE website IS NOT NULL
UNION ALL
SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
FROM analytics.bronze_fivetran_salesforce_b2b.accounts
WHERE website IS NOT NULL
)`

export const buildSourceQuery = (sinceTimestamp?: string): string => {
let select = `
SELECT
t.PRIMARY_KEY,
t.MEETING_ID,
t.MEETING_NAME,
t.PROJECT_ID,
t.PROJECT_NAME,
t.PROJECT_SLUG,
t.ACCOUNT_ID,
t.ACCOUNT_NAME,
t.MEETING_DATE,
t.MEETING_TIME,
t.INVITEE_FULL_NAME,
t.INVITEE_LF_SSO,
t.INVITEE_LF_USER_ID,
t.INVITEE_EMAIL,
t.INVITEE_ATTENDED,
t.WAS_INVITED,
t.RAW_COMMITTEE_TYPE,
org.website AS ORG_WEBSITE,
org.domain_aliases AS ORG_DOMAIN_ALIASES,
org.logo_url AS LOGO_URL,
org.industry AS ORGANIZATION_INDUSTRY,
CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE
FROM ANALYTICS.SILVER_FACT.MEETING_ATTENDANCE t
INNER JOIN cdp_matched_segments cms
ON cms.slug = t.PROJECT_SLUG
AND cms.sourceId = t.PROJECT_ID
LEFT JOIN org_accounts org
ON t.ACCOUNT_ID = org.account_id
WHERE (t.WAS_INVITED = TRUE OR t.INVITEE_ATTENDED = TRUE)`

if (!IS_PROD_ENV) {
select += ` AND t.PROJECT_SLUG = 'cncf'`
}

const dedup = `
QUALIFY ROW_NUMBER() OVER (PARTITION BY t.PRIMARY_KEY ORDER BY org.website DESC) = 1`

if (!sinceTimestamp) {
return `
WITH ${ORG_ACCOUNTS},
${CDP_MATCHED_SEGMENTS}
${select}
${dedup}`.trim()
}

return `
WITH ${ORG_ACCOUNTS},
${CDP_MATCHED_SEGMENTS},
new_cdp_segments AS (
SELECT DISTINCT
s.SOURCE_ID AS sourceId,
s.slug
FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s
WHERE s.CREATED_TS >= '${sinceTimestamp}'
AND s.PARENT_SLUG IS NOT NULL
AND s.GRANDPARENTS_SLUG IS NOT NULL
AND s.SOURCE_ID IS NOT NULL
)

-- Updated records in existing segments
${select}
AND t.MEETING_DATE > '${sinceTimestamp}'::DATE
${dedup}

UNION

-- All records in newly created segments
${select}
AND EXISTS (
SELECT 1 FROM new_cdp_segments ncs
WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
)
${dedup}`.trim()
}
@@ -0,0 +1,176 @@
import { MEETINGS_GRID, MeetingsActivityType } from '@crowd/integrations'
import { getServiceChildLogger } from '@crowd/logging'
import {
IActivityData,
IOrganizationIdentity,
OrganizationIdentityType,
OrganizationSource,
PlatformType,
} from '@crowd/types'

import { TransformedActivity, TransformerBase } from '../../../core/transformerBase'

const log = getServiceChildLogger('meetingAttendanceTransformer')

export class MeetingAttendanceTransformer extends TransformerBase {
readonly platform = PlatformType.MEETINGS

transformRow(row: Record<string, unknown>): TransformedActivity | TransformedActivity[] | null {
const email = (row.INVITEE_EMAIL as string | null)?.trim() || null
if (!email) {
log.debug({ primaryKey: row.PRIMARY_KEY }, 'Skipping row: missing email')
return null
}

const lfUsername = (row.INVITEE_LF_SSO as string | null)?.trim() || null
const sourceId = (row.INVITEE_LF_USER_ID as string | null)?.trim() || undefined
const displayName = (row.INVITEE_FULL_NAME as string | null)?.trim() || email

const identities = this.buildMemberIdentities({
email,
sourceId,
platformUsername: null,
lfUsername,
})

const segmentSlug = (row.PROJECT_SLUG as string | null)?.trim() || null
const segmentSourceId = (row.PROJECT_ID as string | null)?.trim() || null
if (!segmentSlug || !segmentSourceId) {
return null
}

const meetingDate = (row.MEETING_DATE as string | null) || null
const meetingTime = (row.MEETING_TIME as string | null) || null
const timestamp =
meetingDate && meetingTime ? `${meetingDate}T${meetingTime}` : meetingDate || null

const primaryKey = (row.PRIMARY_KEY as string)?.trim()

const attributes = {
meetingID: row.MEETING_ID,
scheduledTime: timestamp,
topic: (row.MEETING_NAME as string | null) || null,
projectID: (row.PROJECT_ID as string | null) || null,
projectName: (row.PROJECT_NAME as string | null) || null,
organizationId: (row.ACCOUNT_ID as string | null) || null,
organizationName: (row.ACCOUNT_NAME as string | null) || null,
meetingType: (row.RAW_COMMITTEE_TYPE as string | null) || null,
}

const member = {
displayName,
identities,
organizations: this.buildOrganizations(row),
}

const segment = { slug: segmentSlug, sourceId: segmentSourceId }

const activities: TransformedActivity[] = []

if (row.WAS_INVITED === true) {
activities.push({
activity: {
type: MeetingsActivityType.INVITED_MEETING,
platform: PlatformType.MEETINGS,
timestamp,
score: MEETINGS_GRID[MeetingsActivityType.INVITED_MEETING].score,
sourceId: `${primaryKey}_invited`,
Contributor Author

@joanagmaia, this one is different from specs, as meeting_id isn't unique (similar to committees). Used primaryKey as it seems to be unique

member: { ...member },
attributes,
} as IActivityData,
segment,
})
}

if (row.INVITEE_ATTENDED === true) {
activities.push({
activity: {
type: MeetingsActivityType.ATTENDED_MEETING,
platform: PlatformType.MEETINGS,
timestamp,
score: MEETINGS_GRID[MeetingsActivityType.ATTENDED_MEETING].score,
sourceId: `${primaryKey}_attended`,
member: { ...member },
attributes,
} as IActivityData,
segment,
})
}

if (activities.length === 0) {
return null
}

return activities.length === 1 ? activities[0] : activities
}

private buildOrganizations(
row: Record<string, unknown>,
): IActivityData['member']['organizations'] {
const accountName = (row.ACCOUNT_NAME as string | null)?.trim() || null
if (!accountName) {
return undefined
}

const website = (row.ORG_WEBSITE as string | null)?.trim() || null
const domainAliases = (row.ORG_DOMAIN_ALIASES as string | null)?.trim() || null

if (this.isIndividualNoAccount(accountName)) {
return [
{
displayName: accountName,
source: OrganizationSource.MEETINGS,
identities: website
? [
{
platform: this.platform,
value: website,
type: OrganizationIdentityType.PRIMARY_DOMAIN,
verified: true,
},
]
: [],
},
]
}

const identities: IOrganizationIdentity[] = []

if (website) {
identities.push({
platform: this.platform,
value: website,
type: OrganizationIdentityType.PRIMARY_DOMAIN,
verified: true,
})
}

if (domainAliases) {
for (const alias of domainAliases.split(',')) {
const trimmed = alias.trim()
if (trimmed) {
identities.push({
platform: this.platform,
value: trimmed,
type: OrganizationIdentityType.ALTERNATIVE_DOMAIN,
verified: true,
})
}
}
}

return [
{
displayName: accountName,
source: OrganizationSource.MEETINGS,
identities,
logo: (row.LOGO_URL as string | null)?.trim() || undefined,
size:
typeof row.ORGANIZATION_SIZE === 'string'
? row.ORGANIZATION_SIZE.trim() || undefined
: undefined,
industry: (row.ORGANIZATION_INDUSTRY as string | null)?.trim() || undefined,
},
]
}
}
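
A small sketch of the fan-out that `transformRow` performs: one attendance row yields up to two activities, and each gets a distinct `sourceId` derived from `PRIMARY_KEY`, since `MEETING_ID` is not unique per invitee. `AttendanceRow` and `fanOut` are hypothetical names, and the activity type strings are assumed to match the values inserted into `"activityTypes"` in this PR.

```typescript
// Hypothetical row shape with only the fields this sketch needs.
interface AttendanceRow {
  PRIMARY_KEY: string
  WAS_INVITED: boolean
  INVITEE_ATTENDED: boolean
}

interface ActivityKey {
  type: string
  sourceId: string
}

// Returns the activity type and sourceId pairs a single row would produce.
function fanOut(row: AttendanceRow): ActivityKey[] {
  const keys: ActivityKey[] = []
  if (row.WAS_INVITED === true) {
    keys.push({ type: 'invited-meeting', sourceId: `${row.PRIMARY_KEY}_invited` })
  }
  if (row.INVITEE_ATTENDED === true) {
    keys.push({ type: 'attended-meeting', sourceId: `${row.PRIMARY_KEY}_attended` })
  }
  return keys
}
```

The suffix keeps the two activities from colliding on `sourceId` when the same row is both an invitation and an attendance record.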