Merge pull request 'H2020Classification fix and possibility to add datasources in blacklist for propagation of result to organization' (#108) from miriam.baglioni/dnet-hadoop:master into master

Reviewed-on: D-Net/dnet-hadoop#108

The changes look ok, but please drop a comment to describe how the parameters should be changed from the workflow caller for both workflows
* H2020Classification
* propagation of result to organization
This commit is contained in:
Claudio Atzori 2021-05-20 15:25:05 +02:00
commit 2e70aa43f0
7 changed files with 44 additions and 10 deletions

View File

@ -22,7 +22,7 @@ import org.apache.poi.xssf.usermodel.XSSFWorkbook;
*/
public class EXCELParser {
public <R> List<R> parse(InputStream file, String classForName)
public <R> List<R> parse(InputStream file, String classForName, String sheetName)
throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException,
InvalidFormatException {
@ -30,7 +30,11 @@ public class EXCELParser {
OPCPackage pkg = OPCPackage.open(file);
XSSFWorkbook wb = new XSSFWorkbook(pkg);
XSSFSheet sheet = wb.getSheet("cordisref-H2020topics");
XSSFSheet sheet = wb.getSheet(sheetName);
if(sheetName == null){
throw new RuntimeException("Sheet name " + sheetName + " not present in current file");
}
List<R> ret = new ArrayList<>();
@ -49,7 +53,7 @@ public class EXCELParser {
headers.add(dataFormatter.formatCellValue(cell));
}
} else {
Class<?> clazz = Class.forName("eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic");
Class<?> clazz = Class.forName(classForName);
final Object cc = clazz.newInstance();
for (int i = 0; i < headers.size(); i++) {

View File

@ -42,19 +42,20 @@ public class ReadExcel implements Closeable {
final String hdfsPath = parser.get("hdfsPath");
final String hdfsNameNode = parser.get("hdfsNameNode");
final String classForName = parser.get("classForName");
final String sheetName = parser.get("sheetName");
try (final ReadExcel readExcel = new ReadExcel(hdfsPath, hdfsNameNode, fileURL)) {
log.info("Getting Excel file...");
readExcel.execute(classForName);
readExcel.execute(classForName, sheetName);
}
}
public void execute(final String classForName) throws Exception {
public void execute(final String classForName, final String sheetName) throws Exception {
EXCELParser excelParser = new EXCELParser();
excelParser
.parse(excelFile, classForName)
.parse(excelFile, classForName, sheetName)
.stream()
.forEach(p -> write(p));

View File

@ -65,6 +65,7 @@
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${topicFileURL}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/topic</arg>
<arg>--sheetName</arg><arg>${sheetName}</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic</arg>
</java>
<ok to="read_projects"/>

View File

@ -23,6 +23,11 @@
"paramLongName" : "classForName",
"paramDescription" : "the name of the class to deserialize the csv to",
"paramRequired" : true
}, {
"paramName": "sn",
"paramLongName" : "sheetName",
"paramDescription" : "the name of the sheet in case the file is excel",
"paramRequired" : false
}

View File

@ -4,6 +4,11 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
@ -51,6 +56,11 @@ public class PrepareResultInstRepoAssociation {
final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
List<String> blacklist = Optional
.ofNullable(parser.get("blacklist"))
.map(v -> Arrays.asList(v.split(";")))
.orElse(new ArrayList<>());
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
@ -61,7 +71,7 @@ public class PrepareResultInstRepoAssociation {
readNeededResources(spark, inputPath);
removeOutputDir(spark, datasourceOrganizationPath);
prepareDatasourceOrganization(spark, datasourceOrganizationPath);
prepareDatasourceOrganization(spark, datasourceOrganizationPath, blacklist);
removeOutputDir(spark, alreadyLinkedPath);
prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath);
@ -80,7 +90,14 @@ public class PrepareResultInstRepoAssociation {
}
private static void prepareDatasourceOrganization(
SparkSession spark, String datasourceOrganizationPath) {
SparkSession spark, String datasourceOrganizationPath, List<String> blacklist) {
String blacklisted = "";
if (blacklist.size() > 0) {
blacklisted = " AND d.id != '" + blacklist.get(0) + "'";
for (int i = 1; i < blacklist.size(); i++) {
blacklisted += " AND d.id != '" + blacklist.get(i) + "'";
}
}
String query = "SELECT source datasourceId, target organizationId "
+ "FROM ( SELECT id "
@ -88,7 +105,7 @@ public class PrepareResultInstRepoAssociation {
+ "WHERE datasourcetype.classid = '"
+ INSTITUTIONAL_REPO_TYPE
+ "' "
+ "AND datainfo.deletedbyinference = false ) d "
+ "AND datainfo.deletedbyinference = false " + blacklisted + " ) d "
+ "JOIN ( SELECT source, target "
+ "FROM relation "
+ "WHERE lower(relclass) = '"

View File

@ -28,5 +28,10 @@
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the path where prepared info have been stored",
"paramRequired": false
}
},{
"paramName": "bl",
"paramLongName": "blacklist",
"paramDescription": "institutional repositories that should not be considered for the propagation",
"paramRequired": false
}
]

View File

@ -141,6 +141,7 @@
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg>
<arg>--blacklist</arg><arg>${blacklist}</arg>
</spark>
<ok to="fork_join_apply_resulttoorganization_propagation"/>
<error to="Kill"/>