AWS 使用API Gateway+Lambda构建REST API

记录如何通过API Gateway+Lambda实现REST API功能

目标: 实现企微群机器人的自定义通知功能

整体流程

flowchart TD;
a["客户端"] --> |1.POST| b["AWS API Gateway"]
b --> |2.转发请求| c["Lambda函数"]
c --> |3.数据转换| d["企业微信API接口"]
d --> |4.推送消息| e["机器人在聊天群发送消息"]
d --> |4.返回请求结果| c --> |5.传递结果| b --> |6.传递结果| a

创建资源

创建Lambda

前置创建IAM策略/创建Role/创建Lambda等操作略过, 本次不需要特殊权限, Lambda所用的角色只需要AWSLambdaBasicExecutionRoleAWSLambdaVPCAccessExecutionRole即可

创建API Gateway

参考教程: 教程:利用 API Gateway 使用 Lambda - AWS Lambda

参见下图, 选择REST API即可

API Gateway类型

出现以下提示, 大致意思是提供了一个预置模板, 忽略即可

创建提示

因为第一次使用, 直接引用模板的内容作为学习

使用预置模板
创建后提示

修改基本信息

首先修改默认名称和描述信息

修改名称和描述信息

创建资源

因为用于Jenkins, 创建一个同名的资源, 其他资源和请求都删了

创建资源

创建请求

发信请求一般都是POST, 因此创建资源下的请求

创建请求
选择POST请求类型

集成类型选择Lambda, 确认区域和名称信息, 点击保存

集成类型
允许添加权限

测试请求

在展示页面点击测试按钮进行测试

测试API Gateway

输入测试内容后, 点击测试按钮

测试样例

页面下方会展示相关的执行日志等信息

执行结果返回

发布API

按照下图, 点击发布API

发布API

填写相关信息, 点击发布API

补充相关发布信息

返回的页面会展示请求所需的URL, 记录备用

部署完成

👆🏻上面的结果是Stage的URL, 并不是请求的, 需要导航到具体方法, 或者对上面的结果进行拼接, 见下图

实际URL

测试API

1
2
3
4
5
6
# 通过API Gateway发布构建结果
ROBOT_KEY="YOUR_GROUP_ROBOT_KEY"
API_URL="https://YOUR_KEY.execute-api.cn-north-1.amazonaws.com.cn/PROD/jenkins"

curl -X POST -d "{\"key\":\"${ROBOT_KEY}\",\"type\":\"card\",\"source_icon_url\":\"https://img1.imgtp.com/2023/04/10/cODmyqWA.ico\",\"source_desc\":\"Jenkins\",\"source_desc_color\":3,\"main_title\":\"Jenkins构建通知\",\"main_title_desc\":\"项目: ${JOB_NAME} 第${BUILD_NUMBER}次构建\",\"emphasis_title\":\"成功\",\"emphasis_desc\":\"点击本卡片查看构建详细日志\",\"card_url\":\"${BUILD_URL}/console\"}" ${API_URL}

测试结果可用

测试结果

结合Jenkins使用

普通Shell环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
function main() {
# 主要功能函数
}

# 创建发送通知函数
function send_notice() {
# 通过API Gateway发布构建结果
ROBOT_KEY="YOUR_BOT_KEY"
API_URL="https://YOUR_UNIQ_KEY.execute-api.cn-north-1.amazonaws.com.cn/PROD/jenkins"
ICON_URL="https://img1.imgtp.com/2023/04/10/cODmyqWA.ico"

if [ "$1" -eq "0" ];
then
STATUS='成功'
COLOR=3
else
STATUS='失败'
COLOR=2
fi

curl -X POST -d "{\"key\":\"${ROBOT_KEY}\",\"type\":\"card\",\"source_icon_url\":\"${ICON_URL}\",\"source_desc\":\"Jenkins\",\"source_desc_color\":${COLOR},\"main_title\":\"Jenkins构建通知\",\"main_title_desc\":\"项目: ${JOB_NAME} 第${BUILD_NUMBER}次构建\",\"emphasis_title\":\"${STATUS}\",\"emphasis_desc\":\"点击本卡片查看项目信息\",\"card_url\":\"${BUILD_URL}\"}" ${API_URL}
}

# 下面的main是将全部需要执行的功能封装好的函数
main
send_notice $?

