diff --git a/.github/workflows/publish_docker_image.yaml b/.github/workflows/publish_docker_image.yaml index 7ee34df..3ea5a65 100644 --- a/.github/workflows/publish_docker_image.yaml +++ b/.github/workflows/publish_docker_image.yaml @@ -6,7 +6,7 @@ on: - rc-* - hotfix/* - release/* - - upgrade/libplanet-4.0 + - feature/issue-117 env: DOCKER_REPO: planetariumhq/market-service jobs: diff --git a/MarketService.Tests/RpcClientTest.cs b/MarketService.Tests/RpcClientTest.cs index a565771..3e9e582 100644 --- a/MarketService.Tests/RpcClientTest.cs +++ b/MarketService.Tests/RpcClientTest.cs @@ -8,6 +8,7 @@ using Bencodex.Types; using Grpc.Core; using Lib9c.Model.Order; +using Lib9c.Renderers; using Libplanet.Action.State; using Libplanet.Crypto; using Libplanet.Types.Assets; @@ -329,10 +330,15 @@ public RpcClientTest(ITestOutputHelper output) .UseLowerCaseNamingConvention().Options, new DbContextFactorySource()); #pragma warning restore EF1001 var rpcConfigOptions = new RpcConfigOptions {Host = "localhost", Port = 5000}; - var receiver = new Receiver(new Logger(new LoggerFactory())); + var workerOptions = new WorkerOptions + { + SyncProduct = false, + SyncShop = false, + }; + var receiver = new Receiver(new Logger(new LoggerFactory()), new ActionRenderer(), new OptionsWrapper(workerOptions)); using var logger = _output.BuildLoggerFor(); _client = new TestClient(new OptionsWrapper(rpcConfigOptions), - logger, receiver, _contextFactory, _testService); + logger, receiver, _contextFactory, _testService, new ActionRenderer()); } [Theory] @@ -668,8 +674,8 @@ public async Task UpdateProducts(bool legacy) private class TestClient : RpcClient { public TestClient(IOptions options, ILogger logger, Receiver receiver, - IDbContextFactory contextFactory, TestService service) : base(options, logger, receiver, - contextFactory) + IDbContextFactory contextFactory, TestService service, ActionRenderer renderer) : base(options, logger, receiver, + contextFactory, renderer) { Service = service; var path = "../../../genesis"; diff --git a/MarketService/MarketService.csproj b/MarketService/MarketService.csproj index 33b45f5..ec29c26 100644 --- a/MarketService/MarketService.csproj +++ b/MarketService/MarketService.csproj @@ -31,6 +31,7 @@ + diff --git a/MarketService/ProductWorker.cs b/MarketService/ProductWorker.cs index 59a6d36..a62870a 100644 --- a/MarketService/ProductWorker.cs +++ b/MarketService/ProductWorker.cs @@ -54,7 +54,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) stopWatch.Stop(); var ts = stopWatch.Elapsed; _logger.LogInformation("[ProductWorker]Complete sync product on {BlockIndex}. {TotalElapsed}", _rpcClient.Tip.Index, ts); - await Task.Delay(8000, stoppingToken); + await Task.Delay(60000 * 5, stoppingToken); } } } diff --git a/MarketService/Program.cs b/MarketService/Program.cs index bfd764b..2e1f2bd 100644 --- a/MarketService/Program.cs +++ b/MarketService/Program.cs @@ -1,4 +1,8 @@ using System.Text.Json.Serialization; +using Lib9c.Formatters; +using Lib9c.Renderers; +using MessagePack; +using MessagePack.Resolvers; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Diagnostics; @@ -64,31 +68,43 @@ public void ConfigureServices(IServiceCollection services) .UseLowerCaseNamingConvention() .ConfigureWarnings(w => w.Throw(RelationalEventId.MultipleCollectionIncludeWarning)) ); - services.AddSingleton(); - services.AddSingleton(); - services.AddHostedService(); + services.AddMvc() + .AddJsonOptions( + options => { options.JsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; } + ); + var healthChecksBuilder = services.AddHealthChecks() + .AddDbContextCheck(); + WorkerOptions workerOptions = new(); Configuration.GetSection(WorkerOptions.WorkerConfig) .Bind(workerOptions); - if (workerOptions.SyncShop) + var writeMode = workerOptions.SyncProduct || workerOptions.SyncShop; + if (writeMode) { - services.AddHostedService(); - } + services + .AddHostedService() + .AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddHostedService(); + services.AddSingleton(); + var resolver = MessagePack.Resolvers.CompositeResolver.Create( + NineChroniclesResolver.Instance, + StandardResolver.Instance + ); + var options = MessagePackSerializerOptions.Standard.WithResolver(resolver); + MessagePackSerializer.DefaultOptions = options; + if (workerOptions.SyncShop) + { + services.AddHostedService(); + } - if (workerOptions.SyncProduct) - { - services.AddHostedService(); + if (workerOptions.SyncProduct) + { + services.AddHostedService(); + } + healthChecksBuilder.AddCheck(nameof(RpcNodeHealthCheck)); } - services.AddMvc() - .AddJsonOptions( - options => { options.JsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; } - ); - services - .AddHostedService() - .AddSingleton(); - services.AddHealthChecks() - .AddDbContextCheck() - .AddCheck(nameof(RpcNodeHealthCheck)); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) diff --git a/MarketService/Receiver.cs b/MarketService/Receiver.cs index 2514d46..c0008aa 100644 --- a/MarketService/Receiver.cs +++ b/MarketService/Receiver.cs @@ -1,6 +1,11 @@ +using System.IO.Compression; using Bencodex; using Bencodex.Types; +using Lib9c.Renderers; using Libplanet.Types.Blocks; +using MessagePack; +using Microsoft.Extensions.Options; +using Nekoyume.Action; using Nekoyume.Shared.Hubs; namespace MarketService; @@ -11,14 +16,36 @@ public class Receiver : IActionEvaluationHubReceiver public Block PreviousTip; private readonly ILogger _logger; private readonly Codec _codec = new Codec(); + private readonly ActionRenderer _actionRenderer; + private readonly WorkerOptions _workerOptions; - public Receiver(ILogger logger) + public Receiver(ILogger logger, ActionRenderer actionRenderer, IOptions options) { _logger = logger; + _actionRenderer = actionRenderer; + _workerOptions = options.Value; } public void OnRender(byte[] evaluation) { + if (_workerOptions.SyncProduct || _workerOptions.SyncShop) + { + using (var cp = new MemoryStream(evaluation)) + { + using (var decompressed = new MemoryStream()) + { + using (var df = new DeflateStream(cp, CompressionMode.Decompress)) + { + df.CopyTo(decompressed); + decompressed.Seek(0, SeekOrigin.Begin); + var dec = decompressed.ToArray(); + var ev = MessagePackSerializer.Deserialize(dec) + .ToActionEvaluation(); + _actionRenderer.ActionRenderSubject.OnNext(ev); + } + } + } + } } public void OnUnrender(byte[] evaluation) diff --git a/MarketService/RpcClient.cs b/MarketService/RpcClient.cs index 7a3e71c..b74fce5 100644 --- a/MarketService/RpcClient.cs +++ b/MarketService/RpcClient.cs @@ -5,6 +5,8 @@ using Grpc.Core; using Grpc.Net.Client; using Lib9c.Model.Order; +using Lib9c.Renderers; +using Libplanet.Action; using Libplanet.Action.State; using Libplanet.Crypto; using Libplanet.Types.Blocks; @@ -13,6 +15,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using Nekoyume; +using Nekoyume.Action; using Nekoyume.Model.Item; using Nekoyume.Model.Market; using Nekoyume.Model.State; @@ -28,6 +31,9 @@ public class RpcClient { private const int MaxDegreeOfParallelism = 8; + /// + /// for + /// private static readonly List ShardedSubTypes = new() { ItemSubType.Weapon, @@ -61,9 +67,10 @@ public class RpcClient public Block Tip => _receiver.Tip; public Block PreviousTip => _receiver.PreviousTip; + private readonly ActionRenderer _actionRenderer; public RpcClient(IOptions options, ILogger logger, Receiver receiver, - IDbContextFactory contextFactory) + IDbContextFactory contextFactory, ActionRenderer actionRenderer) { _logger = logger; _address = new PrivateKey().Address; @@ -82,6 +89,153 @@ public RpcClient(IOptions options, ILogger logger, ); _receiver = receiver; _contextFactory = contextFactory; + _actionRenderer = actionRenderer; + _actionRenderer.ActionRenderSubject.Subscribe(RenderAction); + } + + /// + /// Insert or Update by Market related actions. + /// + /// + public async void RenderAction(ActionEvaluation ev) + { + if (ev.Exception is null) + { + var seed = ev.RandomSeed; + var random = new LocalRandom(seed); + var stateRootHash = ev.OutputState; + var hashBytes = stateRootHash.ToByteArray(); + switch (ev.Action) + { + // Insert new product + case RegisterProduct registerProduct: + { + var crystalEquipmentGrindingSheet = await GetSheet(hashBytes); + var crystalMonsterCollectionMultiplierSheet = + await GetSheet(hashBytes); + var costumeStatSheet = await GetSheet(hashBytes); + var products = new List(); + var productIds = registerProduct.RegisterInfos.Select(_ => random.GenerateRandomGuid()).ToList(); + var states = await GetProductStates(productIds, hashBytes); + foreach (var kv in states) + { + if (kv.Value is List deserialized) + { + products.Add(ProductFactory.DeserializeProduct(deserialized)); + } + } + + await InsertProducts(products, costumeStatSheet, crystalEquipmentGrindingSheet, crystalMonsterCollectionMultiplierSheet); + break; + } + // Update product exist = false + case BuyProduct buyProduct: + { + var orderIds = new List(); + var productIds = new List(); + foreach (var productInfo in buyProduct.ProductInfos) + { + if (productInfo is ItemProductInfo {Legacy: true} _) + { + orderIds.Add(productInfo.ProductId); + } + else + { + productIds.Add(productInfo.ProductId); + } + } + + var marketContext = await _contextFactory.CreateDbContextAsync(); + if (orderIds.Any()) + { + await UpdateProducts(productIds, marketContext, true); + } + + if (productIds.Any()) + { + await UpdateProducts(productIds, marketContext, false); + } + + break; + } + case CancelProductRegistration cancelProductRegistration: + { + var orderIds = new List(); + var productIds = new List(); + foreach (var productInfo in cancelProductRegistration.ProductInfos) + { + if (productInfo is ItemProductInfo {Legacy: true} _) + { + orderIds.Add(productInfo.ProductId); + } + else + { + productIds.Add(productInfo.ProductId); + } + } + + var marketContext = await _contextFactory.CreateDbContextAsync(); + if (orderIds.Any()) + { + await UpdateProducts(productIds, marketContext, true); + } + + if (productIds.Any()) + { + await UpdateProducts(productIds, marketContext, false); + } + + break; + } + // Insert new product and Update product exist = false + case ReRegisterProduct reRegisterProduct: + { + var deletedOrderIds = new List(); + var deletedProductIds = new List(); + var productIds = new List(); + foreach (var (productInfo, _) in reRegisterProduct.ReRegisterInfos) + { + if (productInfo is ItemProductInfo {Legacy: true} _) + { + deletedOrderIds.Add(productInfo.ProductId); + } + else + { + deletedProductIds.Add(productInfo.ProductId); + } + productIds.Add(random.GenerateRandomGuid()); + } + var crystalEquipmentGrindingSheet = await GetSheet(hashBytes); + var crystalMonsterCollectionMultiplierSheet = + await GetSheet(hashBytes); + var costumeStatSheet = await GetSheet(hashBytes); + var products = new List(); + var states = await GetProductStates(productIds, hashBytes); + foreach (var kv in states) + { + // check db all product ids avoid already synced products + if (kv.Value is List deserialized) + { + products.Add(ProductFactory.DeserializeProduct(deserialized)); + } + } + + await InsertProducts(products, costumeStatSheet, crystalEquipmentGrindingSheet, crystalMonsterCollectionMultiplierSheet); + var marketContext = await _contextFactory.CreateDbContextAsync(); + if (deletedOrderIds.Any()) + { + await UpdateProducts(deletedOrderIds, marketContext, true); + } + + if (deletedProductIds.Any()) + { + await UpdateProducts(deletedProductIds, marketContext, false); + } + + break; + } + } + } } public async Task StartAsync(CancellationToken stoppingToken) @@ -135,6 +289,12 @@ public async Task StopAsync(CancellationToken cancellationToken) await _hub.LeaveAsync(); } + /// + /// Get of for get registered agent addresses + /// + /// + /// + /// public async Task> GetOrderDigests(ItemSubType itemSubType, byte[] hashBytes) { while (Tip is null) await Task.Delay(100); @@ -145,7 +305,7 @@ public async Task> GetOrderDigests(ItemSubType itemSubType, by var addressList = GetShopAddress(itemSubType); var result = await Service.GetBulkStateByStateRootHash(hashBytes, ReservedAddresses.LegacyAccount.ToByteArray(), addressList); - var shopStates = GetShopStates(result); + var shopStates = DeserializeShopStates(result); foreach (var shopState in shopStates) foreach (var orderDigest in shopState.OrderDigestList) { @@ -160,6 +320,13 @@ public async Task> GetOrderDigests(ItemSubType itemSubType, by return orderDigestList; } + /// + /// Insert and Update from + /// + /// byte array from + /// + /// + /// public async Task SyncOrder(byte[] hashBytes, CrystalEquipmentGrindingSheet crystalEquipmentGrindingSheet, CrystalMonsterCollectionMultiplierSheet crystalMonsterCollectionMultiplierSheet, @@ -316,6 +483,13 @@ await InsertOrders(hashBytes, orderIds.ToList(), tradableIds, marketContext, ord _logger.LogDebug("RestoreProducts: {Ts}", sw.Elapsed); } + /// + /// Set exist = false + /// + /// + /// + /// + /// public async Task UpdateProducts(List deletedIds, MarketContext marketContext, bool legacy, bool exist = false) { @@ -331,6 +505,13 @@ await marketContext.Database.ExecuteSqlRawAsync( } } + /// + /// Create from + /// + /// byte array from + /// + /// + /// public async Task InsertOrders(byte[] hashBytes, List orderIds, List tradableIds, MarketContext marketContext, List orderDigestList, CrystalEquipmentGrindingSheet crystalEquipmentGrindingSheet, @@ -377,6 +558,13 @@ public async Task InsertOrders(byte[] hashBytes, List orderIds, List } } + /// + /// Insert and Update ProductModel from + /// + /// byte array from + /// + /// + /// public async Task SyncProduct(byte[] hashBytes, CrystalEquipmentGrindingSheet crystalEquipmentGrindingSheet, CrystalMonsterCollectionMultiplierSheet crystalMonsterCollectionMultiplierSheet, CostumeStatSheet costumeStatSheet) @@ -416,7 +604,7 @@ public async Task SyncProduct(byte[] hashBytes, CrystalEquipmentGrindingSheet cr sw.Stop(); _logger.LogDebug("[ProductWorker]Get ChunkedStates: {Elapsed}", sw.Elapsed); sw.Restart(); - var productLists = GetProductsState(productListResult); + var productLists = DeserializeProductsState(productListResult); sw.Stop(); _logger.LogDebug("[ProductWorker]Get ProductsState: {Elapsed}", sw.Elapsed); sw.Restart(); @@ -475,6 +663,13 @@ private async Task> GetProductStates(List product return result; } + /// + /// Insert from + /// + /// List of + /// + /// + /// public async Task InsertProducts(List products, CostumeStatSheet costumeStatSheet, CrystalEquipmentGrindingSheet crystalEquipmentGrindingSheet, CrystalMonsterCollectionMultiplierSheet crystalMonsterCollectionMultiplierSheet) @@ -564,31 +759,7 @@ public async Task GetBlockStateRootHashBytes() return _receiver.Tip.StateRootHash.ToByteArray(); } - public async Task> GetProductStates(IEnumerable
avatarAddressList, - byte[] hashBytes) - { - var productListAddresses = avatarAddressList.Select(a => ProductsState.DeriveAddress(a).ToByteArray()).ToList(); - var productListResult = - await GetChunkedStates(hashBytes, ReservedAddresses.LegacyAccount.ToByteArray(), productListAddresses); - var productLists = GetProductsState(productListResult); - var productIdList = productLists.SelectMany(p => p.ProductIds).ToList(); - var productIds = new Dictionary(); - foreach (var productId in productIdList) productIds[Product.DeriveAddress(productId)] = productId; - var productResult = await GetChunkedStates( - hashBytes, - ReservedAddresses.LegacyAccount.ToByteArray(), - productIds.Keys.Select(a => a.ToByteArray()).ToList()); - var result = new Dictionary(); - foreach (var kv in productResult) - { - var productId = productIds[kv.Key]; - result[productId] = kv.Value; - } - - return result; - } - - public List GetProductsState(Dictionary queryResult) + public List DeserializeProductsState(Dictionary queryResult) { var result = new List(); foreach (var kv in queryResult) @@ -610,7 +781,12 @@ public IEnumerable GetShopAddress(ItemSubType itemSubType) return new[] {ShardedShopStateV2.DeriveAddress(itemSubType, "").ToByteArray()}; } - public IEnumerable GetShopStates(Dictionary queryResult) + /// + /// Get of for listing + /// + /// + /// + public IEnumerable DeserializeShopStates(Dictionary queryResult) { var result = new List(); foreach (var kv in queryResult) @@ -622,6 +798,14 @@ public IEnumerable GetShopStates(Dictionary return result; } + /// + /// Get of for . + /// + /// + /// + /// + /// + /// public async Task> GetOrders(IEnumerable orderIds, byte[] hashBytes) { var orderAddressList = orderIds.Select(i => Order.DeriveAddress(i).ToByteArray()).ToList(); @@ -644,6 +828,14 @@ await Parallel.ForEachAsync(chunks, _parallelOptions, async (chunk, token) => return orderBag.ToList(); } + /// + /// Get of for . + /// + /// + /// + /// + /// + /// public async Task> GetItems(IEnumerable tradableIds, byte[] hashBytes) { var itemAddressList = tradableIds.Select(i => Addresses.GetItemAddress(i).ToByteArray()).ToList(); @@ -681,6 +873,13 @@ public async Task> GetStates(byte[] hashBytes, byte[ return result.ToDictionary(kv => kv.Key, kv => kv.Value); } + /// + /// GetBulkState with chunking size 1000 + /// + /// + /// + /// + /// public async Task> GetChunkedStates(byte[] hashBytes, byte[] accountBytes, List addressList) { var result = new ConcurrentDictionary(); @@ -698,6 +897,12 @@ await Parallel.ForEachAsync(chunks, _parallelOptions, async (chunk, token) => return result.ToDictionary(kv => kv.Key, kv => kv.Value); } + /// + /// Get for listing avatar addresses. + /// + /// + /// + /// public async Task> GetAgentStates(byte[] hashBytes, List addressList) { var result = new ConcurrentDictionary(); @@ -721,6 +926,11 @@ public async Task> GetAgentStates(byte[] hashByt return result.ToDictionary(kv => kv.Key, kv => kv.Value); } + /// + /// Get for listing avatar addresses. + /// + /// + /// public async Task GetMarket(byte[] hashBytes) { var marketResult = await Service.GetStateByStateRootHash( @@ -733,6 +943,12 @@ public async Task GetMarket(byte[] hashBytes) return new MarketState(); } + /// + /// Get of from avatar addresses. + /// + /// + /// + /// of public async Task> GetOrderDigests(List
avatarAddresses, byte[] hashBytes) { var digestListStateAddresses = @@ -758,4 +974,14 @@ public async Task> GetOrderDigests(List
avatarAddress }); return orderDigests.ToList(); } + + internal class LocalRandom : System.Random, IRandom + { + public int Seed { get; } + + public LocalRandom(int seed) : base(seed) + { + Seed = seed; + } + } } diff --git a/MarketService/ShopWorker.cs b/MarketService/ShopWorker.cs index 6ee2fd9..8da82b2 100644 --- a/MarketService/ShopWorker.cs +++ b/MarketService/ShopWorker.cs @@ -54,7 +54,7 @@ await _rpcClient.SyncOrder(hashBytes, stopWatch.Stop(); var ts = stopWatch.Elapsed; _logger.LogInformation("Complete sync shop on {BlockIndex}. {TotalElapsed}", _rpcClient.Tip, ts); - await Task.Delay(8000, stoppingToken); + await Task.Delay(60000 * 5, stoppingToken); } } } diff --git a/lib9c b/lib9c index fef54f5..95968df 160000 --- a/lib9c +++ b/lib9c @@ -1 +1 @@ -Subproject commit fef54f5fc2043a82cc11f8aae91507c6e2f0553a +Subproject commit 95968dfc2482cdaf21ad5fc71cf40b5f191b6d39