commit
fd1b7967b1
979
Cargo.lock
generated
979
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -1,4 +1,4 @@
|
||||||
{
|
{
|
||||||
"amqp 0.1.0 (git+https://github.com/grahamc/rust-amqp.git#f9aec2f40aef69a459f26003ce47048f8e2a08d1)": "09k6fl7l0rcwilnckdfv3smiv1ilrwi1jxmrrkjwbrj64lky3jdy",
|
"amqp 0.1.0 (git+https://github.com/grahamc/rust-amqp.git#b58edf8822072688d882966f7427f0a9e67aee78)": "0a0n8h71lnsl2rbi6v1zmy015f7hl91y5mgx3qzxlfrn3pjz8sy1",
|
||||||
"hubcaps 0.3.16 (git+https://github.com/grahamc/hubcaps.git#5e656ba35ab4ee74aa72b3b5c3a62e1bf351ff6a)": "1p7rn8y71fjwfag65437gz7a56pysz9n69smaknvblyxpjdzmh4d"
|
"hubcaps 0.3.16 (git+https://github.com/grahamc/hubcaps.git#5e656ba35ab4ee74aa72b3b5c3a62e1bf351ff6a)": "1p7rn8y71fjwfag65437gz7a56pysz9n69smaknvblyxpjdzmh4d"
|
||||||
}
|
}
|
|
@ -29,4 +29,4 @@ chrono = "0.4.6"
|
||||||
separator = "0.4.1"
|
separator = "0.4.1"
|
||||||
|
|
||||||
async-std = "1.5.0"
|
async-std = "1.5.0"
|
||||||
lapin = "1.0.0-beta4"
|
lapin = "1.0.0-rc6"
|
||||||
|
|
|
@ -18,12 +18,10 @@ use lapin::options::{
|
||||||
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
|
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
|
||||||
};
|
};
|
||||||
use lapin::types::{AMQPValue, FieldTable};
|
use lapin::types::{AMQPValue, FieldTable};
|
||||||
use lapin::{
|
use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind};
|
||||||
BasicProperties, Channel, CloseOnDrop, Connection, ConnectionProperties, ExchangeKind,
|
|
||||||
};
|
|
||||||
use tracing::{debug, trace};
|
use tracing::{debug, trace};
|
||||||
|
|
||||||
pub fn from_config(cfg: &RabbitMQConfig) -> Result<CloseOnDrop<Connection>, lapin::Error> {
|
pub fn from_config(cfg: &RabbitMQConfig) -> Result<Connection, lapin::Error> {
|
||||||
let mut props = FieldTable::default();
|
let mut props = FieldTable::default();
|
||||||
props.insert(
|
props.insert(
|
||||||
"ofborg_version".into(),
|
"ofborg_version".into(),
|
||||||
|
@ -34,7 +32,7 @@ pub fn from_config(cfg: &RabbitMQConfig) -> Result<CloseOnDrop<Connection>, lapi
|
||||||
task::block_on(Connection::connect(&cfg.as_uri(), opts))
|
task::block_on(Connection::connect(&cfg.as_uri(), opts))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChannelExt for CloseOnDrop<Channel> {
|
impl ChannelExt for Channel {
|
||||||
type Error = lapin::Error;
|
type Error = lapin::Error;
|
||||||
|
|
||||||
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> {
|
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> {
|
||||||
|
@ -81,7 +79,7 @@ impl ChannelExt for CloseOnDrop<Channel> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for CloseOnDrop<Channel> {
|
impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel {
|
||||||
type Error = lapin::Error;
|
type Error = lapin::Error;
|
||||||
type Handle = Pin<Box<dyn Future<Output = ()> + 'a>>;
|
type Handle = Pin<Box<dyn Future<Output = ()> + 'a>>;
|
||||||
|
|
||||||
|
@ -117,7 +115,7 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for CloseOnDrop<Channel> {
|
||||||
|
|
||||||
/// Same as a regular channel, but without prefetching,
|
/// Same as a regular channel, but without prefetching,
|
||||||
/// used for services with multiple instances.
|
/// used for services with multiple instances.
|
||||||
pub struct WorkerChannel(pub CloseOnDrop<Channel>);
|
pub struct WorkerChannel(pub Channel);
|
||||||
|
|
||||||
impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel {
|
impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel {
|
||||||
type Error = lapin::Error;
|
type Error = lapin::Error;
|
||||||
|
@ -130,12 +128,12 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ChannelNotificationReceiver<'a> {
|
pub struct ChannelNotificationReceiver<'a> {
|
||||||
channel: &'a mut CloseOnDrop<lapin::Channel>,
|
channel: &'a mut lapin::Channel,
|
||||||
deliver: &'a Delivery,
|
deliver: &'a Delivery,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ChannelNotificationReceiver<'a> {
|
impl<'a> ChannelNotificationReceiver<'a> {
|
||||||
pub fn new(channel: &'a mut CloseOnDrop<lapin::Channel>, deliver: &'a Delivery) -> Self {
|
pub fn new(channel: &'a mut lapin::Channel, deliver: &'a Delivery) -> Self {
|
||||||
ChannelNotificationReceiver { channel, deliver }
|
ChannelNotificationReceiver { channel, deliver }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -149,7 +147,7 @@ impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> {
|
||||||
|
|
||||||
// FIXME the consumer trait for SimpleWorker and SimpleNotifyWorker conflict,
|
// FIXME the consumer trait for SimpleWorker and SimpleNotifyWorker conflict,
|
||||||
// but one could probably be implemented in terms of the other instead.
|
// but one could probably be implemented in terms of the other instead.
|
||||||
pub struct NotifyChannel(pub CloseOnDrop<Channel>);
|
pub struct NotifyChannel(pub Channel);
|
||||||
|
|
||||||
impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel {
|
impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel {
|
||||||
type Error = lapin::Error;
|
type Error = lapin::Error;
|
||||||
|
@ -190,7 +188,7 @@ impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn action_deliver(
|
async fn action_deliver(
|
||||||
chan: &CloseOnDrop<Channel>,
|
chan: &Channel,
|
||||||
deliver: &Delivery,
|
deliver: &Delivery,
|
||||||
action: Action,
|
action: Action,
|
||||||
) -> Result<(), lapin::Error> {
|
) -> Result<(), lapin::Error> {
|
||||||
|
|
|
@ -2,7 +2,6 @@ use amqp::protocol::basic;
|
||||||
use amqp::Basic;
|
use amqp::Basic;
|
||||||
use async_std::task;
|
use async_std::task;
|
||||||
use lapin::options::BasicPublishOptions;
|
use lapin::options::BasicPublishOptions;
|
||||||
use lapin::CloseOnDrop;
|
|
||||||
|
|
||||||
include!(concat!(env!("OUT_DIR"), "/events.rs"));
|
include!(concat!(env!("OUT_DIR"), "/events.rs"));
|
||||||
|
|
||||||
|
@ -59,8 +58,8 @@ impl SysEvents for RabbitMQ<amqp::Channel> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RabbitMQ<CloseOnDrop<lapin::Channel>> {
|
impl RabbitMQ<lapin::Channel> {
|
||||||
pub fn from_lapin(identity: &str, channel: CloseOnDrop<lapin::Channel>) -> Self {
|
pub fn from_lapin(identity: &str, channel: lapin::Channel) -> Self {
|
||||||
RabbitMQ {
|
RabbitMQ {
|
||||||
identity: identity.to_owned(),
|
identity: identity.to_owned(),
|
||||||
channel,
|
channel,
|
||||||
|
@ -68,7 +67,7 @@ impl RabbitMQ<CloseOnDrop<lapin::Channel>> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SysEvents for RabbitMQ<CloseOnDrop<lapin::Channel>> {
|
impl SysEvents for RabbitMQ<lapin::Channel> {
|
||||||
fn notify(&mut self, event: Event) {
|
fn notify(&mut self, event: Event) {
|
||||||
let props = lapin::BasicProperties::default().with_content_type("application/json".into());
|
let props = lapin::BasicProperties::default().with_content_type("application/json".into());
|
||||||
task::block_on(async {
|
task::block_on(async {
|
||||||
|
|
Loading…
Reference in a new issue