forked from D-Net/dnet-hadoop
Merge pull request 'nsprefix_blacklist' (#34) from nsprefix_blacklist into master
This commit is contained in:
commit
a89b6cc3ba
|
@ -44,6 +44,7 @@ import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.DbClient;
|
import eu.dnetlib.dhp.common.DbClient;
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
|
@ -113,6 +115,11 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
|
||||||
final String hdfsPath = parser.get("hdfsPath");
|
final String hdfsPath = parser.get("hdfsPath");
|
||||||
log.info("hdfsPath: {}", hdfsPath);
|
log.info("hdfsPath: {}", hdfsPath);
|
||||||
|
|
||||||
|
final String nsPrefixBlacklist = parser.get("nsPrefixBlacklist");
|
||||||
|
log.info("nsPrefixBlacklist: {}", nsPrefixBlacklist);
|
||||||
|
|
||||||
|
final Predicate<Oaf> verifyNamespacePrefix = new VerifyNsPrefixPredicate(nsPrefixBlacklist);
|
||||||
|
|
||||||
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
|
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
|
||||||
log.info("processClaims: {}", processClaims);
|
log.info("processClaims: {}", processClaims);
|
||||||
|
|
||||||
|
@ -123,23 +130,25 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
|
||||||
smdbe.execute("queryClaims.sql", smdbe::processClaims);
|
smdbe.execute("queryClaims.sql", smdbe::processClaims);
|
||||||
} else {
|
} else {
|
||||||
log.info("Processing datasources...");
|
log.info("Processing datasources...");
|
||||||
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
|
smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix);
|
||||||
|
|
||||||
log.info("Processing projects...");
|
log.info("Processing projects...");
|
||||||
if (dbSchema.equalsIgnoreCase("beta")) {
|
if (dbSchema.equalsIgnoreCase("beta")) {
|
||||||
smdbe.execute("queryProjects.sql", smdbe::processProject);
|
smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix);
|
||||||
} else {
|
} else {
|
||||||
smdbe.execute("queryProjects_production.sql", smdbe::processProject);
|
smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("Processing orgs...");
|
log.info("Processing orgs...");
|
||||||
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
|
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
|
||||||
|
|
||||||
log.info("Processing relationsNoRemoval ds <-> orgs ...");
|
log.info("Processing relationsNoRemoval ds <-> orgs ...");
|
||||||
smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
|
smdbe
|
||||||
|
.execute(
|
||||||
|
"queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, verifyNamespacePrefix);
|
||||||
|
|
||||||
log.info("Processing projects <-> orgs ...");
|
log.info("Processing projects <-> orgs ...");
|
||||||
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
|
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
|
||||||
}
|
}
|
||||||
log.info("All done.");
|
log.info("All done.");
|
||||||
}
|
}
|
||||||
|
@ -163,10 +172,20 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
|
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
|
||||||
|
throws Exception {
|
||||||
|
execute(sqlFile, producer, oaf -> true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer,
|
||||||
|
final Predicate<Oaf> predicate)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
|
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
|
||||||
|
|
||||||
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
|
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> {
|
||||||
|
if (predicate.test(oaf)) {
|
||||||
|
emitOaf(oaf);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
dbClient.processResults(sql, consumer);
|
dbClient.processResults(sql, consumer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
import com.google.common.base.Splitter;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This predicate should be used to skip oaf objects using a blacklist of nsprefixes.
|
||||||
|
*
|
||||||
|
* @author michele
|
||||||
|
*/
|
||||||
|
public class VerifyNsPrefixPredicate implements Predicate<Oaf> {
|
||||||
|
|
||||||
|
final Set<String> invalids = new HashSet<>();
|
||||||
|
|
||||||
|
public VerifyNsPrefixPredicate(final String blacklist) {
|
||||||
|
if (StringUtils.isNotBlank(blacklist)) {
|
||||||
|
Splitter
|
||||||
|
.on(",")
|
||||||
|
.trimResults()
|
||||||
|
.omitEmptyStrings()
|
||||||
|
.split(blacklist)
|
||||||
|
.forEach(invalids::add);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean test(final Oaf oaf) {
|
||||||
|
if (oaf instanceof Datasource) {
|
||||||
|
return testValue(((Datasource) oaf).getNamespaceprefix().getValue());
|
||||||
|
} else if (oaf instanceof OafEntity) {
|
||||||
|
return testValue(((OafEntity) oaf).getId());
|
||||||
|
} else if (oaf instanceof Relation) {
|
||||||
|
return testValue(((Relation) oaf).getSource()) && testValue(((Relation) oaf).getTarget());
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean testValue(final String s) {
|
||||||
|
if (StringUtils.isNotBlank(s)) {
|
||||||
|
for (final String invalid : invalids) {
|
||||||
|
if (Pattern.matches("^(\\d\\d\\|)?" + invalid + ".*$", s)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -40,5 +40,11 @@
|
||||||
"paramLongName": "dbschema",
|
"paramLongName": "dbschema",
|
||||||
"paramDescription": "the database schema according to the D-Net infrastructure (beta or production)",
|
"paramDescription": "the database schema according to the D-Net infrastructure (beta or production)",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "nsbl",
|
||||||
|
"paramLongName": "nsPrefixBlacklist",
|
||||||
|
"paramDescription": "a blacklist of nsprefixes (comma separated)",
|
||||||
|
"paramRequired": false
|
||||||
}
|
}
|
||||||
]
|
]
|
|
@ -43,7 +43,11 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>the address of the lookUp service</description>
|
<description>the address of the lookUp service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nsPrefixBlacklist</name>
|
||||||
|
<value></value>
|
||||||
|
<description>a blacklist of nsprefixes (comma separated)</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -131,6 +135,7 @@
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--action</arg><arg>claims</arg>
|
<arg>--action</arg><arg>claims</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF_claims"/>
|
<ok to="ImportODF_claims"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -182,6 +187,7 @@
|
||||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF"/>
|
<ok to="ImportODF"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -38,7 +38,11 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>the address of the lookUp service</description>
|
<description>the address of the lookUp service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nsPrefixBlacklist</name>
|
||||||
|
<value></value>
|
||||||
|
<description>a blacklist of nsprefixes (comma separated)</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -113,6 +117,7 @@
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--action</arg><arg>claims</arg>
|
<arg>--action</arg><arg>claims</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF_claims"/>
|
<ok to="ImportODF_claims"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -25,7 +25,11 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>the address of the lookUp service</description>
|
<description>the address of the lookUp service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nsPrefixBlacklist</name>
|
||||||
|
<value></value>
|
||||||
|
<description>a blacklist of nsprefixes (comma separated)</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -99,6 +103,7 @@
|
||||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportDB_claims"/>
|
<ok to="ImportDB_claims"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -117,6 +122,7 @@
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
<arg>--action</arg><arg>claims</arg>
|
<arg>--action</arg><arg>claims</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -28,6 +28,11 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>the address of the lookUp service</description>
|
<description>the address of the lookUp service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nsPrefixBlacklist</name>
|
||||||
|
<value></value>
|
||||||
|
<description>a blacklist of nsprefixes (comma separated)</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -67,6 +72,7 @@
|
||||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||||
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
|
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF"/>
|
<ok to="ImportODF"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
class VerifyNsPrefixPredicateTest {
|
||||||
|
|
||||||
|
private VerifyNsPrefixPredicate predicate;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
predicate = new VerifyNsPrefixPredicate("corda,nsf,wt");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTestValue() {
|
||||||
|
assertFalse(predicate.testValue("corda__2020"));
|
||||||
|
assertFalse(predicate.testValue("nsf________"));
|
||||||
|
assertFalse(predicate.testValue("nsf"));
|
||||||
|
assertFalse(predicate.testValue("corda"));
|
||||||
|
assertFalse(predicate.testValue("10|corda_______::fjkdsfjksdhfksj"));
|
||||||
|
assertFalse(predicate.testValue("20|corda_______::fjkdsfjksdhfksj"));
|
||||||
|
|
||||||
|
assertTrue(predicate.testValue("xxxxxx_____"));
|
||||||
|
assertTrue(predicate.testValue("10|xxxxxx_____::sdasdasaddasad"));
|
||||||
|
|
||||||
|
assertTrue(predicate.testValue(null));
|
||||||
|
assertTrue(predicate.testValue(""));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_ds_true() {
|
||||||
|
final Field<String> prefix = new Field<>();
|
||||||
|
prefix.setValue("xxxxxx______");
|
||||||
|
|
||||||
|
final Datasource ds = new Datasource();
|
||||||
|
ds.setNamespaceprefix(prefix);
|
||||||
|
|
||||||
|
assertTrue(predicate.test(ds));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_ds_false() {
|
||||||
|
final Field<String> prefix = new Field<>();
|
||||||
|
prefix.setValue("corda__2020");
|
||||||
|
|
||||||
|
final Datasource ds = new Datasource();
|
||||||
|
ds.setNamespaceprefix(prefix);
|
||||||
|
|
||||||
|
assertFalse(predicate.test(ds));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_rel_true() {
|
||||||
|
final Relation rel = new Relation();
|
||||||
|
rel.setSource("10|yyyyyy______:sdfsfsffsdfs");
|
||||||
|
rel.setTarget("10|xxxxxx______:sdfsfsffsdfs");
|
||||||
|
assertTrue(predicate.test(rel));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_rel_false() {
|
||||||
|
final Relation rel = new Relation();
|
||||||
|
rel.setSource("10|corda_______:sdfsfsffsdfs");
|
||||||
|
rel.setTarget("10|xxxxxx______:sdfsfsffsdfs");
|
||||||
|
assertFalse(predicate.test(rel));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_proj_true() {
|
||||||
|
final Project p = new Project();
|
||||||
|
p.setId("10|xxxxxx______:sdfsfsffsdfs");
|
||||||
|
assertTrue(predicate.test(p));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_proj_false() {
|
||||||
|
final Project p = new Project();
|
||||||
|
p.setId("10|corda_____:sdfsfsffsdfs");
|
||||||
|
assertFalse(predicate.test(p));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue