当前位置: 首页 > 知识库问答 >
问题:

Scala spark kafka代码函数方法

韦星文
2023-03-14

下面是Scala中的代码。我正在使用spark sql从hadoop中提取数据,对结果执行一些分组,序列化它,然后将消息写给Kafka。

我已经写了代码--但我想用函数的方式来写。我是否应该创建一个具有“get categories”函数的新类来从Hadoop中获取类别?我不知道如何处理这件事。

这是代码

class ExtractProcessor {
  def process(): Unit = {

  implicit val formats = DefaultFormats

  val spark = SparkSession.builder().appName("test app").getOrCreate()

  try {
     val df = spark.sql("SELECT DISTINCT SUBCAT_CODE, SUBCAT_NAME, CAT_CODE, CAT_NAME " +
    "FROM CATEGORY_HIERARCHY " +
    "ORDER BY CAT_CODE, SUBCAT_CODE ")

     val result = df.collect().groupBy(row => (row(2), row(3)))
     val categories = result.map(cat =>
                    category(cat._1._1.toString(), cat._1._2.toString(),
                      cat._2.map(subcat =>
                      subcategory(subcat(0).toString(), subcat(1).toString())).toList))

     val jsonMessage = write(categories)
     val kafkaKey = java.security.MessageDigest.getInstance("SHA-1").digest(jsonMessage.getBytes("UTF-8")).map("%02x".format(_)).mkString.toString()
     val key = write(kafkaKey)

     Logger.log.info(s"Json Message: ${jsonMessage}")
     Logger.log.info(s"Kafka Key: ${key}")

     KafkaUtil.apply.send(key, jsonMessage, "testTopic")      
}
class KafkaUtil {
  def send(key: String, message: String, topicName: String): Unit = {
  val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:9092")
  properties.put("client.id", "test publisher")
  properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](properties)

  try {

    val record = new ProducerRecord[String, String](topicName, key, message)
    producer.send(record)
  }
  finally {
    producer.close()
    Logger.log.info("Kafka producer closed...")
  }
 }
}

object KafkaUtil {
  def apply: KafkaUtil = {
  new KafkaUtil
 }
}

提前谢谢你,苏约格

共有1个答案

劳韬
2023-03-14

您的代码包括:1)将数据加载到spark df中;2)处理数据;3)创建json消息;4)向kafka发送json消息

单元测试对于测试纯函数是很好的。您可以将步骤2)提取到具有类似def getCategories(df:DataFrame):seq[Category]签名的方法中,并用测试覆盖它。

步骤3)也可以包含在单元测试中,如果您觉得它容易出错

步骤1)4)将由端到端测试涵盖

顺便说一句,val result=df.collect().groupby(row=>(row(2),row(3)))效率很低。最好将其替换为val result=df.groupby(row=>(row(2),row(3))).collect

 类似资料:
  • 本文向大家介绍MySql COALESCE函数使用方法代码案例,包括了MySql COALESCE函数使用方法代码案例的使用技巧和注意事项,需要的朋友参考一下 COALESCE是一个函数, (expression_1, expression_2, ...,expression_n)依次参考各参数表达式,遇到非null值即停止并返回该值。如果所有的表达式都是空值,最终将返回一个空值。使用COALES

  • 问题内容: 我试图自定义现有的JS库,而不修改原始JS代码。这段代码将加载一些我可以访问的外部JS文件,而我想做的就是更改原始文件中包含的功能之一,而无需将整个内容复制并粘贴到第二个JS文件中。 因此,例如,禁区JS可能具有以下功能: 我希望能够以某种方式在该函数中追加或添加一些JS代码。原因主要是在原始的不可触摸的JS中,该功能非常庞大,如果该JS得到更新,则我用它覆盖的功能将过时。 我不确定这

  • 问题内容: 我有一个C函数,我想从C 调用。我无法使用“ ”这样的方法,因为C函数无法使用g 进行编译。但是使用gcc可以很好地编译。有什么想法如何从C ++调用函数吗? 问题答案: 像这样编译C代码: 然后是这样的C ++代码: 然后使用C ++链接器将它们链接在一起: 当您包含C函数的声明时,还必须告诉C ++编译器C头即将到来。因此开始于: 应该包含以下内容: (在此示例中,我使用了gcc,

  • 问题内容: 我想从JavaScript代码中调用Python函数,因为JavaScript中没有其他方法可以做我想做的事情。这可能吗?您可以调整以下代码段使其正常工作吗? JavaScript代码: 包含使用高级库编写的函数,这些函数在JavaScript中没有易写的等效项: 问题答案: 您需要做的就是向您的pythoncode发出ajax请求。您可以使用jquery 进行此操作,也可以仅使用ja

  • 问题内容: 我可以通过编程获得名称的功能代码吗? 喜欢: 可能吗? 是否有任何PHP自描述函数来重构函数/类代码? (意味着不要从源文件中正确获取源代码) 在Java中存在:http : //java.sun.com/developer/technicalArticles/ALT/Reflection/ 在PHP: get_class_method(0函数) 的typeof ReflectionF

  • 我在R中查看cov的source_code,遇到了一段我不太理解的代码。 协方差的数学定义在这里。