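Configuration
Druzhba pipelines are defined in YAML files, located in the directory given by
the DRUZHBA_CONFIG_DIR environment variable and laid out like this:
  /<DRUZHBA_CONFIG_DIR>
    - _pipeline.yaml   # pipeline-wide settings, target connection, list of sources
    - pgtest.yaml      # source database "pgtest" and its tables
    - mysqltest.yaml   # source database "mysqltest" and its tables
    - etc.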
Pipeline Configuration
An example can be found in the examples.
The top-level _pipeline.yaml defines details about the pipeline as a whole,
as well as connection info for the target database. We use a modified YAML
parser that supports interpolation: a value like ${REDSHIFT_URL} is replaced
with the value of the REDSHIFT_URL environment variable.
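The interpolation step can be pictured as a regex substitution over the raw YAML text before it is parsed. This is an illustration only: interpolate_env is a hypothetical helper, and leaving unset variables untouched is an assumption, not documented Druzhba behavior.

```python
import os
import re

# Matches ${NAME} where NAME is a conventional environment variable identifier.
_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def interpolate_env(text, environ=None):
    """Replace each ${NAME} in text with the value of environment variable NAME.

    Hypothetical helper; unset variables are left as-is (an assumption).
    """
    env = os.environ if environ is None else environ

    def _substitute(match):
        name = match.group(1)
        return env.get(name, match.group(0))

    return _ENV_PATTERN.sub(_substitute, text)


raw = "connection:\n  url: ${REDSHIFT_URL}\n"
print(interpolate_env(raw, {"REDSHIFT_URL": "postgresql://user:pass@host:5439/db"}))
```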
Besides the target configuration fields, _pipeline.yaml also includes a list
of source databases under the sources element.
  connection: <see below>
  sources:
    - alias: pgtest
      type: postgres
    - alias: mysqltest
      type: mysql
      enabled: false
These aliases point to other files in the same directory, e.g. pgtest.yaml.
For an entry in sources:
- alias: an arbitrary name used to reference a single source database.
- type: indicates which database driver Druzhba should use: postgres, mysql,
  or mssql.
- enabled: if false, indicates that while this database is configured, a
  Druzhba run should not include it by default. It may still be requested
  explicitly by passing its alias to --database.
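The effect of enabled and --database on which sources a run processes can be sketched as follows. select_sources is a hypothetical helper, not part of Druzhba. It assumes that enabled defaults to true when omitted and that passing aliases restricts the run to exactly those sources.

```python
def select_sources(sources, requested_aliases=None):
    """Return the source entries a run should process (hypothetical helper).

    sources: list of dicts as parsed from the `sources` list in _pipeline.yaml.
    requested_aliases: aliases passed explicitly (e.g. via --database), or None.
    """
    if requested_aliases:
        wanted = set(requested_aliases)
        # Explicitly requested sources run even when enabled is false.
        return [s for s in sources if s["alias"] in wanted]
    # Otherwise run every source not marked enabled: false (default assumed true).
    return [s for s in sources if s.get("enabled", True)]


sources = [
    {"alias": "pgtest", "type": "postgres"},
    {"alias": "mysqltest", "type": "mysql", "enabled": False},
]
print([s["alias"] for s in select_sources(sources)])
print([s["alias"] for s in select_sources(sources, ["mysqltest"])])
```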
Supported fields for the target database (currently only Redshift):
- connection: options for the target database connection.
  - url: if provided, will be parsed into user/password/host/port/database
    instead of requiring the separate items below
  - user
  - password
  - host
  - port
  - database
- index: defines where Druzhba will create or find the tracking table in the
  target database.
  - schema
  - table
- s3: used to write data files to S3 and then COPY them into Redshift.
  - bucket
  - prefix
- iam_copy_role: IAM role used in the COPY operation. Only IAM authorization is
  supported.
- redshift_cert_path: path to an SSL certificate file. Druzhba considers this
  parameter optional, but it may be necessary depending on your Redshift
  configuration.
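A rough sketch of how a connection.url might be split into the separate fields, using Python's standard urllib.parse. parse_connection_url is hypothetical, and Druzhba's own parser may differ, for example in how it treats a missing port or URL-encoded passwords.

```python
from urllib.parse import unquote, urlparse


def parse_connection_url(url):
    """Split a connection URL into user/password/host/port/database (hypothetical)."""
    parsed = urlparse(url)
    return {
        # urlparse does not percent-decode credentials, so decode them here.
        "user": unquote(parsed.username) if parsed.username else None,
        "password": unquote(parsed.password) if parsed.password else None,
        "host": parsed.hostname,
        "port": parsed.port,  # int, or None if absent
        "database": parsed.path.lstrip("/") or None,
    }


fields = parse_connection_url("redshift://etl_user:s3cr%40t@example-cluster:5439/analytics")
print(fields)
```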
Database Configuration
<db_alias>.yaml defines the configuration for a specific source database.
The top-level keys in this file are:
- tables: a list of tables to pull. Usually the majority of this file. See
  below.
- data: an arbitrary dictionary of keys and values which will be formatted into
  SQL queries under the key db, for manual pipelines. (SQL Server only) Within
  this object, the key object_schema_name is also used in the database
  connection.
- connection_string: an explicit connection URI, in most cases like
  protocol://user:pass@host:port/database_name.
- connection_string_env: an alternate environment variable for this
  database's connection_string.
If neither connection_string nor connection_string_env is provided, the
environment variable <DB_ALIAS>_DATABASE_URL is assumed.
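The lookup order for a source's connection URI can be sketched like this. resolve_connection_string is a hypothetical helper. The precedence of connection_string over connection_string_env, and upper-casing the alias to form <DB_ALIAS>_DATABASE_URL, are assumptions drawn from the description above.

```python
import os


def resolve_connection_string(db_alias, db_config, environ=None):
    """Pick a source database's connection URI (hypothetical helper).

    Returns None when nothing is configured (an assumption).
    """
    env = os.environ if environ is None else environ
    # 1. An explicit connection_string in <db_alias>.yaml.
    if db_config.get("connection_string"):
        return db_config["connection_string"]
    # 2. A named environment variable given by connection_string_env.
    if db_config.get("connection_string_env"):
        return env.get(db_config["connection_string_env"])
    # 3. Default: <DB_ALIAS>_DATABASE_URL, alias upper-cased (assumed).
    return env.get(f"{db_alias.upper()}_DATABASE_URL")


env = {
    "PGTEST_DATABASE_URL": "postgresql://u:p@localhost:5432/pgtest",
    "LEGACY_MYSQL_URL": "mysql://u:p@db:3306/app",
}
print(resolve_connection_string("pgtest", {}, env))
print(resolve_connection_string("mysqltest", {"connection_string_env": "LEGACY_MYSQL_URL"}, env))
```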
Table Configuration
The YAML file has several configurable settings for each table.
- source_name: table name in the source database. Required even if query_file
  is used.
Options configuring the creation of the target table for automatic (“managed”)
tables:
- destination_name: desired table name in the target database.
- destination_schema: schema in the target database. This schema must already
  exist.
- distribution_key: Optional. A single column to be used as the table's
  distkey. It should be unique or mostly unique; good examples are primary key
  IDs and high-resolution timestamps. You can read more about distkeys in the
  AWS docs. Note: if no distribution_key is specified, the first primary key
  column is selected by default.
- sort_keys: Optional. Zero or more columns that define the sortkey. You can
  read more about sortkeys in the AWS docs as well.
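To make the distkey and sortkey options concrete, here is a sketch that assembles a Redshift CREATE TABLE statement with DISTKEY and SORTKEY table attributes, including the documented fallback to the first primary key. build_create_table is hypothetical and does not reproduce the DDL Druzhba actually generates; for instance, it omits primary key constraints and identifier escaping.

```python
def build_create_table(schema, table, columns, primary_keys=(),
                       distribution_key=None, sort_keys=()):
    """Assemble a Redshift CREATE TABLE with DISTKEY/SORTKEY (hypothetical).

    columns: list of (name, redshift_type) pairs, already type-mapped.
    """
    column_sql = ",\n    ".join(f'"{name}" {col_type}' for name, col_type in columns)
    # Documented default: fall back to the first primary key column.
    distkey = distribution_key or (primary_keys[0] if primary_keys else None)
    sql = f'CREATE TABLE "{schema}"."{table}" (\n    {column_sql}\n)'
    if distkey:
        sql += f'\nDISTKEY ("{distkey}")'
    if sort_keys:
        sql += "\nSORTKEY (" + ", ".join(f'"{k}"' for k in sort_keys) + ")"
    return sql + ";"


ddl = build_create_table(
    "analytics", "orders",
    [("id", "BIGINT"), ("updated_at", "TIMESTAMP")],
    primary_keys=["id"], sort_keys=["updated_at"],
)
print(ddl)
```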
Options pertaining to the table’s incremental update behavior:
- index_column: Optional. The column used to determine which rows are dumped
  from the source DB. Ideally this is a timestamp (updated_at if rows can be
  edited, created_at if the table is append-only) or a sequential numeric ID.
  In any case, it goes into a WHERE clause like WHERE col > n AND col <= m,
  where n is pulled from "public"."pipeline_table_index" and m is pulled from
  the source database (SELECT MAX(index_column) FROM source_table;) before data
  export. If there is no index on this column in the source database,
  performance may suffer. NOTE: if no index_column is specified, the entire
  table will be dumped (refreshed) on each run of the pipeline.
- index_sql: Optional (alternative to index_column). A SQL query that should
  return a single row with a column called index_value. Jinja templating is
  supported.
- primary_key: Optional. Column name or list of column names to specify as
  primary keys, if they cannot be inferred from the source table. When loading
  data, the primary_key is used to replace existing rows instead of inserting
  new ones. Required for incremental updates based on a query_file.
- full_refresh: Optional. Deletes the entire table prior to loading extracted
  data. Not compatible with index_column, index_sql, or append_only.
- append_only: Optional. Simplifies the load side by skipping deletes entirely.
  Requires index_column or index_sql. Not compatible with full_refresh.
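The compatibility rules listed above can be expressed as a small validation function. validate_incremental_options is a hypothetical helper operating on a table entry parsed into a dict. It encodes only the rules stated in this section; Druzhba's own checks and messages may differ.

```python
def validate_incremental_options(table):
    """Check one table entry against the documented option rules (hypothetical)."""
    has_index = bool(table.get("index_column") or table.get("index_sql"))
    problems = []
    # full_refresh is not compatible with index_column, index_sql, or append_only.
    if table.get("full_refresh"):
        for key in ("index_column", "index_sql", "append_only"):
            if table.get(key):
                problems.append(f"full_refresh is not compatible with {key}")
    # append_only requires some index to decide which rows are new.
    if table.get("append_only") and not has_index:
        problems.append("append_only requires index_column or index_sql")
    # Incremental updates from a query_file need an explicit primary_key.
    if table.get("query_file") and has_index and not table.get("primary_key"):
        problems.append("primary_key is required for incremental updates based on a query_file")
    return problems


print(validate_incremental_options({"source_name": "orders", "index_column": "updated_at"}))
print(validate_incremental_options({"source_name": "orders", "full_refresh": True, "append_only": True}))
```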
Options defining a “manual” table rather than a “managed” one:
- query_file: Optional. Local path to a file containing a valid SQL query. If
  this is provided, only this query will be used to pull from the source
  database; no SQL will be auto-generated and no datatypes will be inferred.
- schema_file: Optional. Local path to a file containing valid CREATE TABLE
  SQL. If this is provided, only this statement will be used to create the
  destination table in the data warehouse. Without it, the table schema will be
  generated from the contents of the query. Not compatible with the --rebuild
  command line argument.
Column-specific configuration:
- columns_to_drop: Optional. A list of columns that exist in the source table
  but should not exist in the warehoused table. This is the only transformation
  that currently happens to data.
- type_map: Optional. Overrides type conversion from the source DB to Redshift.
  This is especially useful for types not supported in Redshift, like enums and
  arrays. Example:
    type_map:
      - your_column_name: smallint
      - other_column_name: varchar(70)
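Roughly, columns_to_drop and type_map act on the inferred column types as shown below. apply_column_config is hypothetical. The inferred types in the example are invented, and silently ignoring a type_map entry for a dropped or unknown column is an assumption.

```python
def apply_column_config(inferred_types, columns_to_drop=(), type_map=None):
    """Drop unwanted columns, then overlay type_map overrides (hypothetical).

    inferred_types: dict of column name -> Redshift type inferred from the source.
    type_map: the YAML list form, e.g. [{"your_column_name": "smallint"}].
    """
    dropped = set(columns_to_drop)
    result = {name: t for name, t in inferred_types.items() if name not in dropped}
    for entry in type_map or []:
        for name, redshift_type in entry.items():
            # Assumption: overrides for dropped or unknown columns are ignored.
            if name in result:
                result[name] = redshift_type
    return result


# Invented example: an enum column that type inference mapped to a wide varchar.
inferred = {"id": "BIGINT", "status": "VARCHAR(256)", "api_token": "VARCHAR(256)"}
print(apply_column_config(inferred, columns_to_drop=["api_token"],
                          type_map=[{"status": "varchar(70)"}]))
```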
Other configuration options:
- truncate_file: Optional. If using a query_file (above), this is required to
  define deletes from the destination table for a --full-refresh. This option
  is useful for many-to-one pipelines, to only delete records in the target
  that come from the current source. Not compatible with the --rebuild command
  line argument.
- not_null_date: Optional. If the source table has a NOT NULL constraint on
  date/datetime/timestamp columns that do, in fact, have NULL values (or an
  equivalent, à la 0000-00-00), this option can be used to convert these to
  datetime.datetime.min instead.
- data: Optional. Object which will be formatted into Jinja templates under the
  key table.
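A minimal sketch of the not_null_date conversion. How a zero date reaches Python depends on the database driver, so this assumes it arrives either as None or as a string starting with 0000-00-00. coerce_not_null_date is a hypothetical name.

```python
import datetime


def coerce_not_null_date(value):
    """Map NULL or zero-date values to datetime.datetime.min (hypothetical)."""
    if value is None:
        return datetime.datetime.min
    # Assumption: zero dates come back from the driver as strings.
    if isinstance(value, str) and value.startswith("0000-00-00"):
        return datetime.datetime.min
    return value


print(coerce_not_null_date("0000-00-00 00:00:00"))
print(coerce_not_null_date(datetime.datetime(2020, 1, 2)))
```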
Templating
Custom SQL files can use Jinja2 templating. Three variables are defined:
- db: gets data from the data block of the database YAML file
- table: gets data from the data block of the table configuration
- run: contains automatically set run metadata, with fields:
  - destination_schema_name
  - destination_table_name
  - db_name
  - source_table_name
  - old_index_value
  - new_index_value
In particular run.old_index_value and run.new_index_value are useful for
building custom incremental update logic.
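For example, a query_file for a custom incremental table might use run.old_index_value and run.new_index_value as shown below. The sketch renders the template directly with the jinja2 package to show what the variables expand to. The data keys source_schema and columns, and skipping the lower bound when old_index_value is empty (as it might be on a first run), are assumptions for illustration.

```python
from jinja2 import Template

# Hypothetical query_file contents for an incremental "manual" table.
QUERY = (
    "SELECT {{ table.columns | join(', ') }}\n"
    "FROM {{ db.source_schema }}.{{ run.source_table_name }}\n"
    "WHERE updated_at <= '{{ run.new_index_value }}'\n"
    "{% if run.old_index_value %}"
    "  AND updated_at > '{{ run.old_index_value }}'\n"
    "{% endif %}"
)

# Stand-ins for the three template variables; the data keys are invented.
db = {"source_schema": "shop"}  # from the database file's data block
table = {"columns": ["id", "status", "updated_at"]}  # from the table's data block
run = {
    "destination_schema_name": "analytics",
    "destination_table_name": "orders",
    "db_name": "pgtest",
    "source_table_name": "orders",
    "old_index_value": "2024-01-01 00:00:00",
    "new_index_value": "2024-01-02 00:00:00",
}

sql = Template(QUERY).render(db=db, table=table, run=run)
print(sql)
```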
Monitoring
Monitoring can be provided through several options. Logging verbosity is
controlled through either the --log-level command line option or the
LOG_LEVEL environment variable. Additionally, Sentry (via the Raven client) and
StatsD are supported out of the box and configured through environment
variables. Other monitoring options are available by writing a Python wrapper
that invokes the Druzhba engine, rather than running the command-line
application.
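As an illustration of how the two logging settings might combine, the sketch below resolves a level from a --log-level value or the LOG_LEVEL environment variable. resolve_log_level is hypothetical. The precedence (command line over environment) and the INFO default are assumptions, not documented behavior.

```python
import logging
import os


def resolve_log_level(cli_level=None, environ=None):
    """Choose a logging level from --log-level or LOG_LEVEL (hypothetical)."""
    env = os.environ if environ is None else environ
    # Assumed precedence: command line, then environment, then INFO.
    name = (cli_level or env.get("LOG_LEVEL") or "INFO").upper()
    level = logging.getLevelName(name)  # returns an int for known level names
    if not isinstance(level, int):
        raise ValueError(f"unknown log level: {name}")
    return level


logging.basicConfig(level=resolve_log_level(None, {"LOG_LEVEL": "debug"}))
```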
Sentry
Sentry monitoring is enabled by setting the SENTRY_DSN environment variable.
When it is set, warnings and errors will be posted to that DSN.
The SENTRY_ENVIRONMENT and SENTRY_RELEASE environment variables will be
passed to Sentry as well and have the effect described in the Sentry
documentation.
StatsD
Druzhba can send several telemetry data points to StatsD if configured. These
include total pipeline duration, individual source database durations, and
the number of rows updated per table. To enable the StatsD integration, set the
STATSD_HOST and STATSD_PORT environment variables to the address you would
like Druzhba to send its telemetry to. Druzhba also supports an optional
STATSD_PREFIX that will be prepended to the event names Druzhba sends by
default.
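StatsD receives plain-text UDP datagrams of the form name:value|type, for example c for counters and ms for timers. The sketch below reads the documented environment variables and formats such datagrams. The helper names, the metric names, the dot used to join STATSD_PREFIX, and the default port 8125 (the conventional StatsD port) are assumptions; Druzhba's actual event names are not documented here.

```python
import os
import socket

DEFAULT_STATSD_PORT = 8125  # conventional StatsD port; assumed default


def statsd_settings(environ=None):
    """Return (host, port, prefix) from the environment, or None if disabled."""
    env = os.environ if environ is None else environ
    host = env.get("STATSD_HOST")
    if not host:
        return None  # integration stays off without STATSD_HOST
    port = int(env.get("STATSD_PORT", DEFAULT_STATSD_PORT))
    return host, port, env.get("STATSD_PREFIX")


def format_metric(name, value, metric_type, prefix=None):
    """Encode one StatsD datagram such as b'etl.rows_updated:42|c'."""
    full_name = f"{prefix}.{name}" if prefix else name
    return f"{full_name}:{value}|{metric_type}".encode("utf-8")


def send_metric(payload, host, port):
    """Fire-and-forget UDP send; StatsD does not reply."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (host, port))


# Hypothetical metric: a per-table row count sent as a counter.
print(format_metric("rows_updated.orders", 42, "c", prefix="etl"))
```

With settings = statsd_settings() returning a (host, port, prefix) tuple, a payload would then be sent with send_metric(payload, host, port).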
Extensible custom monitoring
If you would like to use another monitoring provider, you can do so by running
your own Python process: extend the MonitoringProvider class through the
as-yet undocumented (sorry) monitoring API, assign it to main.monitor, and call
run manually.
This interface will be cleaned up in a future release.
Usage Considerations
Index column filters should be fast
Druzhba pulls incrementally according to the value of the index_column given
in a table’s configuration, and then inserts-or-replaces new or updated rows
according to an optional primary_key. On the first run (or if --rebuild
is given) Druzhba will create the target table. After that, it will use a SQL
filter on index_column to only pull newly updated rows.
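The incremental filter described above can be sketched as a query builder that returns SQL with DB-API placeholders. build_incremental_query is hypothetical. It assumes %s placeholders (the paramstyle used by drivers such as psycopg2 and PyMySQL), trusted identifiers from configuration, and that the lower bound is omitted when there is no previous index value.

```python
def build_incremental_query(source_table, columns, index_column=None,
                            old_index_value=None, new_index_value=None):
    """Return (sql, params) for one extraction pass (hypothetical helper)."""
    sql = f"SELECT {', '.join(columns)} FROM {source_table}"
    if index_column is None:
        # No index_column: the whole table is dumped on every run.
        return sql, ()
    conditions, params = [], []
    if old_index_value is not None:
        # n: the value recorded in the tracking table by the previous run.
        conditions.append(f"{index_column} > %s")
        params.append(old_index_value)
    # m: SELECT MAX(index_column) FROM source_table, taken before export.
    conditions.append(f"{index_column} <= %s")
    params.append(new_index_value)
    return sql + " WHERE " + " AND ".join(conditions), tuple(params)


print(build_incremental_query("orders", ["id", "updated_at"], "updated_at",
                              "2024-01-01", "2024-01-02"))
```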
Consequently, queries against index_column need to be fast! Unless a table is
append_only, an updated_at timestamp column is usually used for index_column,
and it is usually necessary to create a database index (unfortunate name
collision!) on this column to make these pulls faster, which will slow down
writes a little bit.
State management
Druzhba currently tracks pipeline state by the source database, database_alias, and table. Consequently, it supports many-to-one pipelines, e.g. from multiple copies of the same source database to a single shared target table. But it does not support one-to-many pipelines, because it could not distinguish the state of the different pipelines. SQL-based pipelines currently need to define a source_name, which is used to track their state.
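The consequence for many-to-one versus one-to-many pipelines can be illustrated by keying state on the source alias and table. This sketch simplifies the tracking key to (database_alias, source_name). The helper names are hypothetical, and the real tracking table may store more columns.

```python
def state_key(database_alias, source_name):
    """Simplified key under which a table's index value is tracked (assumed shape)."""
    return (database_alias, source_name)


def find_state_collisions(pipelines):
    """Group pipelines that would share, and overwrite, one tracking entry.

    pipelines: iterable of (database_alias, source_name, target_table) tuples.
    """
    seen = {}
    for alias, source_name, target in pipelines:
        seen.setdefault(state_key(alias, source_name), []).append(target)
    return {key: targets for key, targets in seen.items() if len(targets) > 1}


# Many-to-one: two copies of a source database feed one shared target table.
many_to_one = [("shop_eu", "orders", "analytics.orders"),
               ("shop_us", "orders", "analytics.orders")]
# One-to-many: one source table feeds two targets and shares a single key.
one_to_many = [("shop_eu", "orders", "analytics.orders"),
               ("shop_eu", "orders", "archive.orders")]
print(find_state_collisions(many_to_one))
print(find_state_collisions(one_to_many))
```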
Manual vs Managed
A specific target table may be:
- “managed”, meaning Druzhba handles the creation of the target table (inferred from datatypes on the source table) and the generation of the source-side query.
- “manual”, meaning SQL queries are provided to read from the source (not necessarily from one table) and to create the target table (rather than inferring its schema from the source table).
Manual table creation is not currently supported for SQL Server.