当前位置: 首页 > 知识库问答 >
问题:

在运行之前,将不同的集群分配给azure databricks上的现有作业

高自怡
2023-03-14

我在Azure Databricks集群上使用databricks-api运行训练模型的自动作业。我的脚本检查集群,如果它不存在脚本将创建一个新的,否则它将返回现有的id。之后,我的脚本按名称检查作业,如果作业不存在,它会创建一个新的作业,如果作业存在,它会返回现有作业的id,将一个群集连接到该作业,然后运行它,作业完成后,我的脚本会删除所有群集...问题:第一次运行时,它工作正常,但之后当我运行脚本并创建新的集群以附加到我已经创建的作业时,它给我一个错误,即该集群不存在,因为作业一直在寻找旧的集群。在运行作业之前,有没有办法为现有作业更新/分配新的集群?

def authenticateDBWs():
  from databricks_api import DatabricksAPI
  db = DatabricksAPI(
      host="https:",
      token=""
  )
  return db

def createCluster(db, clusterName):
  print("Cluster not found... Now trying to create new cluster: ", clusterName)
  cluster = db.cluster.create_cluster(
    num_workers=0,
    cluster_name=clusterName,
    spark_version='10.1.x-gpu-ml-scala2.12',
    spark_conf= {
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]"
    },
    node_type_id="Standard_NC4as_T4_v3",
    driver_node_type_id="Standard_NC4as_T4_v3",
    autotermination_minutes=20,
    enable_elastic_disk=True,
    )
  clusters = db.cluster.list_clusters(headers=None)
  cluster = [d for d in clusters["clusters"] if d['cluster_name'] in clusterName]
  return cluster
  
def getCluster(db, clusterName):
  print("Trying to get cluster: ", clusterName)
  clusters = db.cluster.list_clusters(headers=None)
  clusterArray = [d for d in clusters["clusters"] if d['cluster_name'] in clusterName]
  if len(clusterArray) == 0:
    clusterArray = createCluster(db, clusterName)
    cluster =   clusterArray[0]
  else:
    cluster =   clusterArray[0]
  
  return cluster


def startCluster (db, clusterName):
  print("Trying to start cluster: ", clusterName)
  import time
  cluster = getCluster(db,clusterName)
  #CHECK IF CLUSTER OBJECT IS EMPTY THEN CREATE CLUSTER
  
  print("Cluster state ...", cluster["state"])
  if cluster["state"] == 'TERMINATED':
    db.cluster.start_cluster(cluster_id  = cluster["cluster_id"])
  while cluster["state"] == 'PENDING':
    time.sleep(30)
    cluster = getCluster(db,clusterName)
    
  if cluster["state"] == 'RUNNING':
    status = cluster["state"]
  else:
    status = 'ERROR'
  return status
  

def getJob(db , jobName , clusterId, modelClass, classInstance):
  print("Get Job: ", jobName)
  jobs = db.jobs.list_jobs(headers=None)
  job = [d for d in jobs["jobs"] if d["settings"]["name"] in jobName]
  if len(job) == 0:
    print("Job does not exit, going to create it...")
    job = createJob(db, jobName, clusterId, modelClass, classInstance )
    jobId = job["job_id"]
  else:
    print("Job already existis")
    jobId = job[0]["job_id"]
  return jobId
  

def createJob(db, jobName , clusterId, modelClass, classInstance):
  print("Creating Job: "+jobName+" on "+clusterId)
  trainModelPath = '/Shared/Databricks-Mlops/Covid-Classifer/train/trainModel'
  job = db.jobs.create_job(
    name=jobName,
    existing_cluster_id=clusterId,
    email_notifications=None,
    notebook_task = {"notebook_path": trainModelPath,
                     "base_parameters": { "modelClass":  modelClass,"classInstance": classInstance }} ,
    timeout_seconds=54000)
  return job


def runJob(db, jobId, modelClass, classInstance):
  runId = db.jobs.run_now(
    job_id=jobId,
    notebook_params= {"modelClass":  modelClass,"classInstance": classInstance },
  )
  return runId 


