Compare commits

...

6 Commits

Author SHA1 Message Date
Michele Artini 2c2311b062 openorgs pids 2020-07-28 16:16:20 +02:00
Michele Artini c364b329c4 merge from master 2020-07-28 14:11:39 +02:00
Michele Artini a5c9805037 merged master into branch 2020-07-22 10:12:42 +02:00
Michele Artini 94d2e7523f import simrels 2020-07-22 09:59:24 +02:00
Michele Artini 334eb2b927 import openorgs 2020-07-21 14:47:35 +02:00
Michele Artini 574870b09b openorgs db importer 2020-07-17 15:35:29 +02:00
20 changed files with 1098 additions and 843 deletions

View File

@ -69,7 +69,7 @@
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="index_es">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -97,4 +97,4 @@
<end name="End"/>
</workflow-app>
</workflow-app>

View File

@ -0,0 +1,552 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.HAS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PROVIDED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_RELATED_TO;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PARTICIPATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROJECT_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVIDES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVISION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RELATIONSHIP;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Array;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
public abstract class AbstractDbApplication extends AbstractMigrationApplication implements Closeable {
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
private static final String ORG_ORG_RELTYPE = "organizationOrganization";
private static final String ORG_ORG_SUBRELTYPE = "dedup";
private static final String ORG_ORG_CLASS = "isSimilarTo";
private final DbClient dbClient;
private final long lastUpdateTimestamp;
private final VocabularyGroup vocs;
public AbstractDbApplication(
final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword,
final String isLookupUrl)
throws Exception {
super(hdfsPath);
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
this.vocs = VocabularyGroup.loadVocsFromIS(ISLookupClientFactory.getLookUpService(isLookupUrl));
this.lastUpdateTimestamp = new Date().getTime();
}
protected AbstractDbApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOT TESTS
super();
this.dbClient = dbClient;
this.vocs = vocs;
this.lastUpdateTimestamp = new Date().getTime();
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
throws Exception {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
dbClient.processResults(sql, consumer);
}
public List<Oaf> processDatasource(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Datasource ds = new Datasource();
ds.setId(createOpenaireId(10, rs.getString("datasourceid"), true));
ds.setOriginalId(Arrays.asList((String[]) rs.getArray("identities").getArray()));
ds
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
ds.setPid(new ArrayList<>());
ds.setDateofcollection(asString(rs.getDate("dateofcollection")));
ds.setDateoftransformation(null); // Value not returned by the SQL query
ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB
ds.setOaiprovenance(null); // Values not present in the DB
ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype")));
ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility")));
ds.setOfficialname(field(rs.getString("officialname"), info));
ds.setEnglishname(field(rs.getString("englishname"), info));
ds.setWebsiteurl(field(rs.getString("websiteurl"), info));
ds.setLogourl(field(rs.getString("logourl"), info));
ds.setContactemail(field(rs.getString("contactemail"), info));
ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info));
ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info));
ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info));
ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info));
ds.setDescription(field(rs.getString("description"), info));
ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info));
ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info));
ds.setOdpolicies(field(rs.getString("odpolicies"), info));
ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info));
ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info));
ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info));
ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info));
ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info));
ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info));
ds.setDataprovider(field(rs.getBoolean("dataprovider"), info));
ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info));
ds.setDatabaseaccesstype(field(rs.getString("databaseaccesstype"), info));
ds.setDatauploadtype(field(rs.getString("datauploadtype"), info));
ds.setDatabaseaccessrestriction(field(rs.getString("databaseaccessrestriction"), info));
ds.setDatauploadrestriction(field(rs.getString("datauploadrestriction"), info));
ds.setVersioning(field(rs.getBoolean("versioning"), info));
ds.setCitationguidelineurl(field(rs.getString("citationguidelineurl"), info));
ds.setQualitymanagementkind(field(rs.getString("qualitymanagementkind"), info));
ds.setPidsystems(field(rs.getString("pidsystems"), info));
ds.setCertificates(field(rs.getString("certificates"), info));
ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array
ds
.setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal
ds.setDataInfo(info);
ds.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(ds);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processProject(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Project p = new Project();
p.setId(createOpenaireId(40, rs.getString("projectid"), true));
p.setOriginalId(Arrays.asList(rs.getString("projectid")));
p
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
p.setPid(new ArrayList<>());
p.setDateofcollection(asString(rs.getDate("dateofcollection")));
p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
p.setExtraInfo(new ArrayList<>()); // Values not present in the DB
p.setOaiprovenance(null); // Values not present in the DB
p.setWebsiteurl(field(rs.getString("websiteurl"), info));
p.setCode(field(rs.getString("code"), info));
p.setAcronym(field(rs.getString("acronym"), info));
p.setTitle(field(rs.getString("title"), info));
p.setStartdate(field(asString(rs.getDate("startdate")), info));
p.setEnddate(field(asString(rs.getDate("enddate")), info));
p.setCallidentifier(field(rs.getString("callidentifier"), info));
p.setKeywords(field(rs.getString("keywords"), info));
p.setDuration(field(Integer.toString(rs.getInt("duration")), info));
p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info));
p
.setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info));
p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info));
p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info));
p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype")));
p.setOptional1(field(rs.getString("optional1"), info));
p.setOptional2(field(rs.getString("optional2"), info));
p.setJsonextrainfo(field(rs.getString("jsonextrainfo"), info));
p.setContactfullname(field(rs.getString("contactfullname"), info));
p.setContactfax(field(rs.getString("contactfax"), info));
p.setContactphone(field(rs.getString("contactphone"), info));
p.setContactemail(field(rs.getString("contactemail"), info));
p.setSummary(field(rs.getString("summary"), info));
p.setCurrency(field(rs.getString("currency"), info));
p.setTotalcost(new Float(rs.getDouble("totalcost")));
p.setFundedamount(new Float(rs.getDouble("fundedamount")));
p.setDataInfo(info);
p.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(p);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Organization o = new Organization();
o.setId(createOpenaireId(20, rs.getString("organizationid"), true));
o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
o
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
o.setPid(prepareListOfStructProps(rs.getArray("pid"), info));
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
o.setOaiprovenance(null); // Values not present in the DB
o.setLegalshortname(field(rs.getString("legalshortname"), info));
o.setLegalname(field(rs.getString("legalname"), info));
o.setAlternativeNames(prepareListFields(rs.getArray("alternativenames"), info));
o.setWebsiteurl(field(rs.getString("websiteurl"), info));
o.setLogourl(field(rs.getString("logourl"), info));
o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info));
o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info));
o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info));
o
.setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info));
o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info));
o
.setEcinternationalorganizationeurinterests(
field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info));
o
.setEcinternationalorganization(
field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info));
o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info));
o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info));
o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info));
o.setCountry(prepareQualifierSplitting(rs.getString("country")));
o.setDataInfo(info);
o.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(o);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processDatasourceOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("organization"), true);
final String dsId = createOpenaireId(10, rs.getString("datasource"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(DATASOURCE_ORGANIZATION);
r1.setSubRelType(PROVISION);
r1.setRelClass(IS_PROVIDED_BY);
r1.setSource(dsId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(DATASOURCE_ORGANIZATION);
r2.setSubRelType(PROVISION);
r2.setRelClass(PROVIDES);
r2.setSource(orgId);
r2.setTarget(dsId);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processProjectOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("resporganization"), true);
final String projectId = createOpenaireId(40, rs.getString("project"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(PROJECT_ORGANIZATION);
r1.setSubRelType(PARTICIPATION);
r1.setRelClass(HAS_PARTICIPANT);
r1.setSource(projectId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(PROJECT_ORGANIZATION);
r2.setSubRelType(PARTICIPATION);
r2.setRelClass(IS_PARTICIPANT);
r2.setSource(orgId);
r2.setTarget(projectId);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processClaims(final ResultSet rs) {
final DataInfo info = dataInfo(
false, null, false, false,
qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
try {
if (rs.getString(SOURCE_TYPE).equals("context")) {
final Result r;
if (rs.getString(TARGET_TYPE).equals("dataset")) {
r = new Dataset();
r.setResulttype(DATASET_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("software")) {
r = new Software();
r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("other")) {
r = new OtherResearchProduct();
r.setResulttype(ORP_DEFAULT_RESULTTYPE);
} else {
r = new Publication();
r.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
}
r.setId(createOpenaireId(50, rs.getString("target_id"), false));
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setContext(prepareContext(rs.getString("source_id"), info));
r.setDataInfo(info);
r.setCollectedfrom(collectedFrom);
return Arrays.asList(r);
} else {
final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false);
final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false);
final Relation r1 = new Relation();
final Relation r2 = new Relation();
if (rs.getString(SOURCE_TYPE).equals("project")) {
r1.setCollectedfrom(collectedFrom);
r1.setRelType(RESULT_PROJECT);
r1.setSubRelType(OUTCOME);
r1.setRelClass(PRODUCES);
r2.setCollectedfrom(collectedFrom);
r2.setRelType(RESULT_PROJECT);
r2.setSubRelType(OUTCOME);
r2.setRelClass(IS_PRODUCED_BY);
} else {
r1.setCollectedfrom(collectedFrom);
r1.setRelType(RESULT_RESULT);
r1.setSubRelType(RELATIONSHIP);
r1.setRelClass(IS_RELATED_TO);
r2.setCollectedfrom(collectedFrom);
r2.setRelType(RESULT_RESULT);
r2.setSubRelType(RELATIONSHIP);
r2.setRelClass(IS_RELATED_TO);
}
r1.setSource(sourceId);
r1.setTarget(targetId);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
r2.setSource(targetId);
r2.setTarget(sourceId);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processOrgOrgSimRels(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs); // TODO
final String orgId1 = createOpenaireId(20, rs.getString("id1"), true);
final String orgId2 = createOpenaireId(40, rs.getString("id2"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(ORG_ORG_RELTYPE);
r1.setSubRelType(ORG_ORG_SUBRELTYPE);
r1.setRelClass(ORG_ORG_CLASS);
r1.setSource(orgId1);
r1.setTarget(orgId2);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(ORG_ORG_RELTYPE);
r2.setSubRelType(ORG_ORG_SUBRELTYPE);
r2.setRelClass(ORG_ORG_CLASS);
r2.setSource(orgId2);
r2.setTarget(orgId1);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private List<Context> prepareContext(final String id, final DataInfo dataInfo) {
final Context context = new Context();
context.setId(id);
context.setDataInfo(Arrays.asList(dataInfo));
return Arrays.asList(context);
}
private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException {
final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
final String inferenceprovenance = rs.getString("inferenceprovenance");
final Boolean inferred = rs.getBoolean("inferred");
final String trust = rs.getString("trust");
return dataInfo(
deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
}
private Qualifier prepareQualifierSplitting(final String s) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] arr = s.split("@@@");
return arr.length == 2 ? vocs.getTermAsQualifier(arr[1], arr[0]) : null;
}
private List<Field<String>> prepareListFields(final Array array, final DataInfo info) {
try {
return array != null ? listFields(info, (String[]) array.getArray()) : new ArrayList<>();
} catch (final SQLException e) {
throw new RuntimeException("Invalid SQL array", e);
}
}
private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] parts = s.split("###");
if (parts.length == 2) {
final String value = parts[0];
final String[] arr = parts[1].split("@@@");
if (arr.length == 2) {
return structuredProperty(value, vocs.getTermAsQualifier(arr[1], arr[0]), dataInfo);
}
}
return null;
}
private List<StructuredProperty> prepareListOfStructProps(
final Array array,
final DataInfo dataInfo) throws SQLException {
final List<StructuredProperty> res = new ArrayList<>();
if (array != null) {
for (final String s : (String[]) array.getArray()) {
final StructuredProperty sp = prepareStructProp(s, dataInfo);
if (sp != null) {
res.add(sp);
}
}
}
return res;
}
private Journal prepareJournal(final String name, final String sj, final DataInfo info) {
if (StringUtils.isNotBlank(sj)) {
final String[] arr = sj.split("@@@");
if (arr.length == 3) {
final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0].trim() : null;
final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1].trim() : null;
final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null;
if (issn != null || eissn != null || lissn != null) {
return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info);
}
}
}
return null;
}
@Override
public void close() throws IOException {
super.close();
dbClient.close();
}
}

View File

@ -0,0 +1,64 @@
package eu.dnetlib.dhp.oa.graph.raw;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
public class MigrateCordaOrgsApplication extends AbstractDbApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateCordaOrgsApplication.class);
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateCordaOrgsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_openorgs_parameters.json")));
parser.parseArgument(args);
final String dbUrl = parser.get("postgresUrl");
log.info("postgresUrl: {}", dbUrl);
final String dbUser = parser.get("postgresUser");
log.info("postgresUser: {}", dbUser);
final String dbPassword = parser.get("postgresPassword");
log.info("postgresPassword: xxx");
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
try (final MigrateCordaOrgsApplication mapper = new MigrateCordaOrgsApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
log.info("Processing CORDA orgs...");
mapper.execute("queryCordaOrganizations.sql", mapper::processOrganization);
}
log.info("All done.");
}
public MigrateCordaOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser,
final String dbPassword, final String isLookupUrl)
throws Exception {
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
}
protected MigrateCordaOrgsApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOT TESTS
super(dbClient, vocs);
}
}

