implementation of the job to collect simrels from postgres db

This commit is contained in:
miconis 2020-09-22 09:43:27 +02:00
parent d47352cbc7
commit 259362ef47
13 changed files with 332 additions and 52 deletions

View File

@ -0,0 +1,167 @@
package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public class SparkCollectSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
Dataset<Row> simGroupsDS;
Dataset<Row> groupsDS;
public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset<Row> simGroupsDS, Dataset<Row> groupsDS) {
super(parser, spark);
this.simGroupsDS = simGroupsDS;
this.groupsDS = groupsDS;
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkBlockStats.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser");
final String dbPassword = parser.get("postgresPassword");
SparkSession spark = getSparkSession(conf);
DataFrameReader readOptions = spark.read()
.format("jdbc")
.option("url", dbUrl)
.option("user", dbUser)
.option("password", dbPassword);
new SparkCollectSimRels(
parser,
spark,
readOptions.option("dbtable", "similarity_groups").load(),
readOptions.option("dbtable", "groups").load()
).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
void run(ISLookUpService isLookUpService) {
// read oozie parameters
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser");
log.info("numPartitions: '{}'", numPartitions);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("postgresUser: {}", dbUser);
log.info("postgresUrl: {}", dbUrl);
log.info("postgresPassword: xxx");
JavaPairRDD<String, List<String>> similarityGroup =
simGroupsDS
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
.groupByKey()
.mapToPair(i -> new Tuple2<>(i._1(), StreamSupport.stream(i._2().spliterator(), false)
.collect(Collectors.toList())));
JavaPairRDD<String, String> groupIds =
groupsDS
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
JavaRDD<Tuple2<Tuple2<String, String>, List<String>>> groups = similarityGroup
.leftOuterJoin(groupIds)
.filter(g -> g._2()._2().isPresent())
.map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
JavaRDD<Relation> relations = groups.flatMap(g -> {
String firstId = g._2().get(0);
List<Relation> rels = new ArrayList<>();
for (String id : g._2()) {
if (!firstId.equals(id))
rels.add(createSimRel(firstId, id, g._1()._2()));
}
return rels.iterator();
});
Dataset<Relation> resultRelations = spark.createDataset(
relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
Encoders.bean(Relation.class)
).repartition(numPartitions);
Dataset<Relation> organizationRelations = spark.createDataset(
relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
Encoders.bean(Relation.class)
).repartition(numPartitions);
savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
savePostgresRelation(resultRelations, workingPath, actionSetId, "publication");
savePostgresRelation(resultRelations, workingPath, actionSetId, "software");
savePostgresRelation(resultRelations, workingPath, actionSetId, "otherresearchproduct");
savePostgresRelation(resultRelations, workingPath, actionSetId, "dataset");
}
private Relation createSimRel(String source, String target, String entity) {
final Relation r = new Relation();
r.setSubRelType("dedupSimilarity");
r.setRelClass("isSimilarTo");
r.setDataInfo(new DataInfo());
switch (entity) {
case "result":
r.setSource("50|" + source);
r.setTarget("50|" + target);
r.setRelType("resultResult");
break;
case "organization":
r.setSource("20|" + source);
r.setTarget("20|" + target);
r.setRelType("organizationOrganization");
break;
default:
throw new IllegalArgumentException("unmanaged entity type: " + entity);
}
return r;
}
private void savePostgresRelation(Dataset<Relation> newRelations, String workingPath, String actionSetId, String entityType) {
newRelations
.write()
.mode(SaveMode.Append)
.parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
}
}

View File

