1
0
Fork 0

using common constants from ModelConstants

This commit is contained in:
Claudio Atzori 2021-05-04 11:51:52 +02:00
parent c00be646f3
commit ba86835951
1 changed file with 35 additions and 27 deletions

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -43,6 +44,7 @@ import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.KeyValue;
@ -54,8 +56,6 @@ import scala.Tuple2;
public class GenerateRorActionSetJob { public class GenerateRorActionSetJob {
private static final String COUNTRIES_VOC = "dnet:countries";
private static final Logger log = LoggerFactory.getLogger(GenerateRorActionSetJob.class); private static final Logger log = LoggerFactory.getLogger(GenerateRorActionSetJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -66,7 +66,8 @@ public class GenerateRorActionSetJob {
private static final DataInfo ROR_DATA_INFO = dataInfo( private static final DataInfo ROR_DATA_INFO = dataInfo(
false, "", false, false, ENTITYREGISTRY_PROVENANCE_ACTION, "0.92"); false, "", false, false, ENTITYREGISTRY_PROVENANCE_ACTION, "0.92");
private static final Qualifier ROR_PID_TYPE = qualifier("ROR", "ROR", "dnet:pid_types", "dnet:pid_types"); private static final Qualifier ROR_PID_TYPE = qualifier(
"ROR", "ROR", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES);
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
@ -109,7 +110,9 @@ public class GenerateRorActionSetJob {
final String outputPath) throws Exception { final String outputPath) throws Exception {
readInputPath(spark, inputPath) readInputPath(spark, inputPath)
.map(GenerateRorActionSetJob::convertRorOrg, Encoders.bean(Organization.class)) .map(
(MapFunction<RorOrganization, Organization>) GenerateRorActionSetJob::convertRorOrg,
Encoders.bean(Organization.class))
.toJavaRDD() .toJavaRDD()
.map(o -> new AtomicAction<>(Organization.class, o)) .map(o -> new AtomicAction<>(Organization.class, o))
.mapToPair( .mapToPair(
@ -151,8 +154,9 @@ public class GenerateRorActionSetJob {
o o
.setCountry( .setCountry(
qualifier( qualifier(
r.getCountry().getCountryCode(), r.getCountry().getCountryName(), COUNTRIES_VOC, r.getCountry().getCountryCode(), r.getCountry().getCountryName(),
COUNTRIES_VOC)); ModelConstants.DNET_COUNTRY_TYPE,
ModelConstants.DNET_COUNTRY_TYPE));
} else { } else {
o.setCountry(null); o.setCountry(null);
} }
@ -171,30 +175,34 @@ public class GenerateRorActionSetJob {
final Object all = e.getValue().getAll(); final Object all = e.getValue().getAll();
if (all == null) { if (all == null) {
// skip // skip
} else if (all instanceof String) { } else {
final Qualifier qualifier = qualifier(
type, type,
ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES);
if (all instanceof String) {
pids pids
.add( .add(
structuredProperty( structuredProperty(
all.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO)); all.toString(), qualifier, ROR_DATA_INFO));
} else if (all instanceof Collection) { } else if (all instanceof Collection) {
for (final Object pid : (Collection<?>) all) { for (final Object pid : (Collection<?>) all) {
pids pids
.add( .add(
structuredProperty( structuredProperty(
pid.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), pid.toString(), qualifier, ROR_DATA_INFO));
ROR_DATA_INFO));
} }
} else if (all instanceof String[]) { } else if (all instanceof String[]) {
for (final String pid : (String[]) all) { for (final String pid : (String[]) all) {
pids pids
.add( .add(
structuredProperty( structuredProperty(
pid, qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO)); pid, qualifier, ROR_DATA_INFO));
} }
} else { } else {
log.warn("Invalid type for pid list: " + all.getClass()); log.warn("Invalid type for pid list: " + all.getClass());
} }
} }
}
return pids; return pids;
} }