forked from D-Net/dnet-hadoop
code formatting
This commit is contained in: parent 46972f8393 · commit 5816ded93f

@@ -232,9 +232,9 @@ public class PropagationConstant {
		if (HdfsSupport.exists(inputPath, spark.sparkContext().hadoopConfiguration())) {
			return spark
				.read()
				.textFile(inputPath)
				.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
		} else {
			return spark.emptyDataset(Encoders.bean(clazz));
		}
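For orientation, the guarded-read hunk above is the body of a generic helper that tolerates missing input paths. A minimal self-contained sketch of such a helper follows; the wrapping class, the name readPath, and the HdfsSupport import location are my assumptions, not part of this diff:

	import com.fasterxml.jackson.databind.ObjectMapper;
	import org.apache.spark.api.java.function.MapFunction;
	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.SparkSession;

	import eu.dnetlib.dhp.common.HdfsSupport; // assumed location of the utility used above

	public class ReadHelper {

		private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

		public static <R> Dataset<R> readPath(SparkSession spark, String inputPath, Class<R> clazz) {
			if (HdfsSupport.exists(inputPath, spark.sparkContext().hadoopConfiguration())) {
				return spark
					.read()
					.textFile(inputPath)
					// one JSON record per line, deserialized into the requested bean type
					.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
			}
			// missing input: yield an empty, correctly typed Dataset instead of failing
			return spark.emptyDataset(Encoders.bean(clazz));
		}
	}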

@@ -95,7 +95,6 @@ public class ResultTagger implements Serializable {
		}

		tmp
			.forEach(
				dsId -> datasources

@@ -774,47 +774,47 @@ public class BulkTagJobTest {
	void bulktagPublicationwithConstraintsTest() throws Exception {

		final String sourcePath = getClass()
			.getResource(
				"/eu/dnetlib/dhp/bulktag/sample/publication/orcidbulktagfordatasource")
			.getPath();
		SparkBulkTagJob
			.main(
				new String[] {
					"-isTest", Boolean.TRUE.toString(),
					"-isSparkSessionManaged", Boolean.FALSE.toString(),
					"-sourcePath", sourcePath,
					"-taggingConf", IOUtils
						.toString(
							BulkTagJobTest.class
								.getResourceAsStream(
									"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_neanias.xml")),
					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
					"-outputPath", workingDir.toString() + "/publication",
					"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
					"-pathMap", pathMap
				});

		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		JavaRDD<Publication> tmp = sc
			.textFile(workingDir.toString() + "/publication")
			.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));

		Assertions.assertEquals(2, tmp.count());
		org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(Publication.class));

		verificationDataset.createOrReplaceTempView("dataset");
		String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
			+ "from dataset "
			+ "lateral view explode(context) c as MyT "
			+ "lateral view explode(MyT.datainfo) d as MyD "
			+ "where MyD.inferenceprovenance = 'bulktagging'";

		org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);

		idExplodeCommunity.show(false);
		Assertions.assertEquals(0, idExplodeCommunity.count());

	}
}
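Aside on the verification idiom used above: the test re-reads the job output, registers it as a temp view, and unnests the nested context/datainfo arrays with Spark SQL's lateral view explode; zero matching rows means the constraints in tagging_conf_neanias.xml tagged nothing. Condensed, with names as in the test:

	verificationDataset.createOrReplaceTempView("dataset");
	String query = "select id, MyT.id community "
		+ "from dataset "
		+ "lateral view explode(context) c as MyT "
		+ "lateral view explode(MyT.datainfo) d as MyD "
		+ "where MyD.inferenceprovenance = 'bulktagging'";
	Assertions.assertEquals(0, spark.sql(query).count());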

@@ -39,173 +39,173 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;

public class CleanCountrySparkJob implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(CleanCountrySparkJob.class);

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static void main(String[] args) throws Exception {

		String jsonConfiguration = IOUtils
			.toString(
				CleanCountrySparkJob.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json"));
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		String inputPath = parser.get("inputPath");
		log.info("inputPath: {}", inputPath);

		String workingDir = parser.get("workingDir");
		log.info("workingDir: {}", workingDir);

		String datasourcePath = parser.get("hostedBy");
		log.info("datasourcePath: {}", datasourcePath);

		String country = parser.get("country");
		log.info("country: {}", country);

		String[] verifyParam = parser.get("verifyParam").split(";");
		log.info("verifyParam: {}", verifyParam);

		String collectedfrom = parser.get("collectedfrom");
		log.info("collectedfrom: {}", collectedfrom);

		String graphTableClassName = parser.get("graphTableClassName");
		log.info("graphTableClassName: {}", graphTableClassName);

		Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);

		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {

				cleanCountry(
					spark, country, verifyParam, inputPath, entityClazz, workingDir, collectedfrom, datasourcePath);
			});
	}

	private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
		String inputPath, Class<T> entityClazz, String workingDir, String collectedfrom, String datasourcePath) {

		List<String> hostedBy = spark
			.read()
			.textFile(datasourcePath)
			.collectAsList();

		Dataset<T> res = spark
			.read()
			.textFile(inputPath)
			.map(
				(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
				Encoders.bean(entityClazz));

		res.map((MapFunction<T, T>) r -> {
			if (r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) ||
				!r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) {
				return r;
			}

			List<StructuredProperty> ids = getPidsAndAltIds(r).collect(Collectors.toList());
			if (ids
				.stream()
				.anyMatch(
					p -> p
						.getQualifier()
						.getClassid()
						.equals(PidType.doi.toString()) && pidInParam(p.getValue(), verifyParam))) {
				r
					.setCountry(
						r
							.getCountry()
							.stream()
							.filter(
								c -> toTakeCountry(c, country))
							.collect(Collectors.toList()));

			}

			return r;
		}, Encoders.bean(entityClazz))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingDir);

		spark
			.read()
			.textFile(workingDir)
			.map(
				(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
				Encoders.bean(entityClazz))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(inputPath);
	}
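Note the two-phase write that closes cleanCountry: Spark cannot overwrite a path it is still lazily reading from in the same job, so the cleaned records are first materialized under workingDir, then read back and written over inputPath. The round trip in isolation (cleaned is my name for the mapped Dataset<T>; in the method above the chain is inline):

	cleaned
		.write()
		.mode(SaveMode.Overwrite)
		.option("compression", "gzip")
		.json(workingDir); // phase 1: scratch copy

	spark
		.read()
		.textFile(workingDir) // phase 2: re-read and overwrite the original input
		.map(
			(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
			Encoders.bean(entityClazz))
		.write()
		.mode(SaveMode.Overwrite)
		.option("compression", "gzip")
		.json(inputPath);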

	private static <T extends Result> Stream<StructuredProperty> getPidsAndAltIds(T r) {
		final Stream<StructuredProperty> resultPids = Optional
			.ofNullable(r.getPid())
			.map(Collection::stream)
			.orElse(Stream.empty());

		final Stream<StructuredProperty> instancePids = Optional
			.ofNullable(r.getInstance())
			.map(
				instance -> instance
					.stream()
					.flatMap(
						i -> Optional
							.ofNullable(i.getPid())
							.map(Collection::stream)
							.orElse(Stream.empty())))
			.orElse(Stream.empty());

		final Stream<StructuredProperty> instanceAltIds = Optional
			.ofNullable(r.getInstance())
			.map(
				instance -> instance
					.stream()
					.flatMap(
						i -> Optional
							.ofNullable(i.getAlternateIdentifier())
							.map(Collection::stream)
							.orElse(Stream.empty())))
			.orElse(Stream.empty());

		return Stream
			.concat(
				Stream.concat(resultPids, instancePids),
				instanceAltIds);
	}
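getPidsAndAltIds applies one idiom three times: Optional.ofNullable(list).map(Collection::stream).orElse(Stream.empty()) as a null-safe substitute for list.stream(), since pid and alternateIdentifier lists may be unset in the graph model. Factored into a generic helper (my naming, not in this diff):

	import java.util.Collection;
	import java.util.Optional;
	import java.util.stream.Stream;

	// Null-safe stream over a possibly-null collection.
	static <E> Stream<E> streamOf(Collection<E> c) {
		return Optional.ofNullable(c).map(Collection::stream).orElse(Stream.empty());
	}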

	private static boolean pidInParam(String value, String[] verifyParam) {
		for (String s : verifyParam)
			if (value.startsWith(s))
				return true;
		return false;
	}

	private static boolean toTakeCountry(Country c, String country) {
		// if dataInfo or dataInfo.inferenceprovenance is not set, the country cannot
		// have been inserted via propagation
		if (!Optional.ofNullable(c.getDataInfo()).isPresent())
			return true;
		if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent())
			return true;
		return !(c
			.getClassid()
			.equalsIgnoreCase(country) &&
			c.getDataInfo().getInferenceprovenance().equals("propagation"));
	}

}
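In plain terms, toTakeCountry keeps a country entry unless it matches the target country and carries inferenceprovenance "propagation"; entries without dataInfo or inferenceprovenance are always kept, since they cannot have been added by propagation. The same predicate as a single expression (behavior copied from the method above):

	boolean keep = c.getDataInfo() == null
		|| c.getDataInfo().getInferenceprovenance() == null
		|| !(c.getClassid().equalsIgnoreCase(country)
			&& "propagation".equals(c.getDataInfo().getInferenceprovenance()));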

@@ -31,76 +31,76 @@ import scala.Tuple2;
 * @Date 22/07/22
 */
public class GetDatasourceFromCountry implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(GetDatasourceFromCountry.class);

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static void main(String[] args) throws Exception {

		String jsonConfiguration = IOUtils
			.toString(
				GetDatasourceFromCountry.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json"));
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		String inputPath = parser.get("inputPath");
		log.info("inputPath: {}", inputPath);

		String workingPath = parser.get("workingDir");
		log.info("workingDir: {}", workingPath);

		String country = parser.get("country");
		log.info("country: {}", country);

		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				getDatasourceFromCountry(spark, country, inputPath, workingPath);
			});
	}

	private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath,
		String workingDir) {

		Dataset<Organization> organization = spark
			.read()
			.textFile(inputPath + "/organization")
			.map(
				(MapFunction<String, Organization>) value -> OBJECT_MAPPER.readValue(value, Organization.class),
				Encoders.bean(Organization.class))
			.filter(
				(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference() &&
					o.getCountry().getClassid().length() > 0 &&
					o.getCountry().getClassid().equals(country));

		// keep only relations that are not deleted by inference and whose relClass is IsProvidedBy
		Dataset<Relation> relation = spark
			.read()
			.textFile(inputPath + "/relation")
			.map(
				(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
				Encoders.bean(Relation.class))
			.filter(
				(FilterFunction<Relation>) rel -> rel.getRelClass().equalsIgnoreCase(ModelConstants.IS_PROVIDED_BY) &&
					!rel.getDataInfo().getDeletedbyinference());

		organization
			.joinWith(relation, organization.col("id").equalTo(relation.col("target")))
			.map((MapFunction<Tuple2<Organization, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING())
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingDir);

	}
}
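Reading the join: in an IsProvidedBy relation the target is the organization and the source is the datasource, so the job writes out the ids of datasources provided by organizations in the requested country. A hypothetical invocation (flag names inferred from the parser keys above and mirrored from the tests below, not confirmed by this diff):

	GetDatasourceFromCountry.main(new String[] {
		"--isSparkSessionManaged", Boolean.FALSE.toString(),
		"--inputPath", "/graph", // assumed layout: /graph/organization and /graph/relation
		"--workingDir", "/tmp/datasource_country",
		"--country", "NL"
	});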

@@ -5,7 +5,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

-import eu.dnetlib.dhp.schema.oaf.Dataset;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;

@@ -27,165 +26,165 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class CleanCountryTest {
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	private static SparkSession spark;

	private static Path workingDir;

	private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);

	@BeforeAll
	public static void beforeAll() throws IOException {
		workingDir = Files.createTempDirectory(CleanCountryTest.class.getSimpleName());
		log.info("using work dir {}", workingDir);

		SparkConf conf = new SparkConf();
		conf.setAppName(CleanCountryTest.class.getSimpleName());

		conf.setMaster("local[*]");
		conf.set("spark.driver.host", "localhost");
		conf.set("hive.metastore.local", "true");
		conf.set("spark.ui.enabled", "false");
		conf.set("spark.sql.warehouse.dir", workingDir.toString());
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

		spark = SparkSession
			.builder()
			.appName(CleanCountryTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}

	@AfterAll
	public static void afterAll() throws IOException {
		FileUtils.deleteDirectory(workingDir.toFile());
		spark.stop();
	}

	@Test
	public void testResultClean() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_country.json")
			.getPath();

		spark
			.read()
			.textFile(sourcePath)
			.map(
				(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
				Encoders.bean(Publication.class))
			.write()
			.json(workingDir.toString() + "/publication");

		CleanCountrySparkJob.main(new String[] {
			"--isSparkSessionManaged", Boolean.FALSE.toString(),
			"--inputPath", workingDir.toString() + "/publication",
			"--graphTableClassName", Publication.class.getCanonicalName(),
			"--workingDir", workingDir.toString() + "/working",
			"--country", "NL",
			"--verifyParam", "10.17632",
			"--collectedfrom", "NARCIS",
			"--hostedBy", getClass()
				.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
				.getPath()
		});

		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<Publication> tmp = sc
			.textFile(workingDir.toString() + "/publication")
			.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));

		Assertions.assertEquals(8, tmp.count());

		// original result with NL country and doi starting with Mendeley prefix, but not collectedfrom NARCIS
		Assertions
			.assertEquals(
				1,
				tmp
					.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
					.collect()
					.get(0)
					.getCountry()
					.size());

		// original result with NL country and pid not starting with Mendeley prefix
		Assertions
			.assertEquals(
				1,
				tmp
					.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
					.collect()
					.get(0)
					.getCountry()
					.size());

		// original result with NL country and doi starting with Mendeley prefix and collectedfrom NARCIS but not
		// inserted with propagation
		Assertions
			.assertEquals(
				1,
				tmp
					.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
					.collect()
					.get(0)
					.getCountry()
					.size());

		// original result with NL country and doi starting with Mendeley prefix and collectedfrom NARCIS inserted with
		// propagation
		Assertions
			.assertEquals(
				0,
				tmp
					.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6ag"))
					.collect()
					.get(0)
					.getCountry()
					.size());
	}
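The four assertions above repeat one pattern: filter by id, take the single matching record, count the surviving countries. A small helper (my own, not in the diff) would tighten them:

	// Hypothetical test helper: country entries left on the result with the given id.
	private static int countryCount(JavaRDD<Publication> rdd, String id) {
		return rdd.filter(p -> p.getId().equals(id)).first().getCountry().size();
	}

	// e.g. Assertions.assertEquals(1, countryCount(tmp, "50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"));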

	@Test
	public void testDatasetClean() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json")
			.getPath();

		spark
			.read()
			.textFile(sourcePath)
			.map(
				(MapFunction<String, Dataset>) r -> OBJECT_MAPPER.readValue(r, Dataset.class),
				Encoders.bean(Dataset.class))
			.write()
			.json(workingDir.toString() + "/dataset");

		CleanCountrySparkJob.main(new String[] {
			"--isSparkSessionManaged", Boolean.FALSE.toString(),
			"--inputPath", workingDir.toString() + "/dataset",
			"-graphTableClassName", Dataset.class.getCanonicalName(),
			"-workingDir", workingDir.toString() + "/working",
			"-country", "NL",
			"-verifyParam", "10.17632",
			"-collectedfrom", "NARCIS",
			"-hostedBy", getClass()
				.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
				.getPath()
		});

		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<Dataset> tmp = sc
			.textFile(workingDir.toString() + "/dataset")
			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

		Assertions.assertEquals(1, tmp.count());

		Assertions.assertEquals(0, tmp.first().getCountry().size());

	}
}
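Two details worth noting in testDatasetClean: the argument list mixes long-form flags ("--inputPath") with single-dash ones ("-graphTableClassName"), which ArgumentApplicationParser evidently tolerates; and since CleanCountrySparkJob loads the hostedBy input with textFile(...).collectAsList(), the hostedBy fixture is presumably a plain text file listing one datasource key per line, matched against instance.hostedby.key.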

@@ -12,7 +12,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

-import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.DocumentException;

@@ -22,6 +21,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;

@@ -995,7 +995,7 @@ class MappersTest {
	@Test
	void testEoscFutureHackZenodo() throws IOException {
		final String xml = IOUtils
			.toString(Objects.requireNonNull(getClass().getResourceAsStream("zenodo7351221.xml")));

		final List<Oaf> actual = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
		actual.forEach(a -> {