Groovy脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
pipeline {
parameters {
string(
name: 'qywx_bot_key',
defaultValue: 'YOUR_BOT_KEY',
description: '企业微信群机器人Key',
trim: true
)
}
stages {
stage('核心功能') {
steps {
// 执行核心功能
}
}
post {
success {
script {
env.STATUS = '成功'
// 颜色为3表示字体为绿色
env.COLOR = 3
env.API_URL = "https://YOUR_UNIQ_KEY.execute-api.cn-north-1.amazonaws.com.cn/PROD/jenkins"
env.ICON_URL = "https://img1.imgtp.com/2023/04/10/cODmyqWA.ico"
sh "python ${params.jenkins_script_dir}/jenkins_email.py --email_type deploy --job_name ${JOB_NAME} --job_status success --job_url ${JOB_URL} --build_url ${BUILD_URL} --build_number ${BUILD_NUMBER} --receivers ${params.email_receives} --test_report_url ${BUILD_URL} --cc ${params.success_cc}"
sh "curl -X POST -d '{\"key\":\"${params.qywx_bot_key}\",\"type\":\"card\",\"source_icon_url\":\"${env.ICON_URL}\",\"source_desc\":\"Jenkins\",\"source_desc_color\":${env.COLOR},\"main_title\":\"Jenkins构建通知\",\"main_title_desc\":\"项目: ${JOB_NAME} 第${BUILD_NUMBER}次构建\",\"emphasis_title\":\"${env.STATUS}\",\"emphasis_desc\":\"点击本卡片查看项目信息\",\"card_url\":\"${BUILD_URL}\"}' ${env.API_URL}"
}
}
failure {
script {
env.STATUS = '失败'
// 颜色为2表示字体为红色
env.COLOR = 2
env.API_URL = "https://YOUR_UNIQ_KEY.execute-api.cn-north-1.amazonaws.com.cn/PROD/jenkins"
env.ICON_URL = "https://img1.imgtp.com/2023/04/10/cODmyqWA.ico"
sh "python ${params.jenkins_script_dir}/jenkins_email.py --email_type deploy --job_name ${JOB_NAME} --job_status failure --job_url ${JOB_URL} --build_url ${BUILD_URL} --build_number ${BUILD_NUMBER} --receivers ${params.email_receives} --test_report_url ${BUILD_URL} --cc ${params.failed_cc}"
sh "curl -X POST -d '{\"key\":\"${params.qywx_bot_key}\",\"type\":\"card\",\"source_icon_url\":\"${env.ICON_URL}\",\"source_desc\":\"Jenkins\",\"source_desc_color\":${env.COLOR},\"main_title\":\"Jenkins构建通知\",\"main_title_desc\":\"项目: ${JOB_NAME} 第${BUILD_NUMBER}次构建\",\"emphasis_title\":\"${env.STATUS}\",\"emphasis_desc\":\"点击本卡片查看项目信息\",\"card_url\":\"${BUILD_URL}\"}' ${env.API_URL}"
}
}
}

效果展示

Jenkins通知效果

更新记录

🆙2023-04-26通过询问ChatGPT, 得知可以通过Jenkins的REST API获取相关事件触发信息

如何获取当前构建的信息

因此, 结合环境变量, 可以通过以下命令获取返回的信息

1
2
# 获取当前构建的详细信息
curl -s "http://jenkins.goldwind.com.cn/job/${JOB_NAME}/${BUILD_NUMBER}/api/json"

样例信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
{
"_class": "org.jenkinsci.plugins.workflow.job.WorkflowRun",
"actions": [{
"_class": "hudson.model.CauseAction",
"causes": [{
"_class": "hudson.model.Cause$UpstreamCause",
"shortDescription": "Started by upstream project \"Test-Trigger\" build number 16",
"upstreamBuild": 16,
"upstreamProject": "Test-Trigger",
"upstreamUrl": "job/Test-Trigger/"
}]
},
{
"_class": "jenkins.metrics.impl.TimeInQueueAction",
"blockedDurationMillis": 0,
"blockedTimeMillis": 0,
"buildableDurationMillis": 0,
"buildableTimeMillis": 0,
"buildingDurationMillis": 39,
"executingTimeMillis": 39,
"executorUtilization": 1,
"subTaskCount": 0,
"waitingDurationMillis": 8777,
"waitingTimeMillis": 8777
},
{},
{},
{},
{},
{},
{
"_class": "org.jenkinsci.plugins.pipeline.modeldefinition.actions.RestartDeclarativePipelineAction"
},
{},
{
"_class": "org.jenkinsci.plugins.workflow.job.views.FlowGraphAction"
},
{},
{},
{}
],
"artifacts": [],
"building": false,
"description": null,
"displayName": "#6",
"duration": 39,
"estimatedDuration": 37,
"executor": null,
"fullDisplayName": "tg #6",
"id": "6",
"keepLog": false,
"number": 6,
"queueId": 397764,
"result": "SUCCESS",
"timestamp": 1682487526586,
"url": "http://jenkins.example.com/job/tg/6/",
"changeSets": [],
"culprits": [],
"nextBuild": null,
"previousBuild": {
"number": 5,
"url": "http://jenkins.example.com/job/tg/5/"
}
}

再结合jq命令, 可以对其进行解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 获取原始JSON字符串
BUILD_JSON=$(curl -s "http://jenkins.goldwind.com.cn/job/${JOB_NAME}/${BUILD_NUMBER}/api/json")
# 获取上游项目名称
UPSTREAM_PROJECT=$(echo ${BUILD_JSON}|jq '.actions[0].causes[0].upstreamProject')
# 获取上游项目构建ID
UPSTREAM_BUILD_ID=$(echo ${BUILD_JSON}|jq '.actions[0].causes[0].upstreamBuild')
# 拼接构建触发信息
if [ "${UPSTREAM_PROJECT}" != "null" -a "${UPSTREAM_BUILD_ID}" != "null" ]
then
TRIGGER="${UPSTREAM_PROJECT}项目的第${UPSTREAM_BUILD_ID}次构建"
else
TRIGGER="手工"
fi
BUILD_CAUSE="本次构建由${TRIGGER}所触发"
echo "${BUILD_CAUSE}"

因此Shell类型的可直接使用上面的代码, pipeline则需要一定的修改, 可以参考一下环境变量

参考ChatGPT对话结论

如何使用多行shell脚本执行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
environment {
TRIGGER = sh(returnStdout: true, script: """
# 获取原始JSON字符串
BUILD_JSON=\$(curl -s "http://jenkins.example.com/job/${JOB_NAME}/${BUILD_NUMBER}/api/json")
# 获取上游项目名称
UPSTREAM_PROJECT=\$(echo \${BUILD_JSON}|jq '.actions[0].causes[0].upstreamProject' |tr -d '"')
# 获取上游项目构建ID
UPSTREAM_BUILD_ID=\$(echo \${BUILD_JSON}|jq '.actions[0].causes[0].upstreamBuild')
# 拼接构建触发信息
if [ "\${UPSTREAM_PROJECT}" != "null" -a "\${UPSTREAM_BUILD_ID}" != "null" ]
then
echo "\${UPSTREAM_PROJECT} - \${UPSTREAM_BUILD_ID}"
else
echo "手动"
fi
""").trim()
}

上面的groovy定义了一个名为TRIGGER的环境变量, 其中

  • 如果由其他项目触发

    返回触发项目名 - 构建ID, 结合发信功能可实现类似如下样式

    被触发样式
  • 如果手工执行

    返回手动, 结合发信功能可实现类似如下样式

    手动执行示意

结合Bitbucket使用

Managing webhooks in Bitbucket Server | Bitbucket Data Center and Server 5.15 | Atlassian Documentation

Event payload | Bitbucket Data Center and Server 5.15 | Atlassian Documentation

使用 API Gateway 控制台设置请求和响应数据映射 - Amazon API Gateway

将 Amazon Lambda 与 Amazon API Gateway 结合使用 - Amazon Lambda

目标

实现Bitbucket支持的自定义Webhook功能

Webhook初探

参考如下图片创建Webhook

创建Webhook
所需参数和支持的类型

通过上面可以看到参数:

  • Name: [必填]直观名称, 随意取

  • URL: [必填]调用Webhook的URL地址, 对应API Gateway的发布API地址

  • Sercret: [可选]传递的密钥文件, 通过官方文档可以了解到详细信息

    When you define a secret for a webhook, each request is signed via a Hash-based Message Authentication Code (HMAC).

    The default for this algorithm is HMACSha256. The header X-Hub-Signature is defined and contains the HMAC.

    To authenticate the validity of the message payload, the receiver can perform the HMAC algorithm on the received body with the secret as the key to the HMAC algorithm.

    If the results do not match, it may indicate there was a problem with transmission that has caused the message payload to change.

从上面的文档提取两个关键要点:

  1. Secret会使用HMACSha256算法对结果进行加密
  2. Secret会通过Headers传递, 并且其Key-Value样式一定为: {'X-Hub-Signature': 'SECRET_WITH_ALG'}

基于以上, 可以通过Secret传递群机器人的Key信息, 用于发送到不同的群使用

整体流程

flowchart TD;
a["Bitbucket Server"] --> |触发Webhook|b["API Gateway"]
b --> |转发请求| c["解析Lambda"]
c --> |解析请求| d["发信Lambda"] --> |处理请求|e["发信流程"]

设置API Gateway

修改Method Request

首先按照上文中创建请求相关内容, 创建名为bitbucket的资源, 和对应POST方法的请求, 然后按照下图图示配置方法请求

创建方法请求

然后创建请求头, 参考下图

方法请求头

创建后, 勾选Required, 因为该请求头用于确定Bot的Key信息, 因此必须提供

将请求头设置为强制要求

同样的, 还需要添加X-Event-Key这个请求头, 最终结果如下

添加两个必须的请求头

修改Intgration Request

按照下图图示配置集成请求

配置集成请求

使用Lambda代理集成方式实现

🆙2023-04-12通过提交工单询问AWS后台支持, 如果要将请求头转发为LambdaEvent信息, 可以激活代理集成功能, 即参见下图☑️Use Lambda Proxy integration

激活代理集成
激活确认
添加权限确认

使用Mapping方式实现

在弹出的页面, 点击HTTP Header, 并添加请求头映射信息, 详细参见文档1

创建Header映射

点击Mapped from旁边的可以查看说明, 如下图

转换说明

添加信息

  1. 请求类型
    • Name: X-Event-Key
    • Mapped from: method.request.header.X-Event-Key
  2. 加密后签名
    • Name: X-Hub-Signature
    • Mapped from: method.request.header.X-Hub-Signature
  3. AWS异步调用(强制要求, 否则无法异步调用)
    • Name: X-Amz-Invocation-Type
    • Mapped from: 'Event'

结果见下图

添加集成请求信息

点击Mapping Templates, 并添加映射模板信息, 按照下图操作即可, 其中Content-Type默认为application/json, 更多详细自定义信息, 请参考文档2

创建映射模板

根据ChatGPT反馈结果, 需要进行修改

ChatGPT讨论截图

因此需要将body-json的键值对, 修改为"body" : "$util.escapeJavaScript($input.body)"

测试请求

Bitbucket点击Test connection进行测试, 使用Lambda直接记录请求结果

Bitbucket测试请求

使用Lambda代理集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
{
'resource': '/bitbucket',
'path': '/bitbucket',
'httpMethod': 'POST',
'headers': {
'Accept': '*/*',
'Host': 'YOUR_KEY.execute-api.cn-north-1.amazonaws.com.cn',
'User-Agent': 'Atlassian HttpClient 1.1.0 / Bitbucket-5.15.1 (5015001) / Default',
'Via': '1.1 localhost (Apache-HttpClient/4.5.5 (cache))',
'X-Amzn-Trace-Id': 'Root=***********************f',
'X-Event-Key': 'diagnostics:ping',
'X-Forwarded-For': '********',
'X-Forwarded-Port': '443',
'X-Forwarded-Proto': 'https',
'X-Request-Id': '******************************'
},
'multiValueHeaders': {
'Accept': ['*/*'],
'Host': ['YOUR_KEY.execute-api.cn-north-1.amazonaws.com.cn'],
'User-Agent': ['Atlassian HttpClient 1.1.0 / Bitbucket-5.15.1 (5015001) / Default'],
'Via': ['1.1 localhost (Apache-HttpClient/4.5.5 (cache))'],
'X-Amzn-Trace-Id': ['Root=***********************f'],
'X-Event-Key': ['diagnostics:ping'],
'X-Forwarded-For': ['********'],
'X-Forwarded-Port': ['443'],
'X-Forwarded-Proto': ['https'],
'X-Request-Id': ['******************************']
},
'queryStringParameters': None,
'multiValueQueryStringParameters': None,
'pathParameters': None,
'stageVariables': None,
'requestContext': {
'resourceId': '*******',
'resourcePath': '/bitbucket',
'httpMethod': 'POST',
'extendedRequestId': '*****',
'requestTime': '12/Apr/2023:02:58:37 +0000',
'path': '/PROD/bitbucket',
'accountId': '***********',
'protocol': 'HTTP/1.1',
'stage': 'PROD',
'domainPrefix': 'YOUR_KEY',
'requestTimeEpoch': 1681268317175,
'requestId': '******************************',
'identity': {
'cognitoIdentityPoolId': None,
'accountId': None,
'cognitoIdentityId': None,
'caller': None,
'sourceIp': '********',
'principalOrgId': None,
'accessKey': None,
'cognitoAuthenticationType': None,
'cognitoAuthenticationProvider': None,
'userArn': None,
'userAgent': 'Atlassian HttpClient 1.1.0 / Bitbucket-5.15.1 (5015001) / Default',
'user': None
},
'domainName': 'YOUR_KEY.execute-api.cn-north-1.amazonaws.com.cn',
'apiId': 'YOUR_KEY'
},
'body': None,
'isBase64Encoded': False
}

使用Mapping实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
'body': '{}',
'params': {
'path': {},
'querystring': {},
'header': {
'Accept': '*/*',
'Host': 'YOUR_KEY.execute-api.cn-north-1.amazonaws.com.cn',
'User-Agent': 'Atlassian HttpClient 1.1.0 / Bitbucket-5.15.1 (5015001) / Default',
'Via': '1.1 localhost (Apache-HttpClient/4.5.5 (cache))',
'X-Amzn-Trace-Id': 'Root=***********************e',
'X-Event-Key': 'diagnostics:ping',
'X-Forwarded-For': '***********',
'X-Forwarded-Port': '443',
'X-Forwarded-Proto': 'https',
'X-Request-Id': '******************************'
}
},
'stage-variables': {},
'context': {
'account-id': '',
'api-id': 'YOUR_KEY',
'api-key': '',
'authorizer-principal-id': '',
'caller': '',
'cognito-authentication-provider': '',
'cognito-authentication-type': '',
'cognito-identity-id': '',
'cognito-identity-pool-id': '',
'http-method': 'POST',
'stage': 'PROD',
'source-ip': '***********',
'user': '',
'user-agent': 'Atlassian HttpClient 1.1.0 / Bitbucket-5.15.1 (5015001) / Default',
'user-arn': '',
'request-id': '******************************',
'resource-id': '********',
'resource-path': '/bitbucket'
}
}

使用Lambda处理请求

根据前文说明, 根据不同的X-Event-Key, 需要解析为不同的内容, 主要有以下几种, 详细请参考官方文档3

X-Event-Key 触发条件 用途说明
diagnostics:ping 点击Test connection Bitbucket测试请求, 应返回200结果, 并提示信息已被处理, 转发请求为测试内容
repo:refs_changed 执行git push 新内容已经推送至该存储库
pr:opened 创建Pull Request 创建任何PR请求时会通过企微机器人发送通知
pr:merged 合并Pull Request PR请求通过时, 会触发相关信息通知
pr:declined 拒绝合并Pull Request PR请求被拒绝时, 会触发相关信息通知

Python3 处理源码

获取了亚马逊云技术后台技术团队的支持

尊敬的客户,您好! 感谢您联系亚马逊云科技技术支持团队.

根据我们在电话上的沟通,总结如下:

  • 问题描述:

    您使用API Gateway的Lambda集成,并且为后端Lambda函数配置了另一个Lambda函数作为异步调用的目标。您当前尝试调用时遇到了502报错。 如果我对此问题理解有误,烦请您随时指正,谢谢!

  • 分析过程:

    很高兴与您通过电话沟通。

    我们今天通过在线会议共享屏幕共同查看了您的控制台。

    在今天的会议中,您为您的API Gateway开启了执行日志。我们检查了您的APIGW执行日志并注意到了502报错信息内容为:Malformed Lambda proxy response.

    我们判断由于当前您的后端集成类型为Lambda代理集成,您的后端Lambda需要按照特定格式返回响应。有关 用于代理集成的 Lambda 函数的输出格式 的更多内容请参考:

    在 API Gateway 中设置 Lambda 代理集成 - Amazon API Gateway

    您随后可以正常调用APIGW。但是您配置的Lambda函数目标函数没有被调用。

    我们重新考虑了您当前使用的架构。

    您的客户端调用APIGW后,后端的Lambda函数会执行,您希望Lambda函数执行后会通过Lambda目标调用另外一个Lambda函数。其中Lambda目标的典型使用场景为将异步调用记录发送到另一个服务,即需要Lambda函数被异步调用或流调用。APIGW调用Lambda默认为同步调用,因此在之前的测试中,您的目标函数没有被调用。 有关配置异步调用目标的更多内容请参考:

    异步调用 - Amazon Lambda

    随后我们将您的APIGW后端Lambda集成调整为Lambda自定义集成。您之前使用Lambda代理集成是因为您需要将客户端请求的特定header传递至Lambda继承中。我们在Lambda自定义集成中通过配置映射模板传递了您的Header信息,并在APIGW方法请求配置了静态header使得您的APIGW会自行异步调用Lambda。经测试,您的后端Lambda中可以取到header。有关映射模板的更多信息请参考:

    API Gateway 映射模板和访问日志记录变量引用 - Amazon API Gateway

    期间您也自行配置了同步调用Lambda并成功使用SDK调用另一个Lambda函数

    希望以上信息对您有帮助。

如果您在此遇到相关问题,欢迎随时联系我们!

此致 (Best Regards),

Hao S.

您的满意是我们持续改进的动力。

欢迎您点击右上角的评分选项,为此次回复进行满意度评价。感谢您的反馈!

We value your feedback. Please share your experience by rating this correspondence. Each correspondence can also be rated by selecting the stars in top right corner of each correspondence within the Support Center.

需要重点注意:

  1. 如果使用了Lambda代理集成功能, 相关返回请求的格式有明确要求4
  2. 不使用代理集成的情况下, 一定需要确保正文body纯字符串, 否则无法通过HMAC SHA256算法验证, 进而无法与已知机器人Key做匹配, 这点务必确定
点击展开
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
用于解析转发Bitbucket的webhook请求.

- Author: Rex Zhou <879582094@qq.com>
- Created Time: 2023/4/12 14:32
- Copyright: Copyright © 2023 Rex Zhou. All rights reserved.
"""

__version__ = "0.0.4"

__author__ = "Rex Zhou"
__copyright__ = "Copyright © 2023 Rex Zhou. All rights reserved."
__credits__ = [__author__]
__license__ = "None"
__maintainer__ = __author__
__email__ = "879582094@qq.com"

import hashlib
import hmac
import json
import logging
import os
from pathlib import Path

import boto3

# 日志等级
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'info').upper()
# 后处理Lambda函数ARN/Name
ENV_POST_LAMBDA = 'POST_LAMBDA'
POST_LAMBDA = os.environ.get(ENV_POST_LAMBDA)

if bool(os.environ.get("AWS_SESSION_TOKEN")):
IN_LAMBDA = True
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
client = boto3.client('lambda')
else:
IN_LAMBDA = False
logging.basicConfig(level=LOG_LEVEL)
logger = logging.getLogger()
client = boto3.Session(profile_name='5729').client('lambda')


class BitBucketParser:
"""Bitbucket Webhook解析器"""

#: 加密算法
ALG = hashlib.sha256

#: 提取Key信息的环境变量名称
ENV_NAME = 'QYWX_BOT_KEYS'

#: 提取Key的分隔符
SEP = ','

#: 图标URL
ICON = 'https://img1.imgtp.com/2023/04/13/wXZHiRht.png'

def __init__(self):
self.__template = {
'type': 'card',
'source_icon_url': self.ICON,
'source_desc': 'Bitbucket',
'source_desc_color': 3,
'main_title': '未知项目',
'main_title_desc': '触发者: {author}',
'emphasis_title': '未知操作',
'emphasis_desc': '点击本卡片查看详细信息',
'card_url': 'http://jenkins.goldwind.com.cn/'
}
self.logger = logging.getLogger(self.__class__.__name__)

def __verify_key(self, key: str, payload: str, signature: str) -> bool:
"""
校验机器人Key是否与提供值匹配方法

:param key: :class:`str` 机器人Key值
:param payload: :class:`str` 负载JSON字符串
:param signature: :class:`str` 请求签名字符串
:return: :class:`bool` 是否匹配
"""
self.logger.debug('正在尝试Key: %s', key)
hasher = hmac.new(key.encode(), payload.encode(), self.ALG)
digest = "sha256=" + hasher.hexdigest()
self.logger.debug('十六进制摘要结果: %s', digest)
result = hmac.compare_digest(digest, signature)
self.logger.debug('匹配结果: %s', result)
return result

def match_key_by_payload(self, payload: str, signature: str) -> str:
"""
遍历解析请求负载字典功能, 通过预置的Key清单逐个检查请求发生目的机器人

:param payload: :class:`str` 请求负载JSON字符串
:param signature: :class:`str` 请求签名字符串
:return: :class:`str` 匹配的企微机器人Key字符串
"""
self.logger.debug('尝试解析负载JSON: %s', payload)
self.logger.info('尝试匹配加密后签名: %s', signature)
for key in self.keys:
if self.__verify_key(key, payload, signature):
found = key
msg = f'已找到匹配企微群机器人Key: {key}'
self.logger.info(msg)
break
else:
msg = f"无法找到匹配机器人Key信息, 请在环境变量: {self.ENV_NAME} " \
f"中添加以'{self.SEP}'分割的机器人Key!"
self.logger.critical(msg)
raise LookupError(msg)
return found

@property
def keys(self) -> list:
"""
获取全部支持的机器人Key列表功能

:return: :class:`list` 机器人Key列表
"""
keys = os.environ.get(self.ENV_NAME, '')
self.logger.info('环境变量获取结果: %s', keys)
keys = [_ for _ in keys.split(self.SEP) if _]
self.logger.info('分割后结果: %s', keys)
return keys

def _parse_push(self, payload: str) -> dict:
"""
解析PUSH推送事件

:param payload: :class:`str` 载荷字符串
:return: :class:`dict` 解析后的字典信息
"""
data = json.loads(payload)
actor = data['actor']['displayName']
repo_name = data['repository']['name']
repo_url = data['repository']['links']['self'][0]['href']
branch_name = data['changes'][0]['ref']['displayId']
branch_op = data['changes'][0]['type']
mapper = {
'ADD': {
'emphasis_title': '已创建',
'source_desc_color': 1
},
'UPDATE': {
'emphasis_title': '已更新',
'source_desc_color': 3
},
'DELETE': {
'emphasis_title': '已删除',
'source_desc_color': 2
},
}
result = {
**self.__template,
**mapper[branch_op], 'main_title': repo_name,
'main_title_desc': f'分支: {branch_name} 触发者: {actor}',
'card_url': repo_url
}
return result

def _parse_pr(self, payload: str) -> dict:
"""
解析PR事件

:param payload: :class:`str` 载荷字符串
:return: :class:`dict` 解析后的字典信息
"""
data = json.loads(payload)
actor = data['actor']['displayName']
pr_title = data['pullRequest']['title']
pr_desc = data['pullRequest']['description']
pr_state = data['pullRequest']['state']
repo_url = data['pullRequest']['links']['self'][0]['href']
mapper = {
'OPEN': {
'emphasis_title': '已打开',
'source_desc_color': 1
},
'DECLINED': {
'emphasis_title': '已拒绝',
'source_desc_color': 2
},
'MERGED': {
'emphasis_title': '已合并',
'source_desc_color': 3
},
}
result = {
**self.__template,
**mapper[pr_state], 'main_title': pr_title,
'main_title_desc': pr_desc,
'emphasis_desc': f'发起人: {actor}',
'card_url': repo_url
}
return result

def parse_payload(self, event_type: str, payload: str) -> dict:
"""
动态解析载荷字符串方法.

:param event_type: :class:`str` 事件类型, 需要在mapper中定义
:param payload: :class:`str` 载荷字符串
:return: :class:`dict` 解析后结果字典
"""
mapper = {
'repo:refs_changed': self._parse_push,
'pr:opened': self._parse_pr,
'pr:merged': self._parse_pr,
'pr:declined': self._parse_pr
}
self.logger.info('尝试解析事件类型: %s', event_type)
try:
func = mapper[event_type]
except KeyError:
msg = f'无法找到事件类型为: {event_type}的解析方法, 请检查是否已在mapper字典中定义!'
self.logger.critical(msg)
raise ValueError(msg) from KeyError
result = func(payload)
self.logger.info('载荷字符串解析成功!')
self.logger.info('结果: %s', result)
return result


def _parse_event(event: dict) -> dict:
"""
解析事件请求字典

:param event: :class:`dict` 事件请求字典
:return: :class:`dict` 结果字典
"""
if 'headers' in event:
# 使用Lambda集成代理, 同步执行
body = event['body']
body = '{}' if body is None else body
result = {
'synchronize': True,
'X-Event-Key': event['headers']['X-Event-Key'],
'X-Hub-Signature': event['headers'].get('X-Hub-Signature'),
'body': body
}
else:
# 使用Mapping方式, 异步执行
result = {
'synchronize': False,
'X-Event-Key': event['params']['header']['X-Event-Key'],
'X-Hub-Signature': event['params']['header'].get('X-Hub-Signature'),
'body': event['body']
}
return result


def send_notice(event: dict) -> dict:
"""
发送通知功能

:param event: :class:`dict` 事件字典
:return: :class:`dict` 请求结果字典
"""
result = {"headers": {"Content-Type": "application/json"}}
if POST_LAMBDA is None:
logger.critical('未配置%s环境变量! 请配置后处理Lambda的ARN/Name/Partial ARN!',
ENV_POST_LAMBDA)
result.update({
'statusCode': 500,
'body': f'解析lambda函数错误! 未配置{ENV_POST_LAMBDA}环境变量!'
})
else:
logger.info('解析后的事件信息将传递至 %s Lambda函数', POST_LAMBDA)
kwargs = {
'FunctionName': POST_LAMBDA,
'LogType': 'Tail',
'Payload': json.dumps(event, ensure_ascii=False)
}
logger.debug('请求参数信息: %s', kwargs)
response = client.invoke(**kwargs)
logger.debug('返回响应: %s', response)
result.update({
'statusCode': response['StatusCode'],
'body': response['Payload'].read().decode()
})
return result


def lambda_handler(event, context) -> dict:
"""
Lambda入口程序

1. 创建 :class:`BitBucketParser` 实例

#. 使用 :attr:`mapper` 构造检查解析参数, 事件类型对应关系:

- repo:refs_changed ➡️ :meth:`~BitBucketParser._parse_push`
- pr:opened ➡️ :meth:`~BitBucketParser._parse_pr`
- pr:merged ➡️ :meth:`~BitBucketParser._parse_pr`
- pr:declined ➡️ :meth:`~BitBucketParser._parse_pr`

#. 构造解析后结果字典 :attr:`response`

#. 调用发信Lambda, 返回数据

.. seealso::

`事件负载说明`_

.. _`事件负载说明`: https://confluence.atlassian.com/bitbucketserver0515/event-payload-961275434.html

:param event: :class:`dict` 事件字典
:param context: :class:`str` 上下文
:return: :class:`dict` 解析后结果字典
"""
logger.info('事件及上下文信息如下:')
logger.info(event)
logger.info(context)
event = _parse_event(event)
logger.info('解析后的事件字典: %s', event)
parser = BitBucketParser()
event_type = event['X-Event-Key']
# 针对于测试情况, 直接返回结果
if event_type == 'diagnostics:ping':
logger.info('检查连通性功能, 直接返回请求')
response = {
'statusCode': 200,
'body': '{"Status": "Success"}',
"headers": {
"Content-Type": "application/json"
}
}
return response
payload = event['body']
signature = event['X-Hub-Signature']
# 提取机器人Key
key = parser.match_key_by_payload(payload, signature)
# 解析正文信息
body = parser.parse_payload(event_type, payload)
response = {'key': key, **body}
if event['synchronize']:
response = send_notice(response)
logger.info('已完成!')
# 根据chatgpt查询结果, 必须按照如下格式返回结果
response = {"statusCode": 202, "body": response}
logger.info(response)
return response

根据ChatGPT反馈结果, 返回类型必须满足{"statusCode": 202, "body": "<request-response payload>"}格式

ChatGPT回答

因此, 在实现基础发送企微机器人webhook的功能下, 应增加解析Event功能, 可参考以下

1
2
3
4
5
6
7
8
9
10
11
def _parse_event(event: dict) -> dict:
"""
解析Event事件请求, 通过判断事件字典 :attr:`event`
中是否包含指定关键字 ``responsePayload``, 来判断事件来源是源于另外一个Lambda还是直接调用

:param event: :class:`dict` 事件来源字典
:return: :class:`dict` 解析后的字典
"""
if 'responsePayload' in event:
event = event['responsePayload']['body']
return event

结合Nexus使用

目标

实现Nexus支持的自定义Webhook功能

Webhook初探

Webhooks

Enabling A Global Webhook Capability

Enabling A Repository Webhook Capability

创建扩展
两种类型: 全局和单一仓库

全局Webhook

全局配置图

重点需要说明一下①和②:

  1. audit是审计相关, 启用这个类型需要先启用Audit模块, 原官方手册文档描述如下

    A Global: Webhook capability for Event Type audit requires that you also enable the Audit capability, in order for events to be fed to the webhook.

  2. NexusSecret Key也是类似于Bitbucket, 同样都是HMAC算法, 只不过存在以下不同点

    • 采用Sha1哈希算法计算
    • 请求头名称为X-Nexus-Webhook-Signature

    官方手册描述如下:

    Webhooks will provide a sha1 hash of the JSON body using a shared secret key. This allows you to verify the data integrity as well as its authenticity. The JSON body is hashed without whitespace and this hash is provided in the header as X-Nexus-Webhook-Signature.

单仓库Webhook

单仓库配置

重点需要说明一下①和②:

  1. 这个事件类型填写前, 需要先弄明白assetcomponent的概念, 引用ChatGPT解释

    ChatGPT解释

    在Sonatype Nexus中,一个存储库是用于管理构件(Component)和资产(Asset)的集合。

    • Component:指构成软件项目的基本组成部分,如 JAR 包、WAR 包、Maven 依赖等。在Nexus中,构件可以被视为单个文件(例如JAR文件),也可以是一系列与特定软件项目相关联的文件集。
    • Asset:指构成构件的实际物理文件,如JAR包中的class文件、POM文件等。在Nexus中,资产是构件中的实际文件,一般来说,一个构件可能包含多个资产。

    在一个存储库中,Nexus可以帮助我们对构件和资产进行管理、维护和查询,确保软件构建和部署的可靠性和稳定性。

    可能有些迷糊, 看下面实际的图就清楚了

    Component概念
    Asset概念

    可以看到component指一个版本, 类似与组的概念. asset表示的是一个实际的文件或资源

  2. 与上面的全局配置相同

    NexusSecret Key也是类似于Bitbucket, 同样都是HMAC算法, 只不过存在以下不同点

    • 采用Sha1哈希算法计算
    • 请求头名称为X-Nexus-Webhook-Signature

    官方手册描述如下:

    Webhooks will provide a sha1 hash of the JSON body using a shared secret key. This allows you to verify the data integrity as well as its authenticity. The JSON body is hashed without whitespace and this hash is provided in the header as X-Nexus-Webhook-Signature.

如何选择

理解上面的区别后, 我们一般的需求为单仓库类型并且触发类型为component, 原因如下:

  1. 需要通知仓库内容的修改变更而并非仓库本身或者审计相关
  2. 对于实际asset的变化, 大部分情况下是不需要关注的, 原因:
    • 对于docker类型, asset为sha256的镜像, component为镜像的tag
    • 对于pypi类型, asset为实际安装包.whl文件或归档.tar.gz文件, component为包版本

相关流程图如下:

```

因此处理的Lambda需要对结果进行分流

## 设置API Gateway

### 修改Nexus Method Request

首先按照上文中[创建请求](#创建请求)相关内容, 创建名为`nexus`的资源, 和对应`POST`方法的请求, 然后按照下图图示配置方法请求

![创建方法请求](https://blog-static.chowrex.com/pictures/202304231725980.png)

然后创建请求头, 参考下图, Name值为`X-Nexus-Webhook-Signature`

![方法请求头](https://blog-static.chowrex.com/pictures/202304231726982.png)

创建后, **不需要勾选**`Required`, 因为对一个仓库配置多个webhook无法配置相同URL, 详见[下文](), 所以逻辑下放至代码中进行处理

![请求头可选](https://blog-static.chowrex.com/pictures/202304231729063.png)

### 修改Nexus Intgration Request

按照下图图示配置集成请求

![配置集成请求](https://blog-static.chowrex.com/pictures/202304231734461.png)

在弹出的页面, 点击`HTTP Header`, 并添加请求头映射信息, 详细参见文档[^1]

![创建Header映射](https://blog-static.chowrex.com/pictures/202304231736316.png)

添加信息

1. 加密后签名
    - `Name`: *X-Hub-Signature*
    - `Mapped from`: *method.request.header.X-Hub-Signature*
2. AWS异步调用(***强制要求, 否则无法异步调用***)
    - `Name`: *X-Amz-Invocation-Type*
    - `Mapped from`: *'Event'*

结果见下图

![添加集成请求信息](https://blog-static.chowrex.com/pictures/202304231738394.png)

点击`Mapping Templates`, 并添加映射模板信息, 按照下图操作即可, 其中*Content-Type*默认为`application/json`, 更多详细自定义信息, 请参考文档[^3]

![创建映射模板](https://blog-static.chowrex.com/pictures/202304231744483.png)

结果记录

```json
##  See http://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-mapping-template-reference.html
##  This template will pass through all parameters including path, querystring, header, stage variables, and context through to the integration endpoint via the body/payload
#set($allParams = $input.params())
{
"body" : "$util.escapeJavaScript($input.body)",
"params" : {
#foreach($type in $allParams.keySet())
    #set($params = $allParams.get($type))
"$type" : {
    #foreach($paramName in $params.keySet())
    "$paramName" : "$util.escapeJavaScript($params.get($paramName))"
        #if($foreach.hasNext),#end
    #end
}
    #if($foreach.hasNext),#end
#end
},
"stage-variables" : {
#foreach($key in $stageVariables.keySet())
"$key" : "$util.escapeJavaScript($stageVariables.get($key))"
    #if($foreach.hasNext),#end
#end
},
"context" : {
    "account-id" : "$context.identity.accountId",
    "api-id" : "$context.apiId",
    "api-key" : "$context.identity.apiKey",
    "authorizer-principal-id" : "$context.authorizer.principalId",
    "caller" : "$context.identity.caller",
    "cognito-authentication-provider" : "$context.identity.cognitoAuthenticationProvider",
    "cognito-authentication-type" : "$context.identity.cognitoAuthenticationType",
    "cognito-identity-id" : "$context.identity.cognitoIdentityId",
    "cognito-identity-pool-id" : "$context.identity.cognitoIdentityPoolId",
    "http-method" : "$context.httpMethod",
    "stage" : "$context.stage",
    "source-ip" : "$context.identity.sourceIp",
    "user" : "$context.identity.user",
    "user-agent" : "$context.identity.userAgent",
    "user-arn" : "$context.identity.userArn",
    "request-id" : "$context.requestId",
    "resource-id" : "$context.resourceId",
    "resource-path" : "$context.resourcePath"
    }
}

使用Lambda处理请求

点击展开
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
解析用于转发Nexus事件通知

- Author: Rex Zhou <879582094@qq.com>
- Created Time: 2023/4/23 16:50
- Copyright: Copyright © 2023 Rex Zhou. All rights reserved.
"""

__version__ = "0.0.1"

__author__ = "Rex Zhou"
__copyright__ = "Copyright © 2023 Rex Zhou. All rights reserved."
__credits__ = [__author__]
__license__ = "None"
__maintainer__ = __author__
__email__ = "879582094@qq.com"

import hashlib
import hmac
import json
import logging
import os
from functools import reduce

import jmespath

# 日志等级
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'info').upper()
# Nexus 加密Key信息
NEXUS_SECRET_KEY = os.environ.get('NEXUS_SECRET_KEY', 'DEFAULT_KEY')
# Nexus 图标URL
NEXUS_ICON = 'https://img1.imgtp.com/2023/04/24/NZImJ7rj.png'