def getRunResult(db, runId):
  import time
  run = db.jobs.get_run(runId)
  runState = run["state"]["life_cycle_state"]
  while runState == 'RUNNING' or runState == 'PENDING':
    print("Training run in progress.. status: ", runState)
    time.sleep(30)
    run = db.jobs.get_run(runId)
    runState = run["state"]["life_cycle_state"]
  

  runOutput  = db.jobs.get_run_output(runId)
  print('#########runOutput######## ',runOutput)
  #print("Run output:", runOutput["metadata"])
  return runOutput
    
#def runTrainingJob(modelClass, classInstance):
def runTrainingJob(arguments):
  
  modelClass=arguments[0]
  classInstance=arguments[1]
  
  db = authenticateDBWs()
  #clusterName = 'auto_train_covid_clas' + str(modelClass) + '_ins' + str(classInstance)
  clusterName = 'train_covid_clas'+str(modelClass)+ '_ins' + str(classInstance)
  print('REQUIRED CLUSTER NAME: ',clusterName)
  jobName = 'Covid_Class_autojob_' + str(modelClass) + '_Model_V' + str(classInstance)
  print('REQUIRED JOB NAME: ', jobName)
  
  cluster = getCluster(db, clusterName)
  print('1. getCluster: ',cluster)
  clusterStatus = startCluster(db, clusterName)
  print("2. Cluster status:", clusterStatus)
  
  print("run the training jobs")
  jobId = getJob(db, jobName, cluster["cluster_id"], modelClass, classInstance)
  runId = runJob(db , jobId, modelClass, classInstance)
  runResult = getRunResult(db, runId["run_id"])

  return runResult

返回runId

共有1个答案

卢晟
2023-03-14

是的,这是可能的-您需要使用完全更新或部分更新API更新作业定义中的相应设置。您只需要在代码中添加相应的步骤即可进行更新。

但主要问题是——为什么需要在现有集群上运行作业?作业应该完全自动运行,在作业集群(自动创建)上运行作业比在交互式集群上运行要便宜得多(几乎是4倍)。考虑切换到该方法,因为它将彻底消除您原来的问题,因为作业将具有附加的集群定义。

附言另一个选项是使用将创建所有对象的Database ricks Terraform Provider

 类似资料:
  • 我有一个JTabbedPane(比如myTabPane),有一个选项卡(为了清晰起见,我们只取一个选项卡)。在创建JTabbedPane时,我在这个选项卡中添加了一个JPanel(比如panel_A)。我在这个JPanel上有一个按钮。这个选项卡完美地显示了我的JPanel,上面有按钮。到目前为止一切都很好。 我已经在按钮上定义了一个侦听器,该侦听器创建了另一个扩展JPanel的类的实例(例如pa

  • 在k8s集群中。如何配置zeppelin在现有spark集群中运行spark作业,而不是旋转一个新的Pod? 我有一个k8s集群正在运行,我想在其中运行与齐柏林飞艇的火花。 Spark使用官方的Bitnami/Spark helm chart(v3.0.0)进行部署。我有一个主舱和两个工人舱运转良好,一切都很好。 短伪DockerFile: 我稍微修改了。(Image,imagePullSecre

  • 我已经在我的Windows7机器上设置了一个本地spark集群(一个主节点和辅助节点)。我已经创建了一个简单的scala脚本,我用sbt构建了这个脚本,并尝试用Spark-Submit运行这个脚本。请参阅以下资源 Scala代码: 现在,我用sbt构建并打包scala代码,并将其打包到一个JAR中。我的build.sbt文件如下所示 它创建一个jar,我使用spark submit命令提交它,如下

  • 所以我现在花了几个小时试图解决这个问题,并希望得到任何帮助。

  • 我有一个在AWS EC2机器上运行的HortonWorks集群,我想在上面运行一个使用spark streaming的spark工作,该工作将吞下tweet concernings《权力的游戏》。在尝试在集群上运行它之前,我确实在本地运行了它。代码正在工作,如下所示: 我的问题更确切地说是关于这段特定代码行: 17/07/24 11:53:42 INFO AppClient$ClientEndpo

  • 这是我拥有的数据帧的简化版本: 在这个 df 中,row.names 是唯一的 ID(我知道它打破了整洁数据的规则)。 在示例中,我们可以看到行id1和行id2是重复的。 我想做的是确定它们是重复的,并为这些重复项分配一个唯一的组名称。但请注意,将有多行彼此重复。 我希望的产出是: 有什么想法吗? 编辑: 我的原始数据示例: