最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

除非我增加写入容量,否则DynamoDB javascript SDK batchWriteItem不会完成

运维笔记admin10浏览0评论

除非我增加写入容量,否则DynamoDB javascript SDK batchWriteItem不会完成

除非我增加写入容量,否则DynamoDB javascript SDK batchWriteItem不会完成

我正在运行一系列单元测试(node.js 4.x,aws-sdk,mocha),它们在每次测试之前将数据加载到表中,然后在测试后清除表。

我有两个测试失败,因为ConditionExpression触发ConditionCheckFailedException。但是,如果我增加读/写容量,他们测试通过。

我的理解是SDK处理限制异常并为你重试它们,为什么我的测试不会运行得慢而且通过?相反,似乎测试无法完成scan - > batchWriteItem进程,因此当新测试开始时,表中仍有记录。

我被团队成员告知他们已经看到类似的问题,他们只是增加了解决问题的吞吐量。这不适合我。要么我做错了,我的测试有竞争条件,或者我应该有一个模式可以确保我的操作在受到限制时完成?我应该能够使用限制指标来告知何时需要增加吞吐量,但我仍然可以继续重试,直到内存不足为止。

有没有其他人遇到这个问题,你做了什么来处理这个问题?

回答如下:

经过一些调试后,我注意到了UnprocessedItems响应元素。查找UnprocessedItems in the docs后,我意识到我应该更仔细地阅读。下面的代码将运行一个带延迟的重试循环(指数退避):

var clearEventTable = function (tableName, client, cleared) {
  var exclusiveStartKey = null;
  var retryCount = 0;

  var read = function(query, callback) {
    client.scan(query, function (err, page) {
      if(err) {
        console.log(err);
        return callback(err);
      }

      retryCount = 0;
      exclusiveStartKey = page.LastEvaluatedKey || null;
      if(page.Count == 0) {
        return callback(null, {});
      }

      if(page.Count < 25 && exclusiveStartKey) {
        console.log("read capacity limit reached: " + JSON.stringify(page, null, 2));
      }

      var keys = _.map(page.Items, function(n) {
        return { DeleteRequest: { Key: n } };
      });

      var batch = {
        RequestItems: {},
        ReturnConsumedCapacity: "INDEXES",
        ReturnItemCollectionMetrics: "SIZE"
      };

      batch.RequestItems[tableName] = keys;

      callback(null, batch);
    });
  };

  var write = function(batch, callback) {
    if(batch && batch.RequestItems){
      client.batchWriteItem(batch, function(err, result) {
        if(err) {
          console.log(err);
          return callback(err);
        }

        if(Object.keys(result.UnprocessedItems).length !== 0) {
          console.log("Retry batchWriteItem: " + JSON.stringify(result, null, 2));
          retryCount++;
          var retry = {
            RequestItems: result.UnprocessedItems,
            ReturnConsumedCapacity: "INDEXES",
            ReturnItemCollectionMetrics: "SIZE"
          };
          // retry with exponential backoff
          var delay = retryCount > 0 ? (50 * Math.pow(2, retryCount - 1)) : 0;
          setTimeout(write(retry, callback), delay);
          return;
        }

        callback(null, result);
      });
    } else {
      callback(null);
    }
  };

  var params = {
    TableName: tableName,
    ProjectionExpression: "aggregateId,id",
    Limit: 25, // max 25 per batchWriteItem 
    ConsistentRead: false,
    ReturnConsumedCapacity: "TOTAL"
  };

  async.doWhilst(function (next) {
    // retrieve entities
    if (exclusiveStartKey)
      params.ExclusiveStartKey = exclusiveStartKey;

    asyncpose(write, read)(params, function (err, result) {
      if (err) next(err);
      else next(null, result);
    });
  }, function () {
    // test if we need to load more
    return exclusiveStartKey !== null;
  }, function (err, r) {
    // return results
    if (err) {
      console.log(err);
      return cleared(err);
    }
    return cleared(null);;
  });
};
发布评论

评论列表(0)

  1. 暂无评论