@ -104,15 +104,13 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair((PairFunction<String, Object, String>) s -> new Tuple2<>(hash(s), s));
final RDD<Edge<String>> edgeRdd = spark
.read()
.textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
.map(
(MapFunction<String, Relation>) r -> OBJECT_MAPPER.readValue(r, Relation.class),
Encoders.bean(Relation.class))
.javaRDD()
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
.rdd();
final RDD<Edge<String>> edgeRdd = spark
.read()
.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
.as(Encoders.bean(Relation.class))
.javaRDD()
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
.rdd();
final Dataset<Relation> mergeRels = spark
.createDataset(

View File

@ -100,12 +100,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
.repartition(numPartitions);
// create relations by comparing only elements in the same group
Deduper
.computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions)
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath);
spark.createDataset(
Deduper
.computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions)
.rdd(),
Encoders.bean(Relation.class)
)
.write()
.mode(SaveMode.Append)
.parquet(outputPath);
}
}

View File

@ -0,0 +1,44 @@
[
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "address for the LookUp",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
},
{
"paramName": "purl",
"paramLongName": "postgresUrl",
"paramDescription": "the url of the postgres server",
"paramRequired": true
},
{
"paramName": "pusr",
"paramLongName": "postgresUser",
"paramDescription": "the owner of the postgres database",
"paramRequired": true
},
{
"paramName": "ppwd",
"paramLongName": "postgresPassword",
"paramDescription": "the password for the postgres user",
"paramRequired": true
}
]

View File

@ -1,4 +1,3 @@
package eu.dnetlib.dhp.oa.dedup;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -16,10 +15,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@ -52,6 +48,7 @@ public class SparkDedupTest implements Serializable {
private static String testOutputBasePath;
private static String testDedupGraphBasePath;
private static final String testActionSetId = "test-orchestrator";
private static String testDedupAssertionsBasePath;
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
@ -66,6 +63,10 @@ public class SparkDedupTest implements Serializable {
testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
testDedupAssertionsBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI())
.toFile()
.getAbsolutePath();
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@ -80,7 +81,8 @@ public class SparkDedupTest implements Serializable {
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
}
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
@ -150,6 +152,7 @@ public class SparkDedupTest implements Serializable {
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
parser
.parseArgument(
new String[] {
@ -162,30 +165,30 @@ public class SparkDedupTest implements Serializable {
new SparkCreateSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
long orgs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
long pubs_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
long pubs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
long sw_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long sw_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long ds_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
long orp_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
assertEquals(3432, orgs_simrel);
assertEquals(7152, pubs_simrel);
@ -194,8 +197,69 @@ public class SparkDedupTest implements Serializable {
assertEquals(6750, orp_simrel);
}
@Test
@Order(2)
public void collectSimRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath,
"-np", "50",
"-purl", "jdbc:postgresql://localhost:5432/dnet_dedup",
"-pusr", "postgres_url",
"-ppwd", ""
});
new SparkCollectSimRels(
parser,
spark,
spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"),
spark.read().load(testDedupAssertionsBasePath + "/groups")
).run(null);
long orgs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
long pubs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
long sw_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
assertEquals(4022, orgs_simrel);
assertEquals(10575, pubs_simrel);
assertEquals(3767, sw_simrel);
assertEquals(3881, ds_simrel);
assertEquals(10173, orp_simrel);
}
@Test
@Order(2)
@Order(3)
public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -204,6 +268,7 @@ public class SparkDedupTest implements Serializable {
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
@ -290,7 +355,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(3)
@Order(4)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -299,6 +364,7 @@ public class SparkDedupTest implements Serializable {
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
@ -344,7 +410,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(4)
@Order(5)
public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -391,7 +457,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(5)
@Order(6)
public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -507,7 +573,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(6)
@Order(7)
public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -557,7 +623,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(7)
@Order(8)
public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
@ -575,11 +641,11 @@ public class SparkDedupTest implements Serializable {
assertEquals(expected_unique, rel.distinct().count());
}
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
}
// @AfterAll
// public static void finalCleanUp() throws IOException {
// FileUtils.deleteDirectory(new File(testOutputBasePath));
// FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
// }
public boolean isDeletedByInference(String s) {
return s.contains("\"deletedbyinference\":true");