Skip to content

Commit

Permalink
Merge pull request #1 from Arlodotexe/fix/pubsub-fixups
Browse files Browse the repository at this point in the history
Updated PubSub to align with 0.11.0
  • Loading branch information
Arlodotexe authored Dec 16, 2021
2 parents 6b38ef0 + eec74ba commit 0745920
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 61 deletions.
41 changes: 22 additions & 19 deletions src/CoreApi/PubSubApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using Ipfs.CoreApi;
using Multiformats.Base;

namespace Ipfs.Http
{
Expand Down Expand Up @@ -46,44 +47,45 @@ internal PubSubApi(IpfsClient ipfs)
{
var url = new StringBuilder();
url.Append("/api/v0/pubsub/pub");
url.Append("?arg=");
url.Append(System.Net.WebUtility.UrlEncode(topic));
url.Append("&arg=");
var data = Encoding.ASCII.GetString(System.Net.WebUtility.UrlEncodeToBytes(message, 0, message.Length));
url.Append(data);
return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), cancel);
url.Append("?arg=u");
url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic)));

return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel);
}

public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken))
{
using (MemoryStream ms = new MemoryStream())
{
message.CopyTo(ms);
return PublishAsync(topic, ms.ToArray(), cancel);
}
var url = new StringBuilder();
url.Append("/api/v0/pubsub/pub");
url.Append("?arg=u");
url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic)));

return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel);
}

public async Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken))
public Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken))
{
var _ = await ipfs.DoCommandAsync("pubsub/pub", cancel, topic, "arg=" + message);
return;
var url = new StringBuilder();
url.Append("/api/v0/pubsub/pub");
url.Append("?arg=u");
url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic)));

return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel);
}

public async Task SubscribeAsync(string topic, Action<IPublishedMessage> handler, CancellationToken cancellationToken)
{
var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, topic);
var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, $"u{Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))}");
var sr = new StreamReader(messageStream);

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken));
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
_ = Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken));

return;
}

void ProcessMessages(string topic, Action<PublishedMessage> handler, StreamReader sr, CancellationToken ct)
{
log.DebugFormat("Start listening for '{0}' messages", topic);
log.DebugFormat($"Start listening for '{topic}' messages");

// .Net needs a ReadLine(CancellationToken)
// As a work-around, we register a function to close the stream
Expand All @@ -95,6 +97,7 @@ void ProcessMessages(string topic, Action<PublishedMessage> handler, StreamReade
var json = sr.ReadLine();
if (json == null)
break;

if (log.IsDebugEnabled)
log.DebugFormat("PubSub message {0}", json);

Expand Down
50 changes: 44 additions & 6 deletions src/IpfsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public partial class IpfsClient : ICoreApi
/// The environment variable "IpfsHttpApi" overrides this value.
/// </remarks>
public static Uri DefaultApiUri = new Uri(
Environment.GetEnvironmentVariable("IpfsHttpApi")
Environment.GetEnvironmentVariable("IpfsHttpApi")
?? "http://localhost:5001");

/// <summary>
Expand Down Expand Up @@ -95,7 +95,7 @@ public IpfsClient(string host)
{
ApiUri = new Uri(host);
}

/// <summary>
/// The URL to the IPFS API server. The default is "http://localhost:5001".
/// </summary>
Expand Down Expand Up @@ -230,7 +230,7 @@ HttpClient Api()
}
api = new HttpClient(handler)
{
Timeout = System.Threading.Timeout.InfiniteTimeSpan
Timeout = Timeout.InfiniteTimeSpan
};
api.DefaultRequestHeaders.Add("User-Agent", UserAgent);
}
Expand Down Expand Up @@ -276,16 +276,41 @@ public async Task<string> DoCommandAsync(string command, CancellationToken cance
}
}

internal async Task DoCommandAsync(Uri url, CancellationToken cancel)
internal Task DoCommandAsync(Uri url, CancellationToken cancel)
{
return DoCommandAsync(url, (HttpContent)null, cancel);
}

internal Task DoCommandAsync(Uri url, byte[] bytes, CancellationToken cancel)
{
return DoCommandAsync(url, new ByteArrayContent(bytes), cancel);
}

internal Task DoCommandAsync(Uri url, Stream stream, CancellationToken cancel)
{
return DoCommandAsync(url, new StreamContent(stream), cancel);
}

internal Task DoCommandAsync(Uri url, string str, CancellationToken cancel)
{
return DoCommandAsync(url, new StringContent(str), cancel);
}

