From 80b46b770b53ac9d6ecd7665e6658a567bff3a23 Mon Sep 17 00:00:00 2001 From: lqlive Date: Mon, 15 Sep 2025 10:54:02 +0800 Subject: [PATCH 01/12] feat: enhance Kubernetes client with watch functionality --- examples/clientset/Program.cs | 14 ++- examples/watch/Program.cs | 24 ++-- .../KubernetesClient.Aot.csproj | 4 +- src/LibKubernetesGenerator/ParamHelper.cs | 22 +++- src/LibKubernetesGenerator/TypeHelper.cs | 10 +- .../templates/Client.cs.template | 82 ++++++++++++-- .../OperationsExtensions.cs.template | 104 +++++++++++++++--- 7 files changed, 220 insertions(+), 40 deletions(-) diff --git a/examples/clientset/Program.cs b/examples/clientset/Program.cs index 9c2cac461..c2a486ef0 100644 --- a/examples/clientset/Program.cs +++ b/examples/clientset/Program.cs @@ -1,4 +1,4 @@ -// See https://aka.ms/new-console-template for more information +// See https://aka.ms/new-console-template for more information using k8s; using k8s.ClientSets; using System.Threading.Tasks; @@ -12,15 +12,21 @@ private static async Task Main(string[] args) var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(); var client = new Kubernetes(config); - ClientSet clientSet = new ClientSet(client); + var clientSet = new ClientSet(client); var list = await clientSet.CoreV1.Pod.ListAsync("default").ConfigureAwait(false); foreach (var item in list) { System.Console.WriteLine(item.Metadata.Name); } - var pod = await clientSet.CoreV1.Pod.GetAsync("test","default").ConfigureAwait(false); + var pod = await clientSet.CoreV1.Pod.GetAsync("test", "default").ConfigureAwait(false); System.Console.WriteLine(pod?.Metadata?.Name); + + var watch = clientSet.CoreV1.Pod.WatchListAsync("default"); + await foreach (var (_, item)in watch.ConfigureAwait(false)) + { + System.Console.WriteLine(item.Metadata.Name); + } } } -} +} \ No newline at end of file diff --git a/examples/watch/Program.cs b/examples/watch/Program.cs index f21f8f88c..fff5f4133 100644 --- a/examples/watch/Program.cs +++ b/examples/watch/Program.cs @@ -1,5 +1,4 @@ using k8s; -using k8s.Models; using System; using System.Threading; using System.Threading.Tasks; @@ -8,9 +7,10 @@ IKubernetes client = new Kubernetes(config); -var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); +var podlistResp = client.CoreV1.WatchListNamespacedPodAsync("default"); + // C# 8 required https://docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8 -await foreach (var (type, item) in podlistResp.WatchAsync().ConfigureAwait(false)) +await foreach (var (type, item) in podlistResp.ConfigureAwait(false)) { Console.WriteLine("==on watch event=="); Console.WriteLine(type); @@ -22,14 +22,24 @@ void WatchUsingCallback(IKubernetes client) #pragma warning restore CS8321 // Remove unused private members { - var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); - using (podlistResp.Watch((type, item) => + using var podlistResp = client.CoreV1.WatchListNamespacedPod("default"); + podlistResp.OnEvent += (type, item) => { Console.WriteLine("==on watch event=="); Console.WriteLine(type); Console.WriteLine(item.Metadata.Name); Console.WriteLine("==on watch event=="); - })) + }; + podlistResp.OnError += (error) => + { + Console.WriteLine("==on watch error=="); + Console.WriteLine(error.Message); + Console.WriteLine("==on watch error=="); + }; + podlistResp.OnClosed += () => + { + Console.WriteLine("==on watch closed=="); + }; { Console.WriteLine("press ctrl + c to stop watching"); @@ -37,4 +47,4 @@ void WatchUsingCallback(IKubernetes client) Console.CancelKeyPress += (sender, eventArgs) => ctrlc.Set(); ctrlc.Wait(); } -} +} \ No newline at end of file diff --git a/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj b/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj index ced074146..8ceca3b94 100644 --- a/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj +++ b/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj @@ -89,8 +89,8 @@ - - + + diff --git a/src/LibKubernetesGenerator/ParamHelper.cs b/src/LibKubernetesGenerator/ParamHelper.cs index 216c2f357..fcaf030d9 100644 --- a/src/LibKubernetesGenerator/ParamHelper.cs +++ b/src/LibKubernetesGenerator/ParamHelper.cs @@ -3,6 +3,7 @@ using Scriban.Runtime; using System; using System.Linq; +using System.Collections.Generic; namespace LibKubernetesGenerator { @@ -21,6 +22,8 @@ public void RegisterHelper(ScriptObject scriptObject) { scriptObject.Import(nameof(GetModelCtorParam), new Func(GetModelCtorParam)); scriptObject.Import(nameof(IfParamContains), IfParamContains); + scriptObject.Import(nameof(FilterParameters), FilterParameters); + scriptObject.Import(nameof(GetParameterValueForWatch), new Func(GetParameterValueForWatch)); } public static bool IfParamContains(OpenApiOperation operation, string name) @@ -39,6 +42,23 @@ public static bool IfParamContains(OpenApiOperation operation, string name) return found; } + public static IEnumerable FilterParameters(OpenApiOperation operation, string excludeParam) + { + return operation.Parameters.Where(p => p.Name != excludeParam); + } + + public string GetParameterValueForWatch(OpenApiParameter parameter, bool watch, string init = "false") + { + if (parameter.Name == "watch") + { + return watch ? "true" : "false"; + } + else + { + return generalNameHelper.GetDotNetNameOpenApiParameter(parameter, init); + } + } + public string GetModelCtorParam(JsonSchema schema) { return string.Join(", ", schema.Properties.Values @@ -57,4 +77,4 @@ public string GetModelCtorParam(JsonSchema schema) })); } } -} +} \ No newline at end of file diff --git a/src/LibKubernetesGenerator/TypeHelper.cs b/src/LibKubernetesGenerator/TypeHelper.cs index 057a363a2..0b6811ae6 100644 --- a/src/LibKubernetesGenerator/TypeHelper.cs +++ b/src/LibKubernetesGenerator/TypeHelper.cs @@ -245,6 +245,14 @@ string toType() } break; + case "T": + // Return single item type from list type (e.g., V1Pod from V1PodList) + return !string.IsNullOrEmpty(t) && t.EndsWith("List", StringComparison.Ordinal) + ? t.Substring(0, t.Length - 4) + : t; + case "TList": + // Return list type as-is + return t; } return t; @@ -284,4 +292,4 @@ public static bool IfType(JsonSchemaProperty property, string type) return false; } } -} +} \ No newline at end of file diff --git a/src/LibKubernetesGenerator/templates/Client.cs.template b/src/LibKubernetesGenerator/templates/Client.cs.template index 02c55d7b1..3f1b569a3 100644 --- a/src/LibKubernetesGenerator/templates/Client.cs.template +++ b/src/LibKubernetesGenerator/templates/Client.cs.template @@ -17,10 +17,11 @@ public partial class {{name}}Client : ResourceClient } {{for api in apis }} + {{~ $filteredParams = FilterParameters api.operation "watch" ~}} /// /// {{ToXmlDoc api.operation.description}} /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// @@ -29,7 +30,7 @@ public partial class {{name}}Client : ResourceClient /// A which can be used to cancel the asynchronous operation. /// public async Task{{GetReturnType api.operation "<>"}} {{GetActionName api.operation name "Async"}}( - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, {{ end }} CancellationToken cancellationToken = default(CancellationToken)) @@ -37,7 +38,7 @@ public partial class {{name}}Client : ResourceClient {{if IfReturnType api.operation "stream"}} var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken); @@ -47,7 +48,7 @@ public partial class {{name}}Client : ResourceClient {{if IfReturnType api.operation "obj"}} using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -58,7 +59,7 @@ public partial class {{name}}Client : ResourceClient {{if IfReturnType api.operation "void"}} using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -71,7 +72,7 @@ public partial class {{name}}Client : ResourceClient /// /// {{ToXmlDoc api.operation.description}} /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// @@ -80,14 +81,14 @@ public partial class {{name}}Client : ResourceClient /// A which can be used to cancel the asynchronous operation. /// public async Task {{GetActionName api.operation name "Async"}}( - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "false"}}, {{ end }} CancellationToken cancellationToken = default(CancellationToken)) { using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -96,5 +97,68 @@ public partial class {{name}}Client : ResourceClient } } {{end}} + + {{if IfParamContains api.operation "watch"}} + /// + /// Watch {{ToXmlDoc api.operation.description}} + /// + {{ for parameter in $filteredParams}} + /// + /// {{ToXmlDoc parameter.description}} + /// + {{ end }} + /// Callback when any event raised from api server + /// Callback when any exception was caught during watching + /// Callback when the server closes the connection + public Watcher<{{GetReturnType api.operation "T"}}> Watch{{GetActionName api.operation name ""}}( + {{ for parameter in $filteredParams}} + {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, + {{ end }} + Action onEvent = null, + Action onError = null, + Action onClosed = null) + { + if (onEvent == null) throw new ArgumentNullException(nameof(onEvent)); + + var responseTask = Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( + {{ for parameter in api.operation.parameters}} + {{GetParameterValueForWatch parameter true}}, + {{ end }} + null, + CancellationToken.None); + + return responseTask.Watch<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>( + onEvent, onError, onClosed); + } + + /// + /// Watch {{ToXmlDoc api.operation.description}} as async enumerable + /// + {{ for parameter in $filteredParams}} + /// + /// {{ToXmlDoc parameter.description}} + /// + {{ end }} + /// Callback when any exception was caught during watching + /// Cancellation token + public IAsyncEnumerable<(WatchEventType, {{GetReturnType api.operation "T"}})> Watch{{GetActionName api.operation name "Async"}}( + {{ for parameter in $filteredParams}} + {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, + {{ end }} + Action onError = null, + CancellationToken cancellationToken = default) + { + var responseTask = Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( + {{ for parameter in api.operation.parameters}} + {{GetParameterValueForWatch parameter true}}, + {{ end }} + null, + cancellationToken); + + return responseTask.WatchAsync<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>( + onError, cancellationToken); + } {{end}} -} + + {{end}} +} \ No newline at end of file diff --git a/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template b/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template index b05f0e243..78a7cabab 100644 --- a/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template +++ b/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template @@ -12,26 +12,27 @@ namespace k8s; public static partial class {{name}}OperationsExtensions { {{for api in apis }} + {{~ $filteredParams = FilterParameters api.operation "watch" ~}} /// /// {{ToXmlDoc api.operation.description}} /// /// /// The operations group for this extension method. /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// - /// {{ToXmlDoc api.description}} + /// {{ToXmlDoc parameter.description}} /// {{ end }} public static {{GetReturnType api.operation "void"}} {{GetOperationId api.operation ""}}( this I{{name}}Operations operations -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} ,{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}} {{end}} ) { {{GetReturnType api.operation "return"}} operations.{{GetOperationId api.operation "Async"}}( -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} {{GetDotNetNameOpenApiParameter parameter "false"}}, {{end}} CancellationToken.None @@ -45,20 +46,20 @@ public static partial class {{name}}OperationsExtensions /// /// The operations group for this extension method. /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// {{end}} public static T {{GetOperationId api.operation ""}}( this I{{name}}Operations operations -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} ,{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}} {{end}} ) { return operations.{{GetOperationId api.operation "Async"}}( -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} {{GetDotNetNameOpenApiParameter parameter "false"}}, {{end}} CancellationToken.None @@ -72,7 +73,7 @@ public static partial class {{name}}OperationsExtensions /// /// The operations group for this extension method. /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// @@ -82,7 +83,7 @@ public static partial class {{name}}OperationsExtensions /// public static async Task{{GetReturnType api.operation "<>"}} {{GetOperationId api.operation "Async"}}( this I{{name}}Operations operations, -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, {{ end }} CancellationToken cancellationToken = default(CancellationToken)) @@ -90,7 +91,7 @@ public static partial class {{name}}OperationsExtensions {{if IfReturnType api.operation "stream"}} var _result = await operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken); @@ -100,7 +101,7 @@ public static partial class {{name}}OperationsExtensions {{if IfReturnType api.operation "obj"}} using (var _result = await operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -111,7 +112,7 @@ public static partial class {{name}}OperationsExtensions {{if IfReturnType api.operation "void"}} using (var _result = await operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -127,7 +128,7 @@ public static partial class {{name}}OperationsExtensions /// /// The operations group for this extension method. /// - {{ for parameter in api.operation.parameters}} + {{ for parameter in $filteredParams}} /// /// {{ToXmlDoc parameter.description}} /// @@ -137,14 +138,14 @@ public static partial class {{name}}OperationsExtensions /// public static async Task {{GetOperationId api.operation "Async"}}( this I{{name}}Operations operations, -{{ for parameter in api.operation.parameters}} +{{ for parameter in $filteredParams}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, {{ end }} CancellationToken cancellationToken = default(CancellationToken)) { using (var _result = await operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} - {{GetDotNetNameOpenApiParameter parameter "false"}}, + {{GetParameterValueForWatch parameter false}}, {{end}} null, cancellationToken).ConfigureAwait(false)) @@ -154,5 +155,76 @@ public static partial class {{name}}OperationsExtensions } {{end}} - {{end}} +{{if IfParamContains api.operation "watch"}} +{{~ $filteredParams = FilterParameters api.operation "watch" ~}} +/// +/// Watch {{ToXmlDoc api.operation.description}} +/// +/// +/// The operations group for this extension method. +/// +{{ for parameter in $filteredParams}} +/// +/// {{ToXmlDoc parameter.description}} +/// +{{ end }} +/// Callback when any event raised from api server +/// Callback when any exception was caught during watching +/// Callback when the server closes the connection +public static Watcher<{{GetReturnType api.operation "T"}}> Watch{{GetOperationId api.operation ""}}( + this I{{name}}Operations operations, +{{ for parameter in $filteredParams}} + {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, +{{end}} + Action onEvent = null, + Action onError = null, + Action onClosed = null) +{ + if (onEvent == null) throw new ArgumentNullException(nameof(onEvent)); + + var responseTask = operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( +{{ for parameter in api.operation.parameters}} + {{GetParameterValueForWatch parameter true}}, +{{end}} + null, + CancellationToken.None); + + return responseTask.Watch<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>( + onEvent, onError, onClosed); } + +/// +/// Watch {{ToXmlDoc api.operation.description}} as async enumerable +/// +/// +/// The operations group for this extension method. +/// +{{ for parameter in $filteredParams}} +/// +/// {{ToXmlDoc parameter.description}} +/// +{{ end }} +/// Callback when any exception was caught during watching +/// Cancellation token +public static IAsyncEnumerable<(WatchEventType, {{GetReturnType api.operation "T"}})> Watch{{GetOperationId api.operation "Async"}}( + this I{{name}}Operations operations, +{{ for parameter in $filteredParams}} + {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, +{{end}} + Action onError = null, + CancellationToken cancellationToken = default) +{ + var responseTask = operations.{{GetOperationId api.operation "WithHttpMessagesAsync"}}( +{{ for parameter in api.operation.parameters}} + {{GetParameterValueForWatch parameter true}}, +{{end}} + null, + cancellationToken); + + return responseTask.WatchAsync<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>( + onError, cancellationToken); +} +{{end}} + + {{end}} +} \ No newline at end of file From 6979dd5a3c57f82cc1302d8514fb8ed7c8da4ae8 Mon Sep 17 00:00:00 2001 From: lqlive Date: Tue, 16 Sep 2025 09:41:35 +0800 Subject: [PATCH 02/12] refactor: simplify watch event handling in Kubernetes client example --- examples/clientset/Program.cs | 3 +-- examples/watch/Program.cs | 15 ++------------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/examples/clientset/Program.cs b/examples/clientset/Program.cs index c2a486ef0..1b479d878 100644 --- a/examples/clientset/Program.cs +++ b/examples/clientset/Program.cs @@ -1,5 +1,4 @@ -// See https://aka.ms/new-console-template for more information -using k8s; +using k8s; using k8s.ClientSets; using System.Threading.Tasks; diff --git a/examples/watch/Program.cs b/examples/watch/Program.cs index fff5f4133..dbfbf4dc7 100644 --- a/examples/watch/Program.cs +++ b/examples/watch/Program.cs @@ -22,24 +22,13 @@ void WatchUsingCallback(IKubernetes client) #pragma warning restore CS8321 // Remove unused private members { - using var podlistResp = client.CoreV1.WatchListNamespacedPod("default"); - podlistResp.OnEvent += (type, item) => + using var podlistResp = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { Console.WriteLine("==on watch event=="); Console.WriteLine(type); Console.WriteLine(item.Metadata.Name); Console.WriteLine("==on watch event=="); - }; - podlistResp.OnError += (error) => - { - Console.WriteLine("==on watch error=="); - Console.WriteLine(error.Message); - Console.WriteLine("==on watch error=="); - }; - podlistResp.OnClosed += () => - { - Console.WriteLine("==on watch closed=="); - }; + }); { Console.WriteLine("press ctrl + c to stop watching"); From 136b10349393cdd5debad0c1f6c1d44cb6115e3c Mon Sep 17 00:00:00 2001 From: lqlive Date: Wed, 17 Sep 2025 15:14:07 +0800 Subject: [PATCH 03/12] refactor: update Kubernetes watch functionality to use new event handling methods and add async enumerable support --- examples/watch/Program.cs | 4 +- .../templates/IOperations.cs.template | 3 + tests/E2E.Tests/MinikubeTests.cs | 85 +++- tests/KubernetesClient.Tests/WatchTests.cs | 385 +++++++++++++++--- 4 files changed, 421 insertions(+), 56 deletions(-) diff --git a/examples/watch/Program.cs b/examples/watch/Program.cs index dbfbf4dc7..1aff65883 100644 --- a/examples/watch/Program.cs +++ b/examples/watch/Program.cs @@ -22,13 +22,13 @@ void WatchUsingCallback(IKubernetes client) #pragma warning restore CS8321 // Remove unused private members { - using var podlistResp = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => + using (var podlistResp = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { Console.WriteLine("==on watch event=="); Console.WriteLine(type); Console.WriteLine(item.Metadata.Name); Console.WriteLine("==on watch event=="); - }); + })) { Console.WriteLine("press ctrl + c to stop watching"); diff --git a/src/LibKubernetesGenerator/templates/IOperations.cs.template b/src/LibKubernetesGenerator/templates/IOperations.cs.template index 6904b8b91..425ecf1ea 100644 --- a/src/LibKubernetesGenerator/templates/IOperations.cs.template +++ b/src/LibKubernetesGenerator/templates/IOperations.cs.template @@ -25,6 +25,9 @@ public partial interface I{{name}}Operations /// /// A which can be used to cancel the asynchronous operation. /// + {{if IfParamContains api.operation "watch"}} + [Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.", false)] + {{end}} Task"}}> {{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, diff --git a/tests/E2E.Tests/MinikubeTests.cs b/tests/E2E.Tests/MinikubeTests.cs index 547ee90ea..59656b517 100644 --- a/tests/E2E.Tests/MinikubeTests.cs +++ b/tests/E2E.Tests/MinikubeTests.cs @@ -224,13 +224,12 @@ public async Task WatcherIntegrationTest() var started = new AsyncManualResetEvent(); var connectionClosed = new AsyncManualResetEvent(); - var watcher = kubernetes.BatchV1.ListNamespacedJobWithHttpMessagesAsync( + var watcher = kubernetes.BatchV1.WatchListNamespacedJob( job.Metadata.NamespaceProperty, fieldSelector: $"metadata.name={job.Metadata.Name}", resourceVersion: job.Metadata.ResourceVersion, timeoutSeconds: 30, - watch: true).Watch( - (type, source) => + onEvent: (type, source) => { Debug.WriteLine($"Watcher 1: {type}, {source}"); events.Add(new Tuple(type, source)); @@ -250,6 +249,86 @@ public async Task WatcherIntegrationTest() new V1DeleteOptions() { PropagationPolicy = "Foreground" }).ConfigureAwait(false); } + [MinikubeFact] + public async Task WatcherIntegrationTestAsyncEnumerable() + { + using var kubernetes = CreateClient(); + + var job = await kubernetes.BatchV1.CreateNamespacedJobAsync( + new V1Job() + { + ApiVersion = "batch/v1", + Kind = V1Job.KubeKind, + Metadata = new V1ObjectMeta() { Name = nameof(WatcherIntegrationTestAsyncEnumerable).ToLowerInvariant() }, + Spec = new V1JobSpec() + { + Template = new V1PodTemplateSpec() + { + Spec = new V1PodSpec() + { + Containers = new List() + { + new V1Container() + { + Image = "ubuntu", + Name = "runner", + Command = new List() { "/bin/bash", "-c", "--" }, + Args = new List() + { + "trap : TERM INT; sleep infinity & wait", + }, + }, + }, + RestartPolicy = "Never", + }, + }, + }, + }, + "default").ConfigureAwait(false); + + var events = new Collection>(); + + var started = new AsyncManualResetEvent(); + var watchCompleted = new AsyncManualResetEvent(); + + // Start async enumerable watch in background task to mimic callback behavior + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, source) in kubernetes.BatchV1.WatchListNamespacedJobAsync( + job.Metadata.NamespaceProperty, + fieldSelector: $"metadata.name={job.Metadata.Name}", + resourceVersion: job.Metadata.ResourceVersion, + timeoutSeconds: 30).ConfigureAwait(false)) + { + Debug.WriteLine($"AsyncEnumerable Watcher: {type}, {source}"); + events.Add(new Tuple(type, source)); + job = source; + started.Set(); + } + } + catch (Exception ex) + { + Debug.WriteLine($"Watch exception: {ex.GetType().FullName}: {ex.Message}"); + } + finally + { + watchCompleted.Set(); + } + }); + + await started.WaitAsync().ConfigureAwait(false); + + await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TimeSpan.FromMinutes(3))).ConfigureAwait(false); + Assert.True(watchCompleted.IsSet); + + var st = await kubernetes.BatchV1.DeleteNamespacedJobAsync( + job.Metadata.Name, + job.Metadata.NamespaceProperty, + new V1DeleteOptions() { PropagationPolicy = "Foreground" }).ConfigureAwait(false); + } + [MinikubeFact] public void LeaderIntegrationTest() { diff --git a/tests/KubernetesClient.Tests/WatchTests.cs b/tests/KubernetesClient.Tests/WatchTests.cs index 3b3a695d3..dbb76a33d 100644 --- a/tests/KubernetesClient.Tests/WatchTests.cs +++ b/tests/KubernetesClient.Tests/WatchTests.cs @@ -54,24 +54,30 @@ public async Task CannotWatch() var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); // did not pass watch param - var listTask = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); var onErrorCalled = false; - using (listTask.Watch((type, item) => { }, e => { onErrorCalled = true; })) + using (var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { }, + onError: e => { onErrorCalled = true; })) { - await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(true); // delay for onerror to be called + // delay for onerror to be called + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(true); } Assert.True(onErrorCalled); - // server did not response line by line await Assert.ThrowsAnyAsync(() => { - return client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); + using (var testWatcher = client.CoreV1.WatchListNamespacedPod( + "default")) + { + return Task.CompletedTask; + } // this line did not throw - // listTask.Watch((type, item) => { }); + // using (var testWatcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { })) }).ConfigureAwait(true); } } @@ -93,8 +99,7 @@ public async Task AsyncWatcher() var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); - using (listTask.Watch((type, item) => { eventsReceived.Set(); })) + using (var watcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { eventsReceived.Set(); })) { // here watcher is ready to use, but http server has not responsed yet. created.Set(); @@ -134,27 +139,26 @@ public async Task SurviveBadLine() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); events.Add(type); eventsReceived.Signal(); }, - error => + onError: error => { testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); errors += 1; eventsReceived.Signal(); }, - connectionClosed.Set); + onClosed: connectionClosed.Set); // wait server yields all events await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); @@ -195,17 +199,16 @@ public async Task DisposeWatch() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { events.Add(type); eventsReceived.Signal(); }, - error => + onError: error => { testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); }, @@ -255,27 +258,26 @@ public async Task WatchAllEvents() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); events.Add(type); eventsReceived.Signal(); }, - error => + onError: error => { testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); errors += 1; eventsReceived.Signal(); }, - waitForClosed.Set); + onClosed: waitForClosed.Set); // wait server yields all events await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); @@ -324,27 +326,26 @@ public async Task WatchEventsWithTimeout() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); events.Add(type); eventsReceived.Signal(); }, - error => + onError: error => { testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); errors += 1; eventsReceived.Signal(); }, - connectionClosed.Set); + onClosed: connectionClosed.Set); // wait server yields all events await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); @@ -386,18 +387,17 @@ public async Task WatchServerDisconnect() { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - waitForException.Set(); Watcher watcher; - watcher = listTask.Watch( - (type, item) => { }, - e => + watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { }, + onError: e => { exceptionCatched = e; exceptionReceived.Set(); }, - waitForClosed.Set); + onClosed: waitForClosed.Set); // wait server down await Task.WhenAny(exceptionReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); @@ -456,12 +456,11 @@ public async Task TestWatchWithHandlers() Assert.False(handler1.Called); Assert.False(handler2.Called); - var listTask = await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); - var events = new HashSet(); - var watcher = listTask.Watch( - (type, item) => + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { events.Add(type); eventsReceived.Signal(); @@ -507,7 +506,9 @@ public async Task DirectWatchAllEvents() var events = new HashSet(); var errors = 0; - var watcher = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", fieldSelector: $"metadata.name=${"myPod"}", watch: true).Watch( + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + fieldSelector: $"metadata.name=${"myPod"}", onEvent: (type, item) => { @@ -554,10 +555,10 @@ public async Task EnsureTimeoutWorks() { using var server = new MockKubeApiServer(testOutput, async httpContext => { - await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds - await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true); + await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true); - return false; + return false; }); // raw timeout @@ -608,7 +609,9 @@ public async Task DirectWatchEventsWithTimeout() var events = new HashSet(); var errors = 0; - var watcher = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", fieldSelector: $"metadata.name=${"myPod"}", watch: true).Watch( + var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + fieldSelector: $"metadata.name=${"myPod"}", onEvent: (type, item) => { @@ -656,7 +659,6 @@ public async Task WatchShouldCancelAfterRequested() httpContext.Response.StatusCode = 200; await httpContext.Response.Body.FlushAsync().ConfigureAwait(true); await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(true); // The default timeout is 100 seconds - return true; }, resp: "")) { @@ -667,8 +669,10 @@ public async Task WatchShouldCancelAfterRequested() await Assert.ThrowsAnyAsync(async () => { - await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true, - cancellationToken: cts.Token).ConfigureAwait(true); + using var watcher = client.CoreV1.WatchListNamespacedPod( + "default", + onEvent: (type, item) => { }); + await Task.Delay(TimeSpan.FromSeconds(5), cts.Token).ConfigureAwait(true); }).ConfigureAwait(true); } } @@ -729,7 +733,7 @@ protected override Task SendAsync(HttpRequestMessage reques } [Fact] - public async Task MustHttp2VersionSet() + public void MustHttp2VersionSet() { var server = new MockKubeApiServer(testOutput, async httpContext => { @@ -742,8 +746,287 @@ public async Task MustHttp2VersionSet() new KubernetesClientConfiguration { Host = server.Uri.ToString() }, handler); Assert.Null(handler.Version); - await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(true); + using var watcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { }); Assert.Equal(HttpVersion.Version20, handler.Version); } + + [Fact] + public async Task AsyncEnumerableWatchAllEvents() + { + var eventsReceived = new AsyncCountdownEvent(4); + var serverShutdown = new AsyncManualResetEvent(); + var watchCompleted = new AsyncManualResetEvent(); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true); + + // make server alive, cannot set to int.max as of it would block response + await serverShutdown.WaitAsync().ConfigureAwait(true); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); + + var events = new HashSet(); + var errors = 0; + + // Start async enumerable watch in background task + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync("default").ConfigureAwait(false)) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event."); + events.Add(type); + eventsReceived.Signal(); + + // Break when we have all expected events + if (events.Count >= 4) + { + break; + } + } + } + catch (Exception ex) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received exception: {ex.GetType().FullName}"); + errors++; + eventsReceived.Signal(); + } + finally + { + watchCompleted.Set(); + } + }); + + // wait server yields all events + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + + Assert.True( + eventsReceived.CurrentCount == 0, + "Timed out waiting for all events / errors to be received."); + + Assert.Contains(WatchEventType.Added, events); + Assert.Contains(WatchEventType.Deleted, events); + Assert.Contains(WatchEventType.Modified, events); + Assert.Contains(WatchEventType.Error, events); + + Assert.Equal(0, errors); + + serverShutdown.Set(); + + await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + Assert.True(watchCompleted.IsSet); + } + } + + [Fact] + public async Task AsyncEnumerableWatchWithCancellation() + { + var eventsReceived = new AsyncCountdownEvent(2); + var serverShutdown = new AsyncManualResetEvent(); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true); + + // Keep server alive + await serverShutdown.WaitAsync().ConfigureAwait(true); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); + + var events = new HashSet(); + var cts = new CancellationTokenSource(); + + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync("default", cancellationToken: cts.Token).ConfigureAwait(false)) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event."); + events.Add(type); + eventsReceived.Signal(); + } + } + catch (OperationCanceledException) + { + testOutput.WriteLine("AsyncEnumerable Watcher was cancelled as expected."); + } + }); + + // Wait for some events to be received + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + + Assert.True( + eventsReceived.CurrentCount == 0, + "Timed out waiting for events to be received."); + + Assert.Contains(WatchEventType.Added, events); + Assert.Contains(WatchEventType.Modified, events); + + // Cancel the watch + cts.Cancel(); + + // Wait for watch task to complete + await Task.WhenAny(watchTask, Task.Delay(TimeSpan.FromSeconds(5))).ConfigureAwait(true); + Assert.True(watchTask.IsCompletedSuccessfully || watchTask.IsCanceled); + + serverShutdown.Set(); + } + } + + [Fact] + public async Task AsyncEnumerableWatchWithFieldSelector() + { + var eventsReceived = new AsyncCountdownEvent(3); + var serverShutdown = new AsyncManualResetEvent(); + var watchCompleted = new AsyncManualResetEvent(); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true); + + await serverShutdown.WaitAsync().ConfigureAwait(true); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); + + var events = new List<(WatchEventType, V1Pod)>(); + + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync( + "default", + fieldSelector: $"metadata.name={"testPod"}").ConfigureAwait(false)) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event for pod '{item?.Metadata?.Name}'."); + events.Add((type, item)); + eventsReceived.Signal(); + + if (events.Count >= 3) + { + break; + } + } + } + catch (Exception ex) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received exception: {ex.GetType().FullName}"); + } + finally + { + watchCompleted.Set(); + } + }); + + // Wait for events + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + + Assert.True( + eventsReceived.CurrentCount == 0, + "Timed out waiting for all events to be received."); + + Assert.Equal(3, events.Count); + Assert.Contains(events, e => e.Item1 == WatchEventType.Added); + Assert.Contains(events, e => e.Item1 == WatchEventType.Deleted); + Assert.Contains(events, e => e.Item1 == WatchEventType.Modified); + + serverShutdown.Set(); + + await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + Assert.True(watchCompleted.IsSet); + } + } + + [Fact] + public async Task AsyncEnumerableWatchErrorHandling() + { + var eventsReceived = new AsyncCountdownEvent(3); + var serverShutdown = new AsyncManualResetEvent(); + var watchCompleted = new AsyncManualResetEvent(); + var errorReceived = new AsyncManualResetEvent(); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(true); + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true); + + await serverShutdown.WaitAsync().ConfigureAwait(true); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); + + var events = new List<(WatchEventType, V1Pod)>(); + var errorCaught = false; + + var watchTask = Task.Run(async () => + { + try + { + await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync( + "default", + onError: ex => + { + testOutput.WriteLine($"AsyncEnumerable Watcher onError called: {ex.GetType().FullName}"); + errorCaught = true; + errorReceived.Set(); + eventsReceived.Signal(); + }).ConfigureAwait(false)) + { + testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event."); + events.Add((type, item)); + eventsReceived.Signal(); + + // Expect some valid events plus error handling + if (events.Count >= 2) + { + break; + } + } + } + catch (Exception ex) + { + testOutput.WriteLine($"AsyncEnumerable Watcher caught exception: {ex.GetType().FullName}"); + } + finally + { + watchCompleted.Set(); + } + }); + + // Wait for events and errors + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + + Assert.True( + eventsReceived.CurrentCount == 0, + "Timed out waiting for events and errors to be received."); + + // Should have received at least one valid event and one error + Assert.True(events.Count >= 1, "Should have received at least one valid event"); + Assert.True(errorCaught, "Should have caught parsing error"); + Assert.True(errorReceived.IsSet, "Error callback should have been called"); + + serverShutdown.Set(); + + await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true); + Assert.True(watchCompleted.IsSet); + } + } } } From f9a3d9cdcf76ec5b91d0ccf9a4ab0935f034e811 Mon Sep 17 00:00:00 2001 From: lqlive Date: Wed, 17 Sep 2025 15:17:09 +0800 Subject: [PATCH 04/12] fix --- tests/KubernetesClient.Tests/WatchTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/KubernetesClient.Tests/WatchTests.cs b/tests/KubernetesClient.Tests/WatchTests.cs index dbb76a33d..4950a577e 100644 --- a/tests/KubernetesClient.Tests/WatchTests.cs +++ b/tests/KubernetesClient.Tests/WatchTests.cs @@ -555,10 +555,10 @@ public async Task EnsureTimeoutWorks() { using var server = new MockKubeApiServer(testOutput, async httpContext => { - await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds - await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true); + await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true); - return false; + return false; }); // raw timeout From 201be855fdf79a3609fda84c04ff358663d002d3 Mon Sep 17 00:00:00 2001 From: lqlive Date: Wed, 17 Sep 2025 15:20:14 +0800 Subject: [PATCH 05/12] fix --- tests/KubernetesClient.Tests/WatchTests.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/KubernetesClient.Tests/WatchTests.cs b/tests/KubernetesClient.Tests/WatchTests.cs index 4950a577e..c84ba5bea 100644 --- a/tests/KubernetesClient.Tests/WatchTests.cs +++ b/tests/KubernetesClient.Tests/WatchTests.cs @@ -61,8 +61,7 @@ public async Task CannotWatch() onEvent: (type, item) => { }, onError: e => { onErrorCalled = true; })) { - // delay for onerror to be called - await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(true); + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(true); // delay for onerror to be called } Assert.True(onErrorCalled); From 9ac1f9cfef360dabc018d5c7791b0646f160a5d6 Mon Sep 17 00:00:00 2001 From: lqlive Date: Thu, 18 Sep 2025 09:45:29 +0800 Subject: [PATCH 06/12] fix: correct usage of Pod list items in client example and update Obsolete attribute formatting --- examples/clientset/Program.cs | 4 ++-- src/LibKubernetesGenerator/templates/IOperations.cs.template | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/clientset/Program.cs b/examples/clientset/Program.cs index 5744d4fc1..b1b62f25a 100644 --- a/examples/clientset/Program.cs +++ b/examples/clientset/Program.cs @@ -1,4 +1,4 @@ -using k8s; +using k8s; using k8s.ClientSets; using System.Threading.Tasks; @@ -13,7 +13,7 @@ private static async Task Main(string[] args) var clientSet = new ClientSet(client); var list = await clientSet.CoreV1.Pod.ListAsync("default").ConfigureAwait(false); - foreach (var item in list) + foreach (var item in list.Items) { System.Console.WriteLine(item.Metadata.Name); } diff --git a/src/LibKubernetesGenerator/templates/IOperations.cs.template b/src/LibKubernetesGenerator/templates/IOperations.cs.template index 425ecf1ea..5d10e0f8a 100644 --- a/src/LibKubernetesGenerator/templates/IOperations.cs.template +++ b/src/LibKubernetesGenerator/templates/IOperations.cs.template @@ -26,8 +26,8 @@ public partial interface I{{name}}Operations /// A which can be used to cancel the asynchronous operation. /// {{if IfParamContains api.operation "watch"}} - [Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.", false)] - {{end}} + [Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.")] + {{ end }} Task"}}> {{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, From 4d752ed39430334194c810f45b756887fa10f191 Mon Sep 17 00:00:00 2001 From: lqlive Date: Tue, 23 Sep 2025 10:31:01 +0800 Subject: [PATCH 07/12] fix: update client example to use correct Pod list method and improve Obsolete attribute formatting --- examples/clientset/Program.cs | 3 ++- src/LibKubernetesGenerator/templates/IOperations.cs.template | 2 +- tests/KubernetesClient.Tests/WatchTests.cs | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/clientset/Program.cs b/examples/clientset/Program.cs index b1b62f25a..b420baa56 100644 --- a/examples/clientset/Program.cs +++ b/examples/clientset/Program.cs @@ -1,4 +1,5 @@ using k8s; +using k8s.Models; using k8s.ClientSets; using System.Threading.Tasks; @@ -13,7 +14,7 @@ private static async Task Main(string[] args) var clientSet = new ClientSet(client); var list = await clientSet.CoreV1.Pod.ListAsync("default").ConfigureAwait(false); - foreach (var item in list.Items) + foreach (var item in list) { System.Console.WriteLine(item.Metadata.Name); } diff --git a/src/LibKubernetesGenerator/templates/IOperations.cs.template b/src/LibKubernetesGenerator/templates/IOperations.cs.template index 5d10e0f8a..e3ba9c790 100644 --- a/src/LibKubernetesGenerator/templates/IOperations.cs.template +++ b/src/LibKubernetesGenerator/templates/IOperations.cs.template @@ -27,7 +27,7 @@ public partial interface I{{name}}Operations /// {{if IfParamContains api.operation "watch"}} [Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.")] - {{ end }} + {{end}} Task"}}> {{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, diff --git a/tests/KubernetesClient.Tests/WatchTests.cs b/tests/KubernetesClient.Tests/WatchTests.cs index c84ba5bea..f92376575 100644 --- a/tests/KubernetesClient.Tests/WatchTests.cs +++ b/tests/KubernetesClient.Tests/WatchTests.cs @@ -568,7 +568,7 @@ await Assert.ThrowsAsync(async () => Host = server.Uri.ToString(), HttpClientTimeout = TimeSpan.FromSeconds(5), }); - await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default").ConfigureAwait(true); + await client.CoreV1.ListNamespacedPodAsync("default").ConfigureAwait(true); }).ConfigureAwait(true); // cts @@ -580,7 +580,7 @@ await Assert.ThrowsAsync(async () => { Host = server.Uri.ToString(), }); - await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", cancellationToken: cts.Token).ConfigureAwait(true); + await client.CoreV1.ListNamespacedPodAsync("default", cancellationToken: cts.Token).ConfigureAwait(true); }).ConfigureAwait(true); } From 6de55587c4552a305f41e2bee312a3a7de3d9251 Mon Sep 17 00:00:00 2001 From: lqlive Date: Tue, 23 Sep 2025 21:38:49 +0800 Subject: [PATCH 08/12] refactor: enhance type resolution for list items in TypeHelper by adding TryGetItemTypeFromSchema method --- src/LibKubernetesGenerator/TypeHelper.cs | 33 ++++++++++++++++++++---- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/src/LibKubernetesGenerator/TypeHelper.cs b/src/LibKubernetesGenerator/TypeHelper.cs index 0b6811ae6..d14d64cc0 100644 --- a/src/LibKubernetesGenerator/TypeHelper.cs +++ b/src/LibKubernetesGenerator/TypeHelper.cs @@ -246,12 +246,14 @@ string toType() break; case "T": - // Return single item type from list type (e.g., V1Pod from V1PodList) - return !string.IsNullOrEmpty(t) && t.EndsWith("List", StringComparison.Ordinal) - ? t.Substring(0, t.Length - 4) - : t; + var itemType = TryGetItemTypeFromSchema(response); + if (itemType != null) + { + return itemType; + } + + break; case "TList": - // Return list type as-is return t; } @@ -291,5 +293,26 @@ public static bool IfType(JsonSchemaProperty property, string type) return false; } + + private string TryGetItemTypeFromSchema(OpenApiResponse response) + { + var listSchema = response?.Schema?.Reference; + if (listSchema?.Properties?.TryGetValue("items", out var itemsProperty) != true) + { + return null; + } + + if (itemsProperty.Reference != null) + { + return classNameHelper.GetClassNameForSchemaDefinition(itemsProperty.Reference); + } + + if (itemsProperty.Item?.Reference != null) + { + return classNameHelper.GetClassNameForSchemaDefinition(itemsProperty.Item.Reference); + } + + return null; + } } } \ No newline at end of file From c6bda21013a83f02534ba30fc248b2cd9b5719fa Mon Sep 17 00:00:00 2001 From: lqlive Date: Wed, 24 Sep 2025 09:22:54 +0800 Subject: [PATCH 09/12] feat: mark Watch methods as obsolete to prepare for future deprecation --- src/KubernetesClient/WatcherExt.cs | 3 +++ src/LibKubernetesGenerator/templates/IOperations.cs.template | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/KubernetesClient/WatcherExt.cs b/src/KubernetesClient/WatcherExt.cs index f3047f8ca..7246b9df2 100644 --- a/src/KubernetesClient/WatcherExt.cs +++ b/src/KubernetesClient/WatcherExt.cs @@ -16,6 +16,7 @@ public static class WatcherExt /// The action to invoke when the server closes the connection. /// /// a watch object + [Obsolete("This method will be deprecated in future versions.")] public static Watcher Watch( this Task> responseTask, Action onEvent, @@ -52,6 +53,7 @@ private static Func> MakeStreamReaderCreator(Task /// a watch object + [Obsolete("This method will be deprecated in future versions.")] public static Watcher Watch( this HttpOperationResponse response, Action onEvent, @@ -71,6 +73,7 @@ public static Watcher Watch( /// a callback when any exception was caught during watching /// cancellation token /// IAsyncEnumerable of watch events + [Obsolete("This method will be deprecated in future versions.")] public static IAsyncEnumerable<(WatchEventType, T)> WatchAsync( this Task> responseTask, Action onError = null, diff --git a/src/LibKubernetesGenerator/templates/IOperations.cs.template b/src/LibKubernetesGenerator/templates/IOperations.cs.template index e3ba9c790..6904b8b91 100644 --- a/src/LibKubernetesGenerator/templates/IOperations.cs.template +++ b/src/LibKubernetesGenerator/templates/IOperations.cs.template @@ -25,9 +25,6 @@ public partial interface I{{name}}Operations /// /// A which can be used to cancel the asynchronous operation. /// - {{if IfParamContains api.operation "watch"}} - [Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.")] - {{end}} Task"}}> {{GetOperationId api.operation "WithHttpMessagesAsync"}}( {{ for parameter in api.operation.parameters}} {{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}}, From 85d7011e65ecb74b105fccbf585c09136fd78b72 Mon Sep 17 00:00:00 2001 From: lqlive Date: Sun, 28 Sep 2025 20:55:48 +0800 Subject: [PATCH 10/12] fix --- src/KubernetesClient.Aot/KubernetesClient.Aot.csproj | 4 ++-- src/LibKubernetesGenerator/templates/Client.cs.template | 3 ++- .../templates/OperationsExtensions.cs.template | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj b/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj index 595a03e6a..55305a1a1 100644 --- a/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj +++ b/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj @@ -87,8 +87,8 @@ - - + + diff --git a/src/LibKubernetesGenerator/templates/Client.cs.template b/src/LibKubernetesGenerator/templates/Client.cs.template index 3f1b569a3..f9ce845e5 100644 --- a/src/LibKubernetesGenerator/templates/Client.cs.template +++ b/src/LibKubernetesGenerator/templates/Client.cs.template @@ -98,6 +98,7 @@ public partial class {{name}}Client : ResourceClient } {{end}} +#if !K8S_AOT {{if IfParamContains api.operation "watch"}} /// /// Watch {{ToXmlDoc api.operation.description}} @@ -159,6 +160,6 @@ public partial class {{name}}Client : ResourceClient onError, cancellationToken); } {{end}} - +#endif {{end}} } \ No newline at end of file diff --git a/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template b/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template index 78a7cabab..7544d235d 100644 --- a/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template +++ b/src/LibKubernetesGenerator/templates/OperationsExtensions.cs.template @@ -155,6 +155,7 @@ public static partial class {{name}}OperationsExtensions } {{end}} +#if !K8S_AOT {{if IfParamContains api.operation "watch"}} {{~ $filteredParams = FilterParameters api.operation "watch" ~}} /// @@ -225,6 +226,6 @@ public static IAsyncEnumerable<(WatchEventType, {{GetReturnType api.operation "T onError, cancellationToken); } {{end}} - +#endif {{end}} } \ No newline at end of file From 0877710ff4a9d5ce4953366d7fd0ca51aa859ecd Mon Sep 17 00:00:00 2001 From: lqlive Date: Mon, 29 Sep 2025 19:36:30 +0800 Subject: [PATCH 11/12] refactor: update WatcherExt class to internal and remove obsolete attributes; improve example method signature in Program.cs --- examples/clientset/Program.cs | 2 +- src/KubernetesClient/WatcherExt.cs | 5 +---- src/LibKubernetesGenerator/TypeHelper.cs | 3 +-- tests/KubernetesClient.Tests/WatchTests.cs | 3 ++- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/examples/clientset/Program.cs b/examples/clientset/Program.cs index b420baa56..a1b74e0f8 100644 --- a/examples/clientset/Program.cs +++ b/examples/clientset/Program.cs @@ -23,7 +23,7 @@ private static async Task Main(string[] args) System.Console.WriteLine(pod?.Metadata?.Name); var watch = clientSet.CoreV1.Pod.WatchListAsync("default"); - await foreach (var (_, item)in watch.ConfigureAwait(false)) + await foreach (var (_, item) in watch.ConfigureAwait(false)) { System.Console.WriteLine(item.Metadata.Name); } diff --git a/src/KubernetesClient/WatcherExt.cs b/src/KubernetesClient/WatcherExt.cs index 7246b9df2..f82301cf1 100644 --- a/src/KubernetesClient/WatcherExt.cs +++ b/src/KubernetesClient/WatcherExt.cs @@ -2,7 +2,7 @@ namespace k8s { - public static class WatcherExt + internal static class WatcherExt { /// /// create a watch object from a call to api server with watch=true @@ -16,7 +16,6 @@ public static class WatcherExt /// The action to invoke when the server closes the connection. /// /// a watch object - [Obsolete("This method will be deprecated in future versions.")] public static Watcher Watch( this Task> responseTask, Action onEvent, @@ -53,7 +52,6 @@ private static Func> MakeStreamReaderCreator(Task /// a watch object - [Obsolete("This method will be deprecated in future versions.")] public static Watcher Watch( this HttpOperationResponse response, Action onEvent, @@ -73,7 +71,6 @@ public static Watcher Watch( /// a callback when any exception was caught during watching /// cancellation token /// IAsyncEnumerable of watch events - [Obsolete("This method will be deprecated in future versions.")] public static IAsyncEnumerable<(WatchEventType, T)> WatchAsync( this Task> responseTask, Action onError = null, diff --git a/src/LibKubernetesGenerator/TypeHelper.cs b/src/LibKubernetesGenerator/TypeHelper.cs index d14d64cc0..db27dab97 100644 --- a/src/LibKubernetesGenerator/TypeHelper.cs +++ b/src/LibKubernetesGenerator/TypeHelper.cs @@ -122,7 +122,6 @@ private string GetDotNetType(JsonSchema schema, JsonSchemaProperty parent) return $"IDictionary"; } - if (schema?.Reference != null) { return classNameHelper.GetClassNameForSchemaDefinition(schema.Reference); @@ -306,7 +305,7 @@ private string TryGetItemTypeFromSchema(OpenApiResponse response) { return classNameHelper.GetClassNameForSchemaDefinition(itemsProperty.Reference); } - + if (itemsProperty.Item?.Reference != null) { return classNameHelper.GetClassNameForSchemaDefinition(itemsProperty.Item.Reference); diff --git a/tests/KubernetesClient.Tests/WatchTests.cs b/tests/KubernetesClient.Tests/WatchTests.cs index f92376575..b3f151106 100644 --- a/tests/KubernetesClient.Tests/WatchTests.cs +++ b/tests/KubernetesClient.Tests/WatchTests.cs @@ -732,7 +732,7 @@ protected override Task SendAsync(HttpRequestMessage reques } [Fact] - public void MustHttp2VersionSet() + public async Task MustHttp2VersionSet() { var server = new MockKubeApiServer(testOutput, async httpContext => { @@ -747,6 +747,7 @@ public void MustHttp2VersionSet() Assert.Null(handler.Version); using var watcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { }); Assert.Equal(HttpVersion.Version20, handler.Version); + await Task.CompletedTask.ConfigureAwait(true); } [Fact] From 09a77173a7f86a27a2e8423f6bcdedb938751ce8 Mon Sep 17 00:00:00 2001 From: lqlive Date: Fri, 10 Oct 2025 09:28:11 +0800 Subject: [PATCH 12/12] refactor: change WatcherExt class from internal to public and mark methods as obsolete for future deprecation --- src/KubernetesClient/WatcherExt.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/KubernetesClient/WatcherExt.cs b/src/KubernetesClient/WatcherExt.cs index f82301cf1..28863341f 100644 --- a/src/KubernetesClient/WatcherExt.cs +++ b/src/KubernetesClient/WatcherExt.cs @@ -2,7 +2,7 @@ namespace k8s { - internal static class WatcherExt + public static class WatcherExt { /// /// create a watch object from a call to api server with watch=true @@ -16,6 +16,7 @@ internal static class WatcherExt /// The action to invoke when the server closes the connection. /// /// a watch object + [Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.")] public static Watcher Watch( this Task> responseTask, Action onEvent, @@ -52,6 +53,7 @@ private static Func> MakeStreamReaderCreator(Task /// a watch object + [Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.")] public static Watcher Watch( this HttpOperationResponse response, Action onEvent, @@ -71,6 +73,7 @@ public static Watcher Watch( /// a callback when any exception was caught during watching /// cancellation token /// IAsyncEnumerable of watch events + [Obsolete("This method will be deprecated in future versions. Please use the WatchAsync method instead.")] public static IAsyncEnumerable<(WatchEventType, T)> WatchAsync( this Task> responseTask, Action onError = null,