-
Notifications
You must be signed in to change notification settings - Fork 731
feat(snowflake-connectors): meetings implementation [CM-1034] #3998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
ea73a87
f2bf2c8
6039d06
b3c3880
fc137a6
d7f4e8c
dce716b
188c6c5
befa516
efbec03
c122c30
1ac5d3a
e7b8669
9d855d6
5ba1afd
f52614a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES | ||
| ('invited-meeting', 'meetings', false, false, 'User is invited to a meeting', 'Invited to a meeting'), | ||
| ('attended-meeting', 'meetings', false, false, 'User attends a meeting', 'Attended a meeting'); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,24 +89,26 @@ export class TransformerConsumer { | |
| let resolveSkippedCount = 0 | ||
|
|
||
| for await (const row of this.s3Service.streamParquetRows(job.s3Path)) { | ||
| const result = source.transformer.safeTransformRow(row) | ||
| if (!result) { | ||
| const results = source.transformer.safeTransformRow(row) | ||
| if (!results) { | ||
| transformSkippedCount++ | ||
| continue | ||
| } | ||
|
|
||
| const resolved = await this.integrationResolver.resolve(platform, result.segment) | ||
| if (!resolved) { | ||
| resolveSkippedCount++ | ||
| continue | ||
| for (const result of results) { | ||
| const resolved = await this.integrationResolver.resolve(platform, result.segment) | ||
| if (!resolved) { | ||
| resolveSkippedCount++ | ||
| continue | ||
| } | ||
|
|
||
| await this.emitter.createAndProcessActivityResult( | ||
| resolved.segmentId, | ||
| resolved.integrationId, | ||
| result.activity, | ||
| ) | ||
| transformedCount++ | ||
| } | ||
|
Comment on lines
+98
to
111
|
||
|
|
||
| await this.emitter.createAndProcessActivityResult( | ||
| resolved.segmentId, | ||
| resolved.integrationId, | ||
| result.activity, | ||
| ) | ||
| transformedCount++ | ||
| } | ||
|
|
||
| const skippedCount = transformSkippedCount + resolveSkippedCount | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| import { IS_PROD_ENV } from '@crowd/common' | ||
|
|
||
| const CDP_MATCHED_SEGMENTS = ` | ||
| cdp_matched_segments AS ( | ||
| SELECT DISTINCT | ||
| s.SOURCE_ID AS sourceId, | ||
| s.slug | ||
| FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s | ||
| WHERE s.PARENT_SLUG IS NOT NULL | ||
| AND s.GRANDPARENTS_SLUG IS NOT NULL | ||
| AND s.SOURCE_ID IS NOT NULL | ||
| )` | ||
|
|
||
| const ORG_ACCOUNTS = ` | ||
| org_accounts AS ( | ||
| SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES | ||
| FROM analytics.bronze_fivetran_salesforce.accounts | ||
| WHERE website IS NOT NULL | ||
| UNION ALL | ||
| SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES | ||
| FROM analytics.bronze_fivetran_salesforce_b2b.accounts | ||
| WHERE website IS NOT NULL | ||
| )` | ||
|
|
||
| export const buildSourceQuery = (sinceTimestamp?: string): string => { | ||
| let select = ` | ||
| SELECT | ||
| t.PRIMARY_KEY, | ||
| t.MEETING_ID, | ||
| t.MEETING_NAME, | ||
| t.PROJECT_ID, | ||
| t.PROJECT_NAME, | ||
| t.PROJECT_SLUG, | ||
| t.ACCOUNT_ID, | ||
| t.ACCOUNT_NAME, | ||
| t.MEETING_DATE, | ||
| t.MEETING_TIME, | ||
| t.INVITEE_FULL_NAME, | ||
| t.INVITEE_LF_SSO, | ||
| t.INVITEE_LF_USER_ID, | ||
| t.INVITEE_EMAIL, | ||
| t.INVITEE_ATTENDED, | ||
| t.WAS_INVITED, | ||
| t.RAW_COMMITTEE_TYPE, | ||
| org.website AS ORG_WEBSITE, | ||
| org.domain_aliases AS ORG_DOMAIN_ALIASES, | ||
| org.logo_url AS LOGO_URL, | ||
| org.industry AS ORGANIZATION_INDUSTRY, | ||
| CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE | ||
| FROM ANALYTICS.SILVER_FACT.MEETING_ATTENDANCE t | ||
| INNER JOIN cdp_matched_segments cms | ||
| ON cms.slug = t.PROJECT_SLUG | ||
| AND cms.sourceId = t.PROJECT_ID | ||
| LEFT JOIN org_accounts org | ||
| ON t.ACCOUNT_ID = org.account_id | ||
| WHERE (t.WAS_INVITED = TRUE OR t.INVITEE_ATTENDED = TRUE)` | ||
|
|
||
| if (!IS_PROD_ENV) { | ||
| select += ` AND t.PROJECT_SLUG = 'cncf'` | ||
| } | ||
|
|
||
| const dedup = ` | ||
| QUALIFY ROW_NUMBER() OVER (PARTITION BY t.PRIMARY_KEY ORDER BY org.website DESC) = 1` | ||
|
|
||
| if (!sinceTimestamp) { | ||
| return ` | ||
| WITH ${ORG_ACCOUNTS}, | ||
| ${CDP_MATCHED_SEGMENTS} | ||
| ${select} | ||
| ${dedup}`.trim() | ||
| } | ||
|
|
||
| return ` | ||
| WITH ${ORG_ACCOUNTS}, | ||
| ${CDP_MATCHED_SEGMENTS}, | ||
| new_cdp_segments AS ( | ||
| SELECT DISTINCT | ||
| s.SOURCE_ID AS sourceId, | ||
| s.slug | ||
| FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s | ||
| WHERE s.CREATED_TS >= '${sinceTimestamp}' | ||
| AND s.PARENT_SLUG IS NOT NULL | ||
| AND s.GRANDPARENTS_SLUG IS NOT NULL | ||
| AND s.SOURCE_ID IS NOT NULL | ||
| ) | ||
|
|
||
| -- Updated records in existing segments | ||
| ${select} | ||
| AND t.MEETING_DATE >= '${sinceTimestamp}'::DATE | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
|
||
| ${dedup} | ||
|
|
||
| UNION | ||
|
|
||
| -- All records in newly created segments | ||
| ${select} | ||
| AND EXISTS ( | ||
| SELECT 1 FROM new_cdp_segments ncs | ||
| WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId | ||
| ) | ||
| ${dedup}`.trim() | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.