From 5a3bef1a1f510a7c6ff924cb43f7e1caf33546a5 Mon Sep 17 00:00:00 2001 From: Kinan <104761667+kibibytium@users.noreply.github.com> Date: Wed, 11 Dec 2024 15:33:47 +0100 Subject: [PATCH] wip --- packages/node-mimimi/src/importer.rs | 1596 ++++++++--------- packages/node-mimimi/src/reduce_to_chunks.rs | 257 +-- .../resources/testmail/attachment_sample.eml | 14 +- 3 files changed, 940 insertions(+), 927 deletions(-) diff --git a/packages/node-mimimi/src/importer.rs b/packages/node-mimimi/src/importer.rs index 56ced03895c4..1b41d595912f 100644 --- a/packages/node-mimimi/src/importer.rs +++ b/packages/node-mimimi/src/importer.rs @@ -1,7 +1,7 @@ use crate::importer::importable_mail::{ - ImportableMailAttachment, ImportableMailAttachmentMetaData, KeyedImportableMailAttachment, + ImportableMailAttachment, ImportableMailAttachmentMetaData, KeyedImportableMailAttachment, }; -use crate::reduce_to_chunks::UnitImport; +use crate::reduce_to_chunks::{AttachmentUploadData, KeyedImportMailData}; use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; use file_reader::{FileImport, FileIterationError}; @@ -19,8 +19,8 @@ use tutasdk::crypto::randomizer_facade::RandomizerFacade; use tutasdk::entities::generated::sys::{BlobReferenceTokenWrapper, StringWrapper}; use tutasdk::entities::generated::tutanota::{ - ImportAttachment, ImportMailData, ImportMailGetIn, ImportMailPostIn, ImportMailPostOut, - ImportMailState, + ImportAttachment, ImportMailGetIn, ImportMailPostIn, ImportMailPostOut, + ImportMailState, }; use tutasdk::entities::json_size_estimator::estimate_json_size; use tutasdk::rest_error::PreconditionFailedReason::ImportFailure; @@ -39,47 +39,47 @@ pub const MAX_REQUEST_SIZE: usize = 1024 * 1024 * 8; #[derive(Debug)] pub enum ImportError { - SdkError { - // action we were trying to perform on sdk - action: &'static str, - // actual error sdk returned - error: ApiCallError, - }, - /// login feature is not available for this user - NoImportFeature, - /// Blob responded with empty server url list - EmptyBlobServerList, - /// Server did not return any element id for the newly posted import state - NoElementIdForState, - /// Can not create Native Rest client - NoNativeRestClient(std::io::Error), - /// Can not create valid credential from given raw input - CredentialValidationError(()), - /// Error when trying to resume the session passed from client - LoginError(tutasdk::login::LoginError), - /// Error while iterating through import source - IterationError(IterationError), - /// Some mail was too big - TooBigChunk, - /// number of mails we expected to be imported vs number of mails server had written is not same - MismatchedImportCount { expected: usize, imported: usize }, + SdkError { + // action we were trying to perform on sdk + action: &'static str, + // actual error sdk returned + error: ApiCallError, + }, + /// login feature is not available for this user + NoImportFeature, + /// Blob responded with empty server url list + EmptyBlobServerList, + /// Server did not return any element id for the newly posted import state + NoElementIdForState, + /// Can not create Native Rest client + NoNativeRestClient(std::io::Error), + /// Can not create valid credential from given raw input + CredentialValidationError(()), + /// Error when trying to resume the session passed from client + LoginError(tutasdk::login::LoginError), + /// Error while iterating through import source + IterationError(IterationError), + /// Some mail was too big + TooBigChunk, + /// number of mails we expected to be imported vs number 
of mails the server actually wrote is not the same
+	MismatchedImportCount { expected: usize, imported: usize },
 }
 
 #[derive(Debug)]
 pub enum IterationError {
-    Imap(ImapIterationError),
-    File(FileIterationError),
+	Imap(ImapIterationError),
+	File(FileIterationError),
 }
 
 #[derive(Clone, PartialEq)]
 pub enum ImportParams {
-    Imap {
-        imap_import_config: ImapImportConfig,
-    },
-    LocalFile {
-        file_path: String,
-        is_mbox: bool,
-    },
+	Imap {
+		imap_import_config: ImapImportConfig,
+	},
+	LocalFile {
+		file_path: String,
+		is_mbox: bool,
+	},
 }
 
 /// current state of the imap_reader import for this tuta account
@@ -91,814 +91,808 @@ pub enum ImportParams {
 #[cfg_attr(test, derive(Debug))]
 #[repr(u8)]
 pub enum ImportStatus {
-    #[default]
-    Started = 0,
-    Paused = 1,
-    Running = 2,
-    Canceled = 3,
-    Finished = 4,
+	#[default]
+	Started = 0,
+	Paused = 1,
+	Running = 2,
+	Canceled = 3,
+	Finished = 4,
 }
 
 /// when the state callback function is called after every chunk of the import,
 /// the javascript handle is expected to respond with this struct
 #[cfg_attr(feature = "javascript", napi_derive::napi(object))]
 pub struct StateCallbackResponse {
-    pub should_stop: bool,
+	pub should_stop: bool,
 }
 
 pub struct ImportEssential {
-    logged_in_sdk: Arc<LoggedInSdk>,
-    target_owner_group: GeneratedId,
-    mail_group_key: VersionedAesKey,
-    target_mailset: IdTupleGenerated,
-    randomizer_facade: RandomizerFacade,
+	logged_in_sdk: Arc<LoggedInSdk>,
+	target_owner_group: GeneratedId,
+	mail_group_key: VersionedAesKey,
+	target_mailset: IdTupleGenerated,
+	randomizer_facade: RandomizerFacade,
 }
 
 pub struct ImportState {
-    last_server_update: SystemTime,
-    pub remote_state: ImportMailState,
-    pub imported_mail_ids: Vec<IdTupleGenerated>,
+	last_server_update: SystemTime,
+	pub remote_state: ImportMailState,
+	pub imported_mail_ids: Vec<IdTupleGenerated>,
 }
 
 pub struct Importer {
-    essentials: ImportEssential,
-    state: ImportState,
-    source: ImportSource,
+	essentials: ImportEssential,
+	state: ImportState,
+	source: ImportSource,
 }
 
 pub enum ImportSource {
-    RemoteImap { imap_import_client: ImapImport },
-    LocalFile { fs_email_client: FileImport },
+	RemoteImap { imap_import_client: ImapImport },
+	LocalFile { fs_email_client: FileImport },
 }
 
 impl Iterator for ImportSource {
-    type Item = ImportableMail;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let next_importable_mail = match self {
-            // the other way (converting fs_source to an async_iterator) would be nicer, but that's a nightly feature
-            ImportSource::RemoteImap { imap_import_client } => imap_import_client
-                .fetch_next_mail()
-                .map_err(IterationError::Imap),
-            ImportSource::LocalFile { fs_email_client } => fs_email_client
-                .get_next_importable_mail()
-                .map_err(IterationError::File),
-        };
-
-        match next_importable_mail {
-            Ok(next_importable_mail) => Some(next_importable_mail),
-
-            // source says, all the iteration have ended,
-            Err(IterationError::File(FileIterationError::SourceEnd))
-            | Err(IterationError::Imap(ImapIterationError::SourceEnd)) => None,
-
-            Err(e) => {
-                // once we handle this case we will need another iterator that filters (and logs) the
-                // errors so we don't have to handle the error case during the chunking + upload
-                panic!("Cannot get next email from source: {e:?}")
-            },
-        }
-    }
+	type Item = ImportableMail;
+
+	fn next(&mut self) -> Option<Self::Item> {
+		let next_importable_mail = match self {
+			// the other way (converting fs_source to an async_iterator) would be nicer, but that's a nightly feature
+			ImportSource::RemoteImap { imap_import_client } => imap_import_client
+				.fetch_next_mail()
+				.map_err(IterationError::Imap),
+
ImportSource::LocalFile { fs_email_client } => fs_email_client + .get_next_importable_mail() + .map_err(IterationError::File), + }; + + match next_importable_mail { + Ok(next_importable_mail) => Some(next_importable_mail), + + // source says, all the iteration have ended, + Err(IterationError::File(FileIterationError::SourceEnd)) + | Err(IterationError::Imap(ImapIterationError::SourceEnd)) => None, + + Err(e) => { + // once we handle this case we will need another iterator that filters (and logs) the + // errors so we don't have to handle the error case during the chunking + upload + panic!("Cannot get next email from source: {e:?}") + } + } + } } -pub type ImportableMailsButcher = - super::reduce_to_chunks::Butcher<{ MAX_REQUEST_SIZE }, UnitImport, Source>; +pub type ImportableMailsButcher = super::reduce_to_chunks::Butcher<{ MAX_REQUEST_SIZE }, AttachmentUploadData, Source>; impl Importer { - fn make_random_aggregate_id(randomizer_facade: &RandomizerFacade) -> CustomId { - let new_id_bytes = randomizer_facade.generate_random_array::<4>(); - let new_id_string = BASE64_URL_SAFE_NO_PAD.encode(new_id_bytes); - CustomId(new_id_string) - } - - pub fn get_remote_state(&self) -> &ImportMailState { - &self.state.remote_state - } - - async fn initialize_remote_state(&mut self) -> Result<(), ImportError> { - let mailbox = self - .essentials - .logged_in_sdk - .mail_facade() - .load_user_mailbox() - .await - .map_err(|e| ImportError::sdk("loading mailbox", e))?; - let session_key = - GenericAesKey::Aes256(aes::Aes256Key::generate(&self.essentials.randomizer_facade)); - let owner_enc_session_key = self.essentials.mail_group_key.encrypt_key( - &session_key, - Iv::generate(&self.essentials.randomizer_facade), - ); - - let mut import_state_id = - IdTupleGenerated::new(mailbox.mailImportStates.clone(), GeneratedId::min_id()); - let mut import_state_for_upload = ImportMailState { - _format: 0, - _id: Some(import_state_id.clone()), - _permissions: mailbox._permissions, - _ownerGroup: Some(mailbox._ownerGroup.unwrap()), - _ownerEncSessionKey: Some(owner_enc_session_key.object), - _ownerKeyVersion: Some(owner_enc_session_key.version), - status: ImportStatus::Running as i64, - successfulMails: 0, - failedMails: 0, - targetFolder: self.essentials.target_mailset.clone(), - _errors: Some(Default::default()), - _finalIvs: Default::default(), - }; - - let create_data = self - .essentials - .logged_in_sdk - .mail_facade() - .get_crypto_entity_client() - .create_instance(import_state_for_upload.clone(), Some(&session_key)) - .await - .map_err(|e| ImportError::sdk("creating remote import state", e))?; - - import_state_id.element_id = create_data - .generatedId - .ok_or(ImportError::NoElementIdForState)?; - - import_state_for_upload._permissions = create_data.permissionListId; - import_state_for_upload._id = Some(import_state_id); - self.state.remote_state = import_state_for_upload; - self.state.last_server_update = SystemTime::now(); - - Ok(()) - } + fn make_random_aggregate_id(randomizer_facade: &RandomizerFacade) -> CustomId { + let new_id_bytes = randomizer_facade.generate_random_array::<4>(); + let new_id_string = BASE64_URL_SAFE_NO_PAD.encode(new_id_bytes); + CustomId(new_id_string) + } + + pub fn get_remote_state(&self) -> &ImportMailState { + &self.state.remote_state + } + + async fn initialize_remote_state(&mut self) -> Result<(), ImportError> { + let mailbox = self + .essentials + .logged_in_sdk + .mail_facade() + .load_user_mailbox() + .await + .map_err(|e| ImportError::sdk("loading mailbox", e))?; + let 
session_key = + GenericAesKey::Aes256(aes::Aes256Key::generate(&self.essentials.randomizer_facade)); + let owner_enc_session_key = self.essentials.mail_group_key.encrypt_key( + &session_key, + Iv::generate(&self.essentials.randomizer_facade), + ); + + let mut import_state_id = + IdTupleGenerated::new(mailbox.mailImportStates.clone(), GeneratedId::min_id()); + let mut import_state_for_upload = ImportMailState { + _format: 0, + _id: Some(import_state_id.clone()), + _permissions: mailbox._permissions, + _ownerGroup: Some(mailbox._ownerGroup.unwrap()), + _ownerEncSessionKey: Some(owner_enc_session_key.object), + _ownerKeyVersion: Some(owner_enc_session_key.version), + status: ImportStatus::Running as i64, + successfulMails: 0, + failedMails: 0, + targetFolder: self.essentials.target_mailset.clone(), + _errors: Some(Default::default()), + _finalIvs: Default::default(), + }; + + let create_data = self + .essentials + .logged_in_sdk + .mail_facade() + .get_crypto_entity_client() + .create_instance(import_state_for_upload.clone(), Some(&session_key)) + .await + .map_err(|e| ImportError::sdk("creating remote import state", e))?; + + import_state_id.element_id = create_data + .generatedId + .ok_or(ImportError::NoElementIdForState)?; + + import_state_for_upload._permissions = create_data.permissionListId; + import_state_for_upload._id = Some(import_state_id); + self.state.remote_state = import_state_for_upload; + self.state.last_server_update = SystemTime::now(); + + Ok(()) + } } impl ImportEssential { - const IMPORT_DISABLED_ERR: ApiCallError = ApiCallError::ServerResponseError { - source: HttpError::PreconditionFailedError(Some(ImportFailure( - ImportFailureReason::ImportDisabled, - ))), - }; - - async fn make_serialized_chunk( - &self, - importable_chunk: Vec, - ) -> Result<(ImportMailPostIn, GenericAesKey), ImportError> { - let mut serialized_imports = Vec::with_capacity(importable_chunk.len()); - - let mut upload_data_per_mail: Vec<(Vec, Vec)> = - Vec::with_capacity(importable_chunk.len()); - let attachments_count_per_mail: Vec = importable_chunk - .iter() - .map(|mail| mail.attachments.len()) - .collect(); - - // aggregate attachment data from multiple mails to upload in fewer request to the BlobService - let (attachments_per_mail, other_data): ( - Vec>, - Vec<(ImportMailData, GenericAesKey)>, - ) = importable_chunk - .into_iter() - .map(|mail| (mail.attachments, (mail.import_mail_data, mail.session_key))) - .collect(); - - for attachments_one_mail in attachments_per_mail { - if attachments_one_mail.len() > 0 { - let keyed_attachments: Vec = attachments_one_mail - .into_iter() - .map(|attachment| attachment.make_keyed_importable_mail_attachment(self)) - .collect(); - - let (attachments_file_data, attachments_meta_data): ( - Vec, - Vec, - ) = keyed_attachments - .into_iter() - .map(|keyed_attachment| { - let file_datum = FileData { - session_key: keyed_attachment.attachment_session_key, - data: keyed_attachment.content, - }; - (file_datum, keyed_attachment.meta_data) - }) - .unzip(); - - upload_data_per_mail.push((attachments_file_data, attachments_meta_data)) - } else { - upload_data_per_mail.push((vec![], vec![])); - } - } - - let (attachments_file_data_per_mail, attachments_meta_data_per_mail): ( - Vec>, - Vec>, - ) = upload_data_per_mail.into_iter().unzip(); - - let attachments_file_data_flattened_refs: Vec<&FileData> = - attachments_file_data_per_mail.iter().flatten().collect(); - - if attachments_file_data_flattened_refs.len() > 0 { - // upload all attachments in this chunk in one call to 
the blob_facade - // the blob_facade chunks them into efficient request to the BlobService - let mut reference_tokens_per_attachment_flattened = self - .logged_in_sdk - .blob_facade() - .encrypt_and_upload_multiple( - ArchiveDataType::Attachments, - &self.target_owner_group, - attachments_file_data_flattened_refs, - ) - .await - .map_err(|e| ImportError::sdk("uploading multiple attachments", e))?; - - // reference mails and received reference tokens again, by using the attachments count per mail - let mut all_reference_tokens_per_mail: Vec>> = - Vec::new(); - for attachments_count in attachments_count_per_mail { - if attachments_count == 0 { - println!("attachments 0 for mail"); - all_reference_tokens_per_mail.push(vec![]); - } else { - println!("attachments {attachments_count} for mail"); - let reference_tokens_per_mail = reference_tokens_per_attachment_flattened - .drain(0..attachments_count) - .collect(); - all_reference_tokens_per_mail.push(reference_tokens_per_mail); - } - } - - let import_attachments_per_mail: Vec> = - attachments_file_data_per_mail - .into_iter() - .zip( - attachments_meta_data_per_mail - .into_iter() - .zip(all_reference_tokens_per_mail), - ) - .map(|(file_data, (meta_data, reference_tokens_vectors))| { - let import_attachments_for_one_mail = file_data - .into_iter() - .zip(meta_data.into_iter().zip(reference_tokens_vectors)) - .map(|(file_datum, (meta_datum, reference_tokens))| { - if reference_tokens.len() == 0 { - let len = file_datum.data.len(); - println!("reference tokens empty!!!! {len}"); - } - meta_datum.make_import_attachment_data( - self, - &file_datum.session_key, - reference_tokens, - ) - }) - .collect(); - import_attachments_for_one_mail - }) - .collect(); - - let length = import_attachments_per_mail.len(); - println!("import_attachments_per_mail {length}"); - - // serialize multiple import_mail_data into on request to the ImportMailService - for ((mut import_mail_data, session_key), import_attachments) in - other_data.into_iter().zip(import_attachments_per_mail) - { - import_mail_data.importedAttachments = import_attachments; - - let serialized_import = self - .logged_in_sdk - .serialize_instance_to_json(import_mail_data, &session_key) - .map_err(|e| ImportError::sdk("serializing import_mail_data to json", e))?; - let wrapped_import_data = StringWrapper { - _id: Some(Importer::make_random_aggregate_id(&self.randomizer_facade)), - value: serialized_import, - }; - serialized_imports.push(wrapped_import_data); - } - } else { - // case: no mail in chunk has attachment - - // serialize multiple import_mail_data into on request to the ImportMailService - for (mut import_mail_data, session_key) in other_data { - import_mail_data.importedAttachments = vec![]; - - let serialized_import = self - .logged_in_sdk - .serialize_instance_to_json(import_mail_data, &session_key) - .map_err(|e| ImportError::sdk("serializing instance to json", e))?; - let wrapped_import_data = StringWrapper { - _id: Some(Importer::make_random_aggregate_id(&self.randomizer_facade)), - value: serialized_import, - }; - serialized_imports.push(wrapped_import_data); - } - } - - let session_key = GenericAesKey::Aes256(aes::Aes256Key::generate(&self.randomizer_facade)); - let owner_enc_sk_for_import_post = self - .mail_group_key - .encrypt_key(&session_key, Iv::generate(&self.randomizer_facade)); - - let post_in = ImportMailPostIn { - ownerGroup: self.target_owner_group.clone(), - encImports: serialized_imports, - targetMailFolder: self.target_mailset.clone(), - ownerKeyVersion: 
owner_enc_sk_for_import_post.version, - ownerEncSessionKey: owner_enc_sk_for_import_post.object, - newImportedMailSetName: "@internal-imported-mailset".to_string(), - _finalIvs: Default::default(), - _format: 0, - _errors: None, - }; - - Ok((post_in, session_key)) - } - - // distribute load across the cluster. should be switched to read token (once it is implemented on the - // BlobFacade) and use ArchiveDataType::MailDetails to target one of the nodes that actually stores the - // data - async fn get_server_url_to_upload(&self) -> Result { - self.logged_in_sdk - .request_blob_facade_write_token(ArchiveDataType::Attachments) - .await - .map_err(|e| ImportError::sdk("request blob write token", e))? - .servers - .last() - .map(|s| s.url.to_string()) - .ok_or(ImportError::EmptyBlobServerList) - } - - async fn make_import_service_call( - &self, - import_mail_data: (ImportMailPostIn, GenericAesKey), - ) -> Result { - self.verify_import_feature_enabled().await?; - - let server_to_upload = self.get_server_url_to_upload().await?; - let (import_mail_post_in, session_key_for_import_post) = import_mail_data; - - self.logged_in_sdk - .get_service_executor() - .post::( - import_mail_post_in, - ExtraServiceParams { - base_url: Some(server_to_upload), - session_key: Some(session_key_for_import_post), - ..Default::default() - }, - ) - .await - .map_err(|e| { - if e == Self::IMPORT_DISABLED_ERR { - ImportError::NoImportFeature - } else { - ImportError::sdk("calling ImportMailService", e) - } - }) - } - - pub async fn verify_import_feature_enabled(&self) -> Result<(), ImportError> { - self.logged_in_sdk - .get_service_executor() - .get::( - ImportMailGetIn { _format: 0 }, - ExtraServiceParams::default(), - ) - .await - .map_err(|e| { - if e == Self::IMPORT_DISABLED_ERR { - ImportError::NoImportFeature - } else { - ImportError::sdk("calling ImportMailService", e) - } - }) - } + const IMPORT_DISABLED_ERR: ApiCallError = ApiCallError::ServerResponseError { + source: HttpError::PreconditionFailedError(Some(ImportFailure( + ImportFailureReason::ImportDisabled, + ))), + }; + + async fn upload_attachments_for_chunk( + &self, + importable_chunk: Vec, + ) -> Result<(Vec), ImportError> { + let mut upload_data_per_mail: Vec<(Vec, Vec)> = + Vec::with_capacity(importable_chunk.len()); + let attachments_count_per_mail: Vec = importable_chunk + .iter() + .map(|mail| mail.attachments.len()) + .collect(); + + // aggregate attachment data from multiple mails to upload in fewer request to the BlobService + let (attachments_per_mail, keyed_import_mail_data): ( + Vec>, + Vec, + ) = importable_chunk + .into_iter() + .map(|mail| (mail.attachments, mail.keyed_import_mail_data)) + .unzip(); + + for attachments_next_mail in attachments_per_mail { + if !attachments_next_mail.is_empty() { + let keyed_attachments: Vec = attachments_next_mail + .into_iter() + .map(|attachment| attachment.make_keyed_importable_mail_attachment(self)) + .collect(); + + let (attachments_file_data, attachments_meta_data): ( + Vec, + Vec, + ) = keyed_attachments + .into_iter() + .map(|keyed_attachment| { + let file_datum = FileData { + session_key: keyed_attachment.attachment_session_key, + data: keyed_attachment.content, + }; + (file_datum, keyed_attachment.meta_data) + }) + .unzip(); + upload_data_per_mail.push((attachments_file_data, attachments_meta_data)) + } else { + // attachments_next_mail is empty we push empty vectors in order to maintain + // correct order of blob reference tokens across different attachments and mails + // these empty vectors 
indicate
+				// * an empty list of attachments
+				// * and an empty list of corresponding attachment metadata for this mail
+				upload_data_per_mail.push((vec![], vec![]));
+			}
+		}
+
+		let (attachments_file_data_per_mail, attachments_meta_data_per_mail): (
+			Vec<Vec<FileData>>,
+			Vec<Vec<ImportableMailAttachmentMetaData>>,
+		) = upload_data_per_mail
+			.into_iter()
+			.unzip();
+
+		let attachments_file_data_flattened: Vec<&FileData> = attachments_file_data_per_mail
+			.iter()
+			.flatten()
+			.collect();
+
+		// upload all attachments in this chunk in one call to the blob_facade
+		// the blob_facade chunks them into efficient requests to the BlobService
+		let mut reference_tokens_per_attachment_flattened = self
+			.logged_in_sdk
+			.blob_facade()
+			.encrypt_and_upload_multiple(
+				ArchiveDataType::Attachments,
+				&self.target_owner_group,
+				attachments_file_data_flattened,
+			)
+			.await
+			.map_err(|e| ImportError::sdk("failed to upload multiple attachments", e))?;
+
+		// match the received reference tokens back up with their mails, using the attachments count per mail
+		let mut all_reference_tokens_per_mail: Vec<Vec<BlobReferenceTokenWrapper>> = vec![];
+		for attachments_count in attachments_count_per_mail {
+			if attachments_count == 0 {
+				all_reference_tokens_per_mail.push(vec![]);
+			} else {
+				let reference_tokens_per_mail = reference_tokens_per_attachment_flattened
+					.drain(..attachments_count)
+					.collect();
+				all_reference_tokens_per_mail.push(reference_tokens_per_mail);
+			}
+		}
+
+		let import_attachments_per_mail: Vec<Vec<ImportAttachment>> =
+			attachments_file_data_per_mail
+				.into_iter()
+				.zip(attachments_meta_data_per_mail.into_iter().zip(all_reference_tokens_per_mail))
+				.map(|(file_data, (meta_data, reference_tokens_per_attachment))| {
+					let import_attachments_single_mail = file_data
+						.into_iter()
+						.zip(meta_data.into_iter().zip(reference_tokens_per_attachment))
+						.map(|(file_datum, (meta_datum, reference_tokens))| {
+							meta_datum.make_import_attachment_data(
+								self,
+								&file_datum.session_key,
+								reference_tokens,
+							)
+						})
+						.collect();
+					import_attachments_single_mail
+				})
+				.collect();
+
+		let unit_import_results = keyed_import_mail_data
+			.into_iter()
+			.zip(import_attachments_per_mail)
+			.map(|(mut unit_import, import_attachments)| {
+				unit_import.import_mail_data.importedAttachments = import_attachments;
+				unit_import
+			})
+			.collect();
+
+		Ok(unit_import_results)
+	}
+
+	async fn make_serialized_chunk(
+		&self,
+		importable_chunk: Vec<KeyedImportMailData>,
+	) -> Result<(ImportMailPostIn, GenericAesKey), ImportError> {
+		let mut serialized_imports = Vec::with_capacity(importable_chunk.len());
+
+		for unit_import in importable_chunk {
+			let serialized_import = self
+				.logged_in_sdk
+				.serialize_instance_to_json(unit_import.import_mail_data, &unit_import.session_key)
+				.map_err(|e| ImportError::sdk("serializing instance to json", e))?;
+			let wrapped_import_data = StringWrapper {
+				_id: Some(Importer::make_random_aggregate_id(&self.randomizer_facade)),
+				value: serialized_import,
+			};
+			serialized_imports.push(wrapped_import_data);
+		}
+
+		let session_key = GenericAesKey::Aes256(aes::Aes256Key::generate(&self.randomizer_facade));
+		let owner_enc_sk_for_import_post = self
+			.mail_group_key
+			.encrypt_key(&session_key, Iv::generate(&self.randomizer_facade));
+
+		let post_in = ImportMailPostIn {
+			ownerGroup: self.target_owner_group.clone(),
+			encImports: serialized_imports,
+			targetMailFolder: self.target_mailset.clone(),
+			ownerKeyVersion: owner_enc_sk_for_import_post.version,
+			ownerEncSessionKey: owner_enc_sk_for_import_post.object,
+			newImportedMailSetName: "@internal-imported-mailset".to_string(),
+			_finalIvs: Default::default(),
+
_format: 0, + _errors: None, + }; + + Ok((post_in, session_key)) + } + + // distribute load across the cluster. should be switched to read token (once it is implemented on the + // BlobFacade) and use ArchiveDataType::MailDetails to target one of the nodes that actually stores the + // data + async fn get_server_url_to_upload(&self) -> Result { + self.logged_in_sdk + .request_blob_facade_write_token(ArchiveDataType::Attachments) + .await + .map_err(|e| ImportError::sdk("request blob write token", e))? + .servers + .last() + .map(|s| s.url.to_string()) + .ok_or(ImportError::EmptyBlobServerList) + } + + async fn make_import_service_call( + &self, + import_mail_data: (ImportMailPostIn, GenericAesKey), + ) -> Result { + self.verify_import_feature_enabled().await?; + + let server_to_upload = self.get_server_url_to_upload().await?; + let (import_mail_post_in, session_key_for_import_post) = import_mail_data; + + self.logged_in_sdk + .get_service_executor() + .post::( + import_mail_post_in, + ExtraServiceParams { + base_url: Some(server_to_upload), + session_key: Some(session_key_for_import_post), + ..Default::default() + }, + ) + .await + .map_err(|e| { + if e == Self::IMPORT_DISABLED_ERR { + ImportError::NoImportFeature + } else { + ImportError::sdk("calling ImportMailService", e) + } + }) + } + + pub async fn verify_import_feature_enabled(&self) -> Result<(), ImportError> { + self.logged_in_sdk + .get_service_executor() + .get::( + ImportMailGetIn { _format: 0 }, + ExtraServiceParams::default(), + ) + .await + .map_err(|e| { + if e == Self::IMPORT_DISABLED_ERR { + ImportError::NoImportFeature + } else { + ImportError::sdk("calling ImportMailService", e) + } + }) + } } impl ImportState { - async fn update_import_state_on_server( - &mut self, - logged_in_sdk: &LoggedInSdk, - ) -> Result<(), ImportError> { - if self.last_server_update.elapsed().unwrap_or_default() > Duration::from_secs(6) { - self.force_update_import_state_on_server(logged_in_sdk) - .await - } else { - Ok(()) - } - } - - async fn force_update_import_state_on_server( - &mut self, - logged_in_sdk: &LoggedInSdk, - ) -> Result<(), ImportError> { - logged_in_sdk - .mail_facade() - .get_crypto_entity_client() - .update_instance(self.remote_state.clone()) - .await - .map_err(|e| ImportError::sdk("update remote import state", e))?; - - self.last_server_update = SystemTime::now(); - Ok(()) - } - - fn change_status(&mut self, status: ImportStatus) { - self.remote_state.status = status as i64; - } - - fn add_newly_imported_mails(&mut self, mut newly_imported_mails: Vec) { - self.remote_state.successfulMails = self - .remote_state - .successfulMails - .saturating_add(newly_imported_mails.len().try_into().unwrap_or_default()); - - self.imported_mail_ids.append(&mut newly_imported_mails); - } - - fn add_failed_mails_count(&mut self, newly_failed_mails_count: usize) { - self.remote_state.failedMails = self - .remote_state - .failedMails - .saturating_add(newly_failed_mails_count.try_into().unwrap_or_default()); - } + async fn update_import_state_on_server( + &mut self, + logged_in_sdk: &LoggedInSdk, + ) -> Result<(), ImportError> { + if self.last_server_update.elapsed().unwrap_or_default() > Duration::from_secs(6) { + self.force_update_import_state_on_server(logged_in_sdk) + .await + } else { + Ok(()) + } + } + + async fn force_update_import_state_on_server( + &mut self, + logged_in_sdk: &LoggedInSdk, + ) -> Result<(), ImportError> { + logged_in_sdk + .mail_facade() + .get_crypto_entity_client() + .update_instance(self.remote_state.clone()) + 
.await + .map_err(|e| ImportError::sdk("update remote import state", e))?; + + self.last_server_update = SystemTime::now(); + Ok(()) + } + + fn change_status(&mut self, status: ImportStatus) { + self.remote_state.status = status as i64; + } + + fn add_newly_imported_mails(&mut self, mut newly_imported_mails: Vec) { + self.remote_state.successfulMails = self + .remote_state + .successfulMails + .saturating_add(newly_imported_mails.len().try_into().unwrap_or_default()); + + self.imported_mail_ids.append(&mut newly_imported_mails); + } + + fn add_failed_mails_count(&mut self, newly_failed_mails_count: usize) { + self.remote_state.failedMails = self + .remote_state + .failedMails + .saturating_add(newly_failed_mails_count.try_into().unwrap_or_default()); + } } impl Importer { - pub async fn create_imap_importer( - logged_in_sdk: Arc, - target_owner_group: GeneratedId, - target_mailset: IdTupleGenerated, - imap_config: ImapImportConfig, - ) -> Result { - let import_source = ImportSource::RemoteImap { - imap_import_client: ImapImport::new(imap_config), - }; - let mail_group_key = logged_in_sdk - .get_current_sym_group_key(&target_owner_group) - .await - .map_err(|e| ImportError::sdk("getting current_sym_group for imap import", e))?; - - let importer = Importer::new( - logged_in_sdk, - mail_group_key, - target_mailset, - import_source, - target_owner_group, - ); - - Ok(importer) - } - - pub async fn create_file_importer( - logged_in_sdk: Arc, - target_owner_group: GeneratedId, - target_mailset: IdTupleGenerated, - source_paths: Vec, - ) -> Result { - let fs_email_client = FileImport::new(source_paths) - .map_err(|e| ImportError::IterationError(IterationError::File(e)))?; - let import_source = ImportSource::LocalFile { fs_email_client }; - let mail_group_key = logged_in_sdk - .get_current_sym_group_key(&target_owner_group) - .await - .map_err(|err| { - ImportError::sdk("trying to get mail group key for target owner group", err) - })?; - - let importer = Importer::new( - logged_in_sdk, - mail_group_key, - target_mailset, - import_source, - target_owner_group, - ); - - Ok(importer) - } - - pub fn new( - logged_in_sdk: Arc, - mail_group_key: VersionedAesKey, - target_mailset: IdTupleGenerated, - import_source: ImportSource, - target_owner_group: GeneratedId, - ) -> Self { - let randomizer_facade = RandomizerFacade::from_core(rand::rngs::OsRng); - Self { - state: ImportState { - last_server_update: SystemTime::now(), - remote_state: ImportMailState { - _format: Default::default(), - _id: Default::default(), - _ownerEncSessionKey: Default::default(), - _ownerGroup: Default::default(), - _ownerKeyVersion: Default::default(), - _permissions: Default::default(), - status: Default::default(), - failedMails: Default::default(), - successfulMails: Default::default(), - targetFolder: IdTupleGenerated::new(Default::default(), Default::default()), - _errors: Default::default(), - _finalIvs: Default::default(), - }, - imported_mail_ids: vec![], - }, - source: import_source, - essentials: ImportEssential { - logged_in_sdk, - target_owner_group, - mail_group_key, - target_mailset, - randomizer_facade, - }, - } - } - - pub async fn import_next_chunk(&mut self) -> Result<(), ImportError> { - let Self { - essentials: import_essentials, - state: import_state, - source: import_source, - } = self; - - let mapped_import_source = import_source.into_iter().map(|importable_mail| { - UnitImport::create_from_importable_mail( - &import_essentials.randomizer_facade, - &import_essentials.mail_group_key, - importable_mail, - ) - 
}); - let mut chunked_mails_provider = - ImportableMailsButcher::new(mapped_import_source, |unit_import| { - estimate_json_size(&unit_import.import_mail_data) - }); - - match chunked_mails_provider.next() { - // everything have been finished - None => { - import_state.change_status(ImportStatus::Finished); - }, - - // this chunk was too big to import - Some(Err(too_big_chunk)) => { - import_state.add_failed_mails_count(1); - Err(ImportError::TooBigChunk)? - }, - - // these chunks can be imported in single request - Some(Ok(chunked_import_data)) => { - let expected_imported_mails_count = chunked_import_data.len(); - - let importable_post_data = import_essentials - .make_serialized_chunk(chunked_import_data) - .await - .map_err(|e| { - import_state.add_failed_mails_count(expected_imported_mails_count); - e - })?; - - let import_mails_post_out = import_essentials - .make_import_service_call(importable_post_data) - .await - .map_err(|e| { - import_state.add_failed_mails_count(expected_imported_mails_count); - e - })?; - - let imported_mails_count = import_mails_post_out.mails.len(); - import_state.add_newly_imported_mails(import_mails_post_out.mails); - - // make sure what we uploaded and what we got are same - if imported_mails_count != expected_imported_mails_count { - Err(ImportError::MismatchedImportCount { - expected: expected_imported_mails_count, - imported: imported_mails_count, - })? - } - }, - } - - Ok(()) - } - - pub async fn start_stateful_import( - &mut self, - callback_handle: impl Fn() -> CallbackHandle, - ) -> Result<(), Err> - where - CallbackHandle: Future>, - Err: From, - { - self.initialize_remote_state().await?; - - while self.get_remote_state().status != ImportStatus::Finished as i64 { - self.state.change_status(ImportStatus::Running); - - let callback_response = callback_handle().await?; - if callback_response.should_stop { - self.state.change_status(ImportStatus::Canceled); - break; - } - - self.import_next_chunk().await?; - - self.state - .update_import_state_on_server(&self.essentials.logged_in_sdk) - .await?; - } - - self.state - .force_update_import_state_on_server(&self.essentials.logged_in_sdk) - .await?; - - Ok(()) - } + pub async fn create_imap_importer( + logged_in_sdk: Arc, + target_owner_group: GeneratedId, + target_mailset: IdTupleGenerated, + imap_config: ImapImportConfig, + ) -> Result { + let import_source = ImportSource::RemoteImap { + imap_import_client: ImapImport::new(imap_config), + }; + let mail_group_key = logged_in_sdk + .get_current_sym_group_key(&target_owner_group) + .await + .map_err(|e| ImportError::sdk("getting current_sym_group for imap import", e))?; + + let importer = Importer::new( + logged_in_sdk, + mail_group_key, + target_mailset, + import_source, + target_owner_group, + ); + + Ok(importer) + } + + pub async fn create_file_importer( + logged_in_sdk: Arc, + target_owner_group: GeneratedId, + target_mailset: IdTupleGenerated, + source_paths: Vec, + ) -> Result { + let fs_email_client = FileImport::new(source_paths) + .map_err(|e| ImportError::IterationError(IterationError::File(e)))?; + let import_source = ImportSource::LocalFile { fs_email_client }; + let mail_group_key = logged_in_sdk + .get_current_sym_group_key(&target_owner_group) + .await + .map_err(|err| { + ImportError::sdk("trying to get mail group key for target owner group", err) + })?; + + let importer = Importer::new( + logged_in_sdk, + mail_group_key, + target_mailset, + import_source, + target_owner_group, + ); + + Ok(importer) + } + + pub fn new( + logged_in_sdk: Arc, + 
mail_group_key: VersionedAesKey,
+		target_mailset: IdTupleGenerated,
+		import_source: ImportSource,
+		target_owner_group: GeneratedId,
+	) -> Self {
+		let randomizer_facade = RandomizerFacade::from_core(rand::rngs::OsRng);
+		Self {
+			state: ImportState {
+				last_server_update: SystemTime::now(),
+				remote_state: ImportMailState {
+					_format: Default::default(),
+					_id: Default::default(),
+					_ownerEncSessionKey: Default::default(),
+					_ownerGroup: Default::default(),
+					_ownerKeyVersion: Default::default(),
+					_permissions: Default::default(),
+					status: Default::default(),
+					failedMails: Default::default(),
+					successfulMails: Default::default(),
+					targetFolder: IdTupleGenerated::new(Default::default(), Default::default()),
+					_errors: Default::default(),
+					_finalIvs: Default::default(),
+				},
+				imported_mail_ids: vec![],
+			},
+			source: import_source,
+			essentials: ImportEssential {
+				logged_in_sdk,
+				target_owner_group,
+				mail_group_key,
+				target_mailset,
+				randomizer_facade,
+			},
+		}
+	}
+
+	pub async fn import_next_chunk(&mut self) -> Result<(), ImportError> {
+		let Self {
+			essentials: import_essentials,
+			state: import_state,
+			source: import_source,
+		} = self;
+
+		let attachment_upload_data = import_source.into_iter()
+			.map(|importable_mail| {
+				AttachmentUploadData::create_from_importable_mail(
+					&import_essentials.randomizer_facade,
+					&import_essentials.mail_group_key,
+					importable_mail,
+				)
+			});
+		let mut chunked_mails_provider = ImportableMailsButcher::new(
+			attachment_upload_data,
+			|upload_data| {
+				estimate_json_size(&upload_data.keyed_import_mail_data.import_mail_data)
+			}
+		);
+
+		match chunked_mails_provider.next() {
+			// everything has been imported
+			None => {
+				import_state.change_status(ImportStatus::Finished);
+			}
+
+			// this chunk was too big to import
+			Some(Err(_too_big_chunk)) => {
+				import_state.add_failed_mails_count(1);
+				Err(ImportError::TooBigChunk)?
+			}
+
+			// this chunk can be imported in a single request
+			Some(Ok(chunked_import_data)) => {
+				let expected_imported_mails_count = chunked_import_data.len();
+
+				let unit_import_data = import_essentials
+					.upload_attachments_for_chunk(chunked_import_data)
+					.await
+					.map_err(|e| {
+						import_state.add_failed_mails_count(expected_imported_mails_count);
+						e
+					})?;
+
+				let importable_post_data = import_essentials
+					.make_serialized_chunk(unit_import_data)
+					.await
+					.map_err(|e| {
+						import_state.add_failed_mails_count(expected_imported_mails_count);
+						e
+					})?;
+
+				let import_mails_post_out = import_essentials
+					.make_import_service_call(importable_post_data)
+					.await
+					.map_err(|e| {
+						import_state.add_failed_mails_count(expected_imported_mails_count);
+						e
+					})?;
+
+				let imported_mails_count = import_mails_post_out.mails.len();
+				import_state.add_newly_imported_mails(import_mails_post_out.mails);
+
+				// make sure that what we uploaded and what the server imported match
+				if imported_mails_count != expected_imported_mails_count {
+					Err(ImportError::MismatchedImportCount {
+						expected: expected_imported_mails_count,
+						imported: imported_mails_count,
+					})?
+ } + } + } + + Ok(()) + } + + pub async fn start_stateful_import( + &mut self, + callback_handle: impl Fn() -> CallbackHandle, + ) -> Result<(), Err> + where + CallbackHandle: Future>, + Err: From, + { + self.initialize_remote_state().await?; + + while self.get_remote_state().status != ImportStatus::Finished as i64 { + self.state.change_status(ImportStatus::Running); + + let callback_response = callback_handle().await?; + if callback_response.should_stop { + self.state.change_status(ImportStatus::Canceled); + break; + } + + self.import_next_chunk().await?; + + self.state + .update_import_state_on_server(&self.essentials.logged_in_sdk) + .await?; + } + + self.state + .force_update_import_state_on_server(&self.essentials.logged_in_sdk) + .await?; + + Ok(()) + } } impl ImportError { - pub fn sdk(action: &'static str, error: ApiCallError) -> Self { - Self::SdkError { action, error } - } + pub fn sdk(action: &'static str, error: ApiCallError) -> Self { + Self::SdkError { action, error } + } } #[cfg(test)] mod tests { - use super::*; - use crate::importer::imap_reader::{ImapCredentials, LoginMechanism}; - use crate::tuta_imap::testing::GreenMailTestServer; - use mail_builder::MessageBuilder; - use tutasdk::entities::generated::tutanota::MailFolder; - use tutasdk::folder_system::MailSetKind; - use tutasdk::net::native_rest_client::NativeRestClient; - use tutasdk::Sdk; - - const IMPORTED_MAIL_ADDRESS: &str = "map-premium@tutanota.de"; - - pub async fn import_all_of_source(importer: &mut Importer) -> Result<(), ImportError> { - importer - .start_stateful_import(|| async { Ok(StateCallbackResponse { should_stop: false }) }) - .await - } - - fn sample_email(subject: String) -> String { - let email = MessageBuilder::new() + use super::*; + use crate::importer::imap_reader::{ImapCredentials, LoginMechanism}; + use crate::tuta_imap::testing::GreenMailTestServer; + use mail_builder::MessageBuilder; + use tutasdk::entities::generated::tutanota::MailFolder; + use tutasdk::folder_system::MailSetKind; + use tutasdk::net::native_rest_client::NativeRestClient; + use tutasdk::Sdk; + + const IMPORTED_MAIL_ADDRESS: &str = "map-premium@tutanota.de"; + + pub async fn import_all_of_source(importer: &mut Importer) -> Result<(), ImportError> { + importer + .start_stateful_import(|| async { Ok(StateCallbackResponse { should_stop: false }) }) + .await + } + + fn sample_email(subject: String) -> String { + let email = MessageBuilder::new() .from(("Matthias", "map@example.org")) .to(("Johannes", "jhm@example.org")) .subject(subject) .text_body("Hello tutao! this is the first step to have email import.Want to see html 😀?
<html><body style=\"color: red\">red</body></html>
") .write_to_string() .unwrap(); - email - } - - async fn get_test_import_folder_id( - logged_in_sdk: &Arc, - kind: MailSetKind, - ) -> MailFolder { - let mail_facade = logged_in_sdk.mail_facade(); - let mailbox = mail_facade.load_user_mailbox().await.unwrap(); - let folders = mail_facade - .load_folders_for_mailbox(&mailbox) - .await - .unwrap(); - folders - .system_folder_by_type(kind) - .expect("inbox should exist") - .clone() - } - - pub async fn init_importer(import_source: ImportSource) -> Importer { - let logged_in_sdk = Sdk::new( - "http://localhost:9000".to_string(), - Arc::new(NativeRestClient::try_new().unwrap()), - ) - .create_session(IMPORTED_MAIL_ADDRESS, "map") - .await - .unwrap(); - - let target_mail_folder = get_test_import_folder_id(&logged_in_sdk, MailSetKind::Archive) - .await - ._id - .unwrap(); - - let target_owner_group = logged_in_sdk - .mail_facade() - .get_group_id_for_mail_address(IMPORTED_MAIL_ADDRESS) - .await - .unwrap(); - let mail_group_key = logged_in_sdk - .get_current_sym_group_key(&target_owner_group) - .await - .unwrap(); - - Importer::new( - logged_in_sdk, - mail_group_key, - target_mail_folder, - import_source, - target_owner_group, - ) - } - - async fn init_imap_importer() -> (Importer, GreenMailTestServer) { - let greenmail = GreenMailTestServer::new(); - let imap_import_config = ImapImportConfig { - root_import_mail_folder_name: "/".to_string(), - credentials: ImapCredentials { - host: "127.0.0.1".to_string(), - port: greenmail.imaps_port.try_into().unwrap(), - login_mechanism: LoginMechanism::Plain { - username: "sug@example.org".to_string(), - password: "sug".to_string(), - }, - }, - }; - - let import_source = ImportSource::RemoteImap { - imap_import_client: ImapImport::new(imap_import_config), - }; - (init_importer(import_source).await, greenmail) - } - - pub async fn init_file_importer(source_paths: Vec<&str>) -> Importer { - let files = source_paths - .into_iter() - .map(|file_name| { - format!( - "{}/tests/resources/testmail/{file_name}", - env!("CARGO_MANIFEST_DIR") - ) - }) - .collect(); - let import_source = ImportSource::LocalFile { - fs_email_client: FileImport::new(files).unwrap(), - }; - init_importer(import_source).await - } - - #[tokio::test] - pub async fn import_multiple_from_imap_default_folder() { - let (mut importer, greenmail) = init_imap_importer().await; - - let email_first = sample_email("Hello from imap 😀! 
-- Список.doc".to_string()); - let email_second = sample_email("Second time: hello".to_string()); - greenmail.store_mail("sug@example.org", email_first.as_str()); - greenmail.store_mail("sug@example.org", email_second.as_str()); - - import_all_of_source(&mut importer).await.unwrap(); - let remote_state = importer.get_remote_state(); - - assert_eq!(remote_state.status, ImportStatus::Finished as i64); - assert_eq!(remote_state.failedMails, 0); - assert_eq!(remote_state.successfulMails, 2); - } - - #[tokio::test] - pub async fn import_single_from_imap_default_folder() { - let (mut importer, greenmail) = init_imap_importer().await; - - let email = sample_email("Single email".to_string()); - greenmail.store_mail("sug@example.org", email.as_str()); - - import_all_of_source(&mut importer).await.unwrap(); - let remote_state = importer.get_remote_state(); - - assert_eq!(remote_state.status, ImportStatus::Finished as i64); - assert_eq!(remote_state.failedMails, 0); - assert_eq!(remote_state.successfulMails, 1); - } - - #[tokio::test] - async fn can_import_single_eml_file_without_attachment() { - let mut importer = init_file_importer(vec!["sample.eml"]).await; - import_all_of_source(&mut importer).await.unwrap(); - let remote_state = importer.get_remote_state(); - - assert_eq!(remote_state.status, ImportStatus::Finished as i64); - assert_eq!(remote_state.failedMails, 0); - assert_eq!(remote_state.successfulMails, 1); - } - - #[tokio::test] - async fn can_import_single_eml_file_with_attachment() { - let mut importer = init_file_importer(vec!["attachment_sample.eml"]).await; - import_all_of_source(&mut importer).await.unwrap(); - let remote_state = importer.get_remote_state(); - - assert_eq!(remote_state.status, ImportStatus::Finished as i64); - assert_eq!(remote_state.failedMails, 0); - assert_eq!(remote_state.successfulMails, 1); - } - - #[tokio::test] - async fn should_stop_if_true_response() { - let mut importer = init_file_importer(vec!["sample.eml"]).await; - - let callback_resolver = - || async { Result::<_, ImportError>::Ok(StateCallbackResponse { should_stop: true }) }; - importer - .start_stateful_import(callback_resolver) - .await - .unwrap(); - let remote_state = importer.get_remote_state(); - - assert_eq!(remote_state.status, ImportStatus::Canceled as i64); - assert_eq!(remote_state.failedMails, 0); - assert_eq!(remote_state.successfulMails, 0); - } + email + } + + async fn get_test_import_folder_id( + logged_in_sdk: &Arc, + kind: MailSetKind, + ) -> MailFolder { + let mail_facade = logged_in_sdk.mail_facade(); + let mailbox = mail_facade.load_user_mailbox().await.unwrap(); + let folders = mail_facade + .load_folders_for_mailbox(&mailbox) + .await + .unwrap(); + folders + .system_folder_by_type(kind) + .expect("inbox should exist") + .clone() + } + + pub async fn init_importer(import_source: ImportSource) -> Importer { + let logged_in_sdk = Sdk::new( + "http://localhost:9000".to_string(), + Arc::new(NativeRestClient::try_new().unwrap()), + ) + .create_session(IMPORTED_MAIL_ADDRESS, "map") + .await + .unwrap(); + + let target_mail_folder = get_test_import_folder_id(&logged_in_sdk, MailSetKind::Archive) + .await + ._id + .unwrap(); + + let target_owner_group = logged_in_sdk + .mail_facade() + .get_group_id_for_mail_address(IMPORTED_MAIL_ADDRESS) + .await + .unwrap(); + let mail_group_key = logged_in_sdk + .get_current_sym_group_key(&target_owner_group) + .await + .unwrap(); + + Importer::new( + logged_in_sdk, + mail_group_key, + target_mail_folder, + import_source, + target_owner_group, + ) 
+ } + + async fn init_imap_importer() -> (Importer, GreenMailTestServer) { + let greenmail = GreenMailTestServer::new(); + let imap_import_config = ImapImportConfig { + root_import_mail_folder_name: "/".to_string(), + credentials: ImapCredentials { + host: "127.0.0.1".to_string(), + port: greenmail.imaps_port.try_into().unwrap(), + login_mechanism: LoginMechanism::Plain { + username: "sug@example.org".to_string(), + password: "sug".to_string(), + }, + }, + }; + + let import_source = ImportSource::RemoteImap { + imap_import_client: ImapImport::new(imap_import_config), + }; + (init_importer(import_source).await, greenmail) + } + + pub async fn init_file_importer(source_paths: Vec<&str>) -> Importer { + let files = source_paths + .into_iter() + .map(|file_name| { + format!( + "{}/tests/resources/testmail/{file_name}", + env!("CARGO_MANIFEST_DIR") + ) + }) + .collect(); + let import_source = ImportSource::LocalFile { + fs_email_client: FileImport::new(files).unwrap(), + }; + init_importer(import_source).await + } + + #[tokio::test] + pub async fn import_multiple_from_imap_default_folder() { + let (mut importer, greenmail) = init_imap_importer().await; + + let email_first = sample_email("Hello from imap 😀! -- Список.doc".to_string()); + let email_second = sample_email("Second time: hello".to_string()); + greenmail.store_mail("sug@example.org", email_first.as_str()); + greenmail.store_mail("sug@example.org", email_second.as_str()); + + import_all_of_source(&mut importer).await.unwrap(); + let remote_state = importer.get_remote_state(); + + assert_eq!(remote_state.status, ImportStatus::Finished as i64); + assert_eq!(remote_state.failedMails, 0); + assert_eq!(remote_state.successfulMails, 2); + } + + #[tokio::test] + pub async fn import_single_from_imap_default_folder() { + let (mut importer, greenmail) = init_imap_importer().await; + + let email = sample_email("Single email".to_string()); + greenmail.store_mail("sug@example.org", email.as_str()); + + import_all_of_source(&mut importer).await.unwrap(); + let remote_state = importer.get_remote_state(); + + assert_eq!(remote_state.status, ImportStatus::Finished as i64); + assert_eq!(remote_state.failedMails, 0); + assert_eq!(remote_state.successfulMails, 1); + } + + #[tokio::test] + async fn can_import_single_eml_file_without_attachment() { + let mut importer = init_file_importer(vec!["sample.eml"]).await; + import_all_of_source(&mut importer).await.unwrap(); + let remote_state = importer.get_remote_state(); + + assert_eq!(remote_state.status, ImportStatus::Finished as i64); + assert_eq!(remote_state.failedMails, 0); + assert_eq!(remote_state.successfulMails, 1); + } + + #[tokio::test] + async fn can_import_single_eml_file_with_attachment() { + let mut importer = init_file_importer(vec!["attachment_sample.eml"]).await; + import_all_of_source(&mut importer).await.unwrap(); + let remote_state = importer.get_remote_state(); + + assert_eq!(remote_state.status, ImportStatus::Finished as i64); + assert_eq!(remote_state.failedMails, 0); + assert_eq!(remote_state.successfulMails, 1); + } + + #[tokio::test] + async fn should_stop_if_true_response() { + let mut importer = init_file_importer(vec!["sample.eml"]).await; + + let callback_resolver = + || async { Result::<_, ImportError>::Ok(StateCallbackResponse { should_stop: true }) }; + importer + .start_stateful_import(callback_resolver) + .await + .unwrap(); + let remote_state = importer.get_remote_state(); + + assert_eq!(remote_state.status, ImportStatus::Canceled as i64); + 
assert_eq!(remote_state.failedMails, 0);
+		assert_eq!(remote_state.successfulMails, 0);
+	}
 }
diff --git a/packages/node-mimimi/src/reduce_to_chunks.rs b/packages/node-mimimi/src/reduce_to_chunks.rs
index 54ad6785fb19..cc584fffb7dd 100644
--- a/packages/node-mimimi/src/reduce_to_chunks.rs
+++ b/packages/node-mimimi/src/reduce_to_chunks.rs
@@ -5,152 +5,159 @@ use tutasdk::crypto::key::{GenericAesKey, VersionedAesKey};
 use tutasdk::crypto::randomizer_facade::RandomizerFacade;
 use tutasdk::entities::generated::tutanota::ImportMailData;
 
-pub struct UnitImport {
-    pub import_mail_data: ImportMailData,
-    pub attachments: Vec<ImportableMailAttachment>,
-    pub session_key: GenericAesKey,
+pub struct AttachmentUploadData {
+	pub keyed_import_mail_data: KeyedImportMailData,
+	pub attachments: Vec<ImportableMailAttachment>,
 }
 
-impl UnitImport {
-    pub fn create_from_importable_mail(
-        randomizer_facade: &RandomizerFacade,
-        mail_group_key: &VersionedAesKey,
-        mut importable_mail: ImportableMail,
-    ) -> Self {
-        let session_key = GenericAesKey::Aes256(aes::Aes256Key::generate(randomizer_facade));
-        let owner_enc_session_key =
-            mail_group_key.encrypt_key(&session_key, aes::Iv::generate(randomizer_facade));
-
-        let attachments = importable_mail.take_out_attachments();
-        let import_mail_data = importable_mail
-            .make_import_mail_data(owner_enc_session_key.object, owner_enc_session_key.version);
-
-        UnitImport {
-            import_mail_data,
-            attachments,
-            session_key,
-        }
-    }
+pub struct KeyedImportMailData {
+	pub session_key: GenericAesKey,
+	pub import_mail_data: ImportMailData,
+}
+
+impl AttachmentUploadData {
+	pub fn create_from_importable_mail(
+		randomizer_facade: &RandomizerFacade,
+		mail_group_key: &VersionedAesKey,
+		mut importable_mail: ImportableMail,
+	) -> Self {
+		let session_key = GenericAesKey::Aes256(aes::Aes256Key::generate(randomizer_facade));
+		let owner_enc_session_key = mail_group_key
+			.encrypt_key(&session_key, aes::Iv::generate(randomizer_facade));
+
+		let attachments = importable_mail
+			.take_out_attachments();
+		let import_mail_data = importable_mail
+			.make_import_mail_data(owner_enc_session_key.object, owner_enc_session_key.version);
+
+		AttachmentUploadData {
+			keyed_import_mail_data: KeyedImportMailData {
+				import_mail_data,
+				session_key,
+			},
+			attachments,
+		}
+	}
 }
 
 pub struct Butcher<
-    // Butcher will gurantee every chunk is equal or less than this limit
-    const CHUNK_LIMIT: usize,
-    // type of element which is to be chunked
-    ResolvingElement,
-    // streamer for all the ResolvingElement till end
-    Source: Iterator<Item = ResolvingElement>,
+	// Butcher will guarantee every chunk is equal to or less than this limit
+	const CHUNK_LIMIT: usize,
+	// type of element which is to be chunked
+	ResolvingElement,
+	// streamer for all the ResolvingElement till the end
+	Source: Iterator<Item = ResolvingElement>,
 > {
-    // Butcher will try to adjust every element into one chunk,
-    // but if CHUNK_LIMIT is surpassed, it should put the element back in same position it was before,
-    // hence, we should be able to peek one element ahead
-    provider: Peekable<Source>,
-
-    // given a ResolvingElement, estimate it's size
-    sizer: fn(&ResolvingElement) -> usize,
+	// Butcher will try to fit every element into the current chunk,
+	// but if CHUNK_LIMIT would be surpassed, it must put the element back in the same position it was in before;
+	// hence, we should be able to peek one element ahead
+	provider: Peekable<Source>,
+
+	// given a ResolvingElement, estimate its size
+	sizer: fn(&ResolvingElement) -> usize,
 }
 
-impl<const CHUNK_LIMIT: usize, Re, Src: Iterator<Item = Re>> Butcher<CHUNK_LIMIT, Re, Src> {
-    pub fn new(source: Src, sizer: fn(&Re) -> usize) -> Self {
-        Self {
-            provider: source.peekable(),
-            sizer,
-        }
-    }
-}
+impl<const CHUNK_LIMIT: usize, Re, Src: Iterator<Item = Re>> Butcher<CHUNK_LIMIT, Re, Src> {
+	pub fn new(source: Src, sizer: fn(&Re) -> usize) -> Self {
+		Self {
+			provider: source.peekable(),
+			sizer,
+		}
+	}
+}
 
 /// Iterating over Butcher will resolve to this item.
 /// Ok: collection of elements that is guaranteed to be within CHUNK_LIMIT
 /// Err: single element which was already larger than CHUNK_LIMIT, hence cannot even make a single chunk
 pub(super) type ChunkedImportItem<ResolvingElement> =
-    Result<Vec<ResolvingElement>, ResolvingElement>;
+	Result<Vec<ResolvingElement>, ResolvingElement>;
 
 impl<const CHUNK_LIMIT: usize, ResolvingElement, Source> Iterator
-    for Butcher<CHUNK_LIMIT, ResolvingElement, Source>
+	for Butcher<CHUNK_LIMIT, ResolvingElement, Source>
 where
-    Source: Iterator<Item = ResolvingElement>,
+	Source: Iterator<Item = ResolvingElement>,
 {
-    type Item = ChunkedImportItem<ResolvingElement>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let Self { provider, sizer } = self;
-        let mut imports_in_this_chunk = Vec::new();
-
-        let mut cumulative_import_size: usize = 0;
-        while let Some(next_element_to_include) = provider.peek() {
-            cumulative_import_size =
-                cumulative_import_size.saturating_add(sizer(next_element_to_include));
-
-            if cumulative_import_size <= CHUNK_LIMIT {
-                let next_element_to_include =
-                    provider.next().expect("was peekable item must be there");
-                imports_in_this_chunk.push(next_element_to_include);
-            } else {
-                break;
-            }
-        }
-
-        let item = if imports_in_this_chunk.is_empty() {
-            let too_big_import = self.provider.next()?;
-            // not a single item was added to chunk,
-            // because single chunk was too big, return as-is as failure,
-            Err(too_big_import)
-        } else {
-            Ok(imports_in_this_chunk)
-        };
-        Some(item)
-    }
+	type Item = ChunkedImportItem<ResolvingElement>;
+
+	fn next(&mut self) -> Option<Self::Item> {
+		let Self { provider, sizer } = self;
+		let mut imports_in_this_chunk = Vec::new();
+
+		let mut cumulative_import_size: usize = 0;
+		while let Some(next_element_to_include) = provider.peek() {
+			cumulative_import_size =
+				cumulative_import_size.saturating_add(sizer(next_element_to_include));
+
+			if cumulative_import_size <= CHUNK_LIMIT {
+				let next_element_to_include =
+					provider.next().expect("item was peekable so it must be there");
+				imports_in_this_chunk.push(next_element_to_include);
+			} else {
+				break;
+			}
+		}
+
+		let item = if imports_in_this_chunk.is_empty() {
+			let too_big_import = self.provider.next()?;
+			// not a single item was added to the chunk,
+			// because that single element was already too big; return it as-is as a failure
+			Err(too_big_import)
+		} else {
+			Ok(imports_in_this_chunk)
+		};
+		Some(item)
+	}
 }
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-
-    fn run_butcher<const CHUNK_LIMIT: usize>(data: Vec<usize>) -> Vec<Result<Vec<usize>, usize>> {
-        Butcher::<CHUNK_LIMIT, usize, std::vec::IntoIter<usize>>::new(data.into_iter(), usize::clone)
-            .into_iter()
-            .collect()
-    }
-
-    #[test]
-    fn should_optimize_for_maximum_chunk() {
-        assert_eq!(
-            run_butcher::<6>(vec![1, 2, 3, 4, 5, 6]),
-            vec![Ok(vec![1, 2, 3]), Ok(vec![4]), Ok(vec![5]), Ok(vec![6])]
-        );
-    }
-
-    #[test]
-    fn should_err_on_too_big_of_chunk() {
-        assert_eq!(
-            run_butcher::<6>(vec![0, 2, 10, 1, 2, 3]),
-            vec![Ok(vec![0, 2]), Err(10), Ok(vec![1, 2, 3])]
-        );
-    }
-
-    #[test]
-    fn element_with_maximum_size_is_accepted() {
-        assert_eq!(
-            run_butcher::<5>(vec![5, 5, 1, 4, 1]),
-            vec![Ok(vec![5]), Ok(vec![5]), Ok(vec![1, 4]), Ok(vec![1])]
-        );
-    }
-
-    #[test]
-    fn should_greedy_chunk() {
-        assert_eq!(
-            run_butcher::<10_000>(vec![2; 5_000]),
-            vec![Ok(vec![2; 5_000])]
-        );
-    }
-
-    #[test]
-    fn should_accept_empty_source() {
-        assert_eq!(run_butcher::<10>(vec![]), vec![]);
-    }
-
-    #[test]
-    fn should_accept_all_big_source() {
-        assert_eq!(run_butcher::<5>(vec![6; 10]), vec![Err(6); 10]);
-    }
+	use super::*;
+
+	fn run_butcher<const CHUNK_LIMIT: usize>(data: Vec<usize>) -> Vec<Result<Vec<usize>, usize>> {
+		Butcher::<CHUNK_LIMIT, usize, std::vec::IntoIter<usize>>::new(data.into_iter(), usize::clone)
+			.into_iter()
+			.collect()
+	}
+
+	#[test]
+	fn
should_optimize_for_maximum_chunk() { + assert_eq!( + run_butcher::<6>(vec![1, 2, 3, 4, 5, 6]), + vec![Ok(vec![1, 2, 3]), Ok(vec![4]), Ok(vec![5]), Ok(vec![6])] + ); + } + + #[test] + fn should_err_on_too_big_of_chunk() { + assert_eq!( + run_butcher::<6>(vec![0, 2, 10, 1, 2, 3]), + vec![Ok(vec![0, 2]), Err(10), Ok(vec![1, 2, 3])] + ); + } + + #[test] + fn element_with_maximum_size_is_accepted() { + assert_eq!( + run_butcher::<5>(vec![5, 5, 1, 4, 1]), + vec![Ok(vec![5]), Ok(vec![5]), Ok(vec![1, 4]), Ok(vec![1])] + ); + } + + #[test] + fn should_greedy_chunk() { + assert_eq!( + run_butcher::<10_000>(vec![2; 5_000]), + vec![Ok(vec![2; 5_000])] + ); + } + + #[test] + fn should_accept_empty_source() { + assert_eq!(run_butcher::<10>(vec![]), vec![]); + } + + #[test] + fn should_accept_all_big_source() { + assert_eq!(run_butcher::<5>(vec![6; 10]), vec![Err(6); 10]); + } } diff --git a/packages/node-mimimi/tests/resources/testmail/attachment_sample.eml b/packages/node-mimimi/tests/resources/testmail/attachment_sample.eml index f7d975358461..dd5bfa23934a 100644 --- a/packages/node-mimimi/tests/resources/testmail/attachment_sample.eml +++ b/packages/node-mimimi/tests/resources/testmail/attachment_sample.eml @@ -14,7 +14,19 @@ HBvcnRpbmcmcmRxdW87IHRvIGZvY3VzIGp1c3Qgb24gdGhlICZsZHF1bztpbXBvcnRpbm cmcmRxdW87LDwvcD48cD5idXQgdGhlbiBJIHRob3VnaHQsIHdoeSBub3QgZG8gYm90aD8 gJiN4MjYzQTs8L3A+PC9odG1sPg== --festivus -Content-Type: image/gif; name="TestFile.gif"; +Content-Type: image/gif; name="TestFile1.gif"; +Content-Transfer-Encoding: Base64 +Content-Disposition: attachment + +R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7 +--festivus +Content-Type: image/gif; name="TestFile2.gif"; +Content-Transfer-Encoding: Base64 +Content-Disposition: attachment + +R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7 +--festivus +Content-Type: image/gif; name="TestFile3.gif"; Content-Transfer-Encoding: Base64 Content-Disposition: attachment
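
For reference, the size-limited chunking that MAX_REQUEST_SIZE (8 MiB) and estimate_json_size feed into is easy to state on its own: a chunk greedily absorbs elements while the running size estimate stays at or under the limit, and an element that is too big on its own comes back as Err instead of being silently dropped. The following is a minimal standalone sketch of that rule — chunk_greedy and LIMIT are illustrative names, not the crate's API; the crate's implementation is the generic Butcher in reduce_to_chunks.rs above:

// Standalone sketch (illustrative, not part of the patch) of the greedy
// chunking rule that Butcher implements and its tests pin down.
fn chunk_greedy<const LIMIT: usize>(sizes: Vec<usize>) -> Vec<Result<Vec<usize>, usize>> {
	let mut chunks = Vec::new();
	let mut source = sizes.into_iter().peekable();
	while source.peek().is_some() {
		let mut chunk = Vec::new();
		let mut used: usize = 0;
		// keep absorbing elements while the cumulative size stays within LIMIT
		while let Some(&next) = source.peek() {
			if used.saturating_add(next) <= LIMIT {
				used += next;
				chunk.push(source.next().expect("peeked item must be there"));
			} else {
				break;
			}
		}
		if chunk.is_empty() {
			// the next element alone is bigger than LIMIT: report it as a failure
			chunks.push(Err(source.next().expect("peeked item must be there")));
		} else {
			chunks.push(Ok(chunk));
		}
	}
	chunks
}

fn main() {
	// mirrors should_err_on_too_big_of_chunk above
	assert_eq!(
		chunk_greedy::<6>(vec![0, 2, 10, 1, 2, 3]),
		vec![Ok(vec![0, 2]), Err(10), Ok(vec![1, 2, 3])]
	);
}

The importer drives the real iterator the same way: ImportableMailsButcher::new(source, sizer) with MAX_REQUEST_SIZE as the chunk limit and estimate_json_size as the sizer, so each Ok chunk maps onto a single ImportMailService request, and each Err becomes one failed mail.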