
A refactor of squirrelnado that resolves the issues pinning it to Node v15.11.0 and makes its innards more reusable, Unix-style.

Usage:

```js
const {read} = require('@ashnazg/squirrelnado'); // https://ashnazg.com/docs/lib/squirrelnado
require('@ashnazg/squirrelnado-psql')(read); // load the optional Redshift/Postgres driver.

// migrate data from one place to another (inside an async function):
await read(src).write(dst);
await read('postgres://server/db/schema/table').write('table.csv');

// load data to memory:
const json_rows = await read(src);
```

mutate the data in flight

Mutate with .map(transformFunc): at any point before a .write(), you can alter rows:

```js
const fixed_rows = await read(src).map(row => {
	row.id = 'mysrc-' + row.id;
	return row;
});
```

Warning: for performance, the system passes rows by reference, so if you consume the same in-memory copy of the data in multiple places, you'll see cross-contamination. You can either:

  1. do a shallow fork: return {...row}
  2. persist the common starting point to disk first, and run both consumers off of that
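The cross-contamination is ordinary JavaScript object sharing. Here's a plain-array sketch (no squirrelnado involved; arrays stand in for the stream) that shows it happening, and shows that a shallow fork protects top-level fields but not nested objects or arrays:

```js
// Plain-JS illustration of the pass-by-reference pitfall; not squirrelnado code.
const rows = [{id: 1, tags: ['a']}, {id: 2, tags: []}];

// Consumer 1 mutates rows in place, like a .map() step that returns `row`.
const prefixed = rows.map(row => {
	row.id = 'mysrc-' + row.id;
	return row;
});

// Consumer 2 reads the "same" data afterwards and sees consumer 1's edits.
const contaminated = rows.map(row => row.id);

// A shallow fork copies top-level fields, so reassigning them is safe...
const source = [{id: 1, tags: ['a']}];
const forked = source.map(row => {
	const copy = {...row};
	copy.id = 'mysrc-' + copy.id;
	copy.tags.push('b'); // ...but nested arrays/objects are still shared with the original
	return copy;
});
```

If your rows carry nested structures that later steps modify, option 2 (persisting to disk) avoids the problem entirely, since each consumer then parses its own copy.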

filtering out rows

.map() can drop rows as well by returning an empty list:

```js
const fixed_rows = await read(src).map(row => {
	if (row.bad) return []; // use the 'multiple rows response' format to tell sqrl that none are needed.
	// I like this row, alter it and send it on:
	row.id = 'mysrc-' + row.id;
	return row;
});
```

I used to allow returning undefined instead of [], but I forgot to return row in so many trivial use cases that the library now throws an error when app code returns undefined.
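A minimal sketch of that rule, assuming a hypothetical helper named normalizeMapResult (not a squirrelnado export) that turns one callback result into a list of rows:

```js
// Hypothetical helper: normalizes one .map() callback result into a list of rows.
function normalizeMapResult(result) {
	if (result === undefined) {
		// A bare `return;` is far more often a forgotten `return row;` than an intentional drop.
		throw new Error('.map() callback returned undefined; return [] to drop a row');
	}
	// An array means "zero or more rows"; anything else is a single row.
	return Array.isArray(result) ? result : [result];
}
```

Failing loudly here costs one `return [];` per intentional drop, in exchange for catching the far more common mistake immediately instead of silently losing rows.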

returning more rows

If you return an array of rows from .map(), each of them is streamed to the next step. (Dropping, 1:1, and 1:many can all happen in the same step.)

```js
const fixed_rows = await read(src).map(row => {
	if (row.bad) return []; // drop: a bare `return;` would throw
	if (row.children) {
		// 1:many: each element of the array becomes its own row
		return [
			{id: row.id + 'child-1', parent: row.id},
			{id: row.id + 'child-2', parent: row.id}
		];
	}
	// 1:1
	row.id = 'mysrc-' + row.id;
	return row;
});
```
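Since Array.prototype.flatMap treats a returned [] as "drop", a non-array as "keep one", and an array as "emit each element", it is a convenient way to unit-test a transform like the one above without running a pipeline. A sketch, assuming you pull the callback out into a named function (expandRow is a hypothetical name):

```js
// The transform from the example above, extracted so it can be tested on its own.
function expandRow(row) {
	if (row.bad) return [];
	if (row.children) {
		return [
			{id: row.id + 'child-1', parent: row.id},
			{id: row.id + 'child-2', parent: row.id}
		];
	}
	row.id = 'mysrc-' + row.id;
	return row;
}

// flatMap applies the same drop / 1:1 / 1:many semantics to an in-memory array.
const sample = [{id: '1'}, {id: '2', bad: true}, {id: '3', children: true}];
const expanded = sample.flatMap(expandRow);
```

One difference to keep in mind: flatMap happily passes undefined through, while squirrelnado throws on it, so this check won't catch a missing return.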

visiting rows without using up memory

If you want to iterate over a terabyte of data but have no reason to write the rows anywhere, you probably don't want the final .map() step resolving to all the input rows. .forEach(visitor) is shorthand for .map(async row => {await visitor(row); return [];}), so await read('data').forEach(row => {...}) resolves to a harmless empty array.
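The shape of that shorthand, sketched on a hypothetical synchronous toy class (ToySequence is not part of squirrelnado; it holds rows in an array, so it shows only how forEach reduces to map, not the memory savings, which come from streaming):

```js
// Hypothetical, synchronous toy model of a step chain; not squirrelnado's classes.
class ToySequence {
	constructor(rows) {
		this.rows = rows;
	}
	map(transform) {
		return new ToySequence(this.rows.flatMap(row => {
			const out = transform(row);
			return Array.isArray(out) ? out : [out];
		}));
	}
	// forEach is map() with a callback that visits the row, then drops it.
	forEach(visitor) {
		return this.map(row => {
			visitor(row);
			return [];
		});
	}
}

let seen = 0;
const leftover = new ToySequence([{id: 1}, {id: 2}, {id: 3}])
	.forEach(() => { ++seen; })
	.rows;
```

Because every row is dropped after the visitor runs, nothing accumulates at the end of the chain, which is what keeps the real, streaming version from holding the whole input in memory.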

visiting rows asynchronously

Promises are always in play at the per-file and per-sequence level, but async functions at the row level are drastically slower than old-fashioned callbacks: 20x slower than disk I/O in one 7 GB job I've run.

I plan to build async-friendliness into .map later, but for now, row-level async isn't supported.

DB control flags

```js
read('foo.csv').write('mysql://server/db/schema/table?existing=truncate');
// which is the same as:
read('foo.csv').write('mysql://server', {existing: 'truncate', table:'table', schema:'schema', database:'db'});
```
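As a rough mental model of how the URL form maps onto the config form, here's a hypothetical parser built on the standard WHATWG URL class. It is not squirrelnado's parse-url module; the field names are taken from the example above, and skipping empty path segments mirrors the mysql://report_server///other_table example later on.

```js
// Hypothetical sketch: turn a DB URL into the equivalent config object.
function dbUrlToConfig(url) {
	const u = new URL(url);
	const config = {protocol: u.protocol.replace(/:$/, ''), host: u.hostname};
	// Path segments are /database/schema/table; empty segments (as in '///table') are skipped.
	const [database, schema, table] = u.pathname.slice(1).split('/');
	if (database) config.database = database;
	if (schema) config.schema = schema;
	if (table) config.table = table;
	// Query-string flags such as ?existing=truncate become plain config keys (always strings).
	for (const [key, value] of u.searchParams) config[key] = value;
	return config;
}
```

Note that query-string values arrive as strings, so a flag like existing=true in a URL is the string 'true' unless the parser converts it.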

DB table schemas

The sample above has to autodetect the columns in foo.csv to build INSERT foo (col1, col2) VALUES (...) statements. It guesses based on the first row, which can't work if that row has nulls or is missing any of the fields present elsewhere in the set.
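To make the failure concrete, here's a hypothetical illustration (the rows and both helper functions are made up for this example) comparing a first-row guess with a union over all rows:

```js
// Hypothetical rows as they might come out of a CSV parser.
const csvRows = [
	{src_id: 1, name: null},
	{src_id: 2, name: 'Ada', nickname: 'ada'}
];

// Naive guess: column names from the first row only.
function guessFromFirstRow(rows) {
	return Object.keys(rows[0]);
}

// Sturdier guess: union of column names across every row (requires a full pass over the data).
function guessFromAllRows(rows) {
	const names = new Set();
	for (const row of rows) {
		for (const name of Object.keys(row)) names.add(name);
	}
	return [...names];
}

// Even with every name found, a null value says nothing about the column's type.
const firstNameType = csvRows[0].name === null ? 'unknown' : typeof csvRows[0].name;
```

The first-row guess misses nickname entirely, and the full-scan guess still can't type name from its first value, which is why an explicit column definition is the reliable fix.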

There are two ways to do this right:

  1. If the source system is a real DB (and the driver parses its metadata) and there are no .map() steps, the same column traits (type, length, nullability) propagate.
  2. The other 99% of the time, the write step needs a columns definition:

```js
read('foo.csv').write('mysql://server/db/schema/table?existing=truncate', {columns: {
	src_id: 'INT NOT NULL',
	name: 'VARCHAR(32) NULL'
}});
```
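One way to picture what a columns definition buys you: the INSERT column list comes from the spec rather than from whatever fields a row happens to have. A hypothetical sketch (buildInsert is not a squirrelnado function; it assumes ?-style placeholders and does no identifier quoting):

```js
// Hypothetical sketch (not squirrelnado's driver code): a parameterized INSERT from columns{}.
function buildInsert(table, columns, row) {
	const names = Object.keys(columns);
	const placeholders = names.map(() => '?').join(', ');
	// Only declared columns are written; fields missing from the row are sent as NULL.
	const values = names.map(name => (row[name] === undefined ? null : row[name]));
	return {sql: `INSERT INTO ${table} (${names.join(', ')}) VALUES (${placeholders})`, values};
}
```

Every row produces the same statement shape, so a row with an extra field or a missing one no longer changes the column list.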

.mapColumns({dst_column: 'src_column'}) is a specialized version of .map() that propagates original traits, if present.

Because mapColumns discards any fields not mentioned, .renameColumns('src_col', 'dst_col') is sometimes more convenient: it leaves other columns alone. .deleteColumn('src') is the same as .renameColumns('src', null).
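The per-row difference between these helpers can be sketched with hypothetical plain functions (the *Row names are made up; column-trait propagation isn't modeled):

```js
// Hypothetical per-row models of the helpers; not squirrelnado code.
function mapColumnsRow(row, mapping) {
	// mapping is {dst_column: 'src_column'}; anything not listed is dropped.
	const out = {};
	for (const [dst, src] of Object.entries(mapping)) out[dst] = row[src];
	return out;
}

function renameColumnsRow(row, from, to) {
	// Leaves every other column alone; a null target deletes the column.
	const out = {...row};
	if (!(from in out)) return out;
	const value = out[from];
	delete out[from];
	if (to !== null) out[to] = value;
	return out;
}

const deleteColumnRow = (row, name) => renameColumnsRow(row, name, null);
```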

These three treat single-parameter mode, func('single,csv,string'), the same as func('var', 'args', '...'):

  1. .renameColumns('from1', 'to1', 'from2', 'to2') takes an even number of strings and an optional config object.
  2. .dropColumns('unwanted1', 'unwanted2') is built with the same flavor.
  3. .keepColumns('field1,field2') is like mapColumns with no renaming, just include/exclude behavior.

keepColumns also accepts a ColSpec and just ignores everything but the map keys:

```js
const columns = {a: 'INT', b: 'REAL'};
step.keepColumns(columns).write('db://', {columns}); // step: any read(...) chain
```
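The calling conventions above (a single CSV string, varargs, or a ColSpec, plus an optional trailing config object) could be normalized along these lines. This is a hypothetical sketch: normalizeColumnArgs, toRenamePairs, the whitespace trimming, and the someOption key in the test are all assumptions, not squirrelnado's API.

```js
// Hypothetical argument normalizer for the column helpers; not squirrelnado's code.
function isPlainObject(value) {
	return value !== null && typeof value === 'object' && !Array.isArray(value);
}

function normalizeColumnArgs(...args) {
	let config = {};
	// A trailing object after at least one other argument is treated as the optional config.
	if (args.length > 1 && isPlainObject(args[args.length - 1])) config = args.pop();
	let names;
	if (args.length === 1 && isPlainObject(args[0])) {
		names = Object.keys(args[0]); // ColSpec: only the keys matter
	} else if (args.length === 1 && typeof args[0] === 'string') {
		names = args[0].split(',').map(name => name.trim()).filter(name => name !== '');
	} else {
		names = args; // already one name (or null) per argument
	}
	return {names, config};
}

// renameColumns reads the names as from/to pairs.
function toRenamePairs(names) {
	if (names.length % 2 !== 0) throw new Error('renameColumns needs an even number of names');
	const pairs = [];
	for (let i = 0; i < names.length; i += 2) pairs.push([names[i], names[i + 1]]);
	return pairs;
}
```

A single object is read as a ColSpec, while an object after other arguments is read as config, which keeps keepColumns(columns) and renameColumns('a', 'b', {...}) unambiguous.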

existing

| value | behavior |
| --- | --- |
| undefined | check whether the table exists, and run DDL if not (if the driver supports CREATE IF NOT EXISTS, this is just a prelude on the first batch of rows instead of a separate query) |
| true | assume the table exists; don't waste a trip to the server checking |
| 'truncate' | remove existing rows |
| 'replace' | use CREATE OR REPLACE (or DROP if the driver doesn't support OR REPLACE) |
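Read as code, the table amounts to a small switch from the existing flag (plus the columns{} spec) to setup SQL. A hypothetical sketch: setupStatements and its supportsOrReplace option are made up here, the truncate branch assumes the table already exists, and real drivers vary in which of these statements they accept.

```js
// Hypothetical sketch: turn the `existing` flag plus a columns{} spec into setup SQL.
function setupStatements(table, columns, existing, {supportsOrReplace = false} = {}) {
	const body = Object.entries(columns).map(([name, type]) => `${name} ${type}`).join(', ');
	switch (existing) {
		case undefined:
			return [`CREATE TABLE IF NOT EXISTS ${table} (${body})`];
		case true:
			return []; // trust that the table is already there
		case 'truncate':
			return [`TRUNCATE TABLE ${table}`];
		case 'replace':
			// Fall back to DROP + CREATE when the database lacks CREATE OR REPLACE TABLE.
			return supportsOrReplace
				? [`CREATE OR REPLACE TABLE ${table} (${body})`]
				: [`DROP TABLE IF EXISTS ${table}`, `CREATE TABLE ${table} (${body})`];
		default:
			throw new Error(`unknown existing value: ${existing}`);
	}
}
```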

errors/stats

To make CLI and live-service error management more symmetrical, squirrelnado uses @ashnazg/pubsub as both an internal and an app-facing message bus.

prefiguring endpoint details

URLs are the basic way to specify most sources and destinations, but keeping passwords out of them is important for leak prevention, and the same mechanism works for other connection settings too.

The optional config object that read() and write() take as a second parameter lets you selectively separate out dynamic or high-risk settings:

```js
const {ENDPOINT = 'mysql://server/db/schema/table', PASSWORD} = process.env;
if (!PASSWORD) throw new Error('set PASSWORD');
await read('/tmp/tickets/page-*.csv').write(ENDPOINT, {pass: PASSWORD});
```

Instead of providing configs on each read/write, you can set defaults in the servers{} map:

```js
const {read, servers} = require('@ashnazg/squirrelnado2'); // https://ashnazg.com/docs/lib/squirrelnado
servers.mytickets = {
	url: 'https://tickets.ashnazg.com/api/tickets',
	user: 'app',
	pass: process.env.APP_PASS
};
servers['etl@upstream'] = {
	pass: process.env.APP_PASS
};
servers.report_server = {
	protocol: 'mysql',
	host: 'reports.ashnazg.com',
	user: 'auditor',
	pass: process.env.AUDITOR_PASS,
	database: 'bi',
	schema: 'daily',
	table: 'open_tickets',
	columns: {
		id: 'INT NOT NULL',
		src_id: 'VARCHAR(16) NULL'
	},
	existing: 'truncate'
};
```

You can exercise the above defaults by using either the bare nickname (like report_server) as the URL, or by using it as the server name in a full URL: mysql://report_server/db1/schema_foo/table_bar.

```js
// the user+hostname+port given in the URL must exactly match the above; it's never going to look up 'upstream' in the following use case:
let record_count = 0; // bad example in that just SQL'ing count(*) would be better, but pretend like we're doing real work here...
await read('etl@upstream').forEach(record => {
	++record_count;
});

let last_id = 0;
await read('tickets.csv').map(ticket => {
	return {
		id: ++last_id,
		src_id: 'ticket-' + ticket.id,
		meta: "this field is visible to later .map steps, but is not written to mysql as columns{} didn't include it"
	};
}).write('report_server');

// as long as the URL has a protocol://, you can give read/write more than just the nickname:
// this will let everything default (the empty slashes are how you can skip the DB/SCHEMA parts) except the table name and column listing.
...write('mysql://report_server///other_table', {columns: {name: 'VARCHAR(32) NOT NULL'}});
```
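A hypothetical sketch of how such a lookup could resolve an endpoint. The function name, the exact key format (user@host[:port] as written in the URL), and the precedence order (servers defaults, then URL parts, then per-call options) are assumptions for illustration, not a description of squirrelnado's internals.

```js
// Hypothetical sketch of resolving a src/dst against a servers{} map.
function resolveEndpoint(servers, target, options = {}) {
	const has = key => Object.prototype.hasOwnProperty.call(servers, key);
	// Bare nickname, e.g. 'report_server' or 'etl@upstream': exact key match only.
	if (!target.includes('://')) {
		if (!has(target)) throw new Error(`unknown server nickname: ${target}`);
		return {...servers[target], ...options};
	}
	const u = new URL(target);
	// Lookup key is user@host[:port], exactly as written in the URL.
	const key = (u.username ? `${u.username}@` : '') + u.host;
	const fromUrl = {protocol: u.protocol.replace(/:$/, '')};
	// Empty path segments (as in '///other_table') leave the defaults in place.
	const [database, schema, table] = u.pathname.slice(1).split('/');
	if (database) fromUrl.database = database;
	if (schema) fromUrl.schema = schema;
	if (table) fromUrl.table = table;
	const defaults = has(key) ? servers[key] : {};
	// Later spreads win: URL hostname < servers defaults < URL path parts < per-call options.
	return {host: u.hostname, ...defaults, ...fromUrl, ...options};
}
```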

redacting passwords and authorization headers

To mitigate the risk of a password in the URL, the "preserved" URL stored in the sequence's config has its password redacted, since that URL ends up in the logs. (This is also why step.server.pass is non-enumerable rather than a plain literal password.) If you have secrets elsewhere, like https://server/?auth_token=foo, redacting the URL is up to you. You can alter or delete it without affecting this tool, as the URL is only kept there for debugging convenience.
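Both techniques can be reproduced with standard JavaScript. The sketch below uses hypothetical helper names (redactUrlPassword, makeServer) and is not squirrelnado's own code; it shows the URL password being replaced, a query-string token being left alone, and a non-enumerable pass property staying out of spreads and JSON:

```js
// Hypothetical helpers; squirrelnado does its own redaction internally.
function redactUrlPassword(url) {
	const u = new URL(url);
	// Only the userinfo password is touched; query-string secrets are left as-is.
	if (u.password) u.password = 'REDACTED';
	return u.href;
}

// A non-enumerable property is skipped by Object.keys(), object spread,
// and JSON.stringify(), so it stays out of most log dumps.
function makeServer(fields, pass) {
	const server = {...fields};
	Object.defineProperty(server, 'pass', {value: pass, enumerable: false});
	return server;
}
```

The password is still readable as server.pass by code that asks for it explicitly; it just doesn't leak through generic serialization.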

While src/dst URLs can cover most use cases

misc utils

installing node CLI error traps

sqrl.install

internals

  1. load all plugins for dependency injection
  2. parse URLs and confs into coarse steps
  3. split coarse steps into true steps
  4. when launched, create streams for each step, link them together, and run them
  5. collect results if in RAM mode
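Steps 4 and 5 can be pictured with a tiny, hypothetical model in which generator functions stand in for the Node streams the real implementation creates: each step consumes an iterable of rows and yields rows, "linking" is nesting one generator inside the next, and RAM mode just drains the last one into an array. None of these names come from squirrelnado.

```js
// Hypothetical miniature of the internals; generators stand in for streams.
function* source(rows) {
	yield* rows;
}

// A .map()-like step: [] drops, an array fans out, anything else passes through as one row.
function mapStep(transform) {
	return function* (rows) {
		for (const row of rows) {
			const out = transform(row);
			if (Array.isArray(out)) yield* out;
			else yield out;
		}
	};
}

// Link steps by feeding each one the previous step's output; nothing runs until it's consumed.
function link(src, ...steps) {
	return steps.reduce((upstream, step) => step(upstream), src);
}

// "RAM mode": drain the linked chain into an array.
function collect(iterable) {
	return [...iterable];
}
```

Rows flow through the whole chain one at a time, which is the property that lets the real streaming version handle inputs far larger than memory.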

parse URL

```js
const parseURL = require('@ashnazg/squirrelnado2/parse-url'); // ~/projects/ashnazg-npm/squirrelnado/newcore/parse-url.js
```

JavaScript/Bash source released under the MIT License.