forked from D-Net/dnet-hadoop
[enrichment single step] refactoring to fix issue in disappeared result type
This commit is contained in:
parent
2d302e6827
commit
59eaccbd87
|
@ -0,0 +1,84 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged;
|
||||||
|
import static eu.dnetlib.dhp.PropagationConstant.readPath;
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 15/01/24
|
||||||
|
*/
|
||||||
|
public class MoveResult implements Serializable {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MoveResult.class);
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkResultToCommunityFromOrganizationJob.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
String inputPath = parser.get("sourcePath");
|
||||||
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
moveResults(spark, inputPath, outputPath);
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <R extends Result> void moveResults(SparkSession spark, String inputPath, String outputPath) {
|
||||||
|
|
||||||
|
ModelSupport.entityTypes
|
||||||
|
.keySet()
|
||||||
|
.parallelStream()
|
||||||
|
.filter(e -> ModelSupport.isResult(e))
|
||||||
|
// .parallelStream()
|
||||||
|
.forEach(e -> {
|
||||||
|
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
|
||||||
|
Dataset<R> resultDataset = readPath(spark, inputPath + e.name(), resultClazz);
|
||||||
|
if (resultDataset.count() > 0) {
|
||||||
|
|
||||||
|
resultDataset
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath + e.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -76,29 +76,41 @@ public class SparkResultToCommunityFromOrganizationJob {
|
||||||
ModelSupport.entityTypes
|
ModelSupport.entityTypes
|
||||||
.keySet()
|
.keySet()
|
||||||
.parallelStream()
|
.parallelStream()
|
||||||
|
.filter(e -> ModelSupport.isResult(e))
|
||||||
|
// .parallelStream()
|
||||||
.forEach(e -> {
|
.forEach(e -> {
|
||||||
if (ModelSupport.isResult(e)) {
|
// if () {
|
||||||
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
|
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
|
||||||
removeOutputDir(spark, outputPath + e.name());
|
removeOutputDir(spark, outputPath + e.name());
|
||||||
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
|
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
|
||||||
|
|
||||||
result
|
log.info("executing left join");
|
||||||
.joinWith(
|
result
|
||||||
possibleUpdates,
|
.joinWith(
|
||||||
result.col("id").equalTo(possibleUpdates.col("resultId")),
|
possibleUpdates,
|
||||||
"left_outer")
|
result.col("id").equalTo(possibleUpdates.col("resultId")),
|
||||||
.map(resultCommunityFn(), Encoders.bean(resultClazz))
|
"left_outer")
|
||||||
.write()
|
.map(resultCommunityFn(), Encoders.bean(resultClazz))
|
||||||
.mode(SaveMode.Overwrite)
|
.write()
|
||||||
.option("compression", "gzip")
|
.mode(SaveMode.Overwrite)
|
||||||
.json(outputPath + e.name());
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath + e.name());
|
||||||
|
|
||||||
readPath(spark, outputPath + e.name(), resultClazz)
|
// log
|
||||||
.write()
|
// .info(
|
||||||
.mode(SaveMode.Overwrite)
|
// "reading results from " + outputPath + e.name() + " and copying them to " + inputPath
|
||||||
.option("compression", "gzip")
|
// + e.name());
|
||||||
.json(inputPath + e.name());
|
// Dataset<R> tmp = readPath(spark, outputPath + e.name(), resultClazz);
|
||||||
}
|
// if (tmp.count() > 0){
|
||||||
|
//
|
||||||
|
// tmp
|
||||||
|
// .write()
|
||||||
|
// .mode(SaveMode.Overwrite)
|
||||||
|
// .option("compression", "gzip")
|
||||||
|
// .json(inputPath + e.name());
|
||||||
|
// }
|
||||||
|
|
||||||
|
// }
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -115,11 +127,11 @@ public class SparkResultToCommunityFromOrganizationJob {
|
||||||
.map(Context::getId)
|
.map(Context::getId)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
// @SuppressWarnings("unchecked")
|
||||||
R res = (R) ret.getClass().newInstance();
|
// R res = (R) ret.getClass().newInstance();
|
||||||
|
|
||||||
res.setId(ret.getId());
|
// res.setId(ret.getId());
|
||||||
List<Context> propagatedContexts = new ArrayList<>();
|
// List<Context> propagatedContexts = new ArrayList<>();
|
||||||
for (String cId : communitySet) {
|
for (String cId : communitySet) {
|
||||||
if (!contextList.contains(cId)) {
|
if (!contextList.contains(cId)) {
|
||||||
Context newContext = new Context();
|
Context newContext = new Context();
|
||||||
|
@ -133,11 +145,11 @@ public class SparkResultToCommunityFromOrganizationJob {
|
||||||
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
|
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
|
||||||
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME,
|
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME,
|
||||||
ModelConstants.DNET_PROVENANCE_ACTIONS)));
|
ModelConstants.DNET_PROVENANCE_ACTIONS)));
|
||||||
propagatedContexts.add(newContext);
|
ret.getContext().add(newContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
res.setContext(propagatedContexts);
|
// res.setContext(propagatedContexts);
|
||||||
ret.mergeFrom(res);
|
// ret.mergeFrom(res);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
};
|
};
|
||||||
|
|
|
@ -86,29 +86,30 @@ public class SparkResultToCommunityFromProject implements Serializable {
|
||||||
ModelSupport.entityTypes
|
ModelSupport.entityTypes
|
||||||
.keySet()
|
.keySet()
|
||||||
.parallelStream()
|
.parallelStream()
|
||||||
|
.filter(e -> ModelSupport.isResult(e))
|
||||||
.forEach(e -> {
|
.forEach(e -> {
|
||||||
if (ModelSupport.isResult(e)) {
|
// if () {
|
||||||
removeOutputDir(spark, outputPath + e.name());
|
removeOutputDir(spark, outputPath + e.name());
|
||||||
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
|
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
|
||||||
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
|
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
|
||||||
|
|
||||||
result
|
result
|
||||||
.joinWith(
|
.joinWith(
|
||||||
possibleUpdates,
|
possibleUpdates,
|
||||||
result.col("id").equalTo(possibleUpdates.col("resultId")),
|
result.col("id").equalTo(possibleUpdates.col("resultId")),
|
||||||
"left_outer")
|
"left_outer")
|
||||||
.map(resultCommunityFn(), Encoders.bean(resultClazz))
|
.map(resultCommunityFn(), Encoders.bean(resultClazz))
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(outputPath + e.name());
|
.json(outputPath + e.name());
|
||||||
|
|
||||||
readPath(spark, outputPath + e.name(), resultClazz)
|
readPath(spark, outputPath + e.name(), resultClazz)
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(inputPath + e.name());
|
.json(inputPath + e.name());
|
||||||
}
|
// }
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched
|
sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched
|
||||||
resumeFrom=CountryPropagation
|
resumeFrom=CommunityOrganization
|
||||||
allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo
|
allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo
|
||||||
allowedsemrelsresultproject=isSupplementedBy;isSupplementTo
|
allowedsemrelsresultproject=isSupplementedBy;isSupplementTo
|
||||||
allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo
|
allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo
|
||||||
datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|openaire____::fdb035c8b3e0540a8d9a561a6c44f4de;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48
|
datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|openaire____::fdb035c8b3e0540a8d9a561a6c44f4de;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48
|
||||||
#allowedtypes=pubsrepository::institutional
|
#allowedtypes=pubsrepository::institutional
|
||||||
allowedtypes=Institutional
|
allowedtypes=Institutional
|
||||||
outputPath=/tmp/miriam/enrichment_one_step
|
outputPath=/tmp/beta_provision/graph/11_graph_orcid
|
||||||
pathMap ={"author":"$['author'][*]['fullname']", \
|
pathMap ={"author":"$['author'][*]['fullname']", \
|
||||||
"title":"$['title'][*]['value']",\
|
"title":"$['title'][*]['value']",\
|
||||||
"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\
|
"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\
|
||||||
|
|
|
@ -231,7 +231,7 @@
|
||||||
</property>
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
</sub-workflow>
|
</sub-workflow>
|
||||||
<ok to="result_project" />
|
<ok to="End" />
|
||||||
<error to="Kill" />
|
<error to="Kill" />
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName":"s",
|
||||||
|
"paramLongName":"sourcePath",
|
||||||
|
"paramDescription": "the path of the sequencial file to read",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
"paramName": "out",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the path used to store temporary output files",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "ssm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "true if the spark session is managed, false otherwise",
|
||||||
|
"paramRequired": false
|
||||||
|
}
|
||||||
|
|
||||||
|
]
|
|
@ -69,7 +69,7 @@
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>community2resultfromorganization-Publication</name>
|
<name>community2resultfromorganization</name>
|
||||||
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
|
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
|
@ -88,6 +88,33 @@
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
|
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/communityorganization/resulttocommunityfromorganization/</arg>
|
<arg>--outputPath</arg><arg>${workingDir}/communityorganization/resulttocommunityfromorganization/</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
<ok to="move-results"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="move-results">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>community2resultfromorganization - move results</name>
|
||||||
|
<class>eu.dnetlib.dhp.MoveResult</class>
|
||||||
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=6
|
||||||
|
--executor-memory=5G
|
||||||
|
--conf spark.executor.memoryOverhead=3g
|
||||||
|
--conf spark.sql.shuffle.partitions=3284
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${workingDir}/communityorganization/resulttocommunityfromorganization/</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
|
||||||
|
<!-- <arg>--outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
|
||||||
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
|
@ -86,12 +86,37 @@
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
|
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/communitythroughproject/</arg>
|
<arg>--outputPath</arg><arg>${workingDir}/communitythroughproject/</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
<ok to="move-results"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="move-results">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>community2resultfromorganization - move results</name>
|
||||||
|
<class>eu.dnetlib.dhp.MoveResult</class>
|
||||||
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=6
|
||||||
|
--executor-memory=5G
|
||||||
|
--conf spark.executor.memoryOverhead=3g
|
||||||
|
--conf spark.sql.shuffle.partitions=3284
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${workingDir}/communitythroughproject/</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
|
||||||
|
<!-- <arg>outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
|
||||||
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
|
|
||||||
</workflow-app>
|
</workflow-app>
|
Loading…
Reference in New Issue