View File

@ -1,91 +1,21 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.HAS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PROVIDED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_RELATED_TO;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PARTICIPATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROJECT_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVIDES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVISION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RELATIONSHIP;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Array;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
public class MigrateDbEntitiesApplication extends AbstractMigrationApplication implements Closeable {
public class MigrateDbEntitiesApplication extends AbstractDbApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateDbEntitiesApplication.class);
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
private final DbClient dbClient;
private final long lastUpdateTimestamp;
private final VocabularyGroup vocs;
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
@ -116,459 +46,44 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
log.info("processClaims: {}", processClaims);
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser,
try (final MigrateDbEntitiesApplication mapper = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
if (processClaims) {
log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims);
mapper.execute("queryClaims.sql", mapper::processClaims);
} else {
log.info("Processing datasources...");
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
mapper.execute("queryDatasources.sql", mapper::processDatasource);
log.info("Processing projects...");
if (dbSchema.equalsIgnoreCase("beta")) {
smdbe.execute("queryProjects.sql", smdbe::processProject);
mapper.execute("queryProjects.sql", mapper::processProject);
} else {
smdbe.execute("queryProjects_production.sql", smdbe::processProject);
mapper.execute("queryProjects_production.sql", mapper::processProject);
}
log.info("Processing orgs...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
mapper.execute("queryOrganizations.sql", mapper::processOrganization);
log.info("Processing relationsNoRemoval ds <-> orgs ...");
smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
mapper.execute("queryDatasourceOrganization.sql", mapper::processDatasourceOrganization);
log.info("Processing projects <-> orgs ...");
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
mapper.execute("queryProjectOrganization.sql", mapper::processProjectOrganization);
}
log.info("All done.");
}
}
protected MigrateDbEntitiesApplication(final VocabularyGroup vocs) { // ONLY FOR UNIT TEST
super();
this.dbClient = null;
this.lastUpdateTimestamp = new Date().getTime();
this.vocs = vocs;
}
public MigrateDbEntitiesApplication(
final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword,
final String isLookupUrl)
public MigrateDbEntitiesApplication(final String hdfsPath, final String dbUrl, final String dbUser,
final String dbPassword, final String isLookupUrl)
throws Exception {
super(hdfsPath);
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
this.lastUpdateTimestamp = new Date().getTime();
this.vocs = VocabularyGroup.loadVocsFromIS(ISLookupClientFactory.getLookUpService(isLookupUrl));
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
throws Exception {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
dbClient.processResults(sql, consumer);
}
public List<Oaf> processDatasource(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Datasource ds = new Datasource();
ds.setId(createOpenaireId(10, rs.getString("datasourceid"), true));
ds.setOriginalId(Arrays.asList((String[]) rs.getArray("identities").getArray()));
ds
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
ds.setPid(new ArrayList<>());
ds.setDateofcollection(asString(rs.getDate("dateofcollection")));
ds.setDateoftransformation(null); // Value not returned by the SQL query
ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB
ds.setOaiprovenance(null); // Values not present in the DB
ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype")));
ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility")));
ds.setOfficialname(field(rs.getString("officialname"), info));
ds.setEnglishname(field(rs.getString("englishname"), info));
ds.setWebsiteurl(field(rs.getString("websiteurl"), info));
ds.setLogourl(field(rs.getString("logourl"), info));
ds.setContactemail(field(rs.getString("contactemail"), info));
ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info));
ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info));
ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info));
ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info));
ds.setDescription(field(rs.getString("description"), info));
ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info));
ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info));
ds.setOdpolicies(field(rs.getString("odpolicies"), info));
ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info));
ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info));
ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info));
ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info));
ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info));
ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info));
ds.setDataprovider(field(rs.getBoolean("dataprovider"), info));
ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info));
ds.setDatabaseaccesstype(field(rs.getString("databaseaccesstype"), info));
ds.setDatauploadtype(field(rs.getString("datauploadtype"), info));
ds.setDatabaseaccessrestriction(field(rs.getString("databaseaccessrestriction"), info));
ds.setDatauploadrestriction(field(rs.getString("datauploadrestriction"), info));
ds.setVersioning(field(rs.getBoolean("versioning"), info));
ds.setCitationguidelineurl(field(rs.getString("citationguidelineurl"), info));
ds.setQualitymanagementkind(field(rs.getString("qualitymanagementkind"), info));
ds.setPidsystems(field(rs.getString("pidsystems"), info));
ds.setCertificates(field(rs.getString("certificates"), info));
ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array
ds
.setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal
ds.setDataInfo(info);
ds.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(ds);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processProject(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Project p = new Project();
p.setId(createOpenaireId(40, rs.getString("projectid"), true));
p.setOriginalId(Arrays.asList(rs.getString("projectid")));
p
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
p.setPid(new ArrayList<>());
p.setDateofcollection(asString(rs.getDate("dateofcollection")));
p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
p.setExtraInfo(new ArrayList<>()); // Values not present in the DB
p.setOaiprovenance(null); // Values not present in the DB
p.setWebsiteurl(field(rs.getString("websiteurl"), info));
p.setCode(field(rs.getString("code"), info));
p.setAcronym(field(rs.getString("acronym"), info));
p.setTitle(field(rs.getString("title"), info));
p.setStartdate(field(asString(rs.getDate("startdate")), info));
p.setEnddate(field(asString(rs.getDate("enddate")), info));
p.setCallidentifier(field(rs.getString("callidentifier"), info));
p.setKeywords(field(rs.getString("keywords"), info));
p.setDuration(field(Integer.toString(rs.getInt("duration")), info));
p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info));
p
.setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info));
p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info));
p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info));
p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype")));
p.setOptional1(field(rs.getString("optional1"), info));
p.setOptional2(field(rs.getString("optional2"), info));
p.setJsonextrainfo(field(rs.getString("jsonextrainfo"), info));
p.setContactfullname(field(rs.getString("contactfullname"), info));
p.setContactfax(field(rs.getString("contactfax"), info));
p.setContactphone(field(rs.getString("contactphone"), info));
p.setContactemail(field(rs.getString("contactemail"), info));
p.setSummary(field(rs.getString("summary"), info));
p.setCurrency(field(rs.getString("currency"), info));
p.setTotalcost(new Float(rs.getDouble("totalcost")));
p.setFundedamount(new Float(rs.getDouble("fundedamount")));
p.setDataInfo(info);
p.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(p);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final Organization o = new Organization();
o.setId(createOpenaireId(20, rs.getString("organizationid"), true));
o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
o
.setCollectedfrom(
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
o.setPid(new ArrayList<>());
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
o.setOaiprovenance(null); // Values not present in the DB
o.setLegalshortname(field(rs.getString("legalshortname"), info));
o.setLegalname(field(rs.getString("legalname"), info));
o.setAlternativeNames(new ArrayList<>()); // Values not returned by the SQL query
o.setWebsiteurl(field(rs.getString("websiteurl"), info));
o.setLogourl(field(rs.getString("logourl"), info));
o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info));
o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info));
o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info));
o
.setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info));
o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info));
o
.setEcinternationalorganizationeurinterests(
field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info));
o
.setEcinternationalorganization(
field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info));
o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info));
o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info));
o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info));
o.setCountry(prepareQualifierSplitting(rs.getString("country")));
o.setDataInfo(info);
o.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(o);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processDatasourceOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("organization"), true);
final String dsId = createOpenaireId(10, rs.getString("datasource"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(DATASOURCE_ORGANIZATION);
r1.setSubRelType(PROVISION);
r1.setRelClass(IS_PROVIDED_BY);
r1.setSource(dsId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(DATASOURCE_ORGANIZATION);
r2.setSubRelType(PROVISION);
r2.setRelClass(PROVIDES);
r2.setSource(orgId);
r2.setTarget(dsId);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processProjectOrganization(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("resporganization"), true);
final String projectId = createOpenaireId(40, rs.getString("project"), true);
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(PROJECT_ORGANIZATION);
r1.setSubRelType(PARTICIPATION);
r1.setRelClass(HAS_PARTICIPANT);
r1.setSource(projectId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(PROJECT_ORGANIZATION);
r2.setSubRelType(PARTICIPATION);
r2.setRelClass(IS_PARTICIPANT);
r2.setSource(orgId);
r2.setTarget(projectId);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public List<Oaf> processClaims(final ResultSet rs) {
final DataInfo info = dataInfo(
false, null, false, false,
qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
try {
if (rs.getString(SOURCE_TYPE).equals("context")) {
final Result r;
if (rs.getString(TARGET_TYPE).equals("dataset")) {
r = new Dataset();
r.setResulttype(DATASET_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("software")) {
r = new Software();
r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("other")) {
r = new OtherResearchProduct();
r.setResulttype(ORP_DEFAULT_RESULTTYPE);
} else {
r = new Publication();
r.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
}
r.setId(createOpenaireId(50, rs.getString("target_id"), false));
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setContext(prepareContext(rs.getString("source_id"), info));
r.setDataInfo(info);
r.setCollectedfrom(collectedFrom);
return Arrays.asList(r);
} else {
final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false);
final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false);
final Relation r1 = new Relation();
final Relation r2 = new Relation();
if (rs.getString(SOURCE_TYPE).equals("project")) {
r1.setCollectedfrom(collectedFrom);
r1.setRelType(RESULT_PROJECT);
r1.setSubRelType(OUTCOME);
r1.setRelClass(PRODUCES);
r2.setCollectedfrom(collectedFrom);
r2.setRelType(RESULT_PROJECT);
r2.setSubRelType(OUTCOME);
r2.setRelClass(IS_PRODUCED_BY);
} else {
r1.setCollectedfrom(collectedFrom);
r1.setRelType(RESULT_RESULT);
r1.setSubRelType(RELATIONSHIP);
r1.setRelClass(IS_RELATED_TO);
r2.setCollectedfrom(collectedFrom);
r2.setRelType(RESULT_RESULT);
r2.setSubRelType(RELATIONSHIP);
r2.setRelClass(IS_RELATED_TO);
}
r1.setSource(sourceId);
r1.setTarget(targetId);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
r2.setSource(targetId);
r2.setTarget(sourceId);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private List<Context> prepareContext(final String id, final DataInfo dataInfo) {
final Context context = new Context();
context.setId(id);
context.setDataInfo(Arrays.asList(dataInfo));
return Arrays.asList(context);
}
private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException {
final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
final String inferenceprovenance = rs.getString("inferenceprovenance");
final Boolean inferred = rs.getBoolean("inferred");
final String trust = rs.getString("trust");
return dataInfo(
deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
}
private Qualifier prepareQualifierSplitting(final String s) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] arr = s.split("@@@");
return arr.length == 2 ? vocs.getTermAsQualifier(arr[1], arr[0]) : null;
}
private List<Field<String>> prepareListFields(final Array array, final DataInfo info) {
try {
return array != null ? listFields(info, (String[]) array.getArray()) : new ArrayList<>();
} catch (final SQLException e) {
throw new RuntimeException("Invalid SQL array", e);
}
}
private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] parts = s.split("###");
if (parts.length == 2) {
final String value = parts[0];
final String[] arr = parts[1].split("@@@");
if (arr.length == 2) {
return structuredProperty(value, vocs.getTermAsQualifier(arr[1], arr[0]), dataInfo);
}
}
return null;
}
private List<StructuredProperty> prepareListOfStructProps(
final Array array,
final DataInfo dataInfo) throws SQLException {
final List<StructuredProperty> res = new ArrayList<>();
if (array != null) {
for (final String s : (String[]) array.getArray()) {
final StructuredProperty sp = prepareStructProp(s, dataInfo);
if (sp != null) {
res.add(sp);
}
}
}
return res;
}
private Journal prepareJournal(final String name, final String sj, final DataInfo info) {
if (StringUtils.isNotBlank(sj)) {
final String[] arr = sj.split("@@@");
if (arr.length == 3) {
final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0].trim() : null;
final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1].trim() : null;
final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null;
if (issn != null || eissn != null || lissn != null) {
return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info);
}
}
}
return null;
}
@Override
public void close() throws IOException {
super.close();
dbClient.close();
protected MigrateDbEntitiesApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOT TESTS
super(dbClient, vocs);
}
}

