This commit is contained in:
parent cc86f24372
commit 0a2c00ce29
@@ -74,9 +74,6 @@ public class SparkDumpResult implements Serializable {
 		final String resultClassName = parser.get("resultTableName");
 		log.info("resultTableName: {}", resultClassName);
 
-		final String masterDuplicatePath = parser.get("masterDuplicatePath");
-		log.info("masterDuplicatePath: {}", masterDuplicatePath);
-
 		Optional<String> pathString = Optional.ofNullable(parser.get("pathMap"));
 		HashMap<String, String> pathMap = null;
 		if (pathString.isPresent()) {
@@ -97,13 +94,13 @@ public class SparkDumpResult implements Serializable {
 
 		run(
 			isSparkSessionManaged, inputPath, outputPath, pathMap, selectionConstraints, inputClazz,
-			resultType, masterDuplicatePath);
+			resultType);
 
 	}
 
 	private static void run(Boolean isSparkSessionManaged, String inputPath, String outputPath,
 		HashMap<String, String> pathMap, SelectionConstraints selectionConstraints,
-		Class<? extends eu.dnetlib.dhp.schema.oaf.Result> inputClazz, String resultType, String masterDuplicatePath) {
+		Class<? extends eu.dnetlib.dhp.schema.oaf.Result> inputClazz, String resultType) {
 		SparkConf conf = new SparkConf();
 
 		HashMap<String, String> finalPathMap = pathMap;
@@ -116,7 +113,7 @@ public class SparkDumpResult implements Serializable {
 				Utils.removeOutputDir(spark, outputPath + "/dump/" + resultType);
 				resultDump(
 					spark, inputPath, outputPath, inputClazz, finalPathMap,
-					finalSelectionConstraints, resultType, masterDuplicatePath);
+					finalSelectionConstraints, resultType);
 			});
 
 	}
@@ -128,17 +125,13 @@ public class SparkDumpResult implements Serializable {
 		Class<I> inputClazz,
 		Map<String, String> pathMap,
 		SelectionConstraints selectionConstraints,
-		String resultType,
-		String masterDuplicatePath) {
+		String resultType) {
 
-		List<MasterDuplicate> masterDuplicateList = Utils
-			.readPath(spark, masterDuplicatePath, MasterDuplicate.class)
-			.collectAsList();
 		Utils
 			.readPath(spark, inputPath, inputClazz)
 			.map(
 				(MapFunction<I, I>) value -> filterResult(
-					value, pathMap, selectionConstraints, inputClazz, masterDuplicateList, resultType),
+					value, pathMap, selectionConstraints, inputClazz, resultType),
 				Encoders.bean(inputClazz))
 			.filter(Objects::nonNull)
 			.write()
@@ -163,7 +156,7 @@ public class SparkDumpResult implements Serializable {
 	}
 
 	private static <I extends eu.dnetlib.dhp.schema.oaf.Result> I filterResult(I value, Map<String, String> pathMap,
-		SelectionConstraints selectionConstraints, Class<I> inputClazz, List<MasterDuplicate> masterDuplicateList,
+		SelectionConstraints selectionConstraints, Class<I> inputClazz,
 		String resultType) {
 		Optional<DataInfo> odInfo = Optional.ofNullable(value.getDataInfo());
 
@@ -195,14 +188,7 @@ public class SparkDumpResult implements Serializable {
 				return null;
 			}
 		}
-		if (Optional.ofNullable(value.getCollectedfrom()).isPresent())
-			value.getCollectedfrom().forEach(cf -> update(cf, masterDuplicateList));
-		if (Optional.ofNullable(value.getInstance()).isPresent()) {
-			value.getInstance().forEach(i -> {
-				update(i.getCollectedfrom(), masterDuplicateList);
-				update(i.getHostedby(), masterDuplicateList);
-			});
-		}
 
 		return value;
 	}
 
@@ -210,13 +196,4 @@ public class SparkDumpResult implements Serializable {
 		return (classid.equals(resultType) || (classid.equals("other") && resultType.equals("otherresearchproduct")));
 	}
 
-	private static void update(KeyValue kv, List<MasterDuplicate> masterDuplicateList) {
-		for (MasterDuplicate md : masterDuplicateList) {
-			if (md.getDuplicate().equals(kv.getKey())) {
-				kv.setKey(md.getMaster());
-				return;
-			}
-		}
-	}
 }
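Note: the body of the `if (pathString.isPresent()) { … }` block is truncated by the first hunk above. A minimal sketch of the deserialization it implies, assuming Jackson's ObjectMapper (the helper class and method names are hypothetical, not part of this commit):

import java.util.HashMap;
import java.util.Optional;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PathMapSketch {

	// Hypothetical helper: turns the pathMap JSON (field name -> JSONPath expression)
	// into the HashMap that run() receives; assumes Jackson, not confirmed by this diff.
	static HashMap<String, String> parsePathMap(Optional<String> pathString) throws JsonProcessingException {
		HashMap<String, String> pathMap = null;
		if (pathString.isPresent()) {
			pathMap = new ObjectMapper()
				.readValue(pathString.get(), new TypeReference<HashMap<String, String>>() {
				});
		}
		return pathMap;
	}
}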
@@ -0,0 +1,28 @@
+#GENERAL PROPERTIES FOR DUMPS
+pathMap={"author":"$['author'][*]['fullname']","title":"$['title'][*]['value']","orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']","contributor":"$['contributor'][*]['value']","description":"$['description'][*]['value']","dateofacceptance":"$['dateofacceptance']['value']", "context": "$['context'][*]['id']","url":"$['instance'][*]['url'][*]", "country": "$['country'][*]['classid']"}
+selectionCriteria={"criteria":[{"constraint":[{"verb":"equals","field":"context","value":"enermaps"}]}]}
+sourcePath=/tmp/prod_provision/graph/18_graph_blacklisted
+#sourcePath=/tmp/miriam/graphCopy
+outputPath=/tmp/miriam/graph_dumps/enermaps_subset
+dumpType=subset
+depositionType=new
+###accessToken for the openaire sandbox follows
+accessToken=OzzOsyucEIHxCEfhlpsMo3myEiwpCza3trCRL7ddfGTAK9xXkIP2MbXd6Vg4
+connectionUrl=https://sandbox.zenodo.org/api/deposit/depositions
+##accessToken=GxqutB1JnEmdvBafQI2cCjtUvoOs0novDuie3hxCEQUJcErHVMhkJjawIqhb
+##connectionUrl=https://zenodo.org/api/deposit/depositions
+resultAggregation=false
+upload=false
+onlyUpload=false
+removeSet="merges;isMergedIn"
+metadata=""
+singleDeposition=false
+communityId=none
+conceptRecordId=7490191
+depositionId=none
+postgresURL=jdbc:postgresql://postgresql.services.openaire.eu:5432/dnet_openaireplus
+postgresUser=dnet
+postgresPassword=dnetPwd
+##values community or complete. If it is community, the model of the dump will be the one of the communities.
+##If it is complete, the model of the dump will be the one of the complete graph.
+mapAs=complete
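The pathMap property above maps logical field names to JSONPath expressions, and selectionCriteria refers to those names: the single constraint keeps results whose context id equals "enermaps". A minimal sketch of how such an "equals" verb could be evaluated, assuming the Jayway JsonPath library (which the $['…'] syntax suggests); class and method names are illustrative, not the project's actual SelectionConstraints implementation:

import java.util.List;

import com.jayway.jsonpath.JsonPath;

public class EqualsVerbSketch {

	// Illustrative only: evaluates the "equals" constraint on the "context" field,
	// resolving the field through its pathMap entry "$['context'][*]['id']".
	static boolean matches(String resultJson, String expected) {
		List<String> contextIds = JsonPath.read(resultJson, "$['context'][*]['id']");
		return contextIds.stream().anyMatch(expected::equals);
	}
}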
@@ -63,11 +63,7 @@
 		<value>none</value>
 		<description>the depositionId of a deposition open that has to be added content</description>
 	</property>
-	<property>
-		<name>organizationCommunityMap</name>
-		<value>none</value>
-		<description>the organization community map</description>
-	</property>
+	<!---->
 	<property>
 		<name>hiveDbName</name>
 		<description>the target hive database name</description>
@@ -200,18 +196,6 @@
 				<name>sourcePath</name>
 				<value>${sourcePath}</value>
 			</property>
-			<property>
-				<name>organizationCommunityMap</name>
-				<value>${organizationCommunityMap}</value>
-			</property>
-			<property>
-				<name>pathMap</name>
-				<value>${pathMap}</value>
-			</property>
-			<property>
-				<name>selectionCriteria</name>
-				<value>${selectionCriteria}</value>
-			</property>
 		</configuration>
 	</sub-workflow>
 	<ok to="make_archive" />
@@ -316,17 +300,6 @@
 		<error to="Kill" />
 	</action>
 
-	<!-- <action name="make_archive">-->
-	<!-- <java>-->
-	<!-- <main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>-->
-	<!-- <arg>--hdfsPath</arg><arg>${outputPath}</arg>-->
-	<!-- <arg>--nameNode</arg><arg>${nameNode}</arg>-->
-	<!-- <arg>--sourcePath</arg><arg>${workingDir}/tar</arg>-->
-	<!-- </java>-->
-	<!-- <ok to="should_upload"/>-->
-	<!-- <error to="Kill"/>-->
-	<!-- </action>-->
-
 	<action name="make_archive">
 		<java>
 			<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
File diff suppressed because one or more lines are too long
@@ -8,10 +8,6 @@
 		<name>outputPath</name>
 		<description>the output path</description>
 	</property>
-	<property>
-		<name>organizationCommunityMap</name>
-		<description>the organization community map</description>
-	</property>
 	<property>
 		<name>pathMap</name>
 		<description>the path where to find the elements involved in the constraints within the json</description>
@@ -92,7 +88,7 @@
 		</configuration>
 	</global>
 
-	<start to="get_master_duplicate" />
+	<start to="fork_select_and_dump" />
 
 	<action name="get_master_duplicate">
 		<java>
@@ -138,7 +134,6 @@
 			<arg>--pathMap</arg><arg>${pathMap}</arg>
 			<arg>--selectionCriteria</arg><arg>${selectionCriteria}</arg>
 			<arg>--resultType</arg><arg>publication</arg>
-			<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
 		</spark>
 		<ok to="join_dump"/>
 		<error to="Kill"/>
@@ -168,7 +163,6 @@
 			<arg>--pathMap</arg><arg>${pathMap}</arg>
 			<arg>--selectionCriteria</arg><arg>${selectionCriteria}</arg>
 			<arg>--resultType</arg><arg>dataset</arg>
-			<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
 		</spark>
 		<ok to="join_dump"/>
 		<error to="Kill"/>
@@ -197,7 +191,6 @@
 			<arg>--pathMap</arg><arg>${pathMap}</arg>
 			<arg>--selectionCriteria</arg><arg>${selectionCriteria}</arg>
 			<arg>--resultType</arg><arg>otherresearchproduct</arg>
-			<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
 		</spark>
 		<ok to="join_dump"/>
 		<error to="Kill"/>
@@ -226,7 +219,6 @@
 			<arg>--pathMap</arg><arg>${pathMap}</arg>
 			<arg>--selectionCriteria</arg><arg>${selectionCriteria}</arg>
 			<arg>--resultType</arg><arg>software</arg>
-			<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
 		</spark>
 		<ok to="join_dump"/>
 		<error to="Kill"/>