Hi,
I am running into unexpected behavior: TensorFlow Metal with Python multiprocessing does not work with "fork" on M1 machines, although it used to work on previous macOS platforms. It does work with "spawn", but that is not the preferred approach for our implementation. Specifically:
- execution of the code in a subprocess continues until it reaches the creation of the tf.data.Dataset, where it stops without any error message (the subprocess quits and the rest of the code in main executes); see the minimal sketch after this list.
- the above happens when we use forking, whether the code executed in the subprocess is provided as a function defined in the main script or in a sub-module imported into the main script.
- the above happens regardless of whether we use the multiprocessing or the multiprocess module, the latter of which was supposed to fix such behavior on other platforms.
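For illustration, here is a stripped-down sketch of what I believe is the minimal trigger: a forked child that does nothing except build a tf.data.Dataset, with no Keras or training involved. This is a hypothetical standalone file, not part of the full example below; based on the behavior described above, I would expect it to show the same silent exit under fork:

import multiprocessing as mp
import numpy as np

def build_dataset(worker):
    import tensorflow as tf  # imported inside the child, as in the full example
    print("BEFORE tf.data.Dataset!")
    ds = tf.data.Dataset.from_tensor_slices(np.ones((8, 2, 2)))
    print("AFTER tf.data.Dataset")  # expected to never print if the fork issue is hit

if __name__ == '__main__':
    mp.set_start_method('fork', force=True)
    p = mp.Process(target=build_dataset, args=(0,))
    p.start()
    p.join()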
Please let me know if you would like any further details. Is this expected behavior, or something that needs further investigation?
Software stack: macOS 12.1, TensorFlow 2.7 with tensorflow-metal 0.3 (also tested on TensorFlow 2.8).
The full example code is:
def executor(worker, params):
    # Note: all imports happen inside the worker function, so each child
    # process performs its own imports.
    print("START")
    batch = 128
    import numpy as np
    import pandas as pd
    import sys
    import tensorflow as tf
    from tensorflow.keras import Sequential
    from tensorflow.keras.layers import Dense

    # Load the auto-mpg dataset and build (samples, 2, 2) input/target arrays.
    url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data'
    column_names = ['MPG', 'Displacement', 'Horsepower', 'Weight']
    dataset = pd.read_csv(url, names=column_names,
                          na_values='?', comment='\t',
                          sep=' ', skipinitialspace=True).dropna()
    x_train = np.array(dataset[['Horsepower', 'Weight']]).reshape(-1, 2, 2)
    y_train = np.array(dataset[['MPG', 'Displacement']]).reshape(-1, 2, 2)
    print(x_train.shape)
    print(y_train.shape)

    print("BEFORE tf.data.Dataset!")
    # Under fork on M1, the subprocess silently quits at the next line.
    train_data = tf.data.Dataset.from_tensor_slices((x_train, y_train)).cache().shuffle(x_train.shape[0]).batch(batch).repeat().prefetch(tf.data.experimental.AUTOTUNE)
    print("WOW!, AFTER tf.data.Dataset")

    model = tf.keras.models.Sequential()
    model.add(tf.keras.Input(shape=(x_train.shape[1], x_train.shape[2])))
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dense(32, activation='relu'))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(2 * 2))
    model.add(tf.keras.layers.Reshape([2, 2]))
    print(model.summary())
    model.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError(), run_eagerly=True)
    model.fit(train_data, epochs=params["epochs"], steps_per_epoch=3)
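As a sanity-check baseline (not part of the script below), the same function can be run inline in the parent process with no multiprocessing at all; if that completes, the pipeline itself is fine and the hang is specific to forking. A minimal check, assuming executor is importable where you run it:

# Baseline: run the worker inline in the parent process, no subprocess involved.
executor(0, {"epochs": 1})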
# CHECK WITH MULTIPROCESSING
# TESTING CONDITIONS:
#   On old Macs (package, method, executor_mode) = 0, 0, 0 worked
#   On M1 only 0, 1, 1 works ?!?!?
package = 0        # 0 = multiprocessing, 1 = multiprocess
method = 1         # 0 = fork, 1 = spawn
executor_mode = 1  # 0 = executor from main, 1 = executor from module

if package == 0:
    import multiprocessing as mp
else:
    import multiprocess as mp

if method == 0:
    mp.set_start_method('fork', force=True)
else:
    mp.set_start_method('spawn', force=True)

# Force a fresh import of the worker module.
import sys
if 'dataset_in_multiprocessing_issue' in sys.modules:
    print("yes")
    del sys.modules["dataset_in_multiprocessing_issue"]
    import dataset_in_multiprocessing_issue
else:
    print("No")
    import dataset_in_multiprocessing_issue
params = {"epochs": 300}
if __name__ == '__main__':
    # def executor(worker, params): -- tested putting executor here for SPAWN
    processes = []
    for worker in range(mp.cpu_count() - 9):
        if executor_mode == 0:
            p = mp.Process(target=executor, args=(worker, params,))
        else:
            p = mp.Process(target=dataset_in_multiprocessing_issue.executor, args=(worker, params,))
        processes.append(p)
        p.start()
    for process in processes:
        process.join()
    print("Fully DONE!")