-
-
Save molinalucas/868c80081fe7b14b442613c900ca54e6 to your computer and use it in GitHub Desktop.
Spark-compatible Kafka DefaultPartitioner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Scala port of `org.apache.kafka.common.utils.Utils.murmur2` (32-bit MurmurHash2).
// NOTE(review): this must stay bit-for-bit identical to the Kafka client
// implementation so partitions computed in Spark match broker-side routing.
def murmur2(data: Array[Byte]): Int = {
  val n = data.length
  val m = 0x5bd1e995 // MurmurHash2 mixing constant (not "magic", just works well)
  val r = 24         // MurmurHash2 shift constant
  // Hash state starts as seed XOR input length, per the reference implementation.
  var h = 0x9747b28c ^ n

  // Mix the input four bytes at a time, assembling each little-endian word.
  // (`|` is equivalent to the reference's `+` here: the masked, shifted bytes
  // occupy disjoint bit ranges.)
  var off = 0
  val limit = n - (n % 4)
  while (off < limit) {
    var k = (data(off) & 0xff) |
      ((data(off + 1) & 0xff) << 8) |
      ((data(off + 2) & 0xff) << 16) |
      ((data(off + 3) & 0xff) << 24)
    k *= m
    k ^= k >>> r
    k *= m
    h *= m
    h ^= k
    off += 4
  }

  // Fold in the 0-3 trailing bytes. The cumulative `if` chain mirrors the
  // deliberate switch fall-through of the original Java source.
  val base = n & ~3
  val tail = n % 4
  if (tail >= 3) h ^= (data(base + 2) & 0xff) << 16
  if (tail >= 2) h ^= (data(base + 1) & 0xff) << 8
  if (tail >= 1) {
    h ^= data(base) & 0xff
    h *= m
  }

  // Final avalanche: force the last few input bytes to affect all output bits.
  h ^= h >>> 13
  h *= m
  h ^= h >>> 15
  h
}
| def toPositive(number: Int): Int = number & 0x7fffffff | |
// Maps a serialized key to a partition index in [0, partitionCount), exactly
// as Kafka's DefaultPartitioner routes keyed records.
def getPartition(data: Array[Byte], partitionCount: Int): Int =
  toPositive(murmur2(data)) % partitionCount
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def serialize(value: String): Array[Byte] = value.getBytes("UTF8") // matches default configured Kafka `StringSerializer` | |
// Curried entry point: fix the topic's partition count once, then use the
// result as a plain `String => Int` (e.g. inside a Spark UDF).
def partitioner(partitions: Int)(value: String): Int =
  getPartition(serialize(value), partitions)
// Spark UDF computing which of 24 Kafka partitions a string key routes to.
// The lambda is written out explicitly rather than relying on eta-expansion
// of `partitioner(24)`; behavior is identical.
def partition24 = udf { (key: String) => partitioner(24)(key) }
| spark.sql("SELECT key, value FROM place").withColumn("kafka_partition", partition24($"key")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment