Anatomy of a Worker
Most workers consist of three components:
- A task definition file that implements task-specific business logic.
- A worker file that defines workflow policy including which NSQ channels to listen to, how to handle errors, and more.
- An app file that allows a worker to be compiled to a standalone application.
Task Definition Files
The task definition file implements functions that the worker will perform on data. This file focuses on a single data-centric operation. For example, the fixity checker calculates fixity on a file in preservation storage. The bag validator validates a bag before ingest. The file restorer restores individual files.
All of these files implement a method called Run()
to do the work. Run()
returns an integer describing the number of tasks processed and a slice of ProcessingError objects describing what, if anything went wrong during processing.
Worker Files
Worker files include a Context object that allows the worker to talk to NSQ, Redis, Registry and S3/Glacier/Wasabi.
The Base Worker defines a number of methods that use the context object to perform tasks common to all workers, such as getting work items from the registry, registering a listener with NSQ, etc. The base worker also handles interrupt and kill signals from the operating system.
Workers also include a Settings object that describes which NSQ topics it should read from and write to, how many times it should attempt its task, etc.
The worker source files tend to be slim because they don't implement much logic. They simply load code from an underlying task file, configure it with custom settings, and wrap it all into the worker framework that can talk to all of the external services (NSQ, Redis, Registry, and S3/Glacier/Wasabi).
For example, if you look at workers/ingest_format_identifier.go, you'll see the code does the following:
- Creates a new Context object providing clients for Redis, Registry, NSQ and S3.
- Creates a settings object telling the worker which NSQ topic to read from, which topic to push to if processing completes successfully, how many times to attempt its task, how to handle fatal errors, how many items to keep in its internal work queue, etc.
- Creates an underlying task handler, which can be any object that implements the
Run()
interface (aka a Runnable). - Creates a worker object with the required context, settings, and Runnable.
- Tells the worker to start listening to NSQ. It will listen to the channel specified in the Settings.
All workers follow this pattern.
Note
The cron jobs apt_queue
, apt_queue_fixity
and ingest_bucket_reader
don't have task files. Because they are designed to run and exit, rather
than listen forever, their logic is implemented directly in the worker
files.
App Files
Preserv's apps directory contains a directory for each worker. Inside each directory is a single file with a main()
method, which is required by Go for building executables. (Go allows only one main method per directory, which is why the app files are all in separate directories.)
The format identifier app shows the pattern for app files, which do the following:
- Read options from the command line. These are optional, but allow us to override the following key settings that typically come in through the environment, the .env file, or AWS's parameter store: a. ChannelBufferSize - This is the number of items the worker should keep in its internal work queue. b. NumWorkers - The number of go routines the worker should run. For example, if this is set to 2, the worker will run 2 concurrent instances of its task handler. (Two concurrent Runnables.) c. MaxAttempts defines the maximum number of non-fatal errors a task can encounter before it quits. Tasks that fail with non-fatal errors are marked as failed in the Registry, with the WorkItem.NeedsAdminReview flag set to true. The APTrust admin can requeue these items later.
- Create a worker with the specified options. Remember that the worker starts listening to its NSQ channel as soon as it is created.
- Sets up a blocking channel for this worker to listen to. Nothing happens in this channel, but because the worker has to listen to it forever, it cannot exit until the operating system kills the worker with a SIGINT or SIGKILL. Without this, the worker would exit immediately. See the Base Worker for info on how signals are handled.
Source Files
Worker | Service | Files | Definition |
---|---|---|---|
Bucket Reader | Ingest | No Task File Worker App |
Cron job that scans depositor receiving buckets for new tar files to ingest. |
Metadata Gatherer | Ingest | Task Worker App |
Parses a bag's tag files, calculates checksums on bag contents, and copies manifests and tag files to the ingest staging bucket. |
Bag Validator | Ingest | Task Worker App |
Validates a bag before ingest. |
Reingest Manager | Ingest | Task Worker App |
Checks if a bag is being reingested and if so, applies special processing. |
Staging Uploader | Ingest | Task Worker App |
Copies files from a tarred bag in a receiving bucket to the ingest staging bucket. |
Format Identifier | Ingest | Task Worker App |
Identifies the format of files within a bag. |
Preservation Uploader | Ingest | Task Worker App |
Copies files to preservation storage. |
Preservation Verifier | Ingest | Task Worker App |
Verifies that files copied to preservation storage are actually there. |
Ingest Recorder | Ingest | Task Worker App |
Records all ingest data in Registry. |
Cleanup | Ingest | Task Worker App |
Cleans up all of the temporary resources created during the ingest process and deletes ingested bags from receiving buckets. |
APT Queue Fixity | Fixity | No Task File Worker App |
Queues files for fixity checks. |
Fixity Checker | Fixity | Task Worker App |
Checks fixity on files in preservation storage. (S3 and Wasabi only. Does not check Glacier files.) |
Glacier Restorer | Restoration | Task Worker App |
Moves files from Glacier into S3 for restoration. |
File Restorer | Restoration | Task Worker App |
Restores individual files. |
Object Restorer | Restoration | Task Worker App |
Restores entire bags (intellectual objects). |
Deletion Manager | Deletion | Task Worker App |
Deletes files and objects from preservation storage. |
APT Queue | Deletion and Restoration | No Task File Worker App |
This cron job periodically scans Registry for restoration and deletion requests that have not been queued in NSQ. |