我在Azure Databricks集群上使用databricks-api运行训练模型的自动作业。我的脚本检查集群,如果它不存在脚本将创建一个新的,否则它将返回现有的id。之后,我的脚本按名称检查作业,如果作业不存在,它会创建一个新的作业,如果作业存在,它会返回现有作业的id,将一个群集连接到该作业,然后运行它,作业完成后,我的脚本会删除所有群集...问题:第一次运行时,它工作正常,但之后当我运行脚本并创建新的集群以附加到我已经创建的作业时,它给我一个错误,即该集群不存在,因为作业一直在寻找旧的集群。在运行作业之前,有没有办法为现有作业更新/分配新的集群?
def authenticateDBWs():
from databricks_api import DatabricksAPI
db = DatabricksAPI(
host="https:",
token=""
)
return db
def createCluster(db, clusterName):
print("Cluster not found... Now trying to create new cluster: ", clusterName)
cluster = db.cluster.create_cluster(
num_workers=0,
cluster_name=clusterName,
spark_version='10.1.x-gpu-ml-scala2.12',
spark_conf= {
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]"
},
node_type_id="Standard_NC4as_T4_v3",
driver_node_type_id="Standard_NC4as_T4_v3",
autotermination_minutes=20,
enable_elastic_disk=True,
)
clusters = db.cluster.list_clusters(headers=None)
cluster = [d for d in clusters["clusters"] if d['cluster_name'] in clusterName]
return cluster
def getCluster(db, clusterName):
print("Trying to get cluster: ", clusterName)
clusters = db.cluster.list_clusters(headers=None)
clusterArray = [d for d in clusters["clusters"] if d['cluster_name'] in clusterName]
if len(clusterArray) == 0:
clusterArray = createCluster(db, clusterName)
cluster = clusterArray[0]
else:
cluster = clusterArray[0]
return cluster
def startCluster (db, clusterName):
print("Trying to start cluster: ", clusterName)
import time
cluster = getCluster(db,clusterName)
#CHECK IF CLUSTER OBJECT IS EMPTY THEN CREATE CLUSTER
print("Cluster state ...", cluster["state"])
if cluster["state"] == 'TERMINATED':
db.cluster.start_cluster(cluster_id = cluster["cluster_id"])
while cluster["state"] == 'PENDING':
time.sleep(30)
cluster = getCluster(db,clusterName)
if cluster["state"] == 'RUNNING':
status = cluster["state"]
else:
status = 'ERROR'
return status
def getJob(db , jobName , clusterId, modelClass, classInstance):
print("Get Job: ", jobName)
jobs = db.jobs.list_jobs(headers=None)
job = [d for d in jobs["jobs"] if d["settings"]["name"] in jobName]
if len(job) == 0:
print("Job does not exit, going to create it...")
job = createJob(db, jobName, clusterId, modelClass, classInstance )
jobId = job["job_id"]
else:
print("Job already existis")
jobId = job[0]["job_id"]
return jobId
def createJob(db, jobName , clusterId, modelClass, classInstance):
print("Creating Job: "+jobName+" on "+clusterId)
trainModelPath = '/Shared/Databricks-Mlops/Covid-Classifer/train/trainModel'
job = db.jobs.create_job(
name=jobName,
existing_cluster_id=clusterId,
email_notifications=None,
notebook_task = {"notebook_path": trainModelPath,
"base_parameters": { "modelClass": modelClass,"classInstance": classInstance }} ,
timeout_seconds=54000)
return job
def runJob(db, jobId, modelClass, classInstance):
runId = db.jobs.run_now(
job_id=jobId,
notebook_params= {"modelClass": modelClass,"classInstance": classInstance },
)
return runId
def getRunResult(db, runId):
import time
run = db.jobs.get_run(runId)
runState = run["state"]["life_cycle_state"]
while runState == 'RUNNING' or runState == 'PENDING':
print("Training run in progress.. status: ", runState)
time.sleep(30)
run = db.jobs.get_run(runId)
runState = run["state"]["life_cycle_state"]
runOutput = db.jobs.get_run_output(runId)
print('#########runOutput######## ',runOutput)
#print("Run output:", runOutput["metadata"])
return runOutput
#def runTrainingJob(modelClass, classInstance):
def runTrainingJob(arguments):
modelClass=arguments[0]
classInstance=arguments[1]
db = authenticateDBWs()
#clusterName = 'auto_train_covid_clas' + str(modelClass) + '_ins' + str(classInstance)
clusterName = 'train_covid_clas'+str(modelClass)+ '_ins' + str(classInstance)
print('REQUIRED CLUSTER NAME: ',clusterName)
jobName = 'Covid_Class_autojob_' + str(modelClass) + '_Model_V' + str(classInstance)
print('REQUIRED JOB NAME: ', jobName)
cluster = getCluster(db, clusterName)
print('1. getCluster: ',cluster)
clusterStatus = startCluster(db, clusterName)
print("2. Cluster status:", clusterStatus)
print("run the training jobs")
jobId = getJob(db, jobName, cluster["cluster_id"], modelClass, classInstance)
runId = runJob(db , jobId, modelClass, classInstance)
runResult = getRunResult(db, runId["run_id"])
return runResult
返回runId
是的,这是可能的-您需要使用完全更新或部分更新API更新作业定义中的相应设置。您只需要在代码中添加相应的步骤即可进行更新。
但主要问题是——为什么需要在现有集群上运行作业?作业应该完全自动运行,在作业集群(自动创建)上运行作业比在交互式集群上运行要便宜得多(几乎是4倍)。考虑切换到该方法,因为它将彻底消除您原来的问题,因为作业将具有附加的集群定义。
附言另一个选项是使用将创建所有对象的Database ricks Terraform Provider
我有一个JTabbedPane(比如myTabPane),有一个选项卡(为了清晰起见,我们只取一个选项卡)。在创建JTabbedPane时,我在这个选项卡中添加了一个JPanel(比如panel_A)。我在这个JPanel上有一个按钮。这个选项卡完美地显示了我的JPanel,上面有按钮。到目前为止一切都很好。 我已经在按钮上定义了一个侦听器,该侦听器创建了另一个扩展JPanel的类的实例(例如pa
我已经在我的Windows7机器上设置了一个本地spark集群(一个主节点和辅助节点)。我已经创建了一个简单的scala脚本,我用sbt构建了这个脚本,并尝试用Spark-Submit运行这个脚本。请参阅以下资源 Scala代码: 现在,我用sbt构建并打包scala代码,并将其打包到一个JAR中。我的build.sbt文件如下所示 它创建一个jar,我使用spark submit命令提交它,如下
在k8s集群中。如何配置zeppelin在现有spark集群中运行spark作业,而不是旋转一个新的Pod? 我有一个k8s集群正在运行,我想在其中运行与齐柏林飞艇的火花。 Spark使用官方的Bitnami/Spark helm chart(v3.0.0)进行部署。我有一个主舱和两个工人舱运转良好,一切都很好。 短伪DockerFile: 我稍微修改了。(Image,imagePullSecre
所以我现在花了几个小时试图解决这个问题,并希望得到任何帮助。
我有一个在AWS EC2机器上运行的HortonWorks集群,我想在上面运行一个使用spark streaming的spark工作,该工作将吞下tweet concernings《权力的游戏》。在尝试在集群上运行它之前,我确实在本地运行了它。代码正在工作,如下所示: 我的问题更确切地说是关于这段特定代码行: 17/07/24 11:53:42 INFO AppClient$ClientEndpo
这是我拥有的数据帧的简化版本: 在这个 df 中,row.names 是唯一的 ID(我知道它打破了整洁数据的规则)。 在示例中,我们可以看到行id1和行id2是重复的。 我想做的是确定它们是重复的,并为这些重复项分配一个唯一的组名称。但请注意,将有多行彼此重复。 我希望的产出是: 有什么想法吗? 编辑: 我的原始数据示例: