Miriam Baglioni 2023-11-16 12:17:42 +01:00
parent 0f602bae9d
commit db388ebc21
5 changed files with 145 additions and 63 deletions

File: ExtendEoscResultWithOrganizationStep2.java

@@ -17,6 +17,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,8 +77,10 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, workingPath + resultType + "extendedaffiliation");
+				Utils.removeOutputDir(spark, workingPath + resultType + "organization");
+				Utils.removeOutputDir(spark, workingPath + resultType + "resultOrganization");
 				addOrganizations(spark, inputPath, workingPath, resultType);
 				dumpOrganizationAndRelations(spark, inputPath, workingPath, resultType);
 			});
 	}
@@ -101,6 +104,8 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			.joinWith(relation, result.col("id").equalTo(relation.col("source")))
 			.map((MapFunction<Tuple2<Result, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
+
+		log.info("Number of affiliation relation for " + resultType + " = " + eoscRelation.count());

 		// from eoscRelation select the organization
 		Dataset<String> organizationIds = eoscRelation
 			.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
@@ -121,7 +126,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingPath + resultType + "/organization");
+			.json(workingPath + resultType + "organization");

 		eoscRelation
 			.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
@@ -137,7 +142,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingPath + resultType + "/resultOrganization");
 	}
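
Note: the two one-character fixes above (dropping the "/" before "organization" and "resultOrganization") align the write paths with the convention used throughout this commit: workingPath + resultType + suffix, concatenated with no separator. A minimal sketch of the resulting layout, with illustrative values only (the concrete paths are assumptions):

    // Hypothetical values, for illustration; workingPath is assumed to end with '/'.
    String workingPath = "/user/dump/working/";
    String resultType = "publication"; // one of: publication, dataset, software, otherresearchproduct
    String orgPath = workingPath + resultType + "organization";
    // orgPath -> "/user/dump/working/publicationorganization",
    // which matches what SparkDumpOrganizationProject reads back below.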
@@ -159,67 +164,18 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 		Dataset<ResultOrganizations> resultOrganization = relations
 			.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")), "left")
-			.map((MapFunction<Tuple2<Relation, Organization>, ResultOrganizations>) t2 -> {
-				if (t2._2() != null) {
-					ResultOrganizations rOrg = new ResultOrganizations();
-					rOrg.setResultId(t2._1().getTarget());
-					Affiliation org = new Affiliation();
-					org.setId(t2._2().getId());
-					if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) {
-						org.setName(t2._2().getLegalname().getValue());
-					} else {
-						org.setName("");
-					}
-					HashMap<String, Set<String>> organizationPids = new HashMap<>();
-					if (Optional.ofNullable(t2._2().getPid()).isPresent())
-						t2._2().getPid().forEach(p -> {
-							if (!organizationPids.containsKey(p.getQualifier().getClassid()))
-								organizationPids.put(p.getQualifier().getClassid(), new HashSet<>());
-							organizationPids.get(p.getQualifier().getClassid()).add(p.getValue());
-						});
-					List<OrganizationPid> pids = new ArrayList<>();
-					for (String key : organizationPids.keySet()) {
-						for (String value : organizationPids.get(key)) {
-							OrganizationPid pid = new OrganizationPid();
-							pid.setValue(value);
-							pid.setType(key);
-							pids.add(pid);
-						}
-					}
-					org.setPid(pids);
-					rOrg.setAffiliation(org);
-					return rOrg;
-				}
-				return null;
-			}, Encoders.bean(ResultOrganizations.class))
+			.map((MapFunction<Tuple2<Relation, Organization>, ResultOrganizations>) t2 -> getResultOrganizations(t2), Encoders.bean(ResultOrganizations.class))
 			.filter(Objects::nonNull);
-		System.out.println(resultOrganization.count());

 		results
 			.joinWith(resultOrganization, results.col("id").equalTo(resultOrganization.col("resultId")), "left")
 			.groupByKey(
 				(MapFunction<Tuple2<Result, ResultOrganizations>, String>) t2 -> t2._1().getId(), Encoders.STRING())
 			.mapGroups(
-				(MapGroupsFunction<String, Tuple2<Result, ResultOrganizations>, Result>) (s, it) -> {
-					Tuple2<Result, ResultOrganizations> first = it.next();
-					if (first._2() == null) {
-						return first._1();
-					}
-					Result ret = first._1();
-					List<Affiliation> affiliation = new ArrayList<>();
-					Set<String> alreadyInsertedAffiliations = new HashSet<>();
-					affiliation.add(first._2().getAffiliation());
-					alreadyInsertedAffiliations.add(first._2().getAffiliation().getId());
-					it.forEachRemaining(res -> {
-						if (!alreadyInsertedAffiliations.contains(res._2().getAffiliation().getId())) {
-							affiliation.add(res._2().getAffiliation());
-							alreadyInsertedAffiliations.add(res._2().getAffiliation().getId());
-						}
-					});
-					ret.setAffiliation(affiliation);
-					return ret;
-				}, Encoders.bean(Result.class))
+				(MapGroupsFunction<String, Tuple2<Result, ResultOrganizations>, Result>) (s, it) -> addAffiliation(it),
+				Encoders.bean(Result.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
@@ -227,6 +183,63 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 	}

+	@Nullable
+	private static ResultOrganizations getResultOrganizations(Tuple2<Relation, Organization> t2) {
+		if (t2._2() != null) {
+			Organization organization = t2._2();
+			ResultOrganizations rOrg = new ResultOrganizations();
+			rOrg.setResultId(t2._1().getTarget());
+			Affiliation org = new Affiliation();
+			org.setId(organization.getId());
+			if (Optional.ofNullable(organization.getLegalname()).isPresent()) {
+				org.setName(organization.getLegalname().getValue());
+			} else {
+				org.setName("");
+			}
+			HashMap<String, Set<String>> organizationPids = new HashMap<>();
+			if (Optional.ofNullable(organization.getPid()).isPresent())
+				organization.getPid().forEach(p -> {
+					if (!organizationPids.containsKey(p.getQualifier().getClassid()))
+						organizationPids.put(p.getQualifier().getClassid(), new HashSet<>());
+					organizationPids.get(p.getQualifier().getClassid()).add(p.getValue());
+				});
+			List<OrganizationPid> pids = new ArrayList<>();
+			for (String key : organizationPids.keySet()) {
+				for (String value : organizationPids.get(key)) {
+					OrganizationPid pid = new OrganizationPid();
+					pid.setValue(value);
+					pid.setType(key);
+					pids.add(pid);
+				}
+			}
+			org.setPid(pids);
+			rOrg.setAffiliation(org);
+			return rOrg;
+		}
+		return null;
+	}
+
+	private static Result addAffiliation(Iterator<Tuple2<Result, ResultOrganizations>> it) {
+		Tuple2<Result, ResultOrganizations> first = it.next();
+		if (first._2() == null) {
+			return first._1();
+		}
+		Result ret = first._1();
+		List<Affiliation> affiliation = new ArrayList<>();
+		Set<String> alreadyInsertedAffiliations = new HashSet<>();
+		affiliation.add(first._2().getAffiliation());
+		alreadyInsertedAffiliations.add(first._2().getAffiliation().getId());
+		it.forEachRemaining(res -> {
+			if (!alreadyInsertedAffiliations.contains(res._2().getAffiliation().getId())) {
+				affiliation.add(res._2().getAffiliation());
+				alreadyInsertedAffiliations.add(res._2().getAffiliation().getId());
+			}
+		});
+		ret.setAffiliation(affiliation);
+		return ret;
+	}
+
 	private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org) {
 		if (isToBeDumpedOrg(org))
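
Note: the two hunks above are a pure extract-method refactoring — the inline lambdas become the @Nullable getResultOrganizations helper and the addAffiliation group-merger — with no behavior change apart from dropping a stray System.out.println. A hypothetical mini-check of addAffiliation's dedup behavior, assuming the bean setters shown in the diff and a caller with access to the private helper:

    // Two tuples carrying the same affiliation id collapse to a single entry.
    // (imports assumed: java.util.Arrays, scala.Tuple2)
    Result r = new Result();
    r.setId("result-1");
    Affiliation aff = new Affiliation();
    aff.setId("org-1");
    ResultOrganizations ro = new ResultOrganizations();
    ro.setResultId("result-1");
    ro.setAffiliation(aff);
    Result merged = addAffiliation(Arrays.asList(new Tuple2<>(r, ro), new Tuple2<>(r, ro)).iterator());
    // merged.getAffiliation().size() == 1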

File: SelectEoscResultsJobStep1.java

@@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;

 import org.apache.commons.io.IOUtils;
@@ -95,6 +96,7 @@ public class SelectEoscResultsJobStep1 implements Serializable {
 				(MapFunction<R, Result>) r -> (Result) ResultMapper
 					.map(r, communityMap, df),
 				Encoders.bean(Result.class))
+			.filter(Objects::nonNull)
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")

File: SparkDumpOrganizationProject.java

@@ -10,6 +10,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -57,6 +58,9 @@ public class SparkDumpOrganizationProject implements Serializable {
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);

+		final String workingPath = parser.get("workingPath");
+		log.info("workingPath: {}", workingPath);
+
 		SparkConf conf = new SparkConf();

 		runWithSparkSession(
@@ -64,16 +68,42 @@ public class SparkDumpOrganizationProject implements Serializable {
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, outputPath + "/organizationProject");
-				dumpRelation(spark, inputPath, outputPath);
+				dumpRelation(spark, inputPath, outputPath, workingPath);
 			});
 	}

-	private static void dumpRelation(SparkSession spark, String inputPath, String outputPath) {
-		Dataset<Organization> organization = Utils.readPath(spark, outputPath + "organization", Organization.class);
+	private static void dumpRelation(SparkSession spark, String inputPath, String outputPath, String workingPath) {
+		Dataset<Organization> organization = Utils
+			.readPath(spark, workingPath + "publicationorganization", Organization.class)
+			.union(Utils.readPath(spark, workingPath + "datasetorganization", Organization.class))
+			.union(Utils.readPath(spark, workingPath + "softwareorganization", Organization.class))
+			.union(Utils.readPath(spark, workingPath + "otherresearchproductorganization", Organization.class))
+			.groupByKey((MapFunction<Organization, String>) o -> o.getId(), Encoders.STRING())
+			.mapGroups(
+				(MapGroupsFunction<String, Organization, Organization>) (k, v) -> v.next(),
+				Encoders.bean(Organization.class));

-		Dataset<Project> project = Utils.readPath(spark, outputPath + "project", Project.class);
+		Dataset<Project> project = Utils
+			.readPath(spark, workingPath + "publicationproject", Project.class)
+			.union(Utils.readPath(spark, workingPath + "datasetproject", Project.class))
+			.union(Utils.readPath(spark, workingPath + "softwareproject", Project.class))
+			.union(Utils.readPath(spark, workingPath + "otherresearchproductproject", Project.class))
+			.groupByKey((MapFunction<Project, String>) o -> o.getId(), Encoders.STRING())
+			.mapGroups((MapGroupsFunction<String, Project, Project>) (k, v) -> v.next(), Encoders.bean(Project.class));
+
+		organization
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "organization");
+
+		project
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "project");

 		Dataset<Relation> relation = Utils
 			.readPath(spark, inputPath + "/relation", Relation.class)
@@ -96,6 +126,42 @@ public class SparkDumpOrganizationProject implements Serializable {
 			.option("compression", "gzip")
 			.json(outputPath + "organizationProject");

+		Utils
+			.readPath(spark, workingPath + "publicationresultOrganization", eu.dnetlib.dhp.eosc.model.Relation.class)
+			.union(
+				Utils
+					.readPath(
+						spark, workingPath + "datasetresultOrganization", eu.dnetlib.dhp.eosc.model.Relation.class))
+			.union(
+				Utils
+					.readPath(
+						spark, workingPath + "softwareresultOrganization", eu.dnetlib.dhp.eosc.model.Relation.class))
+			.union(
+				Utils
+					.readPath(
+						spark, workingPath + "otherresearchproductresultOrganization",
+						eu.dnetlib.dhp.eosc.model.Relation.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "resultOrganization");
+
+		Utils
+			.readPath(spark, workingPath + "publicationresultProject", eu.dnetlib.dhp.eosc.model.Relation.class)
+			.union(
+				Utils.readPath(spark, workingPath + "datasetresultProject", eu.dnetlib.dhp.eosc.model.Relation.class))
+			.union(
+				Utils.readPath(spark, workingPath + "softwareresultProject", eu.dnetlib.dhp.eosc.model.Relation.class))
+			.union(
+				Utils
+					.readPath(
+						spark, workingPath + "otherresearchproductresultProject",
+						eu.dnetlib.dhp.eosc.model.Relation.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "resultProject");
 	}
 }
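
Note: the union/groupByKey/mapGroups chains above merge the four per-result-type dumps and deduplicate by id, since the same organization (or project) can be affiliated with results of several types; (k, v) -> v.next() keeps one arbitrary record per id, which is only safe if duplicate records for an id are identical across result types (an assumption the code appears to rely on). The dedup pattern in isolation, as a minimal sketch where the four input datasets are assumed given:

    // Sketch: pubOrgs, datOrgs, softOrgs, otherOrgs stand in for the four
    // per-result-type reads; Organization is the eu.dnetlib.dhp.eosc.model bean.
    Dataset<Organization> deduped = pubOrgs
        .union(datOrgs)
        .union(softOrgs)
        .union(otherOrgs)
        .groupByKey((MapFunction<Organization, String>) Organization::getId, Encoders.STRING())
        .mapGroups(
            (MapGroupsFunction<String, Organization, Organization>) (id, it) -> it.next(), // first record wins
            Encoders.bean(Organization.class));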

File: SparkUpdateProjectInfo.java

@@ -132,7 +132,7 @@ public class SparkUpdateProjectInfo implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "project");
+			.json(workingPath + resultType + "project");

 		result
 			.joinWith(
@@ -152,7 +152,7 @@ public class SparkUpdateProjectInfo implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "resultProject");
+			.json(workingPath + resultType + "resultProject");
 	}

 	private static eu.dnetlib.dhp.eosc.model.Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p)
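
Note: with these two changes SparkUpdateProjectInfo no longer writes the final project and resultProject dumps; it writes per-result-type intermediates into the working path, which SparkDumpOrganizationProject (above) unions, deduplicates, and writes to outputPath. The writer/reader path contract, sketched with assumed values:

    // Writer side (SparkUpdateProjectInfo), executed once per result type:
    String written = workingPath + resultType + "resultProject";
    // Reader side (SparkDumpOrganizationProject), one read per result type:
    for (String rt : new String[] { "publication", "dataset", "software", "otherresearchproduct" }) {
        String read = workingPath + rt + "resultProject"; // must match `written`
    }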

File: Oozie workflow definition (XML)

@@ -632,6 +632,7 @@
 			</spark-opts>
 			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
 			<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
+			<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
 		</spark>
 		<ok to="make_archive"/>
 		<error to="Kill"/>