首页
视频
资源
登录
原
.netcore3.1 RabbitMq RPC
4677
人阅读
2021/1/28 16:28
总访问:
2591094
评论:
0
收藏:
0
手机
分类:
RabbitMq
![](https://img.tnblog.net/arcimg/hb/585b0f1ffa7f4c2095baa20c175b32a0.png) >#.netcore3.1 RabbitMq RPC [TOC] tn>如果我们需要在远程计算机上运行功能并等待结果怎么办?这种模式通常称为`远程过程调用`或`RPC`。我们举一个简单的例子,如下图所示: ![](https://img.tnblog.net/arcimg/hb/4c27f4d72512448eacf25787df04f171.png) tn>这个就图片讲得就很清楚了,我也不用再讲了。<br/> 但这就引发了一个新问题,在获取结果队列中收到响应后,尚不清楚响应属于哪个请求。<br/> 那就是使用`CorrelationId`属性的时候 。我们将为每个请求将其设置为唯一值。稍后,当我们在获取结果队列中收到消息时,我们将查看该属性,并基于此属性将响应与请求进行匹配。如果看到未知的`CorrelationId`值,则可以安全地丢弃该消息-它不属于我们的请求。 RPC服务器端 ------------ ```csharp var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 声明离职队列 channel.QueueDeclare( queue: "leave_rpc_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "leave_rpc_queue", autoAck: false, consumer: consumer); // 处理事件 consumer.Received += (model, ea) => { string response = null; // 获取消息 var body = ea.Body.ToArray(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); // 获取客户端的唯一ID replyProps.CorrelationId = props.CorrelationId; try { // 人事做离职处理 var message = Encoding.UTF8.GetString(body); Console.WriteLine("HR Handle:{0}", message); response = "The exit formalities are complete"; } catch (Exception e) { Console.WriteLine("Have Error:" + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(string.Empty, routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } ``` ![](https://img.tnblog.net/arcimg/hb/fb5ec1b066ab42e48cf5bdb692307b3e.png) 客户端 ------------ >### 创建Rpc帮助类 ```csharp public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; // 关于BlockingCollection可以参考这篇:https://www.cnblogs.com/imxiangzi/articles/2801106.html private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; var connection = factory.CreateConnection(); channel = connection.CreateModel(); // 创建临时队列 replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); // 创建 ID var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; // 回调的队列 props.ReplyTo = replyQueueName; // 处理离职结果 consumer.Received += Consumer_Received; } // 处理离职结果的方法 private void Consumer_Received(object sender, BasicDeliverEventArgs ea) { var body = ea.Body.ToArray(); var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == props.CorrelationId) { // jiang respQueue.Add(response); } } /// <summary> /// 发送消息处理 /// </summary> /// <param name="message"></param> /// <returns></returns> public string Call(string message) { // 创建离职消息 var messageBytes = Encoding.UTF8.GetBytes(message); // 发送离职请求到人事 channel.BasicPublish( exchange: string.Empty, routingKey: "leave_rpc_queue", basicProperties: props, body: messageBytes); // 绑定临时队列与处理 channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true ); // 从BlockingCollection中移除一个项。 return respQueue.Take(); } public void Close() { connection.Close(); } } ``` >### Main代码 ```csharp var rpcClient = new RpcClient(); string result = rpcClient.Call("I want to leave"); Console.WriteLine($"Result Handle: {result}"); Console.ReadLine(); ``` >### 运行结果 ![](https://img.tnblog.net/arcimg/hb/78ebddc58fbf46de942ccf8edc21dfe5.png) 有关RPC的声明 ------------ >尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。 牢记这一点,请考虑以下建议: - 确保明显的是哪个函数调用是本地的,哪个是远程的。 - 记录您的系统。明确组件之间的依赖关系。 - 处理错误案例。RPC服务器长时间关闭后,客户端应如何反应? 如有疑问,请避免使用RPC。如果可以的话,应该使用异步管道-代替类似RPC的阻塞,将结果异步推送到下一个计算阶段。
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739
👈{{preArticle.title}}
👉{{nextArticle.title}}
评价
{{titleitem}}
{{titleitem}}
{{item.content}}
{{titleitem}}
{{titleitem}}
{{item.content}}
尘叶心繁
这一世以无限游戏为使命!
博主信息
排名
6
文章
6
粉丝
16
评论
8
文章类别
.net后台框架
168篇
linux
17篇
linux中cve
1篇
windows中cve
0篇
资源分享
10篇
Win32
3篇
前端
28篇
传说中的c
4篇
Xamarin
9篇
docker
15篇
容器编排
101篇
grpc
4篇
Go
15篇
yaml模板
1篇
理论
2篇
更多
Sqlserver
4篇
云产品
39篇
git
3篇
Unity
1篇
考证
2篇
RabbitMq
23篇
Harbor
1篇
Ansible
8篇
Jenkins
17篇
Vue
1篇
Ids4
18篇
istio
1篇
架构
2篇
网络
7篇
windbg
4篇
AI
18篇
threejs
2篇
人物
1篇
嵌入式
2篇
python
13篇
HuggingFace
8篇
pytorch
9篇
opencv
6篇
最新文章
最新评价
{{item.articleTitle}}
{{item.blogName}}
:
{{item.content}}
关于我们
ICP备案 :
渝ICP备18016597号-1
网站信息:
2018-2024
TNBLOG.NET
技术交流:
群号656732739
联系我们:
contact@tnblog.net
欢迎加群
欢迎加群交流技术