Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 51 additions & 30 deletions crates/edgezero-adapter-fastly/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use edgezero_core::error::EdgeError;
use edgezero_core::http::{header, HeaderMap, HeaderValue, Method, Uri};
use edgezero_core::proxy::{ProxyClient, ProxyRequest, ProxyResponse};
use fastly::{
http::body::StreamingBody, Backend, Request as FastlyRequest, Response as FastlyResponse,
error::anyhow, http::body::StreamingBody, Backend, Request as FastlyRequest,
Response as FastlyResponse,
};
use futures_util::stream::{BoxStream, StreamExt};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::io::{self, Write};
use std::time::Duration;

const BACKEND_PREFIX: &str = "edgezero-dynamic-";

Expand All @@ -22,10 +22,10 @@ pub struct FastlyProxyClient;
impl ProxyClient for FastlyProxyClient {
async fn send(&self, request: ProxyRequest) -> Result<ProxyResponse, EdgeError> {
let (method, uri, headers, body, _ext) = request.into_parts();
let backend = ensure_backend(&uri)?;
let backend_name = ensure_backend(&uri)?;
let fastly_request = build_fastly_request(method, &uri, headers)?;
let (mut streaming_body, pending_request) = fastly_request
.send_async_streaming(backend.name())
.send_async_streaming(&backend_name)
.map_err(EdgeError::internal)?;
forward_request_body(body, &mut streaming_body).await?;
streaming_body.finish().map_err(EdgeError::internal)?;
Expand Down Expand Up @@ -88,45 +88,66 @@ async fn forward_request_body(
Ok(())
}

fn ensure_backend(uri: &Uri) -> Result<Backend, EdgeError> {
fn ensure_backend(uri: &Uri) -> Result<String, EdgeError> {
let host = uri
.host()
.ok_or_else(|| EdgeError::bad_request("proxy target must include host"))?;
let port = uri.port_u16();
let target = match port {
Some(port) => format!("{}:{}", host, port),
None => host.to_string(),
};

let name = backend_name(&target, uri.scheme_str());
let scheme = uri.scheme_str().unwrap_or("https");
let is_https = scheme.eq_ignore_ascii_case("https");

let host_with_port = match port {
Some(p) => format!("{}:{}", host, p),
None => host.to_string(),
let target_port = match (uri.port_u16(), is_https) {
(Some(p), _) => p,
(None, true) => 443,
(None, false) => 80,
};

let builder = Backend::builder(&name, &host_with_port).override_host(host);
let host_with_port = format!("{}:{}", host, target_port);

// Human-readable name: backend_{scheme}_{host}_{port} with dots/colons sanitised
let name_base = format!("{}_{}_{}", scheme, host, target_port);
let backend_name = format!("{}{}", BACKEND_PREFIX, name_base.replace(['.', ':'], "_"));

let mut builder = Backend::builder(&backend_name, &host_with_port)
.override_host(host)
.connect_timeout(Duration::from_secs(1))
.first_byte_timeout(Duration::from_secs(15))
.between_bytes_timeout(Duration::from_secs(10));

if is_https {
builder = builder
.enable_ssl()
.sni_hostname(host)
.check_certificate(host);
log::debug!("enable ssl for backend: {}", backend_name);
}

match builder.finish() {
Ok(backend) => Ok(backend),
Err(_) => {
let mut builder = Backend::builder(&name, &target);
if uri.scheme_str() == Some("https") {
builder = builder.enable_ssl();
Ok(_) => {
log::debug!(
"created dynamic backend: {} -> {}",
backend_name,
host_with_port
);
Ok(backend_name)
}
Err(e) => {
let msg = e.to_string();
if msg.contains("NameInUse") || msg.contains("already in use") {
log::debug!("reusing existing dynamic backend: {}", backend_name);
Ok(backend_name)
} else {
Err(EdgeError::internal(anyhow!(
"dynamic backend creation failed ({} -> {}): {}",
backend_name,
host_with_port,
msg
)))
}
builder.finish().map_err(EdgeError::internal)
}
}
}

fn backend_name(target: &str, scheme: Option<&str>) -> String {
let mut hasher = DefaultHasher::new();
target.hash(&mut hasher);
scheme.hash(&mut hasher);
let hash = hasher.finish();
format!("{}{:016x}", BACKEND_PREFIX, hash)
}

fn convert_response(fastly_response: &mut FastlyResponse) -> Result<ProxyResponse, EdgeError> {
let status = fastly_response.get_status();
let mut proxy_response = ProxyResponse::new(status, Body::empty());
Expand Down