Skip to content

Instantly share code, notes, and snippets.

@hncuong
Created April 22, 2019 10:29
Show Gist options
  • Select an option

  • Save hncuong/4a81d5822fa9be709aad4d0950d38dd5 to your computer and use it in GitHub Desktop.

Select an option

Save hncuong/4a81d5822fa9be709aad4d0950d38dd5 to your computer and use it in GitHub Desktop.
package com.zmining.processor.relation.community;
import com.vng.dataplatform.sdk.hdbc.HDFSConnection;
import com.vng.dataplatform.sdk.sparkjob.SparkJobAbst;
import com.vng.dataplatform.sdk.sparkjob.SparkjobResult;
import com.zmining.common.graph.community.clique.KCliqueCommunityFinder;
import com.zmining.service.JavaFriendService;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
import org.jgrapht.Graph;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.graph.DefaultUndirectedGraph;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Spark job that, for every user in one shard of the A30-FRIEND parquet data, builds a local
 * undirected graph among that user's friends: two friends are connected when one appears in the
 * other's friend list (as returned by {@link JavaFriendService}). The per-user graphs are passed to
 * {@code ZaloLocalCommunityDetector} for community detection and saving.
 */
public class ZaloLocalGraphBuilderWithService extends SparkJobAbst {
    /** Root folder of the aggregated friend-relation parquet data. */
    private static final String FRIEND_FOLDER = "/data/jobs/zmb/aggregated/relation_log/A30-FRIEND/";
    /** Users are split into shards by {@code src % MOD}; a run processes one shard. */
    private static final int MOD = 1000;
    /**
     * Parquet sub-folders (0..6 under {@link #FRIEND_FOLDER}) read as the friend dataset.
     * NOTE(review): this public array is mutable by callers; consider exposing an immutable copy.
     */
    public static final String[] FRIENDS_PARQUET = new String[]{
            FRIEND_FOLDER + "0",
            FRIEND_FOLDER + "1",
            FRIEND_FOLDER + "2",
            FRIEND_FOLDER + "3",
            FRIEND_FOLDER + "4",
            FRIEND_FOLDER + "5",
            FRIEND_FOLDER + "6",
    };

    public static void main(String[] args) throws Exception {
        new ZaloLocalGraphBuilderWithService().process(args);
    }

    /**
     * Builds the local friend graphs for shard 0, detects communities in them and saves the result.
     *
     * @param l              not used by this job
     * @param hdfsConnection provides the Spark session; also forwarded to the save step
     * @param sparkConf      forwarded to the save step
     * @return always {@code null}; NOTE(review): confirm {@link SparkJobAbst} accepts a null result
     */
    @Override
    public SparkjobResult execute(long l, HDFSConnection hdfsConnection, SparkConf sparkConf) throws Exception {
        SQLContext sqlContext = hdfsConnection.getSparkSession().sqlContext();
        // Only the first shard is processed for now.
        int subIdx = 0;
        JavaPairRDD<Integer, Graph<Integer, DefaultEdge>> friendGraphRdd = getFriendGraph(subIdx, sqlContext);
        JavaPairRDD<Integer, List<Set<Integer>>> rdd = ZaloLocalCommunityDetector.findCommunities(friendGraphRdd);
        ZaloLocalCommunityDetector.saveCommunities(subIdx, rdd, hdfsConnection, sparkConf);
        return null;
    }

    /**
     * Creates one local friend graph per user in the given shard, using the friend service to look
     * up the friends of each friend.
     *
     * @param subIdx     shard index; users with {@code src % MOD == subIdx} are processed
     * @param sqlContext context used to read the friend parquet data
     * @return pair RDD of (user id, undirected graph among that user's friends)
     */
    private static JavaPairRDD<Integer, Graph<Integer, DefaultEdge>> getFriendGraph(int subIdx, SQLContext sqlContext) {
        // Columns are read positionally: 0 = src (user id), 1 = list of friend ids.
        Dataset<Row> friendDf = sqlContext.read().parquet(FRIENDS_PARQUET);
        Dataset<Row> subFriendDf = friendDf.filter(functions.col("src").mod(MOD).equalTo(subIdx));

        return subFriendDf.toJavaRDD()
                .mapToPair(row -> {
                    Integer src = row.getInt(0);
                    // Treat a null friend list as "no friends" instead of failing downstream.
                    List<Integer> friendIds = row.isNullAt(1)
                            ? new ArrayList<Integer>()
                            : row.<Integer>getList(1);
                    return new Tuple2<>(src, friendIds);
                })
                .mapValues(friendIds -> {
                    IntOpenHashSet friends = new IntOpenHashSet(friendIds);
                    Map<Integer, Set<Integer>> friendOfFriends = JavaFriendService.INSTANCE.getMultiFriend(friends);
                    return buildLocalGraph(friends, friendOfFriends);
                });
    }

    /**
     * Builds the undirected graph among one user's friends without modifying the service's sets.
     *
     * @param friends         the user's friend ids
     * @param friendOfFriends friend lists keyed by friend id, as returned by the friend service;
     *                        only read, never modified
     * @return graph with one vertex per key of {@code friendOfFriends} and an edge between a key
     *         and each of its listed friends that is also in {@code friends}
     */
    private static Graph<Integer, DefaultEdge> buildLocalGraph(IntOpenHashSet friends,
                                                              Map<Integer, Set<Integer>> friendOfFriends) {
        Graph<Integer, DefaultEdge> graph = new DefaultUndirectedGraph<>(DefaultEdge.class);
        if (friendOfFriends == null) {
            return graph;
        }
        for (Map.Entry<Integer, Set<Integer>> entry : friendOfFriends.entrySet()) {
            int fid = entry.getKey();
            // Every looked-up friend becomes a vertex, even with no mutual friends; no-op if present.
            graph.addVertex(fid);
            Set<Integer> theirFriends = entry.getValue();
            if (theirFriends == null) {
                continue;
            }
            for (int fr : theirFriends) {
                // Keep only mutual friends, and skip self-loops, which DefaultUndirectedGraph would accept.
                if (fr == fid || !friends.contains(fr)) {
                    continue;
                }
                graph.addVertex(fr);
                // Each pair is visited from both sides; the duplicate addEdge is ignored (returns null).
                graph.addEdge(fid, fr);
            }
        }
        return graph;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment