diff --git a/src/CoreApi/PubSubApi.cs b/src/CoreApi/PubSubApi.cs index c41e111..d55d657 100644 --- a/src/CoreApi/PubSubApi.cs +++ b/src/CoreApi/PubSubApi.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using Ipfs.CoreApi; +using Multiformats.Base; namespace Ipfs.Http { @@ -46,44 +47,45 @@ internal PubSubApi(IpfsClient ipfs) { var url = new StringBuilder(); url.Append("/api/v0/pubsub/pub"); - url.Append("?arg="); - url.Append(System.Net.WebUtility.UrlEncode(topic)); - url.Append("&arg="); - var data = Encoding.ASCII.GetString(System.Net.WebUtility.UrlEncodeToBytes(message, 0, message.Length)); - url.Append(data); - return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), cancel); + url.Append("?arg=u"); + url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))); + + return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel); } public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken)) { - using (MemoryStream ms = new MemoryStream()) - { - message.CopyTo(ms); - return PublishAsync(topic, ms.ToArray(), cancel); - } + var url = new StringBuilder(); + url.Append("/api/v0/pubsub/pub"); + url.Append("?arg=u"); + url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))); + + return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel); } - public async Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken)) + public Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken)) { - var _ = await ipfs.DoCommandAsync("pubsub/pub", cancel, topic, "arg=" + message); - return; + var url = new StringBuilder(); + url.Append("/api/v0/pubsub/pub"); + url.Append("?arg=u"); + url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))); + + return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel); } public async Task SubscribeAsync(string topic, Action handler, CancellationToken cancellationToken) { - var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, topic); + var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, $"u{Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))}"); var sr = new StreamReader(messageStream); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken)); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + _ = Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken)); return; } void ProcessMessages(string topic, Action handler, StreamReader sr, CancellationToken ct) { - log.DebugFormat("Start listening for '{0}' messages", topic); + log.DebugFormat($"Start listening for '{topic}' messages"); // .Net needs a ReadLine(CancellationToken) // As a work-around, we register a function to close the stream @@ -95,6 +97,7 @@ void ProcessMessages(string topic, Action handler, StreamReade var json = sr.ReadLine(); if (json == null) break; + if (log.IsDebugEnabled) log.DebugFormat("PubSub message {0}", json); diff --git a/src/IpfsClient.cs b/src/IpfsClient.cs index 0b8ce55..5a69f1e 100644 --- a/src/IpfsClient.cs +++ b/src/IpfsClient.cs @@ -45,7 +45,7 @@ public partial class IpfsClient : ICoreApi /// The environment variable "IpfsHttpApi" overrides this value. /// public static Uri DefaultApiUri = new Uri( - Environment.GetEnvironmentVariable("IpfsHttpApi") + Environment.GetEnvironmentVariable("IpfsHttpApi") ?? "http://localhost:5001"); /// @@ -95,7 +95,7 @@ public IpfsClient(string host) { ApiUri = new Uri(host); } - + /// /// The URL to the IPFS API server. The default is "http://localhost:5001". /// @@ -230,7 +230,7 @@ HttpClient Api() } api = new HttpClient(handler) { - Timeout = System.Threading.Timeout.InfiniteTimeSpan + Timeout = Timeout.InfiniteTimeSpan }; api.DefaultRequestHeaders.Add("User-Agent", UserAgent); } @@ -276,16 +276,41 @@ public async Task DoCommandAsync(string command, CancellationToken cance } } - internal async Task DoCommandAsync(Uri url, CancellationToken cancel) + internal Task DoCommandAsync(Uri url, CancellationToken cancel) + { + return DoCommandAsync(url, (HttpContent)null, cancel); + } + + internal Task DoCommandAsync(Uri url, byte[] bytes, CancellationToken cancel) + { + return DoCommandAsync(url, new ByteArrayContent(bytes), cancel); + } + + internal Task DoCommandAsync(Uri url, Stream stream, CancellationToken cancel) + { + return DoCommandAsync(url, new StreamContent(stream), cancel); + } + + internal Task DoCommandAsync(Uri url, string str, CancellationToken cancel) + { + return DoCommandAsync(url, new StringContent(str), cancel); + } + + internal async Task DoCommandAsync(Uri url, HttpContent content, CancellationToken cancel) { if (log.IsDebugEnabled) log.Debug("POST " + url.ToString()); - using (var response = await Api().PostAsync(url, null, cancel)) + + using (var response = await Api().PostAsync(url, new MultipartFormDataContent + { + {content, "\"file\""} + }, cancel)) { await ThrowOnErrorAsync(response); var body = await response.Content.ReadAsStringAsync(); if (log.IsDebugEnabled) log.Debug("RSP " + body); + return; } } @@ -354,9 +379,10 @@ public async Task PostDownloadAsync(string command, CancellationToken ca var url = BuildCommand(command, arg, options); if (log.IsDebugEnabled) log.Debug("POST " + url.ToString()); - var request = new HttpRequestMessage(HttpMethod.Post, url); + var request = new HttpRequestMessage(HttpMethod.Post, url); var response = await Api().SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel); + await ThrowOnErrorAsync(response); return await response.Content.ReadAsStreamAsync(); } @@ -389,6 +415,7 @@ public async Task DownloadAsync(string command, CancellationToken cancel var url = BuildCommand(command, arg, options); if (log.IsDebugEnabled) log.Debug("GET " + url.ToString()); + var response = await Api().GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancel); await ThrowOnErrorAsync(response); return await response.Content.ReadAsStreamAsync(); @@ -422,6 +449,7 @@ public async Task DownloadBytesAsync(string command, CancellationToken c var url = BuildCommand(command, arg, options); if (log.IsDebugEnabled) log.Debug("GET " + url.ToString()); + var response = await Api().GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancel); await ThrowOnErrorAsync(response); return await response.Content.ReadAsByteArrayAsync(); @@ -460,6 +488,7 @@ public async Task UploadAsync(string command, CancellationToken cancel, var content = new MultipartFormDataContent(); var streamContent = new StreamContent(data); streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); + if (string.IsNullOrEmpty(name)) content.Add(streamContent, "file", unknownFilename); else @@ -468,12 +497,14 @@ public async Task UploadAsync(string command, CancellationToken cancel, var url = BuildCommand(command, null, options); if (log.IsDebugEnabled) log.Debug("POST " + url.ToString()); + using (var response = await Api().PostAsync(url, content, cancel)) { await ThrowOnErrorAsync(response); var json = await response.Content.ReadAsStringAsync(); if (log.IsDebugEnabled) log.Debug("RSP " + json); + return json; } } @@ -510,6 +541,7 @@ public async Task Upload2Async(string command, CancellationToken cancel, var content = new MultipartFormDataContent(); var streamContent = new StreamContent(data); streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); + if (string.IsNullOrEmpty(name)) content.Add(streamContent, "file", unknownFilename); else @@ -518,6 +550,7 @@ public async Task Upload2Async(string command, CancellationToken cancel, var url = BuildCommand(command, null, options); if (log.IsDebugEnabled) log.Debug("POST " + url.ToString()); + var response = await Api().PostAsync(url, content, cancel); await ThrowOnErrorAsync(response); return await response.Content.ReadAsStreamAsync(); @@ -536,12 +569,14 @@ public async Task UploadAsync(string command, CancellationToken cancel, var url = BuildCommand(command, null, options); if (log.IsDebugEnabled) log.Debug("POST " + url.ToString()); + using (var response = await Api().PostAsync(url, content, cancel)) { await ThrowOnErrorAsync(response); var json = await response.Content.ReadAsStringAsync(); if (log.IsDebugEnabled) log.Debug("RSP " + json); + return json; } } @@ -561,17 +596,20 @@ async Task ThrowOnErrorAsync(HttpResponseMessage response) { if (response.IsSuccessStatusCode) return true; + if (response.StatusCode == HttpStatusCode.NotFound) { var error = "Invalid IPFS command: " + response.RequestMessage.RequestUri.ToString(); if (log.IsDebugEnabled) log.Debug("ERR " + error); + throw new HttpRequestException(error); } var body = await response.Content.ReadAsStringAsync(); if (log.IsDebugEnabled) log.Debug("ERR " + body); + string message = body; try { diff --git a/src/IpfsHttpClient.csproj b/src/IpfsHttpClient.csproj index 6501f2f..df78e96 100644 --- a/src/IpfsHttpClient.csproj +++ b/src/IpfsHttpClient.csproj @@ -1,36 +1,37 @@  - - netstandard14;netstandard2;net45 - Ipfs.Http.Client - Ipfs.Http - bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml - full - true + + netstandard2; + Ipfs.Http.Client + Ipfs.Http + bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml + full + true - - 0.42 - 0.42 - - - Ipfs.Http.Client - Richard Schneider - IPFS HTTP Client - Provides .Net client access to the InterPlanetary File System. - false - https://github.com/richardschneider/net-ipfs-http-client/releases - © 2015-2019 Richard Schneider - ipfs peer-to-peer distributed file-system - True - https://github.com/richardschneider/net-ipfs-http-client - https://raw.githubusercontent.com/richardschneider/net-ipfs-core/master/doc/images/ipfs-cs-logo-64x64.png - + + 0.42 + 0.42 + + + Ipfs.Http.Client + Richard Schneider + IPFS HTTP Client + Provides .Net client access to the InterPlanetary File System. + false + https://github.com/richardschneider/net-ipfs-http-client/releases + © 2015-2019 Richard Schneider + ipfs peer-to-peer distributed file-system + True + https://github.com/richardschneider/net-ipfs-http-client + https://raw.githubusercontent.com/richardschneider/net-ipfs-core/master/doc/images/ipfs-cs-logo-64x64.png + + + + + + + + + - - - - - - - diff --git a/src/PublishedMessage.cs b/src/PublishedMessage.cs index b14c6de..adc91fc 100644 --- a/src/PublishedMessage.cs +++ b/src/PublishedMessage.cs @@ -5,6 +5,7 @@ using System.IO; using System.Text; using System.Runtime.Serialization; +using Multiformats.Base; namespace Ipfs.Http { @@ -12,7 +13,7 @@ namespace Ipfs.Http /// A published message. /// /// - /// The is used to publish and subsribe to a message. + /// The is used to publish and subscribe to a message. /// [DataContract] public class PublishedMessage : IPublishedMessage @@ -27,11 +28,13 @@ public class PublishedMessage : IPublishedMessage public PublishedMessage(string json) { var o = JObject.Parse(json); - this.Sender = Convert.FromBase64String((string)o["from"]).ToBase58(); - this.SequenceNumber = Convert.FromBase64String((string)o["seqno"]); - this.DataBytes = Convert.FromBase64String((string)o["data"]); + + this.Sender = (string)o["from"]; + this.SequenceNumber = Multibase.Decode((string)o["seqno"], out MultibaseEncoding _); + this.DataBytes = Multibase.Decode((string)o["data"], out MultibaseEncoding _); + var topics = (JArray) (o["topicIDs"]); - this.Topics = topics.Select(t => (string)t); + this.Topics = topics.Select(t => Encoding.UTF8.GetString(Multibase.Decode((string)t, out MultibaseEncoding _))); } ///