GPU Tasks

GPU tasks are specialized for managing and executing computationally intensive operations on Graphics Processing Units (GPUs). They are designed to handle large-scale data processing efficiently, particularly during model training and inference, where high-performance computing is critical.

Purpose

The GPUTaskInitializer class extends the QueueTaskInitializer to provide the necessary framework for managing GPU-accelerated tasks. It handles the dynamic loading and unloading of GPU models, manages worker processes, and ensures that tasks are processed efficiently using GPU resources.

Customization

To create a custom GPU-based task, subclass GPUTaskInitializer and implement the enqueue, process, and store_entry methods. These methods define the logic for enqueuing tasks, processing them on the GPU, and storing the results in the database.

Key Features

  • Efficient Resource Management: Dynamically loads and unloads GPU models to maximize resource utilization.

  • Scalable Task Execution: Handles large batches of data, optimizing throughput during model training and inference.

  • Seamless Integration: Uses RabbitMQ for distributed task management, ensuring reliable and scalable task execution.

  • Worker Management: Manages GPU-specific worker processes that handle tasks in parallel.

  • Extensibility: Provides abstract methods that let developers define custom GPU task logic.

Example Usage

Here is an example of how to subclass GPUTaskInitializer:

from protein_information_system.tasks.gpu import GPUTaskInitializer

class MyCustomGPUTask(GPUTaskInitializer):
    def enqueue(self):
        # Implementation of the task enqueuing logic for GPU processing
        pass

    def process(self, target):
        # Processing logic for the target data using GPU
        pass

    def store_entry(self, record):
        # Logic to store the processed record in the database
        pass

class protein_information_system.tasks.gpu.GPUTaskInitializer(conf, session_required=True)

Bases: QueueTaskInitializer

The GPUTaskInitializer class extends QueueTaskInitializer to manage tasks that are specifically designed for GPU-based processing.

This class provides the necessary infrastructure for setting up RabbitMQ queues, coordinating GPU-specific worker processes, and ensuring that tasks are efficiently processed using GPUs.

stop_event

An event to signal workers and threads to stop.

Type:

multiprocessing.Event

model_instances

Dictionary of loaded models, keyed by model type.

Type:

dict

tokenizer_instances

Dictionary of loaded tokenizers, keyed by model type.

Type:

dict
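
The stop_event attribute is typically checked inside worker loops. A minimal sketch, assuming each worker polls its queue with a short timeout and exits once stop_event is set. worker_loop and handle are hypothetical names, queue.Queue stands in for a RabbitMQ queue, and a thread stands in for a worker process:

```python
import multiprocessing
import queue
import threading


def worker_loop(stop_event, task_queue, handle, poll_timeout=0.1):
    """Hypothetical worker: consume tasks until stop_event is set."""
    while not stop_event.is_set():
        try:
            # Short timeout so the loop re-checks stop_event regularly.
            task = task_queue.get(timeout=poll_timeout)
        except queue.Empty:
            continue
        try:
            handle(task)
        finally:
            task_queue.task_done()  # lets task_queue.join() return


if __name__ == "__main__":
    stop_event = multiprocessing.Event()  # same type as the documented attribute
    tasks = queue.Queue()
    results = []
    worker = threading.Thread(target=worker_loop, args=(stop_event, tasks, results.append))
    worker.start()
    for item in ("seq_1", "seq_2"):
        tasks.put(item)
    tasks.join()      # wait until both tasks have been handled
    stop_event.set()  # signal the worker to exit
    worker.join()
    print(results)    # ['seq_1', 'seq_2']
```

Polling with a timeout, rather than blocking indefinitely, is what lets a single shared event stop every worker promptly.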

cleanup()

Clean up resources and stop worker processes.

This method ensures that all worker processes and threads are properly terminated.
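
A shutdown along these lines might set stop_event, wait for each worker with a timeout, and force-stop any process that does not exit. The sketch below assumes that approach; shutdown_workers is a hypothetical standalone helper, not the library's cleanup() implementation:

```python
import threading
import time


def shutdown_workers(stop_event, workers, join_timeout=5.0):
    """Hypothetical helper: signal, wait, then force-stop stragglers.

    `workers` may mix threading.Thread and multiprocessing.Process objects.
    Returns the workers that are still alive afterwards.
    """
    stop_event.set()  # ask every worker loop to exit at its next check
    for worker in workers:
        worker.join(timeout=join_timeout)
    for worker in workers:
        # Only processes can be killed; threads have no terminate() method.
        if worker.is_alive() and hasattr(worker, "terminate"):
            worker.terminate()
            worker.join(timeout=join_timeout)
    return [worker for worker in workers if worker.is_alive()]


if __name__ == "__main__":
    stop_event = threading.Event()

    def idle_loop():
        while not stop_event.is_set():
            time.sleep(0.05)

    threads = [threading.Thread(target=idle_loop) for _ in range(3)]
    for thread in threads:
        thread.start()
    print(shutdown_workers(stop_event, threads))  # []
```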

abstract enqueue()

Abstract method to enqueue tasks. Must be overridden by subclasses.

load_model(model_type)

Load the GPU model into memory.

This method loads the specified model and its tokenizer into memory for processing tasks.

Parameters:

model_type (str) – The type of model to load.
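
Given the model_instances and tokenizer_instances dictionaries, loading can be sketched as a cache lookup keyed by model type. This assumes a hypothetical loaders mapping from each model type to a pair of zero-argument callables; in practice such callables might wrap, for example, Hugging Face transformers' AutoModel.from_pretrained(...) and AutoTokenizer.from_pretrained(...), but whether this library uses transformers is not stated here:

```python
def load_cached_model(model_instances, tokenizer_instances, model_type, loaders):
    """Hypothetical helper: load a model/tokenizer pair once, cached by type.

    `loaders` maps each model type to a (load_model_fn, load_tokenizer_fn) pair.
    """
    if model_type not in model_instances:
        if model_type not in loaders:
            raise ValueError(f"unknown model type: {model_type!r}")
        load_model_fn, load_tokenizer_fn = loaders[model_type]
        model_instances[model_type] = load_model_fn()
        tokenizer_instances[model_type] = load_tokenizer_fn()
    return model_instances[model_type], tokenizer_instances[model_type]


if __name__ == "__main__":
    load_calls = []

    def load_fake_model():
        load_calls.append("esm")
        return "esm-model"

    models, tokenizers = {}, {}
    loaders = {"esm": (load_fake_model, lambda: "esm-tokenizer")}
    load_cached_model(models, tokenizers, "esm", loaders)
    load_cached_model(models, tokenizers, "esm", loaders)  # served from the cache
    print(len(load_calls))  # 1
```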

abstract process(target)

Abstract method to process tasks. Must be overridden by subclasses.

publish_task(batch_data, model_type)

Publish a task to the GPU processing queue.

This method serializes the task data and publishes it to the appropriate queue for the specified model type.

Parameters:

  • batch_data (any) – The task data to be processed.

  • model_type (str) – The type of model for which the task is intended.
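
Publishing can be sketched with the pika client: serialize the batch, then send it through the default exchange, which routes a message to the queue whose name equals the routing key. The serializer (pickle) and the queue naming scheme (gpu_ plus the model type) are assumptions, and publish_batch is a hypothetical name; a small recording channel replaces a live broker so the sketch runs on its own:

```python
import pickle


def publish_batch(channel, batch_data, model_type, queue_prefix="gpu_"):
    """Hypothetical helper: serialize a batch and route it to the model's queue.

    `channel` is expected to behave like a pika BlockingChannel.
    """
    # Assumed serializer; only unpickle data that comes from trusted producers.
    body = pickle.dumps(batch_data)
    queue_name = f"{queue_prefix}{model_type}"  # assumed queue naming scheme
    # The default exchange ("") routes a message to the queue named by routing_key.
    channel.basic_publish(exchange="", routing_key=queue_name, body=body)
    return queue_name


class RecordingChannel:
    """Stand-in for a pika channel that records published messages."""

    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


if __name__ == "__main__":
    channel = RecordingChannel()
    publish_batch(channel, [{"id": 1, "sequence": "MKT"}], "esm")
    _, routing_key, body = channel.published[0]
    print(routing_key, pickle.loads(body))  # gpu_esm [{'id': 1, 'sequence': 'MKT'}]
```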

run_processor_worker_sequential(model_type)

Run the processor worker sequentially for a specific GPU model type.

This method manages the loading and unloading of models for each task in the GPU processing pipeline, ensuring efficient GPU usage.

Parameters:

model_type (str) – The type of GPU model to be used for processing.
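
The sequential pattern described above can be sketched as load, process, unload for each batch, with results forwarded to a database-insertion queue. This is a sketch under stated assumptions: run_sequential_worker, load, process, and unload are hypothetical names, queue.Queue stands in for the RabbitMQ queues, and the worker stops when its queue is empty instead of waiting for more work:

```python
import queue


def run_sequential_worker(task_queue, insert_queue, load, process, unload):
    """Hypothetical helper: load the model for each batch, process it, unload it.

    Returns the number of batches processed once task_queue is empty.
    """
    processed = 0
    while True:
        try:
            batch = task_queue.get_nowait()
        except queue.Empty:
            break  # a long-running worker would wait on its queue instead
        model = load()
        try:
            for record in process(model, batch):
                insert_queue.put(record)  # hand results to the DB-insertion stage
        finally:
            unload(model)  # release GPU memory even if processing fails
        processed += 1
    return processed


if __name__ == "__main__":
    events = []

    def load():
        events.append("load")
        return "model"

    def process(model, batch):
        return [(seq, len(seq)) for seq in batch]  # placeholder for real inference

    def unload(model):
        events.append("unload")

    tasks, inserts = queue.Queue(), queue.Queue()
    tasks.put(["MKT", "GAVL"])
    tasks.put(["PLQ"])
    print(run_sequential_worker(tasks, inserts, load, process, unload))  # 2
    print(events)  # ['load', 'unload', 'load', 'unload']
```

Loading per batch trades throughput for a bounded memory footprint, which can let several model types share one GPU without holding all of them in memory at once.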

setup_rabbitmq()

Set up RabbitMQ by declaring the necessary queues for GPU tasks.

This method connects to RabbitMQ using the provided credentials and declares the necessary queues for each GPU model type, as well as the queue for data insertion.

Raises:

Exception – If there is an issue setting up RabbitMQ.
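
Queue declaration might look like the following sketch, assuming one queue per model type plus a single insertion queue; declare_gpu_queues, the gpu_ prefix, and the database_inserts name are hypothetical. pika's queue_declare is used through a recording stand-in so the sketch runs without a broker:

```python
import logging

logger = logging.getLogger(__name__)


def declare_gpu_queues(channel, model_types, insert_queue="database_inserts",
                       queue_prefix="gpu_"):
    """Hypothetical helper: declare one queue per model type plus an insert queue.

    `channel` is expected to behave like a pika BlockingChannel.
    """
    names = [f"{queue_prefix}{model_type}" for model_type in model_types]
    names.append(insert_queue)
    try:
        for name in names:
            # Redeclaring an existing queue with the same arguments is a no-op.
            channel.queue_declare(queue=name)
    except Exception:
        logger.exception("Failed to declare RabbitMQ queues")
        raise  # surface the error to the caller, as the docs describe
    return names


class RecordingChannel:
    """Stand-in for a pika channel that records declared queues."""

    def __init__(self):
        self.declared = []

    def queue_declare(self, queue):
        self.declared.append(queue)


if __name__ == "__main__":
    print(declare_gpu_queues(RecordingChannel(), ["esm", "prot_t5"]))
    # ['gpu_esm', 'gpu_prot_t5', 'database_inserts']
```

Against a live broker, the channel could come from pika.BlockingConnection(pika.ConnectionParameters(host=host, credentials=pika.PlainCredentials(user, password))).channel(), with host and credentials read from conf; the exact conf keys are not documented here.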

start_workers()

Start the worker processes for GPU task processing and database insertion.

This method spawns worker processes that handle GPU-based task processing and insert the processed data into the database. It also starts a monitoring thread to oversee the queues.

The method ensures that models are loaded and unloaded as needed to optimize GPU usage.
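
The worker layout described above (one GPU worker per model type, one database inserter, and a monitoring thread) can be sketched as follows. launch_workers and its target callables are hypothetical; worker_cls defaults to multiprocessing.Process, and the demo passes threading.Thread so it runs anywhere:

```python
import multiprocessing
import threading


def launch_workers(model_types, gpu_target, insert_target, monitor_target,
                   worker_cls=multiprocessing.Process):
    """Hypothetical launcher: one GPU worker per model type, one DB inserter,
    and a monitoring thread in the parent process."""
    workers = [
        worker_cls(target=gpu_target, args=(model_type,), name=f"gpu-{model_type}")
        for model_type in model_types
    ]
    workers.append(worker_cls(target=insert_target, name="db-insert"))
    for worker in workers:
        worker.start()
    # The monitor is a thread (not a process), matching the description above.
    monitor = threading.Thread(target=monitor_target, name="queue-monitor", daemon=True)
    monitor.start()
    return workers, monitor


if __name__ == "__main__":
    seen, lock = [], threading.Lock()

    def record(label):
        with lock:
            seen.append(label)

    workers, monitor = launch_workers(
        ["esm", "prot_t5"],
        gpu_target=record,
        insert_target=lambda: record("db"),
        monitor_target=lambda: record("monitor"),
        worker_cls=threading.Thread,  # threads keep the demo portable
    )
    for w in workers + [monitor]:
        w.join()
    print(sorted(seen))  # ['db', 'esm', 'monitor', 'prot_t5']
```

With real processes under the spawn or forkserver start methods, targets must be picklable module-level functions, so the demo's lambdas only work with threads.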

abstract store_entry(record)

Abstract method to store processed entries. Must be overridden by subclasses.

unload_model(model_type)

Unload the GPU model from memory.

This method removes the specified model and its tokenizer from memory.

Parameters:

model_type (str) – The type of model to unload.
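
A matching release step might drop the cached references, run the garbage collector, and, if PyTorch happens to be installed, return cached CUDA blocks with torch.cuda.empty_cache(). This sketch assumes a PyTorch backend, which the documentation above does not confirm; release_model is a hypothetical name. empty_cache() cannot free memory still referenced by live tensors, so any other references to the model must be dropped first:

```python
import gc

try:
    import torch  # optional: only used to release cached CUDA memory
except ImportError:
    torch = None


def release_model(model_instances, tokenizer_instances, model_type):
    """Hypothetical helper: drop cached references so the model can be freed."""
    model_instances.pop(model_type, None)
    tokenizer_instances.pop(model_type, None)
    gc.collect()  # reclaim objects whose last reference was just dropped
    if torch is not None and torch.cuda.is_available():
        # Return unused blocks held by PyTorch's caching allocator to the driver.
        torch.cuda.empty_cache()


if __name__ == "__main__":
    models = {"esm": object()}
    tokenizers = {"esm": "esm-tokenizer"}
    release_model(models, tokenizers, "esm")
    print(models, tokenizers)  # {} {}
```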