Java8里CompletableFuture强大的依靠感,简直是无法比拟啊! - 知乎 (zhihu.com)
CompletableFuture 详解 | JavaGuide
可以使用 java.util.concurrent 包中的工具来实现阻塞等待某个事件完成,并在事件完成后进行通知。一个常用的方式是使用 CompletableFuture。下面是一个简单的示例,展示如何实现这一功能:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Shows how one thread can block until another thread signals completion
 * through a {@link CompletableFuture}, with an upper bound on the wait.
 */
public class CompletableFutureExample {
    /**
     * Starts a worker thread that completes the future after a simulated
     * delay, then blocks the calling thread (for at most three seconds)
     * until the result is available and prints it.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        // The future is the hand-off point between the worker and the waiter.
        CompletableFuture<String> future = new CompletableFuture<>();

        // Simulate an asynchronous operation finishing on another thread.
        new Thread(() -> {
            try {
                // Simulated processing time.
                Thread.sleep(2000);
                // Publish the result; this wakes up anyone blocked in get().
                future.complete("事件完成的结果");
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the thread and propagate the
                // failure to the waiter instead of leaving it blocked.
                Thread.currentThread().interrupt();
                future.completeExceptionally(e);
            }
        }).start();

        // Block the main thread until the event completes or the timeout expires.
        try {
            System.out.println("等待事件完成...");
            // The timed get() throws the checked TimeoutException, so it must be handled below.
            String result = future.get(3, TimeUnit.SECONDS);
            System.out.println("收到通知,事件结果: " + result);
        } catch (TimeoutException e) {
            // The worker did not complete the future within the allowed time.
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            // The worker completed the future exceptionally.
            e.printStackTrace();
        }
    }
}
.NET 中类似的实现代码(使用 TaskCompletionSource 充当 CompletableFuture 的角色,配合 MQTTnet 实现“发送请求后等待响应”):
using System;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
/// <summary>
/// Request/response over MQTT: publishes a message and asynchronously waits
/// for a reply on a response topic, using a <see cref="TaskCompletionSource{TResult}"/>
/// as the hand-off between the receive handler and the waiting caller.
/// Only one request may be in flight at a time because the pending state
/// is kept in static fields.
/// </summary>
class Program
{
    // Upper bound on how long Main waits for the reply before giving up.
    private static readonly TimeSpan ResponseTimeout = TimeSpan.FromSeconds(3);

    private static IMqttClient _mqttClient;
    private static TaskCompletionSource<string> _responseTaskCompletionSource;

    // Topic the receive handler treats as "the reply"; set per request.
    private static string _expectedResponseTopic;

    /// <summary>
    /// Connects to the broker, sends one request, prints the reply (or a
    /// timeout notice) and disconnects.
    /// </summary>
    /// <param name="args">Unused command-line arguments.</param>
    static async Task Main(string[] args)
    {
        var options = new MqttClientOptionsBuilder()
            .WithClientId("Client1")
            .WithTcpServer("broker.hivemq.com")
            .Build();

        _mqttClient = new MqttFactory().CreateMqttClient();
        _mqttClient.UseApplicationMessageReceivedHandler(OnMessageReceived);

        try
        {
            await _mqttClient.ConnectAsync(options, System.Threading.CancellationToken.None);

            try
            {
                string message = "Hello MQTT";
                string response = await SendMessageAndWaitForResponse(
                    "test/topic/request", message, "test/topic/response", ResponseTimeout);
                Console.WriteLine($"Received Response: {response}");
            }
            catch (TimeoutException ex)
            {
                // No reply arrived in time; report it instead of hanging forever.
                Console.WriteLine(ex.Message);
            }
            finally
            {
                await _mqttClient.DisconnectAsync();
            }
        }
        finally
        {
            // Release the client even when connecting or sending fails.
            _mqttClient.Dispose();
        }
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="requestTopic"/>
    /// and waits for the first message received on <paramref name="responseTopic"/>.
    /// </summary>
    /// <param name="requestTopic">Topic the request is published to.</param>
    /// <param name="message">Request payload.</param>
    /// <param name="responseTopic">
    /// Topic the reply is expected on. It is matched by exact string comparison,
    /// so it should not contain wildcards.
    /// </param>
    /// <param name="timeout">
    /// Maximum time to wait for the reply; <c>null</c> waits indefinitely.
    /// </param>
    /// <returns>The reply payload decoded as UTF-8.</returns>
    /// <exception cref="TimeoutException">
    /// No reply arrived within <paramref name="timeout"/>.
    /// </exception>
    private static async Task<string> SendMessageAndWaitForResponse(string requestTopic, string message, string responseTopic, TimeSpan? timeout = null)
    {
        // RunContinuationsAsynchronously keeps the awaiting code from resuming
        // inline inside the receive handler when TrySetResult is called.
        var pending = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Arm the handler before anything is sent so a fast reply cannot be missed.
        _expectedResponseTopic = responseTopic;
        _responseTaskCompletionSource = pending;

        // Without a subscription the broker never delivers the reply to this client.
        await _mqttClient.SubscribeAsync(responseTopic);

        var mqttMessage = new MqttApplicationMessageBuilder()
            .WithTopic(requestTopic)
            .WithPayload(message)
            .WithExactlyOnceQoS()
            .WithRetainFlag(false)
            .Build();

        await _mqttClient.PublishAsync(mqttMessage, System.Threading.CancellationToken.None);

        if (timeout.HasValue)
        {
            Task finished = await Task.WhenAny(pending.Task, Task.Delay(timeout.Value));
            if (finished != pending.Task)
            {
                throw new TimeoutException(
                    $"No response received on '{responseTopic}' within {timeout.Value.TotalSeconds} seconds.");
            }
        }

        // Wait for the response message.
        return await pending.Task;
    }

    /// <summary>
    /// Receive handler: completes the pending request when a message arrives
    /// on the currently expected response topic; all other messages are ignored.
    /// </summary>
    /// <param name="e">Event data carrying the received application message.</param>
    private static void OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        // Ignore anything that is not the reply we are waiting for.
        if (!string.Equals(e.ApplicationMessage.Topic, _expectedResponseTopic, StringComparison.Ordinal))
        {
            return;
        }

        // Guard against a missing payload so decoding does not throw.
        byte[] rawPayload = e.ApplicationMessage.Payload ?? Array.Empty<byte>();
        string payload = System.Text.Encoding.UTF8.GetString(rawPayload);

        // No request may be pending yet; TrySetResult also tolerates duplicate replies.
        _responseTaskCompletionSource?.TrySetResult(payload);
    }
}