From 0bb7651369393362de49501005479f43cc5f0192 Mon Sep 17 00:00:00 2001 From: jackleary Date: Thu, 5 Mar 2026 11:44:02 +0000 Subject: [PATCH 01/15] nrl-2002 Initial set up of dynamo export --- .../dynamo_export_poll/__init__.py | 0 .../dynamo_export_poll/dynamo_export_poll.py | 31 ++ ...ynamo_export_step_function_definition.json | 87 +++++ .../dynamo_export_trigger/__init__.py | 0 .../dynamo_export_trigger.py | 83 ++++ .../modules/dynamo_export/dynamo_output_s3.tf | 77 ++++ .../dynamo_processed_output_s3.tf | 76 ++++ .../modules/dynamo_export/glue.tf | 178 +++++++++ .../modules/dynamo_export/kms.tf | 15 + .../modules/dynamo_export/lambda.tf | 217 +++++++++++ .../modules/dynamo_export/main.tf | 12 + .../modules/dynamo_export/outputs.tf | 12 + .../modules/dynamo_export/src/glue_job.py | 363 ++++++++++++++++++ .../dynamo_export/ssm_put_param/__init__.py | 0 .../ssm_put_param/ssm_put_param.py | 18 + .../modules/dynamo_export/step_function.tf | 61 +++ .../modules/dynamo_export/variables.tf | 24 ++ 17 files changed, 1254 insertions(+) create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/__init__.py create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/dynamo_export_poll.py create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_step_function_definition.json create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/__init__.py create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/dynamo_export_trigger.py create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_output_s3.tf create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_processed_output_s3.tf create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/glue.tf create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/kms.tf create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/main.tf create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/__init__.py create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/ssm_put_param.py create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/step_function.tf create mode 100644 terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/__init__.py b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/dynamo_export_poll.py b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/dynamo_export_poll.py new file mode 100644 index 000000000..44cc53a31 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/dynamo_export_poll.py @@ -0,0 +1,31 @@ +import boto3 +from botocore.config import Config + +ddb = boto3.client( + "dynamodb", + config=Config(connect_timeout=5, 
read_timeout=5), +) + + +def lambda_handler(event, _): + completed = [] + for arn in event["export_arns"]: + response = ddb.describe_export(ExportArn=arn) + if response["ExportDescription"]["ExportStatus"] == "FAILED": + return { + "status": "FAILED", + "export_to_time": event["export_to_time"], + "export_arns": event["export_arns"], + "export_type": event["export_type"], + } + + completed.append(response["ExportDescription"]["ExportStatus"]) + + status = "COMPLETED" if all(s == "COMPLETED" for s in completed) else "IN_PROGRESS" + + return { + "status": status, + "export_to_time": event["export_to_time"], + "export_arns": event["export_arns"], + "export_type": event["export_type"], + } diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_step_function_definition.json b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_step_function_definition.json new file mode 100644 index 000000000..8327ae3bd --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_step_function_definition.json @@ -0,0 +1,87 @@ +{ + "Comment": "execute lambdas", + "StartAt": "DynamoExport", + "States": { + "DynamoExport": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "${lambda_export_trigger_function_name}", + "Payload.$": "$" + }, + "Next": "DynamoExportStatusCheck" + }, + "DynamoExportStatusCheck": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "${lambda_export_poll_function_name}", + "Payload.$": "$" + }, + "Next": "Choice" + }, + "Choice": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.status", + "StringEquals": "COMPLETED", + "Next": "SSMPut" + }, + { + "Variable": "$.status", + "StringEquals": "IN_PROGRESS", + "Next": "WaitState" + }, + { + "Variable": "$.status", + "StringEquals": "FAILED", + "Next": "ExportFailure" + } + ], + "Default": "FailState" + }, + "FailState": { + "Type": "Fail", + "Error": "UnhandledStatus", + "Cause": "Status not recognised" + }, + "ExportFailure": { + "Type": "Fail", + "Error": "DynamoExportFailed", + "Cause": "DynamoDB Export Failed" + }, + "WaitState": { + "Type": "Wait", + "Seconds": 120, + "Next": "DynamoExportStatusCheck" + }, + "SSMPut": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "${lambda_ssm_put_param_function_name}", + "Payload.$": "$" + }, + "Next": "GlueJobTrigger" + }, + "GlueJobTrigger": { + "Type": "Task", + "Resource": "arn:aws:states:::glue:startJobRun.sync", + "Parameters": { + "JobName": "${glue_job_name}", + "Arguments": { + "--SOURCE_BUCKET": "${dynamo_export_s3_bucket_name}", + "--TARGET_BUCKET": "${dynamo_export_processed_s3_bucket_name}", + "--DDB_TABLE_ARN": "${ddb_table_arn}", + "--GLUE_CRAWLER_NAME": "${glue_crawler_name}", + "--EXPORT_TYPE.$": "$.export_type" + } + }, + "End": true + } + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/__init__.py b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/dynamo_export_trigger.py b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/dynamo_export_trigger.py new file mode 100644 index 
000000000..a2c9ce3a9 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/dynamo_export_trigger.py @@ -0,0 +1,83 @@ +import os +from datetime import datetime, timedelta, timezone + +import boto3 +from botocore.config import Config +from botocore.exceptions import ClientError + +bucket = os.environ["BUCKET"] +ddb_table_arn = os.environ["DDB_TABLE_ARN"] +kms_key = os.environ["KMS_KEY"] +env = os.environ["ENVIRONMENT"] +ddb_table_name = os.environ["DDB_TABLE_NAME"] + +SSM_PARAM = "/exports/DynamoExportRuntime" + +ddb_client = boto3.client( + "dynamodb", + config=Config(connect_timeout=5, read_timeout=5), +) +ssm = boto3.client( + "ssm", + config=Config(connect_timeout=5, read_timeout=5), +) + + +def lambda_handler(_, __): + to_time = datetime.now(timezone.utc).replace(microsecond=0, second=0, minute=0) + export_arns = [] + + try: + from_time = ssm.get_parameter(Name=SSM_PARAM)["Parameter"]["Value"] + from_time = datetime.fromisoformat(from_time).replace( + microsecond=0, second=0, minute=0 + ) + + # Handle exports longer than 24 hours by splitting into multiple exports + earliest_pitr = ddb_client.describe_continuous_backups( + TableName=ddb_table_name + )["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"][ + "EarliestRestorableDateTime" + ] + from_time = max(from_time, earliest_pitr) + days_difference = (to_time - from_time).days + 1 + from_times = [from_time + timedelta(days=i) for i in range(days_difference)] + + for base_time in from_times: + end_time = min(base_time + timedelta(days=1), to_time) + if end_time == base_time: + continue + response = ddb_client.export_table_to_point_in_time( + TableArn=ddb_table_arn, + S3Bucket=bucket, + S3SseAlgorithm="KMS", + S3SseKmsKeyId=kms_key, + ExportFormat="DYNAMODB_JSON", + ExportType="INCREMENTAL_EXPORT", + IncrementalExportSpecification={ + "ExportFromTime": base_time, + "ExportToTime": end_time, + "ExportViewType": "NEW_AND_OLD_IMAGES", + }, + ) + export_arns.append(response["ExportDescription"]["ExportArn"]) + export_type = response["ExportDescription"]["ExportType"] + except ClientError as e: + if e.response["Error"]["Code"] != "ParameterNotFound": + raise + response = ddb_client.export_table_to_point_in_time( + TableArn=ddb_table_arn, + S3Bucket=bucket, + S3SseAlgorithm="KMS", + S3SseKmsKeyId=kms_key, + ExportFormat="DYNAMODB_JSON", + ExportType="FULL_EXPORT", + ) + export_arns.append(response["ExportDescription"]["ExportArn"]) + export_type = response["ExportDescription"]["ExportType"] + + return { + "export_to_time": to_time.isoformat(), + "export_arns": export_arns, + "export_type": export_type, + } diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_output_s3.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_output_s3.tf new file mode 100644 index 000000000..f18938d43 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_output_s3.tf @@ -0,0 +1,77 @@ +resource "aws_s3_bucket" "dynamodb_output" { + bucket = "${var.name_prefix}-${var.environment}-dynamo-output-bucket" +} + +# May need to restrict access to specific IAM roles/principals in future, but helps with testing for now. 
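+# A sketch of what that restriction could look like (not applied here; the
+# ArnNotLike pattern and the choice of roles are assumptions — the DynamoDB
+# export appears to write with the trigger lambda's credentials, hence its
+# s3:PutObject grant in lambda.tf, so lambda_role must stay allowed, and any
+# admin/CI roles would also need excluding from the deny):
+#
+#   statement {
+#     sid     = "RestrictToExportRoles"
+#     effect  = "Deny"
+#     actions = ["s3:*"]
+#     principals {
+#       type        = "AWS"
+#       identifiers = ["*"]
+#     }
+#     resources = [
+#       aws_s3_bucket.dynamodb_output.arn,
+#       "${aws_s3_bucket.dynamodb_output.arn}/*",
+#     ]
+#     condition {
+#       test     = "ArnNotLike"
+#       variable = "aws:PrincipalArn"
+#       values   = [aws_iam_role.lambda_role.arn, aws_iam_role.glue_service_role.arn]
+#     }
+#   }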
+data "aws_iam_policy_document" "dynamodb_output" { + statement { + sid = "HTTPSOnly" + effect = "Deny" + actions = ["s3:*"] + + principals { + type = "AWS" + identifiers = ["*"] + } + + resources = [ + aws_s3_bucket.dynamodb_output.arn, + "${aws_s3_bucket.dynamodb_output.arn}/*" + ] + + condition { + test = "Bool" + variable = "aws:SecureTransport" + values = ["false"] + } + } +} + +resource "aws_s3_bucket_policy" "dynamodb_output" { + bucket = aws_s3_bucket.dynamodb_output.id + policy = data.aws_iam_policy_document.dynamodb_output.json +} + + +resource "aws_s3_bucket_server_side_encryption_configuration" "dynamodb_output" { + bucket = aws_s3_bucket.dynamodb_output.bucket + + rule { + apply_server_side_encryption_by_default { + kms_master_key_id = aws_kms_key.dynamo.arn + sse_algorithm = "aws:kms" + } + } +} + + +resource "aws_s3_bucket_public_access_block" "dynamodb_output_public_access_block" { + bucket = aws_s3_bucket.dynamodb_output.id + + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true +} + +resource "aws_s3_bucket_lifecycle_configuration" "dynamodb_output_lifecycle" { + bucket = aws_s3_bucket.dynamodb_output.id + + + rule { + id = "object-auto-delete-rule" + status = "Enabled" + filter {} + + expiration { + days = 2 + } + } +} + +resource "aws_s3_bucket_versioning" "dynamodb_output_versioning" { + bucket = aws_s3_bucket.dynamodb_output.id + versioning_configuration { + status = "Enabled" + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_processed_output_s3.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_processed_output_s3.tf new file mode 100644 index 000000000..8af17df9c --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_processed_output_s3.tf @@ -0,0 +1,76 @@ +resource "aws_s3_bucket" "dynamodb_output_processed" { + bucket = "${var.name_prefix}-${var.environment}-dynamo-output-processed-bucket" +} + +data "aws_iam_policy_document" "dynamodb_output_processed" { + statement { + sid = "HTTPSOnly" + effect = "Deny" + actions = ["s3:*"] + + principals { + type = "AWS" + identifiers = ["*"] + } + + resources = [ + aws_s3_bucket.dynamodb_output_processed.arn, + "${aws_s3_bucket.dynamodb_output_processed.arn}/*" + ] + + condition { + test = "Bool" + variable = "aws:SecureTransport" + values = ["false"] + } + } +} + +resource "aws_s3_bucket_policy" "dynamodb_output_processed" { + bucket = aws_s3_bucket.dynamodb_output_processed.id + policy = data.aws_iam_policy_document.dynamodb_output_processed.json +} + + +resource "aws_s3_bucket_server_side_encryption_configuration" "dynamodb_output_processed" { + bucket = aws_s3_bucket.dynamodb_output_processed.bucket + + rule { + apply_server_side_encryption_by_default { + kms_master_key_id = aws_kms_key.dynamo_processed.arn + sse_algorithm = "aws:kms" + } + } +} + + +resource "aws_s3_bucket_public_access_block" "dynamodb_output_processed_public_access_block" { + bucket = aws_s3_bucket.dynamodb_output_processed.id + + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true +} + +resource "aws_s3_bucket_lifecycle_configuration" "dynamodb_output_processed_lifecycle" { + bucket = aws_s3_bucket.dynamodb_output_processed.id + + + rule { + id = "object-auto-delete-rule" + status = "Enabled" + filter {} + + expiration { + days = 3 * 365 + } + } +} + +resource "aws_s3_bucket_versioning" "dynamodb_output_processed_versioning" { + bucket = 
aws_s3_bucket.dynamodb_output_processed.id + versioning_configuration { + status = "Enabled" + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/glue.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/glue.tf new file mode 100644 index 000000000..395e5feaa --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/glue.tf @@ -0,0 +1,178 @@ +data "aws_iam_policy_document" "glue_service_assume_role" { + statement { + actions = ["sts:AssumeRole"] + principals { + type = "Service" + identifiers = ["glue.amazonaws.com"] + } + } +} + +resource "aws_iam_role" "glue_service_role" { + name = "${var.name_prefix}-dynamo-glue-service-role" + assume_role_policy = data.aws_iam_policy_document.glue_service_assume_role.json +} + +data "aws_iam_policy_document" "glue_service_policy" { + statement { + effect = "Allow" + actions = [ + "s3:PutObject", + "s3:GetObject", + "s3:GetObjectVersion", + "s3:GetBucketLocation", + "s3:DeleteObject", + "s3:ListBucket", + ] + resources = [ + aws_s3_bucket.dynamodb_output.arn, + "${aws_s3_bucket.dynamodb_output.arn}/*", + aws_s3_bucket.dynamodb_output_processed.arn, + "${aws_s3_bucket.dynamodb_output_processed.arn}/*", + ] + } + + statement { + effect = "Allow" + actions = [ + "kms:Decrypt", + "kms:GenerateDataKey" + ] + resources = [ + aws_kms_key.dynamo.arn, + aws_kms_key.dynamo_processed.arn, + ] + } + + statement { + actions = [ + "glue:GetDatabase", + "glue:GetDatabases", + "glue:GetTable", + "glue:GetTables", + "glue:CreateTable", + "glue:UpdateTable", + "glue:DeleteTable", + "glue:GetPartition", + "glue:GetPartitions", + "glue:CreatePartition", + "glue:BatchCreatePartition", + "glue:UpdatePartition", + ] + + resources = [ + "arn:aws:glue:eu-west-2:${data.aws_caller_identity.current.account_id}:catalog", + "arn:aws:glue:eu-west-2:${data.aws_caller_identity.current.account_id}:database/*", + "arn:aws:glue:eu-west-2:${data.aws_caller_identity.current.account_id}:table/*", + ] + + effect = "Allow" + } + + statement { + actions = [ + "glue:GetCrawler", + "glue:StartCrawler", + "glue:GetCrawlerMetrics", + ] + + resources = [ + aws_glue_crawler.log_crawler.arn, + ] + + effect = "Allow" + } + + statement { + effect = "Allow" + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + resources = ["arn:aws:logs:*:*:*:/aws-glue/*"] + } + + statement { + effect = "Allow" + actions = [ + "ssm:GetParameter", + ] + resources = ["arn:aws:ssm:eu-west-2:${data.aws_caller_identity.current.account_id}:*"] + } +} + +resource "aws_iam_role_policy" "glue_service_policy" { + name = "${var.name_prefix}-dynamo-glue-service-policy" + role = aws_iam_role.glue_service_role.id + policy = data.aws_iam_policy_document.glue_service_policy.json +} + +resource "aws_glue_catalog_database" "log_database" { + name = "${var.name_prefix}-dynamo-reporting" + location_uri = "${aws_s3_bucket.dynamodb_output_processed.id}/" +} + +resource "aws_glue_crawler" "log_crawler" { + name = "${var.name_prefix}-${var.environment}-dynamo-crawler" + database_name = aws_glue_catalog_database.log_database.name + role = aws_iam_role.glue_service_role.name + delta_target { + delta_tables = ["s3://${aws_s3_bucket.dynamodb_output_processed.id}/processed/dynamo_export_flags"] + write_manifest = false + create_native_delta_table = true + } + schema_change_policy { + delete_behavior = "LOG" + } + configuration = jsonencode({ + "Version" : 1.0, + "Grouping" : { + "TableGroupingPolicy" : "CombineCompatibleSchemas" + } + }) +} + 
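+# Note: an ON_DEMAND trigger only fires when activated manually or via the
+# StartTrigger API; the pipeline itself starts the crawler from the Glue job
+# (see run_crawler in src/glue_job.py), so this resource mainly provides a
+# convenient handle for ad-hoc re-crawls.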
+resource "aws_glue_trigger" "crawler_trigger" { + name = "${var.name_prefix}-crawler-trigger" + type = "ON_DEMAND" + actions { + crawler_name = aws_glue_crawler.log_crawler.name + } +} + +resource "aws_s3_object" "script" { + bucket = aws_s3_bucket.dynamodb_output_processed.bucket + key = "glue_code/glue_job.py" + source = "${path.module}/src/glue_job.py" + source_hash = filemd5("${path.module}/src/glue_job.py") +} + +resource "aws_glue_job" "glue_job" { + name = "${var.name_prefix}-dynamo-export-glue-job" + role_arn = aws_iam_role.glue_service_role.arn + description = "Export DynamoDB data to S3" + glue_version = "5.0" + worker_type = "G.1X" + timeout = 48 * 60 + max_retries = 0 + number_of_workers = 4 + + command { + name = "glueetl" + python_version = 3 + script_location = "s3://${aws_s3_bucket.dynamodb_output_processed.id}/glue_code/glue_job.py" + } + + default_arguments = { + "--enable-auto-scaling" = "true" + "--enable-continuous-cloudwatch-log" = "true" + "--enable-continuous-log-filter" = "true" + "--datalake-formats" = "delta" + "--enable-metrics" = "true" + "--enable-glue-datacatalog" = "true" + "--conf" = "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore --conf spark.jars.packages=io.delta:delta-core_2.3:3.3.0" + # "--SLACK_WEBHOOK_URL_SSM_PARAMETER_NAME" = "/${var.environment}-blue/api_config/slack_service_alert_webhook" + "--ENVIRONMENT" = var.environment + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/kms.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/kms.tf new file mode 100644 index 000000000..6c2ac99d1 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/kms.tf @@ -0,0 +1,15 @@ +resource "aws_kms_key" "dynamo" { +} + +resource "aws_kms_alias" "dynamo" { + name = "alias/${var.name_prefix}-dynamo-output-bucket" + target_key_id = aws_kms_key.dynamo.key_id +} + +resource "aws_kms_key" "dynamo_processed" { +} + +resource "aws_kms_alias" "dynamo_processed" { + name = "alias/${var.name_prefix}-dynamo-processed-output-bucket" + target_key_id = aws_kms_key.dynamo_processed.key_id +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf new file mode 100644 index 000000000..7f21cf1d8 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf @@ -0,0 +1,217 @@ +data "aws_dynamodb_table" "pointer_table" { + name = var.pointer_table_name +} + +data "aws_iam_policy_document" "lambda_assume_role" { + statement { + effect = "Allow" + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "lambda_role" { + name = "${var.name_prefix}-dynamodb-export-lambda" + + assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json +} + +data "aws_iam_policy_document" "lambda_policy" { + statement { + effect = "Allow" + actions = [ + "dynamodb:ExportTableToPointInTime", + "dynamodb:DescribeExport", + "dynamodb:DescribeTable", + "dynamodb:DescribeContinuousBackups", + ] + resources = ["*"] + } + + statement { + effect = "Allow" + actions = [ + "ssm:GetParameter" + ] + resources = ["arn:aws:ssm:eu-west-2:${data.aws_caller_identity.current.account_id}:*"] + } + + statement { + effect = "Allow" + actions = [ + 
"s3:PutObject", + "s3:PutObjectAcl", + "s3:AbortMultipartUpload", + "s3:ListMultipartUploadParts" + ] + resources = [ + aws_s3_bucket.dynamodb_output.arn, + "${aws_s3_bucket.dynamodb_output.arn}/*" + ] + } + + statement { + effect = "Allow" + actions = [ + "kms:GenerateDataKey", + "kms:Encrypt", + "kms:Decrypt" + ] + resources = [aws_kms_key.dynamo.arn] + } + + statement { + effect = "Allow" + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + resources = ["*"] + } +} + +resource "aws_iam_role_policy" "lambda_policy" { + name = "${var.name_prefix}-dynamodb-export-lambda-policy" + role = aws_iam_role.lambda_role.id + + policy = data.aws_iam_policy_document.lambda_policy.json +} + +data "aws_s3_object" "dynamo_export_trigger_zip" { + bucket = var.asset_bucket + key = "${var.asset_version}/dynamo_export_trigger.zip" +} + +resource "aws_lambda_function" "dynamo_export_trigger" { + function_name = "${var.name_prefix}-dynamo-export-trigger" + role = aws_iam_role.lambda_role.arn + handler = "src.lambdas.dynamo_export_trigger.dynamo_export_trigger.lambda_handler" + runtime = "python3.13" + timeout = 30 + + s3_bucket = data.aws_s3_object.dynamo_export_trigger_zip.bucket + s3_key = data.aws_s3_object.dynamo_export_trigger_zip.key + s3_object_version = data.aws_s3_object.dynamo_export_trigger_zip.version_id + + logging_config { + log_format = "JSON" + } + + environment { + variables = { + BUCKET = aws_s3_bucket.dynamodb_output.id + DDB_TABLE_ARN = data.aws_dynamodb_table.pointer_table.arn + KMS_KEY = aws_kms_key.dynamo.key_id + ENVIRONMENT = var.environment + DDB_TABLE_NAME = var.pointer_table_name + } + } +} + +data "aws_s3_object" "dynamo_export_poll_zip" { + bucket = var.asset_bucket + key = "${var.asset_version}/dynamo_export_poll.zip" +} + +resource "aws_lambda_function" "dynamo_export_poll" { + function_name = "${var.name_prefix}-dynamo-export-poll" + role = aws_iam_role.lambda_role.arn + handler = "src.lambdas.dynamo_export_poll.dynamo_export_poll.lambda_handler" + runtime = "python3.13" + timeout = 30 + + s3_bucket = data.aws_s3_object.dynamo_export_poll_zip.bucket + s3_key = data.aws_s3_object.dynamo_export_poll_zip.key + s3_object_version = data.aws_s3_object.dynamo_export_poll_zip.version_id + + logging_config { + log_format = "JSON" + } + + environment { + variables = { + BUCKET = aws_s3_bucket.dynamodb_output.id + DDB_TABLE_ARN = data.aws_dynamodb_table.pointer_table.arn + KMS_KEY = aws_kms_key.dynamo.key_id + ENVIRONMENT = var.environment + } + } +} + +data "aws_iam_policy_document" "ssm_put_param_assume_role" { + statement { + actions = ["sts:AssumeRole"] + + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + } +} + +resource "aws_iam_role" "ssm_put_param_role" { + name = "${var.name_prefix}-ssm-put-param-lambda" + + assume_role_policy = data.aws_iam_policy_document.ssm_put_param_assume_role.json +} + +data "aws_iam_policy_document" "ssm_put_param_policy" { + statement { + effect = "Allow" + actions = [ + "ssm:PutParameter" + ] + resources = ["arn:aws:ssm:eu-west-2:${data.aws_caller_identity.current.account_id}:*"] + } + + statement { + effect = "Allow" + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + resources = ["*"] + } +} + +resource "aws_iam_role_policy" "ssm_put_param_policy" { + name = "${var.name_prefix}-ssm-put-param-lambda-policy" + role = aws_iam_role.ssm_put_param_role.id + + policy = data.aws_iam_policy_document.ssm_put_param_policy.json +} + +data 
"aws_s3_object" "ssm_put_param_zip" { + bucket = var.asset_bucket + key = "${var.asset_version}/ssm_put_param.zip" +} + +resource "aws_lambda_function" "ssm_put_param" { + function_name = "${var.name_prefix}-ssm-put-param" + role = aws_iam_role.ssm_put_param_role.arn + handler = "src.lambdas.ssm_put_param.ssm_put_param.lambda_handler" + runtime = "python3.13" + timeout = 30 + + s3_bucket = data.aws_s3_object.ssm_put_param_zip.bucket + s3_key = data.aws_s3_object.ssm_put_param_zip.key + s3_object_version = data.aws_s3_object.ssm_put_param_zip.version_id + + logging_config { + log_format = "JSON" + } + + environment { + variables = { + BUCKET = aws_s3_bucket.dynamodb_output.id + DDB_TABLE_ARN = data.aws_dynamodb_table.pointer_table.arn + KMS_KEY = aws_kms_key.dynamo.key_id + ENVIRONMENT = var.environment + } + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf new file mode 100644 index 000000000..f519070c2 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf @@ -0,0 +1,12 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 6.9.0, < 7.0.0" + } + } + + required_version = ">= 1.14" +} + +data "aws_caller_identity" "current" {} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf new file mode 100644 index 000000000..af4ad2b56 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf @@ -0,0 +1,12 @@ +output "dynamo_processed_bucket_arn" { + value = aws_s3_bucket.dynamodb_output_processed.arn + description = "The ARN of the S3 bucket used for processed DynamoDB data." +} +output "dynamo_processed_key_arn" { + value = aws_kms_key.dynamo_processed.arn + description = "The ARN of the KMS key used to encrypt the processed DynamoDB S3 bucket." +} +output "dynamo_export_step_function_arn" { + value = aws_sfn_state_machine.dynamo_export.arn + description = "The ARN of the Step Function for DynamoDB patient history processing." 
+} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py b/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py new file mode 100644 index 000000000..4d980df47 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py @@ -0,0 +1,363 @@ +import hashlib +import logging +import os +import sys +import time +import timeit + +import boto3 +from awsglue.context import GlueContext +from awsglue.utils import getResolvedOptions +from botocore.config import Config +from delta.tables import DeltaTable +from pyspark.context import SparkContext +from pyspark.sql import DataFrame +from pyspark.sql.functions import col, desc, lit, row_number, udf, when +from pyspark.sql.types import StringType, StructField, StructType +from pyspark.sql.window import Window + +spark_context = SparkContext.getOrCreate() +glue_context = GlueContext(spark_context) +session = glue_context.spark_session + +MSG_FORMAT = "%(asctime)s %(levelname)s %(name)s: %(message)s" +DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" +logging.basicConfig(format=MSG_FORMAT, datefmt=DATETIME_FORMAT) +logger = logging.getLogger("ETLLogger") +logger.setLevel(logging.INFO) + +region = os.environ.get("AWS_REGION", "eu-west-2") +glue = boto3.client( + service_name="glue", + region_name=region, + config=Config(connect_timeout=5, read_timeout=5), +) +ssm = boto3.client( + "ssm", + config=Config(connect_timeout=5, read_timeout=5), +) + +ARGS = getResolvedOptions( + sys.argv, + [ + "SOURCE_BUCKET", + "DDB_TABLE_ARN", + "TARGET_BUCKET", + "GLUE_CRAWLER_NAME", + "EXPORT_TYPE", + "SLACK_WEBHOOK_URL_SSM_PARAMETER_NAME", + "ENVIRONMENT", + ], +) + +KEYS = [ + "pk", + "sk", + "author", + "category", + "category_id", + "created_on", + "custodian", + "custodian_suffix", + "id", + "master_identifier", + "nhs_number", + "patient_key", + "patient_sort", + "producer_id", + "source", + "type", + "type_id", + "updated_on", + "version", +] + +MAP_KEYS = [ + "document", +] + +DERIVED_KEYS = [ + "date", + "test_patient", +] + + +# def get_ssm_parameter(parameter): +# """Get ssm parameter value""" +# return ssm.get_parameter(Name=parameter, WithDecryption=True)["Parameter"]["Value"] + + +# def send_slack_notification(webhook_url: str, exception: Optional[Exception]): +# """Send a message to Slack via webhook.""" +# alert_message = { +# "blocks": [ +# { +# "type": "section", +# "text": { +# "type": "mrkdwn", +# "text": f"*Dynamo Export Processing Failed in {ARGS['ENVIRONMENT']} environment*\nUncaught exception: {exception}", +# }, +# }, +# ] +# } +# response = requests.post( +# webhook_url, +# data=json.dumps(alert_message), +# headers={"Content-Type": "application/json"}, +# ) + +# if response.status_code != 200: +# raise ValueError( +# f"Request to Slack returned {response.status_code}, {response.text}" +# ) + + +@udf +def hash_nhs(nhs_number: str) -> str: + """Hash the NHS number using SHA-256.""" + if not nhs_number: + return "" + + return hashlib.sha256(nhs_number.encode()).hexdigest() + + +def validate_df_schema(df: DataFrame) -> DataFrame: + """Ensure that the DataFrame has the expected schema for NewImage and OldImage.""" + logger.info("Validating DataFrame schema for NewImage and OldImage columns.") + image_schema = StructType( + [StructField(k, StructType([StructField("S", StringType())])) for k in KEYS] + + [ + StructField(k, StructType([StructField("M", StructType([]))])) + for k in MAP_KEYS + ] + ) + + for column in ["NewImage", "OldImage"]: + if column not in df.columns: + 
logger.info( + f"{column} column is missing from DataFrame. Adding empty column." + ) + df = df.withColumn(column, lit(None).cast(image_schema)) + + return df + + +def run_crawler( + crawler: str, *, timeout_minutes: int = 120, retry_seconds: int = 5 +) -> None: + """Run the specified AWS Glue crawler, waiting until completion.""" + timeout_seconds = timeout_minutes * 60 + start_time = timeit.default_timer() + abort_time = start_time + timeout_seconds + + def wait_until_ready() -> None: + state_previous = None + while True: + response_get = glue.get_crawler(Name=crawler) + state = response_get["Crawler"]["State"] + if state != state_previous: + logger.info(f"Crawler {crawler} is {state.lower()}.") + state_previous = state + if state == "READY": + return + if timeit.default_timer() > abort_time: + raise TimeoutError( + f"Failed to crawl {crawler}. The allocated time of {timeout_minutes:,} minutes has elapsed." + ) + time.sleep(retry_seconds) + + wait_until_ready() + try: + response_start = glue.start_crawler(Name=crawler) + assert response_start["ResponseMetadata"]["HTTPStatusCode"] == 200 + except glue.exceptions.CrawlerRunningException as e: + logger.info(f"{crawler} is already running: {e}") + logger.info(f"Crawling {crawler}.") + wait_until_ready() + logger.info(f"Crawled {crawler}.") + + +def process_full_export(df: DataFrame) -> None: + """Process full export DataFrame.""" + fields = [field for field in KEYS + MAP_KEYS if field in df.columns] + + df = df.withColumnRenamed("nhs_number", "nhs_number_raw") + df = ( + df.withColumn( + "test_patient", + when( + col("nhs_number_raw").startswith("9") + | col("nhs_number_raw").startswith("5"), + True, + ).otherwise(False), + ) + .withColumn("nhs_number", hash_nhs(col("nhs_number_raw"))) + .withColumn("date", df["last_updated"].cast("date")) + .select(*fields, *DERIVED_KEYS) + ) + + df.write.format("delta").mode("append").partitionBy("date").save( + f"s3://{ARGS['TARGET_BUCKET']}/processed/dynamo_export_flags" + ) + + run_crawler(ARGS["GLUE_CRAWLER_NAME"]) + + +def safe_select(df: DataFrame, field: str, dtype: str, image: str) -> col: + """Safely select a field from the DataFrame, returning None if the field does not exist""" + try: + df.select(f"{image}.{field}.{dtype}") + return col(f"{image}.{field}.{dtype}").alias(field) + except Exception: + logger.info( + f"Field {field} of type {dtype} is missing in {image}. Returning null for this field." 
+ ) + return lit(None).alias(field) + + +def process_incremental_export(df: DataFrame) -> None: + """Process incremental export DataFrame""" + df = validate_df_schema(df).cache() + df = df.withColumn( + "eventName", + when(col("NewImage").isNotNull() & col("OldImage").isNull(), "INSERT") + .when(col("NewImage").isNotNull() & col("OldImage").isNotNull(), "MODIFY") + .when(col("NewImage").isNull() & col("OldImage").isNotNull(), "REMOVE") + .otherwise(None), + ).cache() + + window = ( + Window.partitionBy("nhs_number", "path") + .orderBy(desc("last_updated")) + .rowsBetween(Window.unboundedPreceding, Window.currentRow) + ) + upserted = ( + df.filter(col("eventName").isin("INSERT", "MODIFY")) + .select( + *[safe_select(df, field, "S", "NewImage") for field in KEYS], + *[safe_select(df, field, "M", "NewImage") for field in MAP_KEYS], + ) + .withColumn("rn", row_number().over(window)) + .filter(col("rn") == 1) + .drop("rn") + .cache() + ) + logger.info(f"Upserted DataFrame has {upserted.count()} rows.") + + deleted = ( + df.filter(col("eventName") == "REMOVE") + .select( + safe_select(df, "nhs_number", "S", "OldImage"), + safe_select(df, "path", "S", "OldImage"), + ) + .dropDuplicates(subset=["nhs_number", "path"]) + .cache() + ) + logger.info(f"Deleted DataFrame has {deleted.count()} rows.") + + delta_table = DeltaTable.forPath( + session, + f"s3://{ARGS['TARGET_BUCKET']}/processed/dynamo_export_flags", + ) + + if not upserted.isEmpty(): + run_upsert_logic(delta_table, upserted, KEYS + MAP_KEYS) + + if not deleted.isEmpty(): + run_delete_logic(delta_table, deleted) + + run_crawler(ARGS["GLUE_CRAWLER_NAME"]) + + +def run_delete_logic(delta_table: DeltaTable, deleted: DataFrame) -> None: + """Delete records from Delta table based on deleted DataFrame""" + logger.info("Running delete logic for removed records.") + deleted = deleted.withColumnRenamed("nhs_number", "nhs_number_raw") + deleted = deleted.withColumn("nhs_number", hash_nhs(col("nhs_number_raw"))) + delta_table.alias("t").merge( + deleted.alias("d"), + "t.nhs_number = d.nhs_number AND t.path = d.path", + ).whenMatchedDelete().execute() + + +def run_upsert_logic( + delta_table: DeltaTable, upserted: DataFrame, fields: list[str] +) -> None: + """Upsert records into Delta table based on upserted DataFrame""" + logger.info("Running upsert logic for inserted/modified records.") + upserted = upserted.withColumnRenamed("nhs_number", "nhs_number_raw") + upserted = ( + upserted.withColumn( + "test_patient", + when( + col("nhs_number_raw").startswith("9") + | col("nhs_number_raw").startswith("5"), + True, + ).otherwise(False), + ) + .withColumn("nhs_number", hash_nhs(col("nhs_number_raw"))) + .withColumn( + "date", + upserted["last_updated"].cast("date"), + ) + .select(*fields, *DERIVED_KEYS) + ) + + delta_table.alias("t").merge( + upserted.alias("u"), + "t.nhs_number = u.nhs_number AND t.path = u.path", + ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() + + +def main(): + """Process DynamoDB export data and write to Delta Lake.""" + if ARGS["EXPORT_TYPE"] == "FULL_EXPORT": + logger.info("Starting full export processing.") + df = glue_context.create_dynamic_frame.from_options( + connection_type="dynamodb", + connection_options={ + "dynamodb.export": "s3", + "dynamodb.tableArn": ARGS["DDB_TABLE_ARN"], + "dynamodb.s3.bucket": ARGS["SOURCE_BUCKET"], + "dynamodb.s3.prefix": "AWSDynamoDB/", + "dynamodb.simplifyDDBJson": True, # Only applicable for the full export + }, + ).toDF() + + if df.isEmpty(): + return + + process_full_export(df) + 
return + + if ARGS["EXPORT_TYPE"] == "INCREMENTAL_EXPORT": + logger.info("Starting incremental export processing.") + df = ( + glue_context.create_dynamic_frame.from_options( + connection_type="dynamodb", + connection_options={ + "dynamodb.export": "s3", + "dynamodb.tableArn": ARGS["DDB_TABLE_ARN"], + "dynamodb.s3.bucket": ARGS["SOURCE_BUCKET"], + "dynamodb.s3.prefix": "AWSDynamoDB/data/", + }, + ) + .toDF() + .dropDuplicates() + ) + + if df.isEmpty(): + return + + process_incremental_export(df) + + +if __name__ == "__main__": + try: + main() + except Exception as e: + # send_slack_notification( + # webhook_url=get_ssm_parameter(ARGS["SLACK_WEBHOOK_URL_SSM_PARAMETER_NAME"]), + # exception=e, + # ) + raise diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/__init__.py b/terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/ssm_put_param.py b/terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/ssm_put_param.py new file mode 100644 index 000000000..a9d64c581 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/ssm_put_param.py @@ -0,0 +1,18 @@ +import boto3 +from botocore.config import Config + +ssm = boto3.client( + "ssm", + config=Config(connect_timeout=5, read_timeout=5), +) + + +def lambda_handler(event, _): + param_name = "/exports/DynamoExportRuntime" + param_value = event["export_to_time"] + ssm.put_parameter(Name=param_name, Value=param_value, Type="String", Overwrite=True) + return { + "to_time": param_value, + "export_arns": event["export_arns"], + "export_type": event["export_type"], + } diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/step_function.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/step_function.tf new file mode 100644 index 000000000..07ee74f2b --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/step_function.tf @@ -0,0 +1,61 @@ +data "aws_iam_policy_document" "step_functions_assume_role" { + statement { + effect = "Allow" + principals { + type = "Service" + identifiers = ["states.amazonaws.com"] + } + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "step_functions_role" { + name = "${var.name_prefix}-dynamodb-export-sf-role" + assume_role_policy = data.aws_iam_policy_document.step_functions_assume_role.json +} + +data "aws_iam_policy_document" "step_functions_policy" { + statement { + effect = "Allow" + actions = [ + "lambda:InvokeFunction", + "lambda:ListFunctions" + ] + resources = [ + aws_lambda_function.dynamo_export_trigger.arn, + aws_lambda_function.dynamo_export_poll.arn, + aws_lambda_function.ssm_put_param.arn + ] + } + + statement { + effect = "Allow" + actions = [ + "glue:StartJobRun", + "glue:GetJobRun" + ] + resources = [aws_glue_job.glue_job.arn] + } +} + +resource "aws_iam_role_policy" "step_functions_policy" { + name = "${var.name_prefix}-dynamodb-export-sf-policy" + role = aws_iam_role.step_functions_role.id + policy = data.aws_iam_policy_document.step_functions_policy.json +} + +resource "aws_sfn_state_machine" "dynamo_export" { + name = "${var.name_prefix}-dynamodb-export-sf" + role_arn = aws_iam_role.step_functions_role.arn + + definition = templatefile("${path.module}/dynamo_export_step_function_definition.json", { + lambda_export_trigger_function_name = 
aws_lambda_function.dynamo_export_trigger.function_name + lambda_export_poll_function_name = aws_lambda_function.dynamo_export_poll.function_name + lambda_ssm_put_param_function_name = aws_lambda_function.ssm_put_param.function_name + dynamo_export_s3_bucket_name = aws_s3_bucket.dynamodb_output.id + dynamo_export_processed_s3_bucket_name = aws_s3_bucket.dynamodb_output_processed.id + ddb_table_arn = data.aws_dynamodb_table.pointer_table.arn + glue_job_name = aws_glue_job.glue_job.name + glue_crawler_name = aws_glue_crawler.log_crawler.name + }) +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf new file mode 100644 index 000000000..6c0b14575 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf @@ -0,0 +1,24 @@ +variable "name_prefix" { + type = string + description = "The prefix to apply to all resources in the module" +} + +variable "environment" { + type = string + description = "Environment in use" +} + +variable "pointer_table_name" { + type = string + description = "patient_flags_datastore table name" +} + +variable "asset_bucket" { + type = string + description = "Name of the bucket that holds lambda zips" +} + +variable "asset_version" { + type = string + description = "Version for the lambda zips to use during deployment" +} From 704b38051efe253870f09d35b498c87b2bb8ce13 Mon Sep 17 00:00:00 2001 From: jackleary Date: Thu, 5 Mar 2026 12:04:31 +0000 Subject: [PATCH 02/15] nrl-2002 Sort lambdas for deployment --- .../dynamo_export_poll/__init__.py | 0 .../dynamo_export_poll/dynamo_export_poll.py | 0 .../dynamo_export_trigger/__init__.py | 0 .../dynamo_export_trigger.py | 0 .../ssm_put_param/__init__.py | 0 .../ssm_put_param/ssm_put_param.py | 0 .../dev/dynamo_export.tf | 6 ++++ .../modules/dynamo_export/lambda.tf | 30 ++++--------------- .../modules/dynamo_export/main.tf | 11 ------- 9 files changed, 12 insertions(+), 35 deletions(-) rename {terraform/account-wide-infrastructure/modules/dynamo_export => lambdas}/dynamo_export_poll/__init__.py (100%) rename {terraform/account-wide-infrastructure/modules/dynamo_export => lambdas}/dynamo_export_poll/dynamo_export_poll.py (100%) rename {terraform/account-wide-infrastructure/modules/dynamo_export => lambdas}/dynamo_export_trigger/__init__.py (100%) rename {terraform/account-wide-infrastructure/modules/dynamo_export => lambdas}/dynamo_export_trigger/dynamo_export_trigger.py (100%) rename {terraform/account-wide-infrastructure/modules/dynamo_export => lambdas}/ssm_put_param/__init__.py (100%) rename {terraform/account-wide-infrastructure/modules/dynamo_export => lambdas}/ssm_put_param/ssm_put_param.py (100%) create mode 100644 terraform/account-wide-infrastructure/dev/dynamo_export.tf diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/__init__.py b/lambdas/dynamo_export_poll/__init__.py similarity index 100% rename from terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/__init__.py rename to lambdas/dynamo_export_poll/__init__.py diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/dynamo_export_poll.py b/lambdas/dynamo_export_poll/dynamo_export_poll.py similarity index 100% rename from terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_poll/dynamo_export_poll.py rename to lambdas/dynamo_export_poll/dynamo_export_poll.py diff --git 
a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/__init__.py b/lambdas/dynamo_export_trigger/__init__.py similarity index 100% rename from terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/__init__.py rename to lambdas/dynamo_export_trigger/__init__.py diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/dynamo_export_trigger.py b/lambdas/dynamo_export_trigger/dynamo_export_trigger.py similarity index 100% rename from terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_export_trigger/dynamo_export_trigger.py rename to lambdas/dynamo_export_trigger/dynamo_export_trigger.py diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/__init__.py b/lambdas/ssm_put_param/__init__.py similarity index 100% rename from terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/__init__.py rename to lambdas/ssm_put_param/__init__.py diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/ssm_put_param.py b/lambdas/ssm_put_param/ssm_put_param.py similarity index 100% rename from terraform/account-wide-infrastructure/modules/dynamo_export/ssm_put_param/ssm_put_param.py rename to lambdas/ssm_put_param/ssm_put_param.py diff --git a/terraform/account-wide-infrastructure/dev/dynamo_export.tf b/terraform/account-wide-infrastructure/dev/dynamo_export.tf new file mode 100644 index 000000000..4e6d69e09 --- /dev/null +++ b/terraform/account-wide-infrastructure/dev/dynamo_export.tf @@ -0,0 +1,6 @@ +module "dynamo_export" { + source = "./modules/dynamo_export" + name_prefix = "nhsd-nrlf--dev" + environment = "dev" + pointer_table_name = module.dev-pointers-table.table_name +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf index 7f21cf1d8..15d6a595d 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf @@ -81,11 +81,6 @@ resource "aws_iam_role_policy" "lambda_policy" { policy = data.aws_iam_policy_document.lambda_policy.json } -data "aws_s3_object" "dynamo_export_trigger_zip" { - bucket = var.asset_bucket - key = "${var.asset_version}/dynamo_export_trigger.zip" -} - resource "aws_lambda_function" "dynamo_export_trigger" { function_name = "${var.name_prefix}-dynamo-export-trigger" role = aws_iam_role.lambda_role.arn @@ -93,9 +88,8 @@ resource "aws_lambda_function" "dynamo_export_trigger" { runtime = "python3.13" timeout = 30 - s3_bucket = data.aws_s3_object.dynamo_export_trigger_zip.bucket - s3_key = data.aws_s3_object.dynamo_export_trigger_zip.key - s3_object_version = data.aws_s3_object.dynamo_export_trigger_zip.version_id + filename = "${path.module}/../../../../dist/dynamo_export_trigger.zip" + source_code_hash = filebase64sha256("${path.module}/../../../../dist/dynamo_export_trigger.zip") logging_config { log_format = "JSON" @@ -112,11 +106,6 @@ resource "aws_lambda_function" "dynamo_export_trigger" { } } -data "aws_s3_object" "dynamo_export_poll_zip" { - bucket = var.asset_bucket - key = "${var.asset_version}/dynamo_export_poll.zip" -} - resource "aws_lambda_function" "dynamo_export_poll" { function_name = "${var.name_prefix}-dynamo-export-poll" role = aws_iam_role.lambda_role.arn @@ -124,9 +113,8 @@ resource "aws_lambda_function" "dynamo_export_poll" { runtime = "python3.13" timeout = 30 - s3_bucket = 
data.aws_s3_object.dynamo_export_poll_zip.bucket - s3_key = data.aws_s3_object.dynamo_export_poll_zip.key - s3_object_version = data.aws_s3_object.dynamo_export_poll_zip.version_id + filename = "${path.module}/../../../../dist/dynamo_export_poll.zip" + source_code_hash = filebase64sha256("${path.module}/../../../../dist/dynamo_export_poll.zip") logging_config { log_format = "JSON" @@ -186,11 +174,6 @@ resource "aws_iam_role_policy" "ssm_put_param_policy" { policy = data.aws_iam_policy_document.ssm_put_param_policy.json } -data "aws_s3_object" "ssm_put_param_zip" { - bucket = var.asset_bucket - key = "${var.asset_version}/ssm_put_param.zip" -} - resource "aws_lambda_function" "ssm_put_param" { function_name = "${var.name_prefix}-ssm-put-param" role = aws_iam_role.ssm_put_param_role.arn @@ -198,9 +181,8 @@ resource "aws_lambda_function" "ssm_put_param" { runtime = "python3.13" timeout = 30 - s3_bucket = data.aws_s3_object.ssm_put_param_zip.bucket - s3_key = data.aws_s3_object.ssm_put_param_zip.key - s3_object_version = data.aws_s3_object.ssm_put_param_zip.version_id + filename = "${path.module}/../../../../dist/ssm_put_param.zip" + source_code_hash = filebase64sha256("${path.module}/../../../../dist/ssm_put_param.zip") logging_config { log_format = "JSON" diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf index f519070c2..8fc4b38cc 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf @@ -1,12 +1 @@ -terraform { - required_providers { - aws = { - source = "hashicorp/aws" - version = ">= 6.9.0, < 7.0.0" - } - } - - required_version = ">= 1.14" -} - data "aws_caller_identity" "current" {} From c115f480323ed68a9225a9ebd2feab07fe00a744 Mon Sep 17 00:00:00 2001 From: jackleary Date: Thu, 5 Mar 2026 13:40:40 +0000 Subject: [PATCH 03/15] NRL-2002 Update table name --- .../modules/dynamo_export/src/glue_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py b/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py index 4d980df47..aaf2fdeb3 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py @@ -197,7 +197,7 @@ def process_full_export(df: DataFrame) -> None: ) df.write.format("delta").mode("append").partitionBy("date").save( - f"s3://{ARGS['TARGET_BUCKET']}/processed/dynamo_export_flags" + f"s3://{ARGS['TARGET_BUCKET']}/processed/dynamo_export_pointers" ) run_crawler(ARGS["GLUE_CRAWLER_NAME"]) @@ -257,7 +257,7 @@ def process_incremental_export(df: DataFrame) -> None: delta_table = DeltaTable.forPath( session, - f"s3://{ARGS['TARGET_BUCKET']}/processed/dynamo_export_flags", + f"s3://{ARGS['TARGET_BUCKET']}/processed/dynamo_export_pointers", ) if not upserted.isEmpty(): From 4ef535aeaac16d7404c406ca8ab83cfa84481bcb Mon Sep 17 00:00:00 2001 From: Anjali Trace Date: Wed, 29 Apr 2026 16:21:51 +0100 Subject: [PATCH 04/15] NRL-2002 Remove references to patient flags --- .../modules/dynamo_export/outputs.tf | 2 +- .../modules/dynamo_export/variables.tf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf index 
af4ad2b56..70895a957 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf @@ -8,5 +8,5 @@ output "dynamo_processed_key_arn" { } output "dynamo_export_step_function_arn" { value = aws_sfn_state_machine.dynamo_export.arn - description = "The ARN of the Step Function for DynamoDB patient history processing." + description = "The ARN of the Step Function for DynamoDB pointer history processing." } diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf index 6c0b14575..e3dffc413 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf @@ -10,7 +10,7 @@ variable "environment" { variable "pointer_table_name" { type = string - description = "patient_flags_datastore table name" + description = "Name of the pointer table to export" } variable "asset_bucket" { From 6cdfa8b7d70b0e8e3e6af574a56f7d1f06e98681 Mon Sep 17 00:00:00 2001 From: Anjali Trace Date: Thu, 30 Apr 2026 16:33:42 +0100 Subject: [PATCH 05/15] NRL-2002 Remove unused variables --- .../account-wide-infrastructure/dev/dynamo_export.tf | 2 +- .../modules/dynamo_export/variables.tf | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/terraform/account-wide-infrastructure/dev/dynamo_export.tf b/terraform/account-wide-infrastructure/dev/dynamo_export.tf index 4e6d69e09..6c624a802 100644 --- a/terraform/account-wide-infrastructure/dev/dynamo_export.tf +++ b/terraform/account-wide-infrastructure/dev/dynamo_export.tf @@ -1,5 +1,5 @@ module "dynamo_export" { - source = "./modules/dynamo_export" + source = "../modules/dynamo_export" name_prefix = "nhsd-nrlf--dev" environment = "dev" pointer_table_name = module.dev-pointers-table.table_name diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf index e3dffc413..60c031c90 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf @@ -12,13 +12,3 @@ variable "pointer_table_name" { type = string description = "Name of the pointer table to export" } - -variable "asset_bucket" { - type = string - description = "Name of the bucket that holds lambda zips" -} - -variable "asset_version" { - type = string - description = "Version for the lambda zips to use during deployment" -} From 9f50f2ec666d8b19eb372cfa379616e79069d10d Mon Sep 17 00:00:00 2001 From: Anjali Trace Date: Thu, 30 Apr 2026 16:34:17 +0100 Subject: [PATCH 06/15] NRL-2002 Add new dynamo export lambdas to build --- Makefile | 6 +++- lambdas/dynamo_export/Makefile | 33 +++++++++++++++++++ .../dynamo_export_poll.py | 2 +- .../dynamo_export_trigger.py | 2 +- .../ssm_put_param.py | 2 +- lambdas/dynamo_export_poll/__init__.py | 0 lambdas/dynamo_export_trigger/__init__.py | 0 lambdas/ssm_put_param/__init__.py | 0 8 files changed, 41 insertions(+), 4 deletions(-) create mode 100644 lambdas/dynamo_export/Makefile rename lambdas/{dynamo_export_poll => dynamo_export}/dynamo_export_poll.py (96%) rename lambdas/{dynamo_export_trigger => dynamo_export}/dynamo_export_trigger.py (98%) rename lambdas/{ssm_put_param => dynamo_export}/ssm_put_param.py (92%) delete mode 100644 lambdas/dynamo_export_poll/__init__.py 
delete mode 100644 lambdas/dynamo_export_trigger/__init__.py delete mode 100644 lambdas/ssm_put_param/__init__.py diff --git a/Makefile b/Makefile index 59f353180..b727577d3 100644 --- a/Makefile +++ b/Makefile @@ -61,12 +61,16 @@ check-deploy: ## check the deploy environment is setup correctly check-deploy-warn: @SHOULD_WARN_ONLY=true ./scripts/check-deploy-environment.sh -build: check-warn build-api-packages build-layers build-dependency-layer build-seed-sandbox-lambda ## Build the project +build: check-warn build-api-packages build-layers build-dependency-layer build-seed-sandbox-lambda build-dynamo-export-lambdas ## Build the project build-seed-sandbox-lambda: @echo "Building seed_sandbox Lambda" @cd lambdas/seed_sandbox && make build +build-dynamo-export-lambdas: + @echo "Building dynamo_export Lambdas" + @cd lambdas/dynamo_export && make build + build-dependency-layer: @echo "Building Lambda dependency layer" @mkdir -p $(DIST_PATH) diff --git a/lambdas/dynamo_export/Makefile b/lambdas/dynamo_export/Makefile new file mode 100644 index 000000000..af09c6f83 --- /dev/null +++ b/lambdas/dynamo_export/Makefile @@ -0,0 +1,33 @@ +.PHONY: * + +FILE_TO_PACKAGE?= + +clean: + @echo "Cleaning build artifacts..." + rm -rf build + @echo "✓ Clean complete" + +build-lambda: clean + $(eval LAMBDA_NAME := $(basename ${FILE_TO_PACKAGE})) + @echo "Building $(LAMBDA_NAME) Lambda deployment package..." + mkdir -p build + + # Copy the handler + cp $(FILE_TO_PACKAGE) build/ + + # Create the zip file in root dist + mkdir -p ../../dist + cd build && zip -r "../../../dist/${LAMBDA_NAME}.zip" . -x "*.pyc" -x "__pycache__/*" -x ".DS_Store" + + @echo "✓ Lambda package created: ../../dist/${LAMBDA_NAME}.zip" + +build-dynamo-export-trigger: + FILE_TO_PACKAGE=dynamo_export_trigger.py make build-lambda + +build-dynamo-export-poll: + FILE_TO_PACKAGE=dynamo_export_poll.py make build-lambda + +build-ssm-put-param: + FILE_TO_PACKAGE=ssm_put_param.py make build-lambda + +build: build-dynamo-export-trigger build-dynamo-export-poll build-ssm-put-param diff --git a/lambdas/dynamo_export_poll/dynamo_export_poll.py b/lambdas/dynamo_export/dynamo_export_poll.py similarity index 96% rename from lambdas/dynamo_export_poll/dynamo_export_poll.py rename to lambdas/dynamo_export/dynamo_export_poll.py index 44cc53a31..5f8ec6df3 100644 --- a/lambdas/dynamo_export_poll/dynamo_export_poll.py +++ b/lambdas/dynamo_export/dynamo_export_poll.py @@ -7,7 +7,7 @@ ) -def lambda_handler(event, _): +def lambda_handler(event, _context): completed = [] for arn in event["export_arns"]: response = ddb.describe_export(ExportArn=arn) diff --git a/lambdas/dynamo_export_trigger/dynamo_export_trigger.py b/lambdas/dynamo_export/dynamo_export_trigger.py similarity index 98% rename from lambdas/dynamo_export_trigger/dynamo_export_trigger.py rename to lambdas/dynamo_export/dynamo_export_trigger.py index a2c9ce3a9..dd4c74a77 100644 --- a/lambdas/dynamo_export_trigger/dynamo_export_trigger.py +++ b/lambdas/dynamo_export/dynamo_export_trigger.py @@ -23,7 +23,7 @@ ) -def lambda_handler(_, __): +def lambda_handler(_event, _context): to_time = datetime.now(timezone.utc).replace(microsecond=0, second=0, minute=0) export_arns = [] diff --git a/lambdas/ssm_put_param/ssm_put_param.py b/lambdas/dynamo_export/ssm_put_param.py similarity index 92% rename from lambdas/ssm_put_param/ssm_put_param.py rename to lambdas/dynamo_export/ssm_put_param.py index a9d64c581..a5ce4a8b1 100644 --- a/lambdas/ssm_put_param/ssm_put_param.py +++ b/lambdas/dynamo_export/ssm_put_param.py @@ 
---
 .../modules/dynamo_export/lambda.tf | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf
index 15d6a595d..f6555812c 100644
--- a/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf
+++ b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf
@@ -84,7 +84,7 @@ resource "aws_iam_role_policy" "lambda_policy" {
 resource "aws_lambda_function" "dynamo_export_trigger" {
   function_name = "${var.name_prefix}-dynamo-export-trigger"
   role          = aws_iam_role.lambda_role.arn
-  handler       = "src.lambdas.dynamo_export_trigger.dynamo_export_trigger.lambda_handler"
+  handler       = "dynamo_export_trigger.lambda_handler"
   runtime       = "python3.13"
   timeout       = 30
 
@@ -109,7 +109,7 @@ resource "aws_lambda_function" "dynamo_export_trigger" {
 resource "aws_lambda_function" "dynamo_export_poll" {
   function_name = "${var.name_prefix}-dynamo-export-poll"
   role          = aws_iam_role.lambda_role.arn
-  handler       = "src.lambdas.dynamo_export_poll.dynamo_export_poll.lambda_handler"
+  handler       = "dynamo_export_poll.lambda_handler"
   runtime       = "python3.13"
   timeout       = 30
 
@@ -177,7 +177,7 @@ resource "aws_iam_role_policy" "ssm_put_param_policy" {
 resource "aws_lambda_function" "ssm_put_param" {
   function_name = "${var.name_prefix}-ssm-put-param"
   role          = aws_iam_role.ssm_put_param_role.arn
-  handler       = "src.lambdas.ssm_put_param.ssm_put_param.lambda_handler"
+  handler       = "ssm_put_param.lambda_handler"
   runtime       = "python3.13"
   timeout       = 30
 

From 7cdd3b7a546a1b00c271cfeceb1468618848c531 Mon Sep 17 00:00:00 2001
From: Anjali Trace
Date: Tue, 5 May 2026 16:06:55 +0100
Subject: [PATCH 08/15] NRL-2002 Add instructions for enabling point-in-time
 recovery mode required for dynamo export

---
 terraform/account-wide-infrastructure/README.md | 25 +++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/terraform/account-wide-infrastructure/README.md b/terraform/account-wide-infrastructure/README.md
index 538841694..f66c8592c 100644
--- a/terraform/account-wide-infrastructure/README.md
+++ b/terraform/account-wide-infrastructure/README.md
@@ -9,7 +9,7 @@ Each subdirectory corresponds to each AWS account (`mgmt`, `prod`, `test` and `dev`).
 ## Table of Contents
 
 1. [Prerequisites](#prerequisites)
-2. [Initialise shell environment](#initialise-shell-environment)
+2. [Deploy mgmt resources](#deploy-mgmt-resources)
 3. [Deploy account wide resources](#deploy-account-wide-resources)
 4. [Tear down account wide resources](#tear-down-account-wide-resources)
 
@@ -160,6 +160,29 @@ To disable the PowerBI Gateway from the account:
 1. Set the `enable_powerbi_auto_push` variable to `false` in `./ACCOUNT_NAME/vars.tf`
 2. Deploy the account-wide infrastructure to the account
+
+#### Dynamo Export
+
+Data from DynamoDB is exported for reporting in Athena via a Step Function that runs an AWS Glue job.
+
+This requires point-in-time recovery to be enabled on the DynamoDB table being exported.
+This is already enabled on the prod and int pointer tables, but not on any others.
+
+If you'd like to trigger this step function on any other pointer table (e.g. dev or perftest), you will need to temporarily [enable point-in-time recovery via the console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/PointInTimeRecovery_Howitworks.html#howitworks-enable-pitr-console).
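+
+If scripting it instead, something like this boto3 call should work (the
+table name here is illustrative):
+
+```python
+import boto3
+
+dynamodb = boto3.client("dynamodb")
+dynamodb.update_continuous_backups(
+    TableName="nhsd-nrlf--dev-pointers",  # hypothetical table name
+    PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
+)
+```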
+
 
 ## Tear down account wide resources
 
 WARNING - This action will destroy all account-wide resources from the AWS account. This should

From 68b4d564032dddb9cb0a0504904b9baf871e731c Mon Sep 17 00:00:00 2001
From: Anjali Trace
Date: Wed, 6 May 2026 13:29:05 +0100
Subject: [PATCH 09/15] NRL-2002 Use KMS key ARN rather than key id to prevent
 export failing with "The table encryption key is not accessible"
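
The trigger presumably hands this key to the export call along these lines
(illustrative sketch, not code from this change; the env var names match
lambda.tf below):

    ddb.export_table_to_point_in_time(
        TableArn=os.environ["DDB_TABLE_ARN"],
        S3Bucket=os.environ["BUCKET"],
        S3SseAlgorithm="KMS",
        S3SseKmsKeyId=os.environ["KMS_KEY"],  # now the full key ARN
        ...
    )

With only the bare key id in KMS_KEY, the export failed with the error quoted
in the subject.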
"dev" + pointer_table_name = module.dev-pointers-table.table_name + pointer_table_kms_key_arn = module.dev-pointers-table.kms_key_arn } diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf index 1cc4f75bc..31b837a70 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf @@ -63,6 +63,15 @@ data "aws_iam_policy_document" "lambda_policy" { resources = [aws_kms_key.dynamo.arn] } + statement { + effect = "Allow" + actions = [ + "kms:Decrypt", + "kms:DescribeKey" + ] + resources = [var.pointer_table_kms_key_arn] + } + statement { effect = "Allow" actions = [ diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf index 60c031c90..40cc62d2d 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf @@ -12,3 +12,8 @@ variable "pointer_table_name" { type = string description = "Name of the pointer table to export" } + +variable "pointer_table_kms_key_arn" { + type = string + description = "ARN of the KMS key used to encrypt the pointer table" +} diff --git a/terraform/account-wide-infrastructure/modules/pointers-table/output.tf b/terraform/account-wide-infrastructure/modules/pointers-table/output.tf index a565987b8..738328dfb 100644 --- a/terraform/account-wide-infrastructure/modules/pointers-table/output.tf +++ b/terraform/account-wide-infrastructure/modules/pointers-table/output.tf @@ -12,3 +12,8 @@ output "write_policy_arn" { description = "Policy to write to the pointers table" value = aws_iam_policy.pointers-table-write.arn } + +output "kms_key_arn" { + description = "ARN of the KMS key used to encrypt the pointers table" + value = aws_kms_key.pointers-table-key.arn +} From 2f019a3c689d89167ec743f34261a7c3e8faec95 Mon Sep 17 00:00:00 2001 From: Anjali Trace Date: Wed, 6 May 2026 15:31:35 +0100 Subject: [PATCH 11/15] NRL-2002 Comment out unused required arg for slack notification on error --- .../modules/dynamo_export/src/glue_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py b/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py index aaf2fdeb3..d34153e98 100644 --- a/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py @@ -45,7 +45,7 @@ "TARGET_BUCKET", "GLUE_CRAWLER_NAME", "EXPORT_TYPE", - "SLACK_WEBHOOK_URL_SSM_PARAMETER_NAME", + # "SLACK_WEBHOOK_URL_SSM_PARAMETER_NAME", "ENVIRONMENT", ], ) From 3d89d76e891b697330218409348ae40e4bf954ab Mon Sep 17 00:00:00 2001 From: Anjali Trace Date: Wed, 6 May 2026 15:46:15 +0100 Subject: [PATCH 12/15] NRL-2002 initialise export_type in case no from_times calculated --- lambdas/dynamo_export/dynamo_export_trigger.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lambdas/dynamo_export/dynamo_export_trigger.py b/lambdas/dynamo_export/dynamo_export_trigger.py index dd4c74a77..b4b9b341f 100644 --- a/lambdas/dynamo_export/dynamo_export_trigger.py +++ b/lambdas/dynamo_export/dynamo_export_trigger.py @@ -26,6 +26,7 @@ def lambda_handler(_event, _context): to_time = datetime.now(timezone.utc).replace(microsecond=0, second=0, 
---
 lambdas/dynamo_export/dynamo_export_trigger.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lambdas/dynamo_export/dynamo_export_trigger.py b/lambdas/dynamo_export/dynamo_export_trigger.py
index dd4c74a77..b4b9b341f 100644
--- a/lambdas/dynamo_export/dynamo_export_trigger.py
+++ b/lambdas/dynamo_export/dynamo_export_trigger.py
@@ -26,6 +26,7 @@ def lambda_handler(_event, _context):
     to_time = datetime.now(timezone.utc).replace(microsecond=0, second=0, minute=0)
 
     export_arns = []
+    export_type = None
 
     try:
         from_time = ssm.get_parameter(Name=SSM_PARAM)["Parameter"]["Value"]

From 3164a0ffb4a185e7cb3176395039a054429355bd Mon Sep 17 00:00:00 2001
From: Anjali Trace
Date: Wed, 6 May 2026 15:53:23 +0100
Subject: [PATCH 13/15] NRL-2002 Add logs for debugging

---
 lambdas/dynamo_export/dynamo_export_trigger.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/lambdas/dynamo_export/dynamo_export_trigger.py b/lambdas/dynamo_export/dynamo_export_trigger.py
index b4b9b341f..bc8fbf895 100644
--- a/lambdas/dynamo_export/dynamo_export_trigger.py
+++ b/lambdas/dynamo_export/dynamo_export_trigger.py
@@ -40,9 +40,13 @@ def lambda_handler(_event, _context):
         )["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"][
             "EarliestRestorableDateTime"
         ]
+        print("earliest_pitr", earliest_pitr)  # noqa: T201
         from_time = max(from_time, earliest_pitr)
+        print("from_time", from_time)  # noqa: T201
         days_difference = (to_time - from_time).days + 1
+        print("days_difference", days_difference)  # noqa: T201
         from_times = [from_time + timedelta(days=i) for i in range(days_difference)]
+        print("from_times", from_times)  # noqa: T201
 
         for base_time in from_times:
             end_time = min(base_time + timedelta(days=1), to_time)

From b6638c345c649a7f805706c0c5a37b84ed1947c6 Mon Sep 17 00:00:00 2001
From: Anjali Trace
Date: Wed, 6 May 2026 16:57:25 +0100
Subject: [PATCH 14/15] NRL-2002 Remove prints no longer needed to debug

---
 lambdas/dynamo_export/dynamo_export_trigger.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/lambdas/dynamo_export/dynamo_export_trigger.py b/lambdas/dynamo_export/dynamo_export_trigger.py
index bc8fbf895..731927776 100644
--- a/lambdas/dynamo_export/dynamo_export_trigger.py
+++ b/lambdas/dynamo_export/dynamo_export_trigger.py
@@ -40,13 +40,10 @@ def lambda_handler(_event, _context):
         )["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"][
             "EarliestRestorableDateTime"
         ]
-        print("earliest_pitr", earliest_pitr)  # noqa: T201
+
         from_time = max(from_time, earliest_pitr)
-        print("from_time", from_time)  # noqa: T201
         days_difference = (to_time - from_time).days + 1
-        print("days_difference", days_difference)  # noqa: T201
         from_times = [from_time + timedelta(days=i) for i in range(days_difference)]
-        print("from_times", from_times)  # noqa: T201
 
         for base_time in from_times:
             end_time = min(base_time + timedelta(days=1), to_time)

From 40696ac2aafd20aa76e74a932f1aed35bbbbc98c Mon Sep 17 00:00:00 2001
From: Anjali Trace
Date: Wed, 6 May 2026 17:10:08 +0100
Subject: [PATCH 15/15] NRL-2002 Rename last updated column reference to the
 valid updated_on name

---
 .../modules/dynamo_export/src/glue_job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py b/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py
index d34153e98..85f28bc48 100644
--- a/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py
+++ b/terraform/account-wide-infrastructure/modules/dynamo_export/src/glue_job.py
@@ -192,7 +192,7 @@ def process_full_export(df: DataFrame) -> None:
             ).otherwise(False),
         )
         .withColumn("nhs_number", hash_nhs(col("nhs_number_raw")))
-        .withColumn("date", df["last_updated"].cast("date"))
+        .withColumn("date", df["updated_on"].cast("date"))
         .select(*fields, *DERIVED_KEYS)
     )
 