# Metadata Field-Level Lineage

* [Concepts and Terminology](#concepts-and-terminology)
* [Field Lineage for Algoreus](#field-lineage-for-algoreus-datafabriq)
* [Field Lineage for Algoreus Nodes](#field-lineage-for-algoreus-datafabriq-nodes)

Algoreus provides a way to retrieve the lineage of data entities. A data entity can have an associated schema, which defines the fields in the entity along with their data types. Field-level lineage gives a more granular lineage view of a data entity: for a specified time range, it shows all the fields that were computed for the entity and the fields of the source entities that participated in computing them. Field lineage also shows the detailed operations that transformed the fields of a source data entity into the fields of the given data entity.

***

### Concepts and Terminology <a href="#concepts-and-terminology" id="concepts-and-terminology"></a>

* **Field:** A field identifies a column in a data entity. It has a name and a data type.
* **EndPoint:** An EndPoint defines the source or destination of the data, along with the namespace from which the fields are read or to which they are written.
* **Field Operation:** An operation defines a single computation on fields. It has a name and a description.
* **Read Operation:** An operation that reads from a source EndPoint and produces a collection of fields.
* **Transform Operation:** An operation that transforms a collection of input fields into a collection of output fields.
* **Write Operation:** An operation that writes a collection of fields to a destination EndPoint.
* **Origin:** The origin of a field is the name of the operation that produced it. Because the same field name can appear in the outputs of multiple operations, the \<origin, fieldName> pair is used to uniquely identify a field, as illustrated in the sketch after this list.
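
For example, the field name `address` can appear in the outputs of both a read operation and a later transform; the origin distinguishes the two. The following minimal sketch reuses the operation classes from the example in the next section; the `customers` EndPoints and the `Clean` operation are hypothetical names used only for illustration.

```
// "address" appears in the outputs of both "Read" and "Clean" (hypothetical operations).
Operation read = new ReadOperation("Read", "Read customer records",
                                   EndPoint.of("ns", "customers"),
                                   "id", "address");

Operation clean = new TransformOperation("Clean", "Trimmed whitespace",
                                         Collections.singletonList(InputField.of("Read", "address")),
                                         "address");

// The write refers to <"Clean", "address">, not to the raw <"Read", "address">,
// so the origin makes it unambiguous which "address" field is written.
Operation write = new WriteOperation("Write", "Wrote cleaned customer records",
                                     EndPoint.of("ns", "customersClean"),
                                     Arrays.asList(InputField.of("Read", "id"),
                                                   InputField.of("Clean", "address")));
```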

***

### Field Lineage for Algoreus <a href="#field-lineage-for-algoreus-datafabriq" id="field-lineage-for-algoreus-datafabriq"></a>
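
The example below records field lineage from the `initialize()` method of a program. It reads the fields `id`, `firstName`, `lastName`, and `address` from the `passengerList` source, concatenates `firstName` and `lastName` into `fullName`, normalizes `address`, and writes the resulting fields to the `passenger` dataset: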

```
@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  List<Operation> operations = new ArrayList<>();

  Operation read = new ReadOperation("Read", "Read passenger information", EndPoint.of("ns", "passengerList"),
                                     "id", "firstName", "lastName", "address");
  operations.add(read);

  Operation concat = new TransformOperation("Concat", "Concatenated fields",
                                            Arrays.asList(InputField.of("Read", "firstName"),
                                                          InputField.of("Read", "lastName")),
                                            "fullName");
  operations.add(concat);

  Operation normalize = new TransformOperation("Normalize", "Normalized field",
                                               Collections.singletonList(InputField.of("Read", "address")),
                                               "address");
  operations.add(normalize);

  Operation write = new WriteOperation("Write", "Wrote to passenger dataset", EndPoint.of("ns", "passenger"),
                                       Arrays.asList(InputField.of("Read", "id"),
                                                     InputField.of("Concat", "fullName"),
                                                     InputField.of("Normalize", "address")));
  operations.add(write);

  // Record the field operations for this run
  context.record(operations);
}
```
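
With these operations recorded, the lineage for the `passenger` dataset shows that `fullName` was computed from the `firstName` and `lastName` fields of `passengerList` through the `Concat` operation, that `address` came from the `Normalize` operation applied to the source `address` field, and that `id` was carried over directly from the read.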

***

### Field Lineage for Algoreus Nodes <a href="#field-lineage-for-algoreus-datafabriq-nodes" id="field-lineage-for-algoreus-datafabriq-nodes"></a>

Nodes in Algoreus data axons can also record field lineage. A node records lineage from its `prepareRun()` method, using the context that is passed to that method.
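The snippets below show, in order, a batch source recording a read operation, two transform nodes recording transform operations for concatenation and normalization, and a batch sink recording a write operation.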

```
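// Batch source: record a read operation covering all fields in the configured schema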
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  if (config.getSchema() != null && config.getSchema().getFields() != null) {
    // Schema and fields are guaranteed to be non-null at this point
    List<Schema.Field> fields = config.getSchema().getFields();
    FieldOperation operation = new FieldReadOperation("Read", "Read from files",
                                                      EndPoint.of(context.getNamespace(), config.referenceName),
                                                      fields.stream().map(Schema.Field::getName)
                                                        .collect(Collectors.toList()));
    context.record(Collections.singletonList(operation));
  }
}

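// Transform node: record the concatenation of two configured fields into a new field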
@Override
public void prepareRun(StageSubmitterContext context) throws Exception {
  FieldOperation operation = new FieldTransformOperation("Concatenate", "Concatenated fields",
                                                         Arrays.asList(config.fieldToConcatenate1,
                                                                       config.fieldToConcatenate2),
                                                         config.newFieldName);
  context.record(Collections.singletonList(operation));
}

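// Transform node: record the normalization of a configured field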
@Override
public void prepareRun(StageSubmitterContext context) throws Exception {
  FieldOperation operation = new FieldTransformOperation("Normalize", "Normalized field",
                                                         Collections.singletonList(config.fieldToNormalize),
                                                         config.fieldToConcatenate2);
  context.record(Collections.singletonList(operation));
}

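// Batch sink: record a write operation covering all fields written to the destination table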
@Override
public void prepareRun(BatchSinkContext context) throws Exception {
  Schema schema = config.getSchema();
  if (schema != null && schema.getFields() != null) {
    FieldOperation operation = new FieldWriteOperation("Write", "Wrote to Algoreus Table",
                                                       EndPoint.of(context.getNamespace(), "passenger"),
                                                       schema.getFields().stream().map(Schema.Field::getName)
                                                         .collect(Collectors.toList()));
    context.record(Collections.singletonList(operation));
  }
}
```
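
Taken together, the operations recorded by these nodes describe how the fields read by the source flow through the concatenation and normalization transforms and end up as the fields the sink writes to the `passenger` table, giving the same end-to-end field lineage as the program-level example above.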

***
