下面是Scala中的代码。我正在使用spark sql从hadoop中提取数据,对结果执行一些分组,序列化它,然后将消息写给Kafka。
我已经写了代码--但我想用函数的方式来写。我是否应该创建一个具有“get categories”函数的新类来从Hadoop中获取类别?我不知道如何处理这件事。
这是代码
class ExtractProcessor {
def process(): Unit = {
implicit val formats = DefaultFormats
val spark = SparkSession.builder().appName("test app").getOrCreate()
try {
val df = spark.sql("SELECT DISTINCT SUBCAT_CODE, SUBCAT_NAME, CAT_CODE, CAT_NAME " +
"FROM CATEGORY_HIERARCHY " +
"ORDER BY CAT_CODE, SUBCAT_CODE ")
val result = df.collect().groupBy(row => (row(2), row(3)))
val categories = result.map(cat =>
category(cat._1._1.toString(), cat._1._2.toString(),
cat._2.map(subcat =>
subcategory(subcat(0).toString(), subcat(1).toString())).toList))
val jsonMessage = write(categories)
val kafkaKey = java.security.MessageDigest.getInstance("SHA-1").digest(jsonMessage.getBytes("UTF-8")).map("%02x".format(_)).mkString.toString()
val key = write(kafkaKey)
Logger.log.info(s"Json Message: ${jsonMessage}")
Logger.log.info(s"Kafka Key: ${key}")
KafkaUtil.apply.send(key, jsonMessage, "testTopic")
}
class KafkaUtil {
def send(key: String, message: String, topicName: String): Unit = {
val properties = new Properties()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("client.id", "test publisher")
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](properties)
try {
val record = new ProducerRecord[String, String](topicName, key, message)
producer.send(record)
}
finally {
producer.close()
Logger.log.info("Kafka producer closed...")
}
}
}
object KafkaUtil {
def apply: KafkaUtil = {
new KafkaUtil
}
}
提前谢谢你,苏约格
您的代码包括:1)将数据加载到spark df中;2)处理数据;3)创建json消息;4)向kafka发送json消息
单元测试对于测试纯函数是很好的。您可以将步骤2)
提取到具有类似def getCategories(df:DataFrame):seq[Category]
签名的方法中,并用测试覆盖它。
步骤3)
也可以包含在单元测试中,如果您觉得它容易出错
步骤1)
和4)
将由端到端测试涵盖
顺便说一句,val result=df.collect().groupby(row=>(row(2),row(3)))
效率很低。最好将其替换为val result=df.groupby(row=>(row(2),row(3))).collect
本文向大家介绍MySql COALESCE函数使用方法代码案例,包括了MySql COALESCE函数使用方法代码案例的使用技巧和注意事项,需要的朋友参考一下 COALESCE是一个函数, (expression_1, expression_2, ...,expression_n)依次参考各参数表达式,遇到非null值即停止并返回该值。如果所有的表达式都是空值,最终将返回一个空值。使用COALES
问题内容: 我试图自定义现有的JS库,而不修改原始JS代码。这段代码将加载一些我可以访问的外部JS文件,而我想做的就是更改原始文件中包含的功能之一,而无需将整个内容复制并粘贴到第二个JS文件中。 因此,例如,禁区JS可能具有以下功能: 我希望能够以某种方式在该函数中追加或添加一些JS代码。原因主要是在原始的不可触摸的JS中,该功能非常庞大,如果该JS得到更新,则我用它覆盖的功能将过时。 我不确定这
问题内容: 我有一个C函数,我想从C 调用。我无法使用“ ”这样的方法,因为C函数无法使用g 进行编译。但是使用gcc可以很好地编译。有什么想法如何从C ++调用函数吗? 问题答案: 像这样编译C代码: 然后是这样的C ++代码: 然后使用C ++链接器将它们链接在一起: 当您包含C函数的声明时,还必须告诉C ++编译器C头即将到来。因此开始于: 应该包含以下内容: (在此示例中,我使用了gcc,
问题内容: 我想从JavaScript代码中调用Python函数,因为JavaScript中没有其他方法可以做我想做的事情。这可能吗?您可以调整以下代码段使其正常工作吗? JavaScript代码: 包含使用高级库编写的函数,这些函数在JavaScript中没有易写的等效项: 问题答案: 您需要做的就是向您的pythoncode发出ajax请求。您可以使用jquery 进行此操作,也可以仅使用ja
问题内容: 我可以通过编程获得名称的功能代码吗? 喜欢: 可能吗? 是否有任何PHP自描述函数来重构函数/类代码? (意味着不要从源文件中正确获取源代码) 在Java中存在:http : //java.sun.com/developer/technicalArticles/ALT/Reflection/ 在PHP: get_class_method(0函数) 的typeof ReflectionF
我在R中查看cov的source_code,遇到了一段我不太理解的代码。 协方差的数学定义在这里。