use std::{fmt, str::FromStr, sync::Mutex, time};
use futures::{
channel::mpsc, channel::oneshot, executor, future::BoxFuture, future::Either, pin_mut, Future,
FutureExt, Stream, StreamExt,
};
use crate::global;
use crate::sdk::trace::Span;
use crate::{
sdk::export::trace::{ExportResult, SpanData, SpanExporter},
trace::{TraceError, TraceResult},
Context,
};
const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS";
const OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT: u64 = 5000;
const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2048;
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
const OTEL_BSP_EXPORT_TIMEOUT_MILLIS: &str = "OTEL_BSP_EXPORT_TIMEOUT_MILLIS";
const OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT: u64 = 30000;
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
fn on_start(&self, span: &Span, cx: &Context);
fn on_end(&self, span: SpanData);
fn force_flush(&self) -> TraceResult<()>;
fn shutdown(&mut self) -> TraceResult<()>;
}
#[derive(Debug)]
pub struct SimpleSpanProcessor {
exporter: Mutex<Box<dyn SpanExporter>>,
}
impl SimpleSpanProcessor {
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
SimpleSpanProcessor {
exporter: Mutex::new(exporter),
}
}
}
impl SpanProcessor for SimpleSpanProcessor {
fn on_start(&self, _span: &Span, _cx: &Context) {
}
fn on_end(&self, span: SpanData) {
if let Ok(mut exporter) = self.exporter.lock() {
let _result = executor::block_on(exporter.export(vec![span]));
} else {
global::handle_error(TraceError::from("When export span with the SimpleSpanProcessor, the exporter's lock has been poisoned"));
}
}
fn force_flush(&self) -> TraceResult<()> {
Ok(())
}
fn shutdown(&mut self) -> TraceResult<()> {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"When shutting down the SimpleSpanProcessor, the exporter's lock has been poisoned"
.into(),
))
}
}
}
pub struct BatchSpanProcessor {
message_sender: Mutex<mpsc::Sender<BatchMessage>>,
}
impl fmt::Debug for BatchSpanProcessor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BatchSpanProcessor")
.field("message_sender", &self.message_sender)
.finish()
}
}
impl SpanProcessor for BatchSpanProcessor {
fn on_start(&self, _span: &Span, _cx: &Context) {
}
fn on_end(&self, span: SpanData) {
if let Ok(mut sender) = self.message_sender.lock() {
let _ = sender.try_send(BatchMessage::ExportSpan(span));
}
}
fn force_flush(&self) -> TraceResult<()> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned"))?;
let (res_sender, res_receiver) = oneshot::channel::<Vec<ExportResult>>();
sender.try_send(BatchMessage::Flush(Some(res_sender)))?;
for result in futures::executor::block_on(res_receiver)? {
result?;
}
Ok(())
}
fn shutdown(&mut self) -> TraceResult<()> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When shutting down the BatchSpanProcessor, the message sender's lock has been poisoned"))?;
let (res_sender, res_receiver) = oneshot::channel::<Vec<ExportResult>>();
sender.try_send(BatchMessage::Shutdown(res_sender))?;
for result in futures::executor::block_on(res_receiver)? {
result?;
}
Ok(())
}
}
#[derive(Debug)]
enum BatchMessage {
ExportSpan(SpanData),
Flush(Option<oneshot::Sender<Vec<ExportResult>>>),
Shutdown(oneshot::Sender<Vec<ExportResult>>),
}
impl BatchSpanProcessor {
pub(crate) fn new<S, SH, SO, I, IS, ISI, D, DS>(
mut exporter: Box<dyn SpanExporter>,
spawn: S,
interval: I,
delay: D,
config: BatchConfig,
) -> Self
where
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output = SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IS,
IS: Stream<Item = ISI> + Send + 'static,
D: (Fn(time::Duration) -> DS) + Send + Sync + 'static,
DS: Future<Output = ()> + 'static + Send + Sync,
{
let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size);
let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush(None));
let _worker_handle = spawn(Box::pin(async move {
let mut spans = Vec::new();
let mut messages = Box::pin(futures::stream::select(message_receiver, ticker));
while let Some(message) = messages.next().await {
match message {
BatchMessage::ExportSpan(span) => {
if spans.len() < config.max_queue_size {
spans.push(span);
}
}
BatchMessage::Flush(Some(ch)) => {
let mut results =
Vec::with_capacity(spans.len() / config.max_export_batch_size + 1);
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);
results.push(
export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&delay,
batch,
).await,
);
}
let send_result = ch.send(results);
if send_result.is_err() {
global::handle_error(TraceError::from("fail to send the export response from worker handle in BatchProcessor"))
}
}
BatchMessage::Flush(None) => {
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);
let _result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&delay,
batch,
).await;
}
}
BatchMessage::Shutdown(ch) => {
let mut results =
Vec::with_capacity(spans.len() / config.max_export_batch_size + 1);
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);
results.push(
export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&delay,
batch,
).await,
);
}
exporter.shutdown();
let send_result = ch.send(results);
if send_result.is_err() {
global::handle_error(TraceError::from("fail to send the export response from worker handle in BatchProcessor"))
}
break;
}
}
}
})).map(|_| ());
BatchSpanProcessor {
message_sender: Mutex::new(message_sender),
}
}
pub fn builder<E, S, SH, SO, I, IO, D, DS>(
exporter: E,
spawn: S,
delay: D,
interval: I,
) -> BatchSpanProcessorBuilder<E, S, I, D>
where
E: SpanExporter,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output = SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IO,
D: (Fn(time::Duration) -> DS) + Send + Sync + 'static,
DS: Future<Output = ()> + 'static + Send + Sync,
{
BatchSpanProcessorBuilder {
exporter,
spawn,
interval,
delay,
config: Default::default(),
}
}
pub fn from_env<E, S, SH, SO, I, IO, D, DS>(
exporter: E,
spawn: S,
interval: I,
delay: D,
) -> BatchSpanProcessorBuilder<E, S, I, D>
where
E: SpanExporter,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output = SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IO,
D: (Fn(time::Duration) -> DS) + Send + Sync + 'static,
DS: Future<Output = ()> + 'static + Send + Sync,
{
let mut config = BatchConfig::default();
let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS)
.map(|delay| u64::from_str(&delay).unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT))
.unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT);
config.scheduled_delay = time::Duration::from_millis(schedule_delay);
let max_queue_size = std::env::var(OTEL_BSP_MAX_QUEUE_SIZE)
.map(|queue_size| {
usize::from_str(&queue_size).unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT)
})
.unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
config.max_queue_size = max_queue_size;
let max_export_batch_size = std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
.map(|batch_size| {
usize::from_str(&batch_size).unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT)
})
.unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT);
if max_export_batch_size > max_queue_size {
config.max_export_batch_size = max_queue_size;
} else {
config.max_export_batch_size = max_export_batch_size;
}
let max_export_time_out = std::env::var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS)
.map(|timeout| {
u64::from_str(&timeout).unwrap_or(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT)
})
.unwrap_or(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT);
config.max_export_timeout = time::Duration::from_millis(max_export_time_out);
BatchSpanProcessorBuilder {
config,
exporter,
spawn,
delay,
interval,
}
}
}
async fn export_with_timeout<D, DS, E>(
time_out: time::Duration,
exporter: &mut E,
delay: &D,
batch: Vec<SpanData>,
) -> ExportResult
where
D: (Fn(time::Duration) -> DS) + Send + Sync + 'static,
DS: Future<Output = ()> + 'static + Send + Sync,
E: SpanExporter + ?Sized,
{
let export = exporter.export(batch);
let timeout = delay(time_out);
pin_mut!(export);
pin_mut!(timeout);
match futures::future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)),
}
}
#[derive(Debug)]
pub struct BatchConfig {
max_queue_size: usize,
scheduled_delay: time::Duration,
max_export_batch_size: usize,
max_export_timeout: time::Duration,
}
impl Default for BatchConfig {
fn default() -> Self {
BatchConfig {
max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT),
max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
max_export_timeout: time::Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT),
}
}
}
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder<E, S, I, D> {
exporter: E,
interval: I,
spawn: S,
delay: D,
config: BatchConfig,
}
impl<E, S, SH, SO, I, IS, ISI, D, DS> BatchSpanProcessorBuilder<E, S, I, D>
where
E: SpanExporter + 'static,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output = SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IS,
IS: Stream<Item = ISI> + Send + 'static,
D: (Fn(time::Duration) -> DS) + Send + Sync + 'static,
DS: Future<Output = ()> + 'static + Send + Sync,
{
pub fn with_max_queue_size(self, size: usize) -> Self {
let mut config = self.config;
config.max_queue_size = size;
BatchSpanProcessorBuilder { config, ..self }
}
pub fn with_scheduled_delay(self, delay: time::Duration) -> Self {
let mut config = self.config;
config.scheduled_delay = delay;
BatchSpanProcessorBuilder { config, ..self }
}
pub fn with_max_timeout(self, timeout: time::Duration) -> Self {
let mut config = self.config;
config.max_export_timeout = timeout;
BatchSpanProcessorBuilder { config, ..self }
}
pub fn with_max_export_batch_size(self, size: usize) -> Self {
let mut config = self.config;
if size > config.max_queue_size {
config.max_export_batch_size = config.max_queue_size;
} else {
config.max_export_batch_size = size;
}
BatchSpanProcessorBuilder { config, ..self }
}
pub fn build(self) -> BatchSpanProcessor {
BatchSpanProcessor::new(
Box::new(self.exporter),
self.spawn,
self.interval,
self.delay,
self.config,
)
}
}
#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
use std::fmt::Debug;
use std::time;
use std::time::Duration;
use async_trait::async_trait;
use crate::sdk::export::trace::{stdout, ExportResult, SpanData, SpanExporter};
use crate::sdk::trace::span_processor::OTEL_BSP_EXPORT_TIMEOUT_MILLIS;
use crate::sdk::trace::BatchConfig;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
};
use crate::util::tokio_interval_stream;
use futures::Future;
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS,
OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
};
#[test]
fn simple_span_processor_on_end_calls_export() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
processor.on_end(new_test_export_span_data());
assert!(rx_export.try_recv().is_ok());
}
#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let _result = processor.shutdown();
assert!(rx_shutdown.try_recv().is_ok());
}
#[test]
fn test_build_batch_span_processor_from_env() {
std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS, "2046");
std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "I am not number");
let mut builder = BatchSpanProcessor::from_env(
stdout::Exporter::new(std::io::stdout(), true),
tokio::spawn,
tokio_interval_stream,
tokio::time::sleep,
);
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
builder.config.scheduled_delay,
time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT)
);
assert_eq!(
builder.config.max_queue_size,
OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
);
assert_eq!(
builder.config.max_export_timeout,
time::Duration::from_millis(2046)
);
std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "120");
builder = BatchSpanProcessor::from_env(
stdout::Exporter::new(std::io::stdout(), true),
tokio::spawn,
tokio_interval_stream,
tokio::time::sleep,
);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
}
#[tokio::test]
async fn test_batch_span_processor() {
let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
let config = BatchConfig {
scheduled_delay: Duration::from_secs(60 * 60 * 24),
..Default::default()
};
let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut));
let mut processor = BatchSpanProcessor::new(
Box::new(exporter),
spawn,
tokio_interval_stream,
tokio::time::sleep,
config,
);
let handle = tokio::spawn(async move {
loop {
if let Some(span) = export_receiver.recv().await {
assert_eq!(span.span_context, new_test_export_span_data().span_context);
break;
}
}
});
tokio::time::sleep(Duration::from_secs(1)).await;
processor.on_end(new_test_export_span_data());
let flush_res = processor.force_flush();
assert!(flush_res.is_ok());
let _shutdown_result = processor.shutdown();
assert!(
tokio::time::timeout(Duration::from_secs(5), handle)
.await
.is_ok(),
"timed out in 5 seconds. force_flush may not export any data when called"
);
}
struct BlockingExporter<D> {
delay_for: time::Duration,
delay_fn: D,
}
impl<D, DS> Debug for BlockingExporter<D>
where
D: Fn(time::Duration) -> DS + 'static + Send + Sync,
DS: Future<Output = ()> + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("blocking exporter for testing")
}
}
#[async_trait]
impl<D, DS> SpanExporter for BlockingExporter<D>
where
D: Fn(time::Duration) -> DS + 'static + Send + Sync,
DS: Future<Output = ()> + Send + Sync + 'static,
{
async fn export(&mut self, _batch: Vec<SpanData>) -> ExportResult {
(self.delay_fn)(self.delay_for).await;
Ok(())
}
}
#[test]
fn test_timeout_tokio_timeout() {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(timeout_test_tokio(true));
}
#[test]
fn test_timeout_tokio_not_timeout() {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(timeout_test_tokio(false));
}
#[test]
#[cfg(feature = "async-std")]
fn test_timeout_async_std_timeout() {
async_std::task::block_on(timeout_test_std_async(true));
}
#[test]
#[cfg(feature = "async-std")]
fn test_timeout_async_std_not_timeout() {
async_std::task::block_on(timeout_test_std_async(false));
}
#[cfg(feature = "async-std")]
async fn timeout_test_std_async(time_out: bool) {
let config = BatchConfig {
max_export_timeout: time::Duration::from_millis(if time_out { 5 } else { 60 }),
scheduled_delay: Duration::from_secs(60 * 60 * 24),
..Default::default()
};
let exporter = BlockingExporter {
delay_for: time::Duration::from_millis(if !time_out { 5 } else { 60 }),
delay_fn: async_std::task::sleep,
};
let mut processor = BatchSpanProcessor::new(
Box::new(exporter),
async_std::task::spawn,
async_std::stream::interval,
async_std::task::sleep,
config,
);
processor.on_end(new_test_export_span_data());
let flush_res = processor.force_flush();
if time_out {
assert!(flush_res.is_err());
} else {
assert!(flush_res.is_ok());
}
let shutdown_res = processor.shutdown();
assert!(shutdown_res.is_ok());
}
async fn timeout_test_tokio(time_out: bool) {
let config = BatchConfig {
max_export_timeout: time::Duration::from_millis(if time_out { 5 } else { 60 }),
scheduled_delay: Duration::from_secs(60 * 60 * 24),
..Default::default()
};
let exporter = BlockingExporter {
delay_for: time::Duration::from_millis(if !time_out { 5 } else { 60 }),
delay_fn: tokio::time::sleep,
};
let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut));
let mut processor = BatchSpanProcessor::new(
Box::new(exporter),
spawn,
tokio_interval_stream,
tokio::time::sleep,
config,
);
tokio::time::sleep(time::Duration::from_secs(1)).await;
processor.on_end(new_test_export_span_data());
let flush_res = processor.force_flush();
if time_out {
assert!(flush_res.is_err());
} else {
assert!(flush_res.is_ok());
}
let shutdown_res = processor.shutdown();
assert!(shutdown_res.is_ok());
}
}