diff --git a/lightning/src/ln/async_payments_tests.rs b/lightning/src/ln/async_payments_tests.rs index 25522346d9c..7c1f0eddb07 100644 --- a/lightning/src/ln/async_payments_tests.rs +++ b/lightning/src/ln/async_payments_tests.rs @@ -2879,6 +2879,7 @@ fn async_payment_e2e() { .unwrap(); let (peer_node_id, static_invoice_om, static_invoice) = extract_static_invoice_om(invoice_server, &[sender_lsp, sender]); + let payment_hash = lock_in_htlc_for_static_invoice(&static_invoice_om, peer_node_id, sender, sender_lsp); @@ -2965,6 +2966,167 @@ fn async_payment_e2e() { assert_eq!(res, Some(PaidBolt12Invoice::StaticInvoice(static_invoice))); } +#[test] +fn async_payment_e2e_release_before_hold_registered() { + let chanmon_cfgs = create_chanmon_cfgs(4); + let node_cfgs = create_node_cfgs(4, &chanmon_cfgs); + + let (sender_cfg, recipient_cfg) = (often_offline_node_cfg(), often_offline_node_cfg()); + let mut sender_lsp_cfg = test_default_channel_config(); + sender_lsp_cfg.enable_htlc_hold = true; + let mut invoice_server_cfg = test_default_channel_config(); + invoice_server_cfg.accept_forwards_to_priv_channels = true; + + let node_chanmgrs = create_node_chanmgrs( + 4, + &node_cfgs, + &[Some(sender_cfg), Some(sender_lsp_cfg), Some(invoice_server_cfg), Some(recipient_cfg)], + ); + let nodes = create_network(4, &node_cfgs, &node_chanmgrs); + create_unannounced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 0); + create_announced_chan_between_nodes_with_value(&nodes, 1, 2, 1_000_000, 0); + create_unannounced_chan_between_nodes_with_value(&nodes, 2, 3, 1_000_000, 0); + unify_blockheight_across_nodes(&nodes); + let sender = &nodes[0]; + let sender_lsp = &nodes[1]; + let invoice_server = &nodes[2]; + let recipient = &nodes[3]; + + let recipient_id = vec![42; 32]; + let inv_server_paths = + invoice_server.node.blinded_paths_for_async_recipient(recipient_id.clone(), None).unwrap(); + recipient.node.set_paths_to_static_invoice_server(inv_server_paths).unwrap(); + expect_offer_paths_requests(recipient, &[invoice_server, sender_lsp]); + let invoice_flow_res = + pass_static_invoice_server_messages(invoice_server, recipient, recipient_id.clone()); + let invoice = invoice_flow_res.invoice; + let invreq_path = invoice_flow_res.invoice_request_path; + + let offer = recipient.node.get_async_receive_offer().unwrap(); + recipient.node.peer_disconnected(invoice_server.node.get_our_node_id()); + recipient.onion_messenger.peer_disconnected(invoice_server.node.get_our_node_id()); + invoice_server.node.peer_disconnected(recipient.node.get_our_node_id()); + invoice_server.onion_messenger.peer_disconnected(recipient.node.get_our_node_id()); + + let amt_msat = 5000; + let payment_id = PaymentId([1; 32]); + sender.node.pay_for_offer(&offer, Some(amt_msat), payment_id, Default::default()).unwrap(); + + let (peer_id, invreq_om) = extract_invoice_request_om(sender, &[sender_lsp, invoice_server]); + invoice_server.onion_messenger.handle_onion_message(peer_id, &invreq_om); + + let mut events = invoice_server.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + let (reply_path, invreq) = match events.pop().unwrap() { + Event::StaticInvoiceRequested { + recipient_id: ev_id, reply_path, invoice_request, .. + } => { + assert_eq!(recipient_id, ev_id); + (reply_path, invoice_request) + }, + _ => panic!(), + }; + + invoice_server + .node + .respond_to_static_invoice_request(invoice, reply_path, invreq, invreq_path) + .unwrap(); + let (peer_node_id, static_invoice_om, static_invoice) = + extract_static_invoice_om(invoice_server, &[sender_lsp, sender]); + + // Lock the HTLC in with the sender LSP, but stop before the sender's revoke_and_ack is handed + // back to the sender LSP. This reproduces the real LSPS2 timing where ReleaseHeldHtlc can + // arrive before the held HTLC is queued for decode on the sender LSP. + sender.onion_messenger.handle_onion_message(peer_node_id, &static_invoice_om); + check_added_monitors(sender, 1); + let commitment_update = get_htlc_update_msgs(&sender, &sender_lsp.node.get_our_node_id()); + let update_add = commitment_update.update_add_htlcs[0].clone(); + let payment_hash = update_add.payment_hash; + assert!(update_add.hold_htlc.is_some()); + sender_lsp.node.handle_update_add_htlc(sender.node.get_our_node_id(), &update_add); + sender_lsp.node.handle_commitment_signed_batch_test( + sender.node.get_our_node_id(), + &commitment_update.commitment_signed, + ); + check_added_monitors(sender_lsp, 1); + let (_extra_msg_option, sender_raa, sender_holding_cell_htlcs) = + do_main_commitment_signed_dance(sender_lsp, sender, false); + assert!(sender_holding_cell_htlcs.is_empty()); + + let held_htlc_om_to_inv_server = sender + .onion_messenger + .next_onion_message_for_peer(invoice_server.node.get_our_node_id()) + .unwrap(); + invoice_server + .onion_messenger + .handle_onion_message(sender_lsp.node.get_our_node_id(), &held_htlc_om_to_inv_server); + + let mut events_rc = core::cell::RefCell::new(Vec::new()); + invoice_server.onion_messenger.process_pending_events(&|e| Ok(events_rc.borrow_mut().push(e))); + let events = events_rc.into_inner(); + let held_htlc_om = events + .into_iter() + .find_map(|ev| { + if let Event::OnionMessageIntercepted { message, .. } = ev { + let peeled_onion = recipient.onion_messenger.peel_onion_message(&message).unwrap(); + if matches!( + peeled_onion, + PeeledOnion::Offers(OffersMessage::InvoiceRequest { .. }, _, _) + ) { + return None; + } + + assert!(matches!( + peeled_onion, + PeeledOnion::AsyncPayments(AsyncPaymentsMessage::HeldHtlcAvailable(_), _, _) + )); + Some(message) + } else { + None + } + }) + .unwrap(); + + let mut reconnect_args = ReconnectArgs::new(invoice_server, recipient); + reconnect_args.send_channel_ready = (true, true); + reconnect_nodes(reconnect_args); + + let events = core::cell::RefCell::new(Vec::new()); + invoice_server.onion_messenger.process_pending_events(&|e| Ok(events.borrow_mut().push(e))); + assert_eq!(events.borrow().len(), 1); + assert!(matches!(events.into_inner().pop().unwrap(), Event::OnionMessagePeerConnected { .. })); + expect_offer_paths_requests(recipient, &[invoice_server]); + + recipient + .onion_messenger + .handle_onion_message(invoice_server.node.get_our_node_id(), &held_htlc_om); + let (peer_id, release_htlc_om) = + extract_release_htlc_oms(recipient, &[sender, sender_lsp, invoice_server]).pop().unwrap(); + sender_lsp.onion_messenger.handle_onion_message(peer_id, &release_htlc_om); + + // Now let the sender LSP receive the sender's revoke_and_ack and continue processing the held + // HTLC. Old code drops the release request above; fixed code remembers it and forwards the HTLC. + sender_lsp.node.handle_revoke_and_ack(sender.node.get_our_node_id(), &sender_raa); + check_added_monitors(sender_lsp, 1); + assert!(sender_lsp.node.get_and_clear_pending_msg_events().is_empty()); + sender_lsp.node.process_pending_htlc_forwards(); + let mut events = sender_lsp.node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + let ev = remove_first_msg_event_to_node(&invoice_server.node.get_our_node_id(), &mut events); + check_added_monitors(&sender_lsp, 1); + + let path: &[&Node] = &[invoice_server, recipient]; + let args = PassAlongPathArgs::new(sender_lsp, path, amt_msat, payment_hash, ev) + .with_dummy_tlvs(&[DummyTlvs::default(); DEFAULT_PAYMENT_DUMMY_HOPS]); + let claimable_ev = do_pass_along_path(args).unwrap(); + + let route: &[&[&Node]] = &[&[sender_lsp, invoice_server, recipient]]; + let keysend_preimage = extract_payment_preimage(&claimable_ev); + let (res, _) = + claim_payment_along_route(ClaimAlongRouteArgs::new(sender, route, keysend_preimage)); + assert_eq!(res, Some(PaidBolt12Invoice::StaticInvoice(static_invoice))); +} + #[test] fn held_htlc_timeout() { // Test that if a held HTLC doesn't get released for a long time, it will eventually time out and diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 70617b20894..b9663eb16f9 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -2798,6 +2798,7 @@ pub struct ChannelManager< /// 2. HTLCs that are being held on behalf of an often-offline sender until receipt of a /// [`ReleaseHeldHtlc`] onion message from an often-offline recipient pending_intercepted_htlcs: Mutex>, + pending_released_async_htlcs: Mutex>, /// Outbound SCID Alias -> pending `update_add_htlc`s to decode. /// We use the scid alias because regular scids may change if a splice occurs. @@ -3144,6 +3145,11 @@ pub(crate) const CLTV_FAR_FAR_AWAY: u32 = 14 * 24 * 6; // a payment was being routed, so we add an extra block to be safe. pub const MIN_FINAL_CLTV_EXPIRY_DELTA: u16 = HTLC_FAIL_BACK_BUFFER as u16 + 3; +// `ReleaseHeldHtlc` can legitimately arrive before a held HTLC completes pending-update-add +// processing, but unsolicited onion messages should not be able to grow this bookkeeping without +// bound. +const MAX_PENDING_ASYNC_EARLY_RELEASES: usize = 4096; + // Check that our MIN_CLTV_EXPIRY_DELTA gives us enough time to get everything on chain and locked // in with enough time left to fail the corresponding HTLC back to our inbound edge before they // force-close on us. @@ -3603,6 +3609,7 @@ impl< decode_update_add_htlcs: Mutex::new(new_hash_map()), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }), pending_intercepted_htlcs: Mutex::new(new_hash_map()), + pending_released_async_htlcs: Mutex::new(new_hash_set()), short_to_chan_info: FairRwLock::new(new_hash_map()), our_network_pubkey, @@ -6002,6 +6009,82 @@ impl< ) } + fn release_held_htlc(&self, intercept_id: InterceptId, mut htlc: PendingAddHTLCInfo) { + let next_hop_scid = match htlc.forward_info.routing { + PendingHTLCRouting::Forward { ref mut hold_htlc, short_channel_id, .. } => { + debug_assert!(hold_htlc.is_some()); + *hold_htlc = None; + short_channel_id + }, + _ => { + debug_assert!(false, "HTLC intercepts can only be forwards"); + return; + }, + }; + + let logger = WithContext::from( + &self.logger, + Some(htlc.prev_counterparty_node_id), + Some(htlc.prev_channel_id), + Some(htlc.forward_info.payment_hash), + ); + log_trace!(logger, "Releasing held htlc with intercept_id {}", intercept_id); + + let prev_chan_public = { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state = + per_peer_state.get(&htlc.prev_counterparty_node_id).map(|mtx| mtx.lock().unwrap()); + let chan_state = peer_state + .as_ref() + .map(|state| state.channel_by_id.get(&htlc.prev_channel_id)) + .flatten(); + if let Some(chan_state) = chan_state { + chan_state.context().should_announce() + } else { + return; + } + }; + + let should_intercept = self + .do_funded_channel_callback(next_hop_scid, |chan| { + self.forward_needs_intercept_to_known_chan(prev_chan_public, chan) + }) + .unwrap_or_else(|| self.forward_needs_intercept_to_unknown_chan(next_hop_scid)); + + if should_intercept { + let intercept_id = InterceptId::from_htlc_id_and_chan_id( + htlc.prev_htlc_id, + &htlc.prev_channel_id, + &htlc.prev_counterparty_node_id, + ); + let mut pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap(); + match pending_intercepts.entry(intercept_id) { + hash_map::Entry::Vacant(entry) => { + if let Ok(intercept_ev) = create_htlc_intercepted_event(intercept_id, &htlc) { + self.pending_events.lock().unwrap().push_back((intercept_ev, None)); + entry.insert(htlc); + } else { + debug_assert!(false); + return; + } + }, + hash_map::Entry::Occupied(_) => { + log_error!( + logger, + "Failed to forward incoming HTLC: detected duplicate intercepted payment", + ); + debug_assert!( + false, + "Should never have two HTLCs with the same channel id and htlc id", + ); + return; + }, + } + } else { + self.forward_htlcs([htlc]); + } + } + /// Signals that no further attempts for the given payment should occur. Useful if you have a /// pending outbound payment with retries remaining, but wish to stop retrying the payment before /// retries are exhausted. @@ -7492,23 +7575,32 @@ impl< Some(update_add_htlc.payment_hash), ); if pending_add.forward_info.routing.should_hold_htlc() { - let mut held_htlcs = self.pending_intercepted_htlcs.lock().unwrap(); let intercept_id = intercept_id(); - match held_htlcs.entry(intercept_id) { - hash_map::Entry::Vacant(entry) => { - log_debug!( - logger, - "Intercepted held HTLC with id {intercept_id}, holding until the recipient is online" - ); - entry.insert(pending_add); - }, - hash_map::Entry::Occupied(_) => { - debug_assert!(false, "Should never have two HTLCs with the same channel id and htlc id"); - log_error!(logger, "Duplicate intercept id for HTLC"); - fail_htlc_continue_to_next!( - LocalHTLCFailureReason::TemporaryNodeFailure - ); - }, + if self + .pending_released_async_htlcs + .lock() + .unwrap() + .remove(&intercept_id) + { + self.release_held_htlc(intercept_id, pending_add); + } else { + let mut held_htlcs = self.pending_intercepted_htlcs.lock().unwrap(); + match held_htlcs.entry(intercept_id) { + hash_map::Entry::Vacant(entry) => { + log_debug!( + logger, + "Intercepted held HTLC with id {intercept_id}, holding until the recipient is online" + ); + entry.insert(pending_add); + }, + hash_map::Entry::Occupied(_) => { + debug_assert!(false, "Should never have two HTLCs with the same channel id and htlc id"); + log_error!(logger, "Duplicate intercept id for HTLC"); + fail_htlc_continue_to_next!( + LocalHTLCFailureReason::TemporaryNodeFailure + ); + }, + } } } else if intercept_forward { let intercept_id = intercept_id(); @@ -17197,104 +17289,35 @@ impl< } core::mem::drop(decode_update_add_htlcs); - let mut htlc = { + let htlc = { let mut pending_intercept_htlcs = self.pending_intercepted_htlcs.lock().unwrap(); match pending_intercept_htlcs.remove(&intercept_id) { Some(htlc) => htlc, None => { - log_trace!( - self.logger, - "Failed to release HTLC with intercept_id {}: HTLC not found", - intercept_id - ); - return; - }, - } - }; - let next_hop_scid = match htlc.forward_info.routing { - PendingHTLCRouting::Forward { ref mut hold_htlc, short_channel_id, .. } => { - debug_assert!(hold_htlc.is_some()); - *hold_htlc = None; - short_channel_id - }, - _ => { - debug_assert!(false, "HTLC intercepts can only be forwards"); - // Let the HTLC be auto-failed before it expires. - return; - }, - }; - - let logger = WithContext::from( - &self.logger, - Some(htlc.prev_counterparty_node_id), - Some(htlc.prev_channel_id), - Some(htlc.forward_info.payment_hash), - ); - log_trace!(logger, "Releasing held htlc with intercept_id {}", intercept_id); - - let prev_chan_public = { - let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state = per_peer_state - .get(&htlc.prev_counterparty_node_id) - .map(|mtx| mtx.lock().unwrap()); - let chan_state = peer_state - .as_ref() - .map(|state| state.channel_by_id.get(&htlc.prev_channel_id)) - .flatten(); - if let Some(chan_state) = chan_state { - chan_state.context().should_announce() - } else { - // If the inbound channel has closed since the HTLC was held, we really - // shouldn't forward it - forwarding it now would result in, at best, - // having to claim the HTLC on chain. Instead, drop the HTLC and let the - // counterparty claim their money on chain. - return; - } - }; - - let should_intercept = self - .do_funded_channel_callback(next_hop_scid, |chan| { - self.forward_needs_intercept_to_known_chan(prev_chan_public, chan) - }) - .unwrap_or_else(|| self.forward_needs_intercept_to_unknown_chan(next_hop_scid)); - - if should_intercept { - let intercept_id = InterceptId::from_htlc_id_and_chan_id( - htlc.prev_htlc_id, - &htlc.prev_channel_id, - &htlc.prev_counterparty_node_id, - ); - let mut pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap(); - match pending_intercepts.entry(intercept_id) { - hash_map::Entry::Vacant(entry) => { - if let Ok(intercept_ev) = - create_htlc_intercepted_event(intercept_id, &htlc) + let mut pending_releases = + self.pending_released_async_htlcs.lock().unwrap(); + if pending_releases.len() < MAX_PENDING_ASYNC_EARLY_RELEASES + || pending_releases.contains(&intercept_id) { - self.pending_events.lock().unwrap().push_back((intercept_ev, None)); - entry.insert(htlc); + pending_releases.insert(intercept_id); + log_trace!( + self.logger, + "Queued release request for HTLC with intercept_id {} until it is fully held", + intercept_id + ); } else { - debug_assert!(false); - // Let the HTLC be auto-failed before it expires. - return; + log_trace!( + self.logger, + "Dropping early release request for intercept_id {} because the pending queue is full", + intercept_id + ); } - }, - hash_map::Entry::Occupied(_) => { - log_error!( - logger, - "Failed to forward incoming HTLC: detected duplicate intercepted payment", - ); - debug_assert!( - false, - "Should never have two HTLCs with the same channel id and htlc id", - ); - // Let the HTLC be auto-failed before it expires. return; }, } - } else { - self.forward_htlcs([htlc]); - } + }; + self.release_held_htlc(intercept_id, htlc); }, _ => return, } @@ -20091,6 +20114,7 @@ impl< inbound_payment_key: expanded_inbound_key, pending_outbound_payments: pending_outbounds, pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs), + pending_released_async_htlcs: Mutex::new(new_hash_set()), forward_htlcs: Mutex::new(forward_htlcs), decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),