Skip to content
This repository was archived by the owner on Nov 7, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Microsoft.DataTransfer.AzureTable.UnitTests
{
/// <summary>
/// Unit tests covering <see cref="ContinuationTokenParser"/>.
/// </summary>
[TestClass]
public class ContinuationTokenParserTests
{
    /// <summary>
    /// Checks that encoding the string "test" yields the expected
    /// (Version)!(TokenLength)!(CustomBase64EncodedToken) text.
    /// </summary>
    [TestMethod]
    public void EncodeContinuationToken_StringEncoded()
    {
        // Arrange
        const string input = "test";
        const string expected = "1!8!dGVzdA--";

        // Act
        string actual = ContinuationTokenParser.EncodeContinuationToken(input);

        // Assert
        Assert.AreEqual(expected, actual, "The encoded token should be as expected.");
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{428B3874-8780-4939-AC5A-79C3980E01C7}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Microsoft.DataTransfer.AzureTable.UnitTests</RootNamespace>
<AssemblyName>Microsoft.DataTransfer.AzureTable.UnitTests</AssemblyName>
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">15.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
<ReferencePath>$(ProgramFiles)\Common Files\microsoft shared\VSTT\$(VisualStudioVersion)\UITestExtensionPackages</ReferencePath>
<IsCodedUITest>False</IsCodedUITest>
<TestProjectType>UnitTest</TestProjectType>
<NuGetPackageImportStamp>
</NuGetPackageImportStamp>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Castle.Core, Version=3.3.0.0, Culture=neutral, PublicKeyToken=407dd0808d44fbdc, processorArchitecture=MSIL">
<HintPath>..\..\packages\Castle.Core.3.3.3\lib\net45\Castle.Core.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Azure.Documents.Client, Version=2.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Azure.DocumentDB.2.2.1\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>
</Reference>
<Reference Include="Moq, Version=4.5.21.0, Culture=neutral, PublicKeyToken=69f491c39445e920, processorArchitecture=MSIL">
<HintPath>..\..\packages\Moq.4.5.21\lib\net45\Moq.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
</ItemGroup>
<!-- Selects the unit test framework reference. The version-pinned (10.1.0.0) assembly is used only when
     VisualStudioVersion is 10.0 or unset AND the target framework is v3.5; every other combination
     takes the unversioned reference in the Otherwise branch. -->
<Choose>
<When Condition="('$(VisualStudioVersion)' == '10.0' or '$(VisualStudioVersion)' == '') and '$(TargetFrameworkVersion)' == 'v3.5'">
<ItemGroup>
<Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
</ItemGroup>
</When>
<Otherwise>
<ItemGroup>
<Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework" />
</ItemGroup>
</Otherwise>
</Choose>
<ItemGroup>
<Compile Include="ContinuationTokenParserTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
Comment thread
dongyangcheng marked this conversation as resolved.
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Microsoft.DataTransfer.AzureTable\Microsoft.DataTransfer.AzureTable.csproj">
<Project>{366ba489-e851-4899-9ba3-2f9c7599d24b}</Project>
<Name>Microsoft.DataTransfer.AzureTable</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets" Condition="Exists('$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets')" />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- Runs before PrepareForBuild and fails the build with a descriptive error when either of the
     MSTest.TestAdapter 1.3.2 build files (.props / .targets) is missing from the packages folder.
     NOTE(review): the .props file is verified here, but no matching Import of that .props file is
     visible in this project; confirm whether one is intended. -->
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('..\..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.props')" Text="$([System.String]::Format('$(ErrorText)', '..\..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.props'))" />
<Error Condition="!Exists('..\..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.targets'))" />
</Target>
<Import Project="..\..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.targets" Condition="Exists('..\..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.targets')" />
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General information about the assembly, expressed as assembly-level attributes.
// Description, configuration, company, trademark and culture are intentionally left empty.
[assembly: AssemblyTitle("Microsoft.DataTransfer.AzureTable.UnitTests")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Microsoft.DataTransfer.AzureTable.UnitTests")]
[assembly: AssemblyCopyright("Copyright © 2019")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Marks the types in this assembly as not visible to COM.
[assembly: ComVisible(false)]

// Assembly-level GUID; presumably the typelib ID should this assembly ever be exposed to COM.
[assembly: Guid("428b3874-8780-4939-ac5a-79c3980e01c7")]

// Wildcard version form, kept for reference only; the fixed versions below are the ones in effect.
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
Comment thread
dongyangcheng marked this conversation as resolved.
<package id="MSTest.TestAdapter" version="1.3.2" targetFramework="net452" />
<package id="MSTest.TestFramework" version="1.3.2" targetFramework="net452" />
</packages>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System;
using System.Text;

namespace Microsoft.DataTransfer.AzureTable
{
/// <summary>
/// Encodes continuation tokens into the textual format
/// (Version)!(TokenLength)!(CustomBase64EncodedToken).
/// </summary>
public class ContinuationTokenParser
{
    /// <summary>Version number written as the first segment of every encoded token.</summary>
    private const int TokenVersion = 1;

    /// <summary>Separator placed between the segments of an encoded token.</summary>
    private const char ExclamationDelimiter = '!';

    /// <summary>
    /// Encodes a continuation token to the format: (Version)!(TokenLength)!(CustomBase64EncodedToken).
    /// </summary>
    /// <param name="key">The raw token text; it is converted to UTF-8 bytes before base64 encoding.</param>
    /// <returns>
    /// The version (1), a '!' delimiter, the character length of the escaped base64 text,
    /// another '!' delimiter, and finally the escaped base64 text itself.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is <c>null</c>.</exception>
    public static string EncodeContinuationToken(string key)
    {
        if (key == null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        string base64EncodedToken = Convert.ToBase64String(Encoding.UTF8.GetBytes(key));
        string customBase64EncodedString = UrlCustomEscapeBase64String(base64EncodedToken);

        StringBuilder encodedContinuationToken = new StringBuilder();

        // Version of the continuation token.
        encodedContinuationToken.Append(TokenVersion);
        encodedContinuationToken.Append(ExclamationDelimiter);

        // Size is the length of the escaped base64 text, not of the raw key.
        encodedContinuationToken.Append(customBase64EncodedString.Length);
        encodedContinuationToken.Append(ExclamationDelimiter);

        encodedContinuationToken.Append(customBase64EncodedString);
        return encodedContinuationToken.ToString();
    }

    /// <summary>
    /// Replaces the base64 characters '/', '+' and '=' with '_', '*' and '-' respectively.
    /// </summary>
    /// <param name="token">Standard base64 text.</param>
    /// <returns>The text with the three characters substituted; all other characters are unchanged.</returns>
    private static string UrlCustomEscapeBase64String(string token)
    {
        // None of the replacement characters is itself a replaced character,
        // so the order of the three substitutions does not matter.
        return token
            .Replace('/', '_')
            .Replace('+', '*')
            .Replace('=', '-');
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<DesignTime>True</DesignTime>
<DependentUpon>ConfigurationResources.resx</DependentUpon>
</Compile>
<Compile Include="ContinuationTokenParser.cs" />
<Compile Include="Defaults.cs" />
<Compile Include="DynamicConfigurationResources.cs" />
<Compile Include="Errors.cs" />
Expand All @@ -99,6 +100,8 @@
<DesignTime>True</DesignTime>
<DependentUpon>Resources.resx</DependentUpon>
</Compile>
<Compile Include="Resumption\AzureTablePrimaryKey.cs" />
<Compile Include="Resumption\AzureTableResumptionAdaptor.cs" />
<Compile Include="Shared\AzureStorageLocationMode.cs" />
<Compile Include="Shared\IAzureTableAdapterConfiguration.cs" />
<Compile Include="Sink\BatchSizeTracker.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.DataTransfer.AzureTable.Resumption
{
/// <summary>
/// Identifies an Azure Table entity by its partition key and row key.
/// Used as the checkpoint for AzureTable data transfer.
/// </summary>
public class AzureTablePrimaryKey
{
    /// <summary>
    /// The partition key of the checkpoint
    /// </summary>
    public string PartitionKey { get; set; }

    /// <summary>
    /// The row key of the checkpoint
    /// </summary>
    public string RowKey { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.Extensibility;
using Newtonsoft.Json;
using System;
using System.IO;

namespace Microsoft.DataTransfer.AzureTable.Resumption
{
/// <summary>
/// Adaptor for the resume functionality for data transfer between Azure Table Storage.
/// The checkpoint is stored as JSON in a file under the local application data folder.
/// </summary>
public class AzureTableResumptionAdaptor : IDataTransferResumptionAdapter<AzureTablePrimaryKey>
{
    /// <summary>
    /// Name of the folder, created under the local application data folder, that holds checkpoint files.
    /// </summary>
    private const string CheckpointFolderName = "dt";

    private readonly string _fileFullPath;

    /// <summary>
    /// Creates the adaptor and makes sure the checkpoint folder exists.
    /// </summary>
    /// <param name="fileName">The name of the checkpoint file (a bare file name, not a path)</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="fileName"/> contains characters that are not valid in a file name.
    /// </exception>
    public AzureTableResumptionAdaptor(string fileName)
    {
        // Guard is a project helper; presumably it rejects a null or empty name (confirm against Guard).
        Guard.NotEmpty(nameof(fileName), fileName);

        // Invalid file name characters include directory separators, so this also
        // keeps the checkpoint file inside the checkpoint folder.
        if (fileName.IndexOfAny(Path.GetInvalidFileNameChars()) >= 0)
        {
            throw new ArgumentException("File name contains invalid characters.", nameof(fileName));
        }

        var localAppDataFolder = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData);
        var folderName = Path.Combine(localAppDataFolder, CheckpointFolderName);

        // CreateDirectory does nothing when the folder already exists.
        Directory.CreateDirectory(folderName);

        _fileFullPath = Path.Combine(folderName, fileName);
    }

    /// <summary>
    /// Get the checkpoint from the file
    /// </summary>
    /// <returns>The checkpoint read from the file, or <c>null</c> when the checkpoint file does not exist</returns>
    public AzureTablePrimaryKey GetCheckpoint()
    {
        if (File.Exists(_fileFullPath))
        {
            return JsonConvert.DeserializeObject<AzureTablePrimaryKey>(File.ReadAllText(_fileFullPath));
        }

        return null;
    }

    /// <summary>
    /// Save the checkpoint to the file, replacing any previously stored checkpoint
    /// </summary>
    /// <param name="checkpoint">The checkpoint to store</param>
    public void SaveCheckpoint(AzureTablePrimaryKey checkpoint)
    {
        Guard.NotNull(nameof(checkpoint), checkpoint);

        // File.CreateText creates the file or overwrites an existing one.
        using (StreamWriter file = File.CreateText(_fileFullPath))
        {
            JsonSerializer serializer = new JsonSerializer();
            serializer.Serialize(file, checkpoint);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Azure.CosmosDB.Table;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.RetryPolicies;
using Microsoft.DataTransfer.AzureTable.Resumption;
using Microsoft.DataTransfer.AzureTable.Sink.Bulk;
using Microsoft.DataTransfer.AzureTable.Source;
using Microsoft.DataTransfer.AzureTable.Utils;
Expand All @@ -26,6 +27,7 @@ internal sealed class TableAPIBulkSinkAdapter : IDataSinkAdapter
private long _maxInputBufferSizeInBytes;
private int _throughput;
private int _maxLengthInBytesPerBatch;
private IDataTransferResumptionAdapter<AzureTablePrimaryKey> _resumptionAdapter;

private CloudTable cloudtable;
private ConcurrentDictionary<string, TableBatchOperation> dict;
Expand All @@ -39,14 +41,16 @@ public int MaxDegreeOfParallelism
}

public TableAPIBulkSinkAdapter(string connectionString, string tableName,
bool overwrite, long maxInputBufferSizeInBytes, int throughput, int batchSize)
bool overwrite, long maxInputBufferSizeInBytes, int throughput, int batchSize,
IDataTransferResumptionAdapter<AzureTablePrimaryKey> resumptionAdapter)
{
_connectionString = connectionString;
_tableName = tableName;
_overwrite = overwrite;
_maxInputBufferSizeInBytes = maxInputBufferSizeInBytes;
_throughput = throughput;
_maxLengthInBytesPerBatch = batchSize;
_resumptionAdapter = resumptionAdapter;

CloudStorageAccount storageAccount = CloudStorageAccount.Parse(_connectionString);

Expand Down Expand Up @@ -76,6 +80,11 @@ public async Task InitializeAsync(CancellationToken cancellation)
public async Task WriteAsync(IDataItem dataItem, CancellationToken cancellation)
{
var item = GetITableEntityFromIDataItem(dataItem);
if (dict.Count == 0)
{
_resumptionAdapter?.SaveCheckpoint(
new AzureTablePrimaryKey { PartitionKey = item.PartitionKey, RowKey = item.RowKey });
}

inputSizeTracker.Add(item);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace Microsoft.DataTransfer.TableAPI.Sink.Bulk
{
using Microsoft.DataTransfer.AzureTable;
using Microsoft.DataTransfer.AzureTable.Resumption;
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.Extensibility;
using Microsoft.DataTransfer.Extensibility.Basics;
Expand Down Expand Up @@ -61,7 +62,8 @@ public async Task<IDataSinkAdapter> CreateAsync(ITableAPIBulkSinkAdapterConfigur

var sink = new TableAPIBulkSinkAdapter(configuration.ConnectionString,
configuration.TableName, configuration.Overwrite,
maxInputBufferSizeInBytes, throughput, batchSize);
maxInputBufferSizeInBytes, throughput, batchSize,
context.EnableResumeFunction ? new AzureTableResumptionAdaptor(context.RunConfigSignature + ".json") : null);

await sink.InitializeAsync(cancellation);

Expand Down
Loading