Configuration of the SequenceFile Writer

This commit is contained in:
Michele Artini 2020-03-05 15:49:14 +01:00
parent ccb153de78
commit b6efa9d6ab
14 changed files with 81 additions and 143 deletions

View File

@ -68,12 +68,10 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final String dbPassword = parser.get("postgresPassword"); final String dbPassword = parser.get("postgresPassword");
final String hdfsPath = parser.get("hdfsPath"); final String hdfsPath = parser.get("hdfsPath");
final String hdfsNameNode = parser.get("namenode");
final String hdfsUser = parser.get("hdfsUser");
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims"); final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, hdfsNameNode, hdfsUser, dbUrl, dbUser, dbPassword)) { try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser, dbPassword)) {
if (processClaims) { if (processClaims) {
log.info("Processing claims..."); log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims); smdbe.execute("queryClaims.sql", smdbe::processClaims);
@ -97,9 +95,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
} }
} }
public MigrateDbEntitiesApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String dbUrl, final String dbUser, public MigrateDbEntitiesApplication(final String hdfsPath, final String dbUrl, final String dbUser,
final String dbPassword) throws Exception { final String dbPassword) throws Exception {
super(hdfsPath, hdfsNameNode, hdfsUser); super(hdfsPath);
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
this.lastUpdateTimestamp = new Date().getTime(); this.lastUpdateTimestamp = new Date().getTime();
} }

View File

@ -32,18 +32,15 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio
final String mdInterpretation = parser.get("mdInterpretation"); final String mdInterpretation = parser.get("mdInterpretation");
final String hdfsPath = parser.get("hdfsPath"); final String hdfsPath = parser.get("hdfsPath");
final String hdfsNameNode = parser.get("namenode");
final String hdfsUser = parser.get("hdfsUser");
try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb)) { try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoBaseUrl, mongoDb)) {
app.execute(mdFormat, mdLayout, mdInterpretation); app.execute(mdFormat, mdLayout, mdInterpretation);
} }
} }
public MigrateMongoMdstoresApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, public MigrateMongoMdstoresApplication(final String hdfsPath, final String mongoBaseUrl, final String mongoDb) throws Exception {
final String mongoDb) throws Exception { super(hdfsPath);
super(hdfsPath, hdfsNameNode, hdfsUser);
this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb);
} }

View File

@ -2,13 +2,11 @@ package eu.dnetlib.dhp.migration.utils;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -30,22 +28,21 @@ public class AbstractMigrationApplication implements Closeable {
private static final Log log = LogFactory.getLog(AbstractMigrationApplication.class); private static final Log log = LogFactory.getLog(AbstractMigrationApplication.class);
public AbstractMigrationApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception { public AbstractMigrationApplication(final String hdfsPath) throws Exception {
log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser)); log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s", hdfsPath));
this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer this.writer = SequenceFile.createWriter(getConf(), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer
.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)); .keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
} }
private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException { private Configuration getConf() throws IOException {
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode); /*
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); * conf.set("fs.defaultFS", hdfsNameNode); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); * conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); System.setProperty("HADOOP_USER_NAME", hdfsUser);
System.setProperty("HADOOP_USER_NAME", hdfsUser); * System.setProperty("hadoop.home.dir", "/"); FileSystem.get(URI.create(hdfsNameNode), conf);
System.setProperty("hadoop.home.dir", "/"); */
FileSystem.get(URI.create(hdfsNameNode), conf);
return conf; return conf;
} }

View File

