
Streaming Data Directly Into the Training Process Using Native Iterators

Using native iterators to stream data is an alternative to cloning data to the training machines using the MissingLink CLI.

With the streaming method, data is streamed to the training machine during training and cached locally. Training starts immediately, which can mean the difference between waiting hours for local files to be processed and an experiment that starts right away, loading only the data it needs. Because the requested data is cached locally, rerunning an experiment can also be considerably faster.
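The stream-then-cache idea can be illustrated without MissingLink at all. The following is a minimal sketch, not the library's implementation: `stream_with_cache` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for a remote data store.

```python
import os
import tempfile


def stream_with_cache(keys, fetch, cache_dir):
    """Yield the bytes for each key, fetching only on a cache miss."""
    os.makedirs(cache_dir, exist_ok=True)
    for key in keys:
        path = os.path.join(cache_dir, key)
        if not os.path.exists(path):
            # Cache miss: fetch the item and store it for later runs.
            with open(path, "wb") as f:
                f.write(fetch(key))
        # Cache hit (or freshly stored item): read it from local disk.
        with open(path, "rb") as f:
            yield f.read()


# Hypothetical stand-in for a remote store; it records every fetch.
fetch_calls = []


def fake_fetch(key):
    fetch_calls.append(key)
    return ("contents of " + key).encode("utf-8")


cache_dir = tempfile.mkdtemp()
first_run = list(stream_with_cache(["a.txt", "b.txt"], fake_fetch, cache_dir))
second_run = list(stream_with_cache(["a.txt", "b.txt"], fake_fetch, cache_dir))
```

Because the generator is lazy, the first item is available as soon as it has been fetched, and the second run never touches the remote store.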


The following demonstrates an example of a simple implementation using Keras.

To start out, import and configure MissingLink. Then set a variable for the data volume you want to use and save the query you want to run. Notice that the query contains the version number at the end. You can use the same query against different versions as you run concurrent experiments.

import missinglink
import os

DATA_VOLUME_ID = 4545113384550400

# train validate test
QUERY = '@split:0.6:0.2:0.2 @sample:0.01 @version:fc3b8ef88ca2eda84371b71f7bea4dd5f42c8e92'
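Since the version is just the last part of the query string, one way to reuse the same query across versions is to build the string from a shared prefix. This is a small sketch using plain string formatting; `BASE_QUERY` and `query_for_version` are hypothetical helper names, not part of the MissingLink SDK.

```python
# Shared part of the query: split ratios and sampling rate.
BASE_QUERY = '@split:0.6:0.2:0.2 @sample:0.01'


def query_for_version(version):
    """Return the shared query pinned to a specific data volume version."""
    return '{} @version:{}'.format(BASE_QUERY, version)


# Rebuilds the query used in this example.
QUERY = query_for_version('fc3b8ef88ca2eda84371b71f7bea4dd5f42c8e92')
```

Concurrent experiments can then call `query_for_version` with different version identifiers while keeping the split and sample settings identical.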

Up next, create the callback itself:

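# Batch size used later by the data generator.
# Assumed value for illustration: choose one that suits your model.
BATCH_SIZE = 32

missinglink_callback = missinglink.KerasCallback(project=6697485175095296)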

Note that a batch size is defined for later use. The project ID is passed to KerasCallback so that the project does not need to be selected in the terminal window each time you run the experiment.

Once everything is configured, create a function to deserialize the data returned by the iterator:

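def deserialization_callback(file_names, metadatas):
    # Print the name and size of each file in the data point.
    for fname in file_names:
        print("file: {} size: {}".format(fname, os.path.getsize(fname)))

    # Retrieve the metadata associated with the data point.
    for data in metadatas:
        print("metadata: {}".format(data))  # assumed body: the original loop body is missing

    # data point and expected class -> vectoring
    # for example, using a string but you can do any pre-processing step here and return the actual image
    return fname, 0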

For this example, the file name and size are printed directly. Also, some of the associated metadata for the image is retrieved. If this were a real callback, it would process the data and get it ready for the model.
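A callback closer to real use might look like the sketch below. It rests on several assumptions that the original does not confirm: each call describes one data point, the first file holds the payload, and each metadata entry is a dict with a hypothetical `label` key. `CLASS_INDEX` is likewise made up for illustration.

```python
import os
import tempfile

# Hypothetical mapping from label names to class indices.
CLASS_INDEX = {"cat": 0, "dog": 1}


def deserialization_callback(file_names, metadatas):
    # Assumption: the first file holds the payload for this data point.
    with open(file_names[0], "rb") as f:
        payload = f.read()
    # Assumption: metadata is a dict carrying a 'label' key.
    label = CLASS_INDEX[metadatas[0]["label"]]
    # Return the data point and its expected class.
    return payload, label


# Usage with a temporary file standing in for a streamed data point.
tmp_dir = tempfile.mkdtemp()
sample_path = os.path.join(tmp_dir, "sample.bin")
with open(sample_path, "wb") as f:
    f.write(b"raw image bytes")

result = deserialization_callback([sample_path], [{"label": "dog"}])
```

In practice the payload would be decoded into an array (for example, an image tensor) before being returned to the model.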

The last part of the code generates and executes the iterators:

# When the drop_last flag is set to False, all the batches
# have the same size: if the last one is smaller than the
# standard batch size, it is filled with nulls.
# When the flag is set to True, the batch is not filled
# and a smaller batch that fits its data length is returned instead.
data_generator = missinglink_callback.bind_data_generator(
    DATA_VOLUME_ID, QUERY, deserialization_callback, batch_size=BATCH_SIZE, drop_last=True
)

train_generator, test_generator, validation_generator = data_generator.flow()

# Loop through the generator to trigger the deserialization callback
# and get examples to train the neural network.
for item in train_generator:
    print(item)
    break  # stop after the first item for this example

As you can see, the data generator is defined, and then the train, test, and validation generators are created from the query described earlier. Once you have the iterators, loop through them and print each item. For the purposes of the example, the code breaks after the first iteration, but it could continue indefinitely.
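The two batching behaviors described in the drop_last comment can be mimicked in plain Python. The sketch below follows that comment's wording only; `batches` is a hypothetical helper, not MissingLink's implementation, so it is worth checking the flag's actual behavior against the library you have installed.

```python
def batches(items, batch_size, drop_last):
    """Group items into batches, following the comment's description of drop_last."""
    items = list(items)
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        if len(batch) < batch_size and not drop_last:
            # Flag is False: pad the short final batch with nulls (None).
            batch = batch + [None] * (batch_size - len(batch))
        # Flag is True: the short final batch is returned as is.
        yield batch


padded = list(batches(range(5), 2, drop_last=False))
trimmed = list(batches(range(5), 2, drop_last=True))
```

With five items and a batch size of two, `padded` ends with `[4, None]` while `trimmed` ends with `[4]`.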

In just a few lines of code, you can stream any query directly to the model and remove the bottleneck of managing the data locally. MissingLink handles loading each data point and its corresponding metadata, which in turn significantly shortens the start time of your experiments.