fixing issues
This commit is contained in:
parent
8d83b5173c
commit
aa48d52707
|
@ -63,6 +63,9 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
|
|||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
final String resultType = parser.get("resultType");
|
||||
log.info("resultType: {}", resultType);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
|
@ -70,14 +73,14 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
|
|||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation");
|
||||
addOrganizations(spark, inputPath, workingPath, outputPath);
|
||||
addOrganizations(spark, inputPath, workingPath, outputPath, resultType);
|
||||
});
|
||||
}
|
||||
|
||||
private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath) {
|
||||
private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath, String resultType) {
|
||||
|
||||
Dataset<Result> results = Utils
|
||||
.readPath(spark, workingPath + "publication", Result.class);
|
||||
.readPath(spark, workingPath + "resultType", Result.class);
|
||||
|
||||
Dataset<Relation> relations = Utils
|
||||
.readPath(spark, inputPath + "/relation", Relation.class)
|
||||
|
@ -155,20 +158,21 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
|
|||
.option("compression", "gzip")
|
||||
.json(workingPath + "publicationextendedaffiliation");
|
||||
|
||||
relations
|
||||
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")))
|
||||
.map(
|
||||
(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(
|
||||
t2._2()),
|
||||
Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
|
||||
.filter(Objects::nonNull)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath + "organization");
|
||||
Dataset<Relation> organizationWithAffiliation = relations
|
||||
.joinWith(results, relations.col("target").equalTo(results.col("id")))
|
||||
.map((MapFunction<Tuple2<Relation, Result>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
|
||||
|
||||
relations
|
||||
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")))
|
||||
organizationWithAffiliation.joinWith(organizations, organizationWithAffiliation.col("source").equalTo(organizations.col("id")))
|
||||
.map((MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(
|
||||
t2._2()),
|
||||
Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
|
||||
.filter(Objects::nonNull)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath + "organization");
|
||||
|
||||
organizationWithAffiliation.joinWith(organizations, organizationWithAffiliation.col("source").equalTo(organizations.col("id")))
|
||||
.map(
|
||||
(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation
|
||||
.newInstance(t2._1().getSource(), t2._1().getTarget()),
|
||||
|
|
|
@ -166,6 +166,7 @@
|
|||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
|
||||
<arg>--resultType</arg><arg>publication</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_dump"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -218,6 +219,7 @@
|
|||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
|
||||
<arg>--resultType</arg><arg>dataset</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_dump"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -271,6 +273,7 @@
|
|||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
|
||||
<arg>--resultType</arg><arg>otherresearchproduct</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_dump"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -323,6 +326,7 @@
|
|||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
|
||||
<arg>--resultType</arg><arg>software</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_dump"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -28,7 +28,12 @@
|
|||
"paramLongName":"workingPath",
|
||||
"paramDescription": "The path to the community map",
|
||||
"paramRequired": false
|
||||
}
|
||||
},
|
||||
{
|
||||
"paramName":"rt",
|
||||
"paramLongName":"resultType",
|
||||
"paramDescription": "The path to the community map",
|
||||
"paramRequired": false}
|
||||
]
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue