我正在Scala中编写一个Spark作业,它读取S3上的parquet文件,执行一些简单的转换,然后将它们保存到DynamoDB实例中。每次运行时,我们都需要在Dynamo中创建一个新的表,所以我编写了一个Lambda函数,它负责表的创建。Spark作业所做的第一件事是生成一个表名,调用我的Lambda函数(将新表名传递给它),等待创建表,然后正常地执行ETL步骤。
但是,看起来我的Lambda函数总是被调用两次。我无法解释。下面是代码的示例:
def main(spark: SparkSession, pathToParquet: String) {
// generate a unique table name
val tableName = generateTableName()
// call the lambda function
val result = callLambdaFunction(tableName)
// wait for the table to be created
waitForTableCreation(tableName)
// normal ETL pipeline
var parquetRDD = spark.read.parquet(pathToParquet)
val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
spark.sparkContext.stop()
}
def waitForTableCreation(tableName: String) {
val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
try {
waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
} catch {
case ex: WaiterTimedOutException =>
LOGGER.error("Timed out waiting to create table: " + tableName)
throw ex
case t: Throwable => throw t
}
}
lambda调用同样简单:
def callLambdaFunction(tableName: String) {
val myLambda = LambdaInvokerFactory.builder()
.lambdaClient(AWSLambdaClientBuilder.defaultClient)
.lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
.build(classOf[MyLambdaContract])
myLambda.invoke(new MyLambdaInput(tableName))
}
正如我所说,当我在这段代码上运行spark-submit
时,它肯定会碰到Lambda函数。但我无法解释为什么它会击中它两次。结果是在DynamoDB中提供了两个表。
在将此作为Spark作业运行的上下文中,等待步骤似乎也失败了。但是当我对等待的代码进行单元测试时,它似乎可以自己正常工作。它成功地阻止,直到表准备就绪。
下面是我的getconfiguration
函数的代码:
def getConfiguration(tableName: String) : JobConf = {
val conf = new Configuration()
conf.set("dynamodb.servicename", "dynamodb")
conf.set("dynamodb.input.tableName", tableName)
conf.set("dynamodb.output.tableName", tableName)
conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
conf.set("dynamodb.regionid", "us-east-1")
conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
new JobConf(conf)
}
另外,这里有一个要点,其中包含了一些我在尝试运行此代码时看到的异常日志。
感谢@soapergem添加日志记录和选项。我添加了一个答案(一个尝试),因为它可能比注释长一点:)
总结一下:
spark-submit
和配置选项没有什么奇怪的地方Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)
dynamodb:describe*
关于资源:*
(如果是这个原因,为了最小特权原则,您应该在生产中使用somthing资源:test_emr*
)dynamodb:descripbe*
并检查是否得到与gist中相同的堆栈跟踪问题内容: 有没有一种Python方式可以只运行一个程序实例? 我想出的唯一合理的解决方案是尝试将其作为服务器在某个端口上运行,然后尝试将第二个程序绑定到同一端口-失败。但这不是一个好主意,也许有比这更轻巧的东西了吗? (考虑到程序有时可能会失败,例如segfault-因此“锁定文件”之类的东西将无法工作) 问题答案: 以下代码可以完成此工作,它是跨平台的,并且可以在Python 2.4-3.2上
问题内容: 有没有一种Python方式可以只运行一个程序实例? 我想出的唯一合理的解决方案是尝试将其作为服务器在某个端口上运行,然后尝试将第二个程序绑定到同一端口-失败。但这不是一个好主意,也许有比这更轻巧的东西了吗? (考虑到程序有时可能会失败,例如segfault-因此“锁定文件”之类的东西将无法工作) 问题答案: 以下代码可以完成此工作,它是跨平台的,并且可以在Python 2.4-3.2上
问题内容: 如何强制一个表只有一行?下面是我尝试过的。该触发器可以工作,但是,触发绝对不会。对于CREATE,我想使用,但是SQLite不支持。 问题答案: 通常,要限制表中的行数,必须防止任何进一步的插入。在SQLite中,这是通过RAISE()完成的: 但是,如果限制为1,则可以简单地将主键约束为固定值:
使用指南 - 代码安装 - 安装方法 - 如何手动安装代码 只有在正确地添加了百度统计代码后,才能获取尽可能准确的流量数据,代码安装过程中需要注意以下几点: 代码的安装位置要正确,请将异步分析代码安装在标签</head>标记前。 一个页面中不要重复安装相同的代码,统计有去重规则,一般不会重复计算。一段代码生效后,另外一段代码就会废弃,但建议只安装一段代码。 不要对代码有任何编辑操作,随意编辑代码会
使用指南 - 代码安装 - 安装方法 - 如何自动安装代码 如果您是直接向空间提供商购买的网站,请向该空间提供商询问您网站的FTP用户名和密码。 如果您是委托建站公司为您建立的网站,请提前询问建站公司,网站的FTP用户名和密码,一般该类公司在建站完成后,都会以邮件形式向您发送网站FTP用户名和密码,请您仔细查询以获得您网站的FTP用户名和密码。 如果您网站有专门人员进行管理,请向他们询问,以获得网
我有一个docker DB安装方法,它目前位于中。目前,构造如下 有多个测试类都扩展了这个测试超类,每个测试类将构造一个容器,并在完成后将其删除。因此,maven需要花费大量时间来管理docker。(创建和删除) 我的问题是, 我可能想要实现的理想情况是,这个容器创建 我有一些不完整的想法: 在SpringBoot主类中添加构造函数触发器,如果它是由Test启动的,则运行Docker容器构造函数。