-
Notifications
You must be signed in to change notification settings - Fork 731
feat(snowflake-connectors): meetings implementation [CM-1034] #3998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
ea73a87
f2bf2c8
6039d06
b3c3880
fc137a6
d7f4e8c
dce716b
188c6c5
befa516
efbec03
c122c30
1ac5d3a
e7b8669
9d855d6
5ba1afd
f52614a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES | ||
| ('invited-meeting', 'meetings', false, false, 'User is invited to a meeting', 'Invited to a meeting'), | ||
| ('attended-meeting', 'meetings', false, false, 'User attends a meeting', 'Attended a meeting'); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,24 +89,26 @@ export class TransformerConsumer { | |
| let resolveSkippedCount = 0 | ||
|
|
||
| for await (const row of this.s3Service.streamParquetRows(job.s3Path)) { | ||
| const result = source.transformer.safeTransformRow(row) | ||
| if (!result) { | ||
| const results = source.transformer.safeTransformRow(row) | ||
| if (!results) { | ||
| transformSkippedCount++ | ||
| continue | ||
| } | ||
|
|
||
| const resolved = await this.integrationResolver.resolve(platform, result.segment) | ||
| if (!resolved) { | ||
| resolveSkippedCount++ | ||
| continue | ||
| for (const result of results) { | ||
| const resolved = await this.integrationResolver.resolve(platform, result.segment) | ||
| if (!resolved) { | ||
| resolveSkippedCount++ | ||
| continue | ||
| } | ||
|
|
||
| await this.emitter.createAndProcessActivityResult( | ||
| resolved.segmentId, | ||
| resolved.integrationId, | ||
| result.activity, | ||
| ) | ||
| transformedCount++ | ||
| } | ||
|
Comment on lines
+98
to
111
|
||
|
|
||
| await this.emitter.createAndProcessActivityResult( | ||
| resolved.segmentId, | ||
| resolved.integrationId, | ||
| result.activity, | ||
| ) | ||
| transformedCount++ | ||
| } | ||
|
|
||
| const skippedCount = transformSkippedCount + resolveSkippedCount | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| import { IS_PROD_ENV } from '@crowd/common' | ||
|
|
||
| const CDP_MATCHED_SEGMENTS = ` | ||
| cdp_matched_segments AS ( | ||
| SELECT DISTINCT | ||
| s.SOURCE_ID AS sourceId, | ||
| s.slug | ||
| FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s | ||
| WHERE s.PARENT_SLUG IS NOT NULL | ||
| AND s.GRANDPARENTS_SLUG IS NOT NULL | ||
| AND s.SOURCE_ID IS NOT NULL | ||
| )` | ||
|
|
||
| const ORG_ACCOUNTS = ` | ||
| org_accounts AS ( | ||
| SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES | ||
| FROM analytics.bronze_fivetran_salesforce.accounts | ||
| WHERE website IS NOT NULL | ||
| UNION ALL | ||
| SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES | ||
| FROM analytics.bronze_fivetran_salesforce_b2b.accounts | ||
| WHERE website IS NOT NULL | ||
| )` | ||
|
|
||
| export const buildSourceQuery = (sinceTimestamp?: string): string => { | ||
| let select = ` | ||
| SELECT | ||
| t.PRIMARY_KEY, | ||
| t.MEETING_ID, | ||
| t.MEETING_NAME, | ||
| t.PROJECT_ID, | ||
| t.PROJECT_NAME, | ||
| t.PROJECT_SLUG, | ||
| t.ACCOUNT_ID, | ||
| t.ACCOUNT_NAME, | ||
| t.MEETING_DATE, | ||
| t.MEETING_TIME, | ||
| t.INVITEE_FULL_NAME, | ||
| t.INVITEE_LF_SSO, | ||
| t.INVITEE_LF_USER_ID, | ||
| t.INVITEE_EMAIL, | ||
| t.INVITEE_ATTENDED, | ||
| t.WAS_INVITED, | ||
| t.RAW_COMMITTEE_TYPE, | ||
| org.website AS ORG_WEBSITE, | ||
| org.domain_aliases AS ORG_DOMAIN_ALIASES, | ||
| org.logo_url AS LOGO_URL, | ||
| org.industry AS ORGANIZATION_INDUSTRY, | ||
| CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE | ||
| FROM ANALYTICS.SILVER_FACT.MEETING_ATTENDANCE t | ||
| INNER JOIN cdp_matched_segments cms | ||
| ON cms.slug = t.PROJECT_SLUG | ||
| AND cms.sourceId = t.PROJECT_ID | ||
| LEFT JOIN org_accounts org | ||
| ON t.ACCOUNT_ID = org.account_id | ||
| WHERE (t.WAS_INVITED = TRUE OR t.INVITEE_ATTENDED = TRUE)` | ||
|
|
||
| if (!IS_PROD_ENV) { | ||
| select += ` AND t.PROJECT_SLUG = 'cncf'` | ||
| } | ||
|
|
||
| const dedup = ` | ||
| QUALIFY ROW_NUMBER() OVER (PARTITION BY t.PRIMARY_KEY ORDER BY org.website DESC) = 1` | ||
|
|
||
| if (!sinceTimestamp) { | ||
| return ` | ||
| WITH ${ORG_ACCOUNTS}, | ||
| ${CDP_MATCHED_SEGMENTS} | ||
| ${select} | ||
| ${dedup}`.trim() | ||
| } | ||
|
|
||
| return ` | ||
| WITH ${ORG_ACCOUNTS}, | ||
| ${CDP_MATCHED_SEGMENTS}, | ||
| new_cdp_segments AS ( | ||
| SELECT DISTINCT | ||
| s.SOURCE_ID AS sourceId, | ||
| s.slug | ||
| FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s | ||
| WHERE s.CREATED_TS >= '${sinceTimestamp}' | ||
| AND s.PARENT_SLUG IS NOT NULL | ||
| AND s.GRANDPARENTS_SLUG IS NOT NULL | ||
| AND s.SOURCE_ID IS NOT NULL | ||
| ) | ||
|
|
||
| -- Updated records in existing segments | ||
| ${select} | ||
| AND t.MEETING_DATE > '${sinceTimestamp}'::DATE | ||
|
mbani01 marked this conversation as resolved.
Outdated
cursor[bot] marked this conversation as resolved.
Outdated
|
||
| ${dedup} | ||
|
|
||
| UNION | ||
|
|
||
| -- All records in newly created segments | ||
| ${select} | ||
| AND EXISTS ( | ||
| SELECT 1 FROM new_cdp_segments ncs | ||
| WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId | ||
| ) | ||
| ${dedup}`.trim() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,176 @@ | ||
| import { MEETINGS_GRID, MeetingsActivityType } from '@crowd/integrations' | ||
| import { getServiceChildLogger } from '@crowd/logging' | ||
| import { | ||
| IActivityData, | ||
| IOrganizationIdentity, | ||
| OrganizationIdentityType, | ||
| OrganizationSource, | ||
| PlatformType, | ||
| } from '@crowd/types' | ||
|
|
||
| import { TransformedActivity, TransformerBase } from '../../../core/transformerBase' | ||
|
|
||
| const log = getServiceChildLogger('meetingAttendanceTransformer') | ||
|
|
||
| export class MeetingAttendanceTransformer extends TransformerBase { | ||
| readonly platform = PlatformType.MEETINGS | ||
|
|
||
| transformRow(row: Record<string, unknown>): TransformedActivity | TransformedActivity[] | null { | ||
| const email = (row.INVITEE_EMAIL as string | null)?.trim() || null | ||
| if (!email) { | ||
| log.debug({ primaryKey: row.PRIMARY_KEY }, 'Skipping row: missing email') | ||
| return null | ||
| } | ||
|
joanagmaia marked this conversation as resolved.
|
||
|
|
||
| const lfUsername = (row.INVITEE_LF_SSO as string | null)?.trim() || null | ||
| const sourceId = (row.INVITEE_LF_USER_ID as string | null)?.trim() || undefined | ||
| const displayName = (row.INVITEE_FULL_NAME as string | null)?.trim() || email | ||
|
|
||
| const identities = this.buildMemberIdentities({ | ||
| email, | ||
| sourceId, | ||
| platformUsername: null, | ||
| lfUsername, | ||
| }) | ||
|
mbani01 marked this conversation as resolved.
|
||
|
|
||
| const segmentSlug = (row.PROJECT_SLUG as string | null)?.trim() || null | ||
| const segmentSourceId = (row.PROJECT_ID as string | null)?.trim() || null | ||
| if (!segmentSlug || !segmentSourceId) { | ||
| return null | ||
| } | ||
|
|
||
| const meetingDate = (row.MEETING_DATE as string | null) || null | ||
| const meetingTime = (row.MEETING_TIME as string | null) || null | ||
| const timestamp = | ||
| meetingDate && meetingTime ? `${meetingDate}T${meetingTime}` : meetingDate || null | ||
|
|
||
| const primaryKey = (row.PRIMARY_KEY as string)?.trim() | ||
|
|
||
| const attributes = { | ||
| meetingID: row.MEETING_ID, | ||
| scheduledTime: timestamp, | ||
| topic: (row.MEETING_NAME as string | null) || null, | ||
| projectID: (row.PROJECT_ID as string | null) || null, | ||
| projectName: (row.PROJECT_NAME as string | null) || null, | ||
| organizationId: (row.ACCOUNT_ID as string | null) || null, | ||
| organizationName: (row.ACCOUNT_NAME as string | null) || null, | ||
| meetingType: (row.RAW_COMMITTEE_TYPE as string | null) || null, | ||
|
joanagmaia marked this conversation as resolved.
|
||
| } | ||
|
|
||
| const member = { | ||
| displayName, | ||
| identities, | ||
| organizations: this.buildOrganizations(row), | ||
| } | ||
|
|
||
| const segment = { slug: segmentSlug, sourceId: segmentSourceId } | ||
|
|
||
| const activities: TransformedActivity[] = [] | ||
|
|
||
| if (row.WAS_INVITED === true) { | ||
| activities.push({ | ||
| activity: { | ||
| type: MeetingsActivityType.INVITED_MEETING, | ||
| platform: PlatformType.MEETINGS, | ||
| timestamp, | ||
| score: MEETINGS_GRID[MeetingsActivityType.INVITED_MEETING].score, | ||
| sourceId: `${primaryKey}_invited`, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @joanagmaia, this one is different from specs, as meeting_id isn't unique (similar to committees). Used primaryKey as it seems to be unique |
||
| member: { ...member }, | ||
| attributes, | ||
| } as IActivityData, | ||
| segment, | ||
| }) | ||
| } | ||
|
|
||
| if (row.INVITEE_ATTENDED === true) { | ||
| activities.push({ | ||
| activity: { | ||
| type: MeetingsActivityType.ATTENDED_MEETING, | ||
| platform: PlatformType.MEETINGS, | ||
| timestamp, | ||
| score: MEETINGS_GRID[MeetingsActivityType.ATTENDED_MEETING].score, | ||
| sourceId: `${primaryKey}_attended`, | ||
| member: { ...member }, | ||
| attributes, | ||
| } as IActivityData, | ||
| segment, | ||
| }) | ||
| } | ||
|
|
||
| if (activities.length === 0) { | ||
| return null | ||
| } | ||
|
|
||
| return activities.length === 1 ? activities[0] : activities | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private buildOrganizations( | ||
| row: Record<string, unknown>, | ||
| ): IActivityData['member']['organizations'] { | ||
| const accountName = (row.ACCOUNT_NAME as string | null)?.trim() || null | ||
| if (!accountName) { | ||
| return undefined | ||
| } | ||
|
cursor[bot] marked this conversation as resolved.
|
||
|
|
||
|
joanagmaia marked this conversation as resolved.
|
||
| const website = (row.ORG_WEBSITE as string | null)?.trim() || null | ||
| const domainAliases = (row.ORG_DOMAIN_ALIASES as string | null)?.trim() || null | ||
|
|
||
| if (this.isIndividualNoAccount(accountName)) { | ||
| return [ | ||
| { | ||
| displayName: accountName, | ||
| source: OrganizationSource.MEETINGS, | ||
| identities: website | ||
| ? [ | ||
| { | ||
| platform: this.platform, | ||
| value: website, | ||
| type: OrganizationIdentityType.PRIMARY_DOMAIN, | ||
| verified: true, | ||
| }, | ||
| ] | ||
| : [], | ||
| }, | ||
| ] | ||
| } | ||
|
|
||
| const identities: IOrganizationIdentity[] = [] | ||
|
|
||
| if (website) { | ||
| identities.push({ | ||
| platform: this.platform, | ||
| value: website, | ||
| type: OrganizationIdentityType.PRIMARY_DOMAIN, | ||
| verified: true, | ||
| }) | ||
| } | ||
|
|
||
| if (domainAliases) { | ||
| for (const alias of domainAliases.split(',')) { | ||
| const trimmed = alias.trim() | ||
| if (trimmed) { | ||
| identities.push({ | ||
| platform: this.platform, | ||
| value: trimmed, | ||
| type: OrganizationIdentityType.ALTERNATIVE_DOMAIN, | ||
| verified: true, | ||
| }) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return [ | ||
| { | ||
| displayName: accountName, | ||
| source: OrganizationSource.MEETINGS, | ||
| identities, | ||
| logo: (row.LOGO_URL as string | null)?.trim() || undefined, | ||
| size: | ||
| typeof row.ORGANIZATION_SIZE === 'string' | ||
| ? row.ORGANIZATION_SIZE.trim() || undefined | ||
| : undefined, | ||
| industry: (row.ORGANIZATION_INDUSTRY as string | null)?.trim() || undefined, | ||
| }, | ||
| ] | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.