Merge branch 'beta' of https://code-repo.d4science.org/D-Net/dnet-hadoop into beta
This commit is contained in:
commit
d05ca53a14
|
@ -44,7 +44,7 @@ public class PropagationConstant {
|
|||
|
||||
public final static String NULL = "NULL";
|
||||
|
||||
public static final String INSTITUTIONAL_REPO_TYPE = "pubsrepository::institutional";
|
||||
public static final String INSTITUTIONAL_REPO_TYPE = "institutional";
|
||||
|
||||
public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
|
||||
|
||||
|
@ -231,9 +231,9 @@ public class PropagationConstant {
|
|||
|
||||
if (HdfsSupport.exists(inputPath, spark.sparkContext().hadoopConfiguration())) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
} else {
|
||||
return spark.emptyDataset(Encoders.bean(clazz));
|
||||
}
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.community;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
|
@ -13,74 +15,17 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
|||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
public class QueryInformationSystem {
|
||||
private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') "
|
||||
+ " let $subj := $x//CONFIGURATION/context/param[./@name='subject']/text() "
|
||||
+ " let $datasources := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::contentproviders')]/concept "
|
||||
+ " let $organizations := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::resultorganizations')]/concept "
|
||||
+ " let $communities := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::zenodocommunities')]/concept "
|
||||
+ " let $fos := $x//CONFIGURATION/context/param[./@name='fos']/text() "
|
||||
+ " let $sdg := $x//CONFIGURATION/context/param[./@name='sdg']/text() "
|
||||
+
|
||||
"let $zenodo := $x//param[./@name='zenodoCommunity']/text() "
|
||||
+ " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] and $x//context/param[./@name = 'status']/text() != 'hidden' "
|
||||
+ " return "
|
||||
+ " <community> "
|
||||
+ " { $x//CONFIGURATION/context/@id} "
|
||||
+ " <advancedConstraints>" +
|
||||
"{$x//CONFIGURATION/context/param[./@name='advancedConstraints']/text() }" +
|
||||
"</advancedConstraints>"
|
||||
+ " <subjects> "
|
||||
+ " {for $y in tokenize($subj,',') "
|
||||
+ " return "
|
||||
+ " <subject>{$y}</subject>} "
|
||||
+ " {for $y in tokenize($fos,',') "
|
||||
+ " return "
|
||||
+ " <subject>{$y}</subject>} "
|
||||
+ " {for $y in tokenize($sdg,',') "
|
||||
+ " return "
|
||||
+ " <subject>{$y}</subject>} "
|
||||
+ " </subjects> "
|
||||
+ " <datasources> "
|
||||
+ " {for $d in $datasources "
|
||||
+ " where $d/param[./@name='enabled']/text()='true' "
|
||||
+ " return "
|
||||
+ " <datasource> "
|
||||
+ " <openaireId> "
|
||||
+ " {$d//param[./@name='openaireId']/text()} "
|
||||
+ " </openaireId> "
|
||||
+ " <selcriteria> "
|
||||
+ " {$d/param[./@name='selcriteria']/text()} "
|
||||
+ " </selcriteria> "
|
||||
+ " </datasource> } "
|
||||
+ " </datasources> " +
|
||||
" <zenodocommunities> " +
|
||||
"{for $zc in $zenodo " +
|
||||
"return " +
|
||||
"<zenodocommunity> " +
|
||||
"<zenodoid> " +
|
||||
"{$zc} " +
|
||||
"</zenodoid> " +
|
||||
"</zenodocommunity>}"
|
||||
+ " {for $zc in $communities "
|
||||
+ " return "
|
||||
+ " <zenodocommunity> "
|
||||
+ " <zenodoid> "
|
||||
+ " {$zc/param[./@name='zenodoid']/text()} "
|
||||
+ " </zenodoid> "
|
||||
+ " <selcriteria> "
|
||||
+ " {$zc/param[./@name='selcriteria']/text()} "
|
||||
+ " </selcriteria> "
|
||||
+ " </zenodocommunity>} "
|
||||
+ " </zenodocommunities> "
|
||||
+ "<advancedConstraint>"
|
||||
+ "{$x//CONFIGURATION/context/param[./@name='advancedConstraint']/text()} "
|
||||
+ "</advancedConstraint>"
|
||||
+ " </community>";
|
||||
|
||||
public static CommunityConfiguration getCommunityConfiguration(final String isLookupUrl)
|
||||
throws ISLookUpException, DocumentException, SAXException {
|
||||
throws ISLookUpException, DocumentException, SAXException, IOException {
|
||||
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
final List<String> res = isLookUp.quickSearchProfile(XQUERY);
|
||||
final List<String> res = isLookUp
|
||||
.quickSearchProfile(
|
||||
IOUtils
|
||||
.toString(
|
||||
QueryInformationSystem.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/query.xq")));
|
||||
|
||||
final String xmlConf = "<communities>" + Joiner.on(" ").join(res) + "</communities>";
|
||||
|
||||
|
|
|
@ -102,7 +102,7 @@ public class PrepareResultInstRepoAssociation {
|
|||
String query = "SELECT source datasourceId, target organizationId "
|
||||
+ "FROM ( SELECT id "
|
||||
+ "FROM datasource "
|
||||
+ "WHERE datasourcetype.classid = '"
|
||||
+ "WHERE lower(jurisdiction.classid) = '"
|
||||
+ INSTITUTIONAL_REPO_TYPE
|
||||
+ "' "
|
||||
+ "AND datainfo.deletedbyinference = false " + blacklisted + " ) d "
|
||||
|
|
|
@ -38,13 +38,13 @@
|
|||
{
|
||||
"paramName": "test",
|
||||
"paramLongName": "isTest",
|
||||
"paramDescription": "true if the spark session is managed, false otherwise",
|
||||
"paramDescription": "Parameter intended for testing purposes only. True if the reun is relatesd to a test and so the taggingConf parameter should be loaded",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "tg",
|
||||
"paramLongName": "taggingConf",
|
||||
"paramDescription": "true if the spark session is managed, false otherwise",
|
||||
"paramDescription": "this parameter is intended for testing purposes only. It is a possible tagging configuration obtained via the XQUERY. Intended to be removed",
|
||||
"paramRequired": false
|
||||
}
|
||||
|
||||
|
|
|
@ -219,7 +219,7 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait" to="eosc_tag"/>
|
||||
<join name="wait" to="End"/>
|
||||
|
||||
<action name="eosc_tag">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')
|
||||
let $subj := $x//CONFIGURATION/context/param[./@name='subject']/text()
|
||||
let $datasources := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::contentproviders')]/concept
|
||||
let $organizations := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::resultorganizations')]/concept
|
||||
let $communities := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::zenodocommunities')]/concept
|
||||
let $fos := $x//CONFIGURATION/context/param[./@name='fos']/text()
|
||||
let $sdg := $x//CONFIGURATION/context/param[./@name='sdg']/text()
|
||||
let $zenodo := $x//param[./@name='zenodoCommunity']/text()
|
||||
where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] and $x//context/param[./@name = 'status']/text() != 'hidden'
|
||||
return
|
||||
<community>
|
||||
{ $x//CONFIGURATION/context/@id}
|
||||
<advancedConstraints>
|
||||
{$x//CONFIGURATION/context/param[./@name='advancedConstraints']/text() }
|
||||
</advancedConstraints>
|
||||
<subjects>
|
||||
{for $y in tokenize($subj,',')
|
||||
return
|
||||
<subject>{$y}</subject>}
|
||||
{for $y in tokenize($fos,',')
|
||||
return
|
||||
<subject>{$y}</subject>}
|
||||
{for $y in tokenize($sdg,',')
|
||||
return
|
||||
<subject>{$y}</subject>}
|
||||
</subjects>
|
||||
<datasources>
|
||||
{for $d in $datasources
|
||||
where $d/param[./@name='enabled']/text()='true'
|
||||
return
|
||||
<datasource>
|
||||
<openaireId>
|
||||
{$d//param[./@name='openaireId']/text()}
|
||||
</openaireId>
|
||||
<selcriteria>
|
||||
{$d/param[./@name='selcriteria']/text()}
|
||||
</selcriteria>
|
||||
</datasource> }
|
||||
</datasources>
|
||||
<zenodocommunities>
|
||||
{for $zc in $zenodo
|
||||
return
|
||||
<zenodocommunity>
|
||||
<zenodoid>
|
||||
{$zc}
|
||||
</zenodoid>
|
||||
</zenodocommunity>}
|
||||
{for $zc in $communities
|
||||
return
|
||||
<zenodocommunity>
|
||||
<zenodoid>
|
||||
{$zc/param[./@name='zenodoid']/text()}
|
||||
</zenodoid>
|
||||
<selcriteria>
|
||||
{$zc/param[./@name='selcriteria']/text()}
|
||||
</selcriteria>
|
||||
</zenodocommunity>}
|
||||
</zenodocommunities>
|
||||
</community>
|
|
@ -5,6 +5,7 @@ import java.io.IOException;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
|
@ -147,4 +148,44 @@ public class CleanCountryTest {
|
|||
.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatasetClean() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(sourcePath)
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) r -> OBJECT_MAPPER.readValue(r, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.json(workingDir.toString() + "/dataset");
|
||||
|
||||
CleanCountrySparkJob.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--inputPath", workingDir.toString() + "/dataset",
|
||||
"-graphTableClassName", Dataset.class.getCanonicalName(),
|
||||
"-workingDir", workingDir.toString() + "/working",
|
||||
"-country", "NL",
|
||||
"-verifyParam", "10.17632",
|
||||
"-collectedfrom", "NARCIS",
|
||||
"-hostedBy", getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
|
||||
.getPath()
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<Dataset> tmp = sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(1, tmp.count());
|
||||
|
||||
Assertions.assertEquals(0, tmp.first().getCountry().size());
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue