Skip to content

Instantly share code, notes, and snippets.

@molinalucas
Forked from sriggin/1_Partitioner.scala
Created December 11, 2022 01:45
Show Gist options
  • Select an option

  • Save molinalucas/868c80081fe7b14b442613c900ca54e6 to your computer and use it in GitHub Desktop.

Select an option

Save molinalucas/868c80081fe7b14b442613c900ca54e6 to your computer and use it in GitHub Desktop.
Spark-compatible Kafka DefaultPartitioner
// Translated directly (and somewhat lazily) to Scala from the original Java source in the Kafka Clients library.
// 32-bit MurmurHash2 with seed 0x9747b28c, matching the Kafka client's `Utils.murmur2`.
// Input is consumed in 4-byte little-endian words; the 0-3 leftover bytes are folded in at the
// end, followed by a final avalanche step.
def murmur2(data: Array[Byte]): Int = {
  val length = data.length
  val seed = 0x9747b28c
  // 'm' and 'r' are the MurmurHash2 mixing constants (chosen empirically, not derived).
  val m = 0x5bd1e995
  val r = 24

  // Packs `count` bytes starting at `offset` into an Int, least-significant byte first.
  def littleEndianWord(offset: Int, count: Int): Int = {
    var word = 0
    var j = 0
    while (j < count) {
      word |= (data(offset + j) & 0xff) << (8 * j)
      j += 1
    }
    word
  }

  // Everything before `tailStart` is a whole number of 4-byte words.
  val tailStart = length & ~3
  var hash = seed ^ length
  var offset = 0
  while (offset < tailStart) {
    val k0 = littleEndianWord(offset, 4) * m
    val k1 = (k0 ^ (k0 >>> r)) * m
    hash = (hash * m) ^ k1
    offset += 4
  }

  // The 1-3 trailing bytes occupy disjoint bit ranges, so XOR-ing them in as one word gives the
  // same result as the reference's byte-by-byte fall-through switch.
  val tailLength = length - tailStart
  if (tailLength > 0) hash = (hash ^ littleEndianWord(tailStart, tailLength)) * m

  // Final avalanche so the last bytes affect every output bit.
  hash ^= hash >>> 13
  hash *= m
  hash ^ (hash >>> 15)
}
// Clears the sign bit, mapping any Int into [0, Int.MaxValue] (the same trick as Kafka's
// `Utils.toPositive`). Unlike `math.abs`, it never returns a negative value: Int.MinValue maps to 0.
def toPositive(number: Int): Int = Int.MaxValue & number
// Picks the partition for already-serialized key bytes as `toPositive(murmur2(data)) % partitionCount`,
// the formula the Kafka client's DefaultPartitioner applies to non-null keys.
//   data           - serialized key bytes; must be non-null (murmur2 reads data.length)
//   partitionCount - number of partitions in the topic; must be positive
// Returns an index in [0, partitionCount).
// Throws IllegalArgumentException if partitionCount <= 0.
def getPartition(data: Array[Byte], partitionCount: Int): Int = {
  // Without this guard, 0 surfaces as an opaque "/ by zero" ArithmeticException, and a negative
  // count silently yields a plausible-looking but meaningless index.
  require(partitionCount > 0, s"partitionCount must be positive, got $partitionCount")
  toPositive(murmur2(data)) % partitionCount
}
// Encodes a key string as UTF-8 bytes, matching the default-configured Kafka `StringSerializer`,
// so the hashed bytes are the same ones the producer would hash.
// Passing the Charset constant (instead of a charset-name string) skips the per-call name lookup
// and the checked UnsupportedEncodingException path of getBytes(String).
// A null `value` throws NullPointerException, because getBytes is called on it.
def serialize(value: String): Array[Byte] =
  value.getBytes(java.nio.charset.StandardCharsets.UTF_8)
// Maps a String key to its partition among `partitions` partitions (serialize, then hash).
// The curried form lets a caller fix the partition count once, e.g. `partitioner(24)`, and get
// back a String => Int. `value` must be non-null, because serialize dereferences it.
def partitioner(partitions: Int)(value: String): Int =
  getPartition(data = serialize(value), partitionCount = partitions)
// Wraps `partitioner(24)` as a Spark UDF, so a DataFrame column of String keys can be tagged with
// the partition a Kafka producer would pick for a 24-partition topic.
// Spark hands null String values straight to Scala UDFs; calling `partitioner` on one would throw
// a NullPointerException and fail the task. Kafka's DefaultPartitioner does not key-hash null keys
// either, so a null key gets a null partition (Option[Int] becomes a nullable int column).
// A `val` builds the UDF once instead of on every reference.
val partition24 = udf { (key: String) => Option(key).map(k => partitioner(24)(k)) }
spark.sql("SELECT key, value FROM place").withColumn("kafka_partition", partition24($"key"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment