What are Custom Operators?

UpTrain offers a wide range of built-in operators to help you get started with your training pipeline. However, you may want to create your own custom operator to perform a specific task. This tutorial will walk you through the process of creating a custom operator.

There are a few things that need to be kept in mind while creating an operator:

  1. Operators have two methods: setup and run
    1. setup is called once when the operator is initialized. This is where you can pass in any settings that you need to use in the run method.
    2. run is called for each batch of data that is passed to the operator. This is where you will perform the actual operation on the data. It returns a dictionary with the key output and the value depends on the type of operator. Any extra information can be put in the extra key of the dictionary.
  2. There are two types of operators: TransformOp and ColumnOp.
    1. TransformOp represents an operator that transforms the data into another form.
      • This is used for operations like filtering, cleaning, etc.
      • The value of the output key of the dictionary returned by the run method should be a polars.DataFrame or None.
    2. ColumnOp represents an operator that adds a new column to the data.
      • This is used for operations like adding a new column, renaming a column, etc.
      • The value of the output key of the dictionary returned by the run method should set as the computed table.
  3. The operator should be registered using the register_custom_op decorator.

Examples

Example 1: Cleanup Operator

An Operator that goes through a list of messages and extracts the question, document title, document link, document text, and response from the messages.

from uptrain.operators import TransformOp, register_custom_op

@register_custom_op
class Cleanup(TransformOp):
    def setup(self, settings):
        return self

    def run(self, dataset):
        import json
        import polars as pl

        table_cols = [ 
            "question",
            "document_title",
            "document_link",
            "document_text",
            "response",
        ]
        out = []
        for point in dataset.to_dicts():
            messages = json.loads(point["messages"])
            question = messages[0]["content"].split("The input is: '")[1].split("?")[0]
            name = (
                messages[0]["content"]
                .split("technical documentation titled ")[1]
                .split(", found at")[0]
            )
            link = messages[0]["content"].split("found at ")[1].split(". \n")[0]
            text = (
                messages[0]["content"]
                .split("--- START: Document ---")[1]
                .split(name + "\n")[1]
                .split("\n\n--- END: Document")[0]
            )
            response = messages[1]["content"][1:-1]

            new_row = dict(zip(table_cols, [question, name, link, text, response]))
            out.append(new_row)

        return {"output": pl.from_dicts(out)}

Example 2: AddContext Operator

An Operator that adds the model and pipeline name to the data.

from uptrain.operators import TransformOp, register_custom_op

@register_custom_op
class AddContext(TransformOp):
    def setup(self, settings):
        return self

    def run(self, dataset):
        import polars as pl

        return {
            "output": dataset.with_columns(
                [
                    pl.lit("gpt-4").alias("model"),
                    pl.lit("context_retrieval").alias("pipeline"),
                ]
            )
        }