internal async Task DoCommandAsync(Uri url, HttpContent content, CancellationToken cancel)
{
if (log.IsDebugEnabled)
log.Debug("POST " + url.ToString());
using (var response = await Api().PostAsync(url, null, cancel))

using (var response = await Api().PostAsync(url, new MultipartFormDataContent
{
{content, "\"file\""}
}, cancel))
{
await ThrowOnErrorAsync(response);
var body = await response.Content.ReadAsStringAsync();
if (log.IsDebugEnabled)
log.Debug("RSP " + body);

return;
}
}
Expand Down Expand Up @@ -354,9 +379,10 @@ public async Task<Stream> PostDownloadAsync(string command, CancellationToken ca
var url = BuildCommand(command, arg, options);
if (log.IsDebugEnabled)
log.Debug("POST " + url.ToString());
var request = new HttpRequestMessage(HttpMethod.Post, url);

var request = new HttpRequestMessage(HttpMethod.Post, url);
var response = await Api().SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel);

await ThrowOnErrorAsync(response);
return await response.Content.ReadAsStreamAsync();
}
Expand Down Expand Up @@ -389,6 +415,7 @@ public async Task<Stream> DownloadAsync(string command, CancellationToken cancel
var url = BuildCommand(command, arg, options);
if (log.IsDebugEnabled)
log.Debug("GET " + url.ToString());

var response = await Api().GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancel);
await ThrowOnErrorAsync(response);
return await response.Content.ReadAsStreamAsync();
Expand Down Expand Up @@ -422,6 +449,7 @@ public async Task<byte[]> DownloadBytesAsync(string command, CancellationToken c
var url = BuildCommand(command, arg, options);
if (log.IsDebugEnabled)
log.Debug("GET " + url.ToString());

var response = await Api().GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancel);
await ThrowOnErrorAsync(response);
return await response.Content.ReadAsByteArrayAsync();
Expand Down Expand Up @@ -460,6 +488,7 @@ public async Task<String> UploadAsync(string command, CancellationToken cancel,
var content = new MultipartFormDataContent();
var streamContent = new StreamContent(data);
streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");

if (string.IsNullOrEmpty(name))
content.Add(streamContent, "file", unknownFilename);
else
Expand All @@ -468,12 +497,14 @@ public async Task<String> UploadAsync(string command, CancellationToken cancel,
var url = BuildCommand(command, null, options);
if (log.IsDebugEnabled)
log.Debug("POST " + url.ToString());

using (var response = await Api().PostAsync(url, content, cancel))
{
await ThrowOnErrorAsync(response);
var json = await response.Content.ReadAsStringAsync();
if (log.IsDebugEnabled)
log.Debug("RSP " + json);

return json;
}
}
Expand Down Expand Up @@ -510,6 +541,7 @@ public async Task<Stream> Upload2Async(string command, CancellationToken cancel,
var content = new MultipartFormDataContent();
var streamContent = new StreamContent(data);
streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");

if (string.IsNullOrEmpty(name))
content.Add(streamContent, "file", unknownFilename);
else
Expand All @@ -518,6 +550,7 @@ public async Task<Stream> Upload2Async(string command, CancellationToken cancel,
var url = BuildCommand(command, null, options);
if (log.IsDebugEnabled)
log.Debug("POST " + url.ToString());

var response = await Api().PostAsync(url, content, cancel);
await ThrowOnErrorAsync(response);
return await response.Content.ReadAsStreamAsync();
Expand All @@ -536,12 +569,14 @@ public async Task<String> UploadAsync(string command, CancellationToken cancel,
var url = BuildCommand(command, null, options);
if (log.IsDebugEnabled)
log.Debug("POST " + url.ToString());

using (var response = await Api().PostAsync(url, content, cancel))
{
await ThrowOnErrorAsync(response);
var json = await response.Content.ReadAsStringAsync();
if (log.IsDebugEnabled)
log.Debug("RSP " + json);

return json;
}
}
Expand All @@ -561,17 +596,20 @@ async Task<bool> ThrowOnErrorAsync(HttpResponseMessage response)
{
if (response.IsSuccessStatusCode)
return true;

if (response.StatusCode == HttpStatusCode.NotFound)
{
var error = "Invalid IPFS command: " + response.RequestMessage.RequestUri.ToString();
if (log.IsDebugEnabled)
log.Debug("ERR " + error);

throw new HttpRequestException(error);
}

var body = await response.Content.ReadAsStringAsync();
if (log.IsDebugEnabled)
log.Debug("ERR " + body);

string message = body;
try
{
Expand Down
63 changes: 32 additions & 31 deletions src/IpfsHttpClient.csproj
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard14;netstandard2;net45</TargetFrameworks>
<AssemblyName>Ipfs.Http.Client</AssemblyName>
<RootNamespace>Ipfs.Http</RootNamespace>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml</DocumentationFile>
<DebugType>full</DebugType>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<PropertyGroup>
<TargetFrameworks>netstandard2;</TargetFrameworks>
<AssemblyName>Ipfs.Http.Client</AssemblyName>
<RootNamespace>Ipfs.Http</RootNamespace>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml</DocumentationFile>
<DebugType>full</DebugType>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>

<!-- developer build is always 0.42 -->
<AssemblyVersion>0.42</AssemblyVersion>
<Version>0.42</Version>

<!-- Nuget specs -->
<PackageId>Ipfs.Http.Client</PackageId>
<Authors>Richard Schneider</Authors>
<Title>IPFS HTTP Client</Title>
<Description> Provides .Net client access to the InterPlanetary File System.</Description>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<PackageReleaseNotes>https://github.com/richardschneider/net-ipfs-http-client/releases</PackageReleaseNotes>
<Copyright>© 2015-2019 Richard Schneider</Copyright>
<PackageTags>ipfs peer-to-peer distributed file-system</PackageTags>
<IncludeSymbols>True</IncludeSymbols>
<PackageProjectUrl>https://github.com/richardschneider/net-ipfs-http-client</PackageProjectUrl>
<PackageIconUrl>https://raw.githubusercontent.com/richardschneider/net-ipfs-core/master/doc/images/ipfs-cs-logo-64x64.png</PackageIconUrl>
</PropertyGroup>
<!-- developer build is always 0.42 -->
<AssemblyVersion>0.42</AssemblyVersion>
<Version>0.42</Version>

<!-- Nuget specs -->
<PackageId>Ipfs.Http.Client</PackageId>
<Authors>Richard Schneider</Authors>
<Title>IPFS HTTP Client</Title>
<Description> Provides .Net client access to the InterPlanetary File System.</Description>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<PackageReleaseNotes>https://github.com/richardschneider/net-ipfs-http-client/releases</PackageReleaseNotes>
<Copyright>© 2015-2019 Richard Schneider</Copyright>
<PackageTags>ipfs peer-to-peer distributed file-system</PackageTags>
<IncludeSymbols>True</IncludeSymbols>
<PackageProjectUrl>https://github.com/richardschneider/net-ipfs-http-client</PackageProjectUrl>
<PackageIconUrl>https://raw.githubusercontent.com/richardschneider/net-ipfs-core/master/doc/images/ipfs-cs-logo-64x64.png</PackageIconUrl>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Ipfs.Core" Version="0.55.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="System.Net.Http" Version="4.3.3" Condition="'$(TargetFramework)' == 'netstandard14'" />
<PackageReference Include="System.Net.Http" Version="4.3.3" Condition="'$(TargetFramework)' == 'net45'" />
<PackageReference Include="Multiformats.Base" Version="2.0.2"/>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Ipfs.Core" Version="0.55.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="System.Net.Http" Version="4.3.3" Condition="'$(TargetFramework)' == 'netstandard14'" />
<PackageReference Include="System.Net.Http" Version="4.3.3" Condition="'$(TargetFramework)' == 'net45'" />
</ItemGroup>

</Project>
13 changes: 8 additions & 5 deletions src/PublishedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
using System.IO;
using System.Text;
using System.Runtime.Serialization;
using Multiformats.Base;

namespace Ipfs.Http
{
/// <summary>
/// A published message.
/// </summary>
/// <remarks>
/// The <see cref="PubSubApi"/> is used to publish and subsribe to a message.
/// The <see cref="PubSubApi"/> is used to publish and subscribe to a message.
/// </remarks>
[DataContract]
public class PublishedMessage : IPublishedMessage
Expand All @@ -27,11 +28,13 @@ public class PublishedMessage : IPublishedMessage
public PublishedMessage(string json)
{
var o = JObject.Parse(json);
this.Sender = Convert.FromBase64String((string)o["from"]).ToBase58();
this.SequenceNumber = Convert.FromBase64String((string)o["seqno"]);
this.DataBytes = Convert.FromBase64String((string)o["data"]);

this.Sender = (string)o["from"];
this.SequenceNumber = Multibase.Decode((string)o["seqno"], out MultibaseEncoding _);
this.DataBytes = Multibase.Decode((string)o["data"], out MultibaseEncoding _);

var topics = (JArray) (o["topicIDs"]);
this.Topics = topics.Select(t => (string)t);
this.Topics = topics.Select(t => Encoding.UTF8.GetString(Multibase.Decode((string)t, out MultibaseEncoding _)));
}

/// <inheritdoc />
Expand Down

0 comments on commit 0745920

Please sign in to comment.