@ -5,18 +5,6 @@
"paramDescription": "the path where storing the sequential file", "paramDescription": "the path where storing the sequential file",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": true
},
{
"paramName": "u",
"paramLongName": "hdfsUser",
"paramDescription": "the user wich create the hdfs seq file",
"paramRequired": true
},
{ {
"paramName": "pgurl", "paramName": "pgurl",
"paramLongName": "postgresUrl", "paramLongName": "postgresUrl",

View File

@ -5,18 +5,6 @@
"paramDescription": "the path where storing the sequential file", "paramDescription": "the path where storing the sequential file",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": true
},
{
"paramName": "u",
"paramLongName": "hdfsUser",
"paramDescription": "the user wich create the hdfs seq file",
"paramRequired": true
},
{ {
"paramName": "mongourl", "paramName": "mongourl",
"paramLongName": "mongoBaseUrl", "paramLongName": "mongoBaseUrl",

View File

@ -15,8 +15,4 @@
<name>oozie.action.sharelib.for.spark</name> <name>oozie.action.sharelib.for.spark</name>
<value>spark2</value> <value>spark2</value>
</property> </property>
<property>
<name>hdfsUser</name>
<value>dnet</value>
</property>
</configuration> </configuration>

View File

@ -67,8 +67,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${migrationClaimsPathStep1}/db_claims</arg> <arg>-p</arg><arg>${migrationClaimsPathStep1}/db_claims</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg> <arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg> <arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg> <arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
@ -84,8 +82,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationClaimsPathStep1}/odf_claims</arg> <arg>-p</arg><arg>${migrationClaimsPathStep1}/odf_claims</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg> <arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg> <arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg> <arg>-f</arg><arg>ODF</arg>
@ -102,8 +98,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationClaimsPathStep1}/oaf_claims</arg> <arg>-p</arg><arg>${migrationClaimsPathStep1}/oaf_claims</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg> <arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg> <arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg> <arg>-f</arg><arg>OAF</arg>

View File

@ -15,8 +15,4 @@
<name>oozie.action.sharelib.for.spark</name> <name>oozie.action.sharelib.for.spark</name>
<value>spark2</value> <value>spark2</value>
</property> </property>
<property>
<name>hdfsUser</name>
<value>dnet</value>
</property>
</configuration> </configuration>

View File

@ -67,8 +67,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/db_records</arg> <arg>-p</arg><arg>${migrationPathStep1}/db_records</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg> <arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg> <arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg> <arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
@ -83,8 +81,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg> <arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg> <arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg> <arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg> <arg>-f</arg><arg>ODF</arg>
@ -101,8 +97,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg> <arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg> <arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg> <arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg> <arg>-f</arg><arg>OAF</arg>

View File

@ -15,8 +15,4 @@
<name>oozie.action.sharelib.for.spark</name> <name>oozie.action.sharelib.for.spark</name>
<value>spark2</value> <value>spark2</value>
</property> </property>
<property>
<name>hdfsUser</name>
<value>dnet</value>
</property>
</configuration> </configuration>

View File

@ -59,8 +59,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/db_records</arg> <arg>-p</arg><arg>${migrationPathStep1}/db_records</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg> <arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg> <arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg> <arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
@ -75,8 +73,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg> <arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg> <arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg> <arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg> <arg>-f</arg><arg>ODF</arg>
@ -93,8 +89,6 @@
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class> <main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg> <arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg> <arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg> <arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg> <arg>-f</arg><arg>OAF</arg>

View File

@ -15,8 +15,4 @@
<name>oozie.action.sharelib.for.spark</name> <name>oozie.action.sharelib.for.spark</name>
<value>spark2</value> <value>spark2</value>
</property> </property>
<property>
<name>hdfsUser</name>
<value>dnet</value>
</property>
</configuration> </configuration>

View File

@ -15,8 +15,4 @@
<name>oozie.action.sharelib.for.spark</name> <name>oozie.action.sharelib.for.spark</name>
<value>spark2</value> <value>spark2</value>
</property> </property>
<property>
<name>hdfsUser</name>
<value>dnet</value>
</property>
</configuration> </configuration>

View File

@ -1,79 +1,87 @@
package eu.dnetlib.dhp.collection; package eu.dnetlib.dhp.collection;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.*;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
public class CollectionJobTest { public class CollectionJobTest {
private Path testDir;
@Before private Path testDir;
public void setup() throws IOException {
testDir = Files.createTempDirectory("dhp-collection");
}
@After @Before
public void teadDown() throws IOException { public void setup() throws IOException {
FileUtils.deleteDirectory(testDir.toFile()); testDir = Files.createTempDirectory("dhp-collection");
} }
@Test @After
public void tesCollection() throws Exception { public void teadDown() throws IOException {
Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix"); FileUtils.deleteDirectory(testDir.toFile());
GenerateNativeStoreSparkJob.main(new String[] { }
"-mt", "local",
"-w", "wid",
"-e", "XML",
"-d", ""+System.currentTimeMillis(),
"-p", new ObjectMapper().writeValueAsString(provenance),
"-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
"-o", testDir.toString()+"/store",
"-t", "true",
"-ru", "",
"-rp", "",
"-rh", "",
"-ro", "",
"-rr", ""});
System.out.println(new ObjectMapper().writeValueAsString(provenance));
}
@Test
public void tesCollection() throws Exception {
final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
GenerateNativeStoreSparkJob.main(new String[] {
"-mt", "local",
"-w", "wid",
"-e", "XML",
"-d", "" + System.currentTimeMillis(),
"-p", new ObjectMapper().writeValueAsString(provenance),
"-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
"-o", testDir.toString() + "/store",
"-t", "true",
"-ru", "",
"-rp", "",
"-rh", "",
"-ro", "",
"-rr", "" });
System.out.println(new ObjectMapper().writeValueAsString(provenance));
}
@Test
public void testGenerationMetadataRecord() throws Exception {
@Test final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
public void testGenerationMetadataRecord() throws Exception {
final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml")); final MetadataRecord record = GenerateNativeStoreSparkJob
.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar",
"ns_prefix"), System.currentTimeMillis(), null, null);
MetadataRecord record = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null); assert record != null;
System.out.println(record.getId());
System.out.println(record.getOriginalId());
assert record != null; }
System.out.println(record.getId());
System.out.println(record.getOriginalId());
@Test
public void TestEquals() throws IOException {
} final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
final MetadataRecord record = GenerateNativeStoreSparkJob
.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar",
"ns_prefix"), System.currentTimeMillis(), null, null);
final MetadataRecord record1 = GenerateNativeStoreSparkJob
.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar",
"ns_prefix"), System.currentTimeMillis(), null, null);
assert record != null;
record.setBody("ciao");
assert record1 != null;
record1.setBody("mondo");
Assert.assertEquals(record, record1);
}
@Test
public void TestEquals () throws IOException {
final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
MetadataRecord record = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null);
MetadataRecord record1 = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null);
assert record != null;
record.setBody("ciao");
assert record1 != null;
record1.setBody("mondo");
Assert.assertEquals(record, record1);
}
} }