Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from dataclasses import dataclass
from datetime import date, datetime
from datetime import date
from typing import Any, Protocol
from uuid import UUID

Expand Down Expand Up @@ -144,17 +144,17 @@ def send_compact_transaction_report_email(
compact: str,
report_s3_path: str,
reporting_cycle: str,
start_date: datetime,
end_date: datetime,
start_date: date,
end_date: date,
) -> dict[str, Any]:
"""
Send a compact transaction report email.

:param compact: Compact name
:param report_s3_path: S3 path to the report zip file
:param reporting_cycle: Reporting cycle (e.g., 'weekly', 'monthly')
:param start_date: Start datetime of the reporting period
:param end_date: End datetime of the reporting period
:param start_date: Start date of the reporting period
:param end_date: End date of the reporting period
:return: Response from the email notification service
"""

Expand All @@ -178,8 +178,8 @@ def send_jurisdiction_transaction_report_email(
jurisdiction: str,
report_s3_path: str,
reporting_cycle: str,
start_date: datetime,
end_date: datetime,
start_date: date,
Comment thread
jusdino marked this conversation as resolved.
end_date: date,
) -> dict[str, str]:
"""
Send a jurisdiction transaction report email.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import csv
from datetime import datetime, timedelta
from datetime import date, datetime
from decimal import Decimal
from enum import StrEnum
from io import BytesIO, StringIO
Expand All @@ -13,6 +13,7 @@
from cc_common.data_model.schema.compact.common import COMPACT_TYPE
from cc_common.data_model.schema.jurisdiction.common import JURISDICTION_TYPE
from cc_common.exceptions import CCInternalException
from report_window import ReportCycle, ReportWindow


class ReportableTransactionStatuses(StrEnum):
Expand All @@ -21,73 +22,10 @@ class ReportableTransactionStatuses(StrEnum):
SettledSuccessfully = 'settledSuccessfully'


def _get_display_date_range(reporting_cycle: str) -> tuple[datetime, datetime]:
    """Resolve the human-facing (start, end) datetimes for a report run.

    The returned range is used only for report filenames and email
    notifications — not for the DynamoDB query window.

    :param reporting_cycle: Either 'weekly' or 'monthly'
    :return: Tuple of (start_time, end_time) in UTC for display purposes
    :raises ValueError: If the reporting cycle is not 'weekly' or 'monthly'
    """
    if reporting_cycle == 'weekly':
        # One full week ending at the current moment.
        week_end = config.current_standard_datetime
        return week_end - timedelta(days=7), week_end

    if reporting_cycle == 'monthly':
        # Monthly reports run on the first day of the month, so stepping back
        # one day from midnight of the 1st lands on the last day of the
        # previous month; resetting that date's day to 1 gives the month start.
        previous_month_end = config.current_standard_datetime.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        ) - timedelta(days=1)
        previous_month_start = previous_month_end.replace(day=1)
        return previous_month_start, previous_month_end

    raise ValueError(f'Invalid reporting cycle: {reporting_cycle}')


def _get_query_date_range(reporting_cycle: str) -> tuple[datetime, datetime]:
    """Resolve the (start, end) datetimes used for the DynamoDB query window.

    The transaction sort key carries extra components after the timestamp
    (COMPACT#name#TIME#timestamp#BATCH#id#TX#id), and DynamoDB compares the
    whole sort-key string lexicographically. Because the stored keys continue
    past the comparison value, the BETWEEN condition behaves as INCLUSIVE on
    the lower bound and EXCLUSIVE on the upper bound:

    - Lower bound: the trailing characters make the stored key compare
      "greater than" the bound, so >= is satisfied
    - Upper bound: the trailing characters make the stored key compare
      "greater than" the bound, so <= fails

    The timestamps below are chosen so every settled transaction is captured
    exactly once across consecutive runs.

    :param reporting_cycle: Either 'weekly' or 'monthly'
    :return: Tuple of (start_time, end_time) in UTC for DynamoDB queries
    :raises ValueError: If the reporting cycle is not 'weekly' or 'monthly'
    """
    if reporting_cycle == 'weekly':
        # Weekly reports run on Friday 10:00 PM UTC; pin the boundary to that
        # hour, then cover the preceding seven days.
        window_end = config.current_standard_datetime.replace(hour=22, minute=0, second=0, microsecond=0)
        window_start = window_end - timedelta(days=7)
        return window_start, window_end

    if reporting_cycle == 'monthly':
        # Monthly reports run on the first day of the month. Midnight of that
        # day is the upper bound (excluded by the BETWEEN condition), and the
        # lower bound is midnight of the first day of the previous month.
        window_end = config.current_standard_datetime.replace(hour=0, minute=0, second=0, microsecond=0)
        window_start = (window_end - timedelta(days=1)).replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        return window_start, window_end

    raise ValueError(f'Invalid reporting cycle: {reporting_cycle}')


def _store_compact_reports_in_s3(
compact: str,
reporting_cycle: str,
start_time: datetime,
end_time: datetime,
report_window: ReportWindow,
summary_report: str,
transaction_detail: str,
bucket_name: str,
Expand All @@ -96,33 +34,31 @@ def _store_compact_reports_in_s3(

:param compact: Compact name
:param reporting_cycle: Either 'weekly' or 'monthly'
:param start_time: Report start time
:param end_time: Report end time
:param report_window: the Report Window
:param summary_report: Financial summary report CSV content
:param transaction_detail: Transaction detail report CSV content
:param bucket_name: S3 bucket name
:return: Dictionary of file types to their S3 paths
"""
date_range = f'{start_time.strftime("%Y-%m-%d")}--{end_time.strftime("%Y-%m-%d")}'
base_path = (
f'compact/{compact}/reports/compact-transactions/reporting-cycle/{reporting_cycle}/'
f'{end_time.strftime("%Y/%m/%d")}'
f'{report_window.display_end.strftime("%Y/%m/%d")}'
)

# Define paths for all report files
# Currently, we are only sending the .zip file in the email reporting, but there is potential
# to store .gz files in the future
paths = {
'report_zip': f'{base_path}/{compact}-{date_range}-report.zip',
'report_zip': f'{base_path}/{compact}-{report_window.display_text}-report.zip',
}

s3_client = config.s3_client

# Create and store combined zip with uncompressed CSVs
zip_buffer = BytesIO()
with ZipFile(zip_buffer, 'w', compression=ZIP_DEFLATED) as zip_file:
zip_file.writestr(f'financial-summary-{date_range}.csv', summary_report.encode('utf-8'))
zip_file.writestr(f'transaction-detail-{date_range}.csv', transaction_detail.encode('utf-8'))
zip_file.writestr(f'financial-summary-{report_window.display_text}.csv', summary_report.encode('utf-8'))
zip_file.writestr(f'transaction-detail-{report_window.display_text}.csv', transaction_detail.encode('utf-8'))
s3_client.put_object(Bucket=bucket_name, Key=paths['report_zip'], Body=zip_buffer.getvalue())

return paths
Expand All @@ -132,8 +68,7 @@ def _store_jurisdiction_reports_in_s3(
compact: str,
jurisdiction: str,
reporting_cycle: str,
start_time: datetime,
end_time: datetime,
report_window: ReportWindow,
transaction_detail: str,
bucket_name: str,
) -> dict[str, str]:
Expand All @@ -142,31 +77,31 @@ def _store_jurisdiction_reports_in_s3(
:param compact: Compact name
:param jurisdiction: Jurisdiction postal code
:param reporting_cycle: Either 'weekly' or 'monthly'
:param start_time: Report start time
:param end_time: Report end time
:param report_window: The report window
:param transaction_detail: Transaction detail report CSV content
:param bucket_name: S3 bucket name
:return: Dictionary of file types to their S3 paths
"""
date_range = f'{start_time.strftime("%Y-%m-%d")}--{end_time.strftime("%Y-%m-%d")}'
base_path = (
f'compact/{compact}/reports/jurisdiction-transactions/jurisdiction/{jurisdiction}/'
f'reporting-cycle/{reporting_cycle}/{end_time.strftime("%Y/%m/%d")}'
f'reporting-cycle/{reporting_cycle}/{report_window.display_end.strftime("%Y/%m/%d")}'
)

# Define paths for all report files
# Currently, we are only sending the .zip file in the email reporting, but there is potential
# to store .gz files in the future
paths = {
'report_zip': f'{base_path}/{jurisdiction}-{date_range}-report.zip',
'report_zip': f'{base_path}/{jurisdiction}-{report_window.display_text}-report.zip',
}

s3_client = config.s3_client

# Create and store zip with uncompressed CSV
zip_buffer = BytesIO()
with ZipFile(zip_buffer, 'w', compression=ZIP_DEFLATED) as zip_file:
zip_file.writestr(f'{jurisdiction}-transaction-detail-{date_range}.csv', transaction_detail.encode('utf-8'))
zip_file.writestr(
f'{jurisdiction}-transaction-detail-{report_window.display_text}.csv', transaction_detail.encode('utf-8')
)
s3_client.put_object(Bucket=bucket_name, Key=paths['report_zip'], Body=zip_buffer.getvalue())

return paths
Expand All @@ -186,8 +121,26 @@ def generate_transaction_reports(event: dict, context: LambdaContext) -> dict:
:return: Success message
"""
compact = event['compact']
reporting_cycle = event['reportingCycle']
logger.info('Generating transaction reports', compact=compact, reporting_cycle=reporting_cycle)
reporting_cycle = ReportCycle(event['reportingCycle'])

# Support 'manual' report date overrides for re-runs
report_start_override = event.get('reportStartOverride')
report_end_override = event.get('reportEndOverride')
if report_start_override and report_end_override:
report_window = ReportWindow(
reporting_cycle,
display_start_date=date.fromisoformat(report_start_override),
display_end_date=date.fromisoformat(report_end_override),
)
else:
report_window = ReportWindow(reporting_cycle)

logger.info(
'Generating transaction reports',
compact=compact,
reporting_cycle=reporting_cycle,
window=report_window.display_text,
)

# this is used to track any errors that occur when generating the reports
# without preventing valid reports from being sent
Expand Down Expand Up @@ -216,16 +169,9 @@ def generate_transaction_reports(event: dict, context: LambdaContext) -> dict:
# Get the S3 bucket name
bucket_name = config.transaction_reports_bucket_name

# Get both query and display date ranges
query_start_time, query_end_time = _get_query_date_range(reporting_cycle)

# Convert query times to epochs for DynamoDB
start_epoch = int(query_start_time.timestamp())
end_epoch = int(query_end_time.timestamp())

# Get all transactions for the time period
transactions = transaction_client.get_transactions_in_range(
compact=compact, start_epoch=start_epoch, end_epoch=end_epoch
compact=compact, start_epoch=report_window.start_epoch, end_epoch=report_window.end_epoch
)

# For now, we only report on transactions that have been successfully settled, so we filter to only include
Expand Down Expand Up @@ -263,14 +209,11 @@ def generate_transaction_reports(event: dict, context: LambdaContext) -> dict:
compact_transaction_csv = _generate_compact_transaction_report(transactions, providers)
jurisdiction_reports = _generate_jurisdiction_reports(transactions, providers, jurisdiction_configurations)

display_start_time, display_end_time = _get_display_date_range(reporting_cycle)

# Store compact reports in S3 and get paths
compact_paths = _store_compact_reports_in_s3(
compact=compact,
reporting_cycle=reporting_cycle,
start_time=display_start_time,
end_time=display_end_time,
report_window=report_window,
summary_report=compact_summary_csv,
transaction_detail=compact_transaction_csv,
bucket_name=bucket_name,
Expand All @@ -282,8 +225,8 @@ def generate_transaction_reports(event: dict, context: LambdaContext) -> dict:
compact=compact,
report_s3_path=compact_paths['report_zip'],
reporting_cycle=reporting_cycle,
start_date=display_start_time,
end_date=display_end_time,
start_date=report_window.display_start,
end_date=report_window.display_end,
)
except CCInternalException as e:
logger.error(
Expand All @@ -300,8 +243,7 @@ def generate_transaction_reports(event: dict, context: LambdaContext) -> dict:
compact=compact,
jurisdiction=jurisdiction,
reporting_cycle=reporting_cycle,
start_time=display_start_time,
end_time=display_end_time,
report_window=report_window,
transaction_detail=report_csv,
bucket_name=bucket_name,
)
Expand All @@ -312,8 +254,8 @@ def generate_transaction_reports(event: dict, context: LambdaContext) -> dict:
jurisdiction=jurisdiction,
report_s3_path=jurisdiction_paths['report_zip'],
reporting_cycle=reporting_cycle,
start_date=display_start_time,
end_date=display_end_time,
start_date=report_window.display_start,
end_date=report_window.display_end,
)
except CCInternalException as e:
logger.error(
Expand Down
Loading
Loading