AI for Apache Mesos with Tensorflow

17. February 2025 Andreas Peters

My TensorFlow framework for Apache Mesos makes it possible to run TensorFlow processes in parallel on the servers of a Mesos cluster.

What is TensorFlow?

TensorFlow is an open-source machine learning framework developed by Google. It provides a flexible platform for creating, training, and deploying machine learning and deep learning models. TensorFlow supports both CPU and GPU acceleration and can run on various platforms.

The framework is based on the concept of data flow graphs, where mathematical operations are represented as nodes and data as tensors. This architecture enables efficient computation of complex models, especially neural networks. TensorFlow also includes Keras, a high-level API that simplifies model creation.
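As a small illustration of the Keras API mentioned above, the following snippet builds and compiles a tiny Sequential model. It is only a minimal, standalone sketch with made-up layer sizes and is independent of the Mesos setup shown later.

from tensorflow import keras
from tensorflow.keras import layers

# Minimal Sequential model: two dense layers for a small classification task
model = keras.Sequential([
    layers.Input(shape=(4,)),
    layers.Dense(16, activation='relu'),
    layers.Dense(3, activation='softmax')
])

# Compile with a standard optimizer and loss for integer class labels
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.summary()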

TensorFlow applications range from image recognition and natural language processing to medical diagnostics and autonomous driving.

Due to its scalability, community support, and integration with other technologies, TensorFlow is one of the leading tools in artificial intelligence and machine learning.

What is Apache Mesos?

Apache Mesos is an open-source cluster manager that efficiently manages and distributes computing resources such as CPU, GPU, memory, and storage. Originally developed at the University of California, Berkeley, it is optimized for large-scale distributed systems.

Mesos enables the execution of various workloads, including container orchestration with mesos-compose or Kubernetes (mesos-m3s), as well as big data applications like Apache Spark and Hadoop. It uses a two-tier scheduling approach, allowing flexible resource allocation.

Thanks to its scalability and fault tolerance, Mesos is often used in cloud and data center environments to optimize resource utilization.

Benefits of TensorFlow in an Apache Mesos Cluster

The combination of TensorFlow with Apache Mesos provides a powerful, scalable solution for distributed machine learning. Mesos efficiently manages resources and enables the dynamic allocation of CPU, RAM, and GPUs, ensuring optimal execution of TensorFlow workloads.

Due to the scalability of Mesos, large deep learning models can be trained in parallel on multiple nodes, significantly reducing training time. Additionally, Mesos’ fault tolerance improves the reliability of TensorFlow jobs, as failed nodes are automatically replaced.

This combination is particularly suitable for companies that want to train and deploy AI models on a large scale.

Example

Below is an example of image classification using the flower_photos dataset:

from __future__ import print_function

import json
import os
import tensorflow as tf
import pathlib
import matplotlib.pyplot as plt

from tensorflow import keras
from tensorflow.keras import layers
from tfmesos2 import cluster

# Set the Mesos environment variables

os.environ["MESOS_MASTER"] = "<mesos_manager>:5050"
os.environ["MESOS_SSL"] = "true"
os.environ["MESOS_USERNAME"] = "<mesos_username>"
os.environ["MESOS_PASSWORD"] = "<mesos_password>"

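# Mesos fetcher: the flower_photos archive is downloaded into each task's
# sandbox (/mnt/mesos/sandbox) and extracted there; the "true" value
# presumably enables extraction of the .tgz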
extra_kw = {}
extra_kw['fetcher'] = {"http://192.168.150.81:11000/v0/download/flower_photos.tgz": "true"}

data_dir = pathlib.Path("/mnt/mesos/sandbox/flower_photos/")

# Image processing and model training
def create_model(input_shape, num_classes):
    model = keras.Sequential([
        layers.Rescaling(1./255, input_shape=input_shape),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.MaxPooling2D(),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D(),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dense(num_classes, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    return model

def main():
    # Request one parameter server ("ps") and two workers from Mesos
    jobs_def = [
        {"name": "ps", "num": 1},
        {"name": "worker", "num": 2},
    ]

    client_ip = "192.168.150.81"

    with cluster(jobs_def, client_ip=client_ip, **extra_kw) as c:
        os.environ["TF_CONFIG"] = json.dumps({"cluster": c.cluster_def})
        print(os.environ["TF_CONFIG"])

        cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
        strategy = tf.distribute.ParameterServerStrategy(cluster_resolver)

        batch_size = 1
        img_size = (180, 180)
        num_classes = 5

        train_ds = keras.utils.image_dataset_from_directory(
            data_dir,
            image_size=img_size,
            batch_size=batch_size,
            validation_split=0.2,
            subset="training",
            seed=123,
            shuffle=True)

        val_ds = keras.utils.image_dataset_from_directory(
            data_dir,
            image_size=img_size,
            batch_size=batch_size,
            validation_split=0.2,
            subset="validation",
            seed=123,
            shuffle=True)

        steps_per_epoch = tf.data.experimental.cardinality(train_ds).numpy()
        validation_steps = tf.data.experimental.cardinality(val_ds).numpy()

        print(f"Train batches: {steps_per_epoch}, Val batches: {validation_steps}")


        train_ds = strategy.experimental_distribute_dataset(train_ds)
        val_ds = strategy.experimental_distribute_dataset(val_ds)

        with strategy.scope():
            model = create_model(input_shape=(180, 180, 3), num_classes=num_classes)

        model.summary()

        model.fit(train_ds, validation_data=val_ds, epochs=1, steps_per_epoch=steps_per_epoch, validation_steps=validation_steps)
        print(">>>>>>Modelltraining abgeschlossen<<<<<<<<")

        model.save("saved_models/flower_model.keras")
        print(">>>>>>Modell gespeichert.<<<<<<<<")

if __name__ == '__main__':
    main()
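
After the training run, the saved model can be loaded again for inference. The following short sketch assumes the flower_model.keras file produced above; the image path is only a placeholder for one of the downloaded flower photos.

import numpy as np
from tensorflow import keras

# Load the model that was saved at the end of the training run
model = keras.models.load_model("saved_models/flower_model.keras")

# Load a single image (placeholder path) and resize it to the training input size
img = keras.utils.load_img("flower_photos/roses/example.jpg", target_size=(180, 180))
x = np.expand_dims(keras.utils.img_to_array(img), axis=0)  # add batch dimension

# Predict class probabilities and print the most likely class index
probs = model.predict(x)
print("Predicted class:", int(np.argmax(probs[0])))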