Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
Michele Artini 2020-04-23 16:19:16 +02:00
commit b164d96874
10 changed files with 92 additions and 57 deletions

View File

@ -25,6 +25,7 @@ public class DedupRecordFactory {
public static <T extends OafEntity> Dataset<T> createDedupRecord(
final SparkSession spark,
final DataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<T> clazz) {
@ -67,41 +68,39 @@ public class DedupRecordFactory {
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<String, T>, T>)
(key, values) -> entityMerger(key, values, ts, clazz),
(key, values) -> entityMerger(key, values, ts, dataInfo),
Encoders.bean(clazz));
}
private static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, Class<T> clazz) {
try {
T entity = clazz.newInstance();
entity.setId(id);
entity.setDataInfo(new DataInfo());
entity.getDataInfo().setTrust("0.9");
entity.setLastupdatetimestamp(ts);
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) {
final Collection<String> dates = Lists.newArrayList();
entities.forEachRemaining(
t -> {
T duplicate = t._2();
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
Result er = (Result) entity;
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
T entity = entities.next()._2();
if (er.getDateofacceptance() != null) {
dates.add(r1.getDateofacceptance().getValue());
}
final Collection<String> dates = Lists.newArrayList();
entities.forEachRemaining(
t -> {
T duplicate = t._2();
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
Result er = (Result) entity;
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
if (r1.getDateofacceptance() != null) {
dates.add(r1.getDateofacceptance().getValue());
}
});
}
});
if (ModelSupport.isSubClass(entity, Result.class)) {
((Result) entity).setDateofacceptance(DatePicker.pick(dates));
}
return entity;
} catch (IllegalAccessException | InstantiationException e) {
throw new RuntimeException(e);
if (ModelSupport.isSubClass(entity, Result.class)) {
((Result) entity).setDateofacceptance(DatePicker.pick(dates));
}
entity.setId(id);
entity.setLastupdatetimestamp(ts);
entity.setDataInfo(dataInfo);
return entity;
}
}

View File

@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -21,6 +23,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
public static final String ROOT_TRUST = "0.8";
public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
@ -67,13 +73,30 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
final Class<OafEntity> clazz =
ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
final DataInfo dataInfo = getDataInfo(dedupConf);
DedupRecordFactory.createDedupRecord(spark, dataInfo, mergeRelPath, entityPath, clazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
}
private static DataInfo getDataInfo(DedupConfig dedupConf) {
DataInfo info = new DataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
info.setTrust(ROOT_TRUST);
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
Qualifier provenance = new Qualifier();
provenance.setClassid(PROVENANCE_ACTION_CLASS);
provenance.setClassname(PROVENANCE_ACTION_CLASS);
provenance.setSchemeid(PROVENANCE_ACTIONS);
provenance.setSchemename(PROVENANCE_ACTIONS);
info.setProvenanceaction(provenance);
return info;
}
}

View File

@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
if (docIds.size() > 1) {
final String s = getMin();
String prefix = s.split("\\|")[0];
ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
return ccId;
} else {
return docIds.iterator().next();

View File

@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
if (docIds.size() > 1) {
final String s = getMin();
String prefix = s.split("\\|")[0];
ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
return ccId;
} else {
return docIds.iterator().next();

View File

@ -19,6 +19,8 @@ public class GraphHiveImporterJob {
private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser =
@ -37,12 +39,12 @@ public class GraphHiveImporterJob {
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
String hiveDbName = parser.get("hiveDbName");
log.info("hiveDbName: {}", hiveDbName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
@ -58,13 +60,13 @@ public class GraphHiveImporterJob {
spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// Read the input file and convert it into RDD of serializable object
ModelSupport.oafTypes.forEach(
(name, clazz) ->
spark.createDataset(
sc.textFile(inputPath + "/" + name)
.map(s -> new ObjectMapper().readValue(s, clazz))
.map(s -> OBJECT_MAPPER.readValue(s, clazz))
.rdd(),
Encoders.bean(clazz))
.write()

View File

@ -12,19 +12,15 @@
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hive_jdbc_url</name>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hive_db_name</name>
<name>hiveDbName</name>
<value>openaire</value>
</property>
</configuration>

View File

@ -1,10 +1,10 @@
DROP VIEW IF EXISTS ${hive_db_name}.result;
DROP VIEW IF EXISTS ${hiveDbName}.result;
CREATE VIEW IF NOT EXISTS result as
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.publication p
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.dataset d
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.software s
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.otherresearchproduct o;

View File

@ -2,13 +2,21 @@
<parameters>
<property>
<name>sourcePath</name>
<name>inputPath</name>
<description>the source path</description>
</property>
<property>
<name>hive_db_name</name>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -87,9 +95,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="PostProcessing"/>
<error to="Kill"/>
@ -102,12 +110,12 @@
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hive_metastore_uris}</value>
<value>${hiveMetastoreUris}</value>
</property>
</configuration>
<jdbc-url>${hive_jdbc_url}/${hive_db_name}</jdbc-url>
<jdbc-url>${hiveJdbcUrl}/${hiveDbName}</jdbc-url>
<script>lib/scripts/postprocessing.sql</script>
<param>hive_db_name=${hive_db_name}</param>
<param>hiveDbName=${hiveDbName}</param>
</hive2>
<ok to="End"/>
<error to="Kill"/>

View File

@ -216,6 +216,7 @@ public class CreateRelatedEntitiesJob_phase2 {
(MapFunction<String, E>)
value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz))
.filter("dataInfo.invisible == false")
.map(
(MapFunction<E, TypedRow>)
value ->

View File

@ -292,6 +292,12 @@
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-actionmanager-common</artifactId>
<version>6.0.5</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>