当前位置: 首页 > 工具软件 > Scalding > 使用案例 >

mahout和spark_Mahout和Scalding用于检测扑克共谋

方焱
2023-12-01

mahout和spark

当我读过一本关于Mahout的非常出色的书《 Mahout In Action》 (这也是机器学习的出色入门)时,其中的一个例子引起了我的注意。
该书的作者使用了著名的K-means聚类算法在stackoverflow.com上寻找相似的参与者,其中相似性的标准是用户对问题的投票/回答的作者的集合。

用简单的话来说,K-means算法迭代地找到多维空间中彼此靠近的点/向量的簇。 应用于在StackOverflow中寻找相似玩家的问题时,我们假设多维空间中的每个轴都是用户,其中从零到零的距离是点的总和,授予其他玩家给出的问题/答案(尺寸通常也称为“特征”,其中距离是“特征权重”)。

显然,可以将相同的方法应用于大规模多人在线扑克中最复杂的问题之一-共谋检测。 我们假设一个简单的假设是,如果两个或多个玩家彼此玩了太多游戏(考虑到任何一个玩家可能只是一个活跃的玩家,并且与任何人玩了很多游戏),他们可能勾结。

使用K-means聚类算法,我们将大量玩家分成一个很小的紧密集群(最好每个集群有2-8个玩家)。 在我们将进一步介绍的基本实现中,每个用户都表示为矢量,其中轴是她玩过的其他玩家(功能的权重是一起玩的游戏数量)。

阶段1.建立字典

第一步,我们需要构建所有球员的字典/枚举,并参与我们分析的手部历史子集:

// extract user ID from hand history record
  val userId = (playerHistory: PlayerHandHistory) =>
    new Text(playerHistory.getUserId.toString)

  // Builds basic dixtionary (enumeration, in fact) of all the players, participated in the selected subset of hand
  // history records
  class Builder(args: Args) extends Job(args) {

    // input tap is an HTable with hand history entries: hand history id -> hand history record, serialized with ProtoBuf
    val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
    // output tap - plain text file with player IDs
    val output = TextLine(args("output"))

    input
      .read
      .flatMap('blob -> 'player) {
      // every hand history record contains the list of players, participated in the hand
      blob: Array[Byte] => // at the first stage, we simply extract the list of IDs, and add it to the flat list
        HandHistory.parseFrom(blob).getPlayerList.map(userId)
    }
      .unique('player) // remove duplicate user IDs
      .project('player) // leave only 'player column from the tuple
      .write(output)

  }
1003
 1004
 1005
 1006
 1007
 ...

阶段2。向字典添加索引

其次,我们将用户ID映射到向量中玩家的位置/索引。

class Indexer(args: Args) extends Job(args) {

    val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
      'userId -> 'idx)

    TextLine(args("input")).read
      .map(('offset -> 'line) -> ('userId -> 'idx)) {
      // dictionary lines are read with indices from TextLine source
      // out of the box. For some reason, in my case, indices were multiplied by 5, so I have had to divide them
      tuple: (Int, String) => (new Text(tuple._2.toString) -> new IntWritable((tuple._1 / 5)))
    }
      .project(('userId -> 'idx)) // only userId -> index tuple is passed to the output
      .write(output)

  }
1003 0
 1004 1
 1005 2
 1006 3
 1007 4
 ...

阶段3.构建载体

我们构建向量,将其作为输入传递给K-means聚类算法。 如上所述,向量中的每个位置都对应于与该玩家玩过的其他玩家:

/**
 * K-means clustering algorithm requires the input to be represented as vectors.
 * In out case, the vector, itself, represents the player, where other users, the player has played with, are
 * vector axises/features (the weigh of the feature is a number of games, played together)
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {

  import Dictionary._

  // initializes dictionary pipe
  val dictionary = TextLine(args("dictionary"))
    .read
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
    tuple: (Int, String) =>
      (tuple._2 -> tuple._1 / 5)
  }
    .project(('userId -> 'dictionaryIdx))

  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
    'player1Id -> 'vector)

  input
    .read
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
    //builds a flat list of pairs of users that player together
    blob: Array[Byte] =>
      val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
      playerList.flatMap {
        playerId =>
          playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
      }
  }
    .joinWithSmaller('player2Id -> 'userId, dictionary) // joins the list of pairs of //user that played together with
    // the dictionary, so that the second member of the tuple (ID of the second //player) is replaced with th index
    //in the dictionary
    .groupBy('player1Id -> 'dictionaryIdx) {
    group => group.size // groups pairs of players, played together, counting the number of hands
  }
    .map(('player1Id, 'dictionaryIdx, 'size) ->('playerId, 'partialVector)) {
    tuple: (String, Int, Int) =>
      val partialVector = new NamedVector(
        new SequentialAccessSparseVector(args("dictionarySize").toInt), tuple._1) 
// turns a tuple of two users
      // into a vector with one feature
      partialVector.set(tuple._2, tuple._3)
      (new Text(tuple._1), new VectorWritable(partialVector))
  }
    .groupBy('player1Id) {
    // combines partial vectors into one vector that represents the number of hands, //played with other players
    group => group.reduce('partialVector -> 'vector) {
      (left: VectorWritable, right: VectorWritable) =>
        new VectorWritable(left.get.plus(right.get))
    }
  }
    .write(output)

}
1003 {3:5.0,5:4.0,6:4.0,9:4.0}
 1004 {2:4.0,4:4.0,8:4.0,37:4.0}
 1005 {1:4.0,4:5.0,8:4.0,37:4.0}
 1006 {0:5.0,5:4.0,6:4.0,9:4.0}
 1007 {1:4.0,2:5.0,8:4.0,37:4.0}
 ...

向量化输入所需的整个工作流程:

val conf = new Configuration
    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
      + "org.apache.hadoop.io.serializer.WritableSerialization")

    // the path, where the vectors will be stored to
    val vectorsPath = new Path("job/vectors")
    // enumeration of all users involved in a selected subset of hand history records
    val dictionaryPath = new Path("job/dictionary")
    // text file with the dictionary size
    val dictionarySizePath = new Path("job/dictionary-size")
    // indexed dictionary (every user ID in the dictionary is mapped to an index, from 0)
    val indexedDictionaryPath = new Path("job/indexed-dictionary")

    println("Building dictionary...")
    // extracts IDs of all the users, participating in selected subset of hand history records
    Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs",
      "--hbasehost", "localhost", "--output", dictionaryPath.toString))
    // adds index to the dictionary
    Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs",
      "--input", dictionaryPath.toString, "--output", indexedDictionaryPath.toString))
    // calculates dictionary size, and stores it to the FS
    Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs",
      "--input", dictionaryPath.toString, "--output", dictionarySizePath.toString))

    // reads dictionary size
    val fs = FileSystem.get(dictionaryPath.toUri, conf)
    val dictionarySize = new BufferedReader(
      new InputStreamReader(
        fs.open(new Path(dictionarySizePath, "part-00000"))
      )).readLine().toInt

    println("Vectorizing...")
    // builds vectors (player -> other players in the game)
    // IDs of other players (in the vectors) are replaces with indices, taken from dictionary
    Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs",
      "--dictionary", dictionaryPath.toString, "--hbasehost", "localhost",
      "--output", vectorsPath.toString, "--dictionarySize", dictionarySize.toString))

阶段4.生成n个随机簇

随机聚类/质心是K-means算法的切入点:

//randomly selected cluster the will be passed as an input to K-means
     val inputClustersPath = new Path('jobinput-clusters')
     val distanceMeasure = new EuclideanDistanceMeasure
 
     println('Making random seeds...')
      //build 30 initial random clusterscentroids
     RandomSeedGenerator.buildRandom(conf, vectorsPath, inputClustersPath, 30, distanceMeasure)

阶段5.运行K-means算法

每次下一次迭代时,K均值将找到更好的质心和聚类。 因此,我们有30个玩家群,他们之间的互动最频繁:

// clusterization results
    val outputClustersPath = new Path("job/output-clusters")
    // textual dump of clusterization results
    val dumpPath = "job/dump"

    println("Running K-means...")
    // runs K-means algorithm with up to 20 iterations, to find clusters of colluding players (assumption of collusion is
    // made on the basis of number hand player together with other player[s])
    KMeansDriver.run(conf, vectorsPath, inputClustersPath, outputClustersPath,
      new CosineDistanceMeasure(), 0.01, 20, true, 0, false)

    println("Printing results...")

    // dumps clusters to a text file
    val clusterizationResult = finalClusterPath(conf, outputClustersPath, 20)
    val clusteredPoints = new Path(outputClustersPath, "clusteredPoints")
    val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints)
    clusterDumper.setNumTopFeatures(10)
    clusterDumper.setOutputFile(dumpPath)
    clusterDumper.setTermDictionary(new Path(indexedDictionaryPath, "part-00000").toString,
      "sequencefile")
    clusterDumper.printClusters(null)

结果

现在转到“作业/转储” –该文件包含由K-means生成的所有群集的文本转储。 这是文件的一小部分:

VL-0{n=5 c=[1003:3.400, 1006:3.400, 1008:3.200, 1009:3.200, 1012:3.200] r=[1003:1.744, 1006:1.744, 1008:1.600, 1009:1.600, 1012:1.600]}
  Top Terms: 
   1006                                    =>                 3.4
   1003                                    =>                 3.4
   1012                                    =>                 3.2
   1009                                    =>                 3.2
   1008                                    =>                 3.2
 
 VL-15{n=1 c=[1016:4.000, 1019:3.000, 1020:3.000, 1021:3.000, 1022:3.000, 1023:3.000, 1024:3.000, 1025:3.000] r=[]}
  Top Terms: 
   1016                                    =>                 4.0
   1025                                    =>                 3.0
   1024                                    =>                 3.0
   1023                                    =>                 3.0
   1022                                    =>                 3.0
   1021                                    =>                 3.0
   1020                                    =>                 3.0
   1019                                    =>                 3.0

如我们所见,已检测到2个玩家群:一个有8个玩家,彼此之间玩了很多游戏,第二个有4个玩家。

参考: Vasil Remeniuk博客博客上的JCG合作伙伴 Vasil Remeniuk 使用Mahout和 Scalding进行的扑克勾结检测


翻译自: https://www.javacodegeeks.com/2012/08/mahout-and-scalding-for-poker-collusion.html

mahout和spark

 类似资料: