forked from D-Net/dnet-hadoop
Merge pull request 'Project propagation via communityAPI instead of using IS via IIS' (#362) from projectPropagation into master
Reviewed-on: D-Net/dnet-hadoop#362
This commit is contained in:
commit
2b626815ff
|
@ -5,7 +5,6 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
|
@ -72,6 +71,9 @@ public class PropagationConstant {
|
|||
public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID = "result:community:organization";
|
||||
public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME = " Propagation of result belonging to community through organization";
|
||||
|
||||
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID = "result:community:project";
|
||||
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME = " Propagation of result belonging to community through project";
|
||||
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";
|
||||
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
|
||||
package eu.dnetlib.dhp.resulttocommunityfromproject;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
|
||||
import eu.dnetlib.dhp.api.Utils;
|
||||
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultOrganizations;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class PrepareResultCommunitySet {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySet.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareResultCommunitySet.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/resulttocommunityfromproject/input_preparecommunitytoresult_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
final boolean production = Boolean.valueOf(parser.get("production"));
|
||||
log.info("production: {}", production);
|
||||
|
||||
final CommunityEntityMap projectsMap = Utils.getCommunityProjects(production);
|
||||
// log.info("projectsMap: {}", new Gson().toJson(projectsMap));
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
prepareInfo(spark, inputPath, outputPath, projectsMap);
|
||||
});
|
||||
}
|
||||
|
||||
private static void prepareInfo(
|
||||
SparkSession spark,
|
||||
String inputPath,
|
||||
String outputPath,
|
||||
CommunityEntityMap projectMap) {
|
||||
|
||||
final StructType structureSchema = new StructType()
|
||||
.add(
|
||||
"dataInfo", new StructType()
|
||||
.add("deletedbyinference", DataTypes.BooleanType)
|
||||
.add("invisible", DataTypes.BooleanType))
|
||||
.add("source", DataTypes.StringType)
|
||||
.add("target", DataTypes.StringType)
|
||||
.add("relClass", DataTypes.StringType);
|
||||
|
||||
spark
|
||||
.read()
|
||||
.schema(structureSchema)
|
||||
.json(inputPath)
|
||||
.filter(
|
||||
"dataInfo.deletedbyinference != true " +
|
||||
"and relClass == '" + ModelConstants.IS_PRODUCED_BY + "'")
|
||||
.select(
|
||||
new Column("source").as("resultId"),
|
||||
new Column("target").as("projectId"))
|
||||
.groupByKey((MapFunction<Row, String>) r -> (String) r.getAs("resultId"), Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, Row, ResultProjectList>) (k, v) -> {
|
||||
ResultProjectList rpl = new ResultProjectList();
|
||||
rpl.setResultId(k);
|
||||
ArrayList<String> cl = new ArrayList<>();
|
||||
cl.addAll(projectMap.get(v.next().getAs("projectId")));
|
||||
v.forEachRemaining(r -> {
|
||||
projectMap
|
||||
.get(r.getAs("projectId"))
|
||||
.forEach(c -> {
|
||||
if (!cl.contains(c))
|
||||
cl.add(c);
|
||||
});
|
||||
|
||||
});
|
||||
if (cl.size() == 0)
|
||||
return null;
|
||||
rpl.setCommunityList(cl);
|
||||
return rpl;
|
||||
}, Encoders.bean(ResultProjectList.class))
|
||||
.filter(Objects::nonNull)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
|
||||
package eu.dnetlib.dhp.resulttocommunityfromproject;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class ResultProjectList implements Serializable {
|
||||
private String resultId;
|
||||
private ArrayList<String> communityList;
|
||||
|
||||
public String getResultId() {
|
||||
return resultId;
|
||||
}
|
||||
|
||||
public void setResultId(String resultId) {
|
||||
this.resultId = resultId;
|
||||
}
|
||||
|
||||
public ArrayList<String> getCommunityList() {
|
||||
return communityList;
|
||||
}
|
||||
|
||||
public void setCommunityList(ArrayList<String> communityList) {
|
||||
this.communityList = communityList;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
|
||||
package eu.dnetlib.dhp.resulttocommunityfromproject;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 11/10/23
|
||||
*/
|
||||
public class SparkResultToCommunityFromProject implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromProject.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
SparkResultToCommunityFromProject.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/resulttocommunityfromproject/input_communitytoresult_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
final String possibleupdatespath = parser.get("preparedInfoPath");
|
||||
log.info("preparedInfoPath: {}", possibleupdatespath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
|
||||
execPropagation(spark, inputPath, outputPath, possibleupdatespath);
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private static <R extends Result> void execPropagation(
|
||||
SparkSession spark,
|
||||
String inputPath,
|
||||
String outputPath,
|
||||
|
||||
String possibleUpdatesPath) {
|
||||
|
||||
Dataset<ResultProjectList> possibleUpdates = readPath(spark, possibleUpdatesPath, ResultProjectList.class);
|
||||
|
||||
ModelSupport.entityTypes
|
||||
.keySet()
|
||||
.parallelStream()
|
||||
.forEach(e -> {
|
||||
if (ModelSupport.isResult(e)) {
|
||||
removeOutputDir(spark, outputPath + e.name());
|
||||
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
|
||||
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
|
||||
|
||||
result
|
||||
.joinWith(
|
||||
possibleUpdates,
|
||||
result.col("id").equalTo(possibleUpdates.col("resultId")),
|
||||
"left_outer")
|
||||
.map(resultCommunityFn(), Encoders.bean(resultClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath + e.name());
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static <R extends Result> MapFunction<Tuple2<R, ResultProjectList>, R> resultCommunityFn() {
|
||||
return value -> {
|
||||
R ret = value._1();
|
||||
Optional<ResultProjectList> rcl = Optional.ofNullable(value._2());
|
||||
if (rcl.isPresent()) {
|
||||
// ArrayList<String> communitySet = rcl.get().getCommunityList();
|
||||
List<String> contextList = ret
|
||||
.getContext()
|
||||
.stream()
|
||||
.map(Context::getId)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
R res = (R) ret.getClass().newInstance();
|
||||
|
||||
res.setId(ret.getId());
|
||||
List<Context> propagatedContexts = new ArrayList<>();
|
||||
for (String cId : rcl.get().getCommunityList()) {
|
||||
if (!contextList.contains(cId)) {
|
||||
Context newContext = new Context();
|
||||
newContext.setId(cId);
|
||||
newContext
|
||||
.setDataInfo(
|
||||
Arrays
|
||||
.asList(
|
||||
getDataInfo(
|
||||
PROPAGATION_DATA_INFO_TYPE,
|
||||
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID,
|
||||
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS)));
|
||||
propagatedContexts.add(newContext);
|
||||
} else {
|
||||
ret
|
||||
.getContext()
|
||||
.stream()
|
||||
.filter(c -> c.getId().equals(cId))
|
||||
.findFirst()
|
||||
.get()
|
||||
.getDataInfo()
|
||||
.add(
|
||||
getDataInfo(
|
||||
PROPAGATION_DATA_INFO_TYPE,
|
||||
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID,
|
||||
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS));
|
||||
}
|
||||
}
|
||||
res.setContext(propagatedContexts);
|
||||
ret.mergeFrom(res);
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
[
|
||||
{
|
||||
"paramName":"s",
|
||||
"paramLongName":"sourcePath",
|
||||
"paramDescription": "the path of the sequencial file to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
|
||||
{
|
||||
"paramName": "out",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ssm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "true if the spark session is managed, false otherwise",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "p",
|
||||
"paramLongName": "preparedInfoPath",
|
||||
"paramDescription": "the path where prepared info have been stored",
|
||||
"paramRequired": true
|
||||
}
|
||||
|
||||
]
|
|
@ -0,0 +1,28 @@
|
|||
[
|
||||
{
|
||||
"paramName":"s",
|
||||
"paramLongName":"sourcePath",
|
||||
"paramDescription": "the path of the sequencial file to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
|
||||
{
|
||||
"paramName": "ssm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "true if the spark session is managed, false otherwise",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "out",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "p",
|
||||
"paramLongName": "production",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": true
|
||||
}
|
||||
|
||||
]
|
|
@ -0,0 +1,58 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hive_metastore_uris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<value>/user/spark/spark2ApplicationHistory</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorNumber</name>
|
||||
<value>4</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<value>15G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<value>6G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<value>1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2MaxExecutors</name>
|
||||
<value>50</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,144 @@
|
|||
<workflow-app name="community_to_result_propagation_project" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>sourcePath</name>
|
||||
<description>the source path</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>outputPath</name>
|
||||
<description>the output path</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="reset_outputpath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="reset_outputpath">
|
||||
<fs>
|
||||
<delete path="${outputPath}"/>
|
||||
<mkdir path="${outputPath}"/>
|
||||
</fs>
|
||||
<ok to="copy_entities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="copy_entities">
|
||||
<path start="copy_relation"/>
|
||||
<path start="copy_organization"/>
|
||||
<path start="copy_projects"/>
|
||||
<path start="copy_datasources"/>
|
||||
</fork>
|
||||
|
||||
<action name="copy_relation">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>${nameNode}/${sourcePath}/relation</arg>
|
||||
<arg>${nameNode}/${outputPath}/relation</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_organization">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>${nameNode}/${sourcePath}/organization</arg>
|
||||
<arg>${nameNode}/${outputPath}/organization</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_projects">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>${nameNode}/${sourcePath}/project</arg>
|
||||
<arg>${nameNode}/${outputPath}/project</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_datasources">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>${nameNode}/${sourcePath}/datasource</arg>
|
||||
<arg>${nameNode}/${outputPath}/datasource</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="copy_wait" to="prepare_result_communitylist"/>
|
||||
|
||||
<action name="prepare_result_communitylist">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Prepare-Community-Result-Organization</name>
|
||||
<class>eu.dnetlib.dhp.resulttocommunityfromproject.PrepareResultCommunitySet</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=6
|
||||
--executor-memory=5G
|
||||
--conf spark.executor.memoryOverhead=3g
|
||||
--conf spark.sql.shuffle.partitions=3284
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
|
||||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
|
||||
<arg>--production</arg><arg>${production}</arg>
|
||||
</spark>
|
||||
<ok to="exec-propagation"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="exec-propagation">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>community2resultfromproject</name>
|
||||
<class>eu.dnetlib.dhp.resulttocommunityfromproject.SparkResultToCommunityFromProject</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=6
|
||||
--executor-memory=5G
|
||||
--conf spark.executor.memoryOverhead=3g
|
||||
--conf spark.sql.shuffle.partitions=3284
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||
</spark-opts>
|
||||
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
</workflow-app>
|
|
@ -0,0 +1,133 @@
|
|||
|
||||
package eu.dnetlib.dhp.resulttocommunityfromproject;
|
||||
|
||||
import static org.apache.spark.sql.functions.desc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest;
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
|
||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
|
||||
public class ResultToCommunityJobTest {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(ResultToCommunityJobTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(ResultToCommunityJobTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(OrcidPropagationJobTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSparkResultToCommunityFromProjectJob() throws Exception {
|
||||
final String preparedInfoPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/resulttocommunityfromproject/preparedInfo")
|
||||
.getPath();
|
||||
SparkResultToCommunityFromProject
|
||||
.main(
|
||||
new String[] {
|
||||
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", getClass()
|
||||
.getResource("/eu/dnetlib/dhp/resulttocommunityfromproject/sample/")
|
||||
.getPath(),
|
||||
|
||||
"-outputPath", workingDir.toString() + "/",
|
||||
"-preparedInfoPath", preparedInfoPath
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Dataset> tmp = sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
/**
|
||||
* {"resultId":"50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f","communityList":["aurora"]}
|
||||
* {"resultId":"50|57a035e5b1ae::a77232ffca9115fcad51c3503dbc7e3e","communityList":["aurora"]}
|
||||
* {"resultId":"50|57a035e5b1ae::803aaad4decab7e27cd4b52a1931b3a1","communityList":["sdsn-gr"]}
|
||||
* {"resultId":"50|57a035e5b1ae::a02e9e4087bca50687731ae5c765b5e1","communityList":["netherlands"]}
|
||||
*/
|
||||
List<Context> context = tmp
|
||||
.filter(r -> r.getId().equals("50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f"))
|
||||
.first()
|
||||
.getContext();
|
||||
Assertions.assertTrue(context.stream().anyMatch(c -> containsResultCommunityProject(c)));
|
||||
|
||||
context = tmp
|
||||
.filter(r -> r.getId().equals("50|57a035e5b1ae::a77232ffca9115fcad51c3503dbc7e3e"))
|
||||
.first()
|
||||
.getContext();
|
||||
Assertions.assertTrue(context.stream().anyMatch(c -> containsResultCommunityProject(c)));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, tmp.filter(r -> r.getId().equals("50|57a035e5b1ae::803aaad4decab7e27cd4b52a1931b3a1")).count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, tmp.filter(r -> r.getId().equals("50|57a035e5b1ae::a02e9e4087bca50687731ae5c765b5e1")).count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, tmp.filter(r -> r.getContext().stream().anyMatch(c -> c.getId().equals("aurora"))).count());
|
||||
|
||||
}
|
||||
|
||||
private static boolean containsResultCommunityProject(Context c) {
|
||||
return c
|
||||
.getDataInfo()
|
||||
.stream()
|
||||
.anyMatch(di -> di.getProvenanceaction().getClassid().equals("result:community:project"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
{"resultId":"50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f","communityList":["aurora"]}
|
||||
{"resultId":"50|57a035e5b1ae::a77232ffca9115fcad51c3503dbc7e3e","communityList":["aurora"]}
|
||||
{"resultId":"50|57a035e5b1ae::803aaad4decab7e27cd4b52a1931b3a1","communityList":["sdsn-gr"]}
|
||||
{"resultId":"50|57a035e5b1ae::a02e9e4087bca50687731ae5c765b5e1","communityList":["netherlands"]}
|
|
@ -0,0 +1,20 @@
|
|||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "40|corda__h2020::5c16b849965ee04493a5e244471aae16", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|57a035e5b1ae::803aaad4decab7e27cd4b52a1931b3a1", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|nwo_________::a9f6d38fb3626d6659d385f71be9657e", "lastupdatetimestamp": 1694431155490, "relType": "resultOrganization", "source": "50|57a035e5b1ae::a02e9e4087bca50687731ae5c765b5e1", "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|ukri________::93b0983bc4fb1a17a8c3d1e6ab45c03b", "lastupdatetimestamp": 1694431195409, "relType": "resultOrganization", "source": "50|57a035e5b1ae::a77232ffca9115fcad51c3503dbc7e3e", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "40|ukri________::93b0983bc4fb1a17a8c3d1e6ab45c03b", "lastupdatetimestamp": 1694431195538, "relType": "resultOrganization", "source": "50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|openorgs____::eb0669daa9efeb898a3090d8aac7c953", "lastupdatetimestamp": 1694431216929, "relType": "resultOrganization", "source": "50|RECOLECTA___::031d8312287a0108202acd8c5957fcb5", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|pending_org_::26d4324bfca459ab17e1efd966fba8d7", "lastupdatetimestamp": 1694431156296, "relType": "resultOrganization", "source": "50|RECOLECTA___::0ac43c9933175f1d0e091ba4dc814565", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|pending_org_::fc8f734ba211cfc8a3769189808338c2", "lastupdatetimestamp": 1694431218717, "relType": "resultOrganization", "source": "50|RECOLECTA___::1c6c582aa2d57c932069bc4382653229", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|pending_org_::15c4bd8602727bff4ecbaf69e7ac43af", "lastupdatetimestamp": 1694431211627, "relType": "resultOrganization", "source": "50|RECOLECTA___::21c4e85b761c898dd0f6d59d9f9b85f1", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|pending_org_::6e4694bb54928162e3736e2042663ff1", "lastupdatetimestamp": 1694431213910, "relType": "resultOrganization", "source": "50|RECOLECTA___::22436b0491ae186b64671a0667551830", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|openorgs____::5665ec851e301a1f816ee0be3e98757b", "lastupdatetimestamp": 1694431212739, "relType": "resultOrganization", "source": "50|RECOLECTA___::6739f9dda9d6e4703b70f908a4ab6259", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|pending_org_::7ccc3885d6c13b55952c4af07e95f6a6", "lastupdatetimestamp": 1694431203420, "relType": "resultOrganization", "source": "50|RECOLECTA___::a3baa9c23ff95de4f09bf711bcf934a5", "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.8997"}, "target": "40|openorgs____::2cd85ac41e550e32a488a1b26a71fdd8", "lastupdatetimestamp": 1694431162512, "relType": "resultOrganization", "source": "50|RECOLECTA___::ce1f62947f2773709b75e57ec91eca51", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|pending_org_::2f224aa8c572c740fac1ed8b1a38aa21", "lastupdatetimestamp": 1694431204317, "relType": "resultOrganization", "source": "50|RECOLECTA___::df660a72ea2d758e542cdd0d63dcdbd1", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "40|openorgs____::d4c564d89cc281f00a27ff7684da5668", "lastupdatetimestamp": 1694431222761, "relType": "resultOrganization", "source": "50|altaiap_____::5c53480b2300ae410d183822b0c1ee9b", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|pending_org_::5f8192f98afda4a06afb4386b7c913cc", "lastupdatetimestamp": 1694431180171, "relType": "resultOrganization", "source": "50|altaiap_____::d036acbdb3155339d2fa77455a0b3d95", "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.8998"}, "target": "40|pending_org_::afc6d809fd8a8db5ee592469c12a7dc8", "lastupdatetimestamp": 1694431193653, "relType": "resultOrganization", "source": "50|arXiv_______::0007ee48e3aa0c409dc68f172bbc76a8", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.8998"}, "target": "40|openorgs____::5ab85d71ee109cc81a2bc28cbdeba96a", "lastupdatetimestamp": 1694431224324, "relType": "resultOrganization", "source": "50|arXiv_______::00285f6ee59920bcdb4d96d6cd240095", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.9"}, "target": "40|pending_org_::7d22ea802b3405becbde1e2e1ea59197", "lastupdatetimestamp": 1694431220671, "relType": "resultOrganization", "source": "50|arXiv_______::002eb5d6ee54ba872b882bbd3824aa96", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.8847"}, "target": "40|pending_org_::c44367f733bf59e75fc7e92eb9ad4489", "lastupdatetimestamp": 1694431179461, "relType": "resultOrganization", "source": "50|arXiv_______::00480a6b5ae2cd88dbd7781e1ec341aa", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "affiliation", "relClass": "isProducedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.8998"}, "target": "40|openorgs____::293e60068d63e17e20f2de54944cae01", "lastupdatetimestamp": 1694431219918, "relType": "resultOrganization", "source": "50|arXiv_______::00766c4ea96b8b797528b511b793ba62", "collectedfrom": [], "validated": false, "properties": []}
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue