はじめに
AWS LambdaでNode.js 6.10が利用できるようになりましたね。以前の投稿でPromiseの使い方について紹介しました。今回、DynamoDBからデータを取得する方法をはじめ、非同期処理を直列的・並列的に実行する方法をパターン化してみました。
Promiseを使用したひな型
トップレベルではタスクは必ず直列に実行されるようにします。というのは、初期化→なんらかの処理→レスポンス、という流れは不変のはず、という観点から。私としてはすべての処理をタスクとして扱い、PromiseLikeにしてしまう考え方で進めたいと思います。タスク間のやり取りには処理全体を通して利用できる変数stashを利用することにします。このstashは個々のタスクの引数として渡され、またタスクの実行結果として返されるようにします。この考え方で作成したひな型は下記の様になります。
ソースコード
'use strict';
const taskInitialize = (event, context) => {
return new Promise((resolve, reject) => {
const stash = {}; // 必要があればeventやcontextから目的の値を格納する
resolve(stash);
});
}
const taskDoSomething = (stash) => {
return new Promise((resolve, reject) => {
resolve(stash);
});
}
exports.handler = (event, context, callback) => {
taskInitialize(event, context)
.then(taskDoSomething)
.then(callback.bind(null, null))
.catch(callback);
};
解説
stashは初期化(taskInitialize)で生成して、以降このstashはタスクをまたいで使用します。タスクの引数と返り値をすべてstashで統一しているおかげで、thenの中がすっきりしています。省略せずに記述すると、下記の様になります。
exports.handler = (event, context, callback) => {
taskInitialize(event, context)
.then((stash) => { return taskDoSomething(stash) })
.then((stash) => { callback(null, stash) })
.catch((error) => { callback(error) });
};
全体の流れは最後のthenのつながりを見れば一目瞭然ですね。最後まで正常に完了すれば、最後のthenにあるcallbackを正常終了として呼び出します。もしthenを進める途中でエラーが発生する(個々のタスクでrejectを実行する)と、直ちにcatchへ移り、callbackを異常終了として呼び出します。これで、Lambda関数として最低限の”お作法”に従うことができます。
直列処理の例
プログラム概要
直列処理とは、初期化→タスク1→タスク2→(中略)→レスポンス といった、順次実行タイプの構造です。具体的例として、S3バケットにファイルを1つアップロードして、アップロードしたファイル名をSNSトピックに送信する、というプログラムを作ってみます。
タスクとして洗い出すと、下記ようになります。
- 初期化する taskInitialize (環境変数のバリデーション、共通の変数 stash の生成)
- S3バケットにファイルを1つアップロードする taskPutObject
- アップロードしたファイル名をSNSトピックに送信する taskPublish
- レスポンス
環境変数
ファイルをアップロードする先のS3バケット名と、実行結果を送信する先のSNSトピックのARNは環境変数として与えることとします。
環境変数名 | 説明 |
---|---|
BucketName | S3バケット名 |
TopicArn | SNSトピックのARN |
ソースコード
ソースコードは下記の様になります。
'use strict';
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const sns = new AWS.SNS();
const taskInitialize = (event, context) => {
return new Promise((resolve, reject) => {
// バリデーション実行
if (!process.env.BucketName) {
reject('Specify env.BucketName');
return;
}
if (!process.env.TopicArn) {
reject('Specify env.TopicArn');
return;
}
const stash = {
env: { // バリデーション、クリーンアップ済みの環境変数
BucketName: process.env.BucketName,
TopicArn: process.env.TopicArn
},
body: { DevelSampleName: 'No.1' },
objectKey: 'Data/DevelSample_1.json',
uploadedKeys: [] // アップロード済みファイル名
};
resolve(stash);
});
};
const taskPutObject = (stash) => {
return new Promise((resolve, reject) => {
const params = {
Body: JSON.stringify(stash.body),
Bucket: stash.env.BucketName, // クリーンアップ前のprocess.env.BucketNameは使わないようにする
Key: stash.objectKey,
ServerSideEncryption: 'AES256',
Tagging: ''
};
s3.putObject(params, (error, data) => {
if (error) {
reject(error);
return;
}
stash.uploadedKeys.push(stash.objectKey); // S3へのアップロードが正常に完了したらファイル名をstashに格納する
resolve(stash);
});
});
};
const taskPublish = (stash) => {
return new Promise((resolve, reject) => {
const params = {
Subject: '[DevelSample] result for No.1',
Message: JSON.stringify({Objects: stash.uploadedKeys}, null, 4),
TopicArn: stash.env.TopicArn
};
sns.publish(params, (error, data) => {
if (error) {
reject(error);
return;
}
resolve(stash);
});
});
}
exports.handler = (event, context, callback) => {
taskInitialize(event, context)
.then(taskPutObject)
.then(taskPublish)
.then(callback.bind(null, null))
.catch(callback);
};
解説
うまく実行できましたか?下記の様なメッセージがSNSトピックに通知されたはずです。
{
"Objects": [
"Data/DevelSample_1.json"
]
}
環境変数の指定が必要なので、taskInitializeでバリデーションとクリーンアップ(といってもコピーしているだけですが)をしています。あとはstashを読み書きしてタスクを進めているだけです。タスク1つをとってみると単純な内容となっています。全体として、見通しの良いプログラムになったのではないでしょうか。
並列処理の例
プログラム概要
さて、ここからはループ処理の実装に進みます。Node.jsを採用しているAWS SDK for javascriptでは個々のタスクは基本的に非同期処理で実装します。実行時間短縮のため、もし並列化できるのであれば複数タスクを同時実行したいものです。先のサンプルから、今度はS3に複数ファイルを並列にアップロードするようにしてみます。とはいえ、トップレベルでは直列を維持します。並列化するのは、個々のタスクの内部だけにとどめるようにします。
タスクとして洗い出すと、下記ようになります。
- 初期化する taskInitialize (環境変数のバリデーション、共通の変数 stash の生成)
- S3バケットに複数のファイルアップロードする taskPutObjects
- S3バケットに1つのファイルをアップロードする subtaskPutObject
- アップロードしたファイル名をSNSトピックに送信する taskPublish
- レスポンス
環境変数
S3バケット名とSNSトピックのARNは環境変数として与えることとします。
環境変数名 | 説明 |
---|---|
BucketName | S3バケット名 |
TopicArn | SNSトピックのARN |
ソースコード
'use strict';
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const sns = new AWS.SNS();
const taskInitialize = (event, context) => {
return new Promise((resolve, reject) => {
if (!process.env.BucketName) {
reject('Specify env.BucketName');
return;
}
if (!process.env.TopicArn) {
reject('Specify env.TopicArn');
return;
}
const stash = {
env: {
BucketName: process.env.BucketName,
TopicArn: process.env.TopicArn
},
items: [
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_1.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_2.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_3.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_4.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_5.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_6.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_7.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_8.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_9.json'
},
{
body: { DevelSampleName: 'No.2' },
objectKey : 'Data/DevelSample_2_10.json'
}
],
uploadedKeys: []
};
resolve(stash);
});
};
const taskPutObjects = (stash) => {
return new Promise((resolve, reject) => {
const subtaskPutObject = (anItem) => {
return new Promise((resolve, reject) => {
const params = {
Body: JSON.stringify(anItem.body),
Bucket: stash.env.BucketName,
Key: anItem.objectKey,
ServerSideEncryption: 'AES256',
Tagging: ''
};
s3.putObject(params, (error, data) => {
if (error) {
reject(error);
return;
}
stash.uploadedKeys.push(anItem.objectKey);
resolve();
});
});
}
Promise.all(stash.items.map(subtaskPutObject)) // ここで並列に実行
.then(resolve.bind(null, stash))
.catch(reject);
});
};
const taskPublish = (stash) => {
return new Promise((resolve, reject) => {
const params = {
Subject: '[DevelSample] stash for No.2',
Message: JSON.stringify({Objects: stash.uploadedKeys}, null, 4),
TopicArn: process.env.TopicArn
};
sns.publish(params, (error, data) => {
if (error) {
reject(error);
return;
}
resolve(stash);
});
});
}
exports.handler = (event, context, callback) => {
taskInitialize(event, context)
.then(taskPutObjects)
.then(taskPublish)
.then(callback.bind(null, null))
.catch(callback);
};
解説
いかがでしょうか。taskPutObject 改め、taskPutObjects として、複数ファイルを並列でアップロードするようにしました。taskPutObjects の中で、ファイル1つのアップロードを担当するサブタスク subtaskPutObject を定義して、Promise.all()で一斉にサブタスクを開始させています。すべてのサブタスクが完了すると、メインのタスク taskPutObjectsの完了としてresolveを呼び出しています。
ちなみに、Promise.all()周りのコードは省略せずに書くと下記の様になります。
(前略)
Promise.all(stash.items.map((anItem) => { return subtaskPutObject(anItem) })
.then((results) => { resolve(stash) }) // サブタスクの実行結果はstash.uploadedKeysに保存したので、resultsは捨てることができる
.catch((error) => { reject(error) });
(後略)
S3バケットへのファイルのアップロードは並列化されているため、実行結果をみると必ずしもstash.itemsの順番通りにアップロードされるわけではないことが確認できると思います。
{
"Objects": [
"Data/DevelSample_2_1.json",
"Data/DevelSample_2_2.json",
"Data/DevelSample_2_3.json",
"Data/DevelSample_2_4.json",
"Data/DevelSample_2_5.json",
"Data/DevelSample_2_6.json",
"Data/DevelSample_2_8.json",
"Data/DevelSample_2_10.json",
"Data/DevelSample_2_7.json",
"Data/DevelSample_2_9.json"
]
}
回数不明の直列処理の例
プログラム概要
DynamoDBを扱う際、scanで1回に取得できるデータは最大1MBまでという制限があります。全件取得するような処理を考えると、全部で何回取得する必要があるのかわからないなか、末尾まで繰り返しscanする事になります。このように、繰り返す回数が不明な場合のプログラムを作成してみます。1MBを超えるテストデータを作成するのも手間なので、最大10件しか取得しないように追加で制限をかけて、回数不明の直列処理をしてみます。(※)
(※) DynamoDBの使い方として、全件取得するような、短時間に大量のデータ件数を参照するような処理は本来望ましくありません。リードキャパシティを瞬間的に消費してしまい、他のプログラムのリード処理を妨げてしまうためです。DataPipelineなどを利用して、流量制限を設けて取得するのがベターです。
タスクとして洗い出すと、下記ようになります。
- 初期化する taskInitialize (環境変数のバリデーション、共通の変数 stash の生成)
- DynamoDBからすべてのデータを取得する taskGetItems
- 最大10件ずつデータを取得する subtaskScan
- 取得したデータをSNSトピックに送信する taskPublish
- レスポンス
環境変数
DynamoDBテーブル名と、実行結果を送信する先のSNSトピックのARNは環境変数として与えることとします。
環境変数名 | 説明 |
---|---|
TableName | DynamoDBテーブル名 |
TopicArn | SNSトピックのARN |
ソースコード
ソースコードは下記の様になります。
'use strict';
const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB();
const sns = new AWS.SNS();
const taskInitialize = (event, context) => {
return new Promise((resolve, reject) => {
if (!process.env.TableName) {
reject('Specify env.TableName');
return;
}
if (!process.env.TopicArn) {
reject('Specify env.TopicArn');
return;
}
const stash = {
env: {
TableName: process.env.TableName,
TopicArn: process.env.TopicArn
},
items: []
};
resolve(stash);
});
};
const taskGetItems = (stash) => {
return new Promise((resolve, reject) => {
const subtaskScan = (exclusiveStartKey) => {
return new Promise((resolve, reject) => {
if (!exclusiveStartKey) { // exclusiveStartKeyがnullならこのサブタスクを完了する
resolve();
return;
}
const params = {
TableName: stash.env.TableName,
Limit: 10
};
if (exclusiveStartKey !== 'firsttime') { // 1回目のscanでなければExclusiveStartKeyを指定する
params['ExclusiveStartKey'] = exclusiveStartKey;
}
dynamodb.scan(params, (error, data) => {
if (error) {
reject(error);
return;
}
Array.prototype.push.apply(stash.items, data.Items);
//console.log(stash.items.length);
subtaskScan(data.LastEvaluatedKey) // 2回目以降のscanを実行する(再帰的実行)
.then(resolve) // 後続のサブタスクが完了すれば、このサブタスクは完了する
.catch(reject);
});
});
}
subtaskScan('firsttime') // 1回目のscanを実行する
.then(resolve.bind(null, stash)) // すべてのサブタスクが完了すれば、メインタスクtaskGetItemsは完了する
.catch(reject);
});
};
const taskPublish = (stash) => {
return new Promise((resolve, reject) => {
const params = {
Subject: '[DevelSample] stash for No.3',
Message: JSON.stringify({Objects: stash.items}, null, 4),
TopicArn: process.env.TopicArn
};
sns.publish(params, (error, data) => {
if (error) {
reject(error);
return;
}
resolve(stash);
});
});
}
exports.handler = (event, context, callback) => {
taskInitialize(event, context)
.then(taskGetItems)
.then(taskPublish)
.then(callback.bind(null, null))
.catch(callback);
};
解説
実行の前に、予め Dynamoテーブルに11件以上、できれば30件ほどデータを保存しておいてください。
すべてのデータを取得する(LastEvaluatedKeyがnullになる=続きのデータがなくなる)まで取得処理を繰り返すよう、サブタスクsubtaskScanを再帰的に呼び出すことで、回数不明の直列処理を記述してみました。Promiseを使う場合でも再帰は定石どおりに記述することができますね。
再帰の終了条件を判定している箇所(コメント”exclusiveStartKeyがnullならこのサブタスクを完了する”の箇所)で、Lambdaの残り実行時間(context.getRemainingTimeInMillis())やDynamoDBのリードキャパシティ使用量(data.ConsumedCapacity)も判定すれば、任意のタイミングでデータ取得を中断できるようになります。大量のデータがある場合、Lambdaの実行時間上限を超える前に中断して、続きは中断したところから再開できるように発展させることもできます。
応用
さて、これまで取り上げてきた、直列処理、並列処理、回数不明の直列処理を組み合わせて、少し実用的なプログラムを作成してみます。
プログラム概要
DynamoDBテーブルのデータをS3にバックアップするバックアッププログラムを作成します。DynamoDBからデータを10件ずつ取得します。すべてのデータを取得し終えたら、1ファイルにまとめてS3に保存します。アップロードし終えたらSNSトピックに結果を送信します。
タスクとして洗い出すと、下記ようになります。
- 初期化する taskInitialize (環境変数のバリデーション、共通の変数 stash の生成)
- DynamoDBからすべてのデータを取得する taskGetItems
- 最大10件ずつデータを取得する subtaskScan
- 取得したデータをS3に保存する taskPutObject
- アップロードしたファイル名をSNSトピックに送信する taskPublish
- レスポンス
環境変数
S3バケット名とDynamoDBテーブル名、SNSトピックのARNは環境変数として与えることとします。
環境変数名 | 説明 |
---|---|
BucketName | S3バケット名 |
TableName | DynamoDBテーブル名 |
TopicArn | SNSトピックのARN |
ソースコード
ソースコードは下記の様になります。
'use strict';
const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB();
const s3 = new AWS.S3();
const sns = new AWS.SNS();
const taskInitialize = (event, context) => {
return new Promise((resolve, reject) => {
if (!process.env.BucketName) {
reject('Specify env.BucketName');
return;
}
if (!process.env.TableName) {
reject('Specify env.TableName');
return;
}
if (!process.env.TopicArn) {
reject('Specify env.TopicArn');
return;
}
const stash = {
env: {
BucketName: process.env.BucketName,
TableName: process.env.TableName,
TopicArn: process.env.TopicArn
},
items: [],
objectKey: 'Data/DevelSample_4.json',
uploadedKeys: []
};
resolve(stash);
});
};
const taskGetItems = (stash) => {
return new Promise((resolve, reject) => {
const subtaskScan = (exclusiveStartKey) => {
return new Promise((resolve, reject) => {
if (!exclusiveStartKey) {
resolve();
return;
}
const params = {
TableName: stash.env.TableName,
Limit: 10
};
if (exclusiveStartKey !== 'firsttime') {
params['ExclusiveStartKey'] = exclusiveStartKey;
}
dynamodb.scan(params, (error, data) => {
if (error) {
reject(error);
return;
}
Array.prototype.push.apply(stash.items, data.Items);
//console.log(stash.items.length);
subtaskScan(data.LastEvaluatedKey)
.then(resolve)
.catch(reject);
});
});
}
subtaskScan('firsttime')
.then(resolve.bind(null, stash))
.catch(reject);
});
};
const taskPutObject = (stash) => {
return new Promise((resolve, reject) => {
const params = {
Body: JSON.stringify(stash.items),
Bucket: stash.env.BucketName,
Key: stash.objectKey,
ServerSideEncryption: 'AES256',
Tagging: ''
};
s3.putObject(params, (error, data) => {
if (error) {
reject(error);
return;
}
stash.uploadedKeys.push(stash.objectKey);
resolve(stash);
});
});
}
const taskPublish = (stash) => {
return new Promise((resolve, reject) => {
const params = {
Subject: '[DevelSample] stash for No.4',
Message: JSON.stringify({Objects: stash.uploadedKeys}, null, 4),
TopicArn: stash.env.TopicArn
};
sns.publish(params, (error, data) => {
if (error) {
reject(error);
return;
}
resolve(stash);
});
});
}
exports.handler = (event, context, callback) => {
taskInitialize(event, context)
.then(taskGetItems)
.then(taskPutObject)
.then(taskPublish)
.then(callback.bind(null, null))
.catch(callback);
};
解説
これまで出てきたパターンの組み合わせで実現できました。DynamoDBのリードキャパシティを飽和させてしまうかもしれない問題、データが多すぎてLambda関数のタイムアウト上限300秒に引っかかってしまうかもしれない問題など、実用するにはまだまだ課題がたくさんあります。とはいえ、データがさほど多くなければ十分実用的なプログラムとなったのではないでしょうか。
おわりに
いかがでしょうか。Promiseを上手に使って、サーバーレスシステムの構築に一歩前進したのではないでしょうか。直列処理ができるようになり、また必要な個所では並列処理にすることも簡単です。今回取り上げたパターンでだいたいのことは実現できるかと思うのですが、こんなパターンはどうするの?みたいなツッコミお待ちしています。