[Dump Subset] change code to read from db

Miriam Baglioni 2022-11-25 17:52:46 +01:00
parent 67d48763fa
commit f26378f426
3 changed files with 196 additions and 186 deletions
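With this commit the datasource master/duplicate correspondences are read from the relational database: ReadMasterDuplicateFromDB opens a DbClient connection, runs QUERY, maps each row to a MasterDuplicate record and appends it as a JSON line to a file on HDFS; SparkSelectSubset gains a masterDuplicatePath argument, reads that file and uses it to rewrite duplicate collectedfrom/hostedby datasource identifiers onto their master before selecting the datasource subset; DumpSubsetTest is updated to pass a masterDuplicatePath in both scenarios. The sketch below reconstructs the read-from-DB-and-dump flow from the hunks that follow; the query text, the ResultSet column names and the local MasterDuplicate stand-in are illustrative assumptions, not the repository's actual code.

	// Minimal sketch (assumptions flagged in comments) of the flow introduced in this commit:
	// query the master/duplicate pairs from the database and dump them to HDFS as JSON lines.
	import java.io.BufferedWriter;
	import java.io.IOException;
	import java.io.OutputStreamWriter;
	import java.nio.charset.StandardCharsets;
	import java.sql.ResultSet;
	import java.sql.SQLException;

	import org.apache.hadoop.conf.Configuration;
	import org.apache.hadoop.fs.FSDataOutputStream;
	import org.apache.hadoop.fs.FileSystem;
	import org.apache.hadoop.fs.Path;

	import com.fasterxml.jackson.databind.ObjectMapper;

	import eu.dnetlib.dhp.common.DbClient;

	public class MasterDuplicateDumpSketch {

		private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

		// placeholder: the real QUERY constant is defined in ReadMasterDuplicateFromDB
		private static final String QUERY = "<query returning duplicate/master id pairs>";

		public static void main(String[] args) throws IOException {
			final String dbUrl = args[0], dbUser = args[1], dbPassword = args[2], hdfsPath = args[3];

			final FileSystem fs = FileSystem.get(new Configuration());
			try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword);
				FSDataOutputStream fos = fs.create(new Path(hdfsPath));
				BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
				// one JSON line per master/duplicate correspondence
				dbClient.processResults(QUERY, rs -> writeMap(rs, writer));
			}
		}

		private static void writeMap(ResultSet rs, BufferedWriter writer) {
			try {
				final MasterDuplicate dm = new MasterDuplicate();
				dm.setDuplicate(rs.getString("duplicate")); // column names are assumptions
				dm.setMaster(rs.getString("master"));
				writer.write(OBJECT_MAPPER.writeValueAsString(dm));
				writer.newLine();
			} catch (IOException | SQLException e) {
				throw new RuntimeException(e);
			}
		}

		// stand-in for the project's MasterDuplicate bean (fields inferred from the join in SparkSelectSubset)
		public static class MasterDuplicate {
			private String duplicate;
			private String master;

			public String getDuplicate() { return duplicate; }
			public void setDuplicate(String duplicate) { this.duplicate = duplicate; }
			public String getMaster() { return master; }
			public void setMaster(String master) { this.master = master; }
		}
	}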

ReadMasterDuplicateFromDB.java

@@ -1,20 +1,6 @@
package eu.dnetlib.dhp.oa.graph.dump.subset;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.DBCursor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
@@ -24,7 +10,23 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Function;
public class ReadMasterDuplicateFromDB {
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.DBCursor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
public class ReadMasterDuplicateFromDB {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -58,9 +60,9 @@ public class ReadMasterDuplicateFromDB {
}
private static void execute(String dbUrl, String dbUser, String dbPassword, FSDataOutputStream fos) {
try(DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)){
try(BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))){
dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer));
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer));
}
} catch (IOException e) {
@@ -68,7 +70,6 @@ public class ReadMasterDuplicateFromDB {
}
}
public static MasterDuplicate datasourceMasterMap(ResultSet rs) {
try {
MasterDuplicate dm = new MasterDuplicate();
@@ -84,7 +85,6 @@ public class ReadMasterDuplicateFromDB {
}
}
protected static void writeMap(final MasterDuplicate dm, BufferedWriter writer) {
try {
writer.write(OBJECT_MAPPER.writeValueAsString(dm));

SparkSelectSubset.java

@@ -80,7 +80,8 @@ public class SparkSelectSubset implements Serializable {
}
private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet, String masterDuplicatePath) {
private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet,
String masterDuplicatePath) {
Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter(
@@ -202,12 +203,13 @@ public class SparkSelectSubset implements Serializable {
(FlatMapFunction<Publication, String>) p -> {
List<String> ret = new ArrayList<>();
p.getInstance().stream().forEach(i -> {
if (Optional.ofNullable(i.getHostedby()).isPresent() && Optional.ofNullable(i.getHostedby().getKey()).isPresent())
if (Optional.ofNullable(i.getHostedby()).isPresent()
&& Optional.ofNullable(i.getHostedby().getKey()).isPresent())
ret.add(i.getHostedby().getKey());
});
if (Optional.ofNullable(p.getCollectedfrom()).isPresent()){
if (Optional.ofNullable(p.getCollectedfrom()).isPresent()) {
p.getCollectedfrom().stream().forEach(cf -> {
if(Optional.ofNullable(cf.getKey()).isPresent())
if (Optional.ofNullable(cf.getKey()).isPresent())
ret.add(cf.getKey());
});
}
@@ -216,69 +218,73 @@ public class SparkSelectSubset implements Serializable {
.union(
Utils
.readPath(spark, outputPath + "/original/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
.flatMap(
(FlatMapFunction<eu.dnetlib.dhp.schema.oaf.Dataset, String>) p -> {
List<String> ret = new ArrayList<>();
p.getInstance().stream().forEach(i -> {
if (Optional.ofNullable(i.getHostedby()).isPresent() && Optional.ofNullable(i.getHostedby().getKey()).isPresent())
ret.add(i.getHostedby().getKey());
});
if (Optional.ofNullable(p.getCollectedfrom()).isPresent() ){
p.getCollectedfrom().stream().forEach(cf -> {
if(Optional.ofNullable(cf.getKey()).isPresent())
ret.add(cf.getKey());
});
}
return ret.iterator();
}, Encoders.STRING()))
.flatMap(
(FlatMapFunction<eu.dnetlib.dhp.schema.oaf.Dataset, String>) p -> {
List<String> ret = new ArrayList<>();
p.getInstance().stream().forEach(i -> {
if (Optional.ofNullable(i.getHostedby()).isPresent()
&& Optional.ofNullable(i.getHostedby().getKey()).isPresent())
ret.add(i.getHostedby().getKey());
});
if (Optional.ofNullable(p.getCollectedfrom()).isPresent()) {
p.getCollectedfrom().stream().forEach(cf -> {
if (Optional.ofNullable(cf.getKey()).isPresent())
ret.add(cf.getKey());
});
}
return ret.iterator();
}, Encoders.STRING()))
.union(
Utils
.readPath(spark, outputPath + "/original/software", Software.class)
.flatMap(
(FlatMapFunction<Software, String>) p -> {
List<String> ret = new ArrayList<>();
p.getInstance().stream().forEach(i -> {
if (Optional.ofNullable(i.getHostedby()).isPresent() && Optional.ofNullable(i.getHostedby().getKey()).isPresent())
ret.add(i.getHostedby().getKey());
});
if (Optional.ofNullable(p.getCollectedfrom()).isPresent()){
p.getCollectedfrom().stream().forEach(cf -> {
if(Optional.ofNullable(cf.getKey()).isPresent())
ret.add(cf.getKey());
});
}
return ret.iterator();
}, Encoders.STRING()))
.flatMap(
(FlatMapFunction<Software, String>) p -> {
List<String> ret = new ArrayList<>();
p.getInstance().stream().forEach(i -> {
if (Optional.ofNullable(i.getHostedby()).isPresent()
&& Optional.ofNullable(i.getHostedby().getKey()).isPresent())
ret.add(i.getHostedby().getKey());
});
if (Optional.ofNullable(p.getCollectedfrom()).isPresent()) {
p.getCollectedfrom().stream().forEach(cf -> {
if (Optional.ofNullable(cf.getKey()).isPresent())
ret.add(cf.getKey());
});
}
return ret.iterator();
}, Encoders.STRING()))
.union(
Utils
.readPath(spark, outputPath + "/original/otherresearchproduct", OtherResearchProduct.class)
.flatMap(
(FlatMapFunction<OtherResearchProduct, String>) p -> {
List<String> ret = new ArrayList<>();
p.getInstance().stream().forEach(i -> {
if (Optional.ofNullable(i.getHostedby()).isPresent() && Optional.ofNullable(i.getHostedby().getKey()).isPresent())
ret.add(i.getHostedby().getKey());
});
if (Optional.ofNullable(p.getCollectedfrom()).isPresent()){
p.getCollectedfrom().stream().forEach(cf -> {
if(Optional.ofNullable(cf.getKey()).isPresent())
ret.add(cf.getKey());
});
}
return ret.iterator();
}, Encoders.STRING()))
.flatMap(
(FlatMapFunction<OtherResearchProduct, String>) p -> {
List<String> ret = new ArrayList<>();
p.getInstance().stream().forEach(i -> {
if (Optional.ofNullable(i.getHostedby()).isPresent()
&& Optional.ofNullable(i.getHostedby().getKey()).isPresent())
ret.add(i.getHostedby().getKey());
});
if (Optional.ofNullable(p.getCollectedfrom()).isPresent()) {
p.getCollectedfrom().stream().forEach(cf -> {
if (Optional.ofNullable(cf.getKey()).isPresent())
ret.add(cf.getKey());
});
}
return ret.iterator();
}, Encoders.STRING()))
.filter((FilterFunction<String>) s -> !s.equals(ModelConstants.UNKNOWN_REPOSITORY.getKey()))
.distinct();
Dataset<MasterDuplicate> masterDuplicate = Utils.readPath(spark, masterDuplicatePath, MasterDuplicate.class);
Dataset<String> cfhb = cfhb_orig.joinWith(masterDuplicate, masterDuplicate.col("duplicate").equalTo(cfhb_orig.col("value")), "left")
.map((MapFunction<Tuple2<String, MasterDuplicate>, String>) t2 -> {
if (!Optional.ofNullable(t2._2()).isPresent()) {
return t2._1();
}
return t2._2().getMaster();
}, Encoders.STRING());
Dataset<String> cfhb = cfhb_orig
.joinWith(masterDuplicate, masterDuplicate.col("duplicate").equalTo(cfhb_orig.col("value")), "left")
.map((MapFunction<Tuple2<String, MasterDuplicate>, String>) t2 -> {
if (!Optional.ofNullable(t2._2()).isPresent()) {
return t2._1();
}
return t2._2().getMaster();
}, Encoders.STRING());
datasource
.joinWith(cfhb, datasource.col("id").equalTo(cfhb.col("value")))
@@ -291,7 +297,9 @@ public class SparkSelectSubset implements Serializable {
cfhb.foreach((ForeachFunction<String>) s -> System.out.println("cf " + s));
datasource.foreach((ForeachFunction<Datasource>) ds -> System.out.println("ds " + ds.getId()));
Utils.readPath(spark, outputPath + "/original/datasource", Datasource.class).foreach((ForeachFunction<Datasource>) ds -> System.out.println("dsID: " + ds.getId()));
Utils
.readPath(spark, outputPath + "/original/datasource", Datasource.class)
.foreach((ForeachFunction<Datasource>) ds -> System.out.println("dsID: " + ds.getId()));
Dataset<Project> project = Utils
.readPath(spark, inputPath + "/project", Project.class)
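The substantive change in SparkSelectSubset is the new masterDuplicatePath argument and the left join shown above: every collectedfrom/hostedby identifier collected in cfhb_orig that appears as a duplicate in the correspondence file is rewritten to its master id before being matched against the datasources. Below is a tiny illustration of that join semantics with hypothetical ids; it assumes MasterDuplicate exposes duplicate/master as bean properties (as the join columns suggest) and relies on the imports already used in this class plus java.util.Arrays.

	// ids as they occur in the graph records (hypothetical values)
	Dataset<String> ids = spark
		.createDataset(Arrays.asList("10|dup_________::1111", "10|other_______::2222"), Encoders.STRING());

	// one correspondence entry, as it would be read from masterDuplicatePath
	MasterDuplicate md = new MasterDuplicate();
	md.setDuplicate("10|dup_________::1111");
	md.setMaster("10|master______::9999");
	Dataset<MasterDuplicate> correspondence = spark
		.createDataset(Arrays.asList(md), Encoders.bean(MasterDuplicate.class));

	Dataset<String> resolved = ids
		.joinWith(correspondence, correspondence.col("duplicate").equalTo(ids.col("value")), "left")
		.map(
			(MapFunction<Tuple2<String, MasterDuplicate>, String>) t2 -> Optional.ofNullable(t2._2()).isPresent()
				? t2._2().getMaster()
				: t2._1(),
			Encoders.STRING());

	// resolved contains "10|master______::9999" and "10|other_______::2222":
	// duplicates are replaced by their master, everything else passes through unchanged.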

DumpSubsetTest.java

@@ -109,7 +109,7 @@ public class DumpSubsetTest {
"\"contributor\" : \"$['contributor'][*]['value']\", \"description\" : \"$['description'][*]['value']\", "
+
"\"dateofacceptance\" : \"$['dateofacceptance']['value']\", " +
"\"context\": \"['context'][*]['id']\"}";
"\"context\": \"['context'][*]['id']\"}";
final String constraint = "{\"criteria\":[{\"constraint\":[{\"verb\":\"lesser_than\",\"field\":\"dateofacceptance\",\"value\":\"2023-01-01\"},{\"verb\":\"greater_than\",\"field\":\"dateofacceptance\",\"value\":\"2006-12-31\"}]}]}";
final String sourcePath = getClass()
@@ -153,51 +153,53 @@ public class DumpSubsetTest {
@Test // Step 1
void testSelectionConstraintsCommunity() throws Exception {
final String pathMap = "{\"author\" : \"$['author'][*]['fullname']\", " +
"\"title\" : \"$['title'][*]['value']\", \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\", "
+
"\"contributor\" : \"$['contributor'][*]['value']\", \"description\" : \"$['description'][*]['value']\", "
+
"\"dateofacceptance\" : \"$['dateofacceptance']['value']\", " +
"\"context\": \"$['context'][*]['id']\"}";
"\"title\" : \"$['title'][*]['value']\", \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\", "
+
"\"contributor\" : \"$['contributor'][*]['value']\", \"description\" : \"$['description'][*]['value']\", "
+
"\"dateofacceptance\" : \"$['dateofacceptance']['value']\", " +
"\"context\": \"$['context'][*]['id']\"}";
final String constraint = "{\"criteria\":[{\"constraint\":[{\"verb\":\"equals\",\"field\":\"context\",\"value\":\"dh-ch\"}]}]}";
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/publication")
.getPath();
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/publication")
.getPath();
final String communityMapPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
.getPath();
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
.getPath();
SparkDumpResult
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString(),
"-communityMapPath", communityMapPath,
"-pathMap", pathMap,
"-selectionCriteria", constraint,
"-resultType", "publication"
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString(),
"-communityMapPath", communityMapPath,
"-pathMap", pathMap,
"-selectionCriteria", constraint,
"-resultType", "publication"
});
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<GraphResult> tmp = sc
.textFile(workingDir.toString() + "/dump/publication")
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
.textFile(workingDir.toString() + "/dump/publication")
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
Assertions.assertEquals(17, tmp.count());
JavaRDD<Publication> tmp_pubs = sc
.textFile(workingDir.toString() + "/original/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
.textFile(workingDir.toString() + "/original/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
Assertions.assertEquals(17, tmp_pubs.count());
Assertions.assertEquals(17, tmp_pubs.filter(p -> p.getContext().stream().anyMatch(c -> c.getId().equals("dh-ch"))).count());
Assertions
.assertEquals(
17, tmp_pubs.filter(p -> p.getContext().stream().anyMatch(c -> c.getId().equals("dh-ch"))).count());
}
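The pathMap passed above maps logical field names to JSONPath expressions evaluated against the serialized record, and selectionCriteria combines verb/field/value constraints over those fields; with the equals constraint on context = dh-ch, all 17 publications tagged with that community survive the dump. A rough illustration of what the context path selects, using the Jayway JsonPath library (an assumption here, not necessarily what the dump code uses internally):

	// minimal fake record, just enough to show what "$['context'][*]['id']" extracts
	String json = "{\"context\": [{\"id\": \"dh-ch\"}, {\"id\": \"beopen\"}]}";
	java.util.List<String> contexts = com.jayway.jsonpath.JsonPath.read(json, "$['context'][*]['id']");
	boolean keep = contexts.contains("dh-ch"); // the equals/"dh-ch" constraint keeps this record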
@@ -265,7 +267,7 @@ public class DumpSubsetTest {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-outputPath", workingDir.toString(),
"-masterDuplicatePath", getClass()
"-masterDuplicatePath", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/empty")
.getPath()
@@ -329,123 +331,123 @@ public class DumpSubsetTest {
void testSelectSubsetMaster() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/")
.getPath();
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/")
.getPath();
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/publication")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/publication");
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/publication")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/publication");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/software")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/software");
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/software")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/software");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/dataset")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/dataset");
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/dataset")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/dataset");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/otherresearchproduct")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/otherresearchproduct");
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/otherresearchproduct")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/otherresearchproduct");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/publication")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/publication");
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/publication")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/publication");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/software")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/software");
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/software")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/software");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/dataset")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/dataset");
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/dataset")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/dataset");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/otherresearchproduct")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/otherresearchproduct");
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/otherresearchproduct")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/otherresearchproduct");
SparkSelectSubset
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-outputPath", workingDir.toString(),
"-masterDuplicatePath", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/correspondence")
.getPath()
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-outputPath", workingDir.toString(),
"-masterDuplicatePath", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/correspondence")
.getPath()
});
});
JavaRDD<Relation> tmp = sc
.textFile(workingDir.toString() + "/original/relation")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
.textFile(workingDir.toString() + "/original/relation")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
Assertions.assertEquals(20, tmp.count());
Assertions
.assertEquals(
6, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("50|")).count());
.assertEquals(
6, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("50|")).count());
Assertions
.assertEquals(
3, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("20|")).count());
.assertEquals(
3, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("20|")).count());
Assertions
.assertEquals(
3, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("50|")).count());
.assertEquals(
3, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("50|")).count());
Assertions
.assertEquals(
4, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("50|")).count());
.assertEquals(
4, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("50|")).count());
Assertions
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("10|") && r.getTarget().startsWith("20|")).count());
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("10|") && r.getTarget().startsWith("20|")).count());
Assertions
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("10|")).count());
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("10|")).count());
Assertions
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("40|")).count());
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("40|")).count());
Assertions
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("20|")).count());
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("20|")).count());
JavaRDD<eu.dnetlib.dhp.schema.oaf.Datasource> tmp_datasource = sc
.textFile(workingDir.toString() + "/original/datasource")
.map(item -> OBJECT_MAPPER.readValue(item, Datasource.class));
.textFile(workingDir.toString() + "/original/datasource")
.map(item -> OBJECT_MAPPER.readValue(item, Datasource.class));
Assertions.assertEquals(5, tmp_datasource.count());
Assertions
.assertEquals(
1,
tmp_datasource
.filter(d -> d.getId().equals("10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8"))
.count());
.assertEquals(
1,
tmp_datasource
.filter(d -> d.getId().equals("10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8"))
.count());
JavaRDD<Organization> tmp_organization = sc
.textFile(workingDir.toString() + "/original/organization")
.map(item -> OBJECT_MAPPER.readValue(item, Organization.class));
.textFile(workingDir.toString() + "/original/organization")
.map(item -> OBJECT_MAPPER.readValue(item, Organization.class));
Assertions.assertEquals(3, tmp_organization.count());
JavaRDD<Project> tmp_project = sc
.textFile(workingDir.toString() + "/original/project")
.map(item -> OBJECT_MAPPER.readValue(item, Project.class));
.textFile(workingDir.toString() + "/original/project")
.map(item -> OBJECT_MAPPER.readValue(item, Project.class));
Assertions.assertEquals(3, tmp_project.count());
}
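For reference, the masterDuplicate/correspondence resource consumed by this test is the kind of JSON-lines file that ReadMasterDuplicateFromDB produces, one MasterDuplicate per line. A hypothetical example of its shape (the duplicate ids are invented; the actual test resource may differ):

	{"duplicate": "10|opendoar____::0000000000000000000000000000000a", "master": "10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8"}
	{"duplicate": "10|re3data_____::0000000000000000000000000000000b", "master": "10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8"}

Mapping duplicates onto 10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8 would be consistent with the assertion above that exactly one datasource with that id ends up in the selected subset.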