# 机器人ID映射字典
BOT_ID_MAPPER = {
'jupyter': '****************',
'master_data_management': '**********************'
}

if bool(os.environ.get("AWS_SESSION_TOKEN")):
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
else:
logging.basicConfig(level=LOG_LEVEL)
logger = logging.getLogger()


def _verify_event(event: dict) -> None:
"""
校验事件请求字典

:param event: :class:`dict` 事件请求字典
:return: :class:`None`
"""
signature = jmespath.search('params.header', event)
logger.debug('全部事件请求头: %s', signature)
signature = signature.get('X-Nexus-Webhook-Signature',
'') if signature else ''
if not signature:
return None
logger.info('正在检查密钥%s哈希结果是否匹配', NEXUS_SECRET_KEY)
digest = hmac.new(NEXUS_SECRET_KEY.encode(), event['body'].encode(),
hashlib.sha1).hexdigest()
logger.debug('十六进制摘要结果: %s', digest)
result = hmac.compare_digest(digest, signature)
logger.info('校验结果: %s', result)
if not result:
msg = f'使用密钥{NEXUS_SECRET_KEY}通过算法{hashlib.sha1}对请求校验失败!'
logger.critical(msg)
raise ValueError(msg)
return None


def parse_payload(payload: str) -> dict:
"""
解析事件有效载荷(Payload)

:param payload: :class:`str` 载荷字符串
:return: :class:`dict` 解析后的字典信息
"""
data = json.loads(payload)
action = data['action']
component = data['component']['name']
# 首先使用filter函数和keys方法过滤出字典中所有包含在字符串中的Key
key = filter(lambda _: _ in component, BOT_ID_MAPPER.keys())
# 然后使用reduce函数获取对应的Value值
key = reduce(lambda a, b: a.get(b, {}), key, BOT_ID_MAPPER)
mapper = {
'CREATED': {
'emphasis_title': '已创建',
'source_desc_color': 3
},
'DELETED': {
'emphasis_title': '已删除',
'source_desc_color': 2
},
'UPDATED': {
'emphasis_title': '已更新',
'source_desc_color': 1
},
}
result = {
'key':
key,
'type':
'card',
'source_icon_url':
NEXUS_ICON,
'source_desc':
'Nexus',
'source_desc_color':
1,
'main_title':
data['repositoryName'],
'main_title_desc':
f"组件: {component}\n版本: {data['component']['version']}",
'emphasis_title':
action,
'emphasis_desc':
f'触发者: {data["initiator"]}',
'card_url':
'http://nexus.goldwind.com.cn/#browse/browse',
**mapper[action]
}
return result


