Mirror of https://gitlab.com/fabinfra/fabaccess/bffh.git, synced 2025-06-11 19:03:21 +02:00
Moving towards implementing the 0.3.2 featureset
@@ -45,7 +45,6 @@ slab = "0.4"
arrayvec = { version = "0.7.0" }
futures-timer = "3.0.2"
once_cell = "1.4.0"
lever = "0.1"
tracing = "0.1.19"
crossbeam-queue = "0.3.0"
@@ -50,7 +50,6 @@ use core::fmt;
use crossbeam_queue::ArrayQueue;
use fmt::{Debug, Formatter};
use lazy_static::lazy_static;
use lever::prelude::TTas;
use placement::CoreId;
use std::collections::VecDeque;
use std::time::Duration;
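The new lever::prelude::TTas import is a test-and-test-and-set lock that guards the frequency window used further down. As orientation, here is a minimal, self-contained sketch of the three calls the diff relies on (new, lock and try_lock); the window capacity and the pushed value are placeholders, and the exact guard types returned by lever 0.1 are assumptions rather than something this commit shows:

use std::collections::VecDeque;
use lever::prelude::TTas;

fn main() {
    // A TTas lock wrapping the same kind of value ThreadManager stores.
    let frequencies: TTas<VecDeque<u64>> = TTas::new(VecDeque::with_capacity(8));

    {
        // lock() spins until the lock is held and hands back a guard that
        // dereferences to the inner VecDeque, as scale_pool does below.
        let mut freq_queue = frequencies.lock();
        freq_queue.push_back(42);
    }

    // try_lock() is non-blocking and returns an Option, which is why the
    // Debug impl below can print self.frequencies.try_lock() directly.
    if let Some(freq_queue) = frequencies.try_lock() {
        println!("window: {:?}", *freq_queue);
    }
}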
@@ -168,7 +167,6 @@ pub struct ThreadManager<Runner> {

    runner: Runner,
    last_frequency: AtomicU64,
    frequencies: TTas<VecDeque<u64>>,
}

impl<Runner: Debug> Debug for ThreadManager<Runner> {
@@ -192,7 +190,6 @@ impl<Runner: Debug> Debug for ThreadManager<Runner> {
            ))
            .field("runner", &self.runner)
            .field("last_frequency", &self.last_frequency)
            .field("frequencies", &self.frequencies.try_lock())
            .finish()
    }
}
@@ -213,9 +210,6 @@ impl<Runner: DynamicRunner + Sync + Send> ThreadManager<Runner> {

            runner,
            last_frequency: AtomicU64::new(0),
            frequencies: TTas::new(VecDeque::with_capacity(
                FREQUENCY_QUEUE_SIZE.saturating_add(1),
            )),
        }
    }
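The constructor only seeds last_frequency with zero; the code that increments it is not part of this diff. Under the assumption that the scheduler bumps the atomic once per processed task and the pool manager drains it each poll interval, the flow reduces to a simple produce-and-drain counter. The record_task helper below is hypothetical; only the swap(0, Ordering::SeqCst) consumer mirrors scale_pool further down:

use std::sync::atomic::{AtomicU64, Ordering};

struct FrequencyCounter {
    last_frequency: AtomicU64,
}

impl FrequencyCounter {
    // Hypothetical producer side: bump the counter once per processed task.
    fn record_task(&self) {
        self.last_frequency.fetch_add(1, Ordering::AcqRel);
    }

    // Consumer side, mirroring scale_pool: take everything accumulated since
    // the last poll and reset the counter to zero in one atomic step.
    fn drain(&self) -> u64 {
        self.last_frequency.swap(0, Ordering::SeqCst)
    }
}

fn main() {
    let c = FrequencyCounter { last_frequency: AtomicU64::new(0) };
    c.record_task();
    c.record_task();
    assert_eq!(c.drain(), 2);
    assert_eq!(c.drain(), 0);
}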
@@ -398,57 +392,6 @@ impl<Runner: DynamicRunner + Sync + Send> ThreadManager<Runner> {
    ///
    /// It uses a frequency-based calculation to define work, utilizing the average processing rate.
    fn scale_pool(&'static self) {
        // Fetch the current frequency; the ordering of operations matters in this approach.
        let current_frequency = self.last_frequency.swap(0, Ordering::SeqCst);
        let mut freq_queue = self.frequencies.lock();

        // Make it safe to start calculations by adding an initial frequency sample.
        if freq_queue.len() == 0 {
            freq_queue.push_back(0);
        }

        // Calculate the message rate for the given time window.
        let frequency = (current_frequency as f64 / SCALER_POLL_INTERVAL as f64) as u64;

        // Calculates the previous time window's EMA value (before the latest sample).
        let prev_ema_frequency = Self::calculate_ema(&freq_queue);

        // Add the observed frequency to the frequency histogram.
        freq_queue.push_back(frequency);
        if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
            freq_queue.pop_front();
        }

        // Calculates the current time window's EMA value (including the latest sample).
        let curr_ema_frequency = Self::calculate_ema(&freq_queue);
        trace!("Current EMA freq: {}", curr_ema_frequency);

        // Adapts the thread count of the pool.
        //
        // The pool manager keeps a sliding window of observed frequencies and
        // computes an EMA for the previous window and for the current window.
        // Comparing them determines the scaling amount based on the trend:
        // if the current EMA value is bigger, we scale up.
        if curr_ema_frequency > prev_ema_frequency {
            // The "scale by" amount can be read as "how much load is coming";
            // the "scale" amount is "how many threads we should unpark".
            let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
            let scale = num_cpus::get().min(
                ((DEFAULT_LOW_WATERMARK as f64 * scale_by) + DEFAULT_LOW_WATERMARK as f64) as usize,
            );
            trace!("unparking {} threads", scale);

            // It is time to scale the pool!
            self.provision_threads(scale, &self.fences);
        } else if (curr_ema_frequency - prev_ema_frequency).abs() < f64::EPSILON
            && current_frequency != 0
        {
            // Throughput is low, yet work keeps arriving: the scheduler is congested
            // by long-hauling tasks. To unblock the flow we add some threads to the
            // pool, but not so many that they stagger the program's operation.
            trace!("unparking {} threads", DEFAULT_LOW_WATERMARK);
            self.provision_threads(DEFAULT_LOW_WATERMARK as usize, &self.fences);
        }
    }
}
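Self::calculate_ema is referenced but not shown in this hunk. What follows is only a sketch of an exponential moving average over the VecDeque<u64> window; the smoothing factor ALPHA is an arbitrary illustration value, not the constant the pool actually uses:

use std::collections::VecDeque;

// Assumed smoothing factor, chosen only for illustration.
const ALPHA: f64 = 0.5;

// Exponential moving average over the sliding window: every newer sample
// pulls the running average towards itself by a fraction ALPHA, so recent
// load weighs more than old load.
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    let mut ema = 0.0;
    for &freq in freq_queue {
        ema = ALPHA * freq as f64 + (1.0 - ALPHA) * ema;
    }
    ema
}

fn main() {
    let window: VecDeque<u64> = vec![0, 2, 4, 8].into_iter().collect();
    println!("EMA of {:?} = {}", window, calculate_ema(&window));
}

Plugging example numbers into the scaling branch above: if the previous EMA is 2.0 and the current one is 5.0, scale_by is 3.0, so the pool unparks min(num_cpus, DEFAULT_LOW_WATERMARK * 3.0 + DEFAULT_LOW_WATERMARK) threads, i.e. four times the low watermark but never more than the number of CPU cores.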