View File

@ -0,0 +1,61 @@
package eu.dnetlib.dhp.oa.graph.raw;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class MigrateOpenOrgsApplication extends AbstractDbApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateOpenOrgsApplication.class);
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateOpenOrgsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_openorgs_parameters.json")));
parser.parseArgument(args);
final String dbUrl = parser.get("postgresUrl");
log.info("postgresUrl: {}", dbUrl);
final String dbUser = parser.get("postgresUser");
log.info("postgresUser: {}", dbUser);
final String dbPassword = parser.get("postgresPassword");
log.info("postgresPassword: xxx");
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
try (final MigrateOpenOrgsApplication mapper = new MigrateOpenOrgsApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
log.info("Processing open orgs...");
mapper.execute("queryOrganizationsFromOpenOrgsDB.sql", mapper::processOrganization);
log.info("Processing simrels...");
mapper.execute("querySimilarityFromOpenOrgsDB.sql", mapper::processOrgOrgSimRels);
log.info("All done.");
}
}
public MigrateOpenOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser,
final String dbPassword, final String isLookupUrl)
throws Exception {
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
}
}

View File

@ -39,10 +39,9 @@ public class AbstractMigrationApplication implements Closeable {
this.writer = SequenceFile
.createWriter(
getConf(),
SequenceFile.Writer.file(new Path(hdfsPath)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class));
getConf(), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer
.valueClass(Text.class));
}
private Configuration getConf() throws IOException {

View File

@ -0,0 +1,32 @@
[
{
"paramName": "p",
"paramLongName": "hdfsPath",
"paramDescription": "the path where storing the sequential file",
"paramRequired": true
},
{
"paramName": "pgurl",
"paramLongName": "postgresUrl",
"paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
"paramRequired": true
},
{
"paramName": "pguser",
"paramLongName": "postgresUser",
"paramDescription": "postgres user",
"paramRequired": false
},
{
"paramName": "pgpasswd",
"paramLongName": "postgresPassword",
"paramDescription": "postgres password",
"paramRequired": false
},
{
"paramName": "isu",
"paramLongName": "isLookupUrl",
"paramDescription": "the url of the ISLookupService",
"paramRequired": true
}
]

View File

@ -25,6 +25,18 @@
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>postgresOpenOrgsURL</name>
<description>the postgres URL to access to the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsUser</name>
<description>the user of OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsPassword</name>
<description>the password of OpenOrgs database</description>
</property>
<property>
<name>dbSchema</name>
@ -116,8 +128,25 @@
<fork name="start_import">
<path start="ImportDB"/>
<path start="ImportDB_claims"/>
<path start="ImportDB_openorgs"/>
</fork>
<action name="ImportDB_openorgs">
<java>
<prepare>
<delete path="${contentPath}/db_openorgs"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateOpenOrgsApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<action name="ImportDB_claims">
<java>
<prepare>
@ -308,7 +337,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openorgs,${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>

View File

@ -0,0 +1,204 @@
<workflow-app name="create RAW Graph (openorgs and cordaorgs)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store raw graph</description>
</property>
<property>
<name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access to the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the user postgres</description>
</property>
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>postgresOpenOrgsURL</name>
<description>the postgres URL to access to the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsUser</name>
<description>the user of OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsPassword</name>
<description>the password of OpenOrgs database</description>
</property>
<property>
<name>dbSchema</name>
<value>beta</value>
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
</property>
<property>
<name>mongoURL</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="start_import"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="start_import">
<path start="ImportDB_corda_orgs"/>
<path start="ImportDB_openorgs"/>
</fork>
<action name="ImportDB_openorgs">
<java>
<prepare>
<delete path="${contentPath}/db_openorgs"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateOpenOrgsApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<action name="ImportDB_corda_orgs">
<java>
<prepare>
<delete path="${contentPath}/db_records"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateCordaOrgsApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_cordaorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<join name="wait_import" to="GenerateEntities"/>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openorgs,${contentPath}/db_cordaorgs</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/entities</arg>
<arg>--graphRawPath</arg><arg>${graphOutputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,108 +0,0 @@
<workflow-app name="import regular entities as Graph (step 1)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>migrationPathStep1</name>
<description>the base path to store hdfs file</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access to the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the user postgres</description>
</property>
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>mongoURL</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${migrationPathStep1}'/>
<mkdir path='${migrationPathStep1}'/>
</fs>
<ok to="ImportDB"/>
<error to="Kill"/>
</action>
<action name="ImportDB">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/db_records</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="ImportODF"/>
<error to="Kill"/>
</action>
<action name="ImportODF">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
</java>
<ok to="ImportOAF"/>
<error to="Kill"/>
</action>
<action name="ImportOAF">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,65 +0,0 @@
<workflow-app name="import regular entities as Graph (step 2)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>migrationPathStep1</name>
<description>the base path to store hdfs file</description>
</property>
<property>
<name>migrationPathStep2</name>
<description>the temporary path to store entities before dispatching</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetEntities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetEntities">
<fs>
<delete path='${migrationPathStep2}'/>
<mkdir path='${migrationPathStep2}'/>
</fs>
<ok to="GenerateEntities"/>
<error to="Kill"/>
</action>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>-s</arg><arg>${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records</arg>
<arg>-t</arg><arg>${migrationPathStep2}/all_entities</arg>
<arg>--islookup</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,60 +0,0 @@
<workflow-app name="import regular entities as Graph (step 3)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>migrationPathStep2</name>
<description>the temporary path to store entities before dispatching</description>
</property>
<property>
<name>migrationPathStep3</name>
<description>the graph Raw base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetGraph"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetGraph">
<fs>
<delete path='${migrationPathStep3}'/>
<mkdir path='${migrationPathStep3}'/>
</fs>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>-s</arg><arg>${migrationPathStep2}/all_entities</arg>
<arg>-g</arg><arg>${migrationPathStep3}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,32 @@
SELECT
o.id AS organizationid,
o.legalshortname AS legalshortname,
o.legalname AS legalname,
ARRAY[]::text[] AS alternativenames,
o.websiteurl AS websiteurl,
o.logourl AS logourl,
o.ec_legalbody AS eclegalbody,
o.ec_legalperson AS eclegalperson,
o.ec_nonprofit AS ecnonprofit,
o.ec_researchorganization AS ecresearchorganization,
o.ec_highereducation AS echighereducation,
o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests,
o.ec_internationalorganization AS ecinternationalorganization,
o.ec_enterprise AS ecenterprise,
o.ec_smevalidated AS ecsmevalidated,
o.ec_nutscode AS ecnutscode,
o.dateofcollection AS dateofcollection,
o.lastupdate AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
o.trust AS trust,
'' AS inferenceprovenance,
d.id AS collectedfromid,
d.officialname AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
WHERE o.id like 'corda__%'

View File

@ -1,30 +1,31 @@
SELECT
o.id AS organizationid,
o.legalshortname AS legalshortname,
o.legalname AS legalname,
o.websiteurl AS websiteurl,
o.logourl AS logourl,
o.ec_legalbody AS eclegalbody,
o.ec_legalperson AS eclegalperson,
o.ec_nonprofit AS ecnonprofit,
o.ec_researchorganization AS ecresearchorganization,
o.ec_highereducation AS echighereducation,
o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests,
o.ec_internationalorganization AS ecinternationalorganization,
o.ec_enterprise AS ecenterprise,
o.ec_smevalidated AS ecsmevalidated,
o.ec_nutscode AS ecnutscode,
o.dateofcollection AS dateofcollection,
o.lastupdate AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
o.trust AS trust,
'' AS inferenceprovenance,
d.id AS collectedfromid,
d.officialname AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
o.id AS organizationid,
o.legalshortname AS legalshortname,
o.legalname AS legalname,
ARRAY[]::text[] AS alternativenames,
o.websiteurl AS websiteurl,
o.logourl AS logourl,
o.ec_legalbody AS eclegalbody,
o.ec_legalperson AS eclegalperson,
o.ec_nonprofit AS ecnonprofit,
o.ec_researchorganization AS ecresearchorganization,
o.ec_highereducation AS echighereducation,
o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests,
o.ec_internationalorganization AS ecinternationalorganization,
o.ec_enterprise AS ecenterprise,
o.ec_smevalidated AS ecsmevalidated,
o.ec_nutscode AS ecnutscode,
o.dateofcollection AS dateofcollection,
o.lastupdate AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
o.trust AS trust,
'' AS inferenceprovenance,
d.id AS collectedfromid,
d.officialname AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid
ARRAY[]::text[] AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)

View File

@ -1,19 +1,31 @@
SELECT
o.id AS organizationid,
coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname,
o.name AS legalname,
array_agg(DISTINCT n.name) AS "alternativeNames",
(array_agg(u.url))[1] AS websiteurl,
o.modification_date AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
0.95 AS trust,
'' AS inferenceprovenance,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
o.id AS organizationid,
coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname,
o.name AS legalname,
array_agg(DISTINCT n.name) AS alternativenames,
(array_agg(u.url))[1] AS websiteurl,
'' AS logourl,
DATE(o.creation_date) AS dateofcollection,
DATE(o.modification_date) AS dateoftransformation,
false AS ecenterprise,
false AS echighereducation,
false AS ecinternationalorganization,
false AS ecinternationalorganizationeurinterests,
false AS eclegalbody,
false AS eclegalperson,
false AS ecnonprofit,
false AS ecnutscode,
false AS ecresearchorganization,
false AS ecsmevalidated,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
FROM organizations o
LEFT OUTER JOIN acronyms a ON (a.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)
@ -28,21 +40,33 @@ GROUP BY
UNION ALL
SELECT
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid,
n.name AS legalshortname,
n.name AS legalname,
ARRAY[]::text[] AS "alternativeNames",
(array_agg(u.url))[1] AS websiteurl,
o.modification_date AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
0.88 AS trust,
'' AS inferenceprovenance,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid,
n.name AS legalshortname,
n.name AS legalname,
ARRAY[]::text[] AS alternativenames,
(array_agg(u.url))[1] AS websiteurl,
'' AS logourl,
DATE(o.creation_date) AS dateofcollection,
DATE(o.modification_date) AS dateoftransformation,
false AS ecenterprise,
false AS echighereducation,
false AS ecinternationalorganization,
false AS ecinternationalorganizationeurinterests,
false AS eclegalbody,
false AS eclegalperson,
false AS ecnonprofit,
false AS ecnutscode,
false AS ecresearchorganization,
false AS ecsmevalidated,
false AS inferred,
false AS deletedbyinference,
0.5 AS trust,
'' AS inferenceprovenance,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)

View File

@ -1,17 +1,24 @@
SELECT local_id AS id1, oa_original_id AS id2 FROM openaire_simrels WHERE reltype = 'is_similar'
UNION ALL
SELECT
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(a.acronym) AS id2
FROM acronyms a
LEFT OUTER JOIN organizations o ON (a.id = o.id)
SELECT
local_id AS id1,
oa_original_id AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance
FROM oa_duplicates WHERE reltype = 'is_similar'
UNION ALL
SELECT
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)

View File

@ -27,6 +27,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Datasource;
@ -47,6 +48,9 @@ public class MigrateDbEntitiesApplicationTest {
@Mock
private VocabularyGroup vocs;
@Mock
private DbClient dbClient;
@BeforeEach
public void setUp() {
lenient()
@ -59,7 +63,7 @@ public class MigrateDbEntitiesApplicationTest {
lenient().when(vocs.termExists(anyString(), anyString())).thenReturn(true);
this.app = new MigrateDbEntitiesApplication(vocs);
this.app = new MigrateDbEntitiesApplication(dbClient, vocs);
}
@Test