def lambda_handler(event, context) -> dict:
"""
Lambda入口程序

1. 使用 :meth:`_verify_event` 检查时间event参数, 如果使用了密钥则校验密钥

#. 使用 :meth:`parse_payload` 构造解析后结果字典 :attr:`response`

#. 调用发信Lambda, 返回数据

:param event: :class:`dict` 事件字典
:param context: :class:`str` 上下文
:return: :class:`dict` 解析后结果字典
"""
logger.info('事件及上下文信息如下:')
logger.info(event)
logger.info(context)
_verify_event(event)
# 解析正文信息
response = parse_payload(event['body'])
logger.info(response)
logger.info('已完成!')
# 根据chatgpt查询结果, 必须按照如下格式返回结果
response = {"statusCode": 202, "body": response}
logger.info(response)
return response

FAQ

无法创建映射 Invalid mapping expression specified

创建映射失败, 需要现在Method的Request中添加这个header才行

创建失败

无法创建相同URL的webhook

Only one capability of type 'Webhook: Repository', repository 'docker-hosted-gw_images', url 'https://YOUR-KEY.execute-api.cn-north-1.amazonaws.com.cn/PROD/nexus' can be created

不允许相同URL

以上信息说明无法创建相同URL的webhook, 避免后续增加配置复杂度, 相关分流功能应在代码中实现


  1. 使用 API Gateway 控制台设置请求和响应数据映射 - Amazon API Gateway↩︎

  2. API Gateway 映射模板和访问日志记录变量引用 - Amazon API Gateway↩︎

  3. Event payload | Bitbucket Data Center and Server 5.15 | Atlassian Documentation↩︎

  4. 在 API Gateway 中设置 Lambda 代理集成 - Amazon API Gateway↩︎