Enrico Ottonello 2020-04-23 15:26:06 +02:00
commit c03ac6e5bb
16 changed files with 375 additions and 127 deletions

View File

@@ -25,6 +25,7 @@ public class DedupRecordFactory {
     public static <T extends OafEntity> Dataset<T> createDedupRecord(
             final SparkSession spark,
+            final DataInfo dataInfo,
             final String mergeRelsInputPath,
             final String entitiesInputPath,
             final Class<T> clazz) {
@@ -67,18 +68,14 @@ public class DedupRecordFactory {
                         Encoders.STRING())
                 .mapGroups(
                         (MapGroupsFunction<String, Tuple2<String, T>, T>)
-                                (key, values) -> entityMerger(key, values, ts, clazz),
+                                (key, values) -> entityMerger(key, values, ts, dataInfo),
                         Encoders.bean(clazz));
     }

     private static <T extends OafEntity> T entityMerger(
-            String id, Iterator<Tuple2<String, T>> entities, long ts, Class<T> clazz) {
-        try {
-            T entity = clazz.newInstance();
-            entity.setId(id);
-            entity.setDataInfo(new DataInfo());
-            entity.getDataInfo().setTrust("0.9");
-            entity.setLastupdatetimestamp(ts);
+            String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) {
+        T entity = entities.next()._2();

         final Collection<String> dates = Lists.newArrayList();
         entities.forEachRemaining(
@@ -90,7 +87,7 @@ public class DedupRecordFactory {
                         Result er = (Result) entity;
                         er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));

-                        if (er.getDateofacceptance() != null) {
+                        if (r1.getDateofacceptance() != null) {
                             dates.add(r1.getDateofacceptance().getValue());
                         }
                     }
@@ -99,9 +96,11 @@ public class DedupRecordFactory {
         if (ModelSupport.isSubClass(entity, Result.class)) {
             ((Result) entity).setDateofacceptance(DatePicker.pick(dates));
         }
+        entity.setId(id);
+        entity.setLastupdatetimestamp(ts);
+        entity.setDataInfo(dataInfo);
         return entity;
-        } catch (IllegalAccessException | InstantiationException e) {
-            throw new RuntimeException(e);
-        }
     }
 }
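Note on the change above: entityMerger no longer instantiates the dedup record reflectively with a hard-coded trust of 0.9; it seeds the record from the first grouped entity, folds in the remaining ones, and only then stamps id, timestamp and the caller-supplied DataInfo. A simplified sketch of that flow, assuming the dhp-schemas classes are on the classpath (the field-by-field merge body is elided, and the class/method names here are illustrative only):

import java.util.Iterator;

import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import scala.Tuple2;

// Illustrative sketch, not the project's method verbatim.
public class EntityMergerSketch {

    public static <T extends OafEntity> T merge(
            String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) {
        T entity = entities.next()._2();   // seed the dedup record from the first grouped entity
        entities.forEachRemaining(
                t -> {
                    // field-by-field merge of t._2() into entity (elided here)
                });
        entity.setId(id);                  // id of the dedup record
        entity.setLastupdatetimestamp(ts);
        entity.setDataInfo(dataInfo);      // provenance is now supplied by the caller
        return entity;
    }
}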

View File

@@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.dedup;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -21,6 +23,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
     private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);

+    public static final String ROOT_TRUST = "0.8";
+    public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
+    public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
+
     public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
@@ -67,13 +73,30 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
                     DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
             final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);

-            Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
-            DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
+            final Class<OafEntity> clazz =
+                    ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
+            final DataInfo dataInfo = getDataInfo(dedupConf);
+            DedupRecordFactory.createDedupRecord(spark, dataInfo, mergeRelPath, entityPath, clazz)
                     .write()
                     .mode(SaveMode.Overwrite)
                     .option("compression", "gzip")
                     .json(outputPath);
         }
     }

+    private static DataInfo getDataInfo(DedupConfig dedupConf) {
+        DataInfo info = new DataInfo();
+        info.setDeletedbyinference(false);
+        info.setInferred(true);
+        info.setInvisible(false);
+        info.setTrust(ROOT_TRUST);
+        info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
+        Qualifier provenance = new Qualifier();
+        provenance.setClassid(PROVENANCE_ACTION_CLASS);
+        provenance.setClassname(PROVENANCE_ACTION_CLASS);
+        provenance.setSchemeid(PROVENANCE_ACTIONS);
+        provenance.setSchemename(PROVENANCE_ACTIONS);
+        info.setProvenanceaction(provenance);
+        return info;
+    }
 }
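With this change, every dedup record written by the action should carry the root provenance built in getDataInfo rather than the old hard-coded trust. A hypothetical JUnit-style check of what that looks like on a record read back from outputPath (helper name and getter names are assumptions based on the bean conventions used elsewhere in the schema module, not part of this commit):

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import eu.dnetlib.dhp.schema.oaf.OafEntity;

// Hypothetical helper: verifies the provenance stamped on a dedup record.
public class DedupProvenanceCheck {

    static void assertRootProvenance(OafEntity dedupRecord) {
        assertEquals("0.8", dedupRecord.getDataInfo().getTrust());                // ROOT_TRUST
        assertEquals("sysimport:dedup",
                dedupRecord.getDataInfo().getProvenanceaction().getClassid());    // PROVENANCE_ACTION_CLASS
        assertEquals("dnet:provenanceActions",
                dedupRecord.getDataInfo().getProvenanceaction().getSchemeid());   // PROVENANCE_ACTIONS
        assertTrue(dedupRecord.getDataInfo().getInferred());
        assertFalse(dedupRecord.getDataInfo().getInvisible());
    }
}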

View File

@@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
         if (docIds.size() > 1) {
             final String s = getMin();
             String prefix = s.split("\\|")[0];
-            ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
+            ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
             return ccId;
         } else {
             return docIds.iterator().next();

View File

@@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
         if (docIds.size() > 1) {
             final String s = getMin();
             String prefix = s.split("\\|")[0];
-            ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
+            ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
             return ccId;
         } else {
             return docIds.iterator().next();
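The only change in both ConnectedComponent copies is the namespace of the generated id: the entity-type prefix is kept and the dedup marker moves from "dedup_______" to "dedup_wf_001". A small illustration of the resulting id shape (the input id below is made up):

// Hypothetical component id: the lexicographically smallest id in the group.
String min = "50|doi_________::d5021b53204e4fdeab6ff5d5bc468032";
String prefix = min.split("\\|")[0];                               // entity-type prefix, e.g. "50" for results
String ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(min);  // new namespace
// before this commit the same id would have read "50|dedup_______::<md5 of the min id>"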

View File

@@ -13,7 +13,8 @@ import scala.collection.JavaConverters._
 case class mappingAffiliation(name: String)

 case class mappingAuthor(given: Option[String], family: String, ORCID: Option[String], affiliation: Option[mappingAffiliation]) {}

-class Crossref2Oaf {
+case object Crossref2Oaf {

   //STATIC STRING
   val MAG = "MAG"
@@ -28,7 +29,6 @@ class Crossref2Oaf {
   val DNET_LANGUAGES = "dnet:languages"
   val PID_TYPES = "dnet:pid_types"

   val mappingCrossrefType = Map(
     "book-section" -> "publication",
     "book" -> "publication",
@@ -111,7 +111,7 @@ class Crossref2Oaf {
     result.setCollectedfrom(List(createCollectedFrom()).asJava)

     // Publisher ( Name of work's publisher mapped into Result/Publisher)
-    val publisher = (json \ "publisher").extract[String]
+    val publisher = (json \ "publisher").extractOrElse[String](null)
     result.setPublisher(asField(publisher))

     // TITLE
@@ -144,7 +144,7 @@ class Crossref2Oaf {

     //Mapping AUthor
-    val authorList: List[mappingAuthor] = (json \ "author").extract[List[mappingAuthor]]
+    val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List())

     result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava)
@@ -175,8 +175,6 @@ class Crossref2Oaf {
   }

   def generateAuhtor(given: String, family: String, orcid: String): Author = {
     val a = new Author
     a.setName(given)
@@ -202,30 +200,28 @@ class Crossref2Oaf {
     if (result == null)
       return result

     val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type"));
-    logger.debug(mappingCrossrefType(objectType))
-    logger.debug(cOBJCategory)
+    // logger.debug(mappingCrossrefType(objectType))
+    // logger.debug(cOBJCategory)

     mappingResult(result, json, cOBJCategory)

     result match {
-      case publication: Publication => convertPublication(publication)
+      case publication: Publication => convertPublication(publication, json, cOBJCategory)
       case dataset: Dataset => convertDataset(dataset)
     }
     result
   }

   def convertDataset(dataset: Dataset): Unit = {
+    //TODO probably we need to add relation and other stuff here
   }

   def convertPublication(publication: Publication, json: JValue, cobjCategory: String): Unit = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
     val containerTitles = for {JString(ct) <- json \ "container-title"} yield ct
@@ -243,11 +239,43 @@ class Crossref2Oaf {
         publication.setSource(List(asField(source)).asJava)
       }
     } else {
-      val issn =
+      // Mapping Journal
+      val issnInfos = for {JArray(issn_types) <- json \ "issn-type"
+                           JObject(issn_type) <- issn_types
+                           JField("type", JString(tp)) <- issn_type
+                           JField("value", JString(vl)) <- issn_type
+      } yield Tuple2(tp, vl)
+
+      val volume = (json \ "volume").extractOrElse[String](null)
+      if (containerTitles.nonEmpty) {
+        val journal = new Journal
+        journal.setName(containerTitles.head)
+        if (issnInfos.nonEmpty) {
+          issnInfos.foreach(tp => {
+            tp._1 match {
+              case "electronic" => journal.setIssnOnline(tp._2)
+              case "print" => journal.setIssnPrinted(tp._2)
+            }
+          })
+        }
+        journal.setVol(volume)
+        val page = (json \ "page").extractOrElse[String](null)
+        if (page != null) {
+          val pp = page.split("-")
+          journal.setSp(pp.head)
+          if (pp.size > 1)
+            journal.setEp(pp(1))
+        }
+        // Mapping other types of publications
+        publication.setJournal(journal)
+      }
     }
   }
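The new journal mapping routes Crossref's "issn-type" entries onto the Journal bean (electronic to issnOnline, print to issnPrinted) and splits the "page" range into start and end page. A rough Java rendering of that routing, with invented values, just to make the target fields explicit (not the project's Scala code):

import eu.dnetlib.dhp.schema.oaf.Journal;

// Illustrative only; ISSNs, title and page values below are made up.
public class JournalMappingSketch {

    public static Journal map() {
        Journal journal = new Journal();
        journal.setName("Some Container Title");   // first "container-title" entry

        // "issn-type" entries: the type decides which field the value lands in
        journal.setIssnOnline("1234-5678");        // type == "electronic"
        journal.setIssnPrinted("8765-4321");       // type == "print"

        journal.setVol("42");                      // "volume", may be absent/null

        String page = "41-50";                     // "page", cf. the corrected test fixture
        String[] pp = page.split("-");
        journal.setSp(pp[0]);                      // start page
        if (pp.length > 1) {
            journal.setEp(pp[1]);                  // end page
        }
        return journal;
    }
}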

View File

@@ -1,14 +1,14 @@
 package eu.dnetlib.doiboost.crossref

 import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.Publication
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}

 case class Reference(author: String, firstPage: String) {}

 object SparkMapDumpIntoOAF {
@@ -26,15 +26,24 @@ object SparkMapDumpIntoOAF {
       .config(conf)
       .appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
       .master(parser.get("master")).getOrCreate()
+    import spark.implicits._
+    implicit val mapEncoder = Encoders.bean(classOf[Publication])

     val sc = spark.sparkContext

-    val x: String = sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
-      .map(k => k._2.toString).first()
-    val item = CrossrefImporter.decompressBlob(x)
-    logger.info(item)
+    val total = sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
+      .map(k => k._2.toString).map(CrossrefImporter.decompressBlob)
+      .map(k => Crossref2Oaf.convert(k, logger))
+      .filter(k => k != null && k.isInstanceOf[Publication])
+      .map(k => k.asInstanceOf[Publication])
+
+    val ds: Dataset[Publication] = spark.createDataset(total)
+    val targetPath = parser.get("targetPath")
+    ds.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication")
+
+    logger.info(s"total Item :${total}")

     // lazy val json: json4s.JValue = parse(item)
     //

View File

@@ -1,5 +1,6 @@
 [
   {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
   {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
 ]

View File

@@ -26,18 +26,98 @@ public class DoiBoostTest {
         // CrossrefImporter.main("-n file:///tmp -t file:///tmp/p.seq -ts 1586110000749".split("
         // "));
         SparkMapDumpIntoOAF.main(
-                "-m local[*] -s file:///data/doiboost/crossref_dump.seq".split(" "));
+                "-m local[*] -s file:///data/doiboost/crossref_dump.seq -t /data/doiboost"
+                        .split(" "));
+    }
+
+    @Test
+    public void testConvertDatasetCrossRef2Oaf() throws IOException {
+        final String json = IOUtils.toString(getClass().getResourceAsStream("dataset.json"));
+        ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+        assertNotNull(json);
+        assertFalse(StringUtils.isBlank(json));
+        final Result result = Crossref2Oaf.convert(json, logger);
+        logger.info(mapper.writeValueAsString(result));
     }

     @Test
     public void testConvertPreprintCrossRef2Oaf() throws IOException {
-        final String json = IOUtils.toString(getClass().getResourceAsStream("article.json"));
+        final String json = IOUtils.toString(getClass().getResourceAsStream("preprint.json"));
         ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
         assertNotNull(json);
         assertFalse(StringUtils.isBlank(json));
-        Crossref2Oaf cf = new Crossref2Oaf();
-        final Result result = cf.convert(json, logger);
+        final Result result = Crossref2Oaf.convert(json, logger);
+
+        assertNotNull(result);
+        assertNotNull(result.getDataInfo(), "Datainfo test not null Failed");
+        assertNotNull(
+                result.getDataInfo().getProvenanceaction(),
+                "DataInfo/Provenance test not null Failed");
+        assertFalse(
+                StringUtils.isBlank(result.getDataInfo().getProvenanceaction().getClassid()),
+                "DataInfo/Provenance/classId test not null Failed");
+        assertFalse(
+                StringUtils.isBlank(result.getDataInfo().getProvenanceaction().getClassname()),
+                "DataInfo/Provenance/className test not null Failed");
+        assertFalse(
+                StringUtils.isBlank(result.getDataInfo().getProvenanceaction().getSchemeid()),
+                "DataInfo/Provenance/SchemeId test not null Failed");
+        assertFalse(
+                StringUtils.isBlank(result.getDataInfo().getProvenanceaction().getSchemename()),
+                "DataInfo/Provenance/SchemeName test not null Failed");
+
+        assertNotNull(result.getCollectedfrom(), "CollectedFrom test not null Failed");
+        assertTrue(result.getCollectedfrom().size() > 0);
+        assertTrue(
+                result.getCollectedfrom().stream()
+                        .anyMatch(
+                                c ->
+                                        c.getKey()
+                                                .equalsIgnoreCase(
+                                                        "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")));
+        assertTrue(
+                result.getCollectedfrom().stream()
+                        .anyMatch(c -> c.getValue().equalsIgnoreCase("crossref")));
+
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(d -> d.getQualifier().getClassid().equalsIgnoreCase("created")));
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(
+                                d -> d.getQualifier().getClassid().equalsIgnoreCase("available")));
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(d -> d.getQualifier().getClassid().equalsIgnoreCase("accepted")));
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(
+                                d ->
+                                        d.getQualifier()
+                                                .getClassid()
+                                                .equalsIgnoreCase("published-online")));
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(
+                                d ->
+                                        d.getQualifier()
+                                                .getClassid()
+                                                .equalsIgnoreCase("published-print")));
+        logger.info(mapper.writeValueAsString(result));
+    }
+
+    @Test
+    public void testConvertArticleCrossRef2Oaf() throws IOException {
+        final String json = IOUtils.toString(getClass().getResourceAsStream("article.json"));
+        ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+        assertNotNull(json);
+        assertFalse(StringUtils.isBlank(json));
+        final Result result = Crossref2Oaf.convert(json, logger);

         assertNotNull(result);
         assertNotNull(result.getDataInfo(), "Datainfo test not null Failed");
@@ -73,15 +153,6 @@ public class DoiBoostTest {
         assertTrue(
                 result.getRelevantdate().stream()
                         .anyMatch(d -> d.getQualifier().getClassid().equalsIgnoreCase("created")));
-        // assertTrue(
-        //         result.getRelevantdate().stream()
-        //                 .anyMatch(
-        //                         d ->
-        //                                 d.getQualifier().getClassid().equalsIgnoreCase("available")));
-        // assertTrue(
-        //         result.getRelevantdate().stream()
-        //                 .anyMatch(d ->
-        //                         d.getQualifier().getClassid().equalsIgnoreCase("accepted")));
         assertTrue(
                 result.getRelevantdate().stream()
                         .anyMatch(
@@ -107,8 +178,7 @@ public class DoiBoostTest {
         ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
         assertNotNull(json);
         assertFalse(StringUtils.isBlank(json));
-        Crossref2Oaf cf = new Crossref2Oaf();
-        final Result result = cf.convert(json, logger);
+        final Result result = Crossref2Oaf.convert(json, logger);

         assertNotNull(result);
         logger.info(mapper.writeValueAsString(result));

View File

@@ -170,5 +170,5 @@
   "container-title": [
     "Ecl\u00e9tica Qu\u00edmica Journal"
   ],
-  "page": "41"
+  "page": "41-50"
 }

View File

@@ -0,0 +1,105 @@
{
"DOI": "10.1037/e522512014-096",
"subtitle": [
"(522512014-096)"
],
"issued": {
"date-parts": [
[
2012
]
]
},
"prefix": "10.1037",
"author": [
{
"affiliation": [],
"given": "Jessica",
"family": "Trudeau",
"sequence": "first"
},
{
"affiliation": [],
"given": "Amy",
"family": "McShane",
"sequence": "additional"
},
{
"affiliation": [],
"given": "Renee",
"family": "McDonald",
"sequence": "additional"
}
],
"reference-count": 0,
"member": "15",
"source": "Crossref",
"score": 1.0,
"deposited": {
"timestamp": 1413827035000,
"date-parts": [
[
2014,
10,
20
]
],
"date-time": "2014-10-20T17:43:55Z"
},
"indexed": {
"timestamp": 1550142454710,
"date-parts": [
[
2019,
2,
14
]
],
"date-time": "2019-02-14T11:07:34Z"
},
"type": "dataset",
"URL": "http://dx.doi.org/10.1037/e522512014-096",
"is-referenced-by-count": 0,
"published-print": {
"date-parts": [
[
2012
]
]
},
"references-count": 0,
"institution": {
"acronym": [
"APA"
],
"place": [
"-"
],
"name": "American Psychological Association"
},
"publisher": "American Psychological Association (APA)",
"content-domain": {
"domain": [],
"crossmark-restriction": false
},
"created": {
"timestamp": 1413826121000,
"date-parts": [
[
2014,
10,
20
]
],
"date-time": "2014-10-20T17:28:41Z"
},
"title": [
"Project Support: A Randomized Control Study to Evaluate the Translation of an Evidence- Based Program"
],
"alternative-id": [
"522512014-096"
],
"container-title": [
"PsycEXTRA Dataset"
]
}

View File

@@ -19,6 +19,8 @@ public class GraphHiveImporterJob {
     private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class);

+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public static void main(String[] args) throws Exception {
         final ArgumentApplicationParser parser =
@@ -37,12 +39,12 @@ public class GraphHiveImporterJob {
         String inputPath = parser.get("inputPath");
         log.info("inputPath: {}", inputPath);

+        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
+
         String hiveDbName = parser.get("hiveDbName");
         log.info("hiveDbName: {}", hiveDbName);

-        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
-        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
-
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", hiveMetastoreUris);
@@ -58,13 +60,13 @@ public class GraphHiveImporterJob {
         spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
         spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));

-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

         // Read the input file and convert it into RDD of serializable object
         ModelSupport.oafTypes.forEach(
                 (name, clazz) ->
                         spark.createDataset(
                                         sc.textFile(inputPath + "/" + name)
-                                                .map(s -> new ObjectMapper().readValue(s, clazz))
+                                                .map(s -> OBJECT_MAPPER.readValue(s, clazz))
                                                 .rdd(),
                                         Encoders.bean(clazz))
                                 .write()
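Besides reordering the hiveMetastoreUris/hiveDbName lookups, the job now reuses one Jackson ObjectMapper instead of allocating a new one for every input line. A minimal sketch of the shared-instance pattern (class and method names here are hypothetical):

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SharedMapperSketch {

    // One mapper for the whole job: ObjectMapper is thread-safe once configured,
    // and creating a new instance per record is comparatively expensive.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static <T> T parse(String json, Class<T> clazz) throws IOException {
        return OBJECT_MAPPER.readValue(json, clazz);
    }
}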

View File

@@ -12,19 +12,15 @@
         <value>true</value>
     </property>
     <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-    <property>
-        <name>hive_metastore_uris</name>
+        <name>hiveMetastoreUris</name>
         <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
     </property>
     <property>
-        <name>hive_jdbc_url</name>
+        <name>hiveJdbcUrl</name>
         <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
     </property>
     <property>
-        <name>hive_db_name</name>
+        <name>hiveDbName</name>
         <value>openaire</value>
     </property>
 </configuration>

View File

@@ -1,10 +1,10 @@
-DROP VIEW IF EXISTS ${hive_db_name}.result;
+DROP VIEW IF EXISTS ${hiveDbName}.result;
 CREATE VIEW IF NOT EXISTS result as
-select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
+select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.publication p
 union all
-select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
+select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.dataset d
 union all
-select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
+select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.software s
 union all
-select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;
+select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.otherresearchproduct o;

View File

@@ -2,13 +2,21 @@
 <parameters>
     <property>
-        <name>sourcePath</name>
+        <name>inputPath</name>
         <description>the source path</description>
     </property>
     <property>
-        <name>hive_db_name</name>
+        <name>hiveDbName</name>
         <description>the target hive database name</description>
     </property>
+    <property>
+        <name>hiveJdbcUrl</name>
+        <description>hive server jdbc url</description>
+    </property>
+    <property>
+        <name>hiveMetastoreUris</name>
+        <description>hive server metastore URIs</description>
+    </property>
     <property>
         <name>sparkDriverMemory</name>
         <description>memory for driver process</description>
@@ -87,9 +95,9 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
         </spark-opts>
-        <arg>--sourcePath</arg><arg>${sourcePath}</arg>
-        <arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
-        <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
+        <arg>--inputPath</arg><arg>${inputPath}</arg>
+        <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
+        <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
     </spark>
     <ok to="PostProcessing"/>
     <error to="Kill"/>
@@ -102,12 +110,12 @@
         <configuration>
             <property>
                 <name>hive.metastore.uris</name>
-                <value>${hive_metastore_uris}</value>
+                <value>${hiveMetastoreUris}</value>
             </property>
         </configuration>
-        <jdbc-url>${hive_jdbc_url}/${hive_db_name}</jdbc-url>
+        <jdbc-url>${hiveJdbcUrl}/${hiveDbName}</jdbc-url>
         <script>lib/scripts/postprocessing.sql</script>
-        <param>hive_db_name=${hive_db_name}</param>
+        <param>hiveDbName=${hiveDbName}</param>
     </hive2>
     <ok to="End"/>
     <error to="Kill"/>

View File

@@ -216,6 +216,7 @@ public class CreateRelatedEntitiesJob_phase2 {
                         (MapFunction<String, E>)
                                 value -> OBJECT_MAPPER.readValue(value, entityClazz),
                         Encoders.bean(entityClazz))
+                .filter("dataInfo.invisible == false")
                 .map(
                         (MapFunction<E, TypedRow>)
                                 value ->
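The added filter drops entities whose dataInfo.invisible flag is set before they are mapped to TypedRow, using Spark SQL expression syntax on the bean-encoded Dataset. In isolation the guard looks roughly like this (the generic wrapper is only for illustration):

import org.apache.spark.sql.Dataset;

// Illustrative helper: keep only records not flagged as invisible in their dataInfo.
public class VisibleOnly {

    static <E> Dataset<E> apply(Dataset<E> entities) {
        return entities.filter("dataInfo.invisible == false");
    }
}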

View File

@@ -292,6 +292,12 @@
             <groupId>eu.dnetlib</groupId>
             <artifactId>dnet-actionmanager-common</artifactId>
             <version>6.0.5</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>eu.dnetlib</groupId>