Created
April 22, 2019 10:29
-
-
Save hncuong/4a81d5822fa9be709aad4d0950d38dd5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.zmining.processor.relation.community; | |
| import com.vng.dataplatform.sdk.hdbc.HDFSConnection; | |
| import com.vng.dataplatform.sdk.sparkjob.SparkJobAbst; | |
| import com.vng.dataplatform.sdk.sparkjob.SparkjobResult; | |
| import com.zmining.common.graph.community.clique.KCliqueCommunityFinder; | |
| import com.zmining.service.JavaFriendService; | |
| import it.unimi.dsi.fastutil.ints.IntOpenHashSet; | |
| import org.apache.spark.SparkConf; | |
| import org.apache.spark.api.java.JavaPairRDD; | |
| import org.apache.spark.sql.Dataset; | |
| import org.apache.spark.sql.Row; | |
| import org.apache.spark.sql.SQLContext; | |
| import org.apache.spark.sql.functions; | |
| import org.jgrapht.Graph; | |
| import org.jgrapht.graph.DefaultEdge; | |
| import org.jgrapht.graph.DefaultUndirectedGraph; | |
| import scala.Tuple2; | |
| import java.util.ArrayList; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.Set; | |
| public class ZaloLocalGraphBuilderWithService extends SparkJobAbst { | |
| private static final String friendFolder = "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/"; | |
| private static final int MOD = 1000; | |
| private static final int DAY_IN_MS = 1000 * 60 * 60 * 24; | |
| public static final String[] FRIENDS_PARQUET = new String[]{ | |
| "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/0", | |
| "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/1", | |
| "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/2", | |
| "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/3", | |
| "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/4", | |
| "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/5", | |
| "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/6", | |
| }; | |
| private static final int MIN_CLIQUE_SIZE = 4; | |
| private static final int CLIQUE_FINDER_TIMEOUT_MS = 1000; | |
| public static void main (String[] args) throws Exception { | |
| new ZaloLocalGraphBuilderWithService().process(args); | |
| } | |
| @Override | |
| public SparkjobResult execute(long l, HDFSConnection hdfsConnection, SparkConf sparkConf) throws Exception { | |
| SQLContext sqlContext = hdfsConnection.getSparkSession().sqlContext(); | |
| int subIdx = 0; | |
| JavaPairRDD<Integer, Graph<Integer, DefaultEdge>> friendGraphRdd = getFriendGraph(subIdx, sqlContext); | |
| JavaPairRDD<Integer, List<Set<Integer>>> rdd = ZaloLocalCommunityDetector.findCommunities(friendGraphRdd); | |
| ZaloLocalCommunityDetector.saveCommunities(subIdx, rdd, hdfsConnection, sparkConf); | |
| return null; | |
| } | |
| /** | |
| * Create local graph using Friend Service | |
| * @param subIdx | |
| * @param sqlContext | |
| * @return | |
| */ | |
| private static JavaPairRDD<Integer, Graph<Integer, DefaultEdge>> getFriendGraph (int subIdx, SQLContext sqlContext) { | |
| // All friends Df | |
| // src, friends | |
| Dataset<Row> friendDf = sqlContext.read().parquet(FRIENDS_PARQUET); | |
| // Sub friends Df | |
| Dataset<Row> subFriendDf = friendDf.filter(functions.col("src").mod(MOD).equalTo(subIdx)); | |
| // Get friend of friend using friend service | |
| // Create local graph | |
| JavaPairRDD<Integer, Graph<Integer, DefaultEdge>> friendGraphRdd = subFriendDf.toJavaRDD(). | |
| mapToPair(row -> { | |
| Integer src = row.getInt(0); | |
| List<Integer> friend = row.getList(1); | |
| return new Tuple2<>(src, friend); | |
| }) | |
| .mapValues(x -> { | |
| // Get friend of friend | |
| List<Tuple2<Integer, List<Integer>>> shareFriend = new ArrayList<>(); | |
| IntOpenHashSet friend = new IntOpenHashSet(x); | |
| Map<Integer, Set<Integer>> friendOfFriends = JavaFriendService.INSTANCE.getMultiFriend(friend); | |
| // Create graph | |
| Graph<Integer, DefaultEdge> graph = new DefaultUndirectedGraph<>(DefaultEdge.class); | |
| for (int fid : friendOfFriends.keySet()) { | |
| Set<Integer> fFriend = friendOfFriends.get(fid); | |
| fFriend.retainAll(friend); | |
| if (!graph.containsVertex(fid)) graph.addVertex(fid); | |
| // Avoid pitfall | |
| fFriend.remove(fid); | |
| // add edges | |
| for (Integer fr : fFriend) { | |
| if (!graph.containsVertex(fr)) graph.addVertex(fr); | |
| graph.addEdge(fid, fr); | |
| } | |
| } | |
| return graph; | |
| }); | |
| return friendGraphRdd; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment