
Kabali
tech, coding, spark, big data
12 Nov 2016 at Cumming

Problem

  • Given that big data jobs have widely varying business needs
  • and operate in a shifting environment with respect to data, software and operations,
  • when I must assert their sanity through regression,
  • then I need a hassle-free, transparent mechanism to enforce sampling of the job runs in any environment.

Solution

Kabali is a library which
  • formalizes a proportionate (weighted) stratified sampling approach
  • enables fail-fast in production by running the sample before the entire population
  • offers simple, declarative configuration for analysts
  • enforces sampling contracts on developers, while staying transparent, minimal and non-intrusive.

Points

  • Kabali is not intended to train or validate the business model used in the job; the model is only one of the variables in a production job execution.
  • In big data jobs, the data, software and infrastructure shift continuously, leading to variance in the job outputs over time. Kabali tracks this variance and decides whether a run should proceed.
  • Kabali's initial approach of weighted stratified sampling is a first attempt; we are consulting experts for a stronger model.
  • Kabali serves as a medium for analysts, developers and operators to use sampling and ensure job sanity in production via regression.
  • Kabali is a library written in Scala and can simply be added as a dependency.
  • It currently uses the Apache Spark stack (DataSets and Spark SQL), but the testing principles can later be ported to any big data setup with the appropriate tech plugged in.
  • Kabali makes sampling and regression testing first-class citizens in a big data environment, rather than an afterthought.
Briefly put, Kabali ..
  • extracts samples from the input population based on the configured sampling criteria
  • runs the job only for the extracted sample
  • validates the sample output against expected tolerance levels
  • if the sample output is good, proceeds to execute for the entire population; else bails out.

Kabali provides the following traits; refer to the code below for how they work.

  • Samplable trait, which enforces the contract below and is used to select the samples from the input population
  • Sampler trait, which sits on the job output and enforces a contract to give the deciding key, which in this case is a credit score
  • Sampling trait, which the client job should extend; it does the crux of the work: extracting samplables and running only the sample before the entire population.
trait Samplable {
   def getJson: String //json payload of the input record, which becomes the sampling dataset
   def getKey: String  //unique key identifying the input record
}
trait Sampler {
   def getKey: String  //deciding key used to judge the health of the sample output
}
//imports below assume Spark 1.x (SQLContext, Logging), commons-math3 and jackson-dataformat-yaml
import java.io.FileReader
import java.net.URL

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.{Logging, SparkContext}

trait Sampling extends Logging {

    /**
    * client must implement this to run the extracted sample through the batch process
    * planned for the entire population
    */
    def runSample(rdd: RDD[Samplable]): RDD[Sampler]

    /**
    * client must implement this to run the entire population through the batch process
    */
    def runPopulation(rdd: RDD[Samplable])

    /**
    * client must provide the sampling yaml file location
    */
    def kabaliConfig: String

    /**
    * kabali will run and validate sample first before running against whole population
    */
    protected def checkAndRun(population: RDD[Samplable], sc: SparkContext): Unit = {
        val samplables: RDD[Samplable] = extract(population, sc)
        val samplers: RDD[Sampler] = runSample(samplables)

        if (validate(samplers, sc)) {
            logInfo("Job is safe to proceed for entire population")
            runPopulation(population)
        } else {
             logError("Job is exhibiting deviation from acceptable thresholds, hence cannot proceed!")
        }
    }

    /**
    * kabali will extract the samples from population
    */
    private def extract(samplables: RDD[Samplable], sc: SparkContext): RDD[Samplable] = {
        val sqlContext = new SQLContext(sc)
        var samples: RDD[Samplable] = sc.emptyRDD[Samplable]
        val jsons = samplables.map(item => item.getJson)

        //get the fraction of the sample vs population
        val fraction = config.sampleSize.toDouble / jsons.count().toDouble
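        //e.g. (hypothetical numbers) a configured sampleSize of 10,000 over a population of 1,000,000
        //records gives fraction = 0.01, i.e. a 1% base sampling rate before the per-strata weights apply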

        //register the samplables as table
        sqlContext.read.json(jsons).cache().registerTempTable(allSamplables)

        //extract each configured strata sample using spark-sql
        config.stratas.foreach(strata => {
            val matched = sqlContext.sql(strata.sql.stripMargin)
            val ids = matched.select(column).map(row => row.getString(0)).collect()
            val strataFraction = strata.fraction * fraction
            logInfo("name=" + strata.name + s"|fraction=$fraction|strata-fraction=$strataFraction")

            val sample = samplables
            .filter(p => ids.contains(p.getKey))
            .sample(withReplacement = false, strataFraction, 0)
            samples = samples.union(sample)
        })
        logInfo("kabali sample size = " + samples.count())
        samples.cache()
        samples
    }

    /**
    * kabali will validate the processed sample against the thresholds
    */
    private def validate(rdd: RDD[Sampler], sc: SparkContext): Boolean = {
        val sqlContext = new SQLContext(sc)
        val jsons = rdd.map(item => item.getKey.replaceAll("[\\s]", "_"))

        //register all the strata samples together as a table
        sqlContext.read.json(jsons).cache().registerTempTable(allSamplers)

        //pick the key used for sample health determination
        val scoreSet = sqlContext.sql(config.scoreKey.stripMargin)
        val column = scoreSet.columns(0)
        val keys = scoreSet.select(column).map(row => row.getLong(0).toDouble).collect()
        val stats = new DescriptiveStatistics(keys)

        val tolerance = config.tolerance
        val expectedMean = config.expectedMean
        val sampleMean = stats.getMean

        //validate the sample health against configured tolerance expectations
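        //e.g. (hypothetical numbers) with expectedMean = 700 and tolerance = 0.05, the sample mean
        //must fall strictly between 665 and 735 for the job to proceed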
        sampleMean < (expectedMean + expectedMean * tolerance) && sampleMean > (expectedMean - expectedMean * tolerance)
    }

    /**
    * load the sampling strata configuration from the client-provided yaml file
    * @return the parsed sampling configuration
    */
    private def loadFromFile: Config = {
        val resource: URL = this.getClass.getClassLoader.getResource(kabaliConfig)
        val reader = new FileReader(resource.getFile)
        new ObjectMapper(new YAMLFactory()).readValue(reader, classOf[Config])
    }

    override def toString: String = {
        val builder: StringBuilder = new StringBuilder
        builder.append(config.job + "|\n")
        for (strata: Strata <- config.stratas) builder.append(strata.toString)
        builder.toString()
    }

    private lazy val config = loadFromFile
    private val column: String = "_c0"
    private val allSamplables: String = "samplable"
    private val allSamplers: String = "sampler"

 }
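
The Sampling trait above loads its settings into a Config object and uses fields like sampleSize, stratas (each with a name, fraction and sql), scoreKey, tolerance, expectedMean and job, but the Config and Strata classes and the yaml file itself are not shown here. Purely as an illustration, below is a hypothetical sketch inferred from those usages; the values and SQL in the kabali.yml comments are placeholders, not the library's actual schema.

import scala.beans.BeanProperty

//Hypothetical configuration classes, inferred from how `config` is used in the Sampling trait.
//A matching kabali.yml could look like (placeholder values):
//  job: credit-score-job
//  sampleSize: 10000
//  expectedMean: 700
//  tolerance: 0.05
//  scoreKey: "SELECT score FROM sampler"
//  stratas:
//    - name: high-balance
//      fraction: 0.3
//      sql: "SELECT ssn AS _c0 FROM samplable WHERE totalBalance > 50000"
class Config {
    @BeanProperty var job: String = _
    @BeanProperty var sampleSize: Int = _
    @BeanProperty var expectedMean: Double = _
    @BeanProperty var tolerance: Double = _
    @BeanProperty var scoreKey: String = _
    @BeanProperty var stratas: Array[Strata] = Array()
}

class Strata {
    @BeanProperty var name: String = _
    @BeanProperty var fraction: Double = _
    @BeanProperty var sql: String = _

    override def toString: String = s"name=$name|fraction=$fraction\n"
}

With the jackson-dataformat-yaml module on the classpath, the ObjectMapper call in loadFromFile can bind such a file straight into classes shaped like these.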

Sample Job

Let us take a simple case of a Spark job, say CreditScoreJob, which calculates credit scores of people using their credit profiles.
  • The CreditProfile, which extends Kabali's Samplable, is the job input; it represents a record of all loan balances, delinquency history, payments made etc., stored as a JSON document.
  • The CreditScore, which extends Kabali's Sampler, is the job output; it contains the credit score produced for a given entity or person by the scoring model in the job execution.
class CreditProfile extends Samplable {
     override def getJson: String = {
        "return the json { } payload containing all the credit card balances and history. This will become a dataset"
     }
     override def getKey: String = {
        "return the the unique entity key like SSN" //implement to give a entity key
     }
}
class CreditScore extends Sampler {
    override def getKey: String = {
       "return the final score like 789, 670..etc for a given person"
    }
}
    
Now, finally the job that calculates the credit score.
 class CreditScoreJob(sc: SparkContext) extends Sampling {

     val level = StorageLevel.MEMORY_AND_DISK
     override def kabaliConfig: String = "kabali.yml"

     /**
     * steps
     * -----
     * kabali will extract sample based on tester-configured biased stratified sampling
     * kabali will run only the sample through the batch process
     * kabali will validate the sample batch output through configured thresholds
     * kabali will allow the job to proceed only if no significant deviation is found
     */
     def run(context: SparkContext): Unit = {
         val rdd = sc.emptyRDD[CreditProfile] //read customer profiles data from cassandra | hdfs
         val samplables = rdd.map(item => item.asInstanceOf[Samplable])
         checkAndRun(samplables, context)
     }

     /**
     * run the scoring model for sample set only
     */
     override def runSample(rdd: RDD[Samplable]): RDD[Sampler] = {
         val creditProfiles = rdd.map(r => r.asInstanceOf[CreditProfile])
         applyScoringModel(creditProfiles).map(score => score.asInstanceOf[Sampler])
     }

     /**
     * run the scoring model for entire population
     */
     override def runPopulation(rdd: RDD[Samplable]) = {
         val creditProfiles = rdd.map(r => r.asInstanceOf[CreditProfile])
         applyScoringModel(creditProfiles)
     }

     /**
     * Run the original logic of the job, say you take the rdd through derivation of some attributes and apply a scoring model.
     * @param rdd the rdd of original job items
     * @return the processed job items
     */
     def applyScoringModel(rdd: RDD[CreditProfile]): RDD[CreditScore] = {
         rdd.map(profile => new CreditScore) //some real attribution and scoring model goes here; now dummy shown
     }
 }
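
To round it off, a minimal driver could look like the sketch below. This is hypothetical glue code, not part of Kabali: the object name, app name and master URL are placeholders, and in a real deployment the profiles would be read from Cassandra or HDFS rather than the empty RDD used in run above.

 import org.apache.spark.{SparkConf, SparkContext}

 //Hypothetical driver for the job above; the app name and master URL are placeholders
 object CreditScoreJobRunner {
     def main(args: Array[String]): Unit = {
         val conf = new SparkConf().setAppName("credit-score-job").setMaster("local[*]")
         val sc = new SparkContext(conf)

         //Kabali scores and validates the sample first; the full population runs only if the sample passes
         new CreditScoreJob(sc).run(sc)

         sc.stop()
     }
 }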
     
