Metadata Field-Level Lineage

Algoreus can retrieve the lineage of data entities. A data entity can have an associated schema, which defines the fields in the entity along with their data types. Field-level lineage gives a more granular view of a data entity's lineage: for a specified time range, it shows every field that was computed for the entity and the fields of the source entities that participated in computing each of them. It also shows the detailed operations that transformed the fields of a source data entity into the fields of the given data entity.
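To make the time-range behavior concrete, here is a minimal Java sketch. It assumes, for illustration only, that each run writing to an entity leaves behind a map from destination field to the source fields it was computed from, and that a time-range query merges the runs that started in that range. LineageRun and FieldLineageView are hypothetical names, not part of the Algoreus API.

```java
import java.util.*;

// Hypothetical record of one run: destination field -> source fields ("entity.field") it was computed from.
final class LineageRun {
  final long startMillis;
  final Map<String, Set<String>> sourcesByField;

  LineageRun(long startMillis, Map<String, Set<String>> sourcesByField) {
    this.startMillis = startMillis;
    this.sourcesByField = sourcesByField;
  }
}

final class FieldLineageView {
  // Merges the runs that started in [fromMillis, toMillis) into one field-level view.
  static Map<String, Set<String>> forRange(List<LineageRun> runs, long fromMillis, long toMillis) {
    Map<String, Set<String>> view = new TreeMap<>();
    for (LineageRun run : runs) {
      if (run.startMillis < fromMillis || run.startMillis >= toMillis) {
        continue; // run is outside the requested time range
      }
      for (Map.Entry<String, Set<String>> entry : run.sourcesByField.entrySet()) {
        view.computeIfAbsent(entry.getKey(), k -> new TreeSet<>()).addAll(entry.getValue());
      }
    }
    return view;
  }
}
```

For example, merging a run that computed fullName from passengerList.firstName and passengerList.lastName with a later run that computed address yields a view containing both fields, while a run that started outside the range contributes nothing.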


Concepts and Terminology

  • Field: A field identifies a column in a data entity. A field has a name and a data type.

  • EndPoint: An EndPoint identifies the source or destination of the data, together with its namespace. Fields are read from a source EndPoint and written to a destination EndPoint.

  • Field Operation: A field operation defines a single computation on one or more fields. It has a name and a description.

  • Read Operation: An operation that reads from a source EndPoint and creates a collection of fields.

  • Transform Operation: An operation that transforms a collection of input fields into a collection of output fields.

  • Write Operation: An operation that writes a collection of fields to a destination EndPoint.

  • Origin: The origin of a field is the name of the operation that output it. Because the same field can appear in the outputs of multiple operations, the <origin, fieldName> pair is used to identify a field uniquely.
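The need for the <origin, fieldName> pair shows up as soon as a field is rewritten in place. In the hypothetical sketch below (FieldRef and OriginDemo are illustrative names, not Algoreus classes), a Read operation and a Normalize operation both output address. Keyed by name alone, the two outputs collapse into one; keyed by origin and name, they stay distinct.

```java
import java.util.*;

// Hypothetical key for a field: the operation that output it plus the field name.
final class FieldRef {
  final String origin;
  final String field;

  FieldRef(String origin, String field) {
    this.origin = origin;
    this.field = field;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FieldRef)) {
      return false;
    }
    FieldRef other = (FieldRef) o;
    return origin.equals(other.origin) && field.equals(other.field);
  }

  @Override
  public int hashCode() {
    return Objects.hash(origin, field);
  }

  @Override
  public String toString() {
    return origin + "." + field;
  }
}

final class OriginDemo {
  // Every output field identified by name only: outputs with the same name collide.
  static Set<String> byName(Map<String, List<String>> outputsByOperation) {
    Set<String> names = new LinkedHashSet<>();
    outputsByOperation.values().forEach(names::addAll);
    return names;
  }

  // Every output field identified by <origin, fieldName>: outputs stay distinct.
  static Set<FieldRef> byOrigin(Map<String, List<String>> outputsByOperation) {
    Set<FieldRef> refs = new LinkedHashSet<>();
    outputsByOperation.forEach((op, fields) -> fields.forEach(f -> refs.add(new FieldRef(op, f))));
    return refs;
  }
}
```

With Read outputting id, firstName, lastName and address, and Normalize outputting address, byName returns four fields while byOrigin returns five, keeping Read.address and Normalize.address apart.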


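Field Lineage for Algoreus Programs

A program records field lineage through its context. In the MapReduce example below, the initialize() method reads id, firstName, lastName and address from the passengerList dataset, concatenates firstName and lastName into fullName, normalizes address, and records a write of id, fullName and address to the passenger dataset.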

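import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
// Operation, ReadOperation, TransformOperation, WriteOperation, InputField, EndPoint and
// MapReduceContext come from the Algoreus lineage and MapReduce APIs.

@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  List<Operation> operations = new ArrayList<>();

  // Read four fields from the source dataset "passengerList" in namespace "ns"
  Operation read = new ReadOperation("Read", "Read passenger information", EndPoint.of("ns", "passengerList"),
                                     "id", "firstName", "lastName", "address");
  operations.add(read);

  // fullName is computed from the firstName and lastName fields output by "Read"
  Operation concat = new TransformOperation("Concat", "Concatenated fields",
                                            Arrays.asList(InputField.of("Read", "firstName"),
                                                          InputField.of("Read", "lastName")),
                                            "fullName");
  operations.add(concat);

  // address is rewritten in place, so its origin changes from "Read" to "Normalize"
  Operation normalize = new TransformOperation("Normalize", "Normalized field",
                                               Collections.singletonList(InputField.of("Read", "address")),
                                               "address");
  operations.add(normalize);

  // Write id (from "Read"), fullName (from "Concat") and address (from "Normalize")
  Operation write = new WriteOperation("Write", "Wrote to passenger dataset", EndPoint.of("ns", "passenger"),
                                       Arrays.asList(InputField.of("Read", "id"),
                                                     InputField.of("Concat", "fullName"),
                                                     InputField.of("Normalize", "address")));
  operations.add(write);

  // Record the field operations for this run
  context.record(operations);
}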
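The recorded operations form a graph from which the lineage of each written field can be read off by following origins backwards. The sketch below is a hypothetical, simplified model of that traversal (Op, In and LineageTracer are illustrative names, not Algoreus classes). It assumes every input of a transform contributes to every output of that transform, which holds for Concat and Normalize here but is a simplification in general.

```java
import java.util.*;

// Hypothetical input reference: the field named `field` as output by operation `origin`.
final class In {
  final String origin;
  final String field;

  In(String origin, String field) {
    this.origin = origin;
    this.field = field;
  }
}

// Hypothetical operation: a read has no inputs, a write has no outputs.
final class Op {
  final String name;
  final String endPoint; // dataset read from or written to; null for transforms
  final List<In> inputs;
  final List<String> outputs;

  Op(String name, String endPoint, List<In> inputs, List<String> outputs) {
    this.name = name;
    this.endPoint = endPoint;
    this.inputs = inputs;
    this.outputs = outputs;
  }
}

final class LineageTracer {
  // For each field written by the named write operation, the "dataset.field" sources it derives from.
  static Map<String, Set<String>> trace(List<Op> ops, String writeOpName) {
    Map<String, Op> byName = new HashMap<>();
    for (Op op : ops) {
      byName.put(op.name, op);
    }
    Map<String, Set<String>> lineage = new TreeMap<>();
    for (In in : byName.get(writeOpName).inputs) {
      Set<String> sources = new TreeSet<>();
      collect(byName, in, sources);
      lineage.put(in.field, sources);
    }
    return lineage;
  }

  // Follows origins backwards until it reaches a read operation.
  private static void collect(Map<String, Op> byName, In in, Set<String> sources) {
    Op origin = byName.get(in.origin);
    if (origin.inputs.isEmpty()) {
      sources.add(origin.endPoint + "." + in.field); // read operation: field comes from its source dataset
      return;
    }
    for (In upstream : origin.inputs) { // transform: assume every input feeds every output
      collect(byName, upstream, sources);
    }
  }
}
```

Fed the four operations from the example above, this maps fullName to passengerList.firstName and passengerList.lastName, address to passengerList.address (through Normalize), and id to passengerList.id.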

Field Lineage for Algoreus Nodes

Nodes in Algoreus data axons can also record field lineage. A node records its field operations in its prepareRun() method, using the context passed to that method. The examples below show, in order, a source node, two transform nodes (Concatenate and Normalize), and a sink node.

// Source node: records a read of every field in the configured schema
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  // Record lineage only when the schema and its fields are non-null
  if (config.getSchema() != null && config.getSchema().getFields() != null) {
    List<Schema.Field> fields = config.getSchema().getFields();
    FieldOperation operation = new FieldReadOperation("Read", "Read from files",
                                                      EndPoint.of(context.getNamespace(), config.referenceName),
                                                      fields.stream().map(Schema.Field::getName)
                                                        .collect(Collectors.toList()));
    context.record(Collections.singletonList(operation));
  }
}

// Transform node: config.newFieldName is computed from two input fields
@Override
public void prepareRun(StageSubmitterContext context) throws Exception {
  FieldOperation operation = new FieldTransformOperation("Concatenate", "Concatenated fields",
                                                         Arrays.asList(config.fieldToConcatenate1,
                                                                       config.fieldToConcatenate2),
                                                         config.newFieldName);
  context.record(Collections.singletonList(operation));
}

// Transform node: the field is normalized in place, so the input and output field names match
@Override
public void prepareRun(StageSubmitterContext context) throws Exception {
  FieldOperation operation = new FieldTransformOperation("Normalize", "Normalized field",
                                                         Collections.singletonList(config.fieldToNormalize),
                                                         config.fieldToNormalize);
  context.record(Collections.singletonList(operation));
}

// Sink node: records a write of every field in the output schema
@Override
public void prepareRun(BatchSinkContext context) throws Exception {
  if (schema != null && schema.getFields() != null) {
    FieldOperation operation = new FieldWriteOperation("Write", "Wrote to Algoreus Table",
                                                       EndPoint.of(context.getNamespace(), "passenger"),
                                                       schema.getFields().stream().map(Schema.Field::getName)
                                                         .collect(Collectors.toList()));
    context.record(Collections.singletonList(operation));
  }
}
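Unlike the program example, the node examples name fields without an origin (for instance, FieldTransformOperation takes a list of input field names). One plausible way to recover origins for a linear pipeline, shown here as a hypothetical sketch rather than Algoreus's actual logic, is to walk the operations in pipeline order and resolve each input name to the most recent earlier operation that output a field with that name. NodeOp and OriginResolver are illustrative names.

```java
import java.util.*;

// Hypothetical node-level operation: inputs and outputs are plain field names, with no origin.
final class NodeOp {
  final String name;
  final List<String> inputs;
  final List<String> outputs;

  NodeOp(String name, List<String> inputs, List<String> outputs) {
    this.name = name;
    this.inputs = inputs;
    this.outputs = outputs;
  }
}

final class OriginResolver {
  // Maps each operation name to its inputs qualified as "origin.field"; operations must be in pipeline order.
  static Map<String, List<String>> resolve(List<NodeOp> opsInOrder) {
    Map<String, String> latestOrigin = new HashMap<>(); // field name -> last operation that output it
    Map<String, List<String>> resolved = new LinkedHashMap<>();
    for (NodeOp op : opsInOrder) {
      List<String> qualified = new ArrayList<>();
      for (String input : op.inputs) {
        String origin = latestOrigin.get(input);
        if (origin == null) {
          throw new IllegalStateException("No earlier operation outputs field '" + input + "'");
        }
        qualified.add(origin + "." + input);
      }
      resolved.put(op.name, qualified);
      // Update origins only after resolving inputs, so an in-place rewrite (address -> address)
      // reads the previous origin and then becomes the new one.
      for (String output : op.outputs) {
        latestOrigin.put(output, op.name);
      }
    }
    return resolved;
  }
}
```

With the four node operations above (Read, Concatenate, Normalize, Write), the sink's address input resolves to Normalize.address rather than Read.address, because Normalize is the last operation to output address before the write. Branching pipelines such as joins would need more than this single running map.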
