Miriam Baglioni 2023-07-28 10:26:22 +02:00
parent 81b55dc20b
commit 6b113961c1
10 changed files with 49 additions and 27 deletions

View File

@@ -155,7 +155,7 @@ public class ResultMapper implements Serializable {
input
.getCollectedfrom()
.stream()
.map(cf -> CfHbKeyValue.newInstance(cf.getKey(), cf.getValue()))
.map(cf -> CfHbKeyValue.newInstance(cf.getKey().substring(3), cf.getValue()))
.collect(Collectors.toList()));
}
@@ -518,11 +518,11 @@ public class ResultMapper implements Serializable {
instance
.setCollectedfrom(
CfHbKeyValue
.newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue()));
.newInstance(i.getCollectedfrom().getKey().substring(3), i.getCollectedfrom().getValue()));
instance
.setHostedby(
CfHbKeyValue.newInstance(i.getHostedby().getKey(), i.getHostedby().getValue()));
CfHbKeyValue.newInstance(i.getHostedby().getKey().substring(3), i.getHostedby().getValue()));
return instance;
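
Both ResultMapper hunks make the same change: the three-character entity-type prefix carried by internal OpenAIRE identifiers (for instance "10|" on datasource keys) is stripped with substring(3) before the collectedfrom and hostedby keys are written to the dump. A minimal standalone sketch of the effect, using a made-up key rather than one from the commit:

public class PrefixStrippingExample {
    public static void main(String[] args) {
        // hypothetical internal key: entity-type prefix "10|" followed by the bare identifier
        String collectedFromKey = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
        // substring(3) drops the leading "10|", so only the bare identifier reaches the dump
        System.out.println(collectedFromKey.substring(3)); // openaire____::081b82f96300b6a6e3d282bad31cb6e2
    }
}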

View File

@@ -63,7 +63,7 @@ public class Utils {
return String
.format(
"%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX,
"%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX,
DHPUtils.md5(id));
}
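
This hunk drops the entity-type prefix (and its "|" separator) from the identifier built by Utils.getContextId, matching the prefix stripping applied in the other files of this commit; the later Extractor hunks accordingly stop calling substring(3) on the contextId they receive. One caveat worth noting: String.format silently ignores surplus arguments, so if the argument list stays as shown, the two-placeholder pattern consumes only the first two values. A tiny illustration with assumed values, not the project's constants:

public class FormatArgsExample {
    public static void main(String[] args) {
        // three arguments, two placeholders: the trailing md5 value is ignored by format()
        System.out.println(String.format("%s::%s", "00", "context_____", "md5-of-id")); // 00::context_____
    }
}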

View File

@@ -61,6 +61,13 @@ public class SparkPrepareResultProject implements Serializable {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
Boolean substring = Optional
.ofNullable(parser.get("substring"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
@@ -74,11 +81,12 @@ public class SparkPrepareResultProject implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
prepareResultProjectList(spark, inputPath, outputPath);
prepareResultProjectList(spark, inputPath, outputPath, substring);
});
}
private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath,
Boolean substring) {
Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter(
@@ -101,7 +109,10 @@ public class SparkPrepareResultProject implements Serializable {
Set<String> projectSet = new HashSet<>();
Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation> first = it.next();
ResultProject rp = new ResultProject();
rp.setResultId(s);
if (substring)
rp.setResultId(s.substring(3));
else
rp.setResultId(s);
eu.dnetlib.dhp.schema.oaf.Project p = first._1();
projectSet.add(p.getId());
Project ps = getProject(p, first._2);
@@ -131,7 +142,7 @@ public class SparkPrepareResultProject implements Serializable {
private static Project getProject(eu.dnetlib.dhp.schema.oaf.Project op, Relation relation) {
Project p = Project
.newInstance(
op.getId(),
op.getId().substring(3),
op.getCode().getValue(),
Optional
.ofNullable(op.getAcronym())
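
The SparkPrepareResultProject hunks introduce an optional substring flag (defaulting to true) that decides whether the result identifier is stripped of its prefix before being stored in ResultProject, while the project identifier is now always stripped. A minimal sketch of the toggle, with a made-up identifier:

public class SubstringFlagExample {
    // mirrors the conditional applied to the result id in the hunk above
    static String resultId(String s, boolean substring) {
        return substring ? s.substring(3) : s;
    }

    public static void main(String[] args) {
        String id = "50|doi_________::17eb2219346277eb8c28c9fad0acbf35"; // hypothetical result id
        System.out.println(resultId(id, true));  // doi_________::17eb2219346277eb8c28c9fad0acbf35
        System.out.println(resultId(id, false)); // 50|doi_________::17eb2219346277eb8c28c9fad0acbf35
    }
}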

View File

@@ -84,7 +84,7 @@ public class Extractor implements Serializable {
.orElse(null))
.orElse(null);
Relation r = getRelation(
value.getId().substring(3), contextId.substring(3),
value.getId().substring(3), contextId,
Constants.RESULT_ENTITY,
Constants.CONTEXT_ENTITY,
ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP, provenance);
@@ -94,7 +94,7 @@ public class Extractor implements Serializable {
hashCodes.add(r.hashCode());
}
r = getRelation(
contextId.substring(3), value.getId().substring(3),
contextId, value.getId().substring(3),
Constants.CONTEXT_ENTITY,
Constants.RESULT_ENTITY,
ModelConstants.IS_RELATED_TO,

View File

@@ -55,8 +55,8 @@ public class Process implements Serializable {
ci
.getDatasourceList()
.forEach(ds -> {
String nodeType = ModelSupport.idPrefixEntity.get(ds.substring(0, 2));
String datasourceId = ds.startsWith("10|") || ds.startsWith("40|") ? ds.substring(3) : ds;
String contextId = Utils.getContextId(ci.getId());
relationList
@@ -65,7 +65,7 @@
.newInstance(
contextId, eu.dnetlib.dhp.oa.model.graph.Constants.CONTEXT_ENTITY,
ds, nodeType,
datasourceId, nodeType,
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
.newInstance(
@@ -76,7 +76,7 @@
.add(
Relation
.newInstance(
ds, nodeType,
datasourceId, nodeType,
contextId, eu.dnetlib.dhp.oa.model.graph.Constants.CONTEXT_ENTITY,
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
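
In Process the datasource identifier placed in the relation endpoints is shortened only when it actually starts with a known entity-type prefix; any other value passes through unchanged. A small standalone sketch of that guard, with invented inputs:

public class DatasourceIdExample {
    // same conditional as the new datasourceId computation: strip only the "10|" (datasource) or "40|" (project) prefix
    static String shorten(String ds) {
        return ds.startsWith("10|") || ds.startsWith("40|") ? ds.substring(3) : ds;
    }

    public static void main(String[] args) {
        System.out.println(shorten("10|opendoar____::abc123")); // opendoar____::abc123
        System.out.println(shorten("community_portal"));        // community_portal (left untouched)
    }
}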

View File

@@ -105,7 +105,7 @@ public class SparkCollectAndSave implements Serializable {
}
// Dataset<String> dumpedIds = Utils.getEntitiesId(spark, outputPath);
Dataset<String> dumpedIds = Utils.getEntitiesId(spark, outputPath);
Dataset<Relation> relations = Utils
.readPath(spark, inputPath + "/relation/publication", Relation.class)
@@ -116,16 +116,18 @@ public class SparkCollectAndSave implements Serializable {
.union(Utils.readPath(spark, inputPath + "/relation/context", Relation.class))
.union(Utils.readPath(spark, inputPath + "/relation/relation", Relation.class));
Utils.getValidRelations(relations, Utils.getEntitiesId(spark, outputPath))
// Dataset<Relation> relJoinSource = relations
// .joinWith(dumpedIds, relations.col("source.id").equalTo(dumpedIds.col("value")))
// .map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(),
// Encoders.bean(Relation.class));
//
// relJoinSource
// .joinWith(dumpedIds, relJoinSource.col("target.id").equalTo(dumpedIds.col("value")))
// .map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(),
// Encoders.bean(Relation.class))
// Utils.getValidRelations(relations, Utils.getEntitiesId(spark, outputPath))
Dataset<Relation> relJoinSource = relations
.joinWith(dumpedIds, relations.col("source").equalTo(dumpedIds.col("value")))
.map(
(MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(),
Encoders.bean(Relation.class));
relJoinSource
.joinWith(dumpedIds, relJoinSource.col("target").equalTo(dumpedIds.col("value")))
.map(
(MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(),
Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
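
SparkCollectAndSave switches from the Utils.getValidRelations helper back to two explicit joins that keep only relations whose source and target both occur among the dumped entity ids, now comparing against the flat source and target columns instead of source.id and target.id. A condensed, self-contained sketch of the same semi-join pattern with simplified types; the Rel bean and the sample values are assumptions, not the project's model:

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class ValidRelationsSketch {

    // simplified stand-in for the dump Relation model: flat source/target identifiers
    public static class Rel implements Serializable {
        private String source;
        private String target;

        public Rel() {
        }

        public Rel(String source, String target) {
            this.source = source;
            this.target = target;
        }

        public String getSource() { return source; }
        public void setSource(String source) { this.source = source; }
        public String getTarget() { return target; }
        public void setTarget(String target) { this.target = target; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        Dataset<Rel> relations = spark
            .createDataset(Arrays.asList(new Rel("a", "b"), new Rel("a", "x")), Encoders.bean(Rel.class));
        Dataset<String> dumpedIds = spark.createDataset(Arrays.asList("a", "b"), Encoders.STRING());

        // first join: keep relations whose source was actually dumped
        Dataset<Rel> bySource = relations
            .joinWith(dumpedIds, relations.col("source").equalTo(dumpedIds.col("value")))
            .map((MapFunction<Tuple2<Rel, String>, Rel>) t -> t._1(), Encoders.bean(Rel.class));

        // second join: of those, keep relations whose target was dumped as well
        bySource
            .joinWith(dumpedIds, bySource.col("target").equalTo(dumpedIds.col("value")))
            .map((MapFunction<Tuple2<Rel, String>, Rel>) t -> t._1(), Encoders.bean(Rel.class))
            .show();

        spark.stop();
    }
}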

View File

@@ -79,7 +79,9 @@ public class SparkDumpRelationJob implements Serializable {
private static void dumpRelation(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet) {
Dataset<Relation> relations = Utils.readPath(spark, inputPath, Relation.class);
relations
.filter((FilterFunction<Relation>) r -> !removeSet.contains(r.getRelClass()))
.filter(
(FilterFunction<Relation>) r -> !removeSet.contains(r.getRelClass())
&& !r.getSubRelType().equalsIgnoreCase("resultService"))
.map((MapFunction<Relation, eu.dnetlib.dhp.oa.model.graph.Relation>) relation -> {
eu.dnetlib.dhp.oa.model.graph.Relation relNew = new eu.dnetlib.dhp.oa.model.graph.Relation();
relNew
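
The SparkDumpRelationJob hunk widens the existing relClass filter so that relations whose subRelType is resultService are also excluded from the dumped graph. A tiny standalone illustration of the combined predicate; the relation labels below are invented:

public class RelationFilterExample {
    public static void main(String[] args) {
        java.util.Set<String> removeSet = java.util.Collections.singleton("isReviewedBy"); // invented example value
        java.util.function.BiPredicate<String, String> keep = (relClass, subRelType) -> !removeSet.contains(relClass)
            && !subRelType.equalsIgnoreCase("resultService");
        System.out.println(keep.test("cites", "citation"));          // true  -> relation is kept
        System.out.println(keep.test("provides", "resultService"));  // false -> relation is dropped
    }
}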

View File

@@ -155,7 +155,7 @@ public class SparkOrganizationRelation implements Serializable {
eu.dnetlib.dhp.oa.model.graph.Relation
.newInstance(
id, Constants.CONTEXT_ENTITY,
organization,
organization.substring(3),
ModelSupport.idPrefixEntity.get(organization.substring(0, 2)),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
@@ -167,7 +167,7 @@
.add(
eu.dnetlib.dhp.oa.model.graph.Relation
.newInstance(
organization, ModelSupport.idPrefixEntity.get(organization.substring(0, 2)),
organization.substring(3), ModelSupport.idPrefixEntity.get(organization.substring(0, 2)),
id, Constants.CONTEXT_ENTITY,
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
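
The SparkOrganizationRelation hunks strip the prefix from the organization identifier in both directions of the context-organization relations, while the node type is still derived from the first two characters of the original id. A minimal sketch; the map below is a reduced stand-in for ModelSupport.idPrefixEntity and the identifier is made up:

public class OrganizationRelationExample {
    public static void main(String[] args) {
        java.util.Map<String, String> idPrefixEntity = new java.util.HashMap<>();
        idPrefixEntity.put("20", "organization"); // assumption: "20" marks organization identifiers

        String organization = "20|openorgs____::52044d986d6480d03a79e0a3295a2e27"; // hypothetical id
        String nodeType = idPrefixEntity.get(organization.substring(0, 2)); // organization
        String dumpedId = organization.substring(3);                        // openorgs____::52044d98...
        System.out.println(nodeType + " / " + dumpedId);
    }
}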

View File

@@ -16,5 +16,11 @@
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "sb",
"paramLongName": "substring",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]
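
The new optional substring parameter is read with the same Optional/Boolean.valueOf defaulting pattern shown in SparkPrepareResultProject above, so it is true unless explicitly set to false. A small standalone illustration of how the flag resolves (the helper name is mine, not the project's):

public class OptionalFlagExample {
    // same defaulting logic as the parsing of parser.get("substring") above
    static Boolean flag(String raw) {
        return java.util.Optional.ofNullable(raw).map(Boolean::valueOf).orElse(Boolean.TRUE);
    }

    public static void main(String[] args) {
        System.out.println(flag(null));    // true  (parameter omitted -> default)
        System.out.println(flag("false")); // false (as passed by the workflow step below: --substring false)
        System.out.println(flag("FALSE")); // false (Boolean.valueOf is case-insensitive)
    }
}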

View File

@@ -102,6 +102,7 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--substring</arg><arg>false</arg>
</spark>
<ok to="fork_result_linked_to_projects"/>
<error to="Kill"/>