Compare commits

...

6 Commits

Author SHA1 Message Date
Michele Artini 2c2311b062 openorgs pids 2020-07-28 16:16:20 +02:00
Michele Artini c364b329c4 merge from master 2020-07-28 14:11:39 +02:00
Michele Artini a5c9805037 merged master into branch 2020-07-22 10:12:42 +02:00
Michele Artini 94d2e7523f import simrels 2020-07-22 09:59:24 +02:00
Michele Artini 334eb2b927 import openorgs 2020-07-21 14:47:35 +02:00
Michele Artini 574870b09b openorgs db importer 2020-07-17 15:35:29 +02:00
20 changed files with 1098 additions and 843 deletions

View File

@@ -69,7 +69,7 @@
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="index_es">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@@ -97,4 +97,4 @@
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,552 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.HAS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PROVIDED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_RELATED_TO;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PARTICIPATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROJECT_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVIDES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVISION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RELATIONSHIP;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Array;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
public abstract class AbstractDbApplication extends AbstractMigrationApplication implements Closeable {
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
private static final String ORG_ORG_RELTYPE = "organizationOrganization";
private static final String ORG_ORG_SUBRELTYPE = "dedup";
private static final String ORG_ORG_CLASS = "isSimilarTo";
private final DbClient dbClient;
private final long lastUpdateTimestamp;
private final VocabularyGroup vocs;
public AbstractDbApplication(
final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword,
final String isLookupUrl)
throws Exception {
super(hdfsPath);
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
this.vocs = VocabularyGroup.loadVocsFromIS(ISLookupClientFactory.getLookUpService(isLookupUrl));
this.lastUpdateTimestamp = new Date().getTime();
}
protected AbstractDbApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOR TESTS
super();
this.dbClient = dbClient;
this.vocs = vocs;
this.lastUpdateTimestamp = new Date().getTime();
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
throws Exception {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
dbClient.processResults(sql, consumer);
}
public List<Oaf> processDatasource(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Datasource ds = new Datasource();
ds.setId(createOpenaireId(10, rs.getString("datasourceid"), true));
ds.setOriginalId(Arrays.asList((String[]) rs.getArray("identities").getArray()));
ds
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
ds.setPid(new ArrayList<>());
ds.setDateofcollection(asString(rs.getDate("dateofcollection")));
ds.setDateoftransformation(null); // Value not returned by the SQL query
ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB
ds.setOaiprovenance(null); // Values not present in the DB
ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype")));
ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility")));
ds.setOfficialname(field(rs.getString("officialname"), info));
ds.setEnglishname(field(rs.getString("englishname"), info));
ds.setWebsiteurl(field(rs.getString("websiteurl"), info));
ds.setLogourl(field(rs.getString("logourl"), info));
ds.setContactemail(field(rs.getString("contactemail"), info));
ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info));
ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info));
ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info));
ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info));
ds.setDescription(field(rs.getString("description"), info));
ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info));
ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info));
ds.setOdpolicies(field(rs.getString("odpolicies"), info));
ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info));
ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info));
ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info));
ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info));
ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info));
ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info));
ds.setDataprovider(field(rs.getBoolean("dataprovider"), info));
ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info));
ds.setDatabaseaccesstype(field(rs.getString("databaseaccesstype"), info));
ds.setDatauploadtype(field(rs.getString("datauploadtype"), info));
ds.setDatabaseaccessrestriction(field(rs.getString("databaseaccessrestriction"), info));
ds.setDatauploadrestriction(field(rs.getString("datauploadrestriction"), info));
ds.setVersioning(field(rs.getBoolean("versioning"), info));
ds.setCitationguidelineurl(field(rs.getString("citationguidelineurl"), info));
ds.setQualitymanagementkind(field(rs.getString("qualitymanagementkind"), info));
ds.setPidsystems(field(rs.getString("pidsystems"), info));
ds.setCertificates(field(rs.getString("certificates"), info));
ds.setPolicies(new ArrayList<>()); // The SQL query returns an empty array
ds
.setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal
ds.setDataInfo(info);
ds.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(ds);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processProject(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Project p = new Project();
p.setId(createOpenaireId(40, rs.getString("projectid"), true));
p.setOriginalId(Arrays.asList(rs.getString("projectid")));
p
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
p.setPid(new ArrayList<>());
p.setDateofcollection(asString(rs.getDate("dateofcollection")));
p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
p.setExtraInfo(new ArrayList<>()); // Values not present in the DB
p.setOaiprovenance(null); // Values not present in the DB
p.setWebsiteurl(field(rs.getString("websiteurl"), info));
p.setCode(field(rs.getString("code"), info));
p.setAcronym(field(rs.getString("acronym"), info));
p.setTitle(field(rs.getString("title"), info));
p.setStartdate(field(asString(rs.getDate("startdate")), info));
p.setEnddate(field(asString(rs.getDate("enddate")), info));
p.setCallidentifier(field(rs.getString("callidentifier"), info));
p.setKeywords(field(rs.getString("keywords"), info));
p.setDuration(field(Integer.toString(rs.getInt("duration")), info));
p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info));
p
.setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info));
p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info));
p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info));
p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype")));
p.setOptional1(field(rs.getString("optional1"), info));
p.setOptional2(field(rs.getString("optional2"), info));
p.setJsonextrainfo(field(rs.getString("jsonextrainfo"), info));
p.setContactfullname(field(rs.getString("contactfullname"), info));
p.setContactfax(field(rs.getString("contactfax"), info));
p.setContactphone(field(rs.getString("contactphone"), info));
p.setContactemail(field(rs.getString("contactemail"), info));
p.setSummary(field(rs.getString("summary"), info));
p.setCurrency(field(rs.getString("currency"), info));
p.setTotalcost((float) rs.getDouble("totalcost"));
p.setFundedamount((float) rs.getDouble("fundedamount"));
p.setDataInfo(info);
p.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(p);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Organization o = new Organization();
o.setId(createOpenaireId(20, rs.getString("organizationid"), true));
o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
o
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
o.setPid(prepareListOfStructProps(rs.getArray("pid"), info));
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
o.setOaiprovenance(null); // Values not present in the DB
o.setLegalshortname(field(rs.getString("legalshortname"), info));
o.setLegalname(field(rs.getString("legalname"), info));
o.setAlternativeNames(prepareListFields(rs.getArray("alternativenames"), info));
o.setWebsiteurl(field(rs.getString("websiteurl"), info));
o.setLogourl(field(rs.getString("logourl"), info));
o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info));
o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info));
o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info));
o
.setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info));
o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info));
o
.setEcinternationalorganizationeurinterests(
field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info));
o
.setEcinternationalorganization(
field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info));
o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info));
o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info));
o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info));
o.setCountry(prepareQualifierSplitting(rs.getString("country")));
o.setDataInfo(info);
o.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(o);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processDatasourceOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("organization"), true);
final String dsId = createOpenaireId(10, rs.getString("datasource"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(DATASOURCE_ORGANIZATION);
r1.setSubRelType(PROVISION);
r1.setRelClass(IS_PROVIDED_BY);
r1.setSource(dsId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(DATASOURCE_ORGANIZATION);
r2.setSubRelType(PROVISION);
r2.setRelClass(PROVIDES);
r2.setSource(orgId);
r2.setTarget(dsId);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processProjectOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("resporganization"), true);
final String projectId = createOpenaireId(40, rs.getString("project"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(PROJECT_ORGANIZATION);
r1.setSubRelType(PARTICIPATION);
r1.setRelClass(HAS_PARTICIPANT);
r1.setSource(projectId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(PROJECT_ORGANIZATION);
r2.setSubRelType(PARTICIPATION);
r2.setRelClass(IS_PARTICIPANT);
r2.setSource(orgId);
r2.setTarget(projectId);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processClaims(final ResultSet rs) {
final DataInfo info = dataInfo(
false, null, false, false,
qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
try {
if (rs.getString(SOURCE_TYPE).equals("context")) {
final Result r;
if (rs.getString(TARGET_TYPE).equals("dataset")) {
r = new Dataset();
r.setResulttype(DATASET_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("software")) {
r = new Software();
r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("other")) {
r = new OtherResearchProduct();
r.setResulttype(ORP_DEFAULT_RESULTTYPE);
} else {
r = new Publication();
r.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
}
r.setId(createOpenaireId(50, rs.getString("target_id"), false));
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setContext(prepareContext(rs.getString("source_id"), info));
r.setDataInfo(info);
r.setCollectedfrom(collectedFrom);
return Arrays.asList(r);
} else {
final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false);
final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false);
final Relation r1 = new Relation();
final Relation r2 = new Relation();
if (rs.getString(SOURCE_TYPE).equals("project")) {
r1.setCollectedfrom(collectedFrom);
r1.setRelType(RESULT_PROJECT);
r1.setSubRelType(OUTCOME);
r1.setRelClass(PRODUCES);
r2.setCollectedfrom(collectedFrom);
r2.setRelType(RESULT_PROJECT);
r2.setSubRelType(OUTCOME);
r2.setRelClass(IS_PRODUCED_BY);
} else {
r1.setCollectedfrom(collectedFrom);
r1.setRelType(RESULT_RESULT);
r1.setSubRelType(RELATIONSHIP);
r1.setRelClass(IS_RELATED_TO);
r2.setCollectedfrom(collectedFrom);
r2.setRelType(RESULT_RESULT);
r2.setSubRelType(RELATIONSHIP);
r2.setRelClass(IS_RELATED_TO);
}
r1.setSource(sourceId);
r1.setTarget(targetId);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
r2.setSource(targetId);
r2.setTarget(sourceId);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processOrgOrgSimRels(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs); // TODO
final String orgId1 = createOpenaireId(20, rs.getString("id1"), true);
final String orgId2 = createOpenaireId(20, rs.getString("id2"), true); // both ends of the sim rel are organizations (prefix 20)
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(ORG_ORG_RELTYPE);
r1.setSubRelType(ORG_ORG_SUBRELTYPE);
r1.setRelClass(ORG_ORG_CLASS);
r1.setSource(orgId1);
r1.setTarget(orgId2);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(ORG_ORG_RELTYPE);
r2.setSubRelType(ORG_ORG_SUBRELTYPE);
r2.setRelClass(ORG_ORG_CLASS);
r2.setSource(orgId2);
r2.setTarget(orgId1);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private List<Context> prepareContext(final String id, final DataInfo dataInfo) {
final Context context = new Context();
context.setId(id);
context.setDataInfo(Arrays.asList(dataInfo));
return Arrays.asList(context);
}
private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException {
final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
final String inferenceprovenance = rs.getString("inferenceprovenance");
final Boolean inferred = rs.getBoolean("inferred");
final String trust = rs.getString("trust");
return dataInfo(
deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
}
private Qualifier prepareQualifierSplitting(final String s) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] arr = s.split("@@@");
return arr.length == 2 ? vocs.getTermAsQualifier(arr[1], arr[0]) : null;
}
private List<Field<String>> prepareListFields(final Array array, final DataInfo info) {
try {
return array != null ? listFields(info, (String[]) array.getArray()) : new ArrayList<>();
} catch (final SQLException e) {
throw new RuntimeException("Invalid SQL array", e);
}
}
private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] parts = s.split("###");
if (parts.length == 2) {
final String value = parts[0];
final String[] arr = parts[1].split("@@@");
if (arr.length == 2) {
return structuredProperty(value, vocs.getTermAsQualifier(arr[1], arr[0]), dataInfo);
}
}
return null;
}
private List<StructuredProperty> prepareListOfStructProps(
final Array array,
final DataInfo dataInfo) throws SQLException {
final List<StructuredProperty> res = new ArrayList<>();
if (array != null) {
for (final String s : (String[]) array.getArray()) {
final StructuredProperty sp = prepareStructProp(s, dataInfo);
if (sp != null) {
res.add(sp);
}
}
}
return res;
}
private Journal prepareJournal(final String name, final String sj, final DataInfo info) {
if (StringUtils.isNotBlank(sj)) {
final String[] arr = sj.split("@@@");
if (arr.length == 3) {
final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0].trim() : null;
final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1].trim() : null;
final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null;
if (issn != null || eissn != null || lissn != null) {
return journal(name, issn, eissn, lissn, null, null, null, null, null, null, null, info);
}
}
}
return null;
}
@Override
public void close() throws IOException {
super.close();
dbClient.close();
}
}
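
The pattern introduced here: each migration subclass pairs an SQL file on the classpath with a producer that maps one ResultSet row to a list of Oaf objects, and execute() streams the rows through the producer and emits the results to the sequence file. A minimal sketch of a further subclass, assuming only the constructor and processOrganization shown above; the class name, query file and connection values are placeholders, not part of this changeset:

package eu.dnetlib.dhp.oa.graph.raw;

// Hypothetical example (not part of this changeset) illustrating the
// execute(sqlFile, producer) contract of AbstractDbApplication.
public class MigrateExampleOrgsApplication extends AbstractDbApplication {

	public MigrateExampleOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser,
		final String dbPassword, final String isLookupUrl) throws Exception {
		super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
	}

	public static void main(final String[] args) throws Exception {
		// placeholder coordinates; closing the application also closes the DbClient
		try (final MigrateExampleOrgsApplication mapper = new MigrateExampleOrgsApplication(
			"/tmp/example_orgs", "jdbc:postgresql://localhost:5432/dnet", "dnet", "***",
			"http://localhost:8280/is/services/isLookUp?wsdl")) {
			// every row returned by the query is mapped to an Organization and emitted
			mapper.execute("queryExampleOrganizations.sql", mapper::processOrganization);
		}
	}
}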

View File

@@ -0,0 +1,64 @@
package eu.dnetlib.dhp.oa.graph.raw;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
public class MigrateCordaOrgsApplication extends AbstractDbApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateCordaOrgsApplication.class);
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateCordaOrgsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_openorgs_parameters.json")));
parser.parseArgument(args);
final String dbUrl = parser.get("postgresUrl");
log.info("postgresUrl: {}", dbUrl);
final String dbUser = parser.get("postgresUser");
log.info("postgresUser: {}", dbUser);
final String dbPassword = parser.get("postgresPassword");
log.info("postgresPassword: xxx");
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
try (final MigrateCordaOrgsApplication mapper = new MigrateCordaOrgsApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
log.info("Processing CORDA orgs...");
mapper.execute("queryCordaOrganizations.sql", mapper::processOrganization);
}
log.info("All done.");
}
public MigrateCordaOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser,
final String dbPassword, final String isLookupUrl)
throws Exception {
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
}
protected MigrateCordaOrgsApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOR TESTS
super(dbClient, vocs);
}
}
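
For reference, a sketch of driving this entry point with the long parameter names declared in migrate_openorgs_parameters.json (shown further below); every value is a placeholder:

// Hypothetical invocation (not part of this changeset); values are placeholders.
public class RunMigrateCordaOrgsExample {

	public static void main(final String[] args) throws Exception {
		MigrateCordaOrgsApplication.main(new String[] {
			"--hdfsPath", "/tmp/db_cordaorgs",
			"--postgresUrl", "jdbc:postgresql://localhost:5432/dnet",
			"--postgresUser", "dnet",
			"--postgresPassword", "***",
			"--isLookupUrl", "http://localhost:8280/is/services/isLookUp?wsdl"
		});
	}
}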

View File

@@ -1,91 +1,21 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.HAS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PROVIDED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_RELATED_TO;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PARTICIPATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROJECT_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVIDES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVISION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RELATIONSHIP;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Array;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
public class MigrateDbEntitiesApplication extends AbstractMigrationApplication implements Closeable {
public class MigrateDbEntitiesApplication extends AbstractDbApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateDbEntitiesApplication.class);
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
private final DbClient dbClient;
private final long lastUpdateTimestamp;
private final VocabularyGroup vocs;
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
@@ -116,459 +46,44 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
log.info("processClaims: {}", processClaims);
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser,
try (final MigrateDbEntitiesApplication mapper = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
if (processClaims) {
log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims);
mapper.execute("queryClaims.sql", mapper::processClaims);
} else {
log.info("Processing datasources...");
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
mapper.execute("queryDatasources.sql", mapper::processDatasource);
log.info("Processing projects...");
if (dbSchema.equalsIgnoreCase("beta")) {
smdbe.execute("queryProjects.sql", smdbe::processProject);
mapper.execute("queryProjects.sql", mapper::processProject);
} else {
smdbe.execute("queryProjects_production.sql", smdbe::processProject);
mapper.execute("queryProjects_production.sql", mapper::processProject);
}
log.info("Processing orgs...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
mapper.execute("queryOrganizations.sql", mapper::processOrganization);
log.info("Processing relationsNoRemoval ds <-> orgs ...");
smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
mapper.execute("queryDatasourceOrganization.sql", mapper::processDatasourceOrganization);
log.info("Processing projects <-> orgs ...");
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
mapper.execute("queryProjectOrganization.sql", mapper::processProjectOrganization);
}
log.info("All done.");
}
}
protected MigrateDbEntitiesApplication(final VocabularyGroup vocs) { // ONLY FOR UNIT TEST
super();
this.dbClient = null;
this.lastUpdateTimestamp = new Date().getTime();
this.vocs = vocs;
}
public MigrateDbEntitiesApplication(
final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword,
final String isLookupUrl)
throws Exception {
super(hdfsPath);
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
this.lastUpdateTimestamp = new Date().getTime();
this.vocs = VocabularyGroup.loadVocsFromIS(ISLookupClientFactory.getLookUpService(isLookupUrl));
}
public MigrateDbEntitiesApplication(final String hdfsPath, final String dbUrl, final String dbUser,
final String dbPassword, final String isLookupUrl)
throws Exception {
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
}
protected MigrateDbEntitiesApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOR TESTS
super(dbClient, vocs);
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
throws Exception {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
dbClient.processResults(sql, consumer);
}
public List<Oaf> processDatasource(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Datasource ds = new Datasource();
ds.setId(createOpenaireId(10, rs.getString("datasourceid"), true));
ds.setOriginalId(Arrays.asList((String[]) rs.getArray("identities").getArray()));
ds
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
ds.setPid(new ArrayList<>());
ds.setDateofcollection(asString(rs.getDate("dateofcollection")));
ds.setDateoftransformation(null); // Value not returned by the SQL query
ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB
ds.setOaiprovenance(null); // Values not present in the DB
ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype")));
ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility")));
ds.setOfficialname(field(rs.getString("officialname"), info));
ds.setEnglishname(field(rs.getString("englishname"), info));
ds.setWebsiteurl(field(rs.getString("websiteurl"), info));
ds.setLogourl(field(rs.getString("logourl"), info));
ds.setContactemail(field(rs.getString("contactemail"), info));
ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info));
ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info));
ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info));
ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info));
ds.setDescription(field(rs.getString("description"), info));
ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info));
ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info));
ds.setOdpolicies(field(rs.getString("odpolicies"), info));
ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info));
ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info));
ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info));
ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info));
ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info));
ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info));
ds.setDataprovider(field(rs.getBoolean("dataprovider"), info));
ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info));
ds.setDatabaseaccesstype(field(rs.getString("databaseaccesstype"), info));
ds.setDatauploadtype(field(rs.getString("datauploadtype"), info));
ds.setDatabaseaccessrestriction(field(rs.getString("databaseaccessrestriction"), info));
ds.setDatauploadrestriction(field(rs.getString("datauploadrestriction"), info));
ds.setVersioning(field(rs.getBoolean("versioning"), info));
ds.setCitationguidelineurl(field(rs.getString("citationguidelineurl"), info));
ds.setQualitymanagementkind(field(rs.getString("qualitymanagementkind"), info));
ds.setPidsystems(field(rs.getString("pidsystems"), info));
ds.setCertificates(field(rs.getString("certificates"), info));
ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array
ds
.setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal
ds.setDataInfo(info);
ds.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(ds);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processProject(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Project p = new Project();
p.setId(createOpenaireId(40, rs.getString("projectid"), true));
p.setOriginalId(Arrays.asList(rs.getString("projectid")));
p
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
p.setPid(new ArrayList<>());
p.setDateofcollection(asString(rs.getDate("dateofcollection")));
p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
p.setExtraInfo(new ArrayList<>()); // Values not present in the DB
p.setOaiprovenance(null); // Values not present in the DB
p.setWebsiteurl(field(rs.getString("websiteurl"), info));
p.setCode(field(rs.getString("code"), info));
p.setAcronym(field(rs.getString("acronym"), info));
p.setTitle(field(rs.getString("title"), info));
p.setStartdate(field(asString(rs.getDate("startdate")), info));
p.setEnddate(field(asString(rs.getDate("enddate")), info));
p.setCallidentifier(field(rs.getString("callidentifier"), info));
p.setKeywords(field(rs.getString("keywords"), info));
p.setDuration(field(Integer.toString(rs.getInt("duration")), info));
p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info));
p
.setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info));
p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info));
p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info));
p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype")));
p.setOptional1(field(rs.getString("optional1"), info));
p.setOptional2(field(rs.getString("optional2"), info));
p.setJsonextrainfo(field(rs.getString("jsonextrainfo"), info));
p.setContactfullname(field(rs.getString("contactfullname"), info));
p.setContactfax(field(rs.getString("contactfax"), info));
p.setContactphone(field(rs.getString("contactphone"), info));
p.setContactemail(field(rs.getString("contactemail"), info));
p.setSummary(field(rs.getString("summary"), info));
p.setCurrency(field(rs.getString("currency"), info));
p.setTotalcost(new Float(rs.getDouble("totalcost")));
p.setFundedamount(new Float(rs.getDouble("fundedamount")));
p.setDataInfo(info);
p.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(p);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Organization o = new Organization();
o.setId(createOpenaireId(20, rs.getString("organizationid"), true));
o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
o
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
o.setPid(new ArrayList<>());
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
o.setOaiprovenance(null); // Values not present in the DB
o.setLegalshortname(field(rs.getString("legalshortname"), info));
o.setLegalname(field(rs.getString("legalname"), info));
o.setAlternativeNames(new ArrayList<>()); // Values not returned by the SQL query
o.setWebsiteurl(field(rs.getString("websiteurl"), info));
o.setLogourl(field(rs.getString("logourl"), info));
o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info));
o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info));
o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info));
o
.setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info));
o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info));
o
.setEcinternationalorganizationeurinterests(
field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info));
o
.setEcinternationalorganization(
field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info));
o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info));
o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info));
o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info));
o.setCountry(prepareQualifierSplitting(rs.getString("country")));
o.setDataInfo(info);
o.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(o);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processDatasourceOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("organization"), true);
final String dsId = createOpenaireId(10, rs.getString("datasource"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(DATASOURCE_ORGANIZATION);
r1.setSubRelType(PROVISION);
r1.setRelClass(IS_PROVIDED_BY);
r1.setSource(dsId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(DATASOURCE_ORGANIZATION);
r2.setSubRelType(PROVISION);
r2.setRelClass(PROVIDES);
r2.setSource(orgId);
r2.setTarget(dsId);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processProjectOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("resporganization"), true);
final String projectId = createOpenaireId(40, rs.getString("project"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(PROJECT_ORGANIZATION);
r1.setSubRelType(PARTICIPATION);
r1.setRelClass(HAS_PARTICIPANT);
r1.setSource(projectId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(PROJECT_ORGANIZATION);
r2.setSubRelType(PARTICIPATION);
r2.setRelClass(IS_PARTICIPANT);
r2.setSource(orgId);
r2.setTarget(projectId);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processClaims(final ResultSet rs) {
final DataInfo info = dataInfo(
false, null, false, false,
qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
try {
if (rs.getString(SOURCE_TYPE).equals("context")) {
final Result r;
if (rs.getString(TARGET_TYPE).equals("dataset")) {
r = new Dataset();
r.setResulttype(DATASET_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("software")) {
r = new Software();
r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("other")) {
r = new OtherResearchProduct();
r.setResulttype(ORP_DEFAULT_RESULTTYPE);
} else {
r = new Publication();
r.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
}
r.setId(createOpenaireId(50, rs.getString("target_id"), false));
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setContext(prepareContext(rs.getString("source_id"), info));
r.setDataInfo(info);
r.setCollectedfrom(collectedFrom);
return Arrays.asList(r);
} else {
final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false);
final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false);
final Relation r1 = new Relation();
final Relation r2 = new Relation();
if (rs.getString(SOURCE_TYPE).equals("project")) {
r1.setCollectedfrom(collectedFrom);
r1.setRelType(RESULT_PROJECT);
r1.setSubRelType(OUTCOME);
r1.setRelClass(PRODUCES);
r2.setCollectedfrom(collectedFrom);
r2.setRelType(RESULT_PROJECT);
r2.setSubRelType(OUTCOME);
r2.setRelClass(IS_PRODUCED_BY);
} else {
r1.setCollectedfrom(collectedFrom);
r1.setRelType(RESULT_RESULT);
r1.setSubRelType(RELATIONSHIP);
r1.setRelClass(IS_RELATED_TO);
r2.setCollectedfrom(collectedFrom);
r2.setRelType(RESULT_RESULT);
r2.setSubRelType(RELATIONSHIP);
r2.setRelClass(IS_RELATED_TO);
}
r1.setSource(sourceId);
r1.setTarget(targetId);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
r2.setSource(targetId);
r2.setTarget(sourceId);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private List<Context> prepareContext(final String id, final DataInfo dataInfo) {
final Context context = new Context();
context.setId(id);
context.setDataInfo(Arrays.asList(dataInfo));
return Arrays.asList(context);
}
private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException {
final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
final String inferenceprovenance = rs.getString("inferenceprovenance");
final Boolean inferred = rs.getBoolean("inferred");
final String trust = rs.getString("trust");
return dataInfo(
deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
}
private Qualifier prepareQualifierSplitting(final String s) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] arr = s.split("@@@");
return arr.length == 2 ? vocs.getTermAsQualifier(arr[1], arr[0]) : null;
}
private List<Field<String>> prepareListFields(final Array array, final DataInfo info) {
try {
return array != null ? listFields(info, (String[]) array.getArray()) : new ArrayList<>();
} catch (final SQLException e) {
throw new RuntimeException("Invalid SQL array", e);
}
}
private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] parts = s.split("###");
if (parts.length == 2) {
final String value = parts[0];
final String[] arr = parts[1].split("@@@");
if (arr.length == 2) {
return structuredProperty(value, vocs.getTermAsQualifier(arr[1], arr[0]), dataInfo);
}
}
return null;
}
private List<StructuredProperty> prepareListOfStructProps(
final Array array,
final DataInfo dataInfo) throws SQLException {
final List<StructuredProperty> res = new ArrayList<>();
if (array != null) {
for (final String s : (String[]) array.getArray()) {
final StructuredProperty sp = prepareStructProp(s, dataInfo);
if (sp != null) {
res.add(sp);
}
}
}
return res;
}
private Journal prepareJournal(final String name, final String sj, final DataInfo info) {
if (StringUtils.isNotBlank(sj)) {
final String[] arr = sj.split("@@@");
if (arr.length == 3) {
final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0].trim() : null;
final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1].trim() : null;
final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null;
if (issn != null || eissn != null || lissn != null) {
return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info);
}
}
}
return null;
}
@Override
public void close() throws IOException {
super.close();
dbClient.close();
}
}
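
Both the new AbstractDbApplication and the code removed above decode the same delimited encodings produced by the SQL queries. A sketch of the formats implied by the parsing helpers prepareQualifierSplitting, prepareStructProp and prepareJournal; the sample values are invented:

// Illustrative only: string formats implied by the parsing helpers.
public class DelimitedFormatsExample {

	public static void main(final String[] args) {
		// prepareQualifierSplitting: "term@@@vocabulary" -> vocs.getTermAsQualifier(vocabulary, term)
		final String qualifier = "grid@@@dnet:pid_types";
		// prepareStructProp: "value###term@@@vocabulary" -> structuredProperty(value, qualifier, dataInfo)
		final String structProp = "grid.12345.6###grid@@@dnet:pid_types";
		// prepareJournal: "issn@@@eissn@@@lissn"; blank parts become null, at least one must be set
		final String journal = "0123-4567@@@8901-234X@@@";
		System.out.println(qualifier + "\n" + structProp + "\n" + journal);
	}
}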

View File

@@ -0,0 +1,61 @@
package eu.dnetlib.dhp.oa.graph.raw;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class MigrateOpenOrgsApplication extends AbstractDbApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateOpenOrgsApplication.class);
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateOpenOrgsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_openorgs_parameters.json")));
parser.parseArgument(args);
final String dbUrl = parser.get("postgresUrl");
log.info("postgresUrl: {}", dbUrl);
final String dbUser = parser.get("postgresUser");
log.info("postgresUser: {}", dbUser);
final String dbPassword = parser.get("postgresPassword");
log.info("postgresPassword: xxx");
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
try (final MigrateOpenOrgsApplication mapper = new MigrateOpenOrgsApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
log.info("Processing open orgs...");
mapper.execute("queryOrganizationsFromOpenOrgsDB.sql", mapper::processOrganization);
log.info("Processing simrels...");
mapper.execute("querySimilarityFromOpenOrgsDB.sql", mapper::processOrgOrgSimRels);
log.info("All done.");
}
}
public MigrateOpenOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser,
final String dbPassword, final String isLookupUrl)
throws Exception {
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
}
}

View File

@@ -39,10 +39,9 @@ public class AbstractMigrationApplication implements Closeable {
this.writer = SequenceFile
.createWriter(
getConf(),
SequenceFile.Writer.file(new Path(hdfsPath)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class));
getConf(), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer
.valueClass(Text.class));
}
private Configuration getConf() throws IOException {

View File

@@ -0,0 +1,32 @@
[
{
"paramName": "p",
"paramLongName": "hdfsPath",
"paramDescription": "the path where storing the sequential file",
"paramRequired": true
},
{
"paramName": "pgurl",
"paramLongName": "postgresUrl",
"paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
"paramRequired": true
},
{
"paramName": "pguser",
"paramLongName": "postgresUser",
"paramDescription": "postgres user",
"paramRequired": false
},
{
"paramName": "pgpasswd",
"paramLongName": "postgresPassword",
"paramDescription": "postgres password",
"paramRequired": false
},
{
"paramName": "isu",
"paramLongName": "isLookupUrl",
"paramDescription": "the url of the ISLookupService",
"paramRequired": true
}
]

View File

@@ -25,6 +25,18 @@
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>postgresOpenOrgsURL</name>
<description>the postgres URL to access the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsUser</name>
<description>the user of the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsPassword</name>
<description>the password of the OpenOrgs database</description>
</property>
<property>
<name>dbSchema</name>
@@ -116,8 +128,25 @@
<fork name="start_import">
<path start="ImportDB"/>
<path start="ImportDB_claims"/>
<path start="ImportDB_openorgs"/>
</fork>
<action name="ImportDB_openorgs">
<java>
<prepare>
<delete path="${contentPath}/db_openorgs"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateOpenOrgsApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<action name="ImportDB_claims">
<java>
<prepare>
@@ -308,7 +337,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openorgs,${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>

View File

@@ -0,0 +1,204 @@
<workflow-app name="create RAW Graph (openorgs and cordaorgs)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store the raw graph</description>
</property>
<property>
<name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the postgres user</description>
</property>
<property>
<name>postgresPassword</name>
<description>the postgres password</description>
</property>
<property>
<name>postgresOpenOrgsURL</name>
<description>the postgres URL to access the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsUser</name>
<description>the user of the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsPassword</name>
<description>the password of the OpenOrgs database</description>
</property>
<property>
<name>dbSchema</name>
<value>beta</value>
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
</property>
<property>
<name>mongoURL</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="start_import"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="start_import">
<path start="ImportDB_corda_orgs"/>
<path start="ImportDB_openorgs"/>
</fork>
<action name="ImportDB_openorgs">
<java>
<prepare>
<delete path="${contentPath}/db_openorgs"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateOpenOrgsApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<action name="ImportDB_corda_orgs">
<java>
<prepare>
<delete path="${contentPath}/db_records"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateCordaOrgsApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_cordaorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<join name="wait_import" to="GenerateEntities"/>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openorgs,${contentPath}/db_cordaorgs</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/entities</arg>
<arg>--graphRawPath</arg><arg>${graphOutputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
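For reference, this workflow is driven by a standard Oozie job.properties file covering the parameters declared above. The following is a minimal sketch; every value is a placeholder (host names, paths and credentials are hypothetical) and must be adapted to the target cluster:

# hypothetical job.properties for the openorgs/cordaorgs raw-graph workflow
oozie.use.system.libpath=true
oozie.wf.application.path=hdfs://nameservice1/lib/dnet/oa/graph/raw_organizations
graphOutputPath=/tmp/graph_raw
contentPath=/tmp/content
workingDir=/tmp/working_dir
queueName=default
oozieLauncherQueueName=default
postgresURL=jdbc:postgresql://dbhost:5432/dnet_openaireplus
postgresUser=dnet
postgresPassword=***
postgresOpenOrgsURL=jdbc:postgresql://dbhost:5432/openorgs
postgresOpenOrgsUser=openorgs
postgresOpenOrgsPassword=***
dbSchema=beta
mongoURL=mongodb://mongohost:27017
mongoDb=mdstore
isLookupUrl=http://ishost:8280/is/services/isLookUp?wsdl
sparkDriverMemory=4G
sparkExecutorMemory=8G
sparkExecutorCores=4
oozieActionShareLibForSpark2=spark2
spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
spark2YarnHistoryServerAddress=http://historyhost:18089
spark2EventLogDir=/user/spark/spark2ApplicationHistory

With such a file in place, the workflow can be submitted with the standard client, e.g. oozie job -config job.properties -run (pointing -oozie at the server if OOZIE_URL is unset).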

View File

@ -1,108 +0,0 @@
<workflow-app name="import regular entities as Graph (step 1)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>migrationPathStep1</name>
<description>the base path to store hdfs files</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the postgres user</description>
</property>
<property>
<name>postgresPassword</name>
<description>the postgres password</description>
</property>
<property>
<name>mongoURL</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${migrationPathStep1}'/>
<mkdir path='${migrationPathStep1}'/>
</fs>
<ok to="ImportDB"/>
<error to="Kill"/>
</action>
<action name="ImportDB">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/db_records</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="ImportODF"/>
<error to="Kill"/>
</action>
<action name="ImportODF">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
</java>
<ok to="ImportOAF"/>
<error to="Kill"/>
</action>
<action name="ImportOAF">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,65 +0,0 @@
<workflow-app name="import regular entities as Graph (step 2)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>migrationPathStep1</name>
<description>the base path to store hdfs files</description>
</property>
<property>
<name>migrationPathStep2</name>
<description>the temporary path to store entities before dispatching</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetEntities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetEntities">
<fs>
<delete path='${migrationPathStep2}'/>
<mkdir path='${migrationPathStep2}'/>
</fs>
<ok to="GenerateEntities"/>
<error to="Kill"/>
</action>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>-s</arg><arg>${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records</arg>
<arg>-t</arg><arg>${migrationPathStep2}/all_entities</arg>
<arg>--islookup</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,60 +0,0 @@
<workflow-app name="import regular entities as Graph (step 3)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>migrationPathStep2</name>
<description>the temporary path to store entities before dispatching</description>
</property>
<property>
<name>migrationPathStep3</name>
<description>the graph Raw base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetGraph"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetGraph">
<fs>
<delete path='${migrationPathStep3}'/>
<mkdir path='${migrationPathStep3}'/>
</fs>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>-s</arg><arg>${migrationPathStep2}/all_entities</arg>
<arg>-g</arg><arg>${migrationPathStep3}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,32 @@
SELECT
o.id AS organizationid,
o.legalshortname AS legalshortname,
o.legalname AS legalname,
ARRAY[]::text[] AS alternativenames,
o.websiteurl AS websiteurl,
o.logourl AS logourl,
o.ec_legalbody AS eclegalbody,
o.ec_legalperson AS eclegalperson,
o.ec_nonprofit AS ecnonprofit,
o.ec_researchorganization AS ecresearchorganization,
o.ec_highereducation AS echighereducation,
o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests,
o.ec_internationalorganization AS ecinternationalorganization,
o.ec_enterprise AS ecenterprise,
o.ec_smevalidated AS ecsmevalidated,
o.ec_nutscode AS ecnutscode,
o.dateofcollection AS dateofcollection,
o.lastupdate AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
o.trust AS trust,
'' AS inferenceprovenance,
d.id AS collectedfromid,
d.officialname AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
WHERE o.id like 'corda__%'
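A note on the @@@ literals above (o.country || '@@@dnet:countries', '...@@@dnet:provenance_actions'): each packs a term together with the vocabulary it belongs to into a single string, and the mapper splits the two downstream. A minimal, self-contained Java sketch of that convention follows; the class and method names are illustrative only, not the actual mapper code (the real splitting lives in the DB mapper, e.g. OafMapperUtils):

// Illustrative sketch of the value@@@vocabulary packing used by the SQL above.
public class QualifierPackingDemo {

	// "IT@@@dnet:countries" -> ["IT", "dnet:countries"]
	static String[] unpack(final String packed) {
		return packed.split("@@@");
	}

	public static void main(final String[] args) {
		final String[] country = unpack("IT@@@dnet:countries");
		System.out.println("term = " + country[0] + ", vocabulary = " + country[1]);
	}
}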

View File

@ -1,30 +1,31 @@
SELECT
o.id AS organizationid,
o.legalshortname AS legalshortname,
o.legalname AS legalname,
+ARRAY[]::text[] AS alternativenames,
o.websiteurl AS websiteurl,
o.logourl AS logourl,
o.ec_legalbody AS eclegalbody,
o.ec_legalperson AS eclegalperson,
o.ec_nonprofit AS ecnonprofit,
o.ec_researchorganization AS ecresearchorganization,
o.ec_highereducation AS echighereducation,
o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests,
o.ec_internationalorganization AS ecinternationalorganization,
o.ec_enterprise AS ecenterprise,
o.ec_smevalidated AS ecsmevalidated,
o.ec_nutscode AS ecnutscode,
o.dateofcollection AS dateofcollection,
o.lastupdate AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
o.trust AS trust,
'' AS inferenceprovenance,
d.id AS collectedfromid,
d.officialname AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)

View File

@ -1,19 +1,31 @@
SELECT
o.id AS organizationid,
coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname,
o.name AS legalname,
-array_agg(DISTINCT n.name) AS "alternativeNames",
+array_agg(DISTINCT n.name) AS alternativenames,
(array_agg(u.url))[1] AS websiteurl,
-o.modification_date AS dateoftransformation,
+'' AS logourl,
+DATE(o.creation_date) AS dateofcollection,
+DATE(o.modification_date) AS dateoftransformation,
+false AS ecenterprise,
+false AS echighereducation,
+false AS ecinternationalorganization,
+false AS ecinternationalorganizationeurinterests,
+false AS eclegalbody,
+false AS eclegalperson,
+false AS ecnonprofit,
+false AS ecnutscode,
+false AS ecresearchorganization,
+false AS ecsmevalidated,
false AS inferred,
false AS deletedbyinference,
-0.95 AS trust,
+0.99 AS trust,
'' AS inferenceprovenance,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
FROM organizations o
LEFT OUTER JOIN acronyms a ON (a.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)
@ -28,21 +40,33 @@ GROUP BY
UNION ALL
SELECT
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid,
n.name AS legalshortname,
n.name AS legalname,
-ARRAY[]::text[] AS "alternativeNames",
+ARRAY[]::text[] AS alternativenames,
(array_agg(u.url))[1] AS websiteurl,
-o.modification_date AS dateoftransformation,
+'' AS logourl,
+DATE(o.creation_date) AS dateofcollection,
+DATE(o.modification_date) AS dateoftransformation,
+false AS ecenterprise,
+false AS echighereducation,
+false AS ecinternationalorganization,
+false AS ecinternationalorganizationeurinterests,
+false AS eclegalbody,
+false AS eclegalperson,
+false AS ecnonprofit,
+false AS ecnutscode,
+false AS ecresearchorganization,
+false AS ecsmevalidated,
false AS inferred,
false AS deletedbyinference,
-0.88 AS trust,
+0.5 AS trust,
'' AS inferenceprovenance,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)
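The pid column above goes one step further and packs three parts per array element: the identifier value, a ### separator, then a type@@@vocabulary pair. A hedged Java sketch of the corresponding unpacking follows; the pid value and type below are invented for illustration, and the real code presumably feeds the parts into the structuredProperty/qualifier helpers:

// Illustrative sketch: unpacking "value###type@@@dnet:pid_types" pid elements.
public class PidPackingDemo {

	public static void main(final String[] args) {
		final String packed = "grid.1234.5###GRID@@@dnet:pid_types"; // hypothetical element
		final String[] valueAndType = packed.split("###");
		final String[] typeAndVocabulary = valueAndType[1].split("@@@");
		System.out.println("pid value  = " + valueAndType[0]); // grid.1234.5
		System.out.println("pid type   = " + typeAndVocabulary[0]); // GRID
		System.out.println("vocabulary = " + typeAndVocabulary[1]); // dnet:pid_types
	}
}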

View File

@ -1,17 +1,24 @@
-SELECT local_id AS id1, oa_original_id AS id2 FROM openaire_simrels WHERE reltype = 'is_similar'
-
-UNION ALL
-
-SELECT
-o.id AS id1,
-'openorgsmesh'||substring(o.id, 13)||'-'||md5(a.acronym) AS id2
-FROM acronyms a
-LEFT OUTER JOIN organizations o ON (a.id = o.id)
+SELECT
+local_id AS id1,
+oa_original_id AS id2,
+'openaire____::openorgs' AS collectedfromid,
+'OpenOrgs Database' AS collectedfromname,
+false AS inferred,
+false AS deletedbyinference,
+0.99 AS trust,
+'' AS inferenceprovenance
+FROM oa_duplicates WHERE reltype = 'is_similar'

UNION ALL

SELECT
o.id AS id1,
-'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2
+'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2,
+'openaire____::openorgs' AS collectedfromid,
+'OpenOrgs Database' AS collectedfromname,
+false AS inferred,
+false AS deletedbyinference,
+0.99 AS trust,
+'' AS inferenceprovenance
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)
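Each row of this query links two organization ids plus provenance fields. A purely illustrative Java sketch of how one such row could be expanded into the two directed similarity relations a graph typically stores; this is an assumption about the downstream mapping (including the "isSimilarTo" relClass), not the actual import code, which builds eu.dnetlib.dhp.schema.oaf.Relation objects:

import java.util.Arrays;
import java.util.List;

// Hypothetical mini-model: expanding a simrel row (id1, id2) in both directions.
public class SimRelPairDemo {

	static final class Rel {
		final String source;
		final String target;
		final String relClass;

		Rel(final String source, final String target, final String relClass) {
			this.source = source;
			this.target = target;
			this.relClass = relClass;
		}

		@Override
		public String toString() {
			return source + " -[" + relClass + "]-> " + target;
		}
	}

	// Assumption: each simrel row is materialized as two directed relations.
	static List<Rel> toRelations(final String id1, final String id2) {
		return Arrays.asList(
			new Rel(id1, id2, "isSimilarTo"),
			new Rel(id2, id1, "isSimilarTo"));
	}

	public static void main(final String[] args) {
		toRelations("openorgs____::0001", "corda__h2020::ab12").forEach(System.out::println);
	}
}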

View File

@ -27,6 +27,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Datasource;
@ -47,6 +48,9 @@ public class MigrateDbEntitiesApplicationTest {
@Mock
private VocabularyGroup vocs;
+@Mock
+private DbClient dbClient;
@BeforeEach
public void setUp() {
lenient()
@ -59,7 +63,7 @@ public class MigrateDbEntitiesApplicationTest {
lenient().when(vocs.termExists(anyString(), anyString())).thenReturn(true);
-this.app = new MigrateDbEntitiesApplication(vocs);
+this.app = new MigrateDbEntitiesApplication(dbClient, vocs);
}
@Test
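The test now receives its mocked DbClient through the widened constructor. A minimal sketch of that constructor-injection pattern with Mockito's JUnit 5 extension, using hypothetical Dep/Service names rather than the real classes:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

// Hypothetical example mirroring how MigrateDbEntitiesApplicationTest injects its mock.
@ExtendWith(MockitoExtension.class)
public class ConstructorInjectionTest {

	interface Dep {
		String value();
	}

	static class Service {
		private final Dep dep;

		Service(final Dep dep) {
			this.dep = dep;
		}

		String describe() {
			return "dep says: " + dep.value();
		}
	}

	@Mock
	private Dep dep;

	@Test
	public void testDescribe() {
		when(dep.value()).thenReturn("ok");
		assertEquals("dep says: ok", new Service(dep).describe());
	}
}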