diff --git a/tensorflow_federated/python/core/impl/executors/cpp_to_python_executor.py b/tensorflow_federated/python/core/impl/executors/cpp_to_python_executor.py index 800fb0e6e1..287eb8215a 100644 --- a/tensorflow_federated/python/core/impl/executors/cpp_to_python_executor.py +++ b/tensorflow_federated/python/core/impl/executors/cpp_to_python_executor.py @@ -59,8 +59,7 @@ def type_signature(self) -> computation_types.Type: return self._type_signature @property - def ref(self) -> int: - """Hands out a reference to self without transferring ownership.""" + def reference(self) -> int: return self._owned_value_id.ref @tracing.trace @@ -119,9 +118,9 @@ async def create_call( fn: CppToPythonExecutorValue, arg: Optional[CppToPythonExecutorValue] = None ) -> CppToPythonExecutorValue: - fn_ref = fn.ref + fn_ref = fn.reference if arg is not None: - arg_ref = arg.ref + arg_ref = arg.reference else: arg_ref = None try: @@ -139,7 +138,7 @@ async def create_struct( id_list = [] type_list = [] for name, value in structure.iter_elements(executor_value_struct): - id_list.append(value.ref) + id_list.append(value.reference) type_list.append((name, value.type_signature)) try: struct_id = self._cpp_executor.create_struct(id_list) @@ -153,7 +152,9 @@ async def create_struct( async def create_selection(self, source: CppToPythonExecutorValue, index: int) -> CppToPythonExecutorValue: try: - selection_id = self._cpp_executor.create_selection(source.ref, index) + selection_id = self._cpp_executor.create_selection( + source.reference, index + ) except Exception as e: # pylint: disable=broad-except _handle_error(e) selection_type = source.type_signature[index] diff --git a/tensorflow_federated/python/core/impl/executors/eager_tf_executor.py b/tensorflow_federated/python/core/impl/executors/eager_tf_executor.py index 69070538f5..3f860d3dc5 100644 --- a/tensorflow_federated/python/core/impl/executors/eager_tf_executor.py +++ b/tensorflow_federated/python/core/impl/executors/eager_tf_executor.py @@ -536,7 +536,7 @@ def to_representation_for_type( return to_representation_for_type( value, tf_function_cache, type_spec=type_spec, device=None) elif isinstance(value, EagerValue): - return value.internal_representation + return value.reference elif isinstance(value, executor_value_base.ExecutorValue): raise TypeError( 'Cannot accept a value embedded within a non-eager executor.') @@ -568,12 +568,7 @@ def __init__(self, value, type_spec): self._value = value @property - def internal_representation(self): - """Returns a representation of the eager value embedded in the executor. - - This property is only intended for use by the eager executor and tests. Not - for consumption by consumers of the executor interface. - """ + def reference(self): return self._value @property @@ -619,8 +614,8 @@ class EagerTFExecutor(executor_base.Executor): One further implementation detail is worth noting. Like all executors, this executor embeds incoming data as an instance of an executor-specific class, here the `EagerValue`. All `EagerValues` are assumed in this implmentation - to have an `internal_representation` which is a fixed point under the action - of `to_representation_for_type` with type the `type_signature` attribute of + to have an `reference` which is a fixed point under the action of + `to_representation_for_type` with type the `type_signature` attribute of the `EagerValue`. This invariant is introduced by normalization in `create_value`, and is respected by the form of returned `EagerValues` in all other methods this executor exposes. @@ -711,11 +706,10 @@ async def create_call(self, comp, arg=None): comp.type_signature)) if comp.type_signature.parameter is not None: return EagerValue( - comp.internal_representation(arg.internal_representation), - comp.type_signature.result) + comp.reference(arg.reference), comp.type_signature.result + ) elif arg is None: - return EagerValue(comp.internal_representation(), - comp.type_signature.result) + return EagerValue(comp.reference(), comp.type_signature.result) else: raise TypeError('Cannot pass an argument to a no-argument function.') @@ -734,7 +728,7 @@ async def create_struct(self, elements): type_elements = [] for k, v in elements: py_typecheck.check_type(v, EagerValue) - val_elements.append((k, v.internal_representation)) + val_elements.append((k, v.reference)) type_elements.append((k, v.type_signature)) return EagerValue( structure.Struct(val_elements), @@ -759,10 +753,9 @@ async def create_selection(self, source, index): """ py_typecheck.check_type(source, EagerValue) py_typecheck.check_type(source.type_signature, computation_types.StructType) - py_typecheck.check_type(source.internal_representation, structure.Struct) + py_typecheck.check_type(source.reference, structure.Struct) py_typecheck.check_type(index, int) - return EagerValue(source.internal_representation[index], - source.type_signature[index]) + return EagerValue(source.reference[index], source.type_signature[index]) def close(self): pass diff --git a/tensorflow_federated/python/core/impl/executors/eager_tf_executor_test.py b/tensorflow_federated/python/core/impl/executors/eager_tf_executor_test.py index 3c6560815b..76d105191e 100644 --- a/tensorflow_federated/python/core/impl/executors/eager_tf_executor_test.py +++ b/tensorflow_federated/python/core/impl/executors/eager_tf_executor_test.py @@ -409,8 +409,8 @@ def test_eager_value_constructor_with_int_constant(self): 10, {}, int_tensor_type) v = eager_tf_executor.EagerValue(normalized_value, int_tensor_type) self.assertEqual(str(v.type_signature), 'int32') - self.assertIsInstance(v.internal_representation, tf.Tensor) - self.assertEqual(v.internal_representation, 10) + self.assertIsInstance(v.reference, tf.Tensor) + self.assertEqual(v.reference, 10) def test_executor_constructor_fails_if_not_in_eager_mode(self): with tf.Graph().as_default(): @@ -427,9 +427,9 @@ def test_executor_create_value_int(self): ex = eager_tf_executor.EagerTFExecutor() val = asyncio.run(ex.create_value(10, tf.int32)) self.assertIsInstance(val, eager_tf_executor.EagerValue) - self.assertIsInstance(val.internal_representation, tf.Tensor) + self.assertIsInstance(val.reference, tf.Tensor) self.assertEqual(str(val.type_signature), 'int32') - self.assertEqual(val.internal_representation, 10) + self.assertEqual(val.reference, 10) def test_executor_create_value_raises_on_lambda(self): ex = eager_tf_executor.EagerTFExecutor() @@ -458,15 +458,15 @@ def test_executor_create_value_unnamed_int_pair(self): }], [tf.int32, collections.OrderedDict([('a', tf.int32)])])) self.assertIsInstance(val, eager_tf_executor.EagerValue) self.assertEqual(str(val.type_signature), '>') - self.assertIsInstance(val.internal_representation, structure.Struct) - self.assertLen(val.internal_representation, 2) - self.assertIsInstance(val.internal_representation[0], tf.Tensor) - self.assertIsInstance(val.internal_representation[1], structure.Struct) - self.assertLen(val.internal_representation[1], 1) - self.assertEqual(dir(val.internal_representation[1]), ['a']) - self.assertIsInstance(val.internal_representation[1][0], tf.Tensor) - self.assertEqual(val.internal_representation[0], 10) - self.assertEqual(val.internal_representation[1][0], 20) + self.assertIsInstance(val.reference, structure.Struct) + self.assertLen(val.reference, 2) + self.assertIsInstance(val.reference[0], tf.Tensor) + self.assertIsInstance(val.reference[1], structure.Struct) + self.assertLen(val.reference[1], 1) + self.assertEqual(dir(val.reference[1]), ['a']) + self.assertIsInstance(val.reference[1][0], tf.Tensor) + self.assertEqual(val.reference[0], 10) + self.assertEqual(val.reference[1][0], 20) def test_executor_create_value_named_type_unnamed_value(self): ex = eager_tf_executor.EagerTFExecutor() @@ -475,12 +475,12 @@ def test_executor_create_value_named_type_unnamed_value(self): collections.OrderedDict(a=tf.int32, b=tf.int32))) self.assertIsInstance(val, eager_tf_executor.EagerValue) self.assertEqual(str(val.type_signature), '') - self.assertIsInstance(val.internal_representation, structure.Struct) - self.assertLen(val.internal_representation, 2) - self.assertIsInstance(val.internal_representation[0], tf.Tensor) - self.assertIsInstance(val.internal_representation[1], tf.Tensor) - self.assertEqual(val.internal_representation[0], 10) - self.assertEqual(val.internal_representation[1], 20) + self.assertIsInstance(val.reference, structure.Struct) + self.assertLen(val.reference, 2) + self.assertIsInstance(val.reference[0], tf.Tensor) + self.assertIsInstance(val.reference[1], tf.Tensor) + self.assertEqual(val.reference[0], 10) + self.assertEqual(val.reference[1], 20) def test_executor_create_value_no_arg_computation(self): ex = eager_tf_executor.EagerTFExecutor() @@ -495,8 +495,8 @@ def comp(): computation_types.FunctionType(None, tf.int32))) self.assertIsInstance(val, eager_tf_executor.EagerValue) self.assertEqual(str(val.type_signature), '( -> int32)') - self.assertTrue(callable(val.internal_representation)) - result = val.internal_representation() + self.assertTrue(callable(val.reference)) + result = val.reference() self.assertIsInstance(result, tf.Tensor) self.assertEqual(result, 1000) @@ -516,9 +516,9 @@ def comp(a, b): ('b', tf.int32)]), tf.int32))) self.assertIsInstance(val, eager_tf_executor.EagerValue) self.assertEqual(str(val.type_signature), '( -> int32)') - self.assertTrue(callable(val.internal_representation)) + self.assertTrue(callable(val.reference)) arg = structure.Struct([('a', tf.constant(10)), ('b', tf.constant(10))]) - result = val.internal_representation(arg) + result = val.reference(arg) self.assertIsInstance(result, tf.Tensor) self.assertEqual(result, 20) @@ -537,8 +537,8 @@ def comp(a, b): result = asyncio.run(ex.create_call(comp, arg)) self.assertIsInstance(result, eager_tf_executor.EagerValue) self.assertEqual(str(result.type_signature), 'int32') - self.assertIsInstance(result.internal_representation, tf.Tensor) - self.assertEqual(result.internal_representation, 30) + self.assertIsInstance(result.reference, tf.Tensor) + self.assertEqual(result.reference, 30) def test_dynamic_lookup_table_usage(self): @@ -566,8 +566,8 @@ def comp(table_args, to_lookup): result_1 = asyncio.run(ex.create_call(comp, arg_1)) result_2 = asyncio.run(ex.create_call(comp, arg_2)) - self.assertEqual(self.evaluate(result_1.internal_representation), 0) - self.assertEqual(self.evaluate(result_2.internal_representation), 3) + self.assertEqual(self.evaluate(result_1.reference), 0) + self.assertEqual(self.evaluate(result_2.reference), 3) # TODO(b/137602785): bring GPU test back after the fix for `wrap_function`. @tensorflow_test_utils.skip_test_for_gpu @@ -585,9 +585,8 @@ def comp(ds): result = asyncio.run(ex.create_call(comp, arg)) self.assertIsInstance(result, eager_tf_executor.EagerValue) self.assertEqual(str(result.type_signature), 'int32*') - self.assertIn('Dataset', type(result.internal_representation).__name__) - self.assertCountEqual([x.numpy() for x in result.internal_representation], - [10, 20]) + self.assertIn('Dataset', type(result.reference).__name__) + self.assertCountEqual([x.numpy() for x in result.reference], [10, 20]) # TODO(b/137602785): bring GPU test back after the fix for `wrap_function`. @tensorflow_test_utils.skip_test_for_gpu @@ -612,9 +611,8 @@ def comp(ds): result = asyncio.run(ex.create_call(comp, arg)) self.assertIsInstance(result, eager_tf_executor.EagerValue) self.assertEqual(str(result.type_signature), 'int64*') - self.assertIn('Dataset', type(result.internal_representation).__name__) - self.assertCountEqual([x.numpy() for x in result.internal_representation], - [0, 1]) + self.assertIn('Dataset', type(result.reference).__name__) + self.assertCountEqual([x.numpy() for x in result.reference], [0, 1]) # TODO(b/137602785): bring GPU test back after the fix for `wrap_function`. @tensorflow_test_utils.skip_test_for_gpu @@ -632,9 +630,8 @@ def comp(ds): result = asyncio.run(ex.create_call(comp, arg)) self.assertIsInstance(result, eager_tf_executor.EagerValue) self.assertEqual(str(result.type_signature), 'int32*') - self.assertIn('Dataset', type(result.internal_representation).__name__) - self.assertCountEqual([x.numpy() for x in result.internal_representation], - [10, 10, 10]) + self.assertIn('Dataset', type(result.reference).__name__) + self.assertCountEqual([x.numpy() for x in result.reference], [10, 10, 10]) # TODO(b/137602785): bring GPU test back after the fix for `wrap_function`. @tensorflow_test_utils.skip_test_for_gpu @@ -652,8 +649,8 @@ def comp(ds): result = asyncio.run(ex.create_call(comp, arg)) self.assertIsInstance(result, eager_tf_executor.EagerValue) self.assertEqual(str(result.type_signature), 'int32') - self.assertIsInstance(result.internal_representation, tf.Tensor) - self.assertEqual(result.internal_representation, 90) + self.assertIsInstance(result.reference, tf.Tensor) + self.assertEqual(result.reference, 90) # TODO(b/137602785): bring GPU test back after the fix for `wrap_function`. @tensorflow_test_utils.skip_test_for_gpu @@ -675,12 +672,12 @@ def comp(ds): result = asyncio.run(ex.create_call(comp, arg)) self.assertIsInstance(result, eager_tf_executor.EagerValue) self.assertEqual(str(result.type_signature), '') - self.assertIsInstance(result.internal_representation, structure.Struct) - self.assertCountEqual(dir(result.internal_representation), ['a', 'b']) - self.assertIsInstance(result.internal_representation.a, tf.Tensor) - self.assertIsInstance(result.internal_representation.b, tf.Tensor) - self.assertEqual(result.internal_representation.a, 60) - self.assertEqual(result.internal_representation.b, 15) + self.assertIsInstance(result.reference, structure.Struct) + self.assertCountEqual(dir(result.reference), ['a', 'b']) + self.assertIsInstance(result.reference.a, tf.Tensor) + self.assertIsInstance(result.reference.b, tf.Tensor) + self.assertEqual(result.reference.a, 60) + self.assertEqual(result.reference.b, 15) def test_executor_create_struct_and_selection(self): ex = eager_tf_executor.EagerTFExecutor() @@ -693,24 +690,24 @@ async def gather_values(values): v3 = asyncio.run( ex.create_struct(collections.OrderedDict([('a', v1), ('b', v2)]))) self.assertIsInstance(v3, eager_tf_executor.EagerValue) - self.assertIsInstance(v3.internal_representation, structure.Struct) - self.assertLen(v3.internal_representation, 2) - self.assertCountEqual(dir(v3.internal_representation), ['a', 'b']) - self.assertIsInstance(v3.internal_representation[0], tf.Tensor) - self.assertIsInstance(v3.internal_representation[1], tf.Tensor) + self.assertIsInstance(v3.reference, structure.Struct) + self.assertLen(v3.reference, 2) + self.assertCountEqual(dir(v3.reference), ['a', 'b']) + self.assertIsInstance(v3.reference[0], tf.Tensor) + self.assertIsInstance(v3.reference[1], tf.Tensor) self.assertEqual(str(v3.type_signature), '') - self.assertEqual(v3.internal_representation[0], 10) - self.assertEqual(v3.internal_representation[1], 20) + self.assertEqual(v3.reference[0], 10) + self.assertEqual(v3.reference[1], 20) v4 = asyncio.run(ex.create_selection(v3, 0)) self.assertIsInstance(v4, eager_tf_executor.EagerValue) - self.assertIsInstance(v4.internal_representation, tf.Tensor) + self.assertIsInstance(v4.reference, tf.Tensor) self.assertEqual(str(v4.type_signature), 'int32') - self.assertEqual(v4.internal_representation, 10) + self.assertEqual(v4.reference, 10) v5 = asyncio.run(ex.create_selection(v3, 1)) self.assertIsInstance(v5, eager_tf_executor.EagerValue) - self.assertIsInstance(v5.internal_representation, tf.Tensor) + self.assertIsInstance(v5.reference, tf.Tensor) self.assertEqual(str(v5.type_signature), 'int32') - self.assertEqual(v5.internal_representation, 20) + self.assertEqual(v5.reference, 20) def test_executor_compute(self): ex = eager_tf_executor.EagerTFExecutor() @@ -764,9 +761,8 @@ def _generate_items(): val = asyncio.run(ex.create_value(_generate_items, type_spec)) self.assertIsInstance(val, eager_tf_executor.EagerValue) self.assertEqual(str(val.type_signature), str(type_spec)) - self.assertIn('Dataset', type(val.internal_representation).__name__) - self.assertCountEqual([x.numpy() for x in val.internal_representation], - [2, 5, 10]) + self.assertIn('Dataset', type(val.reference).__name__) + self.assertCountEqual([x.numpy() for x in val.reference], [2, 5, 10]) if __name__ == '__main__': diff --git a/tensorflow_federated/python/core/impl/executors/executor_service_test.py b/tensorflow_federated/python/core/impl/executors/executor_service_test.py index 144bc8b6a3..8e5e503725 100644 --- a/tensorflow_federated/python/core/impl/executors/executor_service_test.py +++ b/tensorflow_federated/python/core/impl/executors/executor_service_test.py @@ -100,6 +100,10 @@ def __init__(self, v, t): self._v = v self._t = t + @property + def reference(self): + return self._v + @property def type_signature(self): return self._t diff --git a/tensorflow_federated/python/core/impl/executors/executor_test_utils.py b/tensorflow_federated/python/core/impl/executors/executor_test_utils.py index 2713c5ae7e..3edf6ce27b 100644 --- a/tensorflow_federated/python/core/impl/executors/executor_test_utils.py +++ b/tensorflow_federated/python/core/impl/executors/executor_test_utils.py @@ -238,7 +238,7 @@ def index(self): return self._index @property - def value(self): + def reference(self): return self._value @property diff --git a/tensorflow_federated/python/core/impl/executors/executor_value_base.py b/tensorflow_federated/python/core/impl/executors/executor_value_base.py index 5578ab5acf..4652ed17be 100644 --- a/tensorflow_federated/python/core/impl/executors/executor_value_base.py +++ b/tensorflow_federated/python/core/impl/executors/executor_value_base.py @@ -18,13 +18,19 @@ from tensorflow_federated.python.core.impl.types import typed_object -class ExecutorValue(typed_object.TypedObject, metaclass=abc.ABCMeta): +class ExecutorValue(abc.ABC, typed_object.TypedObject): """Represents the abstract interface for values embedded within executors. The embedded values may represent computations in-flight that may materialize in the future or fail before they materialize. """ + @property + @abc.abstractmethod + def reference(self): + """Returns a reference to the value without transferring ownership.""" + raise NotImplementedError + @abc.abstractmethod async def compute(self): """A coroutine that asynchronously returns the computed form of the value. diff --git a/tensorflow_federated/python/core/impl/executors/federated_composing_strategy.py b/tensorflow_federated/python/core/impl/executors/federated_composing_strategy.py index 8e05b594fa..5cd39585ea 100644 --- a/tensorflow_federated/python/core/impl/executors/federated_composing_strategy.py +++ b/tensorflow_federated/python/core/impl/executors/federated_composing_strategy.py @@ -11,11 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# pytype: skip-file -# This modules disables the Pytype analyzer, see -# https://github.com/tensorflow/federated/blob/main/docs/pytype.md for more -# information. """A strategy for composing federated types and intrinsics in disjoint scopes. +------------+ @@ -87,7 +82,7 @@ def __init__(self, value, type_signature): self._type_signature = computation_types.to_type(type_signature) @property - def internal_representation(self): + def reference(self): return self._value @property @@ -309,14 +304,14 @@ async def compute_federated_value( @tracing.trace async def compute_federated_aggregate( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: value_type, zero_type, accumulate_type, merge_type, report_type = ( executor_utils.parse_federated_aggregate_argument_types( arg.type_signature)) - py_typecheck.check_type(arg.internal_representation, structure.Struct) - py_typecheck.check_len(arg.internal_representation, 5) - val = arg.internal_representation[0] + py_typecheck.check_type(arg.reference, structure.Struct) + py_typecheck.check_len(arg.reference, 5) + val = arg.reference[0] py_typecheck.check_type(val, list) py_typecheck.check_len(val, len(self._target_executors)) identity_report, identity_report_type = ( @@ -330,9 +325,9 @@ async def compute_federated_aggregate( aggr_comp = executor_utils.create_intrinsic_comp( intrinsic_defs.FEDERATED_AGGREGATE, aggr_type) zero = await (await self._executor.create_selection(arg, 1)).compute() - accumulate = arg.internal_representation[2] - merge = arg.internal_representation[3] - report = arg.internal_representation[4] + accumulate = arg.reference[2] + merge = arg.reference[3] + report = arg.reference[4] async def _child_fn(ex, v): py_typecheck.check_type(v, executor_value_base.ExecutorValue) @@ -375,18 +370,18 @@ async def _child_fn(ex, v): @tracing.trace async def compute_federated_apply( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: - py_typecheck.check_type(arg.internal_representation, structure.Struct) - py_typecheck.check_len(arg.internal_representation, 2) + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: + py_typecheck.check_type(arg.reference, structure.Struct) + py_typecheck.check_len(arg.reference, 2) fn_type = arg.type_signature[0] py_typecheck.check_type(fn_type, computation_types.FunctionType) val_type = arg.type_signature[1] type_analysis.check_federated_type( val_type, fn_type.parameter, placements.SERVER, all_equal=True) - fn = arg.internal_representation[0] + fn = arg.reference[0] py_typecheck.check_type(fn, pb.Computation) - val = arg.internal_representation[1] + val = arg.reference[1] py_typecheck.check_type(val, executor_value_base.ExecutorValue) return FederatedComposingStrategyValue( await self._server_executor.create_call( @@ -395,17 +390,17 @@ async def compute_federated_apply( @tracing.trace async def compute_federated_broadcast( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await executor_utils.compute_intrinsic_federated_broadcast( self._executor, arg) @tracing.trace async def _eval(self, arg, intrinsic, placement, all_equal): py_typecheck.check_type(arg.type_signature, computation_types.FunctionType) - py_typecheck.check_type(arg.internal_representation, pb.Computation) + py_typecheck.check_type(arg.reference, pb.Computation) py_typecheck.check_type(placement, placements.PlacementLiteral) - fn = arg.internal_representation + fn = arg.reference fn_type = arg.type_signature eval_type = computation_types.FunctionType( fn_type, @@ -429,28 +424,29 @@ async def _child_fn(ex): @tracing.trace async def compute_federated_eval_at_clients( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._eval(arg, intrinsic_defs.FEDERATED_EVAL_AT_CLIENTS, placements.CLIENTS, False) @tracing.trace async def compute_federated_eval_at_server( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: py_typecheck.check_type(arg.type_signature, computation_types.FunctionType) - py_typecheck.check_type(arg.internal_representation, pb.Computation) + py_typecheck.check_type(arg.reference, pb.Computation) fn_type = arg.type_signature embedded_fn = await self._server_executor.create_value( - arg.internal_representation, fn_type) + arg.reference, fn_type + ) embedded_call = await self._server_executor.create_call(embedded_fn) return FederatedComposingStrategyValue( embedded_call, computation_types.at_server(fn_type.result)) @tracing.trace async def _map(self, arg, all_equal=None): - py_typecheck.check_type(arg.internal_representation, structure.Struct) - py_typecheck.check_len(arg.internal_representation, 2) + py_typecheck.check_type(arg.reference, structure.Struct) + py_typecheck.check_len(arg.reference, 2) fn_type = arg.type_signature[0] py_typecheck.check_type(fn_type, computation_types.FunctionType) val_type = arg.type_signature[1] @@ -460,9 +456,9 @@ async def _map(self, arg, all_equal=None): elif all_equal and not val_type.all_equal: raise ValueError( 'Cannot map a non-all_equal argument into an all_equal result.') - fn = arg.internal_representation[0] + fn = arg.reference[0] py_typecheck.check_type(fn, pb.Computation) - val = arg.internal_representation[1] + val = arg.reference[1] py_typecheck.check_type(val, list) map_type = computation_types.FunctionType( @@ -486,20 +482,20 @@ async def _child_fn(ex, v): @tracing.trace async def compute_federated_map( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._map(arg, all_equal=False) @tracing.trace async def compute_federated_map_all_equal( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._map(arg, all_equal=True) @tracing.trace async def compute_federated_mean( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: type_analysis.check_federated_type( arg.type_signature, placement=placements.CLIENTS) member_type = arg.type_signature.member @@ -534,8 +530,8 @@ async def _create_multiply_arg(): @tracing.trace async def compute_federated_sum( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: type_analysis.check_federated_type( arg.type_signature, placement=placements.CLIENTS) id_comp, id_type = tensorflow_computation_factory.create_identity( @@ -557,25 +553,25 @@ async def compute_federated_sum( @tracing.trace async def compute_federated_secure_sum_bitwidth( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: raise NotImplementedError('The secure sum intrinsic is not implemented.') @tracing.trace async def compute_federated_secure_select( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: raise NotImplementedError('The secure select intrinsic is not implemented.') @tracing.trace async def compute_federated_select( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: client_keys_type, max_key_type, server_val_type, select_fn_type = ( arg.type_signature) del client_keys_type # Unused - py_typecheck.check_type(arg.internal_representation, structure.Struct) - client_keys, max_key, server_val, select_fn = arg.internal_representation + py_typecheck.check_type(arg.reference, structure.Struct) + client_keys, max_key, server_val, select_fn = arg.reference py_typecheck.check_type(client_keys, list) py_typecheck.check_len(client_keys, len(self._target_executors)) py_typecheck.check_type(max_key, executor_value_base.ExecutorValue) @@ -614,22 +610,22 @@ async def child_fn(child, child_client_keys): @tracing.trace async def compute_federated_value_at_clients( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await executor_utils.compute_intrinsic_federated_value( self._executor, arg, placements.CLIENTS) @tracing.trace async def compute_federated_value_at_server( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await executor_utils.compute_intrinsic_federated_value( self._executor, arg, placements.SERVER) @tracing.trace async def compute_federated_weighted_mean( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await executor_utils.compute_intrinsic_federated_weighted_mean( self._executor, arg, @@ -651,8 +647,8 @@ async def _zip_struct_into_child(self, child, child_index, value, value_type): @tracing.trace async def compute_federated_zip_at_clients( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: py_typecheck.check_type(arg.type_signature, computation_types.StructType) result_type = computation_types.at_clients( type_transformations.strip_placement(arg.type_signature)) @@ -662,7 +658,8 @@ async def compute_federated_zip_at_clients( async def _child_fn(child, child_index): struct_value = await self._zip_struct_into_child( - child, child_index, arg.internal_representation, arg.type_signature) + child, child_index, arg.reference, arg.type_signature + ) return await child.create_call( await child.create_value(zip_comp, zip_type), struct_value) @@ -686,11 +683,12 @@ async def _zip_struct_into_server(self, value, value_type): @tracing.trace async def compute_federated_zip_at_server( - self, - arg: FederatedComposingStrategyValue) -> FederatedComposingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: py_typecheck.check_type(arg.type_signature, computation_types.StructType) result_type = computation_types.at_server( type_transformations.strip_placement(arg.type_signature)) return FederatedComposingStrategyValue( - await self._zip_struct_into_server(arg.internal_representation, - arg.type_signature), result_type) + await self._zip_struct_into_server(arg.reference, arg.type_signature), + result_type, + ) diff --git a/tensorflow_federated/python/core/impl/executors/federated_resolving_strategy.py b/tensorflow_federated/python/core/impl/executors/federated_resolving_strategy.py index 9337d70097..f3b3f25efa 100644 --- a/tensorflow_federated/python/core/impl/executors/federated_resolving_strategy.py +++ b/tensorflow_federated/python/core/impl/executors/federated_resolving_strategy.py @@ -11,11 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# pytype: skip-file -# This modules disables the Pytype analyzer, see -# https://github.com/tensorflow/federated/blob/main/docs/pytype.md for more -# information. """A strategy for resolving federated types and intrinsics. +------------+ @@ -47,7 +42,7 @@ """ import asyncio -from typing import Any +from typing import Any, Union from absl import logging import tensorflow as tf @@ -85,7 +80,7 @@ def __init__(self, value, type_signature): self._type_signature = computation_types.to_type(type_signature) @property - def internal_representation(self): + def reference(self): return self._value @property @@ -146,11 +141,14 @@ class FederatedResolvingStrategy(federating_executor.FederatingStrategy): """ @classmethod - def factory(cls, - target_executors: dict[str, executor_base.Executor], - local_computation_factory: local_computation_factory_base - .LocalComputationFactory = tensorflow_computation_factory - .TensorFlowComputationFactory()): + def factory( + cls, + target_executors: dict[ + placements.PlacementLiteral, + Union[list[executor_base.Executor], executor_base.Executor], + ], + local_computation_factory: local_computation_factory_base.LocalComputationFactory = tensorflow_computation_factory.TensorFlowComputationFactory(), + ): # pylint:disable=g-long-lambda return lambda executor: cls( executor, @@ -158,12 +156,15 @@ def factory(cls, local_computation_factory=local_computation_factory) # pylint:enable=g-long-lambda - def __init__(self, - executor: federating_executor.FederatingExecutor, - target_executors: dict[str, executor_base.Executor], - local_computation_factory: local_computation_factory_base - .LocalComputationFactory = tensorflow_computation_factory - .TensorFlowComputationFactory()): + def __init__( + self, + executor: federating_executor.FederatingExecutor, + target_executors: dict[ + placements.PlacementLiteral, + Union[list[executor_base.Executor], executor_base.Executor], + ], + local_computation_factory: local_computation_factory_base.LocalComputationFactory = tensorflow_computation_factory.TensorFlowComputationFactory(), + ): """Creates a `FederatedResolvingStrategy`. Args: @@ -220,7 +221,7 @@ def close(self): def _check_arg_is_structure(self, arg): py_typecheck.check_type(arg.type_signature, computation_types.StructType) - py_typecheck.check_type(arg.internal_representation, structure.Struct) + py_typecheck.check_type(arg.reference, structure.Struct) def _check_strategy_compatible_with_placement(self, placement): """Tests that this executor is compatible with the given `placement`. @@ -295,9 +296,9 @@ async def compute_federated_value( async def _eval(self, arg, placement, all_equal): py_typecheck.check_type(arg.type_signature, computation_types.FunctionType) py_typecheck.check_none(arg.type_signature.parameter) - py_typecheck.check_type(arg.internal_representation, pb.Computation) + py_typecheck.check_type(arg.reference, pb.Computation) py_typecheck.check_type(placement, placements.PlacementLiteral) - fn = arg.internal_representation + fn = arg.reference fn_type = arg.type_signature self._check_strategy_compatible_with_placement(placement) children = self._target_executors[placement] @@ -314,7 +315,7 @@ async def call(child): @tracing.trace async def _map(self, arg, all_equal=None): self._check_arg_is_structure(arg) - py_typecheck.check_len(arg.internal_representation, 2) + py_typecheck.check_len(arg.reference, 2) fn_type = arg.type_signature[0] py_typecheck.check_type(fn_type, computation_types.FunctionType) val_type = arg.type_signature[1] @@ -324,9 +325,9 @@ async def _map(self, arg, all_equal=None): elif all_equal and not val_type.all_equal: raise ValueError( 'Cannot map a non-all_equal argument into an all_equal result.') - fn = arg.internal_representation[0] + fn = arg.reference[0] py_typecheck.check_type(fn, pb.Computation) - val = arg.internal_representation[1] + val = arg.reference[1] py_typecheck.check_type(val, list) for v in val: py_typecheck.check_type(v, executor_value_base.ExecutorValue) @@ -376,11 +377,14 @@ async def _zip(self, arg, placement, all_equal): py_typecheck.check_type(placement, placements.PlacementLiteral) self._check_strategy_compatible_with_placement(placement) children = self._target_executors[placement] - results = await asyncio.gather(*[ - self._zip_struct_into_child(child, i, arg.internal_representation, - arg.type_signature) - for i, child in enumerate(children) - ]) + results = await asyncio.gather( + *[ + self._zip_struct_into_child( + child, i, arg.reference, arg.type_signature + ) + for i, child in enumerate(children) + ] + ) return FederatedResolvingStrategyValue( results, computation_types.FederatedType( @@ -390,15 +394,15 @@ async def _zip(self, arg, placement, all_equal): @tracing.trace async def compute_federated_aggregate( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: val_type, zero_type, accumulate_type, merge_type, report_type = ( executor_utils.parse_federated_aggregate_argument_types( arg.type_signature)) del val_type, merge_type - py_typecheck.check_type(arg.internal_representation, structure.Struct) - py_typecheck.check_len(arg.internal_representation, 5) - val, zero, accumulate, merge, report = arg.internal_representation + py_typecheck.check_type(arg.reference, structure.Struct) + py_typecheck.check_len(arg.reference, 5) + val, zero, accumulate, merge, report = arg.reference # Discard `merge`. Since all aggregation happens on a single executor, # there's no need for this additional layer. @@ -417,60 +421,63 @@ async def compute_federated_aggregate( return await self.compute_federated_apply( FederatedResolvingStrategyValue( - structure.Struct([(None, report), - (None, pre_report.internal_representation)]), + structure.Struct([(None, report), (None, pre_report.reference)]), computation_types.StructType( - (report_type, pre_report.type_signature)))) + (report_type, pre_report.type_signature) + ), + ) + ) @tracing.trace async def compute_federated_apply( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._map(arg) @tracing.trace async def compute_federated_broadcast( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: - py_typecheck.check_type(arg.internal_representation, list) - if len(arg.internal_representation) != 1: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: + py_typecheck.check_type(arg.reference, list) + if len(arg.reference) != 1: raise ValueError( 'Federated broadcast expects a value with a single representation, ' - 'found {}.'.format(len(arg.internal_representation))) + 'found {}.'.format(len(arg.reference)) + ) return await executor_utils.compute_intrinsic_federated_broadcast( self._executor, arg) @tracing.trace async def compute_federated_eval_at_clients( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._eval(arg, placements.CLIENTS, False) @tracing.trace async def compute_federated_eval_at_server( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._eval(arg, placements.SERVER, True) @tracing.trace async def compute_federated_map( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._map(arg, all_equal=False) @tracing.trace async def compute_federated_map_all_equal( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._map(arg, all_equal=True) @tracing.trace async def compute_federated_mean( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: arg_sum = await self.compute_federated_sum(arg) member_type = arg_sum.type_signature.member - count = float(len(arg.internal_representation)) + count = float(len(arg.reference)) if count < 1.0: raise RuntimeError('Cannot compute a federated mean over an empty group.') child = self._target_executors[placements.SERVER][0] @@ -485,8 +492,8 @@ async def compute_federated_mean( member_type, local_computation_factory=self._local_computation_factory)) multiply_arg = await child.create_struct( - structure.Struct([(None, arg_sum.internal_representation[0]), - (None, factor)])) + structure.Struct([(None, arg_sum.reference[0]), (None, factor)]) + ) result = await child.create_call(multiply, multiply_arg) return FederatedResolvingStrategyValue([result], arg_sum.type_signature) @@ -522,8 +529,8 @@ async def _move(v): @tracing.trace async def compute_federated_secure_sum_bitwidth( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: raise NotImplementedError( '`tff.federated_secure_sum_bitwidth()` is not implemented in this ' 'executor. For a fake implementation of ' @@ -535,8 +542,8 @@ async def compute_federated_secure_sum_bitwidth( @tracing.trace async def compute_federated_secure_select( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: raise NotImplementedError( '`tff.federated_secure_select()` is not implemented in this executor. ' 'For a fake implementation of `federated_secure_select` suitable for ' @@ -546,12 +553,12 @@ async def compute_federated_secure_select( @tracing.trace async def compute_federated_select( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: client_keys_type, max_key_type, server_val_type, select_fn_type = ( arg.type_signature) - py_typecheck.check_type(arg.internal_representation, structure.Struct) - client_keys, max_key, server_val, select_fn = arg.internal_representation + py_typecheck.check_type(arg.reference, structure.Struct) + client_keys, max_key, server_val, select_fn = arg.reference # We slice up the value as-needed, so `max_key` is not used. del max_key, max_key_type del server_val_type # unused @@ -607,8 +614,8 @@ async def select_single_client(client, keys_at_client): @tracing.trace async def compute_federated_sum( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: py_typecheck.check_type(arg.type_signature, computation_types.FederatedType) zero, plus = await asyncio.gather( executor_utils.embed_constant( @@ -620,27 +627,28 @@ async def compute_federated_sum( self._executor, arg.type_signature.member, local_computation_factory=self._local_computation_factory)) - return await self.reduce(arg.internal_representation, zero, - plus.internal_representation, plus.type_signature) + return await self.reduce( + arg.reference, zero, plus.reference, plus.type_signature + ) @tracing.trace async def compute_federated_value_at_clients( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await executor_utils.compute_intrinsic_federated_value( self._executor, arg, placements.CLIENTS) @tracing.trace async def compute_federated_value_at_server( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await executor_utils.compute_intrinsic_federated_value( self._executor, arg, placements.SERVER) @tracing.trace async def compute_federated_weighted_mean( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await executor_utils.compute_intrinsic_federated_weighted_mean( self._executor, arg, @@ -648,12 +656,12 @@ async def compute_federated_weighted_mean( @tracing.trace async def compute_federated_zip_at_clients( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._zip(arg, placements.CLIENTS, all_equal=False) @tracing.trace async def compute_federated_zip_at_server( - self, - arg: FederatedResolvingStrategyValue) -> FederatedResolvingStrategyValue: + self, arg: executor_value_base.ExecutorValue + ) -> executor_value_base.ExecutorValue: return await self._zip(arg, placements.SERVER, all_equal=True) diff --git a/tensorflow_federated/python/core/impl/executors/federating_executor.py b/tensorflow_federated/python/core/impl/executors/federating_executor.py index 14b6fd277e..5e1ec36169 100644 --- a/tensorflow_federated/python/core/impl/executors/federating_executor.py +++ b/tensorflow_federated/python/core/impl/executors/federating_executor.py @@ -11,11 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# pytype: skip-file -# This modules disables the Pytype analyzer, see -# https://github.com/tensorflow/federated/blob/main/docs/pytype.md for more -# information. """An executor that handles federated types and federated operators. +------------+ @@ -432,19 +427,23 @@ async def create_call( computation_types.FunctionType) param_type = comp.type_signature.parameter param_type.check_assignable_from(arg.type_signature) - arg = self._strategy.ingest_value(arg.internal_representation, param_type) - if isinstance(comp.internal_representation, pb.Computation): - which_computation = comp.internal_representation.WhichOneof('computation') - if ((which_computation in ['tensorflow', 'xla', 'intrinsic']) or - ((which_computation == 'intrinsic') and - (comp.internal_representation.intrinsic.uri - in FederatingExecutor._FORWARDED_INTRINSICS))): + arg = self._strategy.ingest_value(arg.reference, param_type) + if isinstance(comp.reference, pb.Computation): + which_computation = comp.reference.WhichOneof('computation') + if (which_computation in ['tensorflow', 'xla', 'intrinsic']) or ( + (which_computation == 'intrinsic') + and ( + comp.reference.intrinsic.uri + in FederatingExecutor._FORWARDED_INTRINSICS + ) + ): embedded_comp = await self._unplaced_executor.create_value( - comp.internal_representation, comp.type_signature) + comp.reference, comp.type_signature + ) if arg is not None: embedded_arg = await executor_utils.delegate_entirely_to_executor( - arg.internal_representation, arg.type_signature, - self._unplaced_executor) + arg.reference, arg.type_signature, self._unplaced_executor + ) else: embedded_arg = None result = await self._unplaced_executor.create_call( @@ -454,12 +453,16 @@ async def create_call( raise ValueError( 'Directly calling computations of type {} is unsupported.'.format( which_computation)) - elif isinstance(comp.internal_representation, intrinsic_defs.IntrinsicDef): + elif isinstance(comp.reference, intrinsic_defs.IntrinsicDef): return await self._strategy.compute_federated_intrinsic( - comp.internal_representation.uri, arg) + comp.reference.uri, arg + ) else: - raise ValueError('Calling objects of type {} is unsupported.'.format( - py_typecheck.type_string(type(comp.internal_representation)))) + raise ValueError( + 'Calling objects of type {} is unsupported.'.format( + py_typecheck.type_string(type(comp.reference)) + ) + ) @tracing.trace async def create_struct( @@ -482,7 +485,7 @@ async def create_struct( for name, value in structure.iter_elements( structure.from_container(elements)): py_typecheck.check_type(value, executor_value_base.ExecutorValue) - element_values.append((name, value.internal_representation)) + element_values.append((name, value.reference)) if name is not None: element_types.append((name, value.type_signature)) else: @@ -518,17 +521,18 @@ async def create_selection(self, source: executor_value_base.ExecutorValue, """ py_typecheck.check_type(source, executor_value_base.ExecutorValue) py_typecheck.check_type(source.type_signature, computation_types.StructType) - if isinstance(source.internal_representation, - executor_value_base.ExecutorValue): + if isinstance(source.reference, executor_value_base.ExecutorValue): result = await self._unplaced_executor.create_selection( - source.internal_representation, index) + source.reference, index + ) return self._strategy.ingest_value(result, result.type_signature) - elif isinstance(source.internal_representation, structure.Struct): - value = source.internal_representation[index] + elif isinstance(source.reference, structure.Struct): + value = source.reference[index] type_signature = source.type_signature[index] return self._strategy.ingest_value(value, type_signature) else: raise ValueError( 'Unexpected internal representation while creating selection. ' 'Expected one of `Struct` or value embedded in target ' - 'executor, received {}'.format(source.internal_representation)) + 'executor, received {}'.format(source.reference) + ) diff --git a/tensorflow_federated/python/core/impl/executors/reference_resolving_executor.py b/tensorflow_federated/python/core/impl/executors/reference_resolving_executor.py index 5e585322e4..1adb62a198 100644 --- a/tensorflow_federated/python/core/impl/executors/reference_resolving_executor.py +++ b/tensorflow_federated/python/core/impl/executors/reference_resolving_executor.py @@ -11,11 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# pytype: skip-file -# This modules disables the Pytype analyzer, see -# https://github.com/tensorflow/federated/blob/main/docs/pytype.md for more -# information. """An executor that understands lambda expressions and related abstractions.""" import asyncio @@ -214,12 +209,7 @@ def __init__(self, value: LambdaValueInner, type_spec=None): self._type_signature = type_spec @property - def internal_representation(self) -> LambdaValueInner: - """Returns a representation of the value embedded in the executor. - - This property is only intended for use by the lambda executor and tests. Not - for consumption by consumers of the executor interface. - """ + def reference(self) -> LambdaValueInner: return self._value @property @@ -306,7 +296,7 @@ async def create_struct(self, elements): async def create_selection(self, source, index): py_typecheck.check_type(source, ReferenceResolvingExecutorValue) py_typecheck.check_type(source.type_signature, computation_types.StructType) - source_repr = source.internal_representation + source_repr = source.reference if isinstance(source_repr, executor_value_base.ExecutorValue): return ReferenceResolvingExecutorValue( await self._target_executor.create_selection(source_repr, index)) @@ -332,7 +322,7 @@ async def create_call(self, comp, arg=None): 'takes an argument of type {}, but was supplied ' 'an argument of type {}'.format(param_type, arg_type)) - comp_repr = comp.internal_representation + comp_repr = comp.reference if isinstance(comp_repr, executor_value_base.ExecutorValue): # `comp` represents a function in the target executor, so we convert the # argument to a value inside the target executor and `create_call` on @@ -372,7 +362,7 @@ async def _embed_value_in_target_exec( is in a form that cannot be delegated. """ py_typecheck.check_type(value, ReferenceResolvingExecutorValue) - value_repr = value.internal_representation + value_repr = value.reference if isinstance(value_repr, executor_value_base.ExecutorValue): return value_repr elif isinstance(value_repr, structure.Struct): diff --git a/tensorflow_federated/python/core/impl/executors/remote_executor.py b/tensorflow_federated/python/core/impl/executors/remote_executor.py index bcf3c78adf..9b732cb743 100644 --- a/tensorflow_federated/python/core/impl/executors/remote_executor.py +++ b/tensorflow_federated/python/core/impl/executors/remote_executor.py @@ -11,11 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# pytype: skip-file -# This modules disables the Pytype analyzer, see -# https://github.com/tensorflow/federated/blob/main/docs/pytype.md for more -# information. """A local proxy for a remote executor service hosted on a separate machine.""" import asyncio @@ -79,14 +74,14 @@ def finalizer(value_ref, executor, executor_id): def type_signature(self): return self._type_signature + @property + def reference(self): + return self._value_ref + @tracing.trace(span=True) async def compute(self): return await self._executor._compute(self._value_ref, self._type_signature) # pylint: disable=protected-access - @property - def value_ref(self): - return self._value_ref - class RemoteExecutor(executor_base.Executor): """The remote executor is a local proxy for a remote executor instance.""" diff --git a/tensorflow_federated/python/core/impl/executors/remote_executor_test.py b/tensorflow_federated/python/core/impl/executors/remote_executor_test.py index 59f13fb761..f671207712 100644 --- a/tensorflow_federated/python/core/impl/executors/remote_executor_test.py +++ b/tensorflow_federated/python/core/impl/executors/remote_executor_test.py @@ -258,7 +258,7 @@ def test_create_value_for_nested_struct_with_stream_structs( self, stream_structs, mock_stub): if stream_structs: self.skipTest( - 'b/263261613 - Missing support for multiple return_value types in mock_stub' + 'b/263261613 - Support multiple return_value types in mock_stub' ) mock_stub.create_value.return_value = executor_pb2.CreateStructResponse() else: @@ -428,7 +428,8 @@ def test_create_struct_reraises_type_error_with_stream_structs( @mock.patch.object(remote_executor_stub, 'RemoteExecutorStub') def test_create_selection_returns_remote_value_with_stream_structs( self, stream_structs, mock_stub): - mock_stub.create_selection.return_value = executor_pb2.CreateSelectionResponse( + mock_stub.create_selection.return_value = ( + executor_pb2.CreateSelectionResponse() ) executor = remote_executor.RemoteExecutor( mock_stub, stream_structs=stream_structs) diff --git a/tensorflow_federated/python/core/impl/executors/sequence_executor.py b/tensorflow_federated/python/core/impl/executors/sequence_executor.py index 07eaaf41e0..b7f272c8fc 100644 --- a/tensorflow_federated/python/core/impl/executors/sequence_executor.py +++ b/tensorflow_federated/python/core/impl/executors/sequence_executor.py @@ -275,9 +275,12 @@ async def _delegate(val, type_spec: computation_types.Type, class SequenceExecutorValue(executor_value_base.ExecutorValue): """A representation of a value owned and managed by the `SequenceExecutor`.""" - _VALID_INTERNAL_REPRESENTATION_TYPES = (_Sequence, _SequenceOp, - executor_value_base.ExecutorValue, - structure.Struct) + _VALUE_TYPES = ( + _Sequence, + _SequenceOp, + executor_value_base.ExecutorValue, + structure.Struct, + ) def __init__(self, value, type_spec): """Creates an instance of `SequenceExecutorValue` to represent `value`. @@ -300,13 +303,12 @@ def __init__(self, value, type_spec): type_spec: The TFF type of the value. """ py_typecheck.check_type(type_spec, computation_types.Type) - py_typecheck.check_type( - value, SequenceExecutorValue._VALID_INTERNAL_REPRESENTATION_TYPES) + py_typecheck.check_type(value, SequenceExecutorValue._VALUE_TYPES) self._type_signature = type_spec self._value = value @property - def internal_representation(self): + def reference(self): return self._value @property @@ -432,18 +434,18 @@ async def create_value(self, value, type_spec=None): async def create_call(self, comp, arg=None): py_typecheck.check_type(comp, SequenceExecutorValue) py_typecheck.check_type(comp.type_signature, computation_types.FunctionType) - fn = comp.internal_representation + fn = comp.reference if isinstance(fn, executor_value_base.ExecutorValue): if arg is not None: - arg = await _delegate(arg.internal_representation, arg.type_signature, - self._target_executor) + arg = await _delegate( + arg.reference, arg.type_signature, self._target_executor + ) target_result = await self._target_executor.create_call(fn, arg) return SequenceExecutorValue(target_result, target_result.type_signature) if isinstance(fn, _SequenceOp): py_typecheck.check_type(arg, SequenceExecutorValue) comp.type_signature.parameter.check_assignable_from(arg.type_signature) - result = await fn.execute(self._target_executor, - arg.internal_representation) + result = await fn.execute(self._target_executor, arg.reference) result_type = comp.type_signature.result return SequenceExecutorValue(result, result_type) raise NotImplementedError( @@ -458,7 +460,7 @@ async def create_struct(self, elements): type_elements = [] for k, v in elements: py_typecheck.check_type(v, SequenceExecutorValue) - val_elements.append((k, v.internal_representation)) + val_elements.append((k, v.reference)) type_elements.append((k, v.type_signature)) return SequenceExecutorValue( structure.Struct(val_elements), @@ -468,15 +470,16 @@ async def create_struct(self, elements): async def create_selection(self, source, index): py_typecheck.check_type(source, SequenceExecutorValue) py_typecheck.check_type(source.type_signature, computation_types.StructType) - if isinstance(source.internal_representation, - executor_value_base.ExecutorValue): + if isinstance(source.reference, executor_value_base.ExecutorValue): target_val = await self._target_executor.create_selection( - source.internal_representation, index) + source.reference, index + ) return SequenceExecutorValue(target_val, target_val.type_signature) - py_typecheck.check_type(source.internal_representation, structure.Struct) + py_typecheck.check_type(source.reference, structure.Struct) py_typecheck.check_type(index, int) - return SequenceExecutorValue(source.internal_representation[index], - source.type_signature[index]) + return SequenceExecutorValue( + source.reference[index], source.type_signature[index] + ) def close(self): self._target_executor.close() diff --git a/tensorflow_federated/python/core/impl/executors/sequence_executor_test.py b/tensorflow_federated/python/core/impl/executors/sequence_executor_test.py index bdfc2e7d37..8c0021c0af 100644 --- a/tensorflow_federated/python/core/impl/executors/sequence_executor_test.py +++ b/tensorflow_federated/python/core/impl/executors/sequence_executor_test.py @@ -82,10 +82,8 @@ def test_create_value_and_compute_with_int_const(self): self._sequence_executor.create_value(int_const, type_spec)) self.assertIsInstance(val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(val.type_signature), str(type_spec)) - self.assertIsInstance(val.internal_representation, - eager_tf_executor.EagerValue) - self.assertEqual(val.internal_representation.internal_representation, - int_const) + self.assertIsInstance(val.reference, eager_tf_executor.EagerValue) + self.assertEqual(val.reference.reference, int_const) result = asyncio.run(val.compute()) self.assertEqual(result, 10) @@ -97,11 +95,9 @@ def test_create_value_and_compute_with_struct(self): self._sequence_executor.create_value(my_struct, type_spec)) self.assertIsInstance(val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(val.type_signature), str(type_spec)) - self.assertIsInstance(val.internal_representation, structure.Struct) - self.assertIsInstance(val.internal_representation.a, - eager_tf_executor.EagerValue) - self.assertIsInstance(val.internal_representation.b, - eager_tf_executor.EagerValue) + self.assertIsInstance(val.reference, structure.Struct) + self.assertIsInstance(val.reference.a, eager_tf_executor.EagerValue) + self.assertIsInstance(val.reference.b, eager_tf_executor.EagerValue) result = asyncio.run(val.compute()) self.assertEqual(str(result), '') @@ -116,15 +112,12 @@ def test_create_struct(self): val = asyncio.run(self._sequence_executor.create_struct(elements)) self.assertIsInstance(val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(val.type_signature), '') - self.assertIsInstance(val.internal_representation, structure.Struct) - self.assertListEqual( - structure.name_list(val.internal_representation), ['v10', 'v20']) - self.assertIsInstance(val.internal_representation.v10, - eager_tf_executor.EagerValue) - self.assertIsInstance(val.internal_representation.v20, - eager_tf_executor.EagerValue) - self.assertEqual(asyncio.run(val.internal_representation.v10.compute()), 10) - self.assertEqual(asyncio.run(val.internal_representation.v20.compute()), 20) + self.assertIsInstance(val.reference, structure.Struct) + self.assertListEqual(structure.name_list(val.reference), ['v10', 'v20']) + self.assertIsInstance(val.reference.v10, eager_tf_executor.EagerValue) + self.assertIsInstance(val.reference.v20, eager_tf_executor.EagerValue) + self.assertEqual(asyncio.run(val.reference.v10.compute()), 10) + self.assertEqual(asyncio.run(val.reference.v20.compute()), 20) def test_create_selection(self): my_struct = collections.OrderedDict([('a', 10), ('b', 20)]) @@ -144,11 +137,10 @@ def test_create_value_with_tf_computation(self): val = asyncio.run(self._sequence_executor.create_value(comp_pb, type_spec)) self.assertIsInstance(val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(val.type_signature), str(type_spec)) - self.assertIsInstance(val.internal_representation, - eager_tf_executor.EagerValue) + self.assertIsInstance(val.reference, eager_tf_executor.EagerValue) async def _fn(): - fn = val.internal_representation + fn = val.reference arg = await self._target_executor.create_value(10, tf.int32) result = await self._target_executor.create_call(fn, arg) return await result.compute() @@ -193,9 +185,8 @@ def test_create_value_with_eager_tf_dataset(self): val = asyncio.run(self._sequence_executor.create_value(ds, type_spec)) self.assertIsInstance(val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(val.type_signature), str(type_spec)) - self.assertIsInstance(val.internal_representation, - sequence_executor._Sequence) - self.assertIs(asyncio.run(val.internal_representation.compute()), ds) + self.assertIsInstance(val.reference, sequence_executor._Sequence) + self.assertIs(asyncio.run(val.reference.compute()), ds) result = list(asyncio.run(val.compute())) self.assertListEqual(result, list(range(5))) @@ -222,8 +213,7 @@ def test_create_value_with_sequence_reduce_intrinsic_spec(self): tf.int32) self.assertIsInstance(val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(val.type_signature), str(type_spec)) - self.assertIsInstance(val.internal_representation, - sequence_executor._SequenceReduceOp) + self.assertIsInstance(val.reference, sequence_executor._SequenceReduceOp) def test_sequence_reduce_tf_dataset(self): ds = tf.data.Dataset.range(5) @@ -272,8 +262,7 @@ def test_create_value_with_sequence_map_intrinsic_spec(self): val = _make_sequence_map_value(self._sequence_executor, tf.int32, tf.int32) self.assertIsInstance(val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(val.type_signature), str(type_spec)) - self.assertIsInstance(val.internal_representation, - sequence_executor._SequenceMapOp) + self.assertIsInstance(val.reference, sequence_executor._SequenceMapOp) def test_sequence_map_tf_dataset(self): ds = tf.data.Dataset.range(3) @@ -293,8 +282,9 @@ def test_sequence_map_tf_dataset(self): self._sequence_executor.create_call(sequence_map_val, arg_val)) self.assertIsInstance(result_val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(result_val.type_signature), 'int64*') - self.assertIsInstance(result_val.internal_representation, - sequence_executor._SequenceFromMap) + self.assertIsInstance( + result_val.reference, sequence_executor._SequenceFromMap + ) result = list(asyncio.run(result_val.compute())) self.assertListEqual(result, [2, 3, 4]) @@ -337,8 +327,9 @@ def map_fn_2(a, b): self._sequence_executor.create_call(sequence_map_2_val, arg_2_val)) self.assertIsInstance(result_2_val, sequence_executor.SequenceExecutorValue) self.assertEqual(str(result_2_val.type_signature), 'int64*') - self.assertIsInstance(result_2_val.internal_representation, - sequence_executor._SequenceFromMap) + self.assertIsInstance( + result_2_val.reference, sequence_executor._SequenceFromMap + ) result = list(asyncio.run(result_2_val.compute())) self.assertListEqual(result, [5, 7, 9]) diff --git a/tensorflow_federated/python/core/impl/executors/sizing_executor.py b/tensorflow_federated/python/core/impl/executors/sizing_executor.py index 44a0978096..95e8549d0e 100644 --- a/tensorflow_federated/python/core/impl/executors/sizing_executor.py +++ b/tensorflow_federated/python/core/impl/executors/sizing_executor.py @@ -120,22 +120,23 @@ async def create_value(self, value, type_spec=None): async def create_call(self, comp, arg=None): if arg is not None: - target_val = await self._target.create_call(comp.value, arg.value) + target_val = await self._target.create_call(comp.reference, arg.reference) wrapped_val = SizingExecutorValue(self, target_val) return wrapped_val else: - target_val = await self._target.create_call(comp.value) + target_val = await self._target.create_call(comp.reference) wrapped_val = SizingExecutorValue(self, target_val) return wrapped_val async def create_struct(self, elements): target_val = await self._target.create_struct( - structure.map_structure(lambda x: x.value, elements)) + structure.map_structure(lambda x: x.reference, elements) + ) wrapped_val = SizingExecutorValue(self, target_val) return wrapped_val async def create_selection(self, source, index): - target_val = await self._target.create_selection(source.value, index) + target_val = await self._target.create_selection(source.reference, index) wrapped_val = SizingExecutorValue(self, target_val) return wrapped_val @@ -159,7 +160,7 @@ def __init__(self, owner, value): self._value = value @property - def value(self): + def reference(self): return self._value @property diff --git a/tensorflow_federated/python/core/impl/executors/thread_delegating_executor.py b/tensorflow_federated/python/core/impl/executors/thread_delegating_executor.py index eb98fbeedd..ad01e2a565 100644 --- a/tensorflow_federated/python/core/impl/executors/thread_delegating_executor.py +++ b/tensorflow_federated/python/core/impl/executors/thread_delegating_executor.py @@ -11,11 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# pytype: skip-file -# This modules disables the Pytype analyzer, see -# https://github.com/tensorflow/federated/blob/main/docs/pytype.md for more -# information. """A concurrent executor that does work asynchronously in multiple threads.""" from typing import Optional @@ -42,12 +37,12 @@ def __init__(self, value: evb.ExecutorValue, self._async_runner = async_runner @property - def internal_representation(self) -> evb.ExecutorValue: + def reference(self) -> evb.ExecutorValue: return self._value @property def type_signature(self): - return self.internal_representation.type_signature + return self.reference.type_signature async def compute(self): return await _delegate_with_trace_ctx(self._value.compute(), @@ -97,21 +92,21 @@ async def create_call( self, comp: evb.ExecutorValue, arg: Optional[evb.ExecutorValue] = None) -> evb.ExecutorValue: - comp = comp.internal_representation - arg = arg.internal_representation if arg else None + comp = comp.reference + arg = arg.reference if arg else None return await self._delegate(self._target_executor.create_call(comp, arg)) @tracing.trace async def create_struct(self, elements): elements_as_structure = structure.from_container(elements) elements_iter = structure.iter_elements(elements_as_structure) - pairs = ((n, v.internal_representation) for (n, v) in elements_iter) + pairs = ((n, v.reference) for (n, v) in elements_iter) inner_elements = structure.Struct(pairs) return await self._delegate( self._target_executor.create_struct(inner_elements)) @tracing.trace async def create_selection(self, source, index): - source = source.internal_representation + source = source.reference return await self._delegate( self._target_executor.create_selection(source, index)) diff --git a/tensorflow_federated/python/core/impl/executors/thread_delegating_executor_test.py b/tensorflow_federated/python/core/impl/executors/thread_delegating_executor_test.py index 7422fb2c1c..ff24fe6418 100644 --- a/tensorflow_federated/python/core/impl/executors/thread_delegating_executor_test.py +++ b/tensorflow_federated/python/core/impl/executors/thread_delegating_executor_test.py @@ -46,9 +46,8 @@ class ThreadDelegatingExecutorTest(absltest.TestCase): def _threaded_eager_value_to_numpy(self, value): self.assertIsInstance( value, thread_delegating_executor.ThreadDelegatingExecutorValue) - self.assertIsInstance(value.internal_representation, - eager_tf_executor.EagerValue) - return value.internal_representation.internal_representation.numpy() + self.assertIsInstance(value.reference, eager_tf_executor.EagerValue) + return value.reference.reference.numpy() def test_nondeterminism_with_fake_executor_that_synchronously_sleeps(self): @@ -92,9 +91,7 @@ async def gather_coro(vals): return await asyncio.gather(*vals) results = asyncio.run(gather_coro(vals)) - results = [ - thread_value.internal_representation for thread_value in results - ] + results = [thread_value.reference for thread_value in results] self.assertCountEqual(results, list(range(10))) del executors return test_ex.output diff --git a/tensorflow_federated/python/core/impl/executors/value_serialization.py b/tensorflow_federated/python/core/impl/executors/value_serialization.py index 29b05d44e5..bf76dd266c 100644 --- a/tensorflow_federated/python/core/impl/executors/value_serialization.py +++ b/tensorflow_federated/python/core/impl/executors/value_serialization.py @@ -11,11 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# pytype: skip-file -# This modules disables the Pytype analyzer, see -# https://github.com/tensorflow/federated/blob/main/docs/pytype.md for more -# information. """A set of utility methods for serializing Value protos using pybind11 bindings.""" import collections